cesium-native 0.43.0
Loading...
Searching...
No Matches
AsyncSystem.h
1#pragma once
2
3#include "Future.h"
4#include "Impl/ContinuationFutureType.h"
5#include "Impl/RemoveFuture.h"
6#include "Impl/WithTracing.h"
7#include "Impl/cesium-async++.h"
8#include "Library.h"
9#include "Promise.h"
10#include "ThreadPool.h"
11
12#include <CesiumUtility/Tracing.h>
13
14#include <memory>
15#include <type_traits>
16
17namespace CesiumAsync {
18class ITaskProcessor;
19
20class AsyncSystem;
21
36class CESIUMASYNC_API AsyncSystem final {
37public:
44 AsyncSystem(const std::shared_ptr<ITaskProcessor>& pTaskProcessor) noexcept;
45
65 template <typename T, typename Func> Future<T> createFuture(Func&& f) const {
66 std::shared_ptr<async::event_task<T>> pEvent =
67 std::make_shared<async::event_task<T>>();
68
69 Promise<T> promise(this->_pSchedulers, pEvent);
70
71 try {
72 f(promise);
73 } catch (...) {
74 promise.reject(std::current_exception());
75 }
76
77 return Future<T>(this->_pSchedulers, pEvent->get_task());
78 }
79
93 template <typename T> Promise<T> createPromise() const {
94 return Promise<T>(
95 this->_pSchedulers,
96 std::make_shared<async::event_task<T>>());
97 }
98
114 template <typename Func>
115 CesiumImpl::ContinuationFutureType_t<Func, void>
116 runInWorkerThread(Func&& f) const {
117 static const char* tracingName = "waiting for worker thread";
118
119 CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
120
121 return CesiumImpl::ContinuationFutureType_t<Func, void>(
122 this->_pSchedulers,
123 async::spawn(
124 this->_pSchedulers->workerThread.immediate,
125 CesiumImpl::WithTracing<void>::end(
126 tracingName,
127 std::forward<Func>(f))));
128 }
129
144 template <typename Func>
145 CesiumImpl::ContinuationFutureType_t<Func, void>
146 runInMainThread(Func&& f) const {
147 static const char* tracingName = "waiting for main thread";
148
149 CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
150
151 return CesiumImpl::ContinuationFutureType_t<Func, void>(
152 this->_pSchedulers,
153 async::spawn(
154 this->_pSchedulers->mainThread.immediate,
155 CesiumImpl::WithTracing<void>::end(
156 tracingName,
157 std::forward<Func>(f))));
158 }
159
169 template <typename Func>
170 CesiumImpl::ContinuationFutureType_t<Func, void>
171 runInThreadPool(const ThreadPool& threadPool, Func&& f) const {
172 static const char* tracingName = "waiting for thread pool";
173
174 CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
175
176 return CesiumImpl::ContinuationFutureType_t<Func, void>(
177 this->_pSchedulers,
178 async::spawn(
179 threadPool._pScheduler->immediate,
180 CesiumImpl::WithTracing<void>::end(
181 tracingName,
182 std::forward<Func>(f))));
183 }
184
193 template <typename T>
195 std::conditional_t<std::is_void_v<T>, void, std::vector<T>>;
196
219 template <typename T>
220 Future<AllValueType<T>> all(std::vector<Future<T>>&& futures) const {
221 return this->all<T, Future<T>>(
222 std::forward<std::vector<Future<T>>>(futures));
223 }
224
247 template <typename T>
248 Future<AllValueType<T>> all(std::vector<SharedFuture<T>>&& futures) const {
249 return this->all<T, SharedFuture<T>>(
250 std::forward<std::vector<SharedFuture<T>>>(futures));
251 }
252
260 template <typename T> Future<T> createResolvedFuture(T&& value) const {
261 return Future<T>(
262 this->_pSchedulers,
263 async::make_task<T>(std::forward<T>(value)));
264 }
265
272 return Future<void>(this->_pSchedulers, async::make_task());
273 }
274
281
293
300 ThreadPool createThreadPool(int32_t numberOfThreads) const;
301
307 bool operator==(const AsyncSystem& rhs) const noexcept;
308
314 bool operator!=(const AsyncSystem& rhs) const noexcept;
315
316private:
317 // Common implementation of 'all' for both Future and SharedFuture.
318 template <typename T, typename TFutureType>
319 Future<AllValueType<T>> all(std::vector<TFutureType>&& futures) const {
320 using TTaskType = decltype(TFutureType::_task);
321 std::vector<TTaskType> tasks;
322 tasks.reserve(futures.size());
323
324 for (auto it = futures.begin(); it != futures.end(); ++it) {
325 tasks.emplace_back(std::move(it->_task));
326 }
327
328 futures.clear();
329
330 async::task<AllValueType<T>> task =
331 async::when_all(tasks.begin(), tasks.end())
332 .then(
333 async::inline_scheduler(),
334 [](std::vector<TTaskType>&& tasks) {
335 if constexpr (std::is_void_v<T>) {
336 // Tasks return void. "Get" each task so that error
337 // information is propagated.
338 for (auto it = tasks.begin(); it != tasks.end(); ++it) {
339 it->get();
340 }
341 } else {
342 // Get all the results. If any tasks rejected, we'll bail
343 // with an exception.
344 std::vector<T> results;
345 results.reserve(tasks.size());
346
347 for (auto it = tasks.begin(); it != tasks.end(); ++it) {
348 results.emplace_back(std::move(it->get()));
349 }
350 return results;
351 }
352 });
353 return Future<AllValueType<T>>(this->_pSchedulers, std::move(task));
354 }
355
356 std::shared_ptr<CesiumImpl::AsyncSystemSchedulers> _pSchedulers;
357
358 template <typename T> friend class Future;
359};
360} // namespace CesiumAsync
A system for managing asynchronous requests and tasks.
Definition AsyncSystem.h:36
Future< T > createResolvedFuture(T &&value) const
Creates a future that is already resolved.
bool dispatchOneMainThreadTask()
Runs a single waiting task that is currently queued for the main thread. If there are no tasks waitin...
Future< AllValueType< T > > all(std::vector< SharedFuture< T > > &&futures) const
Creates a Future that resolves when every Future in a vector resolves, and rejects when any Future in...
bool operator==(const AsyncSystem &rhs) const noexcept
std::conditional_t< std::is_void_v< T >, void, std::vector< T > > AllValueType
The value type of the Future returned by all.
void dispatchMainThreadTasks()
Runs all tasks that are currently queued for the main thread.
bool operator!=(const AsyncSystem &rhs) const noexcept
AsyncSystem(const std::shared_ptr< ITaskProcessor > &pTaskProcessor) noexcept
Constructs a new instance.
Future< void > createResolvedFuture() const
Creates a future that is already resolved and resolves to no value.
CesiumImpl::ContinuationFutureType_t< Func, void > runInThreadPool(const ThreadPool &threadPool, Func &&f) const
Runs a function in a thread pool, returning a Future that resolves when the function completes.
CesiumImpl::ContinuationFutureType_t< Func, void > runInWorkerThread(Func &&f) const
Runs a function in a worker thread, returning a Future that resolves when the function completes.
Promise< T > createPromise() const
Create a Promise that can be used at a later time to resolve or reject a Future.
Definition AsyncSystem.h:93
Future< T > createFuture(Func &&f) const
Creates a new Future by immediately invoking a function and giving it the opportunity to resolve or r...
Definition AsyncSystem.h:65
CesiumImpl::ContinuationFutureType_t< Func, void > runInMainThread(Func &&f) const
Runs a function in the main thread, returning a Future that resolves when the function completes.
ThreadPool createThreadPool(int32_t numberOfThreads) const
Creates a new thread pool that can be used to run continuations.
Future< AllValueType< T > > all(std::vector< Future< T > > &&futures) const
Creates a Future that resolves when every Future in a vector resolves, and rejects when any Future in...
A value that will be available in the future, as produced by AsyncSystem.
Definition Promise.h:11
A promise that can be resolved or rejected by an asynchronous task.
Definition Promise.h:18
A value that will be available in the future, as produced by AsyncSystem. Unlike Future,...
A thread pool created by AsyncSystem::createThreadPool.
Definition ThreadPool.h:18
Classes that support asynchronous operations.