Diffstat (limited to 'core')
-rw-r--r--   core/io/resource_loader.cpp          |  12
-rw-r--r--   core/object/worker_thread_pool.cpp   | 522
-rw-r--r--   core/object/worker_thread_pool.h     |  64
-rw-r--r--   core/os/condition_variable.h         |   2
-rw-r--r--   core/os/semaphore.h                  |   8
-rw-r--r--   core/register_core_types.cpp         |   1
-rw-r--r--   core/templates/command_queue_mt.h    |  50
-rw-r--r--   core/templates/paged_allocator.h     |   6
8 files changed, 376 insertions, 289 deletions
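
Reading guide: the main caller-visible change in this commit is that WorkerThreadPool::wait_for_task_completion() may now return ERR_BUSY when a pool thread tries to await a task older than the one it is currently running, instead of the pool trying to untangle the dependency internally. Callers are expected to handle that themselves. A minimal, hypothetical sketch of the pattern (wait_or_redo() and redo_work_inline() are illustrative names, not part of this patch; ResourceLoader, in the first hunk below, is the real in-tree example and re-issues the load with CACHE_MODE_IGNORE):

	// Hypothetical caller pattern for the new ERR_BUSY contract (illustrative names).
	static void redo_work_inline() { /* Caller-specific recovery work. */ }

	static void wait_or_redo(WorkerThreadPool::TaskID p_task) {
		Error err = WorkerThreadPool::get_singleton()->wait_for_task_completion(p_task);
		if (err == ERR_BUSY) {
			// We are a pool thread trying to await an older task; blocking could deadlock.
			// Do the equivalent work inline instead, like ResourceLoader does below.
			redo_work_inline();
		} else {
			DEV_ASSERT(err == OK);
		}
	}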
diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp
index 1fbb6ff2ed..ba11d84bce 100644
--- a/core/io/resource_loader.cpp
+++ b/core/io/resource_loader.cpp
@@ -630,15 +630,16 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
 		if (load_task.task_id != 0) {
 			// Loading thread is in the worker pool.
-			load_task.awaited = true;
 			thread_load_mutex.unlock();
 			Error err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
 			if (err == ERR_BUSY) {
-				// The WorkerThreadPool has scheduled tasks in a way that the current load depends on
-				// another one in a lower stack frame. Restart such load here. When the stack is eventually
-				// unrolled, the original load will have been notified to go on.
+				// The WorkerThreadPool has reported that the current task wants to await on an older one.
+				// That's not allowed, for safety, to avoid deadlocks. Fortunately, though, in the context of
+				// resource loading that means that the task to wait for can be restarted here to break the
+				// cycle, with as much recursion into this process as needed.
+				// When the stack is eventually unrolled, the original load will have been notified to go on.
 #ifdef DEV_ENABLED
-				print_verbose("ResourceLoader: Load task happened to wait on another one deep in the call stack. Attempting to avoid deadlock by re-issuing the load now.");
+				print_verbose("ResourceLoader: Potential for deadlock detected in task dependency. Attempting to avoid it by re-issuing the load now.");
 #endif
 				// CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's
 				// an ongoing load for that resource and wait for it again. This value forces a new load.
@@ -652,6 +653,7 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
 			} else {
 				DEV_ASSERT(err == OK);
 				thread_load_mutex.lock();
+				load_task.awaited = true;
 			}
 		} else {
 			// Loading thread is main or user thread.
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp
index 631767219f..8e8a2ef06b 100644
--- a/core/object/worker_thread_pool.cpp
+++ b/core/object/worker_thread_pool.cpp
@@ -33,6 +33,7 @@
 #include "core/object/script_language.h"
 #include "core/os/os.h"
 #include "core/os/thread_safe.h"
+#include "core/templates/command_queue_mt.h"
 
 void WorkerThreadPool::Task::free_template_userdata() {
 	ERR_FAIL_NULL(template_userdata);
@@ -43,24 +44,17 @@ void WorkerThreadPool::Task::free_template_userdata() {
 
 WorkerThreadPool *WorkerThreadPool::singleton = nullptr;
 
-void WorkerThreadPool::_process_task_queue() {
-	task_mutex.lock();
-	Task *task = task_queue.first()->self();
-	task_queue.remove(task_queue.first());
-	task_mutex.unlock();
-	_process_task(task);
-}
+thread_local CommandQueueMT *WorkerThreadPool::flushing_cmd_queue = nullptr;
 
 void WorkerThreadPool::_process_task(Task *p_task) {
-	bool low_priority = p_task->low_priority;
-	int pool_thread_index = -1;
-	Task *prev_low_prio_task = nullptr; // In case this is recursively called.
+	int pool_thread_index = thread_ids[Thread::get_caller_id()];
+	ThreadData &curr_thread = threads[pool_thread_index];
+	Task *prev_task = nullptr; // In case this is recursively called.
+	bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
 
-	if (!use_native_low_priority_threads) {
+	{
+		// Tasks must start with this unset. They are free to set-and-forget otherwise.
 		set_current_thread_safe_for_nodes(false);
-		pool_thread_index = thread_ids[Thread::get_caller_id()];
-		ThreadData &curr_thread = threads[pool_thread_index];
 		// Since the WorkerThreadPool is started before the script server,
 		// its pre-created threads can't have ScriptServer::thread_enter() called on them early.
 		// Therefore, we do it late at the first opportunity, so in case the task
@@ -71,13 +65,8 @@ void WorkerThreadPool::_process_task(Task *p_task) {
 		}
 		task_mutex.lock();
 		p_task->pool_thread_index = pool_thread_index;
-		if (low_priority) {
-			low_priority_tasks_running++;
-			prev_low_prio_task = curr_thread.current_low_prio_task;
-			curr_thread.current_low_prio_task = p_task;
-		} else {
-			curr_thread.current_low_prio_task = nullptr;
-		}
+		prev_task = curr_thread.current_task;
+		curr_thread.current_task = p_task;
 		task_mutex.unlock();
 	}
 
@@ -111,33 +100,24 @@ void WorkerThreadPool::_process_task(Task *p_task) {
 			memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it.
 		}
 
-		if (low_priority && use_native_low_priority_threads) {
-			p_task->completed = true;
-			p_task->done_semaphore.post();
-			if (do_post) {
-				p_task->group->completed.set_to(true);
-			}
-		} else {
-			if (do_post) {
-				p_task->group->done_semaphore.post();
-				p_task->group->completed.set_to(true);
-			}
-			uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
-			uint32_t finished_users = p_task->group->finished.increment();
-
-			if (finished_users == max_users) {
-				// Get rid of the group, because nobody else is using it.
-				task_mutex.lock();
-				group_allocator.free(p_task->group);
-				task_mutex.unlock();
-			}
-
-			// For groups, tasks get rid of themselves.
+		if (do_post) {
+			p_task->group->done_semaphore.post();
+			p_task->group->completed.set_to(true);
+		}
+		uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
+		uint32_t finished_users = p_task->group->finished.increment();
+		if (finished_users == max_users) {
+			// Get rid of the group, because nobody else is using it.
 			task_mutex.lock();
-			task_allocator.free(p_task);
+			group_allocator.free(p_task->group);
 			task_mutex.unlock();
 		}
+
+		// For groups, tasks get rid of themselves.
+
+		task_mutex.lock();
+		task_allocator.free(p_task);
 	} else {
 		if (p_task->native_func) {
 			p_task->native_func(p_task->native_func_userdata);
@@ -150,88 +130,162 @@ void WorkerThreadPool::_process_task(Task *p_task) {
 
 		task_mutex.lock();
 		p_task->completed = true;
-		for (uint8_t i = 0; i < p_task->waiting; i++) {
-			p_task->done_semaphore.post();
+		p_task->pool_thread_index = -1;
+		if (p_task->waiting_user) {
+			p_task->done_semaphore.post(p_task->waiting_user);
 		}
-		if (!use_native_low_priority_threads) {
-			p_task->pool_thread_index = -1;
+		// Let awaiters know.
+		for (uint32_t i = 0; i < threads.size(); i++) {
+			if (threads[i].awaited_task == p_task) {
+				threads[i].cond_var.notify_one();
+				threads[i].signaled = true;
+			}
 		}
-		task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed.
 	}
-	// Task may have been freed by now (all callers notified).
-	p_task = nullptr;
-
-	if (!use_native_low_priority_threads) {
-		bool post = false;
-		task_mutex.lock();
-		ThreadData &curr_thread = threads[pool_thread_index];
-		curr_thread.current_low_prio_task = prev_low_prio_task;
-		if (low_priority) {
+	{
+		curr_thread.current_task = prev_task;
+		if (p_task->low_priority) {
 			low_priority_threads_used--;
-			low_priority_tasks_running--;
-			// A low prioriry task was freed, so see if we can move a pending one to the high priority queue.
-			if (_try_promote_low_priority_task()) {
-				post = true;
-			}
-			if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
-				_prevent_low_prio_saturation_deadlock();
+			if (_try_promote_low_priority_task()) {
+				if (prev_task) { // Otherwise, this thread will catch it.
+					_notify_threads(&curr_thread, 1, 0);
+				}
 			}
 		}
+
 		task_mutex.unlock();
-		if (post) {
-			task_available_semaphore.post();
-		}
 	}
+
+	set_current_thread_safe_for_nodes(safe_for_nodes_backup);
 }
 
 void WorkerThreadPool::_thread_function(void *p_user) {
+	ThreadData *thread_data = (ThreadData *)p_user;
 	while (true) {
-		singleton->task_available_semaphore.wait();
-		if (singleton->exit_threads) {
-			break;
+		Task *task_to_process = nullptr;
+		{
+			MutexLock lock(singleton->task_mutex);
+			if (singleton->exit_threads) {
+				return;
+			}
+			thread_data->signaled = false;
+
+			if (singleton->task_queue.first()) {
+				task_to_process = singleton->task_queue.first()->self();
+				singleton->task_queue.remove(singleton->task_queue.first());
+			} else {
+				thread_data->cond_var.wait(lock);
+				DEV_ASSERT(singleton->exit_threads || thread_data->signaled);
+			}
 		}
-		singleton->_process_task_queue();
-	}
-}
 
-void WorkerThreadPool::_native_low_priority_thread_function(void *p_user) {
-	Task *task = (Task *)p_user;
-	singleton->_process_task(task);
+		if (task_to_process) {
+			singleton->_process_task(task_to_process);
+		}
+	}
 }
 
-void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
+void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
 	// Fall back to processing on the calling thread if there are no worker threads.
 	// Separated into its own variable to make it easier to extend this logic
 	// in custom builds.
 	bool process_on_calling_thread = threads.size() == 0;
 	if (process_on_calling_thread) {
-		_process_task(p_task);
+		task_mutex.unlock();
+		for (uint32_t i = 0; i < p_count; i++) {
+			_process_task(p_tasks[i]);
+		}
 		return;
 	}
 
-	task_mutex.lock();
-	p_task->low_priority = !p_high_priority;
-	if (!p_high_priority && use_native_low_priority_threads) {
-		p_task->low_priority_thread = native_thread_allocator.alloc();
-		task_mutex.unlock();
+	uint32_t to_process = 0;
+	uint32_t to_promote = 0;
 
-		if (p_task->group) {
-			p_task->group->low_priority_native_tasks.push_back(p_task);
+	ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;
+
+	for (uint32_t i = 0; i < p_count; i++) {
+		p_tasks[i]->low_priority = !p_high_priority;
+		if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
+			task_queue.add_last(&p_tasks[i]->task_elem);
+			if (!p_high_priority) {
+				low_priority_threads_used++;
+			}
+			to_process++;
+		} else {
+			// Too many threads using low priority, must go to queue.
+			low_priority_task_queue.add_last(&p_tasks[i]->task_elem);
+			to_promote++;
 		}
-		p_task->low_priority_thread->start(_native_low_priority_thread_function, p_task); // Pask task directly to thread.
-	} else if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
-		task_queue.add_last(&p_task->task_elem);
-		if (!p_high_priority) {
-			low_priority_threads_used++;
+	}
+
+	_notify_threads(caller_pool_thread, to_process, to_promote);
+
+	task_mutex.unlock();
+}
+
+void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
+	uint32_t to_process = p_process_count;
+	uint32_t to_promote = p_promote_count;
+
+	// This is where it's decided which threads are awakened, according to the workload.
+	// Threads that will have a chance to check the situation anyway and process/promote tasks
+	// are excluded from being notified. Others will still be tried, to distribute load.
+	// The current thread, if it is a pool thread, is also excluded depending on the promoting/processing
+	// needs, because it will loop again anyway. However, it will contribute to decreasing the count,
+	// which helps reduce sync traffic.
+
+	uint32_t thread_count = threads.size();
+
+	// First round:
+	// 1. For processing: notify threads that are not running tasks, to keep the stacks as shallow as possible.
+	// 2. For promoting: since it's exclusive with processing, we find threads able to promote low-prio tasks now.
+	for (uint32_t i = 0;
+			i < thread_count && (to_process || to_promote);
+			i++, notify_index = (notify_index + 1) % thread_count) {
+		ThreadData &th = threads[notify_index];
+
+		if (th.signaled) {
+			continue;
+		}
+		if (th.current_task) {
+			// Good thread for promoting low-prio?
+			if (to_promote && th.awaited_task && th.current_task->low_priority) {
+				if (likely(&th != p_current_thread_data)) {
+					th.cond_var.notify_one();
+				}
+				th.signaled = true;
+				to_promote--;
+			}
+		} else {
+			if (to_process) {
+				if (likely(&th != p_current_thread_data)) {
+					th.cond_var.notify_one();
+				}
+				th.signaled = true;
+				to_process--;
+			}
+		}
+	}
+
+	// Second round:
+	// For processing: if the first round wasn't enough, let's try now with threads processing tasks but currently awaiting.
+	for (uint32_t i = 0;
+			i < thread_count && to_process;
+			i++, notify_index = (notify_index + 1) % thread_count) {
+		ThreadData &th = threads[notify_index];
+
+		if (th.signaled) {
+			continue;
+		}
+		if (th.awaited_task) {
+			if (likely(&th != p_current_thread_data)) {
+				th.cond_var.notify_one();
+			}
+			th.signaled = true;
+			to_process--;
 		}
-		task_mutex.unlock();
-		task_available_semaphore.post();
-	} else {
-		// Too many threads using low priority, must go to queue.
-		low_priority_task_queue.add_last(&p_task->task_elem);
-		task_mutex.unlock();
-	}
 }
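
The scan through notify_index above is a plain round-robin. Stripped of the promote/process bookkeeping, the pattern looks like this (an illustrative sketch only; to_notify stands in for either counter):

	// Round-robin wakeup scan (simplified). Resuming each scan where the previous
	// one stopped avoids always waking thread 0 first and spreads wakeups evenly.
	uint32_t thread_count = threads.size();
	for (uint32_t i = 0; i < thread_count && to_notify; i++, notify_index = (notify_index + 1) % thread_count) {
		ThreadData &th = threads[notify_index];
		if (th.signaled) {
			continue; // Already has a pending wakeup; don't notify twice.
		}
		th.cond_var.notify_one();
		th.signaled = true;
		to_notify--;
	}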
@@ -247,23 +301,6 @@ bool WorkerThreadPool::_try_promote_low_priority_task() {
 	}
 }
 
-void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() {
-	if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
-#ifdef DEV_ENABLED
-		print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task.");
-#endif
-		// In order not to create dependency cycles, we can only schedule the next one.
-		// We'll keep doing the same until the deadlock is broken,
-		SelfList<Task> *to_promote = low_priority_task_queue.first();
-		if (to_promote) {
-			low_priority_task_queue.remove(to_promote);
-			task_queue.add_last(to_promote);
-			low_priority_threads_used++;
-			task_available_semaphore.post();
-		}
-	}
-}
-
 WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
 	return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
 }
@@ -273,15 +310,15 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
 	// Get a free task
 	Task *task = task_allocator.alloc();
 	TaskID id = last_task++;
+	task->self = id;
 	task->callable = p_callable;
 	task->native_func = p_func;
 	task->native_func_userdata = p_userdata;
 	task->description = p_description;
 	task->template_userdata = p_template_userdata;
 	tasks.insert(id, task);
-	task_mutex.unlock();
 
-	_post_task(task, p_high_priority);
+	_post_tasks_and_unlock(&task, 1, p_high_priority);
 
 	return id;
 }
@@ -313,105 +350,117 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
 	}
 	Task *task = *taskp;
 
-	if (!task->completed) {
-		if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet.
-			int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1;
-			if (caller_pool_th_index == task->pool_thread_index) {
-				// Deadlock prevention.
-				// Waiting for a task run on this same thread? That means the task to be awaited started waiting as well
-				// and another task was run to make use of the thread in the meantime, with enough bad luck as to
-				// the need to wait for the original task arose in turn.
-				// In other words, the task we want to wait for is buried in the stack.
-				// Let's report the caller about the issue to it handles as it sees fit.
-				task_mutex.unlock();
-				return ERR_BUSY;
-			}
+	if (task->completed) {
+		if (task->waiting_pool == 0 && task->waiting_user == 0) {
+			tasks.erase(p_task_id);
+			task_allocator.free(task);
 		}
+		task_mutex.unlock();
+		return OK;
+	}
+
+	ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;
+	if (caller_pool_thread && p_task_id <= caller_pool_thread->current_task->self) {
+		// Deadlock prevention:
+		// When a pool thread wants to wait for an older task, the following situations can happen:
+		// 1. Awaited task is deep in the stack of the awaiter.
+		// 2. A group of awaiter threads end up depending on some tasks buried in the stack
+		//    of their worker threads in such a way that progress can't be made.
+		// Both would entail a deadlock. Some cases could be handled here in the WorkerThreadPool
+		// with some extra logic and bookkeeping. However, there would still be unavoidable
+		// cases of deadlock because of the way waiting threads process outstanding tasks.
+		// Taking into account that there's no feasible solution for every possible case
+		// with the current design, we simply reject attempts to await on older tasks,
+		// with a specific error code that signals the situation so the caller can handle it.
+		task_mutex.unlock();
+		return ERR_BUSY;
+	}
 
-	task->waiting++;
-
-	bool is_low_prio_waiting_for_another = false;
-	if (!use_native_low_priority_threads) {
-		// Deadlock prevention:
-		// If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots,
-		// we have a no progressable situation. We can apply a workaround, consisting in promoting an awaited queued
-		// low-prio task to the schedule queue so it can run and break the "impasse".
-		// NOTE: A similar reasoning could be made about high priority tasks, but there are usually much more
-		// than low-prio. Therefore, a deadlock there would only happen when dealing with a very complex task graph
-		// or when there are too few worker threads (limited platforms or exotic settings). If that turns out to be
-		// an issue in the real world, a further fix can be applied against that.
-		if (task->low_priority) {
-			bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task;
-			if (awaiter_is_a_low_prio_task) {
-				is_low_prio_waiting_for_another = true;
-				low_priority_tasks_awaiting_others++;
-				if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
-					_prevent_low_prio_saturation_deadlock();
+	if (caller_pool_thread) {
+		task->waiting_pool++;
+	} else {
+		task->waiting_user++;
+	}
+
+	task_mutex.unlock();
+
+	if (caller_pool_thread) {
+		while (true) {
+			Task *task_to_process = nullptr;
+			{
+				MutexLock lock(task_mutex);
+				bool was_signaled = caller_pool_thread->signaled;
+				caller_pool_thread->signaled = false;
+
+				if (task->completed) {
+					// This thread was also awakened for some reason, but it's about to exit.
+					// Let's find out what may be pending and forward the requests.
+					if (!exit_threads && was_signaled) {
+						uint32_t to_process = task_queue.first() ? 1 : 0;
+						uint32_t to_promote = caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
+						if (to_process || to_promote) {
+							// This thread must be left alone since it won't loop again.
+							caller_pool_thread->signaled = true;
+							_notify_threads(caller_pool_thread, to_process, to_promote);
+						}
+					}
+
+					task->waiting_pool--;
+					if (task->waiting_pool == 0 && task->waiting_user == 0) {
+						tasks.erase(p_task_id);
+						task_allocator.free(task);
+					}
+
+					break;
+				}
-			}
-		}
-		task_mutex.unlock();
 
+				if (!exit_threads) {
+					// This is a thread from the pool. It shouldn't just idle.
+					// Let's try to process other tasks while we wait.
 
-		if (use_native_low_priority_threads && task->low_priority) {
-			task->done_semaphore.wait();
-		} else {
-			bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id());
-			if (current_is_pool_thread) {
-				// We are an actual process thread, we must not be blocked so continue processing stuff if available.
-				bool must_exit = false;
-				while (true) {
-					if (task->done_semaphore.try_wait()) {
-						// If done, exit
-						break;
+					if (caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
+						if (_try_promote_low_priority_task()) {
+							_notify_threads(caller_pool_thread, 1, 0);
+						}
+					}
+
+					if (singleton->task_queue.first()) {
+						task_to_process = task_queue.first()->self();
+						task_queue.remove(task_queue.first());
 					}
-					if (!must_exit) {
-						if (task_available_semaphore.try_wait()) {
-							if (exit_threads) {
-								must_exit = true;
-							} else {
-								// Solve tasks while they are around.
-								bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
-								_process_task_queue();
-								set_current_thread_safe_for_nodes(safe_for_nodes_backup);
-								continue;
-							}
-						} else if (!use_native_low_priority_threads && task->low_priority) {
-							// A low prioriry task started waiting, so see if we can move a pending one to the high priority queue.
-							task_mutex.lock();
-							bool post = _try_promote_low_priority_task();
-							task_mutex.unlock();
-							if (post) {
-								task_available_semaphore.post();
-							}
+
+					if (!task_to_process) {
+						caller_pool_thread->awaited_task = task;
+
+						if (flushing_cmd_queue) {
+							flushing_cmd_queue->unlock();
 						}
+						caller_pool_thread->cond_var.wait(lock);
+						if (flushing_cmd_queue) {
+							flushing_cmd_queue->lock();
+						}
+
+						DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed);
+						caller_pool_thread->awaited_task = nullptr;
 					}
-					OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
 				}
-			} else {
-				task->done_semaphore.wait();
 			}
-		}
-	}
-	task_mutex.lock();
-	if (is_low_prio_waiting_for_another) {
-		low_priority_tasks_awaiting_others--;
+			if (task_to_process) {
+				_process_task(task_to_process);
+			}
 		}
-
-	task->waiting--;
-	}
-
-	if (task->waiting == 0) {
-		if (use_native_low_priority_threads && task->low_priority) {
-			task->low_priority_thread->wait_to_finish();
-			native_thread_allocator.free(task->low_priority_thread);
+	} else {
+		task->done_semaphore.wait();
+		task_mutex.lock();
+		task->waiting_user--;
+		if (task->waiting_pool == 0 && task->waiting_user == 0) {
+			tasks.erase(p_task_id);
+			task_allocator.free(task);
 		}
-		tasks.erase(p_task_id);
-		task_allocator.free(task);
+		task_mutex.unlock();
 	}
-	task_mutex.unlock();
 
 	return OK;
 }
@@ -455,11 +504,8 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
 	}
 	groups[id] = group;
-	task_mutex.unlock();
 
-	for (int i = 0; i < p_tasks; i++) {
-		_post_task(tasks_posted[i], p_high_priority);
-	}
+	_post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
 
 	return id;
 }
@@ -502,22 +548,17 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
 	if (!groupp) {
 		ERR_FAIL_MSG("Invalid Group ID");
 	}
-	Group *group = *groupp;
-	if (group->low_priority_native_tasks.size() > 0) {
-		for (Task *task : group->low_priority_native_tasks) {
-			task->low_priority_thread->wait_to_finish();
-			task_mutex.lock();
-			native_thread_allocator.free(task->low_priority_thread);
-			task_allocator.free(task);
-			task_mutex.unlock();
-		}
+	{
+		Group *group = *groupp;
 
-		task_mutex.lock();
-		group_allocator.free(group);
-		task_mutex.unlock();
-	} else {
+		if (flushing_cmd_queue) {
+			flushing_cmd_queue->unlock();
+		}
 		group->done_semaphore.wait();
+		if (flushing_cmd_queue) {
+			flushing_cmd_queue->lock();
+		}
 
 		uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
 		uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.
@@ -540,19 +581,23 @@ int WorkerThreadPool::get_thread_index() {
 	return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
 }
 
-void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_priority, float p_low_priority_task_ratio) {
+void WorkerThreadPool::thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue) {
+	ERR_FAIL_COND(flushing_cmd_queue != nullptr);
+	flushing_cmd_queue = p_queue;
+}
+
+void WorkerThreadPool::thread_exit_command_queue_mt_flush() {
+	ERR_FAIL_NULL(flushing_cmd_queue);
+	flushing_cmd_queue = nullptr;
+}
+
+void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
 	ERR_FAIL_COND(threads.size() > 0);
 	if (p_thread_count < 0) {
 		p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
 	}
 
-	if (p_use_native_threads_low_priority) {
-		max_low_priority_threads = 0;
-	} else {
-		max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
-	}
-
-	use_native_low_priority_threads = p_use_native_threads_low_priority;
+	max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
 
 	threads.resize(p_thread_count);
 
@@ -568,24 +613,33 @@ void WorkerThreadPool::finish() {
 		return;
 	}
 
-	task_mutex.lock();
-	SelfList<Task> *E = low_priority_task_queue.first();
-	while (E) {
-		print_error("Task waiting was never re-claimed: " + E->self()->description);
-		E = E->next();
+	{
+		MutexLock lock(task_mutex);
+		SelfList<Task> *E = low_priority_task_queue.first();
+		while (E) {
+			print_error("Task waiting was never re-claimed: " + E->self()->description);
+			E = E->next();
+		}
 	}
-	task_mutex.unlock();
 
-	exit_threads = true;
-
-	for (uint32_t i = 0; i < threads.size(); i++) {
-		task_available_semaphore.post();
+	{
+		MutexLock lock(task_mutex);
+		exit_threads = true;
+	}
+	for (ThreadData &data : threads) {
+		data.cond_var.notify_one();
 	}
-
 	for (ThreadData &data : threads) {
 		data.thread.wait_to_finish();
 	}
 
+	{
+		MutexLock lock(task_mutex);
+		for (KeyValue<TaskID, Task *> &E : tasks) {
+			task_allocator.free(E.value);
+		}
+	}
+
 	threads.clear();
 }
diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h
index dd56f95cae..c9921c808d 100644
--- a/core/object/worker_thread_pool.h
+++ b/core/object/worker_thread_pool.h
@@ -31,6 +31,7 @@
 #ifndef WORKER_THREAD_POOL_H
 #define WORKER_THREAD_POOL_H
 
+#include "core/os/condition_variable.h"
 #include "core/os/memory.h"
 #include "core/os/os.h"
 #include "core/os/semaphore.h"
@@ -40,6 +41,8 @@
 #include "core/templates/rid.h"
 #include "core/templates/safe_refcount.h"
 
+class CommandQueueMT;
+
 class WorkerThreadPool : public Object {
 	GDCLASS(WorkerThreadPool, Object)
 public:
@@ -60,7 +63,7 @@ private:
 	};
 
 	struct Group {
-		GroupID self;
+		GroupID self = -1;
 		SafeNumeric<uint32_t> index;
 		SafeNumeric<uint32_t> completed_index;
 		uint32_t max = 0;
@@ -68,23 +71,23 @@ private:
 		SafeFlag completed;
 		SafeNumeric<uint32_t> finished;
 		uint32_t tasks_used = 0;
-		TightLocalVector<Task *> low_priority_native_tasks;
 	};
 
 	struct Task {
+		TaskID self = -1;
 		Callable callable;
 		void (*native_func)(void *) = nullptr;
 		void (*native_group_func)(void *, uint32_t) = nullptr;
 		void *native_func_userdata = nullptr;
 		String description;
-		Semaphore done_semaphore;
+		Semaphore done_semaphore; // For user threads awaiting.
 		bool completed = false;
 		Group *group = nullptr;
 		SelfList<Task> task_elem;
-		uint32_t waiting = 0;
+		uint32_t waiting_pool = 0;
+		uint32_t waiting_user = 0;
 		bool low_priority = false;
 		BaseTemplateUserdata *template_userdata = nullptr;
-		Thread *low_priority_thread = nullptr;
 		int pool_thread_index = -1;
 
 		void free_template_userdata();
@@ -92,51 +95,65 @@ private:
 				task_elem(this) {}
 	};
 
-	PagedAllocator<Task> task_allocator;
-	PagedAllocator<Group> group_allocator;
-	PagedAllocator<Thread> native_thread_allocator;
+	static const uint32_t TASKS_PAGE_SIZE = 1024;
+	static const uint32_t GROUPS_PAGE_SIZE = 256;
+
+	PagedAllocator<Task, false, TASKS_PAGE_SIZE> task_allocator;
+	PagedAllocator<Group, false, GROUPS_PAGE_SIZE> group_allocator;
 
 	SelfList<Task>::List low_priority_task_queue;
 	SelfList<Task>::List task_queue;
 
-	Mutex task_mutex;
-	Semaphore task_available_semaphore;
+	BinaryMutex task_mutex;
 
 	struct ThreadData {
-		uint32_t index;
+		uint32_t index = 0;
 		Thread thread;
-		Task *current_low_prio_task = nullptr;
 		bool ready_for_scripting = false;
+		bool signaled = false;
+		Task *current_task = nullptr;
+		Task *awaited_task = nullptr; // Null if not awaiting the condition variable. Special value for idle-waiting.
+		ConditionVariable cond_var;
 	};
 
 	TightLocalVector<ThreadData> threads;
 	bool exit_threads = false;
 	HashMap<Thread::ID, int> thread_ids;
 
-	HashMap<TaskID, Task *> tasks;
-	HashMap<GroupID, Group *> groups;
+	HashMap<
+			TaskID,
+			Task *,
+			HashMapHasherDefault,
+			HashMapComparatorDefault<TaskID>,
+			PagedAllocator<HashMapElement<TaskID, Task *>, false, TASKS_PAGE_SIZE>>
+			tasks;
+	HashMap<
+			GroupID,
+			Group *,
+			HashMapHasherDefault,
+			HashMapComparatorDefault<GroupID>,
+			PagedAllocator<HashMapElement<GroupID, Group *>, false, GROUPS_PAGE_SIZE>>
+			groups;
 
-	bool use_native_low_priority_threads = false;
 	uint32_t max_low_priority_threads = 0;
 	uint32_t low_priority_threads_used = 0;
-	uint32_t low_priority_tasks_running = 0;
-	uint32_t low_priority_tasks_awaiting_others = 0;
+	uint32_t notify_index = 0; // For rotating across threads; helps distribute the load.
 	uint64_t last_task = 1;
 
 	static void _thread_function(void *p_user);
-	static void _native_low_priority_thread_function(void *p_user);
 
-	void _process_task_queue();
 	void _process_task(Task *task);
-	void _post_task(Task *p_task, bool p_high_priority);
+	void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority);
+	void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);
 
 	bool _try_promote_low_priority_task();
-	void _prevent_low_prio_saturation_deadlock();
 
 	static WorkerThreadPool *singleton;
 
+	static thread_local CommandQueueMT *flushing_cmd_queue;
+
 	TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
 	GroupID _add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description);
@@ -199,7 +216,10 @@ public:
 	static WorkerThreadPool *get_singleton() { return singleton; }
 	static int get_thread_index();
 
-	void init(int p_thread_count = -1, bool p_use_native_threads_low_priority = true, float p_low_priority_task_ratio = 0.3);
+	static void thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue);
+	static void thread_exit_command_queue_mt_flush();
+
+	void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
 	void finish();
 	WorkerThreadPool();
 	~WorkerThreadPool();
diff --git a/core/os/condition_variable.h b/core/os/condition_variable.h
index 6a6996019d..6a49ced31b 100644
--- a/core/os/condition_variable.h
+++ b/core/os/condition_variable.h
@@ -31,6 +31,8 @@
 #ifndef CONDITION_VARIABLE_H
 #define CONDITION_VARIABLE_H
 
+#include "core/os/mutex.h"
+
 #ifdef MINGW_ENABLED
 #define MINGW_STDTHREAD_REDUNDANCY_WARNING
 #include "thirdparty/mingw-std-threads/mingw.condition_variable.h"
diff --git a/core/os/semaphore.h b/core/os/semaphore.h
index 8bb1529bbd..b8ae35b86b 100644
--- a/core/os/semaphore.h
+++ b/core/os/semaphore.h
@@ -58,10 +58,12 @@ private:
 #endif
 
 public:
-	_ALWAYS_INLINE_ void post() const {
+	_ALWAYS_INLINE_ void post(uint32_t p_count = 1) const {
 		std::lock_guard lock(mutex);
-		count++;
-		condition.notify_one();
+		count += p_count;
+		for (uint32_t i = 0; i < p_count; ++i) {
+			condition.notify_one();
+		}
 	}
 
 	_ALWAYS_INLINE_ void wait() const {
diff --git a/core/register_core_types.cpp b/core/register_core_types.cpp
index 2785d1daa5..82b3d27942 100644
--- a/core/register_core_types.cpp
+++ b/core/register_core_types.cpp
@@ -307,7 +307,6 @@ void register_core_settings() {
 	GLOBAL_DEF(PropertyInfo(Variant::STRING, "network/tls/certificate_bundle_override", PROPERTY_HINT_FILE, "*.crt"), "");
 
 	GLOBAL_DEF("threading/worker_pool/max_threads", -1);
-	GLOBAL_DEF("threading/worker_pool/use_system_threads_for_low_priority_tasks", true);
 	GLOBAL_DEF("threading/worker_pool/low_priority_thread_ratio", 0.3);
 }
 
diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h
index 7e480653ac..b1010f7f43 100644
--- a/core/templates/command_queue_mt.h
+++ b/core/templates/command_queue_mt.h
@@ -31,6 +31,7 @@
 #ifndef COMMAND_QUEUE_MT_H
 #define COMMAND_QUEUE_MT_H
 
+#include "core/object/worker_thread_pool.h"
 #include "core/os/memory.h"
 #include "core/os/mutex.h"
 #include "core/os/semaphore.h"
@@ -306,15 +307,15 @@ class CommandQueueMT {
 
 	struct CommandBase {
 		virtual void call() = 0;
-		virtual void post() {}
-		virtual ~CommandBase() {}
+		virtual SyncSemaphore *get_sync_semaphore() { return nullptr; }
+		virtual ~CommandBase() = default; // Won't be called.
 	};
 
 	struct SyncCommand : public CommandBase {
 		SyncSemaphore *sync_sem = nullptr;
 
-		virtual void post() override {
-			sync_sem->sem.post();
+		virtual SyncSemaphore *get_sync_semaphore() override {
+			return sync_sem;
 		}
 	};
 
@@ -340,6 +341,7 @@ class CommandQueueMT {
 	SyncSemaphore sync_sems[SYNC_SEMAPHORES];
 	Mutex mutex;
 	Semaphore *sync = nullptr;
+	uint64_t flush_read_ptr = 0;
 
 	template <class T>
 	T *allocate() {
@@ -362,31 +364,41 @@ class CommandQueueMT {
 	void _flush() {
 		lock();
 
-		uint64_t read_ptr = 0;
-		uint64_t limit = command_mem.size();
-
-		while (read_ptr < limit) {
-			uint64_t size = *(uint64_t *)&command_mem[read_ptr];
-			read_ptr += 8;
-			CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[read_ptr]);
-
-			cmd->call(); //execute the function
-			cmd->post(); //release in case it needs sync/ret
-			cmd->~CommandBase(); //should be done, so erase the command
-
-			read_ptr += size;
+		WorkerThreadPool::thread_enter_command_queue_mt_flush(this);
+		while (flush_read_ptr < command_mem.size()) {
+			uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
+			flush_read_ptr += 8;
+			CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
+
+			SyncSemaphore *sync_sem = cmd->get_sync_semaphore();
+			cmd->call();
+			if (sync_sem) {
+				sync_sem->sem.post(); // Release in case it needs sync/ret.
+			}
+
+			if (unlikely(flush_read_ptr == 0)) {
+				// A reentrant call flushed.
+				DEV_ASSERT(command_mem.is_empty());
+				unlock();
+				return;
+			}
+
+			flush_read_ptr += size;
 		}
+		WorkerThreadPool::thread_exit_command_queue_mt_flush();
 
 		command_mem.clear();
+		flush_read_ptr = 0;
 		unlock();
 	}
 
-	void lock();
-	void unlock();
 	void wait_for_flush();
 	SyncSemaphore *_alloc_sync_sem();
 
 public:
+	void lock();
+	void unlock();
+
 	/* NORMAL PUSH COMMANDS */
 	DECL_PUSH(0)
 	SPACE_SEP_LIST(DECL_PUSH, 15)
diff --git a/core/templates/paged_allocator.h b/core/templates/paged_allocator.h
index 6f3f78d4d2..d880eae0c3 100644
--- a/core/templates/paged_allocator.h
+++ b/core/templates/paged_allocator.h
@@ -40,7 +40,7 @@
 #include <type_traits>
 #include <typeinfo>
 
-template <class T, bool thread_safe = false>
+template <class T, bool thread_safe = false, uint32_t DEFAULT_PAGE_SIZE = 4096>
 class PagedAllocator {
 	T **page_pool = nullptr;
 	T ***available_pool = nullptr;
@@ -53,10 +53,6 @@ class PagedAllocator {
 	SpinLock spin_lock;
 
 public:
-	enum {
-		DEFAULT_PAGE_SIZE = 4096
-	};
-
 	template <class... Args>
 	T *alloc(Args &&...p_args) {
 		if (thread_safe) {
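
A closing note on the CommandQueueMT changes: the new thread_local flushing_cmd_queue lets the two classes cooperate when a pool thread blocks while flushing a command queue. Condensed from the hunks above (this mirrors code already in the diff; it is not additional API):

	// CommandQueueMT::_flush() brackets the whole flush:
	WorkerThreadPool::thread_enter_command_queue_mt_flush(this);
	// ... execute queued commands; any of them may end up waiting on a task ...
	WorkerThreadPool::thread_exit_command_queue_mt_flush();

	// WorkerThreadPool::wait_for_task_completion(), before sleeping on the condition
	// variable, releases the flushing queue's lock so producers can keep pushing
	// commands, and re-acquires it upon wakeup:
	if (flushing_cmd_queue) {
		flushing_cmd_queue->unlock();
	}
	caller_pool_thread->cond_var.wait(lock);
	if (flushing_cmd_queue) {
		flushing_cmd_queue->lock();
	}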