diff options
Diffstat (limited to 'core/io/resource_loader.cpp')
-rw-r--r-- | core/io/resource_loader.cpp | 601 |
1 files changed, 321 insertions, 280 deletions
diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 9af3a7daed..a27341dd2c 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -202,20 +202,71 @@ void ResourceFormatLoader::_bind_methods() { /////////////////////////////////// +// This should be robust enough to be called redundantly without issues. +void ResourceLoader::LoadToken::clear() { + thread_load_mutex.lock(); + + WorkerThreadPool::TaskID task_to_await = 0; + + if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered. + DEV_ASSERT(thread_load_tasks.has(local_path)); + ThreadLoadTask &load_task = thread_load_tasks[local_path]; + if (!load_task.awaited) { + task_to_await = load_task.task_id; + load_task.awaited = true; + } + thread_load_tasks.erase(local_path); + local_path.clear(); + } + + if (!user_path.is_empty()) { + DEV_ASSERT(user_load_tokens.has(user_path)); + user_load_tokens.erase(user_path); + user_path.clear(); + } + + thread_load_mutex.unlock(); + + // If task is unused, await it here, locally, now the token data is consistent. + if (task_to_await) { + WorkerThreadPool::get_singleton()->wait_for_task_completion(task_to_await); + } +} + +ResourceLoader::LoadToken::~LoadToken() { + clear(); +} + Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_original_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error, bool p_use_sub_threads, float *r_progress) { - bool found = false; + load_nesting++; + if (load_paths_stack.size()) { + thread_load_mutex.lock(); + HashMap<String, ThreadLoadTask>::Iterator E = thread_load_tasks.find(load_paths_stack[load_paths_stack.size() - 1]); + if (E) { + E->value.sub_tasks.insert(p_path); + } + thread_load_mutex.unlock(); + } + load_paths_stack.push_back(p_path); // Try all loaders and pick the first match for the type hint + bool found = false; + Ref<Resource> res; for (int i = 0; i < loader_count; i++) { if (!loader[i]->recognize_path(p_path, p_type_hint)) { continue; } found = true; - Ref<Resource> res = loader[i]->load(p_path, !p_original_path.is_empty() ? p_original_path : p_path, r_error, p_use_sub_threads, r_progress, p_cache_mode); - if (res.is_null()) { - continue; + res = loader[i]->load(p_path, !p_original_path.is_empty() ? p_original_path : p_path, r_error, p_use_sub_threads, r_progress, p_cache_mode); + if (!res.is_null()) { + break; } + } + + load_paths_stack.resize(load_paths_stack.size() - 1); + load_nesting--; + if (!res.is_null()) { return res; } @@ -232,47 +283,60 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin void ResourceLoader::_thread_load_function(void *p_userdata) { ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata; - load_task.loader_id = Thread::get_caller_id(); - if (load_task.cond_var) { - //this is an actual thread, so wait for Ok from semaphore - thread_load_semaphore->wait(); //wait until its ok to start loading + thread_load_mutex.lock(); + caller_task_id = load_task.task_id; + if (cleaning_tasks) { + load_task.status = THREAD_LOAD_FAILED; + thread_load_mutex.unlock(); + return; } - load_task.resource = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress); + thread_load_mutex.unlock(); - load_task.progress = 1.0; //it was fully loaded at this point, so force progress to 1.0 + // Thread-safe either if it's the current thread or a brand new one. + CallQueue *mq_override = nullptr; + if (load_nesting == 0) { + if (!load_task.dependent_path.is_empty()) { + load_paths_stack.push_back(load_task.dependent_path); + } + if (!Thread::is_main_thread()) { + mq_override = memnew(CallQueue); + MessageQueue::set_thread_singleton_override(mq_override); + } + } else { + DEV_ASSERT(load_task.dependent_path.is_empty()); + } + // -- - thread_load_mutex->lock(); + Ref<Resource> res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress); + + thread_load_mutex.lock(); + + load_task.resource = res; + + load_task.progress = 1.0; //it was fully loaded at this point, so force progress to 1.0 if (load_task.error != OK) { load_task.status = THREAD_LOAD_FAILED; } else { load_task.status = THREAD_LOAD_LOADED; } - if (load_task.cond_var) { - if (load_task.start_next && thread_waiting_count > 0) { - thread_waiting_count--; - //thread loading count remains constant, this ends but another one begins - thread_load_semaphore->post(); - } else { - thread_loading_count--; //no threads waiting, just reduce loading count - } - - print_lt("END: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count)); + if (load_task.cond_var) { load_task.cond_var->notify_all(); memdelete(load_task.cond_var); load_task.cond_var = nullptr; } if (load_task.resource.is_valid()) { - load_task.resource->set_path(load_task.local_path); + if (load_task.cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) { + load_task.resource->set_path(load_task.local_path); + } if (load_task.xl_remapped) { load_task.resource->set_as_translation_remapped(true); } #ifdef TOOLS_ENABLED - load_task.resource->set_edited(false); if (timestamp_on_load) { uint64_t mt = FileAccess::get_modified_time(load_task.remapped_path); @@ -286,7 +350,12 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { } } - thread_load_mutex->unlock(); + thread_load_mutex.unlock(); + + if (load_nesting == 0 && mq_override) { + memdelete(mq_override); + MessageQueue::set_thread_singleton_override(nullptr); + } } static String _validate_local_path(const String &p_path) { @@ -299,91 +368,127 @@ static String _validate_local_path(const String &p_path) { return ProjectSettings::get_singleton()->localize_path(p_path); } } -Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode, const String &p_source_resource) { - String local_path = _validate_local_path(p_path); - thread_load_mutex->lock(); +Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode) { + thread_load_mutex.lock(); + if (user_load_tokens.has(p_path)) { + print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error."); + user_load_tokens[p_path]->reference(); // Additional request. + thread_load_mutex.unlock(); + return OK; + } + user_load_tokens[p_path] = nullptr; + thread_load_mutex.unlock(); + + Ref<ResourceLoader::LoadToken> token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode); + if (token.is_valid()) { + thread_load_mutex.lock(); + token->user_path = p_path; + token->reference(); // First request. + user_load_tokens[p_path] = token.ptr(); + print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size())); + thread_load_mutex.unlock(); + return OK; + } else { + return FAILED; + } +} - if (!p_source_resource.is_empty()) { - //must be loading from this resource - if (!thread_load_tasks.has(p_source_resource)) { - thread_load_mutex->unlock(); - ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "There is no thread loading source resource '" + p_source_resource + "'."); - } - //must not be already added as s sub tasks - if (thread_load_tasks[p_source_resource].sub_tasks.has(local_path)) { - thread_load_mutex->unlock(); - ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Thread loading source resource '" + p_source_resource + "' already is loading '" + local_path + "'."); - } +Ref<Resource> ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) { + if (r_error) { + *r_error = OK; } - if (thread_load_tasks.has(local_path)) { - thread_load_tasks[local_path].requests++; - if (!p_source_resource.is_empty()) { - thread_load_tasks[p_source_resource].sub_tasks.insert(local_path); + Ref<LoadToken> load_token = _load_start(p_path, p_type_hint, LOAD_THREAD_FROM_CURRENT, p_cache_mode); + if (!load_token.is_valid()) { + if (r_error) { + *r_error = FAILED; } - thread_load_mutex->unlock(); - return OK; + return Ref<Resource>(); } - { - //create load task - - ThreadLoadTask load_task; + Ref<Resource> res = _load_complete(*load_token.ptr(), r_error); + return res; +} - load_task.requests = 1; - load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped); - load_task.local_path = local_path; - load_task.type_hint = p_type_hint; - load_task.cache_mode = p_cache_mode; - load_task.use_sub_threads = p_use_sub_threads; +Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode) { + String local_path = _validate_local_path(p_path); - { //must check if resource is already loaded before attempting to load it in a thread + Ref<LoadToken> load_token; + bool must_not_register = false; + ThreadLoadTask unregistered_load_task; // Once set, must be valid up to the call to do the load. + ThreadLoadTask *load_task_ptr = nullptr; + bool run_on_current_thread = false; + { + MutexLock thread_load_lock(thread_load_mutex); - if (load_task.loader_id == Thread::get_caller_id()) { - thread_load_mutex->unlock(); - ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Attempted to load a resource already being loaded from this thread, cyclic reference?"); + if (thread_load_tasks.has(local_path)) { + load_token = Ref<LoadToken>(thread_load_tasks[local_path].load_token); + if (!load_token.is_valid()) { + // The token is dying (reached 0 on another thread). + // Ensure it's killed now so the path can be safely reused right away. + thread_load_tasks[local_path].load_token->clear(); + } else { + if (p_cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) { + return load_token; + } } + } - Ref<Resource> existing = ResourceCache::get_ref(local_path); + load_token.instantiate(); + load_token->local_path = local_path; - if (existing.is_valid()) { - //referencing is fine - load_task.resource = existing; - load_task.status = THREAD_LOAD_LOADED; - load_task.progress = 1.0; + //create load task + { + ThreadLoadTask load_task; + + load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped); + load_task.load_token = load_token.ptr(); + load_task.local_path = local_path; + load_task.type_hint = p_type_hint; + load_task.cache_mode = p_cache_mode; + load_task.use_sub_threads = p_thread_mode == LOAD_THREAD_DISTRIBUTE; + if (p_cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) { + Ref<Resource> existing = ResourceCache::get_ref(local_path); + if (existing.is_valid()) { + //referencing is fine + load_task.resource = existing; + load_task.status = THREAD_LOAD_LOADED; + load_task.progress = 1.0; + thread_load_tasks[local_path] = load_task; + return load_token; + } } - } - - if (!p_source_resource.is_empty()) { - thread_load_tasks[p_source_resource].sub_tasks.insert(local_path); - } - thread_load_tasks[local_path] = load_task; - } + // If we want to ignore cache, but there's another task loading it, we can't add this one to the map and we also have to finish unconditionally synchronously. + must_not_register = thread_load_tasks.has(local_path) && p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE; + if (must_not_register) { + load_token->local_path.clear(); + unregistered_load_task = load_task; + } else { + thread_load_tasks[local_path] = load_task; + } - ThreadLoadTask &load_task = thread_load_tasks[local_path]; + load_task_ptr = must_not_register ? &unregistered_load_task : &thread_load_tasks[local_path]; + } - if (load_task.resource.is_null()) { //needs to be loaded in thread + run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT; - load_task.cond_var = memnew(ConditionVariable); - if (thread_loading_count < thread_load_max) { - thread_loading_count++; - thread_load_semaphore->post(); //we have free threads, so allow one + if (run_on_current_thread) { + load_task_ptr->thread_id = Thread::get_caller_id(); + if (must_not_register) { + load_token->res_if_unregistered = load_task_ptr->resource; + } } else { - thread_waiting_count++; + load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr); } - - print_lt("REQUEST: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count)); - - load_task.thread = memnew(Thread); - load_task.thread->start(_thread_load_function, &thread_load_tasks[local_path]); - load_task.loader_id = load_task.thread->get_id(); } - thread_load_mutex->unlock(); + if (run_on_current_thread) { + _thread_load_function(load_task_ptr); + } - return OK; + return load_token; } float ResourceLoader::_dependency_get_progress(const String &p_path) { @@ -409,13 +514,22 @@ float ResourceLoader::_dependency_get_progress(const String &p_path) { } ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const String &p_path, float *r_progress) { - String local_path = _validate_local_path(p_path); + MutexLock thread_load_lock(thread_load_mutex); + + if (!user_load_tokens.has(p_path)) { + print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected."); + return THREAD_LOAD_INVALID_RESOURCE; + } - thread_load_mutex->lock(); + String local_path = _validate_local_path(p_path); if (!thread_load_tasks.has(local_path)) { - thread_load_mutex->unlock(); +#ifdef DEV_ENABLED + CRASH_NOW(); +#endif + // On non-dev, be defensive and at least avoid crashing (at this point at least). return THREAD_LOAD_INVALID_RESOURCE; } + ThreadLoadTask &load_task = thread_load_tasks[local_path]; ThreadLoadStatus status; status = load_task.status; @@ -423,198 +537,120 @@ ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const *r_progress = _dependency_get_progress(local_path); } - thread_load_mutex->unlock(); - return status; } Ref<Resource> ResourceLoader::load_threaded_get(const String &p_path, Error *r_error) { - String local_path = _validate_local_path(p_path); - - MutexLock thread_load_lock(*thread_load_mutex); - if (!thread_load_tasks.has(local_path)) { - if (r_error) { - *r_error = ERR_INVALID_PARAMETER; - } - return Ref<Resource>(); + if (r_error) { + *r_error = OK; } - ThreadLoadTask &load_task = thread_load_tasks[local_path]; + Ref<Resource> res; + { + MutexLock thread_load_lock(thread_load_mutex); - if (load_task.status == THREAD_LOAD_IN_PROGRESS) { - if (load_task.loader_id == Thread::get_caller_id()) { - // Load is in progress, but it's precisely this thread the one in charge. - // That means this is a cyclic load. + if (!user_load_tokens.has(p_path)) { + print_verbose("load_threaded_get(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected."); if (r_error) { - *r_error = ERR_BUSY; + *r_error = ERR_INVALID_PARAMETER; } return Ref<Resource>(); - } else if (!load_task.cond_var) { - // Load is in progress, but a condition variable was never created for it. - // That happens when a load has been initiated with subthreads disabled, - // but now another load thread needs to interact with this one (either - // because of subthreads being used this time, or because it's simply a - // threaded load running on a different thread). - // Since we want to be notified when the load ends, we must create the - // condition variable now. - load_task.cond_var = memnew(ConditionVariable); } - } - - //cond var still exists, meaning it's still loading, request poll - if (load_task.cond_var) { - { - // As we got a cond var, this means we are going to have to wait - // until the sub-resource is done loading - // - // As this thread will become 'blocked' we should "exchange" its - // active status with a waiting one, to ensure load continues. - // - // This ensures loading is never blocked and that is also within - // the maximum number of active threads. - - if (thread_waiting_count > 0) { - thread_waiting_count--; - thread_loading_count++; - thread_load_semaphore->post(); - - load_task.start_next = false; //do not start next since we are doing it here - } - - thread_suspended_count++; - - print_lt("GET: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count)); - } - - bool still_valid = true; - bool was_thread = load_task.thread; - do { - load_task.cond_var->wait(thread_load_lock); - if (!thread_load_tasks.has(local_path)) { //may have been erased during unlock and this was always an invalid call - still_valid = false; - break; - } - } while (load_task.cond_var); // In case of spurious wakeup. - if (was_thread) { - thread_suspended_count--; - } - - if (!still_valid) { + LoadToken *load_token = user_load_tokens[p_path]; + if (!load_token) { + // This happens if requested from one thread and rapidly querying from another. if (r_error) { - *r_error = ERR_INVALID_PARAMETER; + *r_error = ERR_BUSY; } return Ref<Resource>(); } + res = _load_complete_inner(*load_token, r_error, thread_load_lock); + if (load_token->unreference()) { + memdelete(load_token); + } } - Ref<Resource> resource = load_task.resource; - if (r_error) { - *r_error = load_task.error; - } - - load_task.requests--; + print_lt("GET: user load tokens: " + itos(user_load_tokens.size())); - if (load_task.requests == 0) { - if (load_task.thread) { //thread may not have been used - load_task.thread->wait_to_finish(); - memdelete(load_task.thread); - } - thread_load_tasks.erase(local_path); - } + return res; +} - return resource; +Ref<Resource> ResourceLoader::_load_complete(LoadToken &p_load_token, Error *r_error) { + MutexLock thread_load_lock(thread_load_mutex); + return _load_complete_inner(p_load_token, r_error, thread_load_lock); } -Ref<Resource> ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) { +Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock<SafeBinaryMutex<BINARY_MUTEX_TAG>> &p_thread_load_lock) { if (r_error) { - *r_error = ERR_CANT_OPEN; + *r_error = OK; } - String local_path = _validate_local_path(p_path); + if (!p_load_token.local_path.is_empty()) { + if (!thread_load_tasks.has(p_load_token.local_path)) { +#ifdef DEV_ENABLED + CRASH_NOW(); +#endif + // On non-dev, be defensive and at least avoid crashing (at this point at least). + if (r_error) { + *r_error = ERR_BUG; + } + return Ref<Resource>(); + } - if (p_cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) { - thread_load_mutex->lock(); + ThreadLoadTask &load_task = thread_load_tasks[p_load_token.local_path]; - //Is it already being loaded? poll until done - if (thread_load_tasks.has(local_path)) { - Error err = load_threaded_request(p_path, p_type_hint); - if (err != OK) { + if (load_task.status == THREAD_LOAD_IN_PROGRESS) { + DEV_ASSERT((load_task.task_id == 0) != (load_task.thread_id == 0)); + + if ((load_task.task_id != 0 && load_task.task_id == caller_task_id) || + (load_task.thread_id != 0 && load_task.thread_id == Thread::get_caller_id())) { + // Load is in progress, but it's precisely this thread the one in charge. + // That means this is a cyclic load. if (r_error) { - *r_error = err; + *r_error = ERR_BUSY; } - thread_load_mutex->unlock(); return Ref<Resource>(); } - thread_load_mutex->unlock(); - return load_threaded_get(p_path, r_error); - } - - //Is it cached? - - Ref<Resource> existing = ResourceCache::get_ref(local_path); - - if (existing.is_valid()) { - thread_load_mutex->unlock(); - - if (r_error) { - *r_error = OK; + if (load_task.task_id != 0 && !load_task.awaited) { + // Loading thread is in the worker pool and still not awaited. + load_task.awaited = true; + thread_load_mutex.unlock(); + WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + thread_load_mutex.lock(); + } else { + // Loading thread is main or user thread, or in the worker pool, but already awaited by some other thread. + if (!load_task.cond_var) { + load_task.cond_var = memnew(ConditionVariable); + } + do { + load_task.cond_var->wait(p_thread_load_lock); + DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); + } while (load_task.cond_var); } - - return existing; //use cached } - //load using task (but this thread) - ThreadLoadTask load_task; - - load_task.requests = 1; - load_task.local_path = local_path; - load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped); - load_task.type_hint = p_type_hint; - load_task.cache_mode = p_cache_mode; //ignore - load_task.loader_id = Thread::get_caller_id(); - - thread_load_tasks[local_path] = load_task; - - thread_load_mutex->unlock(); - - _thread_load_function(&thread_load_tasks[local_path]); - - return load_threaded_get(p_path, r_error); - - } else { - bool xl_remapped = false; - String path = _path_remap(local_path, &xl_remapped); - - if (path.is_empty()) { - ERR_FAIL_V_MSG(Ref<Resource>(), "Remapping '" + local_path + "' failed."); - } - - print_verbose("Loading resource: " + path); - float p; - Ref<Resource> res = _load(path, local_path, p_type_hint, p_cache_mode, r_error, false, &p); - - if (res.is_null()) { - print_verbose("Failed loading resource: " + path); - return Ref<Resource>(); + if (cleaning_tasks) { + load_task.resource = Ref<Resource>(); + load_task.error = FAILED; } - if (xl_remapped) { - res->set_as_translation_remapped(true); + Ref<Resource> resource = load_task.resource; + if (r_error) { + *r_error = load_task.error; } - -#ifdef TOOLS_ENABLED - - res->set_edited(false); - if (timestamp_on_load) { - uint64_t mt = FileAccess::get_modified_time(path); - //printf("mt %s: %lli\n",remapped_path.utf8().get_data(),mt); - res->set_last_modified_time(mt); + return resource; + } else { + // Special case of an unregistered task. + // The resource should have been loaded by now. + Ref<Resource> resource = p_load_token.res_if_unregistered; + if (!resource.is_valid()) { + if (r_error) { + *r_error = FAILED; + } } -#endif - - return res; + return resource; } } @@ -958,32 +994,42 @@ void ResourceLoader::clear_translation_remaps() { } void ResourceLoader::clear_thread_load_tasks() { - thread_load_mutex->lock(); - - for (KeyValue<String, ResourceLoader::ThreadLoadTask> &E : thread_load_tasks) { - switch (E.value.status) { - case ResourceLoader::ThreadLoadStatus::THREAD_LOAD_LOADED: { - E.value.resource = Ref<Resource>(); - } break; - - case ResourceLoader::ThreadLoadStatus::THREAD_LOAD_IN_PROGRESS: { - if (E.value.thread != nullptr) { - E.value.thread->wait_to_finish(); - memdelete(E.value.thread); - E.value.thread = nullptr; + // Bring the thing down as quickly as possible without causing deadlocks or leaks. + + thread_load_mutex.lock(); + cleaning_tasks = true; + + while (true) { + bool none_running = true; + if (thread_load_tasks.size()) { + for (KeyValue<String, ResourceLoader::ThreadLoadTask> &E : thread_load_tasks) { + if (E.value.status == THREAD_LOAD_IN_PROGRESS) { + if (E.value.cond_var) { + E.value.cond_var->notify_all(); + memdelete(E.value.cond_var); + E.value.cond_var = nullptr; + } + none_running = false; } - E.value.resource = Ref<Resource>(); - } break; - - case ResourceLoader::ThreadLoadStatus::THREAD_LOAD_FAILED: - default: { - // do nothing } } + if (none_running) { + break; + } + thread_load_mutex.unlock(); + OS::get_singleton()->delay_usec(1000); + thread_load_mutex.lock(); + } + + for (KeyValue<String, LoadToken *> &E : user_load_tokens) { + memdelete(E.value); } + user_load_tokens.clear(); + thread_load_tasks.clear(); - thread_load_mutex->unlock(); + cleaning_tasks = false; + thread_load_mutex.unlock(); } void ResourceLoader::load_path_remaps() { @@ -1080,20 +1126,14 @@ void ResourceLoader::remove_custom_loaders() { } } -void ResourceLoader::initialize() { - thread_load_mutex = memnew(SafeBinaryMutex<BINARY_MUTEX_TAG>); - thread_load_max = OS::get_singleton()->get_processor_count(); - thread_loading_count = 0; - thread_waiting_count = 0; - thread_suspended_count = 0; - thread_load_semaphore = memnew(Semaphore); +bool ResourceLoader::is_cleaning_tasks() { + MutexLock lock(thread_load_mutex); + return cleaning_tasks; } -void ResourceLoader::finalize() { - clear_thread_load_tasks(); - memdelete(thread_load_mutex); - memdelete(thread_load_semaphore); -} +void ResourceLoader::initialize() {} + +void ResourceLoader::finalize() {} ResourceLoadErrorNotify ResourceLoader::err_notify = nullptr; void *ResourceLoader::err_notify_ud = nullptr; @@ -1105,16 +1145,17 @@ bool ResourceLoader::create_missing_resources_if_class_unavailable = false; bool ResourceLoader::abort_on_missing_resource = true; bool ResourceLoader::timestamp_on_load = false; +thread_local int ResourceLoader::load_nesting = 0; +thread_local WorkerThreadPool::TaskID ResourceLoader::caller_task_id = 0; +thread_local Vector<String> ResourceLoader::load_paths_stack; + template <> thread_local uint32_t SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG>::count = 0; -SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> *ResourceLoader::thread_load_mutex = nullptr; +SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> ResourceLoader::thread_load_mutex; HashMap<String, ResourceLoader::ThreadLoadTask> ResourceLoader::thread_load_tasks; -Semaphore *ResourceLoader::thread_load_semaphore = nullptr; +bool ResourceLoader::cleaning_tasks = false; -int ResourceLoader::thread_loading_count = 0; -int ResourceLoader::thread_waiting_count = 0; -int ResourceLoader::thread_suspended_count = 0; -int ResourceLoader::thread_load_max = 0; +HashMap<String, ResourceLoader::LoadToken *> ResourceLoader::user_load_tokens; SelfList<Resource>::List ResourceLoader::remapped_list; HashMap<String, Vector<String>> ResourceLoader::translation_remaps; |