cesium-native  0.41.0
SharedAssetDepot.h
1 #pragma once
2 
3 #include <CesiumAsync/AsyncSystem.h>
4 #include <CesiumAsync/Future.h>
5 #include <CesiumAsync/IAssetAccessor.h>
6 #include <CesiumUtility/DoublyLinkedList.h>
7 #include <CesiumUtility/IDepotOwningAsset.h>
8 #include <CesiumUtility/IntrusivePointer.h>
9 #include <CesiumUtility/ReferenceCounted.h>
10 #include <CesiumUtility/Result.h>
11 
12 #include <cstddef>
13 #include <functional>
14 #include <memory>
15 #include <mutex>
16 #include <optional>
17 #include <string>
18 #include <unordered_map>
19 
20 namespace CesiumUtility {
21 template <typename T> class SharedAsset;
22 }
23 
24 namespace CesiumAsync {
25 
32 template <typename TAssetType, typename TAssetKey>
33 class CESIUMASYNC_API SharedAssetDepot
35  SharedAssetDepot<TAssetType, TAssetKey>>,
36  public CesiumUtility::IDepotOwningAsset<TAssetType> {
37 public:
/**
 * @brief Upper bound, in bytes, on the combined size of the assets held on
 * the deletion-candidate list.
 *
 * Each time an asset is added to that list, the running total of candidate
 * sizes is compared against this value; while the total is greater, the
 * oldest candidates are removed from the depot until the total no longer
 * exceeds the limit or the list is empty. The default is 16 * 1024 * 1024
 * bytes (16 MiB).
 */
int64_t inactiveAssetSizeLimitBytes = 16 * 1024 * 1024;
51 
52  using FactorySignature =
54  const AsyncSystem& asyncSystem,
55  const std::shared_ptr<IAssetAccessor>& pAssetAccessor,
56  const TAssetKey& key);
57 
58  SharedAssetDepot(std::function<FactorySignature> factory)
59  : _assets(),
60  _assetsByPointer(),
61  _deletionCandidates(),
62  _totalDeletionCandidateMemoryUsage(0),
63  _mutex(),
64  _factory(std::move(factory)),
65  _pKeepAlive(nullptr) {}
66 
67  virtual ~SharedAssetDepot() {
68  // Ideally, when the depot is destroyed, all the assets it owns would become
69  // independent assets. But this is extremely difficult to manage in a
70  // thread-safe manner.
71 
72  // Since we're in the destructor, we can be sure no one has a reference to
73  // this instance anymore. That means that no other thread can be executing
74  // `getOrCreate`, and no async asset creations are in progress.
75 
76  // However, if assets owned by this depot are still alive, then other
77  // threads can still be calling addReference / releaseReference on some of
78  // our assets even while we're running the depot's destructor. Which means
79  // that we can end up in `markDeletionCandidate` at the same time the
80  // destructor is running. And in fact it's possible for a `SharedAsset` with
81  // especially poor timing to call into a `SharedAssetDepot` just after it is
82  // destroyed.
83 
84  // To avoid this, we use the _pKeepAlive field to maintain an artificial
85  // reference to this depot whenever it owns live assets. This should keep
86  // this destructor from being called except when all of its assets are also
87  // in the _deletionCandidates list.
88 
89  CESIUM_ASSERT(this->_assets.size() == this->_deletionCandidates.size());
90  }
91 
103  const AsyncSystem& asyncSystem,
104  const std::shared_ptr<IAssetAccessor>& pAssetAccessor,
105  const TAssetKey& assetKey) {
106  // We need to take care here to avoid two assets starting to load before the
107  // first asset has added an entry and set its maybePendingAsset field.
108  std::unique_lock lock(this->_mutex);
109 
110  auto existingIt = this->_assets.find(assetKey);
111  if (existingIt != this->_assets.end()) {
112  // We've already loaded (or are loading) an asset with this ID - we can
113  // just use that.
114  const AssetEntry& entry = *existingIt->second;
115  if (entry.maybePendingAsset) {
116  // Asset is currently loading.
117  return *entry.maybePendingAsset;
118  } else {
119  return asyncSystem.createResolvedFuture(entry.toResultUnderLock())
120  .share();
121  }
122  }
123 
124  // Calling the factory function while holding the mutex unnecessarily
125  // limits parallelism. It can even lead to a bug in the scenario where the
126  // `thenInWorkerThread` continuation is invoked immediately in the current
127  // thread, before `thenInWorkerThread` itself returns. That would result
128  // in an attempt to lock the mutex recursively, which is not allowed.
129 
130  // So we jump through some hoops here to publish "this thread is working
131  // on it", then unlock the mutex, and _then_ actually call the factory
132  // function.
133  Promise<void> promise = asyncSystem.createPromise<void>();
134 
135  // We haven't loaded or started to load this asset yet.
136  // Let's do that now.
138  pDepot = this;
140  new AssetEntry(assetKey);
141 
142  auto future =
143  promise.getFuture()
144  .thenImmediately([pDepot, pEntry, asyncSystem, pAssetAccessor]() {
145  return pDepot->_factory(asyncSystem, pAssetAccessor, pEntry->key);
146  })
147  .catchImmediately([](std::exception&& e) {
148  return CesiumUtility::Result<
151  std::string("Error creating asset: ") + e.what()));
152  })
153  .thenInWorkerThread(
154  [pDepot, pEntry](
157  std::lock_guard lock(pDepot->_mutex);
158 
159  if (result.pValue) {
160  result.pValue->_pDepot = pDepot.get();
161  pDepot->_assetsByPointer[result.pValue.get()] =
162  pEntry.get();
163  }
164 
165  // Now that this asset is owned by the depot, we exclusively
166  // control its lifetime with a std::unique_ptr.
167  pEntry->pAsset =
168  std::unique_ptr<TAssetType>(result.pValue.get());
169  pEntry->errorsAndWarnings = std::move(result.errors);
170  pEntry->maybePendingAsset.reset();
171 
172  // The asset is initially live because we have an
173  // IntrusivePointer to it right here. So make sure the depot
174  // stays alive, too.
175  pDepot->_pKeepAlive = pDepot;
176 
177  return pEntry->toResultUnderLock();
178  });
179 
181  std::move(future).share();
182 
183  pEntry->maybePendingAsset = sharedFuture;
184 
185  auto [it, added] = this->_assets.emplace(assetKey, pEntry);
186 
187  // Should always be added successfully, because we checked above that the
188  // asset key doesn't exist in the map yet.
189  CESIUM_ASSERT(added);
190 
191  // Unlock the mutex and then call the factory function.
192  lock.unlock();
193  promise.resolve();
194 
195  return sharedFuture;
196  }
197 
202  size_t getAssetCount() const {
203  std::lock_guard lock(this->_mutex);
204  return this->_assets.size();
205  }
206 
211  size_t getActiveAssetCount() const {
212  std::lock_guard lock(this->_mutex);
213  return this->_assets.size() - this->_deletionCandidates.size();
214  }
215 
220  size_t getInactiveAssetCount() const {
221  std::lock_guard lock(this->_mutex);
222  return this->_deletionCandidates.size();
223  }
224 
230  std::lock_guard lock(this->_mutex);
231  return this->_totalDeletionCandidateMemoryUsage;
232  }
233 
234 private:
235  // Disable copy
236  void operator=(const SharedAssetDepot<TAssetType, TAssetKey>& other) = delete;
237 
246  void markDeletionCandidate(const TAssetType& asset, bool threadOwnsDepotLock)
247  override {
248  if (threadOwnsDepotLock) {
249  this->markDeletionCandidateUnderLock(asset);
250  } else {
251  std::lock_guard lock(this->_mutex);
252  this->markDeletionCandidateUnderLock(asset);
253  }
254  }
255 
256  void markDeletionCandidateUnderLock(const TAssetType& asset) {
257  auto it = this->_assetsByPointer.find(const_cast<TAssetType*>(&asset));
258  CESIUM_ASSERT(it != this->_assetsByPointer.end());
259  if (it == this->_assetsByPointer.end()) {
260  return;
261  }
262 
263  CESIUM_ASSERT(it->second != nullptr);
264 
265  AssetEntry& entry = *it->second;
266  entry.sizeInDeletionList = asset.getSizeBytes();
267  this->_totalDeletionCandidateMemoryUsage += entry.sizeInDeletionList;
268 
269  this->_deletionCandidates.insertAtTail(entry);
270 
271  if (this->_totalDeletionCandidateMemoryUsage >
272  this->inactiveAssetSizeLimitBytes) {
273  // Delete the deletion candidates until we're below the limit.
274  while (this->_deletionCandidates.size() > 0 &&
275  this->_totalDeletionCandidateMemoryUsage >
276  this->inactiveAssetSizeLimitBytes) {
277  AssetEntry* pOldEntry = this->_deletionCandidates.head();
278  this->_deletionCandidates.remove(*pOldEntry);
279 
280  this->_totalDeletionCandidateMemoryUsage -=
281  pOldEntry->sizeInDeletionList;
282 
283  CESIUM_ASSERT(
284  pOldEntry->pAsset == nullptr ||
285  pOldEntry->pAsset->_referenceCount == 0);
286 
287  if (pOldEntry->pAsset) {
288  this->_assetsByPointer.erase(pOldEntry->pAsset.get());
289  }
290 
291  // This will actually delete the asset.
292  this->_assets.erase(pOldEntry->key);
293  }
294  }
295 
296  // If this depot is not managing any live assets, then we no longer need to
297  // keep it alive.
298  if (this->_assets.size() == this->_deletionCandidates.size()) {
299  this->_pKeepAlive.reset();
300  }
301  }
302 
311  void unmarkDeletionCandidate(
312  const TAssetType& asset,
313  bool threadOwnsDepotLock) override {
314  if (threadOwnsDepotLock) {
315  this->unmarkDeletionCandidateUnderLock(asset);
316  } else {
317  std::lock_guard lock(this->_mutex);
318  this->unmarkDeletionCandidateUnderLock(asset);
319  }
320  }
321 
322  void unmarkDeletionCandidateUnderLock(const TAssetType& asset) {
323  auto it = this->_assetsByPointer.find(const_cast<TAssetType*>(&asset));
324  CESIUM_ASSERT(it != this->_assetsByPointer.end());
325  if (it == this->_assetsByPointer.end()) {
326  return;
327  }
328 
329  CESIUM_ASSERT(it->second != nullptr);
330 
331  AssetEntry& entry = *it->second;
332  bool isFound = this->_deletionCandidates.contains(entry);
333 
334  CESIUM_ASSERT(isFound);
335 
336  if (isFound) {
337  this->_totalDeletionCandidateMemoryUsage -= entry.sizeInDeletionList;
338  this->_deletionCandidates.remove(entry);
339  }
340 
341  // This depot is now managing at least one live asset, so keep it alive.
342  this->_pKeepAlive = this;
343  }
344 
349  struct AssetEntry
350  : public CesiumUtility::ReferenceCountedThreadSafe<AssetEntry> {
// Creates an entry for the given key, taking ownership of the key by move.
// The entry starts with no asset, no pending future, an empty error list,
// and a recorded deletion-list size of zero.
AssetEntry(TAssetKey&& key_)
    : CesiumUtility::ReferenceCountedThreadSafe<AssetEntry>(),
      key(std::move(key_)),
      pAsset(),
      maybePendingAsset(),
      errorsAndWarnings(),
      sizeInDeletionList(0),
      deletionListPointers() {}

// Creates an entry from a key that must be left intact: the key is copied
// and the copy is handed to the moving constructor above.
AssetEntry(const TAssetKey& key_) : AssetEntry(TAssetKey(key_)) {}
361 
365  TAssetKey key;
366 
371  std::unique_ptr<TAssetType> pAsset;
372 
378  std::optional<SharedFuture<CesiumUtility::ResultPointer<TAssetType>>>
379  maybePendingAsset;
380 
386  CesiumUtility::ErrorList errorsAndWarnings;
387 
394  int64_t sizeInDeletionList;
395 
401 
402  CesiumUtility::ResultPointer<TAssetType> toResultUnderLock() const {
403  // This method is called while the calling thread already owns the depot
404  // mutex. So we must take care not to lock it again, which could happen if
405  // the asset is currently unreferenced and we naively create an
406  // IntrusivePointer for it.
407  pAsset->addReference(true);
409  pAsset->releaseReference(true);
410  return CesiumUtility::ResultPointer<TAssetType>(p, errorsAndWarnings);
411  }
412  };
413 
414  // Maps asset keys to AssetEntry instances. This collection owns the asset
415  // entries.
416  std::unordered_map<TAssetKey, CesiumUtility::IntrusivePointer<AssetEntry>>
417  _assets;
418 
419  // Maps asset pointers to AssetEntry instances. The values in this map refer
420  // to instances owned by the _assets map.
421  std::unordered_map<TAssetType*, AssetEntry*> _assetsByPointer;
422 
423  // List of assets that are being considered for deletion, in the order that
424  // they became unused.
426  _deletionCandidates;
427 
428  // The total amount of memory used by all assets in the _deletionCandidates
429  // list.
430  int64_t _totalDeletionCandidateMemoryUsage;
431 
432  // Mutex serializing access to _assets, _assetsByPointer, _deletionCandidates,
433  // and any AssetEntry owned by this depot.
434  mutable std::mutex _mutex;
435 
436  // The factory used to create new AssetType instances.
437  std::function<FactorySignature> _factory;
438 
439  // This instance keeps a reference to itself whenever it is managing active
440  // assets, preventing it from being destroyed even if all other references to
441  // it are dropped.
443  _pKeepAlive;
444 };
445 
446 } // namespace CesiumAsync
A system for managing asynchronous requests and tasks.
Definition: AsyncSystem.h:36
Promise< T > createPromise() const
Create a Promise that can be used at a later time to resolve or reject a Future.
Definition: AsyncSystem.h:93
Future< T > createResolvedFuture(T &&value) const
Creates a future that is already resolved.
Definition: AsyncSystem.h:260
A value that will be available in the future, as produced by AsyncSystem.
Definition: Future.h:29
A depot for SharedAsset instances, which are potentially shared between multiple objects.
size_t getActiveAssetCount() const
Gets the number of assets owned by this depot that are active, meaning that they are currently being ...
size_t getInactiveAssetCount() const
Gets the number of assets owned by this depot that are inactive, meaning that they are not currently ...
size_t getAssetCount() const
Returns the total number of distinct assets contained in this depot, including both active and inacti...
int64_t getInactiveAssetTotalSizeBytes() const
Gets the total bytes used by inactive (unused) assets owned by this depot.
SharedFuture< CesiumUtility::ResultPointer< TAssetType > > getOrCreate(const AsyncSystem &asyncSystem, const std::shared_ptr< IAssetAccessor > &pAssetAccessor, const TAssetKey &assetKey)
Gets an asset from the depot if it already exists, or creates it using the depot's factory if it does...
A value that will be available in the future, as produced by AsyncSystem. Unlike Future,...
Definition: SharedFuture.h:31
An interface representing the depot that owns a SharedAsset. This interface is an implementation deta...
A smart pointer that calls addReference and releaseReference on the controlled object.
T * get() const noexcept
Returns the internal pointer.
void reset()
Reset this pointer to nullptr.
A reference-counted base class, meant to be used with IntrusivePointer.
Classes that support asynchronous operations.
Utility classes for Cesium.
ReferenceCounted< T, true > ReferenceCountedThreadSafe
A reference-counted base class, meant to be used with IntrusivePointer. The reference count is thread...
The container to store the error and warning list when loading a tile or glTF content.
Definition: ErrorList.h:17
static ErrorList error(std::string errorMessage)
Creates an ErrorList containing a single error.
Holds the result of an operation. If the operation succeeds, it will provide a value....
Definition: Result.h:16