diff options
| author | Pedro J. Estébanez <pedrojrulez@gmail.com> | 2024-04-09 17:26:45 +0200 |
|---|---|---|
| committer | Pedro J. Estébanez <pedrojrulez@gmail.com> | 2024-04-10 18:47:42 +0200 |
| commit | 1b104ffcd8bc4573924754552508f5416573a7a1 (patch) | |
| tree | dbe714f1a13ca22d34404a1e297ba0859b47b930 /core/object | |
| parent | 71facbaa882494f5c3dccf6a799ce84dcd12e4c9 (diff) | |
| download | redot-engine-1b104ffcd8bc4573924754552508f5416573a7a1.tar.gz | |
WorkerThreadPool: Support daemon-like tasks (via yield semantics)
Diffstat (limited to 'core/object')
| -rw-r--r-- | core/object/worker_thread_pool.cpp | 154 | ||||
| -rw-r--r-- | core/object/worker_thread_pool.h | 19 |
2 files changed, 111 insertions, 62 deletions
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 4ec0ab4df2..c10f491a11 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -35,6 +35,8 @@ #include "core/os/thread_safe.h" #include "core/templates/command_queue_mt.h" +WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1; + void WorkerThreadPool::Task::free_template_userdata() { ERR_FAIL_NULL(template_userdata); ERR_FAIL_NULL(native_func_userdata); @@ -391,83 +393,117 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { task_mutex.unlock(); if (caller_pool_thread) { - while (true) { - Task *task_to_process = nullptr; - { - MutexLock lock(task_mutex); - bool was_signaled = caller_pool_thread->signaled; - caller_pool_thread->signaled = false; - - if (task->completed) { - // This thread was awaken also for some reason, but it's about to exit. - // Let's find out what may be pending and forward the requests. - if (!exit_threads && was_signaled) { - uint32_t to_process = task_queue.first() ? 1 : 0; - uint32_t to_promote = caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0; - if (to_process || to_promote) { - // This thread must be left alone since it won't loop again. - caller_pool_thread->signaled = true; - _notify_threads(caller_pool_thread, to_process, to_promote); - } - } + _wait_collaboratively(caller_pool_thread, task); + task->waiting_pool--; + if (task->waiting_pool == 0 && task->waiting_user == 0) { + tasks.erase(p_task_id); + task_allocator.free(task); + } + } else { + task->done_semaphore.wait(); + task_mutex.lock(); + task->waiting_user--; + if (task->waiting_pool == 0 && task->waiting_user == 0) { + tasks.erase(p_task_id); + task_allocator.free(task); + } + task_mutex.unlock(); + } - task->waiting_pool--; - if (task->waiting_pool == 0 && task->waiting_user == 0) { - tasks.erase(p_task_id); - task_allocator.free(task); - } + return OK; +} - break; - } +void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) { + // Keep processing tasks until the condition to stop waiting is met. - if (!exit_threads) { - // This is a thread from the pool. It shouldn't just idle. - // Let's try to process other tasks while we wait. +#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed) - if (caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { - if (_try_promote_low_priority_task()) { - _notify_threads(caller_pool_thread, 1, 0); - } + while (true) { + Task *task_to_process = nullptr; + { + MutexLock lock(task_mutex); + bool was_signaled = p_caller_pool_thread->signaled; + p_caller_pool_thread->signaled = false; + + if (IS_WAIT_OVER) { + p_caller_pool_thread->yield_is_over = false; + if (!exit_threads && was_signaled) { + // This thread was awaken for some additional reason, but it's about to exit. + // Let's find out what may be pending and forward the requests. + uint32_t to_process = task_queue.first() ? 1 : 0; + uint32_t to_promote = p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0; + if (to_process || to_promote) { + // This thread must be left alone since it won't loop again. + p_caller_pool_thread->signaled = true; + _notify_threads(p_caller_pool_thread, to_process, to_promote); } + } + + break; + } - if (singleton->task_queue.first()) { - task_to_process = task_queue.first()->self(); - task_queue.remove(task_queue.first()); + if (!exit_threads) { + if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { + if (_try_promote_low_priority_task()) { + _notify_threads(p_caller_pool_thread, 1, 0); } + } - if (!task_to_process) { - caller_pool_thread->awaited_task = task; + if (singleton->task_queue.first()) { + task_to_process = task_queue.first()->self(); + task_queue.remove(task_queue.first()); + } - if (flushing_cmd_queue) { - flushing_cmd_queue->unlock(); - } - caller_pool_thread->cond_var.wait(lock); - if (flushing_cmd_queue) { - flushing_cmd_queue->lock(); - } + if (!task_to_process) { + p_caller_pool_thread->awaited_task = p_task; - DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed); - caller_pool_thread->awaited_task = nullptr; + if (flushing_cmd_queue) { + flushing_cmd_queue->unlock(); + } + p_caller_pool_thread->cond_var.wait(lock); + if (flushing_cmd_queue) { + flushing_cmd_queue->lock(); } - } - } - if (task_to_process) { - _process_task(task_to_process); + DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER); + p_caller_pool_thread->awaited_task = nullptr; + } } } - } else { - task->done_semaphore.wait(); - task_mutex.lock(); - task->waiting_user--; - if (task->waiting_pool == 0 && task->waiting_user == 0) { - tasks.erase(p_task_id); - task_allocator.free(task); + + if (task_to_process) { + _process_task(task_to_process); } + } +} + +void WorkerThreadPool::yield() { + int th_index = get_thread_index(); + ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread."); + _wait_collaboratively(&threads[th_index], ThreadData::YIELDING); +} + +void WorkerThreadPool::notify_yield_over(TaskID p_task_id) { + task_mutex.lock(); + Task **taskp = tasks.getptr(p_task_id); + if (!taskp) { task_mutex.unlock(); + ERR_FAIL_MSG("Invalid Task ID."); } + Task *task = *taskp; - return OK; +#ifdef DEBUG_ENABLED + if (task->pool_thread_index == get_thread_index()) { + WARN_PRINT("A worker thread is attempting to notify itself. That makes no sense."); + } +#endif + + ThreadData &td = threads[task->pool_thread_index]; + td.yield_is_over = true; + td.signaled = true; + td.cond_var.notify_one(); + + task_mutex.unlock(); } WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) { diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index fdddc9a647..64f24df79f 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -107,13 +107,21 @@ private: BinaryMutex task_mutex; struct ThreadData { + static Task *const YIELDING; // Too bad constexpr doesn't work here. + uint32_t index = 0; Thread thread; - bool ready_for_scripting = false; - bool signaled = false; + bool ready_for_scripting : 1; + bool signaled : 1; + bool yield_is_over : 1; Task *current_task = nullptr; - Task *awaited_task = nullptr; // Null if not awaiting the condition variable. Special value for idle-waiting. + Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING). ConditionVariable cond_var; + + ThreadData() : + ready_for_scripting(false), + signaled(false), + yield_is_over(false) {} }; TightLocalVector<ThreadData> threads; @@ -177,6 +185,8 @@ private: } }; + void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task); + protected: static void _bind_methods(); @@ -196,6 +206,9 @@ public: bool is_task_completed(TaskID p_task_id) const; Error wait_for_task_completion(TaskID p_task_id); + void yield(); + void notify_yield_over(TaskID p_task_id); + template <typename C, typename M, typename U> GroupID add_template_group_task(C *p_instance, M p_method, U p_userdata, int p_elements, int p_tasks = -1, bool p_high_priority = false, const String &p_description = String()) { typedef GroupUserData<C, M, U> GroupUD; |
