summaryrefslogtreecommitdiffstats
path: root/core/object/worker_thread_pool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'core/object/worker_thread_pool.cpp')
-rw-r--r--core/object/worker_thread_pool.cpp208
1 files changed, 158 insertions, 50 deletions
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp
index e59ab3d6ae..afe6ecd1b3 100644
--- a/core/object/worker_thread_pool.cpp
+++ b/core/object/worker_thread_pool.cpp
@@ -51,6 +51,23 @@ void WorkerThreadPool::_process_task_queue() {
void WorkerThreadPool::_process_task(Task *p_task) {
bool low_priority = p_task->low_priority;
+ int pool_thread_index = -1;
+ Task *prev_low_prio_task = nullptr; // In case this is recursively called.
+
+ if (!use_native_low_priority_threads) {
+ pool_thread_index = thread_ids[Thread::get_caller_id()];
+ ThreadData &curr_thread = threads[pool_thread_index];
+ task_mutex.lock();
+ p_task->pool_thread_index = pool_thread_index;
+ if (low_priority) {
+ low_priority_tasks_running++;
+ prev_low_prio_task = curr_thread.current_low_prio_task;
+ curr_thread.current_low_prio_task = p_task;
+ } else {
+ curr_thread.current_low_prio_task = nullptr;
+ }
+ task_mutex.unlock();
+ }
if (p_task->group) {
// Handling a group
@@ -126,21 +143,36 @@ void WorkerThreadPool::_process_task(Task *p_task) {
p_task->callable.callp(nullptr, 0, ret, ce);
}
+ task_mutex.lock();
p_task->completed = true;
- p_task->done_semaphore.post();
+ for (uint8_t i = 0; i < p_task->waiting; i++) {
+ p_task->done_semaphore.post();
+ }
+ if (!use_native_low_priority_threads) {
+ p_task->pool_thread_index = -1;
+ }
+ task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed.
}
- if (!use_native_low_priority_threads && low_priority) {
- // A low prioriry task was freed, so see if we can move a pending one to the high priority queue.
+ // Task may have been freed by now (all callers notified).
+ p_task = nullptr;
+
+ if (!use_native_low_priority_threads) {
bool post = false;
task_mutex.lock();
- if (low_priority_task_queue.first()) {
- Task *low_prio_task = low_priority_task_queue.first()->self();
- low_priority_task_queue.remove(low_priority_task_queue.first());
- task_queue.add_last(&low_prio_task->task_elem);
- post = true;
- } else {
+ ThreadData &curr_thread = threads[pool_thread_index];
+ curr_thread.current_low_prio_task = prev_low_prio_task;
+ if (low_priority) {
low_priority_threads_used--;
+ low_priority_tasks_running--;
+ // A low prioriry task was freed, so see if we can move a pending one to the high priority queue.
+ if (_try_promote_low_priority_task()) {
+ post = true;
+ }
+
+ if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
+ _prevent_low_prio_saturation_deadlock();
+ }
}
task_mutex.unlock();
if (post) {
@@ -198,6 +230,35 @@ void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
}
}
+bool WorkerThreadPool::_try_promote_low_priority_task() {
+ if (low_priority_task_queue.first()) {
+ Task *low_prio_task = low_priority_task_queue.first()->self();
+ low_priority_task_queue.remove(low_priority_task_queue.first());
+ task_queue.add_last(&low_prio_task->task_elem);
+ low_priority_threads_used++;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() {
+ if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
+#ifdef DEV_ENABLED
+ print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task.");
+#endif
+ // In order not to create dependency cycles, we can only schedule the next one.
+ // We'll keep doing the same until the deadlock is broken,
+ SelfList<Task> *to_promote = low_priority_task_queue.first();
+ if (to_promote) {
+ low_priority_task_queue.remove(to_promote);
+ task_queue.add_last(to_promote);
+ low_priority_threads_used++;
+ task_available_semaphore.post();
+ }
+ }
+}
+
WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
}
@@ -238,66 +299,113 @@ bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
return completed;
}
-void WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
+Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
task_mutex.lock();
Task **taskp = tasks.getptr(p_task_id);
if (!taskp) {
task_mutex.unlock();
- ERR_FAIL_MSG("Invalid Task ID"); // Invalid task
+ ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task
}
Task *task = *taskp;
- if (task->waiting) {
- String description = task->description;
- task_mutex.unlock();
- if (description.is_empty()) {
- ERR_FAIL_MSG("Another thread is waiting on this task: " + itos(p_task_id)); // Invalid task
- } else {
- ERR_FAIL_MSG("Another thread is waiting on this task: " + description + " (" + itos(p_task_id) + ")"); // Invalid task
+ if (!task->completed) {
+ if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet.
+ int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1;
+ if (caller_pool_th_index == task->pool_thread_index) {
+ // Deadlock prevention.
+ // Waiting for a task run on this same thread? That means the task to be awaited started waiting as well
+ // and another task was run to make use of the thread in the meantime, with enough bad luck as to
+ // the need to wait for the original task arose in turn.
+ // In other words, the task we want to wait for is buried in the stack.
+ // Let's report the caller about the issue to it handles as it sees fit.
+ task_mutex.unlock();
+ return ERR_BUSY;
+ }
}
- }
-
- task->waiting = true;
- task_mutex.unlock();
+ task->waiting++;
+
+ bool is_low_prio_waiting_for_another = false;
+ if (!use_native_low_priority_threads) {
+ // Deadlock prevention:
+ // If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots,
+ // we have a no progressable situation. We can apply a workaround, consisting in promoting an awaited queued
+ // low-prio task to the schedule queue so it can run and break the "impasse".
+ // NOTE: A similar reasoning could be made about high priority tasks, but there are usually much more
+ // than low-prio. Therefore, a deadlock there would only happen when dealing with a very complex task graph
+ // or when there are too few worker threads (limited platforms or exotic settings). If that turns out to be
+ // an issue in the real world, a further fix can be applied against that.
+ if (task->low_priority) {
+ bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task;
+ if (awaiter_is_a_low_prio_task) {
+ is_low_prio_waiting_for_another = true;
+ low_priority_tasks_awaiting_others++;
+ if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
+ _prevent_low_prio_saturation_deadlock();
+ }
+ }
+ }
+ }
- if (use_native_low_priority_threads && task->low_priority) {
- task->low_priority_thread->wait_to_finish();
+ task_mutex.unlock();
- task_mutex.lock();
- native_thread_allocator.free(task->low_priority_thread);
- } else {
- int *index = thread_ids.getptr(Thread::get_caller_id());
-
- if (index) {
- // We are an actual process thread, we must not be blocked so continue processing stuff if available.
- bool must_exit = false;
- while (true) {
- if (task->done_semaphore.try_wait()) {
- // If done, exit
- break;
- }
- if (!must_exit && task_available_semaphore.try_wait()) {
- if (exit_threads) {
- must_exit = true;
- } else {
- // Solve tasks while they are around.
- _process_task_queue();
- continue;
+ if (use_native_low_priority_threads && task->low_priority) {
+ task->done_semaphore.wait();
+ } else {
+ bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id());
+ if (current_is_pool_thread) {
+ // We are an actual process thread, we must not be blocked so continue processing stuff if available.
+ bool must_exit = false;
+ while (true) {
+ if (task->done_semaphore.try_wait()) {
+ // If done, exit
+ break;
}
+ if (!must_exit) {
+ if (task_available_semaphore.try_wait()) {
+ if (exit_threads) {
+ must_exit = true;
+ } else {
+ // Solve tasks while they are around.
+ _process_task_queue();
+ continue;
+ }
+ } else if (!use_native_low_priority_threads && task->low_priority) {
+ // A low prioriry task started waiting, so see if we can move a pending one to the high priority queue.
+ task_mutex.lock();
+ bool post = _try_promote_low_priority_task();
+ task_mutex.unlock();
+ if (post) {
+ task_available_semaphore.post();
+ }
+ }
+ }
+ OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
}
- OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
+ } else {
+ task->done_semaphore.wait();
}
- } else {
- task->done_semaphore.wait();
}
task_mutex.lock();
+ if (is_low_prio_waiting_for_another) {
+ low_priority_tasks_awaiting_others--;
+ }
+
+ task->waiting--;
+ }
+
+ if (task->waiting == 0) {
+ if (use_native_low_priority_threads && task->low_priority) {
+ task->low_priority_thread->wait_to_finish();
+ native_thread_allocator.free(task->low_priority_thread);
+ }
+ tasks.erase(p_task_id);
+ task_allocator.free(task);
}
- tasks.erase(p_task_id);
- task_allocator.free(task);
task_mutex.unlock();
+ return OK;
}
WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
@@ -429,7 +537,7 @@ void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_pr
if (p_use_native_threads_low_priority) {
max_low_priority_threads = 0;
} else {
- max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count);
+ max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
}
use_native_low_priority_threads = p_use_native_threads_low_priority;