cesium-native  0.41.0
AsyncSystem.h
1 #pragma once
2 
3 #include "Future.h"
4 #include "Impl/ContinuationFutureType.h"
5 #include "Impl/RemoveFuture.h"
6 #include "Impl/WithTracing.h"
7 #include "Impl/cesium-async++.h"
8 #include "Library.h"
9 #include "Promise.h"
10 #include "ThreadPool.h"
11 
12 #include <CesiumUtility/Tracing.h>
13 
14 #include <memory>
15 #include <type_traits>
16 
17 namespace CesiumAsync {
18 class ITaskProcessor;
19 
20 class AsyncSystem;
21 
36 class CESIUMASYNC_API AsyncSystem final {
37 public:
/**
 * @brief Constructs a new instance.
 *
 * @param pTaskProcessor The task processor associated with this system.
 * NOTE(review): the constructor is defined out of line, so how the processor
 * is used (presumably to run worker-thread tasks) cannot be confirmed here.
 */
44  AsyncSystem(const std::shared_ptr<ITaskProcessor>& pTaskProcessor) noexcept;
45 
65  template <typename T, typename Func> Future<T> createFuture(Func&& f) const {
66  std::shared_ptr<async::event_task<T>> pEvent =
67  std::make_shared<async::event_task<T>>();
68 
69  Promise<T> promise(this->_pSchedulers, pEvent);
70 
71  try {
72  f(promise);
73  } catch (...) {
74  promise.reject(std::current_exception());
75  }
76 
77  return Future<T>(this->_pSchedulers, pEvent->get_task());
78  }
79 
93  template <typename T> Promise<T> createPromise() const {
94  return Promise<T>(
95  this->_pSchedulers,
96  std::make_shared<async::event_task<T>>());
97  }
98 
114  template <typename Func>
115  CesiumImpl::ContinuationFutureType_t<Func, void>
116  runInWorkerThread(Func&& f) const {
117  static const char* tracingName = "waiting for worker thread";
118 
119  CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
120 
121  return CesiumImpl::ContinuationFutureType_t<Func, void>(
122  this->_pSchedulers,
123  async::spawn(
124  this->_pSchedulers->workerThread.immediate,
125  CesiumImpl::WithTracing<void>::end(
126  tracingName,
127  std::forward<Func>(f))));
128  }
129 
144  template <typename Func>
145  CesiumImpl::ContinuationFutureType_t<Func, void>
146  runInMainThread(Func&& f) const {
147  static const char* tracingName = "waiting for main thread";
148 
149  CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
150 
151  return CesiumImpl::ContinuationFutureType_t<Func, void>(
152  this->_pSchedulers,
153  async::spawn(
154  this->_pSchedulers->mainThread.immediate,
155  CesiumImpl::WithTracing<void>::end(
156  tracingName,
157  std::forward<Func>(f))));
158  }
159 
169  template <typename Func>
170  CesiumImpl::ContinuationFutureType_t<Func, void>
171  runInThreadPool(const ThreadPool& threadPool, Func&& f) const {
172  static const char* tracingName = "waiting for thread pool";
173 
174  CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
175 
176  return CesiumImpl::ContinuationFutureType_t<Func, void>(
177  this->_pSchedulers,
178  async::spawn(
179  threadPool._pScheduler->immediate,
180  CesiumImpl::WithTracing<void>::end(
181  tracingName,
182  std::forward<Func>(f))));
183  }
184 
/**
 * @brief The value type of the Future returned by `all`.
 *
 * This is `std::vector<T>` for any non-void `T`, and plain `void` when `T`
 * is void.
 */
template <typename T>
using AllValueType =
    std::conditional_t<!std::is_void_v<T>, std::vector<T>, void>;
196 
219  template <typename T>
220  Future<AllValueType<T>> all(std::vector<Future<T>>&& futures) const {
221  return this->all<T, Future<T>>(
222  std::forward<std::vector<Future<T>>>(futures));
223  }
224 
247  template <typename T>
248  Future<AllValueType<T>> all(std::vector<SharedFuture<T>>&& futures) const {
249  return this->all<T, SharedFuture<T>>(
250  std::forward<std::vector<SharedFuture<T>>>(futures));
251  }
252 
260  template <typename T> Future<T> createResolvedFuture(T&& value) const {
261  return Future<T>(
262  this->_pSchedulers,
263  async::make_task<T>(std::forward<T>(value)));
264  }
265 
272  return Future<void>(this->_pSchedulers, async::make_task());
273  }
274 
/**
 * @brief Runs all tasks that are currently queued for the main thread.
 */
void dispatchMainThreadTasks();

/**
 * @brief Runs a single waiting task that is currently queued for the main
 * thread.
 *
 * @return NOTE(review): presumably true when a task was executed and false
 * when none was waiting -- confirm against the implementation.
 */
bool dispatchOneMainThreadTask();
/**
 * @brief Creates a new thread pool that can be used to run continuations.
 *
 * @param numberOfThreads The number of threads requested for the pool.
 * NOTE(review): handling of zero or negative values is not visible here.
 * @return The new ThreadPool.
 */
300  ThreadPool createThreadPool(int32_t numberOfThreads) const;
301 
/**
 * @brief Equality comparison with another AsyncSystem.
 *
 * NOTE(review): defined out of line; presumably two systems compare equal
 * when they share the same scheduler state -- confirm in the implementation.
 *
 * @param rhs The system to compare against.
 */
307  bool operator==(const AsyncSystem& rhs) const noexcept;
308 
/**
 * @brief Inequality comparison with another AsyncSystem.
 *
 * NOTE(review): defined out of line; presumably the negation of operator==
 * -- confirm in the implementation.
 *
 * @param rhs The system to compare against.
 */
314  bool operator!=(const AsyncSystem& rhs) const noexcept;
315 
316 private:
317  // Common implementation of 'all' for both Future and SharedFuture.
318  template <typename T, typename TFutureType>
319  Future<AllValueType<T>> all(std::vector<TFutureType>&& futures) const {
320  using TTaskType = decltype(TFutureType::_task);
321  std::vector<TTaskType> tasks;
322  tasks.reserve(futures.size());
323 
324  for (auto it = futures.begin(); it != futures.end(); ++it) {
325  tasks.emplace_back(std::move(it->_task));
326  }
327 
328  futures.clear();
329 
330  async::task<AllValueType<T>> task =
331  async::when_all(tasks.begin(), tasks.end())
332  .then(
333  async::inline_scheduler(),
334  [](std::vector<TTaskType>&& tasks) {
335  if constexpr (std::is_void_v<T>) {
336  // Tasks return void. "Get" each task so that error
337  // information is propagated.
338  for (auto it = tasks.begin(); it != tasks.end(); ++it) {
339  it->get();
340  }
341  } else {
342  // Get all the results. If any tasks rejected, we'll bail
343  // with an exception.
344  std::vector<T> results;
345  results.reserve(tasks.size());
346 
347  for (auto it = tasks.begin(); it != tasks.end(); ++it) {
348  results.emplace_back(std::move(it->get()));
349  }
350  return results;
351  }
352  });
353  return Future<AllValueType<T>>(this->_pSchedulers, std::move(task));
354  }
355 
// Shared scheduler state. It exposes the worker-thread and main-thread
// schedulers used by runInWorkerThread / runInMainThread, and is handed to
// every Future and Promise this system creates.
356  std::shared_ptr<CesiumImpl::AsyncSystemSchedulers> _pSchedulers;
357 
358  template <typename T> friend class Future;
359 };
360 } // namespace CesiumAsync
A system for managing asynchronous requests and tasks.
Definition: AsyncSystem.h:36
CesiumImpl::ContinuationFutureType_t< Func, void > runInThreadPool(const ThreadPool &threadPool, Func &&f) const
Runs a function in a thread pool, returning a Future that resolves when the function completes.
Definition: AsyncSystem.h:171
CesiumImpl::ContinuationFutureType_t< Func, void > runInMainThread(Func &&f) const
Runs a function in the main thread, returning a Future that resolves when the function completes.
Definition: AsyncSystem.h:146
bool dispatchOneMainThreadTask()
Runs a single waiting task that is currently queued for the main thread. If there are no tasks waiting, it returns immediately without running any tasks.
Future< AllValueType< T > > all(std::vector< Future< T >> &&futures) const
Creates a Future that resolves when every Future in a vector resolves, and rejects when any Future in the vector rejects.
Definition: AsyncSystem.h:220
Future< T > createFuture(Func &&f) const
Creates a new Future by immediately invoking a function and giving it the opportunity to resolve or reject a Promise.
Definition: AsyncSystem.h:65
CesiumImpl::ContinuationFutureType_t< Func, void > runInWorkerThread(Func &&f) const
Runs a function in a worker thread, returning a Future that resolves when the function completes.
Definition: AsyncSystem.h:116
Future< void > createResolvedFuture() const
Creates a future that is already resolved and resolves to no value.
Definition: AsyncSystem.h:271
Future< AllValueType< T > > all(std::vector< SharedFuture< T >> &&futures) const
Creates a Future that resolves when every Future in a vector resolves, and rejects when any Future in the vector rejects.
Definition: AsyncSystem.h:248
bool operator==(const AsyncSystem &rhs) const noexcept
void dispatchMainThreadTasks()
Runs all tasks that are currently queued for the main thread.
bool operator!=(const AsyncSystem &rhs) const noexcept
AsyncSystem(const std::shared_ptr< ITaskProcessor > &pTaskProcessor) noexcept
Constructs a new instance.
Promise< T > createPromise() const
Create a Promise that can be used at a later time to resolve or reject a Future.
Definition: AsyncSystem.h:93
Future< T > createResolvedFuture(T &&value) const
Creates a future that is already resolved.
Definition: AsyncSystem.h:260
std::conditional_t< std::is_void_v< T >, void, std::vector< T > > AllValueType
The value type of the Future returned by all.
Definition: AsyncSystem.h:195
ThreadPool createThreadPool(int32_t numberOfThreads) const
Creates a new thread pool that can be used to run continuations.
A value that will be available in the future, as produced by AsyncSystem.
Definition: Future.h:29
A promise that can be resolved or rejected by an asynchronous task.
Definition: Promise.h:18
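A value that will be available in the future, as produced by AsyncSystem. Unlike Future, a SharedFuture allows multiple continuations to be attached, and allows SharedFuture::wait to be called multiple times.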
Definition: SharedFuture.h:31
A thread pool created by AsyncSystem::createThreadPool.
Definition: ThreadPool.h:18
Classes that support asynchronous operations.