diff options
Diffstat (limited to 'core/io/resource_loader.cpp')
-rw-r--r-- | core/io/resource_loader.cpp | 333 |
1 files changed, 195 insertions, 138 deletions
diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index c9ed4e27d9..d2c4668d12 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -207,34 +207,53 @@ void ResourceFormatLoader::_bind_methods() { /////////////////////////////////// +// These are used before and after a wait for a WorkerThreadPool task +// because that can lead to another load started in the same thread, +// something we must treat as a different stack for the purposes +// of tracking nesting. + +#define PREPARE_FOR_WTP_WAIT \ + int load_nesting_backup = ResourceLoader::load_nesting; \ + Vector<String> load_paths_stack_backup = ResourceLoader::load_paths_stack; \ + ResourceLoader::load_nesting = 0; \ + ResourceLoader::load_paths_stack.clear(); + +#define RESTORE_AFTER_WTP_WAIT \ + DEV_ASSERT(ResourceLoader::load_nesting == 0); \ + DEV_ASSERT(ResourceLoader::load_paths_stack.is_empty()); \ + ResourceLoader::load_nesting = load_nesting_backup; \ + ResourceLoader::load_paths_stack = load_paths_stack_backup; \ + load_paths_stack_backup.clear(); + // This should be robust enough to be called redundantly without issues. void ResourceLoader::LoadToken::clear() { thread_load_mutex.lock(); WorkerThreadPool::TaskID task_to_await = 0; + // User-facing tokens shouldn't be deleted until completely claimed. + DEV_ASSERT(user_rc == 0 && user_path.is_empty()); + if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered. DEV_ASSERT(thread_load_tasks.has(local_path)); ThreadLoadTask &load_task = thread_load_tasks[local_path]; - if (!load_task.awaited) { + if (load_task.task_id && !load_task.awaited) { task_to_await = load_task.task_id; - load_task.awaited = true; } + // Removing a task which is still in progress would be catastrophic. + // Tokens must be alive until the task thread function is done. + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); thread_load_tasks.erase(local_path); local_path.clear(); } - if (!user_path.is_empty()) { - DEV_ASSERT(user_load_tokens.has(user_path)); - user_load_tokens.erase(user_path); - user_path.clear(); - } - thread_load_mutex.unlock(); // If task is unused, await it here, locally, now the token data is consistent. if (task_to_await) { + PREPARE_FOR_WTP_WAIT WorkerThreadPool::get_singleton()->wait_for_task_completion(task_to_await); + RESTORE_AFTER_WTP_WAIT } } @@ -295,11 +314,11 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin ERR_FAIL_V_MSG(Ref<Resource>(), vformat("No loader found for resource: %s (expected type: %s)", p_path, p_type_hint)); } -void ResourceLoader::_thread_load_function(void *p_userdata) { +// This implementation must allow re-entrancy for a task that started awaiting in a deeper stack frame. +void ResourceLoader::_run_load_task(void *p_userdata) { ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata; thread_load_mutex.lock(); - caller_task_id = load_task.task_id; if (cleaning_tasks) { load_task.status = THREAD_LOAD_FAILED; thread_load_mutex.unlock(); @@ -322,8 +341,10 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { } // -- + bool xl_remapped = false; + const String &remapped_path = _path_remap(load_task.local_path, &xl_remapped); Error load_err = OK; - Ref<Resource> res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_err, load_task.use_sub_threads, &load_task.progress); + Ref<Resource> res = _load(remapped_path, remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_err, load_task.use_sub_threads, &load_task.progress); if (MessageQueue::get_singleton() != MessageQueue::get_main_singleton()) { MessageQueue::get_singleton()->flush(); } @@ -356,27 +377,40 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { unlock_pending = false; if (!ignoring) { - if (replacing) { - Ref<Resource> old_res = ResourceCache::get_ref(load_task.local_path); - if (old_res.is_valid() && old_res != load_task.resource) { - // If resource is already loaded, only replace its data, to avoid existing invalidating instances. - old_res->copy_from(load_task.resource); + ResourceCache::lock.lock(); // Check and operations must happen atomically. + bool pending_unlock = true; + Ref<Resource> old_res = ResourceCache::get_ref(load_task.local_path); + if (old_res.is_valid()) { + if (old_res != load_task.resource) { + // Resource can already exists at this point for two reasons: + // a) The load uses replace mode. + // b) There were more than one load in flight for the same path because of deadlock prevention. + // Either case, we want to keep the resource that was already there. + ResourceCache::lock.unlock(); + pending_unlock = false; + if (replacing) { + old_res->copy_from(load_task.resource); + } load_task.resource = old_res; } + } else { + load_task.resource->set_path(load_task.local_path); + } + if (pending_unlock) { + ResourceCache::lock.unlock(); } - load_task.resource->set_path(load_task.local_path, replacing); } else { load_task.resource->set_path_cache(load_task.local_path); } - if (load_task.xl_remapped) { + if (xl_remapped) { load_task.resource->set_as_translation_remapped(true); } #ifdef TOOLS_ENABLED load_task.resource->set_edited(false); if (timestamp_on_load) { - uint64_t mt = FileAccess::get_modified_time(load_task.remapped_path); + uint64_t mt = FileAccess::get_modified_time(remapped_path); //printf("mt %s: %lli\n",remapped_path.utf8().get_data(),mt); load_task.resource->set_last_modified_time(mt); } @@ -426,36 +460,44 @@ static String _validate_local_path(const String &p_path) { } Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode) { - thread_load_mutex.lock(); - if (user_load_tokens.has(p_path)) { - print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error."); - user_load_tokens[p_path]->reference(); // Additional request. - thread_load_mutex.unlock(); - return OK; - } - user_load_tokens[p_path] = nullptr; - thread_load_mutex.unlock(); + Ref<ResourceLoader::LoadToken> token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode, true); + return token.is_valid() ? OK : FAILED; +} - Ref<ResourceLoader::LoadToken> token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode); - if (token.is_valid()) { - thread_load_mutex.lock(); - token->user_path = p_path; - token->reference(); // First request. - user_load_tokens[p_path] = token.ptr(); - print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size())); - thread_load_mutex.unlock(); - return OK; +ResourceLoader::LoadToken *ResourceLoader::_load_threaded_request_reuse_user_token(const String &p_path) { + HashMap<String, LoadToken *>::Iterator E = user_load_tokens.find(p_path); + if (E) { + print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error."); + LoadToken *token = E->value; + token->user_rc++; + return token; } else { - return FAILED; + return nullptr; } } +void ResourceLoader::_load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path) { + p_token->user_path = p_path; + p_token->reference(); // Extra RC until all user requests have been gotten. + p_token->user_rc = 1; + user_load_tokens[p_path] = p_token; + print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size())); +} + Ref<Resource> ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) { if (r_error) { *r_error = OK; } - Ref<LoadToken> load_token = _load_start(p_path, p_type_hint, LOAD_THREAD_FROM_CURRENT, p_cache_mode); + LoadThreadMode thread_mode = LOAD_THREAD_FROM_CURRENT; + if (WorkerThreadPool::get_singleton()->get_caller_task_id() != WorkerThreadPool::INVALID_TASK_ID) { + // If user is initiating a single-threaded load from a WorkerThreadPool task, + // we instead spawn a new task so there's a precondition that a load in a pool task + // is always initiated by the engine. That makes certain aspects simpler, such as + // cyclic load detection and awaiting. + thread_mode = LOAD_THREAD_SPAWN_SINGLE; + } + Ref<LoadToken> load_token = _load_start(p_path, p_type_hint, thread_mode, p_cache_mode); if (!load_token.is_valid()) { if (r_error) { *r_error = FAILED; @@ -467,7 +509,7 @@ Ref<Resource> ResourceLoader::load(const String &p_path, const String &p_type_hi return res; } -Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode) { +Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user) { String local_path = _validate_local_path(p_path); bool ignoring_cache = p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE || p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE_DEEP; @@ -480,6 +522,13 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, { MutexLock thread_load_lock(thread_load_mutex); + if (p_for_user) { + LoadToken *existing_token = _load_threaded_request_reuse_user_token(p_path); + if (existing_token) { + return Ref<LoadToken>(existing_token); + } + } + if (!ignoring_cache && thread_load_tasks.has(local_path)) { load_token = Ref<LoadToken>(thread_load_tasks[local_path].load_token); if (load_token.is_valid()) { @@ -493,12 +542,14 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, load_token.instantiate(); load_token->local_path = local_path; + if (p_for_user) { + _load_threaded_request_setup_user_token(load_token.ptr(), p_path); + } //create load task { ThreadLoadTask load_task; - load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped); load_task.load_token = load_token.ptr(); load_task.local_path = local_path; load_task.type_hint = p_type_hint; @@ -511,6 +562,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, load_task.resource = existing; load_task.status = THREAD_LOAD_LOADED; load_task.progress = 1.0; + DEV_ASSERT(!thread_load_tasks.has(local_path)); thread_load_tasks[local_path] = load_task; return load_token; } @@ -532,14 +584,20 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT; if (run_on_current_thread) { - load_task_ptr->thread_id = Thread::get_caller_id(); + // The current thread may happen to be a thread from the pool. + WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id(); + if (tid != WorkerThreadPool::INVALID_TASK_ID) { + load_task_ptr->task_id = tid; + } else { + load_task_ptr->thread_id = Thread::get_caller_id(); + } } else { - load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr); + load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_run_load_task, load_task_ptr); } - } + } // MutexLock(thread_load_mutex). if (run_on_current_thread) { - _thread_load_function(load_task_ptr); + _run_load_task(load_task_ptr); if (must_not_register) { load_token->res_if_unregistered = load_task_ptr->resource; } @@ -626,22 +684,16 @@ Ref<Resource> ResourceLoader::load_threaded_get(const String &p_path, Error *r_e } LoadToken *load_token = user_load_tokens[p_path]; - if (!load_token) { - // This happens if requested from one thread and rapidly querying from another. - if (r_error) { - *r_error = ERR_BUSY; - } - return Ref<Resource>(); - } + DEV_ASSERT(load_token->user_rc >= 1); // Support userland requesting on the main thread before the load is reported to be complete. if (Thread::is_main_thread() && !load_token->local_path.is_empty()) { const ThreadLoadTask &load_task = thread_load_tasks[load_token->local_path]; while (load_task.status == THREAD_LOAD_IN_PROGRESS) { - thread_load_lock.~MutexLock(); + thread_load_lock.temp_unlock(); bool exit = !_ensure_load_progress(); OS::get_singleton()->delay_usec(1000); - new (&thread_load_lock) MutexLock(thread_load_mutex); + thread_load_lock.temp_relock(); if (exit) { break; } @@ -649,8 +701,15 @@ Ref<Resource> ResourceLoader::load_threaded_get(const String &p_path, Error *r_e } res = _load_complete_inner(*load_token, r_error, thread_load_lock); - if (load_token->unreference()) { - memdelete(load_token); + + load_token->user_rc--; + if (load_token->user_rc == 0) { + load_token->user_path.clear(); + user_load_tokens.erase(p_path); + if (load_token->unreference()) { + memdelete(load_token); + load_token = nullptr; + } } } @@ -682,7 +741,7 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro if (load_task.status == THREAD_LOAD_IN_PROGRESS) { DEV_ASSERT((load_task.task_id == 0) != (load_task.thread_id == 0)); - if ((load_task.task_id != 0 && load_task.task_id == caller_task_id) || + if ((load_task.task_id != 0 && load_task.task_id == WorkerThreadPool::get_singleton()->get_caller_task_id()) || (load_task.thread_id != 0 && load_task.thread_id == Thread::get_caller_id())) { // Load is in progress, but it's precisely this thread the one in charge. // That means this is a cyclic load. @@ -693,55 +752,45 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro } bool loader_is_wtp = load_task.task_id != 0; - Error wtp_task_err = FAILED; if (loader_is_wtp) { // Loading thread is in the worker pool. + p_thread_load_lock.temp_unlock(); + + PREPARE_FOR_WTP_WAIT + Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + RESTORE_AFTER_WTP_WAIT + + DEV_ASSERT(!wait_err || wait_err == ERR_BUSY); + if (wait_err == ERR_BUSY) { + // The WorkerThreadPool has reported that the current task wants to await on an older one. + // That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of + // resource loading that means that the task to wait for can be restarted here to break the + // cycle, with as much recursion into this process as needed. + // When the stack is eventually unrolled, the original load will have been notified to go on. + _run_load_task(&load_task); + } + + p_thread_load_lock.temp_relock(); load_task.awaited = true; - thread_load_mutex.unlock(); - wtp_task_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); - } - if (load_task.status == THREAD_LOAD_IN_PROGRESS) { // If early errored, awaiting would deadlock. - if (loader_is_wtp) { - if (wtp_task_err == ERR_BUSY) { - // The WorkerThreadPool has reported that the current task wants to await on an older one. - // That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of - // resource loading that means that the task to wait for can be restarted here to break the - // cycle, with as much recursion into this process as needed. - // When the stack is eventually unrolled, the original load will have been notified to go on. - // CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's - // an ongoing load for that resource and wait for it again. This value forces a new load. - Ref<ResourceLoader::LoadToken> token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE); - Ref<Resource> resource = _load_complete(*token.ptr(), &wtp_task_err); - if (r_error) { - *r_error = wtp_task_err; - } - thread_load_mutex.lock(); - return resource; - } else { - DEV_ASSERT(wtp_task_err == OK); - thread_load_mutex.lock(); - } - } else if (load_task.need_wait) { - // Loading thread is main or user thread. - if (!load_task.cond_var) { - load_task.cond_var = memnew(ConditionVariable); - } - load_task.awaiters_count++; - do { - load_task.cond_var->wait(p_thread_load_lock); - DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); - } while (load_task.need_wait); - load_task.awaiters_count--; - if (load_task.awaiters_count == 0) { - memdelete(load_task.cond_var); - load_task.cond_var = nullptr; - } + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); + } else if (load_task.need_wait) { + // Loading thread is main or user thread. + if (!load_task.cond_var) { + load_task.cond_var = memnew(ConditionVariable); } - } else { - if (loader_is_wtp) { - thread_load_mutex.lock(); + load_task.awaiters_count++; + do { + load_task.cond_var->wait(p_thread_load_lock); + DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); + } while (load_task.need_wait); + load_task.awaiters_count--; + if (load_task.awaiters_count == 0) { + memdelete(load_task.cond_var); + load_task.cond_var = nullptr; } + + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); } } @@ -1055,36 +1104,39 @@ String ResourceLoader::_path_remap(const String &p_path, bool *r_translation_rem new_path = path_remaps[new_path]; } else { // Try file remap. - Error err; - Ref<FileAccess> f = FileAccess::open(new_path + ".remap", FileAccess::READ, &err); - if (f.is_valid()) { - VariantParser::StreamFile stream; - stream.f = f; - - String assign; - Variant value; - VariantParser::Tag next_tag; - - int lines = 0; - String error_text; - while (true) { - assign = Variant(); - next_tag.fields.clear(); - next_tag.name = String(); - - err = VariantParser::parse_tag_assign_eof(&stream, lines, error_text, next_tag, assign, value, nullptr, true); - if (err == ERR_FILE_EOF) { - break; - } else if (err != OK) { - ERR_PRINT("Parse error: " + p_path + ".remap:" + itos(lines) + " error: " + error_text + "."); - break; - } + // Usually, there's no remap file and FileAccess::exists() is faster than FileAccess::open(). + if (FileAccess::exists(new_path + ".remap")) { + Error err; + Ref<FileAccess> f = FileAccess::open(new_path + ".remap", FileAccess::READ, &err); + if (f.is_valid()) { + VariantParser::StreamFile stream; + stream.f = f; + + String assign; + Variant value; + VariantParser::Tag next_tag; + + int lines = 0; + String error_text; + while (true) { + assign = Variant(); + next_tag.fields.clear(); + next_tag.name = String(); + + err = VariantParser::parse_tag_assign_eof(&stream, lines, error_text, next_tag, assign, value, nullptr, true); + if (err == ERR_FILE_EOF) { + break; + } else if (err != OK) { + ERR_PRINT("Parse error: " + p_path + ".remap:" + itos(lines) + " error: " + error_text + "."); + break; + } - if (assign == "path") { - new_path = value; - break; - } else if (next_tag.name != "remap") { - break; + if (assign == "path") { + new_path = value; + break; + } else if (next_tag.name != "remap") { + break; + } } } } @@ -1156,7 +1208,7 @@ void ResourceLoader::clear_translation_remaps() { void ResourceLoader::clear_thread_load_tasks() { // Bring the thing down as quickly as possible without causing deadlocks or leaks. - thread_load_mutex.lock(); + MutexLock thread_load_lock(thread_load_mutex); cleaning_tasks = true; while (true) { @@ -1175,21 +1227,23 @@ void ResourceLoader::clear_thread_load_tasks() { if (none_running) { break; } - thread_load_mutex.unlock(); + thread_load_lock.temp_unlock(); OS::get_singleton()->delay_usec(1000); - thread_load_mutex.lock(); + thread_load_lock.temp_relock(); } while (user_load_tokens.begin()) { - // User load tokens remove themselves from the map on destruction. - memdelete(user_load_tokens.begin()->value); + LoadToken *user_token = user_load_tokens.begin()->value; + user_load_tokens.remove(user_load_tokens.begin()); + DEV_ASSERT(user_token->user_rc > 0 && !user_token->user_path.is_empty()); + user_token->user_path.clear(); + user_token->user_rc = 0; + user_token->unreference(); } - user_load_tokens.clear(); thread_load_tasks.clear(); cleaning_tasks = false; - thread_load_mutex.unlock(); } void ResourceLoader::load_path_remaps() { @@ -1302,12 +1356,15 @@ bool ResourceLoader::abort_on_missing_resource = true; bool ResourceLoader::timestamp_on_load = false; thread_local int ResourceLoader::load_nesting = 0; -thread_local WorkerThreadPool::TaskID ResourceLoader::caller_task_id = 0; thread_local Vector<String> ResourceLoader::load_paths_stack; thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides; +SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> &_get_res_loader_mutex() { + return ResourceLoader::thread_load_mutex; +} + template <> -thread_local uint32_t SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG>::count = 0; +thread_local SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG>::TLSData SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG>::tls_data(_get_res_loader_mutex()); SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> ResourceLoader::thread_load_mutex; HashMap<String, ResourceLoader::ThreadLoadTask> ResourceLoader::thread_load_tasks; bool ResourceLoader::cleaning_tasks = false; |