summaryrefslogtreecommitdiffstats
path: root/core/object/worker_thread_pool.cpp
diff options
context:
space:
mode:
authorClay John <claynjohn@gmail.com>2024-09-16 16:05:57 -0700
committerGitHub <noreply@github.com>2024-09-16 16:05:57 -0700
commit48403b5358c11ffff702da82c48464db8c536ee3 (patch)
treeb5075fa2f06a6148d6d8e63ef4034f725ad767e6 /core/object/worker_thread_pool.cpp
parent99a7a9ccd60fbe4030e067b3c36d54b67737446d (diff)
parent5d371e33780ceda8b597a6b912a49929de8a1f04 (diff)
downloadredot-engine-48403b5358c11ffff702da82c48464db8c536ee3.tar.gz
Merge pull request #96959 from RandomShaper/revamp_languages_exit
WorkerThreadPool: Revamp interaction with ScriptServer
Diffstat (limited to 'core/object/worker_thread_pool.cpp')
-rw-r--r--core/object/worker_thread_pool.cpp178
1 files changed, 130 insertions, 48 deletions
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp
index cf396c2676..08903d6196 100644
--- a/core/object/worker_thread_pool.cpp
+++ b/core/object/worker_thread_pool.cpp
@@ -180,13 +180,17 @@ void WorkerThreadPool::_process_task(Task *p_task) {
void WorkerThreadPool::_thread_function(void *p_user) {
ThreadData *thread_data = (ThreadData *)p_user;
+
while (true) {
Task *task_to_process = nullptr;
{
MutexLock lock(singleton->task_mutex);
- if (singleton->exit_threads) {
- return;
+
+ bool exit = singleton->_handle_runlevel(thread_data, lock);
+ if (unlikely(exit)) {
+ break;
}
+
thread_data->signaled = false;
if (singleton->task_queue.first()) {
@@ -194,7 +198,6 @@ void WorkerThreadPool::_thread_function(void *p_user) {
singleton->task_queue.remove(singleton->task_queue.first());
} else {
thread_data->cond_var.wait(lock);
- DEV_ASSERT(singleton->exit_threads || thread_data->signaled);
}
}
@@ -204,19 +207,24 @@ void WorkerThreadPool::_thread_function(void *p_user) {
}
}
-void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
+void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
// Fall back to processing on the calling thread if there are no worker threads.
// Separated into its own variable to make it easier to extend this logic
// in custom builds.
bool process_on_calling_thread = threads.size() == 0;
if (process_on_calling_thread) {
- task_mutex.unlock();
+ p_lock.temp_unlock();
for (uint32_t i = 0; i < p_count; i++) {
_process_task(p_tasks[i]);
}
+ p_lock.temp_relock();
return;
}
+ while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {
+ control_cond_var.wait(p_lock);
+ }
+
uint32_t to_process = 0;
uint32_t to_promote = 0;
@@ -238,8 +246,6 @@ void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count,
}
_notify_threads(caller_pool_thread, to_process, to_promote);
-
- task_mutex.unlock();
}
void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
@@ -323,9 +329,8 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
}
WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
- ERR_FAIL_COND_V_MSG(threads.is_empty(), INVALID_TASK_ID, "Can't add a task because the WorkerThreadPool is either not initialized yet or already terminated.");
+ MutexLock<BinaryMutex> lock(task_mutex);
- task_mutex.lock();
// Get a free task
Task *task = task_allocator.alloc();
TaskID id = last_task++;
@@ -337,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
task->template_userdata = p_template_userdata;
tasks.insert(id, task);
- _post_tasks_and_unlock(&task, 1, p_high_priority);
+ _post_tasks(&task, 1, p_high_priority, lock);
return id;
}
@@ -444,22 +449,34 @@ void WorkerThreadPool::_unlock_unlockable_mutexes() {
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
// Keep processing tasks until the condition to stop waiting is met.
-#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)
-
while (true) {
Task *task_to_process = nullptr;
bool relock_unlockables = false;
{
MutexLock lock(task_mutex);
+
bool was_signaled = p_caller_pool_thread->signaled;
p_caller_pool_thread->signaled = false;
- if (IS_WAIT_OVER) {
- if (unlikely(p_task == ThreadData::YIELDING)) {
+ bool exit = _handle_runlevel(p_caller_pool_thread, lock);
+ if (unlikely(exit)) {
+ break;
+ }
+
+ bool wait_is_over = false;
+ if (unlikely(p_task == ThreadData::YIELDING)) {
+ if (p_caller_pool_thread->yield_is_over) {
p_caller_pool_thread->yield_is_over = false;
+ wait_is_over = true;
}
+ } else {
+ if (p_task->completed) {
+ wait_is_over = true;
+ }
+ }
- if (!exit_threads && was_signaled) {
+ if (wait_is_over) {
+ if (was_signaled) {
// This thread was awaken for some additional reason, but it's about to exit.
// Let's find out what may be pending and forward the requests.
uint32_t to_process = task_queue.first() ? 1 : 0;
@@ -474,28 +491,26 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
break;
}
- if (!exit_threads) {
- if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
- if (_try_promote_low_priority_task()) {
- _notify_threads(p_caller_pool_thread, 1, 0);
- }
+ if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
+ if (_try_promote_low_priority_task()) {
+ _notify_threads(p_caller_pool_thread, 1, 0);
}
+ }
- if (singleton->task_queue.first()) {
- task_to_process = task_queue.first()->self();
- task_queue.remove(task_queue.first());
- }
+ if (singleton->task_queue.first()) {
+ task_to_process = task_queue.first()->self();
+ task_queue.remove(task_queue.first());
+ }
- if (!task_to_process) {
- p_caller_pool_thread->awaited_task = p_task;
+ if (!task_to_process) {
+ p_caller_pool_thread->awaited_task = p_task;
- _unlock_unlockable_mutexes();
- relock_unlockables = true;
- p_caller_pool_thread->cond_var.wait(lock);
+ _unlock_unlockable_mutexes();
+ relock_unlockables = true;
- DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
- p_caller_pool_thread->awaited_task = nullptr;
- }
+ p_caller_pool_thread->cond_var.wait(lock);
+
+ p_caller_pool_thread->awaited_task = nullptr;
}
}
@@ -509,16 +524,65 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
}
}
+void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) {
+ DEV_ASSERT(p_runlevel > runlevel);
+ runlevel = p_runlevel;
+ memset(&runlevel_data, 0, sizeof(runlevel_data));
+ for (uint32_t i = 0; i < threads.size(); i++) {
+ threads[i].cond_var.notify_one();
+ threads[i].signaled = true;
+ }
+ control_cond_var.notify_all();
+}
+
+// Returns whether threads have to exit. This may perform the check about handling needed.
+bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {
+ bool exit = false;
+ switch (runlevel) {
+ case RUNLEVEL_NORMAL: {
+ } break;
+ case RUNLEVEL_PRE_EXIT_LANGUAGES: {
+ if (!p_thread_data->pre_exited_languages) {
+ if (!task_queue.first() && !low_priority_task_queue.first()) {
+ p_thread_data->pre_exited_languages = true;
+ runlevel_data.pre_exit_languages.num_idle_threads++;
+ control_cond_var.notify_all();
+ }
+ }
+ } break;
+ case RUNLEVEL_EXIT_LANGUAGES: {
+ if (!p_thread_data->exited_languages) {
+ p_lock.temp_unlock();
+ ScriptServer::thread_exit();
+ p_lock.temp_relock();
+ p_thread_data->exited_languages = true;
+ runlevel_data.exit_languages.num_exited_threads++;
+ control_cond_var.notify_all();
+ }
+ } break;
+ case RUNLEVEL_EXIT: {
+ exit = true;
+ } break;
+ }
+ return exit;
+}
+
void WorkerThreadPool::yield() {
int th_index = get_thread_index();
ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);
- // If this long-lived task started before the scripting server was initialized,
- // now is a good time to have scripting languages ready for the current thread.
- // Otherwise, such a piece of setup won't happen unless another task has been
- // run during the collaborative wait.
- ScriptServer::thread_enter();
+ task_mutex.lock();
+ if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
+ // If this long-lived task started before the scripting server was initialized,
+ // now is a good time to have scripting languages ready for the current thread.
+ // Otherwise, such a piece of setup won't happen unless another task has been
+ // run during the collaborative wait.
+ task_mutex.unlock();
+ ScriptServer::thread_enter();
+ } else {
+ task_mutex.unlock();
+ }
}
void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
@@ -543,13 +607,13 @@ void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
}
WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
- ERR_FAIL_COND_V_MSG(threads.is_empty(), INVALID_TASK_ID, "Can't add a group task because the WorkerThreadPool is either not initialized yet or already terminated.");
ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);
if (p_tasks < 0) {
p_tasks = MAX(1u, threads.size());
}
- task_mutex.lock();
+ MutexLock<BinaryMutex> lock(task_mutex);
+
Group *group = group_allocator.alloc();
GroupID id = last_task++;
group->max = p_elements;
@@ -584,7 +648,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
groups[id] = group;
- _post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
+ _post_tasks(tasks_posted, p_tasks, p_high_priority, lock);
return id;
}
@@ -687,6 +751,9 @@ void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {
void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
ERR_FAIL_COND(threads.size() > 0);
+
+ runlevel = RUNLEVEL_NORMAL;
+
if (p_thread_count < 0) {
p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
}
@@ -704,6 +771,26 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
}
}
+void WorkerThreadPool::exit_languages_threads() {
+ if (threads.size() == 0) {
+ return;
+ }
+
+ MutexLock lock(task_mutex);
+
+ // Wait until all threads are idle.
+ _switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);
+ while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {
+ control_cond_var.wait(lock);
+ }
+
+ // Wait until all threads have detached from scripting languages.
+ _switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);
+ while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {
+ control_cond_var.wait(lock);
+ }
+}
+
void WorkerThreadPool::finish() {
if (threads.size() == 0) {
return;
@@ -716,15 +803,10 @@ void WorkerThreadPool::finish() {
print_error("Task waiting was never re-claimed: " + E->self()->description);
E = E->next();
}
- }
- {
- MutexLock lock(task_mutex);
- exit_threads = true;
- }
- for (ThreadData &data : threads) {
- data.cond_var.notify_one();
+ _switch_runlevel(RUNLEVEL_EXIT);
}
+
for (ThreadData &data : threads) {
data.thread.wait_to_finish();
}
@@ -755,5 +837,5 @@ WorkerThreadPool::WorkerThreadPool() {
}
WorkerThreadPool::~WorkerThreadPool() {
- DEV_ASSERT(threads.size() == 0 && "finish() hasn't been called!");
+ finish();
}