diff options
Diffstat (limited to 'core/object/worker_thread_pool.cpp')
-rw-r--r-- | core/object/worker_thread_pool.cpp | 158 |
1 files changed, 98 insertions, 60 deletions
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index ef3d315e4b..c10f491a11 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -35,6 +35,8 @@ #include "core/os/thread_safe.h" #include "core/templates/command_queue_mt.h" +WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1; + void WorkerThreadPool::Task::free_template_userdata() { ERR_FAIL_NULL(template_userdata); ERR_FAIL_NULL(native_func_userdata); @@ -60,11 +62,13 @@ void WorkerThreadPool::_process_task(Task *p_task) { // its pre-created threads can't have ScriptServer::thread_enter() called on them early. // Therefore, we do it late at the first opportunity, so in case the task // about to be run uses scripting, guarantees are held. + task_mutex.lock(); if (!curr_thread.ready_for_scripting && ScriptServer::are_languages_initialized()) { + task_mutex.unlock(); ScriptServer::thread_enter(); + task_mutex.lock(); curr_thread.ready_for_scripting = true; } - task_mutex.lock(); p_task->pool_thread_index = pool_thread_index; prev_task = curr_thread.current_task; curr_thread.current_task = p_task; @@ -389,83 +393,117 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { task_mutex.unlock(); if (caller_pool_thread) { - while (true) { - Task *task_to_process = nullptr; - { - MutexLock lock(task_mutex); - bool was_signaled = caller_pool_thread->signaled; - caller_pool_thread->signaled = false; - - if (task->completed) { - // This thread was awaken also for some reason, but it's about to exit. - // Let's find out what may be pending and forward the requests. - if (!exit_threads && was_signaled) { - uint32_t to_process = task_queue.first() ? 1 : 0; - uint32_t to_promote = caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0; - if (to_process || to_promote) { - // This thread must be left alone since it won't loop again. - caller_pool_thread->signaled = true; - _notify_threads(caller_pool_thread, to_process, to_promote); - } - } + _wait_collaboratively(caller_pool_thread, task); + task->waiting_pool--; + if (task->waiting_pool == 0 && task->waiting_user == 0) { + tasks.erase(p_task_id); + task_allocator.free(task); + } + } else { + task->done_semaphore.wait(); + task_mutex.lock(); + task->waiting_user--; + if (task->waiting_pool == 0 && task->waiting_user == 0) { + tasks.erase(p_task_id); + task_allocator.free(task); + } + task_mutex.unlock(); + } - task->waiting_pool--; - if (task->waiting_pool == 0 && task->waiting_user == 0) { - tasks.erase(p_task_id); - task_allocator.free(task); - } + return OK; +} - break; - } +void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) { + // Keep processing tasks until the condition to stop waiting is met. - if (!exit_threads) { - // This is a thread from the pool. It shouldn't just idle. - // Let's try to process other tasks while we wait. +#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed) - if (caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { - if (_try_promote_low_priority_task()) { - _notify_threads(caller_pool_thread, 1, 0); - } + while (true) { + Task *task_to_process = nullptr; + { + MutexLock lock(task_mutex); + bool was_signaled = p_caller_pool_thread->signaled; + p_caller_pool_thread->signaled = false; + + if (IS_WAIT_OVER) { + p_caller_pool_thread->yield_is_over = false; + if (!exit_threads && was_signaled) { + // This thread was awaken for some additional reason, but it's about to exit. + // Let's find out what may be pending and forward the requests. + uint32_t to_process = task_queue.first() ? 1 : 0; + uint32_t to_promote = p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0; + if (to_process || to_promote) { + // This thread must be left alone since it won't loop again. + p_caller_pool_thread->signaled = true; + _notify_threads(p_caller_pool_thread, to_process, to_promote); } + } + + break; + } - if (singleton->task_queue.first()) { - task_to_process = task_queue.first()->self(); - task_queue.remove(task_queue.first()); + if (!exit_threads) { + if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { + if (_try_promote_low_priority_task()) { + _notify_threads(p_caller_pool_thread, 1, 0); } + } - if (!task_to_process) { - caller_pool_thread->awaited_task = task; + if (singleton->task_queue.first()) { + task_to_process = task_queue.first()->self(); + task_queue.remove(task_queue.first()); + } - if (flushing_cmd_queue) { - flushing_cmd_queue->unlock(); - } - caller_pool_thread->cond_var.wait(lock); - if (flushing_cmd_queue) { - flushing_cmd_queue->lock(); - } + if (!task_to_process) { + p_caller_pool_thread->awaited_task = p_task; - DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed); - caller_pool_thread->awaited_task = nullptr; + if (flushing_cmd_queue) { + flushing_cmd_queue->unlock(); + } + p_caller_pool_thread->cond_var.wait(lock); + if (flushing_cmd_queue) { + flushing_cmd_queue->lock(); } - } - } - if (task_to_process) { - _process_task(task_to_process); + DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER); + p_caller_pool_thread->awaited_task = nullptr; + } } } - } else { - task->done_semaphore.wait(); - task_mutex.lock(); - task->waiting_user--; - if (task->waiting_pool == 0 && task->waiting_user == 0) { - tasks.erase(p_task_id); - task_allocator.free(task); + + if (task_to_process) { + _process_task(task_to_process); } + } +} + +void WorkerThreadPool::yield() { + int th_index = get_thread_index(); + ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread."); + _wait_collaboratively(&threads[th_index], ThreadData::YIELDING); +} + +void WorkerThreadPool::notify_yield_over(TaskID p_task_id) { + task_mutex.lock(); + Task **taskp = tasks.getptr(p_task_id); + if (!taskp) { task_mutex.unlock(); + ERR_FAIL_MSG("Invalid Task ID."); } + Task *task = *taskp; - return OK; +#ifdef DEBUG_ENABLED + if (task->pool_thread_index == get_thread_index()) { + WARN_PRINT("A worker thread is attempting to notify itself. That makes no sense."); + } +#endif + + ThreadData &td = threads[task->pool_thread_index]; + td.yield_is_over = true; + td.signaled = true; + td.cond_var.notify_one(); + + task_mutex.unlock(); } WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) { |