summaryrefslogtreecommitdiffstats
path: root/core/object/worker_thread_pool.cpp
diff options
context:
space:
mode:
authorRémi Verschelde <rverschelde@gmail.com>2023-05-12 10:03:45 +0200
committerRémi Verschelde <rverschelde@gmail.com>2023-05-12 10:03:45 +0200
commit186aa649d2bc7ec58e7ba24620a3b2136c1fc203 (patch)
tree51eb6882349ef43d826b8844ede011884f3c60da /core/object/worker_thread_pool.cpp
parent863bcd3e2bb5bdd6857258a42818dfd9232570aa (diff)
parent9077bb9232bd7f4301f5dc511467e07fc42b388b (diff)
downloadredot-engine-186aa649d2bc7ec58e7ba24620a3b2136c1fc203.tar.gz
Merge pull request #76945 from RandomShaper/fix_wtp
Fix multiple issues in `WorkerThreadPool`
Diffstat (limited to 'core/object/worker_thread_pool.cpp')
-rw-r--r--core/object/worker_thread_pool.cpp22
1 files changed, 12 insertions, 10 deletions
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp
index 721c8d0a10..3dca6b73a6 100644
--- a/core/object/worker_thread_pool.cpp
+++ b/core/object/worker_thread_pool.cpp
@@ -140,9 +140,9 @@ void WorkerThreadPool::_process_task(Task *p_task) {
task_queue.add_last(&low_prio_task->task_elem);
post = true;
} else {
- low_priority_threads_used.decrement();
+ low_priority_threads_used--;
}
- task_mutex.lock();
+ task_mutex.unlock();
if (post) {
task_available_semaphore.post();
}
@@ -152,7 +152,7 @@ void WorkerThreadPool::_process_task(Task *p_task) {
void WorkerThreadPool::_thread_function(void *p_user) {
while (true) {
singleton->task_available_semaphore.wait();
- if (singleton->exit_threads.is_set()) {
+ if (singleton->exit_threads) {
break;
}
singleton->_process_task_queue();
@@ -168,14 +168,13 @@ void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
task_mutex.lock();
p_task->low_priority = !p_high_priority;
if (!p_high_priority && use_native_low_priority_threads) {
- task_mutex.unlock();
p_task->low_priority_thread = native_thread_allocator.alloc();
+ task_mutex.unlock();
p_task->low_priority_thread->start(_native_low_priority_thread_function, p_task); // Pask task directly to thread.
-
- } else if (p_high_priority || low_priority_threads_used.get() < max_low_priority_threads) {
+ } else if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
task_queue.add_last(&p_task->task_elem);
if (!p_high_priority) {
- low_priority_threads_used.increment();
+ low_priority_threads_used++;
}
task_mutex.unlock();
task_available_semaphore.post();
@@ -251,6 +250,8 @@ void WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
if (use_native_low_priority_threads && task->low_priority) {
task->low_priority_thread->wait_to_finish();
+
+ task_mutex.lock();
native_thread_allocator.free(task->low_priority_thread);
} else {
int *index = thread_ids.getptr(Thread::get_caller_id());
@@ -272,9 +273,10 @@ void WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
} else {
task->done_semaphore.wait();
}
+
+ task_mutex.lock();
}
- task_mutex.lock();
tasks.erase(p_task_id);
task_allocator.free(task);
task_mutex.unlock();
@@ -379,8 +381,8 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
if (group->low_priority_native_tasks.size() > 0) {
for (Task *task : group->low_priority_native_tasks) {
task->low_priority_thread->wait_to_finish();
- native_thread_allocator.free(task->low_priority_thread);
task_mutex.lock();
+ native_thread_allocator.free(task->low_priority_thread);
task_allocator.free(task);
task_mutex.unlock();
}
@@ -443,7 +445,7 @@ void WorkerThreadPool::finish() {
}
task_mutex.unlock();
- exit_threads.set_to(true);
+ exit_threads = true;
for (uint32_t i = 0; i < threads.size(); i++) {
task_available_semaphore.post();