author    | Pedro J. Estébanez <pedrojrulez@gmail.com> | 2024-07-10 12:53:14 +0200
committer | Pedro J. Estébanez <pedrojrulez@gmail.com> | 2024-09-05 13:29:38 +0200
commit    | ea28ac510de3cef098a7624986072ff44546e87f (patch)
tree      | 112f605fbaa50865465a6096da24995461cd0dae /core
parent    | ece392538ec4ec2742546babb4f13d77c28390b8 (diff)
download  | redot-engine-ea28ac510de3cef098a7624986072ff44546e87f.tar.gz
ResourceLoader: Enhance deadlock prevention
Benefits:
- Simpler code. The main load function is renamed so it's apparent that it's not just a thread entry point anymore.
- Cache and thread modes of the original task are honored. A beautiful consequence is that, unlike before, re-issued loads can use the resource cache, which makes this mechanism much more performant (the re-run flow is sketched after the diff below).
- The newly added getter for the caller's task ID in WorkerThreadPool makes it possible to remove ResourceLoader's custom tracking of it (see the first sketch below).
- The check for an already cached resource and its replacement now happen atomically. That fixes cases where deadlock prevention could lead to multiple resource instances of the same on-disk file. As a side effect, it also makes the regular check for the replace load mode more robust (see the second sketch below).
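
Editor's note: the following is a minimal, self-contained sketch (not engine code; `MiniPool` and `run_task` are hypothetical names) of the pattern behind the new `WorkerThreadPool::get_caller_task_id()`: each worker records which task it is currently running, so any code executing on that thread can ask "which pool task am I inside?" instead of every subsystem keeping its own thread-local bookkeeping.

```cpp
#include <cstdint>
#include <mutex>
#include <thread>
#include <unordered_map>

using TaskID = int64_t;
static constexpr TaskID INVALID_TASK_ID = -1;

class MiniPool {
	// Maps a worker's thread id to the task it is currently executing.
	std::unordered_map<std::thread::id, TaskID> current_task;
	std::mutex mtx;

public:
	// Worker-side bookkeeping around the task body.
	void run_task(TaskID p_id, void (*p_fn)()) {
		{
			std::lock_guard<std::mutex> g(mtx);
			current_task[std::this_thread::get_id()] = p_id;
		}
		p_fn(); // Anything called from here sees get_caller_task_id() == p_id.
		{
			std::lock_guard<std::mutex> g(mtx);
			current_task.erase(std::this_thread::get_id());
		}
	}

	// Returns the task the calling thread is running, or INVALID_TASK_ID
	// when called from a thread that isn't executing a pool task.
	TaskID get_caller_task_id() {
		std::lock_guard<std::mutex> g(mtx);
		auto it = current_task.find(std::this_thread::get_id());
		return it != current_task.end() ? it->second : INVALID_TASK_ID;
	}
};
```

Centralizing this in the pool means the information is always in sync with what the pool is actually running, which is what lets ResourceLoader drop its duplicate `caller_task_id` thread-local below.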
(cherry picked from commit 28619e26cf35227c3ddab35878e1045f82895657)
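
Editor's note: a minimal sketch, under assumed standalone names (`Res`, `register_or_reuse`, a `std::map` cache), of the atomic check-and-replace described above: the cache lookup and the decision to keep, refresh, or register an instance happen under one lock, so two loads of the same path that finish concurrently cannot both register distinct instances.

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct Res {
	std::string path;
	void copy_from(const Res &p_other) { /* Overwrite data in place, keeping identity. */ }
};

static std::mutex cache_lock;
static std::map<std::string, std::shared_ptr<Res>> cache;

// Register a freshly loaded resource; if an instance for the same path already
// exists, keep the existing one so outstanding references stay valid.
std::shared_ptr<Res> register_or_reuse(std::shared_ptr<Res> p_loaded, const std::string &p_path, bool p_replacing) {
	std::unique_lock<std::mutex> lk(cache_lock); // Check and insert must be atomic.
	auto it = cache.find(p_path);
	if (it != cache.end() && it->second != p_loaded) {
		std::shared_ptr<Res> old = it->second;
		lk.unlock(); // copy_from() may be slow; don't hold the cache lock for it.
		if (p_replacing) {
			old->copy_from(*p_loaded); // Replace mode: refresh data, keep identity.
		}
		return old; // Either way, the pre-existing instance wins.
	}
	p_loaded->path = p_path;
	cache[p_path] = p_loaded;
	return p_loaded;
}
```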
Diffstat (limited to 'core')
-rw-r--r-- | core/io/resource_loader.cpp        | 52
-rw-r--r-- | core/io/resource_loader.h          |  3
-rw-r--r-- | core/object/worker_thread_pool.cpp |  9
-rw-r--r-- | core/object/worker_thread_pool.h   |  1
4 files changed, 45 insertions, 20 deletions
diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp
index f754809774..f21d1e92dd 100644
--- a/core/io/resource_loader.cpp
+++ b/core/io/resource_loader.cpp
@@ -311,11 +311,11 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin
 	ERR_FAIL_V_MSG(Ref<Resource>(), vformat("No loader found for resource: %s (expected type: %s)", p_path, p_type_hint));
 }
 
-void ResourceLoader::_thread_load_function(void *p_userdata) {
+// This implementation must allow re-entrancy for a task that started awaiting in a deeper stack frame.
+void ResourceLoader::_run_load_task(void *p_userdata) {
 	ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
 
 	thread_load_mutex.lock();
-	caller_task_id = load_task.task_id;
 	if (cleaning_tasks) {
 		load_task.status = THREAD_LOAD_FAILED;
 		thread_load_mutex.unlock();
@@ -372,15 +372,28 @@ void ResourceLoader::_thread_load_function(void *p_userdata) {
 		unlock_pending = false;
 
 		if (!ignoring) {
-			if (replacing) {
-				Ref<Resource> old_res = ResourceCache::get_ref(load_task.local_path);
-				if (old_res.is_valid() && old_res != load_task.resource) {
-					// If resource is already loaded, only replace its data, to avoid existing invalidating instances.
-					old_res->copy_from(load_task.resource);
+			ResourceCache::lock.lock(); // Check and operations must happen atomically.
+			bool pending_unlock = true;
+			Ref<Resource> old_res = ResourceCache::get_ref(load_task.local_path);
+			if (old_res.is_valid()) {
+				if (old_res != load_task.resource) {
+					// The resource can already exist at this point for two reasons:
+					// a) The load uses replace mode.
+					// b) There was more than one load in flight for the same path because of deadlock prevention.
+					// In either case, we want to keep the resource that was already there.
+					ResourceCache::lock.unlock();
+					pending_unlock = false;
+					if (replacing) {
+						old_res->copy_from(load_task.resource);
+					}
 					load_task.resource = old_res;
 				}
+			} else {
+				load_task.resource->set_path(load_task.local_path);
+			}
+			if (pending_unlock) {
+				ResourceCache::lock.unlock();
+			}
-			load_task.resource->set_path(load_task.local_path, replacing);
 		} else {
 			load_task.resource->set_path_cache(load_task.local_path);
 		}
@@ -548,14 +561,20 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
 		run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT;
 
 		if (run_on_current_thread) {
-			load_task_ptr->thread_id = Thread::get_caller_id();
+			// The current thread may happen to be a thread from the pool.
+			WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id();
+			if (tid != WorkerThreadPool::INVALID_TASK_ID) {
+				load_task_ptr->task_id = tid;
+			} else {
+				load_task_ptr->thread_id = Thread::get_caller_id();
+			}
 		} else {
-			load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr);
+			load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_run_load_task, load_task_ptr);
 		}
 	}
 
 	if (run_on_current_thread) {
-		_thread_load_function(load_task_ptr);
+		_run_load_task(load_task_ptr);
 		if (must_not_register) {
 			load_token->res_if_unregistered = load_task_ptr->resource;
 		}
@@ -708,7 +727,7 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
 	if (load_task.status == THREAD_LOAD_IN_PROGRESS) {
 		DEV_ASSERT((load_task.task_id == 0) != (load_task.thread_id == 0));
 
-		if ((load_task.task_id != 0 && load_task.task_id == caller_task_id) ||
+		if ((load_task.task_id != 0 && load_task.task_id == WorkerThreadPool::get_singleton()->get_caller_task_id()) ||
 				(load_task.thread_id != 0 && load_task.thread_id == Thread::get_caller_id())) {
 			// Load is in progress, but it's precisely this thread the one in charge.
 			// That means this is a cyclic load.
@@ -737,12 +756,10 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
 			// resource loading that means that the task to wait for can be restarted here to break the
 			// cycle, with as much recursion into this process as needed.
 			// When the stack is eventually unrolled, the original load will have been notified to go on.
-			// CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's
-			// an ongoing load for that resource and wait for it again. This value forces a new load.
-			Ref<ResourceLoader::LoadToken> token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE);
-			Ref<Resource> resource = _load_complete(*token.ptr(), &wtp_task_err);
+			_run_load_task(&load_task);
+			Ref<Resource> resource = load_task.resource;
 			if (r_error) {
-				*r_error = wtp_task_err;
+				*r_error = load_task.error;
 			}
 			thread_load_mutex.lock();
 			return resource;
@@ -1330,7 +1347,6 @@ bool ResourceLoader::abort_on_missing_resource = true;
 bool ResourceLoader::timestamp_on_load = false;
 
 thread_local int ResourceLoader::load_nesting = 0;
-thread_local WorkerThreadPool::TaskID ResourceLoader::caller_task_id = 0;
 thread_local Vector<String> ResourceLoader::load_paths_stack;
 thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides;
diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h
index 7a931cb161..511bea1e16 100644
--- a/core/io/resource_loader.h
+++ b/core/io/resource_loader.h
@@ -187,10 +187,9 @@ private:
 		HashSet<String> sub_tasks;
 	};
 
-	static void _thread_load_function(void *p_userdata);
+	static void _run_load_task(void *p_userdata);
 
 	static thread_local int load_nesting;
-	static thread_local WorkerThreadPool::TaskID caller_task_id;
 	static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level.
 	static thread_local Vector<String> load_paths_stack;
 	static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp
index 56b9fa8475..155d963a2b 100644
--- a/core/object/worker_thread_pool.cpp
+++ b/core/object/worker_thread_pool.cpp
@@ -665,6 +665,15 @@ int WorkerThreadPool::get_thread_index() {
 	return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
 }
 
+WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() {
+	int th_index = get_thread_index();
+	if (th_index != -1 && singleton->threads[th_index].current_task) {
+		return singleton->threads[th_index].current_task->self;
+	} else {
+		return INVALID_TASK_ID;
+	}
+}
+
 #ifdef THREADS_ENABLED
 uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) {
 	return _thread_enter_unlock_allowance_zone(p_mutex, false);
diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h
index 8774143abf..57b67b32fa 100644
--- a/core/object/worker_thread_pool.h
+++ b/core/object/worker_thread_pool.h
@@ -239,6 +239,7 @@ public:
 	static WorkerThreadPool *get_singleton() { return singleton; }
 
 	static int get_thread_index();
+	static TaskID get_caller_task_id();
 
 #ifdef THREADS_ENABLED
 	static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex);
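
Editor's note: a condensed, standalone sketch (hypothetical names, not the engine API) of the deadlock-prevention flow in `_load_complete_inner` above. If the thread about to wait on an in-progress load turns out to be the very thread in charge of it (a cyclic load), waiting would self-deadlock; instead, the same task object is re-entered inline, which is why `_run_load_task()` must tolerate re-entrancy and why the cache and thread modes of the original task are honored.

```cpp
#include <cstdint>

using TaskID = int64_t;

TaskID get_caller_task_id(); // Which pool task is this thread running? (see first sketch)
void wait_for_task(TaskID p_task); // Normal path: block until the pool finishes the task.

struct LoadTask {
	TaskID task_id = 0;
	bool done = false;
	void run(); // The load body; corresponds to _run_load_task(). Must allow re-entry.
};

void complete_load(LoadTask &p_task) {
	if (!p_task.done && p_task.task_id == get_caller_task_id()) {
		// Cyclic load: this thread owns the task it would otherwise wait on.
		// Re-run the *same* task inline instead of issuing a fresh request, so
		// its cache mode and thread mode still apply and its result is shared.
		p_task.run();
	} else {
		wait_for_task(p_task.task_id);
	}
}
```

This is what replaces the old CACHE_MODE_IGNORE re-issue: re-running the original task keeps a single resource instance per path, whereas a forced fresh load could produce duplicates.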