diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/io/resource.cpp | 17 | ||||
-rw-r--r-- | core/io/resource_loader.cpp | 83 | ||||
-rw-r--r-- | core/object/message_queue.cpp | 6 | ||||
-rw-r--r-- | core/object/worker_thread_pool.cpp | 3 |
4 files changed, 55 insertions, 54 deletions
diff --git a/core/io/resource.cpp b/core/io/resource.cpp index 1ecfd8366d..c045c0fc74 100644 --- a/core/io/resource.cpp +++ b/core/io/resource.cpp @@ -40,12 +40,7 @@ #include <stdio.h> void Resource::emit_changed() { - if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { - // Let the connection happen on the main thread, later, since signals are not thread-safe. - call_deferred("emit_signal", CoreStringName(changed)); - } else { - emit_signal(CoreStringName(changed)); - } + emit_signal(CoreStringName(changed)); } void Resource::_resource_path_changed() { @@ -166,22 +161,12 @@ bool Resource::editor_can_reload_from_file() { } void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) { - if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { - // Let the check and connection happen on the main thread, later, since signals are not thread-safe. - callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags); - return; - } if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) { connect(CoreStringName(changed), p_callable, p_flags); } } void Resource::disconnect_changed(const Callable &p_callable) { - if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { - // Let the check and disconnection happen on the main thread, later, since signals are not thread-safe. - callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable); - return; - } if (is_connected(CoreStringName(changed), p_callable)) { disconnect(CoreStringName(changed), p_callable); } diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index c3c37aa89d..0dcde5509a 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -302,7 +302,8 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { thread_load_mutex.unlock(); // Thread-safe either if it's the current thread or a brand new one. - CallQueue *mq_override = nullptr; + bool mq_override_present = false; + CallQueue *own_mq_override = nullptr; if (load_nesting == 0) { load_paths_stack = memnew(Vector<String>); @@ -310,8 +311,12 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { load_paths_stack->push_back(load_task.dependent_path); } if (!Thread::is_main_thread()) { - mq_override = memnew(CallQueue); - MessageQueue::set_thread_singleton_override(mq_override); + // Let the caller thread use its own, for added flexibility. Provide one otherwise. + if (MessageQueue::get_singleton() == MessageQueue::get_main_singleton()) { + own_mq_override = memnew(CallQueue); + MessageQueue::set_thread_singleton_override(own_mq_override); + } + mq_override_present = true; set_current_thread_safe_for_nodes(true); } } else { @@ -324,8 +329,8 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { } Ref<Resource> res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress); - if (mq_override) { - mq_override->flush(); + if (mq_override_present) { + MessageQueue::get_singleton()->flush(); } thread_load_mutex.lock(); @@ -394,8 +399,9 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { thread_load_mutex.unlock(); if (load_nesting == 0) { - if (mq_override) { - memdelete(mq_override); + if (own_mq_override) { + MessageQueue::set_thread_singleton_override(nullptr); + memdelete(own_mq_override); } memdelete(load_paths_stack); } @@ -656,39 +662,50 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro return Ref<Resource>(); } - if (load_task.task_id != 0) { + bool loader_is_wtp = load_task.task_id != 0; + Error wtp_task_err = FAILED; + if (loader_is_wtp) { // Loading thread is in the worker pool. thread_load_mutex.unlock(); - Error err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); - if (err == ERR_BUSY) { - // The WorkerThreadPool has reported that the current task wants to await on an older one. - // That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of - // resource loading that means that the task to wait for can be restarted here to break the - // cycle, with as much recursion into this process as needed. - // When the stack is eventually unrolled, the original load will have been notified to go on. - // CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's - // an ongoing load for that resource and wait for it again. This value forces a new load. - Ref<ResourceLoader::LoadToken> token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE); - Ref<Resource> resource = _load_complete(*token.ptr(), &err); - if (r_error) { - *r_error = err; + wtp_task_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + } + + if (load_task.status == THREAD_LOAD_IN_PROGRESS) { // If early errored, awaiting would deadlock. + if (loader_is_wtp) { + if (wtp_task_err == ERR_BUSY) { + // The WorkerThreadPool has reported that the current task wants to await on an older one. + // That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of + // resource loading that means that the task to wait for can be restarted here to break the + // cycle, with as much recursion into this process as needed. + // When the stack is eventually unrolled, the original load will have been notified to go on. + // CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's + // an ongoing load for that resource and wait for it again. This value forces a new load. + Ref<ResourceLoader::LoadToken> token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE); + Ref<Resource> resource = _load_complete(*token.ptr(), &wtp_task_err); + if (r_error) { + *r_error = wtp_task_err; + } + thread_load_mutex.lock(); + return resource; + } else { + DEV_ASSERT(wtp_task_err == OK); + thread_load_mutex.lock(); + load_task.awaited = true; } - thread_load_mutex.lock(); - return resource; } else { - DEV_ASSERT(err == OK); - thread_load_mutex.lock(); - load_task.awaited = true; + // Loading thread is main or user thread. + if (!load_task.cond_var) { + load_task.cond_var = memnew(ConditionVariable); + } + do { + load_task.cond_var->wait(p_thread_load_lock); + DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); + } while (load_task.cond_var); } } else { - // Loading thread is main or user thread. - if (!load_task.cond_var) { - load_task.cond_var = memnew(ConditionVariable); + if (loader_is_wtp) { + thread_load_mutex.lock(); } - do { - load_task.cond_var->wait(p_thread_load_lock); - DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); - } while (load_task.cond_var); } } diff --git a/core/object/message_queue.cpp b/core/object/message_queue.cpp index 762bab75e7..4b0b1ce63d 100644 --- a/core/object/message_queue.cpp +++ b/core/object/message_queue.cpp @@ -481,10 +481,7 @@ CallQueue::~CallQueue() { if (!allocator_is_custom) { memdelete(allocator); } - // This is done here to avoid a circular dependency between the safety checks and the thread singleton pointer. - if (this == MessageQueue::thread_singleton) { - MessageQueue::thread_singleton = nullptr; - } + DEV_ASSERT(!is_current_thread_override); } ////////////////////// @@ -493,7 +490,6 @@ CallQueue *MessageQueue::main_singleton = nullptr; thread_local CallQueue *MessageQueue::thread_singleton = nullptr; void MessageQueue::set_thread_singleton_override(CallQueue *p_thread_singleton) { - DEV_ASSERT(p_thread_singleton); // To unset the thread singleton, don't call this with nullptr, but just memfree() it. #ifdef DEV_ENABLED if (thread_singleton) { thread_singleton->is_current_thread_override = false; diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 9c9e0fa899..a7c0a0353e 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -53,7 +53,9 @@ void WorkerThreadPool::_process_task(Task *p_task) { int pool_thread_index = thread_ids[Thread::get_caller_id()]; ThreadData &curr_thread = threads[pool_thread_index]; Task *prev_task = nullptr; // In case this is recursively called. + bool safe_for_nodes_backup = is_current_thread_safe_for_nodes(); + CallQueue *call_queue_backup = MessageQueue::get_singleton() != MessageQueue::get_main_singleton() ? MessageQueue::get_singleton() : nullptr; { // Tasks must start with this unset. They are free to set-and-forget otherwise. @@ -169,6 +171,7 @@ void WorkerThreadPool::_process_task(Task *p_task) { } set_current_thread_safe_for_nodes(safe_for_nodes_backup); + MessageQueue::set_thread_singleton_override(call_queue_backup); #endif } |