cesium-native 0.57.0
Loading...
Searching...
No Matches
AsyncSystem.h
1#pragma once
2
3#include "Impl/ContinuationFutureType.h"
4#include "Impl/RemoveFuture.h"
5#include "Impl/WithTracing.h"
6#include "Impl/cesium-async++.h"
7
8#include <CesiumAsync/Future.h>
9#include <CesiumAsync/Library.h>
10#include <CesiumAsync/Promise.h>
11#include <CesiumAsync/ThreadPool.h>
12#include <CesiumUtility/Tracing.h>
13#include <CesiumUtility/transformTuple.h>
14
15#include <memory>
16#include <type_traits>
17
18namespace CesiumAsync {
19class ITaskProcessor;
20
21class AsyncSystem;
22
37class CESIUMASYNC_API AsyncSystem final {
38public:
/**
 * @brief Constructs a new instance.
 *
 * @param pTaskProcessor The task processor handed to the new instance.
 * NOTE(review): presumably used to run work in background threads — confirm
 * against the out-of-line definition.
 */
45 AsyncSystem(const std::shared_ptr<ITaskProcessor>& pTaskProcessor) noexcept;
46
66 template <typename T, typename Func> Future<T> createFuture(Func&& f) const {
67 std::shared_ptr<async::event_task<T>> pEvent =
68 std::make_shared<async::event_task<T>>();
69
70 Promise<T> promise(this->_pSchedulers, pEvent);
71
72 try {
73 f(promise);
74 } catch (...) {
75 promise.reject(std::current_exception());
76 }
77
78 return Future<T>(this->_pSchedulers, pEvent->get_task());
79 }
80
94 template <typename T> Promise<T> createPromise() const {
95 return Promise<T>(
96 this->_pSchedulers,
97 std::make_shared<async::event_task<T>>());
98 }
99
115 template <typename Func>
116 CesiumImpl::ContinuationFutureType_t<Func, void>
117 runInWorkerThread(Func&& f) const {
118 static const char* tracingName = "waiting for worker thread";
119
120 CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
121
122 return CesiumImpl::ContinuationFutureType_t<Func, void>(
123 this->_pSchedulers,
124 async::spawn(
125 this->_pSchedulers->workerThread.immediate,
126 CesiumImpl::WithTracing<void>::end(
127 tracingName,
128 std::forward<Func>(f))));
129 }
130
145 template <typename Func>
146 CesiumImpl::ContinuationFutureType_t<Func, void>
147 runInMainThread(Func&& f) const {
148 static const char* tracingName = "waiting for main thread";
149
150 CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
151
152 return CesiumImpl::ContinuationFutureType_t<Func, void>(
153 this->_pSchedulers,
154 async::spawn(
155 this->_pSchedulers->mainThread.immediate,
156 CesiumImpl::WithTracing<void>::end(
157 tracingName,
158 std::forward<Func>(f))));
159 }
160
170 template <typename Func>
171 CesiumImpl::ContinuationFutureType_t<Func, void>
172 runInThreadPool(const ThreadPool& threadPool, Func&& f) const {
173 static const char* tracingName = "waiting for thread pool";
174
175 CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
176
177 return CesiumImpl::ContinuationFutureType_t<Func, void>(
178 this->_pSchedulers,
179 async::spawn(
180 threadPool._pScheduler->immediate,
181 CesiumImpl::WithTracing<void>::end(
182 tracingName,
183 std::forward<Func>(f))));
184 }
185
/**
 * @brief The value type of the Future returned by
 * AsyncSystem::all(std::vector<Future<T>>&&) const.
 *
 * This is `void` when `T` is `void`, and `std::vector<T>` otherwise.
 *
 * @tparam T The value type of the input Futures.
 */
template <typename T>
using AllValueType =
    std::conditional_t<std::is_void_v<T>, void, std::vector<T>>;
197
220 template <typename T>
221 Future<AllValueType<T>> all(std::vector<Future<T>>&& futures) const {
222 return this->all<T, Future<T>>(
223 std::forward<std::vector<Future<T>>>(futures));
224 }
225
248 template <typename T>
249 Future<AllValueType<T>> all(std::vector<SharedFuture<T>>&& futures) const {
250 return this->all<T, SharedFuture<T>>(
251 std::forward<std::vector<SharedFuture<T>>>(futures));
252 }
253
263 template <typename... Futures>
265 std::tuple<typename CesiumImpl::RemoveFuture<Futures>::type...>;
266
290 template <typename... Futures>
291 Future<AllTupleType<Futures...>> all(Futures&&... futures) const {
292 // Helper function so that we can get the `_task` member of each future and
293 // pass as a variadic pack.
294 auto getTaskFromFuture = [](auto&& future) {
295 return std::move(future._task);
296 };
297
298 using AggregateTaskType = std::tuple<
299 async::task<typename CesiumImpl::RemoveFuture<Futures>::type>...>;
300
301 async::task<AllTupleType<Futures...>> task =
302 async::when_all(getTaskFromFuture(std::forward<Futures>(futures))...)
303 .then(async::inline_scheduler(), [](AggregateTaskType&& tasks) {
305 std::move(tasks),
306 [](auto&& task) { return task.get(); });
307 });
308
309 return Future<AllTupleType<Futures...>>(
310 this->_pSchedulers,
311 std::move(task));
312 }
313
321 template <typename T> Future<T> createResolvedFuture(T&& value) const {
322 return Future<T>(
323 this->_pSchedulers,
324 async::make_task<T>(std::forward<T>(value)));
325 }
326
332 Future<void> createResolvedFuture() const {
333 return Future<void>(this->_pSchedulers, async::make_task());
334 }
335
342
354
361 using MainThreadScope = CesiumImpl::ImmediateScheduler<
362 CesiumImpl::QueuedScheduler>::SchedulerScope;
363
371
/**
 * @brief Creates a new thread pool that can be used to run continuations.
 *
 * @param numberOfThreads The number of threads requested for the pool.
 * @return The new thread pool.
 */
378 ThreadPool createThreadPool(int32_t numberOfThreads) const;
379
/**
 * @brief Compares this AsyncSystem with another for equality.
 *
 * NOTE(review): the comparison criterion lives in the out-of-line
 * definition; presumably two instances are equal when they share the same
 * schedulers — confirm.
 *
 * @param rhs The AsyncSystem to compare against.
 */
385 bool operator==(const AsyncSystem& rhs) const noexcept;
386
/**
 * @brief Compares this AsyncSystem with another for inequality.
 *
 * NOTE(review): presumably the negation of operator== — confirm against the
 * out-of-line definition.
 *
 * @param rhs The AsyncSystem to compare against.
 */
392 bool operator!=(const AsyncSystem& rhs) const noexcept;
393
394private:
395 // Common implementation of 'all' for both Future and SharedFuture.
396 template <typename T, typename TFutureType>
397 Future<AllValueType<T>> all(std::vector<TFutureType>&& futures) const {
398 using TTaskType = decltype(TFutureType::_task);
399 std::vector<TTaskType> tasks;
400 tasks.reserve(futures.size());
401
402 for (auto it = futures.begin(); it != futures.end(); ++it) {
403 tasks.emplace_back(std::move(it->_task));
404 }
405
406 futures.clear();
407
408 async::task<AllValueType<T>> task =
409 async::when_all(tasks.begin(), tasks.end())
410 .then(
411 async::inline_scheduler(),
412 [](std::vector<TTaskType>&& tasks) {
413 if constexpr (std::is_void_v<T>) {
414 // Tasks return void. "Get" each task so that error
415 // information is propagated.
416 for (auto it = tasks.begin(); it != tasks.end(); ++it) {
417 it->get();
418 }
419 } else {
420 // Get all the results. If any tasks rejected, we'll bail
421 // with an exception.
422 std::vector<T> results;
423 results.reserve(tasks.size());
424
425 for (auto it = tasks.begin(); it != tasks.end(); ++it) {
426 results.emplace_back(std::move(it->get()));
427 }
428 return results;
429 }
430 });
431 return Future<AllValueType<T>>(this->_pSchedulers, std::move(task));
432 }
433
// Schedulers used by this system; the same pointer is passed to every
// Future and Promise constructed by the methods above.
434 std::shared_ptr<CesiumImpl::AsyncSystemSchedulers> _pSchedulers;
435
// Gives every Future<T> specialization access to this class's private
// members.
436 template <typename T> friend class Future;
437};
438} // namespace CesiumAsync
A system for managing asynchronous requests and tasks.
Definition AsyncSystem.h:37
std::tuple< typename CesiumImpl::RemoveFuture< Futures >::type... > AllTupleType
The value type of the Future returned by AsyncSystem::all(Futures&&... futures) const.
Future< T > createResolvedFuture(T &&value) const
Creates a future that is already resolved.
bool dispatchOneMainThreadTask()
Runs a single waiting task that is currently queued for the main thread. If there are no tasks waitin...
CesiumImpl::ImmediateScheduler< CesiumImpl::QueuedScheduler >::SchedulerScope MainThreadScope
An object that denotes a scope for the current thread acting as the "main thread"....
Future< AllValueType< T > > all(std::vector< SharedFuture< T > > &&futures) const
Creates a Future that resolves when every Future in a vector resolves, and rejects when any Future in...
Future< AllTupleType< Futures... > > all(Futures &&... futures) const
Creates a Future that resolves when every Future in a variadic parameter pack resolves,...
bool operator==(const AsyncSystem &rhs) const noexcept
std::conditional_t< std::is_void_v< T >, void, std::vector< T > > AllValueType
The value type of the Future returned by AsyncSystem::all(std::vector<Future<T>>&&) const.
void dispatchMainThreadTasks()
Runs all tasks that are currently queued for the main thread.
bool operator!=(const AsyncSystem &rhs) const noexcept
AsyncSystem(const std::shared_ptr< ITaskProcessor > &pTaskProcessor) noexcept
Constructs a new instance.
Future< void > createResolvedFuture() const
Creates a future that is already resolved and resolves to no value.
CesiumImpl::ContinuationFutureType_t< Func, void > runInThreadPool(const ThreadPool &threadPool, Func &&f) const
Runs a function in a thread pool, returning a Future that resolves when the function completes.
CesiumImpl::ContinuationFutureType_t< Func, void > runInWorkerThread(Func &&f) const
Runs a function in a worker thread, returning a Future that resolves when the function completes.
Promise< T > createPromise() const
Create a Promise that can be used at a later time to resolve or reject a Future.
Definition AsyncSystem.h:94
Future< T > createFuture(Func &&f) const
Creates a new Future by immediately invoking a function and giving it the opportunity to resolve or r...
Definition AsyncSystem.h:66
CesiumImpl::ContinuationFutureType_t< Func, void > runInMainThread(Func &&f) const
Runs a function in the main thread, returning a Future that resolves when the function completes.
MainThreadScope enterMainThread() const
Enters a scope in which the current thread is treated as the "main thread". It is essential that no ot...
ThreadPool createThreadPool(int32_t numberOfThreads) const
Creates a new thread pool that can be used to run continuations.
Future< AllValueType< T > > all(std::vector< Future< T > > &&futures) const
Creates a Future that resolves when every Future in a vector resolves, and rejects when any Future in...
A value that will be available in the future, as produced by AsyncSystem.
Definition Future.h:29
When implemented by a rendering engine, allows tasks to be asynchronously executed in background thre...
A promise that can be resolved or rejected by an asynchronous task.
Definition Promise.h:19
A value that will be available in the future, as produced by AsyncSystem. Unlike Future,...
A thread pool created by AsyncSystem::createThreadPool.
Definition ThreadPool.h:19
Classes that support asynchronous operations.
auto transformTuple(Tuple &&tuple, Func &&f)
Transforms each element of a tuple by applying a function to it.