summaryrefslogtreecommitdiffstats
path: root/core/object/worker_thread_pool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'core/object/worker_thread_pool.cpp')
-rw-r--r--core/object/worker_thread_pool.cpp158
1 files changed, 98 insertions, 60 deletions
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp
index ef3d315e4b..c10f491a11 100644
--- a/core/object/worker_thread_pool.cpp
+++ b/core/object/worker_thread_pool.cpp
@@ -35,6 +35,8 @@
#include "core/os/thread_safe.h"
#include "core/templates/command_queue_mt.h"
+WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;
+
void WorkerThreadPool::Task::free_template_userdata() {
ERR_FAIL_NULL(template_userdata);
ERR_FAIL_NULL(native_func_userdata);
@@ -60,11 +62,13 @@ void WorkerThreadPool::_process_task(Task *p_task) {
// its pre-created threads can't have ScriptServer::thread_enter() called on them early.
// Therefore, we do it late at the first opportunity, so in case the task
// about to be run uses scripting, guarantees are held.
+ task_mutex.lock();
if (!curr_thread.ready_for_scripting && ScriptServer::are_languages_initialized()) {
+ task_mutex.unlock();
ScriptServer::thread_enter();
+ task_mutex.lock();
curr_thread.ready_for_scripting = true;
}
- task_mutex.lock();
p_task->pool_thread_index = pool_thread_index;
prev_task = curr_thread.current_task;
curr_thread.current_task = p_task;
@@ -389,83 +393,117 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
task_mutex.unlock();
if (caller_pool_thread) {
- while (true) {
- Task *task_to_process = nullptr;
- {
- MutexLock lock(task_mutex);
- bool was_signaled = caller_pool_thread->signaled;
- caller_pool_thread->signaled = false;
-
- if (task->completed) {
- // This thread was awaken also for some reason, but it's about to exit.
- // Let's find out what may be pending and forward the requests.
- if (!exit_threads && was_signaled) {
- uint32_t to_process = task_queue.first() ? 1 : 0;
- uint32_t to_promote = caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
- if (to_process || to_promote) {
- // This thread must be left alone since it won't loop again.
- caller_pool_thread->signaled = true;
- _notify_threads(caller_pool_thread, to_process, to_promote);
- }
- }
+ _wait_collaboratively(caller_pool_thread, task);
+ task->waiting_pool--;
+ if (task->waiting_pool == 0 && task->waiting_user == 0) {
+ tasks.erase(p_task_id);
+ task_allocator.free(task);
+ }
+ } else {
+ task->done_semaphore.wait();
+ task_mutex.lock();
+ task->waiting_user--;
+ if (task->waiting_pool == 0 && task->waiting_user == 0) {
+ tasks.erase(p_task_id);
+ task_allocator.free(task);
+ }
+ task_mutex.unlock();
+ }
- task->waiting_pool--;
- if (task->waiting_pool == 0 && task->waiting_user == 0) {
- tasks.erase(p_task_id);
- task_allocator.free(task);
- }
+ return OK;
+}
- break;
- }
+void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
+ // Keep processing tasks until the condition to stop waiting is met.
- if (!exit_threads) {
- // This is a thread from the pool. It shouldn't just idle.
- // Let's try to process other tasks while we wait.
+#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)
- if (caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
- if (_try_promote_low_priority_task()) {
- _notify_threads(caller_pool_thread, 1, 0);
- }
+ while (true) {
+ Task *task_to_process = nullptr;
+ {
+ MutexLock lock(task_mutex);
+ bool was_signaled = p_caller_pool_thread->signaled;
+ p_caller_pool_thread->signaled = false;
+
+ if (IS_WAIT_OVER) {
+ p_caller_pool_thread->yield_is_over = false;
+ if (!exit_threads && was_signaled) {
+ // This thread was awaken for some additional reason, but it's about to exit.
+ // Let's find out what may be pending and forward the requests.
+ uint32_t to_process = task_queue.first() ? 1 : 0;
+ uint32_t to_promote = p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
+ if (to_process || to_promote) {
+ // This thread must be left alone since it won't loop again.
+ p_caller_pool_thread->signaled = true;
+ _notify_threads(p_caller_pool_thread, to_process, to_promote);
}
+ }
+
+ break;
+ }
- if (singleton->task_queue.first()) {
- task_to_process = task_queue.first()->self();
- task_queue.remove(task_queue.first());
+ if (!exit_threads) {
+ if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
+ if (_try_promote_low_priority_task()) {
+ _notify_threads(p_caller_pool_thread, 1, 0);
}
+ }
- if (!task_to_process) {
- caller_pool_thread->awaited_task = task;
+ if (singleton->task_queue.first()) {
+ task_to_process = task_queue.first()->self();
+ task_queue.remove(task_queue.first());
+ }
- if (flushing_cmd_queue) {
- flushing_cmd_queue->unlock();
- }
- caller_pool_thread->cond_var.wait(lock);
- if (flushing_cmd_queue) {
- flushing_cmd_queue->lock();
- }
+ if (!task_to_process) {
+ p_caller_pool_thread->awaited_task = p_task;
- DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed);
- caller_pool_thread->awaited_task = nullptr;
+ if (flushing_cmd_queue) {
+ flushing_cmd_queue->unlock();
+ }
+ p_caller_pool_thread->cond_var.wait(lock);
+ if (flushing_cmd_queue) {
+ flushing_cmd_queue->lock();
}
- }
- }
- if (task_to_process) {
- _process_task(task_to_process);
+ DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
+ p_caller_pool_thread->awaited_task = nullptr;
+ }
}
}
- } else {
- task->done_semaphore.wait();
- task_mutex.lock();
- task->waiting_user--;
- if (task->waiting_pool == 0 && task->waiting_user == 0) {
- tasks.erase(p_task_id);
- task_allocator.free(task);
+
+ if (task_to_process) {
+ _process_task(task_to_process);
}
+ }
+}
+
+void WorkerThreadPool::yield() {
+ int th_index = get_thread_index();
+ ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
+ _wait_collaboratively(&threads[th_index], ThreadData::YIELDING);
+}
+
+void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
+ task_mutex.lock();
+ Task **taskp = tasks.getptr(p_task_id);
+ if (!taskp) {
task_mutex.unlock();
+ ERR_FAIL_MSG("Invalid Task ID.");
}
+ Task *task = *taskp;
- return OK;
+#ifdef DEBUG_ENABLED
+ if (task->pool_thread_index == get_thread_index()) {
+ WARN_PRINT("A worker thread is attempting to notify itself. That makes no sense.");
+ }
+#endif
+
+ ThreadData &td = threads[task->pool_thread_index];
+ td.yield_is_over = true;
+ td.signaled = true;
+ td.cond_var.notify_one();
+
+ task_mutex.unlock();
}
WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {