summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPedro J. Estébanez <pedrojrulez@gmail.com>2024-09-16 11:51:57 +0200
committerPedro J. Estébanez <pedrojrulez@gmail.com>2024-09-16 18:20:10 +0200
commit5d371e33780ceda8b597a6b912a49929de8a1f04 (patch)
treefae1c2e2b625cd975cf6e20eba7d2b2e40d27a8e
parent2a483fa9ba46035270c95a9cebf3877f307b071c (diff)
downloadredot-engine-5d371e33780ceda8b597a6b912a49929de8a1f04.tar.gz
WorkerThreadPool: Add safety point between languages finished and pool termination
-rw-r--r--core/object/worker_thread_pool.cpp94
-rw-r--r--core/object/worker_thread_pool.h22
-rw-r--r--main/main.cpp2
3 files changed, 98 insertions, 20 deletions
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp
index 0c19fe06a4..08903d6196 100644
--- a/core/object/worker_thread_pool.cpp
+++ b/core/object/worker_thread_pool.cpp
@@ -186,7 +186,7 @@ void WorkerThreadPool::_thread_function(void *p_user) {
{
MutexLock lock(singleton->task_mutex);
- bool exit = singleton->_handle_runlevel();
+ bool exit = singleton->_handle_runlevel(thread_data, lock);
if (unlikely(exit)) {
break;
}
@@ -207,19 +207,24 @@ void WorkerThreadPool::_thread_function(void *p_user) {
}
}
-void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
+void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
// Fall back to processing on the calling thread if there are no worker threads.
// Separated into its own variable to make it easier to extend this logic
// in custom builds.
bool process_on_calling_thread = threads.size() == 0;
if (process_on_calling_thread) {
- task_mutex.unlock();
+ p_lock.temp_unlock();
for (uint32_t i = 0; i < p_count; i++) {
_process_task(p_tasks[i]);
}
+ p_lock.temp_relock();
return;
}
+ while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {
+ control_cond_var.wait(p_lock);
+ }
+
uint32_t to_process = 0;
uint32_t to_promote = 0;
@@ -241,8 +246,6 @@ void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count,
}
_notify_threads(caller_pool_thread, to_process, to_promote);
-
- task_mutex.unlock();
}
void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
@@ -326,7 +329,8 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
}
WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
- task_mutex.lock();
+ MutexLock<BinaryMutex> lock(task_mutex);
+
// Get a free task
Task *task = task_allocator.alloc();
TaskID id = last_task++;
@@ -338,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
task->template_userdata = p_template_userdata;
tasks.insert(id, task);
- _post_tasks_and_unlock(&task, 1, p_high_priority);
+ _post_tasks(&task, 1, p_high_priority, lock);
return id;
}
@@ -454,7 +458,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
bool was_signaled = p_caller_pool_thread->signaled;
p_caller_pool_thread->signaled = false;
- bool exit = _handle_runlevel();
+ bool exit = _handle_runlevel(p_caller_pool_thread, lock);
if (unlikely(exit)) {
break;
}
@@ -523,15 +527,44 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) {
DEV_ASSERT(p_runlevel > runlevel);
runlevel = p_runlevel;
+ memset(&runlevel_data, 0, sizeof(runlevel_data));
for (uint32_t i = 0; i < threads.size(); i++) {
threads[i].cond_var.notify_one();
threads[i].signaled = true;
}
+ control_cond_var.notify_all();
}
// Returns whether threads have to exit. This may perform the check about handling needed.
-bool WorkerThreadPool::_handle_runlevel() {
- return runlevel == RUNLEVEL_EXIT;
+bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {
+ bool exit = false;
+ switch (runlevel) {
+ case RUNLEVEL_NORMAL: {
+ } break;
+ case RUNLEVEL_PRE_EXIT_LANGUAGES: {
+ if (!p_thread_data->pre_exited_languages) {
+ if (!task_queue.first() && !low_priority_task_queue.first()) {
+ p_thread_data->pre_exited_languages = true;
+ runlevel_data.pre_exit_languages.num_idle_threads++;
+ control_cond_var.notify_all();
+ }
+ }
+ } break;
+ case RUNLEVEL_EXIT_LANGUAGES: {
+ if (!p_thread_data->exited_languages) {
+ p_lock.temp_unlock();
+ ScriptServer::thread_exit();
+ p_lock.temp_relock();
+ p_thread_data->exited_languages = true;
+ runlevel_data.exit_languages.num_exited_threads++;
+ control_cond_var.notify_all();
+ }
+ } break;
+ case RUNLEVEL_EXIT: {
+ exit = true;
+ } break;
+ }
+ return exit;
}
void WorkerThreadPool::yield() {
@@ -539,11 +572,17 @@ void WorkerThreadPool::yield() {
ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);
- // If this long-lived task started before the scripting server was initialized,
- // now is a good time to have scripting languages ready for the current thread.
- // Otherwise, such a piece of setup won't happen unless another task has been
- // run during the collaborative wait.
- ScriptServer::thread_enter();
+ task_mutex.lock();
+ if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
+ // If this long-lived task started before the scripting server was initialized,
+ // now is a good time to have scripting languages ready for the current thread.
+ // Otherwise, such a piece of setup won't happen unless another task has been
+ // run during the collaborative wait.
+ task_mutex.unlock();
+ ScriptServer::thread_enter();
+ } else {
+ task_mutex.unlock();
+ }
}
void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
@@ -573,7 +612,8 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
p_tasks = MAX(1u, threads.size());
}
- task_mutex.lock();
+ MutexLock<BinaryMutex> lock(task_mutex);
+
Group *group = group_allocator.alloc();
GroupID id = last_task++;
group->max = p_elements;
@@ -608,7 +648,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
groups[id] = group;
- _post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
+ _post_tasks(tasks_posted, p_tasks, p_high_priority, lock);
return id;
}
@@ -731,6 +771,26 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
}
}
+void WorkerThreadPool::exit_languages_threads() {
+ if (threads.size() == 0) {
+ return;
+ }
+
+ MutexLock lock(task_mutex);
+
+ // Wait until all threads are idle.
+ _switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);
+ while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {
+ control_cond_var.wait(lock);
+ }
+
+ // Wait until all threads have detached from scripting languages.
+ _switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);
+ while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {
+ control_cond_var.wait(lock);
+ }
+}
+
void WorkerThreadPool::finish() {
if (threads.size() == 0) {
return;
diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h
index ba6efbb065..62296ac040 100644
--- a/core/object/worker_thread_pool.h
+++ b/core/object/worker_thread_pool.h
@@ -114,20 +114,35 @@ private:
Thread thread;
bool signaled : 1;
bool yield_is_over : 1;
+ bool pre_exited_languages : 1;
+ bool exited_languages : 1;
Task *current_task = nullptr;
Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
ConditionVariable cond_var;
ThreadData() :
signaled(false),
- yield_is_over(false) {}
+ yield_is_over(false),
+ pre_exited_languages(false),
+ exited_languages(false) {}
};
TightLocalVector<ThreadData> threads;
enum Runlevel {
RUNLEVEL_NORMAL,
+ RUNLEVEL_PRE_EXIT_LANGUAGES, // Block adding new tasks
+ RUNLEVEL_EXIT_LANGUAGES, // All threads detach from scripting threads.
RUNLEVEL_EXIT,
} runlevel = RUNLEVEL_NORMAL;
+ union { // Cleared on every runlevel change.
+ struct {
+ uint32_t num_idle_threads;
+ } pre_exit_languages;
+ struct {
+ uint32_t num_exited_threads;
+ } exit_languages;
+ } runlevel_data;
+ ConditionVariable control_cond_var;
HashMap<Thread::ID, int> thread_ids;
HashMap<
@@ -155,7 +170,7 @@ private:
void _process_task(Task *task);
- void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority);
+ void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock);
void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);
bool _try_promote_low_priority_task();
@@ -197,7 +212,7 @@ private:
void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);
void _switch_runlevel(Runlevel p_runlevel);
- bool _handle_runlevel();
+ bool _handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock);
#ifdef THREADS_ENABLED
static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock);
@@ -262,6 +277,7 @@ public:
#endif
void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
+ void exit_languages_threads();
void finish();
WorkerThreadPool();
~WorkerThreadPool();
diff --git a/main/main.cpp b/main/main.cpp
index 18ffedef18..f1ee4bf2a6 100644
--- a/main/main.cpp
+++ b/main/main.cpp
@@ -4501,6 +4501,8 @@ void Main::cleanup(bool p_force) {
ResourceLoader::clear_translation_remaps();
ResourceLoader::clear_path_remaps();
+ WorkerThreadPool::get_singleton()->exit_languages_threads();
+
ScriptServer::finish_languages();
// Sync pending commands that may have been queued from a different thread during ScriptServer finalization