36using namespace std::literals;
40 namespace discord_core_internal {
49 DCA_INLINE worker_thread(){};
51 DCA_INLINE worker_thread& operator=(worker_thread&& other)
noexcept {
52 areWeCurrentlyWorking.store(other.areWeCurrentlyWorking.load(std::memory_order_acquire), std::memory_order_release);
53 std::swap(
thread, other.thread);
54 tasks = std::move(other.tasks);
58 DCA_INLINE worker_thread(worker_thread&& other)
noexcept {
59 *
this = std::move(other);
62 DCA_INLINE ~worker_thread() =
default;
64 unbounded_message_block<std::coroutine_handle<>>
tasks{};
72 using map_type = std::unordered_map<uint64_t, unique_ptr<worker_thread>>;
78 uint64_t indexNew = x;
80 std::this_thread::sleep_for(150ms);
81 getMap().at(indexNew)->thread = std::jthread{ [=,
this](std::stop_token tokenNew)
mutable {
82 auto indexNewer = indexNew;
90 DCA_INLINE
void submitTask(std::coroutine_handle<> coro) {
91 uint64_t currentLowestIndex{ std::numeric_limits<uint64_t>::max() };
92 while (currentLowestIndex == std::numeric_limits<uint64_t>::max()) {
93 currentLowestIndex = areWeAllBusy();
94 std::this_thread::sleep_for(1ms);
97 if (std ::this_thread::get_id() == getMap()[currentLowestIndex]->thread.get_id()) {
100 getMap()[currentLowestIndex]->tasks.send(std::move(coro));
104 doWeQuit.store(
true, std::memory_order_release);
115 DCA_INLINE
void threadFunction(worker_thread* thread, std::stop_token tokenNew) {
116 while (!
doWeQuit.load(std::memory_order_acquire) && !tokenNew.stop_requested()) {
117 std::coroutine_handle<> coroHandle{};
118 if (thread->tasks.tryReceive(coroHandle)) {
119 thread->areWeCurrentlyWorking.store(
true, std::memory_order_release);
122 while (!coroHandle.done()) {
123 std::this_thread::sleep_for(1ms);
125 }
catch (
const std::runtime_error& error) {
128 thread->areWeCurrentlyWorking.store(
false, std::memory_order_release);
130 std::this_thread::sleep_for(std::chrono::nanoseconds{ 100000 });
134 DCA_INLINE uint64_t areWeAllBusy() {
135 for (uint64_t x = 0; x < this->size(); ++x) {
136 if (this->contains(x)) {
137 if (!this->
operator[](x)->areWeCurrentlyWorking.load(std::memory_order_acquire)) {
142 return std::numeric_limits<uint64_t>::max();
145 DCA_INLINE map_type& getMap() {
A class representing a coroutine-based thread pool.
DCA_INLINE co_routine_thread_pool()
Constructor to create a coroutine thread pool. Initializes the worker threads.
std::shared_mutex workerAccessMutex
Shared mutex for worker thread access.
std::atomic_bool doWeQuit
Whether or not we're quitting.
const uint64_t threadCount
Total thread count.
DCA_INLINE void threadFunction(worker_thread *thread, std::stop_token tokenNew)
Thread function for each worker thread.
DCA_INLINE void submitTask(std::coroutine_handle<> coro)
Submit a coroutine task to the thread pool.
static DCA_INLINE void printError(const string_type &what, std::source_location where=std::source_location::current())
Print an error message of the specified type.
DCA_INLINE unique_ptr< value_type, deleter > makeUnique(arg_types &&... args)
Helper function to create a unique_ptr for a non-array object.
The main namespace for the forward-facing interfaces.
A struct representing a worker thread for coroutine-based tasks.
std::jthread thread
Joinable thread.
unbounded_message_block< std::coroutine_handle<> > tasks
Queue of coroutine tasks.
std::atomic_bool areWeCurrentlyWorking
Atomic flag indicating if the thread is working.