diff options
author | Rémi Verschelde <remi@verschelde.fr> | 2024-04-15 10:12:00 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-15 10:12:00 +0200 |
commit | c951421c996467dcb7106a33774410a7b5258690 (patch) | |
tree | 10bf3ef87a62987b6303d7dca63adc809ebbb05b | |
parent | a44b0b6dc1ccd6dc364bb72c426e844d6c85744e (diff) | |
parent | 65686dedf9778e829287f63c7179a523d44fa085 (diff) | |
download | redot-engine-c951421c996467dcb7106a33774410a7b5258690.tar.gz |
Merge pull request #90268 from RandomShaper/wtp_servers
Use WorkerThreadPool for Server threads (enhanced)
38 files changed, 457 insertions, 389 deletions
diff --git a/core/config/engine.cpp b/core/config/engine.cpp index 9f4bff3779..f2f8aebe8b 100644 --- a/core/config/engine.cpp +++ b/core/config/engine.cpp @@ -82,6 +82,17 @@ int Engine::get_audio_output_latency() const { return _audio_output_latency; } +void Engine::increment_frames_drawn() { + if (frame_server_synced) { + server_syncs++; + } else { + server_syncs = 0; + } + frame_server_synced = false; + + frames_drawn++; +} + uint64_t Engine::get_frames_drawn() { return frames_drawn; } @@ -364,6 +375,11 @@ Engine *Engine::get_singleton() { return singleton; } +bool Engine::notify_frame_server_synced() { + frame_server_synced = true; + return server_syncs > SERVER_SYNC_FRAME_COUNT_WARNING; +} + Engine::Engine() { singleton = this; } diff --git a/core/config/engine.h b/core/config/engine.h index d1495b36c2..8dece803e3 100644 --- a/core/config/engine.h +++ b/core/config/engine.h @@ -91,6 +91,10 @@ private: String write_movie_path; String shader_cache_path; + static constexpr int SERVER_SYNC_FRAME_COUNT_WARNING = 5; + int server_syncs = 0; + bool frame_server_synced = false; + public: static Engine *get_singleton(); @@ -179,6 +183,9 @@ public: bool is_generate_spirv_debug_info_enabled() const; int32_t get_gpu_index() const; + void increment_frames_drawn(); + bool notify_frame_server_synced(); + Engine(); virtual ~Engine() {} }; diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index ef3d315e4b..c10f491a11 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -35,6 +35,8 @@ #include "core/os/thread_safe.h" #include "core/templates/command_queue_mt.h" +WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1; + void WorkerThreadPool::Task::free_template_userdata() { ERR_FAIL_NULL(template_userdata); ERR_FAIL_NULL(native_func_userdata); @@ -60,11 +62,13 @@ void WorkerThreadPool::_process_task(Task *p_task) { // its pre-created threads can't have ScriptServer::thread_enter() called on them early. // Therefore, we do it late at the first opportunity, so in case the task // about to be run uses scripting, guarantees are held. + task_mutex.lock(); if (!curr_thread.ready_for_scripting && ScriptServer::are_languages_initialized()) { + task_mutex.unlock(); ScriptServer::thread_enter(); + task_mutex.lock(); curr_thread.ready_for_scripting = true; } - task_mutex.lock(); p_task->pool_thread_index = pool_thread_index; prev_task = curr_thread.current_task; curr_thread.current_task = p_task; @@ -389,83 +393,117 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { task_mutex.unlock(); if (caller_pool_thread) { - while (true) { - Task *task_to_process = nullptr; - { - MutexLock lock(task_mutex); - bool was_signaled = caller_pool_thread->signaled; - caller_pool_thread->signaled = false; - - if (task->completed) { - // This thread was awaken also for some reason, but it's about to exit. - // Let's find out what may be pending and forward the requests. - if (!exit_threads && was_signaled) { - uint32_t to_process = task_queue.first() ? 1 : 0; - uint32_t to_promote = caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0; - if (to_process || to_promote) { - // This thread must be left alone since it won't loop again. - caller_pool_thread->signaled = true; - _notify_threads(caller_pool_thread, to_process, to_promote); - } - } + _wait_collaboratively(caller_pool_thread, task); + task->waiting_pool--; + if (task->waiting_pool == 0 && task->waiting_user == 0) { + tasks.erase(p_task_id); + task_allocator.free(task); + } + } else { + task->done_semaphore.wait(); + task_mutex.lock(); + task->waiting_user--; + if (task->waiting_pool == 0 && task->waiting_user == 0) { + tasks.erase(p_task_id); + task_allocator.free(task); + } + task_mutex.unlock(); + } - task->waiting_pool--; - if (task->waiting_pool == 0 && task->waiting_user == 0) { - tasks.erase(p_task_id); - task_allocator.free(task); - } + return OK; +} - break; - } +void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) { + // Keep processing tasks until the condition to stop waiting is met. - if (!exit_threads) { - // This is a thread from the pool. It shouldn't just idle. - // Let's try to process other tasks while we wait. +#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed) - if (caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { - if (_try_promote_low_priority_task()) { - _notify_threads(caller_pool_thread, 1, 0); - } + while (true) { + Task *task_to_process = nullptr; + { + MutexLock lock(task_mutex); + bool was_signaled = p_caller_pool_thread->signaled; + p_caller_pool_thread->signaled = false; + + if (IS_WAIT_OVER) { + p_caller_pool_thread->yield_is_over = false; + if (!exit_threads && was_signaled) { + // This thread was awaken for some additional reason, but it's about to exit. + // Let's find out what may be pending and forward the requests. + uint32_t to_process = task_queue.first() ? 1 : 0; + uint32_t to_promote = p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0; + if (to_process || to_promote) { + // This thread must be left alone since it won't loop again. + p_caller_pool_thread->signaled = true; + _notify_threads(p_caller_pool_thread, to_process, to_promote); } + } + + break; + } - if (singleton->task_queue.first()) { - task_to_process = task_queue.first()->self(); - task_queue.remove(task_queue.first()); + if (!exit_threads) { + if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { + if (_try_promote_low_priority_task()) { + _notify_threads(p_caller_pool_thread, 1, 0); } + } - if (!task_to_process) { - caller_pool_thread->awaited_task = task; + if (singleton->task_queue.first()) { + task_to_process = task_queue.first()->self(); + task_queue.remove(task_queue.first()); + } - if (flushing_cmd_queue) { - flushing_cmd_queue->unlock(); - } - caller_pool_thread->cond_var.wait(lock); - if (flushing_cmd_queue) { - flushing_cmd_queue->lock(); - } + if (!task_to_process) { + p_caller_pool_thread->awaited_task = p_task; - DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed); - caller_pool_thread->awaited_task = nullptr; + if (flushing_cmd_queue) { + flushing_cmd_queue->unlock(); + } + p_caller_pool_thread->cond_var.wait(lock); + if (flushing_cmd_queue) { + flushing_cmd_queue->lock(); } - } - } - if (task_to_process) { - _process_task(task_to_process); + DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER); + p_caller_pool_thread->awaited_task = nullptr; + } } } - } else { - task->done_semaphore.wait(); - task_mutex.lock(); - task->waiting_user--; - if (task->waiting_pool == 0 && task->waiting_user == 0) { - tasks.erase(p_task_id); - task_allocator.free(task); + + if (task_to_process) { + _process_task(task_to_process); } + } +} + +void WorkerThreadPool::yield() { + int th_index = get_thread_index(); + ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread."); + _wait_collaboratively(&threads[th_index], ThreadData::YIELDING); +} + +void WorkerThreadPool::notify_yield_over(TaskID p_task_id) { + task_mutex.lock(); + Task **taskp = tasks.getptr(p_task_id); + if (!taskp) { task_mutex.unlock(); + ERR_FAIL_MSG("Invalid Task ID."); } + Task *task = *taskp; - return OK; +#ifdef DEBUG_ENABLED + if (task->pool_thread_index == get_thread_index()) { + WARN_PRINT("A worker thread is attempting to notify itself. That makes no sense."); + } +#endif + + ThreadData &td = threads[task->pool_thread_index]; + td.yield_is_over = true; + td.signaled = true; + td.cond_var.notify_one(); + + task_mutex.unlock(); } WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) { diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index fdddc9a647..64f24df79f 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -107,13 +107,21 @@ private: BinaryMutex task_mutex; struct ThreadData { + static Task *const YIELDING; // Too bad constexpr doesn't work here. + uint32_t index = 0; Thread thread; - bool ready_for_scripting = false; - bool signaled = false; + bool ready_for_scripting : 1; + bool signaled : 1; + bool yield_is_over : 1; Task *current_task = nullptr; - Task *awaited_task = nullptr; // Null if not awaiting the condition variable. Special value for idle-waiting. + Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING). ConditionVariable cond_var; + + ThreadData() : + ready_for_scripting(false), + signaled(false), + yield_is_over(false) {} }; TightLocalVector<ThreadData> threads; @@ -177,6 +185,8 @@ private: } }; + void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task); + protected: static void _bind_methods(); @@ -196,6 +206,9 @@ public: bool is_task_completed(TaskID p_task_id) const; Error wait_for_task_completion(TaskID p_task_id); + void yield(); + void notify_yield_over(TaskID p_task_id); + template <typename C, typename M, typename U> GroupID add_template_group_task(C *p_instance, M p_method, U p_userdata, int p_elements, int p_tasks = -1, bool p_high_priority = false, const String &p_description = String()) { typedef GroupUserData<C, M, U> GroupUD; diff --git a/core/templates/command_queue_mt.cpp b/core/templates/command_queue_mt.cpp index 6ecd75ebc1..0c5c6394a1 100644 --- a/core/templates/command_queue_mt.cpp +++ b/core/templates/command_queue_mt.cpp @@ -70,14 +70,8 @@ CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() { return &sync_sems[idx]; } -CommandQueueMT::CommandQueueMT(bool p_sync) { - if (p_sync) { - sync = memnew(Semaphore); - } +CommandQueueMT::CommandQueueMT() { } CommandQueueMT::~CommandQueueMT() { - if (sync) { - memdelete(sync); - } } diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index e26f11d28a..a4ac338bed 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -248,16 +248,17 @@ #define CMD_TYPE(N) Command##N<T, M COMMA(N) COMMA_SEP_LIST(TYPE_ARG, N)> #define CMD_ASSIGN_PARAM(N) cmd->p##N = p##N -#define DECL_PUSH(N) \ - template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \ - void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \ - CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \ - cmd->instance = p_instance; \ - cmd->method = p_method; \ - SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \ - unlock(); \ - if (sync) \ - sync->post(); \ +#define DECL_PUSH(N) \ + template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \ + void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \ + CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \ + cmd->instance = p_instance; \ + cmd->method = p_method; \ + SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \ + if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \ + WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \ + } \ + unlock(); \ } #define CMD_RET_TYPE(N) CommandRet##N<T, M, COMMA_SEP_LIST(TYPE_ARG, N) COMMA(N) R> @@ -272,9 +273,10 @@ SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \ cmd->ret = r_ret; \ cmd->sync_sem = ss; \ + if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \ + WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \ + } \ unlock(); \ - if (sync) \ - sync->post(); \ ss->sem.wait(); \ ss->in_use = false; \ } @@ -290,9 +292,10 @@ cmd->method = p_method; \ SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \ cmd->sync_sem = ss; \ + if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \ + WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \ + } \ unlock(); \ - if (sync) \ - sync->post(); \ ss->sem.wait(); \ ss->in_use = false; \ } @@ -340,7 +343,7 @@ class CommandQueueMT { LocalVector<uint8_t> command_mem; SyncSemaphore sync_sems[SYNC_SEMAPHORES]; Mutex mutex; - Semaphore *sync = nullptr; + WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID; uint64_t flush_read_ptr = 0; template <typename T> @@ -420,12 +423,16 @@ public: } void wait_and_flush() { - ERR_FAIL_NULL(sync); - sync->wait(); + ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID); + WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id); _flush(); } - CommandQueueMT(bool p_sync); + void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) { + pump_task_id = p_task_id; + } + + CommandQueueMT(); ~CommandQueueMT(); }; diff --git a/drivers/egl/egl_manager.cpp b/drivers/egl/egl_manager.cpp index 10c2119260..4eddfd027b 100644 --- a/drivers/egl/egl_manager.cpp +++ b/drivers/egl/egl_manager.cpp @@ -260,21 +260,6 @@ void EGLManager::release_current() { eglMakeCurrent(current_display.egl_display, EGL_NO_SURFACE, EGL_NO_SURFACE, EGL_NO_CONTEXT); } -void EGLManager::make_current() { - if (!current_window) { - return; - } - - if (!current_window->initialized) { - WARN_PRINT("Current OpenGL window is uninitialized!"); - return; - } - - GLDisplay ¤t_display = displays[current_window->gldisplay_id]; - - eglMakeCurrent(current_display.egl_display, current_window->egl_surface, current_window->egl_surface, current_display.egl_context); -} - void EGLManager::swap_buffers() { if (!current_window) { return; diff --git a/drivers/egl/egl_manager.h b/drivers/egl/egl_manager.h index 61d3289b2d..83779349f0 100644 --- a/drivers/egl/egl_manager.h +++ b/drivers/egl/egl_manager.h @@ -98,7 +98,6 @@ public: void window_destroy(DisplayServer::WindowID p_window_id); void release_current(); - void make_current(); void swap_buffers(); void window_make_current(DisplayServer::WindowID p_window_id); diff --git a/drivers/gles3/storage/particles_storage.cpp b/drivers/gles3/storage/particles_storage.cpp index 09878aaee4..fee90599e0 100644 --- a/drivers/gles3/storage/particles_storage.cpp +++ b/drivers/gles3/storage/particles_storage.cpp @@ -135,7 +135,6 @@ bool ParticlesStorage::particles_get_emitting(RID p_particles) { return false; } - ERR_FAIL_COND_V_MSG(RSG::threaded, false, "This function should never be used with threaded rendering, as it stalls the renderer."); Particles *particles = particles_owner.get_or_null(p_particles); ERR_FAIL_NULL_V(particles, false); @@ -380,10 +379,6 @@ void ParticlesStorage::particles_request_process(RID p_particles) { } AABB ParticlesStorage::particles_get_current_aabb(RID p_particles) { - if (RSG::threaded) { - WARN_PRINT_ONCE("Calling this function with threaded rendering enabled stalls the renderer, use with care."); - } - const Particles *particles = particles_owner.get_or_null(p_particles); ERR_FAIL_NULL_V(particles, AABB()); @@ -1207,7 +1202,6 @@ Dependency *ParticlesStorage::particles_get_dependency(RID p_particles) const { } bool ParticlesStorage::particles_is_inactive(RID p_particles) const { - ERR_FAIL_COND_V_MSG(RSG::threaded, false, "This function should never be used with threaded rendering, as it stalls the renderer."); const Particles *particles = particles_owner.get_or_null(p_particles); ERR_FAIL_NULL_V(particles, false); return !particles->emitting && particles->inactive; diff --git a/main/main.cpp b/main/main.cpp index abe15a9984..a1031b5385 100644 --- a/main/main.cpp +++ b/main/main.cpp @@ -2276,6 +2276,9 @@ Error Main::setup(const char *execpath, int argc, char *argv[], bool p_second_ph // Editor and project manager cannot run with rendering in a separate thread (they will crash on startup). rtm = OS::RENDER_THREAD_SAFE; } +#if !defined(THREADS_ENABLED) + rtm = OS::RENDER_THREAD_SAFE; +#endif OS::get_singleton()->_render_thread_mode = OS::RenderThreadMode(rtm); } @@ -2719,7 +2722,9 @@ Error Main::setup2() { } if (OS::get_singleton()->_render_thread_mode == OS::RENDER_SEPARATE_THREAD) { - WARN_PRINT("The Multi-Threaded rendering thread model is experimental, and has known issues which can lead to project crashes. Use the Single-Safe option in the project settings instead."); + WARN_PRINT("The Multi-Threaded rendering thread model is experimental. Feel free to try it since it will eventually become a stable feature.\n" + "However, bear in mind that at the moment it can lead to project crashes or instability.\n" + "So, unless you want to test the engine, use the Single-Safe option in the project settings instead."); } /* Initialize Pen Tablet Driver */ @@ -4027,11 +4032,11 @@ bool Main::iteration() { if ((!force_redraw_requested) && OS::get_singleton()->is_in_low_processor_usage_mode()) { if (RenderingServer::get_singleton()->has_changed()) { RenderingServer::get_singleton()->draw(true, scaled_step); // flush visual commands - Engine::get_singleton()->frames_drawn++; + Engine::get_singleton()->increment_frames_drawn(); } } else { RenderingServer::get_singleton()->draw(true, scaled_step); // flush visual commands - Engine::get_singleton()->frames_drawn++; + Engine::get_singleton()->increment_frames_drawn(); force_redraw_requested = false; } } diff --git a/platform/linuxbsd/wayland/display_server_wayland.cpp b/platform/linuxbsd/wayland/display_server_wayland.cpp index d00d5deb2c..a815db1c05 100644 --- a/platform/linuxbsd/wayland/display_server_wayland.cpp +++ b/platform/linuxbsd/wayland/display_server_wayland.cpp @@ -1174,14 +1174,6 @@ void DisplayServerWayland::release_rendering_thread() { #endif } -void DisplayServerWayland::make_rendering_thread() { -#ifdef GLES3_ENABLED - if (egl_manager) { - egl_manager->make_current(); - } -#endif -} - void DisplayServerWayland::swap_buffers() { #ifdef GLES3_ENABLED if (egl_manager) { diff --git a/platform/linuxbsd/wayland/display_server_wayland.h b/platform/linuxbsd/wayland/display_server_wayland.h index b7d7bee005..368f1b402b 100644 --- a/platform/linuxbsd/wayland/display_server_wayland.h +++ b/platform/linuxbsd/wayland/display_server_wayland.h @@ -276,7 +276,6 @@ public: virtual void process_events() override; virtual void release_rendering_thread() override; - virtual void make_rendering_thread() override; virtual void swap_buffers() override; virtual void set_context(Context p_context) override; diff --git a/platform/linuxbsd/x11/display_server_x11.cpp b/platform/linuxbsd/x11/display_server_x11.cpp index 9069dd423f..4c7dfbb107 100644 --- a/platform/linuxbsd/x11/display_server_x11.cpp +++ b/platform/linuxbsd/x11/display_server_x11.cpp @@ -4268,7 +4268,7 @@ bool DisplayServerX11::_window_focus_check() { } void DisplayServerX11::process_events() { - _THREAD_SAFE_METHOD_ + _THREAD_SAFE_LOCK_ #ifdef DISPLAY_SERVER_X11_DEBUG_LOGS_ENABLED static int frame = 0; @@ -5097,6 +5097,8 @@ void DisplayServerX11::process_events() { */ } + _THREAD_SAFE_UNLOCK_ + Input::get_singleton()->flush_buffered_events(); } @@ -5111,17 +5113,6 @@ void DisplayServerX11::release_rendering_thread() { #endif } -void DisplayServerX11::make_rendering_thread() { -#if defined(GLES3_ENABLED) - if (gl_manager) { - gl_manager->make_current(); - } - if (gl_manager_egl) { - gl_manager_egl->make_current(); - } -#endif -} - void DisplayServerX11::swap_buffers() { #if defined(GLES3_ENABLED) if (gl_manager) { diff --git a/platform/linuxbsd/x11/display_server_x11.h b/platform/linuxbsd/x11/display_server_x11.h index 715a8e48e6..8a7062857c 100644 --- a/platform/linuxbsd/x11/display_server_x11.h +++ b/platform/linuxbsd/x11/display_server_x11.h @@ -526,7 +526,6 @@ public: virtual void process_events() override; virtual void release_rendering_thread() override; - virtual void make_rendering_thread() override; virtual void swap_buffers() override; virtual void set_context(Context p_context) override; diff --git a/platform/linuxbsd/x11/gl_manager_x11.cpp b/platform/linuxbsd/x11/gl_manager_x11.cpp index 602dd784e0..febb7ae584 100644 --- a/platform/linuxbsd/x11/gl_manager_x11.cpp +++ b/platform/linuxbsd/x11/gl_manager_x11.cpp @@ -311,20 +311,6 @@ void GLManager_X11::window_make_current(DisplayServer::WindowID p_window_id) { _internal_set_current_window(&win); } -void GLManager_X11::make_current() { - if (!_current_window) { - return; - } - if (!_current_window->in_use) { - WARN_PRINT("current window not in use!"); - return; - } - const GLDisplay &disp = get_current_display(); - if (!glXMakeCurrent(_x_windisp.x11_display, _x_windisp.x11_window, disp.context->glx_context)) { - ERR_PRINT("glXMakeCurrent failed"); - } -} - void GLManager_X11::swap_buffers() { if (!_current_window) { return; diff --git a/platform/linuxbsd/x11/gl_manager_x11.h b/platform/linuxbsd/x11/gl_manager_x11.h index 235c7d22f9..06e147e39f 100644 --- a/platform/linuxbsd/x11/gl_manager_x11.h +++ b/platform/linuxbsd/x11/gl_manager_x11.h @@ -117,7 +117,6 @@ public: void window_resize(DisplayServer::WindowID p_window_id, int p_width, int p_height); void release_current(); - void make_current(); void swap_buffers(); void window_make_current(DisplayServer::WindowID p_window_id); diff --git a/platform/macos/display_server_macos.h b/platform/macos/display_server_macos.h index 7c9d073afc..5d38bf55ea 100644 --- a/platform/macos/display_server_macos.h +++ b/platform/macos/display_server_macos.h @@ -426,7 +426,6 @@ public: virtual void force_process_and_drop_events() override; virtual void release_rendering_thread() override; - virtual void make_rendering_thread() override; virtual void swap_buffers() override; virtual void set_native_icon(const String &p_filename) override; diff --git a/platform/macos/display_server_macos.mm b/platform/macos/display_server_macos.mm index b7a9fb1bbd..cfe925e79b 100644 --- a/platform/macos/display_server_macos.mm +++ b/platform/macos/display_server_macos.mm @@ -358,7 +358,6 @@ void DisplayServerMacOS::_dispatch_input_events(const Ref<InputEvent> &p_event) } void DisplayServerMacOS::_dispatch_input_event(const Ref<InputEvent> &p_event) { - _THREAD_SAFE_METHOD_ if (!in_dispatch_input_event) { in_dispatch_input_event = true; @@ -2986,7 +2985,7 @@ Key DisplayServerMacOS::keyboard_get_label_from_physical(Key p_keycode) const { } void DisplayServerMacOS::process_events() { - _THREAD_SAFE_METHOD_ + _THREAD_SAFE_LOCK_ while (true) { NSEvent *event = [NSApp @@ -3019,7 +3018,9 @@ void DisplayServerMacOS::process_events() { if (!drop_events) { _process_key_events(); + _THREAD_SAFE_UNLOCK_ Input::get_singleton()->flush_buffered_events(); + _THREAD_SAFE_LOCK_ } for (KeyValue<WindowID, WindowData> &E : windows) { @@ -3045,6 +3046,8 @@ void DisplayServerMacOS::process_events() { } } } + + _THREAD_SAFE_UNLOCK_ } void DisplayServerMacOS::force_process_and_drop_events() { @@ -3056,9 +3059,14 @@ void DisplayServerMacOS::force_process_and_drop_events() { } void DisplayServerMacOS::release_rendering_thread() { -} - -void DisplayServerMacOS::make_rendering_thread() { +#if defined(GLES3_ENABLED) + if (gl_manager_angle) { + gl_manager_angle->release_current(); + } + if (gl_manager_legacy) { + gl_manager_legacy->release_current(); + } +#endif } void DisplayServerMacOS::swap_buffers() { diff --git a/platform/macos/gl_manager_macos_legacy.h b/platform/macos/gl_manager_macos_legacy.h index bafe825efb..af9be8f5ba 100644 --- a/platform/macos/gl_manager_macos_legacy.h +++ b/platform/macos/gl_manager_macos_legacy.h @@ -73,7 +73,6 @@ public: void window_resize(DisplayServer::WindowID p_window_id, int p_width, int p_height); void release_current(); - void make_current(); void swap_buffers(); void window_make_current(DisplayServer::WindowID p_window_id); diff --git a/platform/macos/gl_manager_macos_legacy.mm b/platform/macos/gl_manager_macos_legacy.mm index 701de6df78..6ce3831d9c 100644 --- a/platform/macos/gl_manager_macos_legacy.mm +++ b/platform/macos/gl_manager_macos_legacy.mm @@ -117,6 +117,7 @@ void GLManagerLegacy_MacOS::release_current() { } [NSOpenGLContext clearCurrentContext]; + current_window = DisplayServer::INVALID_WINDOW_ID; } void GLManagerLegacy_MacOS::window_make_current(DisplayServer::WindowID p_window_id) { @@ -133,18 +134,6 @@ void GLManagerLegacy_MacOS::window_make_current(DisplayServer::WindowID p_window current_window = p_window_id; } -void GLManagerLegacy_MacOS::make_current() { - if (current_window == DisplayServer::INVALID_WINDOW_ID) { - return; - } - if (!windows.has(current_window)) { - return; - } - - GLWindow &win = windows[current_window]; - [win.context makeCurrentContext]; -} - void GLManagerLegacy_MacOS::swap_buffers() { GLWindow &win = windows[current_window]; [win.context flushBuffer]; diff --git a/platform/windows/display_server_windows.cpp b/platform/windows/display_server_windows.cpp index 74665664b1..b6b713687f 100644 --- a/platform/windows/display_server_windows.cpp +++ b/platform/windows/display_server_windows.cpp @@ -2962,7 +2962,7 @@ String DisplayServerWindows::keyboard_get_layout_name(int p_index) const { } void DisplayServerWindows::process_events() { - _THREAD_SAFE_METHOD_ + _THREAD_SAFE_LOCK_ MSG msg; @@ -2977,7 +2977,10 @@ void DisplayServerWindows::process_events() { if (!drop_events) { _process_key_events(); + _THREAD_SAFE_UNLOCK_ Input::get_singleton()->flush_buffered_events(); + } else { + _THREAD_SAFE_UNLOCK_ } } @@ -2990,9 +2993,14 @@ void DisplayServerWindows::force_process_and_drop_events() { } void DisplayServerWindows::release_rendering_thread() { -} - -void DisplayServerWindows::make_rendering_thread() { +#if defined(GLES3_ENABLED) + if (gl_manager_angle) { + gl_manager_angle->release_current(); + } + if (gl_manager_native) { + gl_manager_native->release_current(); + } +#endif } void DisplayServerWindows::swap_buffers() { @@ -3433,7 +3441,6 @@ void DisplayServerWindows::_dispatch_input_events(const Ref<InputEvent> &p_event } void DisplayServerWindows::_dispatch_input_event(const Ref<InputEvent> &p_event) { - _THREAD_SAFE_METHOD_ if (in_dispatch_input_event) { return; } diff --git a/platform/windows/display_server_windows.h b/platform/windows/display_server_windows.h index 4792b65801..2fe1b0733d 100644 --- a/platform/windows/display_server_windows.h +++ b/platform/windows/display_server_windows.h @@ -679,7 +679,6 @@ public: virtual void force_process_and_drop_events() override; virtual void release_rendering_thread() override; - virtual void make_rendering_thread() override; virtual void swap_buffers() override; virtual void set_native_icon(const String &p_filename) override; diff --git a/platform/windows/gl_manager_windows_native.cpp b/platform/windows/gl_manager_windows_native.cpp index da837b3f94..80cf818199 100644 --- a/platform/windows/gl_manager_windows_native.cpp +++ b/platform/windows/gl_manager_windows_native.cpp @@ -427,10 +427,6 @@ Error GLManagerNative_Windows::window_create(DisplayServer::WindowID p_window_id return OK; } -void GLManagerNative_Windows::_internal_set_current_window(GLWindow *p_win) { - _current_window = p_win; -} - void GLManagerNative_Windows::window_destroy(DisplayServer::WindowID p_window_id) { GLWindow &win = get_window(p_window_id); if (_current_window == &win) { @@ -447,6 +443,8 @@ void GLManagerNative_Windows::release_current() { if (!gd_wglMakeCurrent(_current_window->hDC, nullptr)) { ERR_PRINT("Could not detach OpenGL context from window marked current: " + format_error_message(GetLastError())); } + + _current_window = nullptr; } void GLManagerNative_Windows::window_make_current(DisplayServer::WindowID p_window_id) { @@ -467,17 +465,7 @@ void GLManagerNative_Windows::window_make_current(DisplayServer::WindowID p_wind ERR_PRINT("Could not switch OpenGL context to other window: " + format_error_message(GetLastError())); } - _internal_set_current_window(&win); -} - -void GLManagerNative_Windows::make_current() { - if (!_current_window) { - return; - } - const GLDisplay &disp = get_current_display(); - if (!gd_wglMakeCurrent(_current_window->hDC, disp.hRC)) { - ERR_PRINT("Could not switch OpenGL context to window marked current: " + format_error_message(GetLastError())); - } + _current_window = &win; } void GLManagerNative_Windows::swap_buffers() { @@ -491,7 +479,6 @@ Error GLManagerNative_Windows::initialize() { void GLManagerNative_Windows::set_use_vsync(DisplayServer::WindowID p_window_id, bool p_use) { GLWindow &win = get_window(p_window_id); - GLWindow *current = _current_window; if (&win != _current_window) { window_make_current(p_window_id); @@ -506,11 +493,6 @@ void GLManagerNative_Windows::set_use_vsync(DisplayServer::WindowID p_window_id, } else { WARN_PRINT("Could not set V-Sync mode. V-Sync is not supported."); } - - if (current != _current_window) { - _current_window = current; - make_current(); - } } bool GLManagerNative_Windows::is_using_vsync(DisplayServer::WindowID p_window_id) const { diff --git a/platform/windows/gl_manager_windows_native.h b/platform/windows/gl_manager_windows_native.h index 829c53b3d2..b4e2a3acdf 100644 --- a/platform/windows/gl_manager_windows_native.h +++ b/platform/windows/gl_manager_windows_native.h @@ -68,9 +68,6 @@ private: PFNWGLSWAPINTERVALEXTPROC wglSwapIntervalEXT = nullptr; - // funcs - void _internal_set_current_window(GLWindow *p_win); - GLWindow &get_window(unsigned int id) { return _windows[id]; } const GLWindow &get_window(unsigned int id) const { return _windows[id]; } @@ -91,7 +88,6 @@ public: void window_resize(DisplayServer::WindowID p_window_id, int p_width, int p_height) {} void release_current(); - void make_current(); void swap_buffers(); void window_make_current(DisplayServer::WindowID p_window_id); diff --git a/scene/main/window.cpp b/scene/main/window.cpp index de6bc29f05..0ccc056a8d 100644 --- a/scene/main/window.cpp +++ b/scene/main/window.cpp @@ -1579,6 +1579,7 @@ bool Window::_can_consume_input_events() const { } void Window::_window_input(const Ref<InputEvent> &p_ev) { + ERR_MAIN_THREAD_GUARD; if (EngineDebugger::is_active()) { // Quit from game window using the stop shortcut (F8 by default). // The custom shortcut is provided via environment variable when running from the editor. diff --git a/servers/display_server.cpp b/servers/display_server.cpp index 9ceb6909fe..351c03c158 100644 --- a/servers/display_server.cpp +++ b/servers/display_server.cpp @@ -697,10 +697,6 @@ void DisplayServer::release_rendering_thread() { WARN_PRINT("Rendering thread not supported by this display server."); } -void DisplayServer::make_rendering_thread() { - WARN_PRINT("Rendering thread not supported by this display server."); -} - void DisplayServer::swap_buffers() { WARN_PRINT("Swap buffers not supported by this display server."); } diff --git a/servers/display_server.h b/servers/display_server.h index f1a98c2c17..aab51644c0 100644 --- a/servers/display_server.h +++ b/servers/display_server.h @@ -559,7 +559,6 @@ public: virtual void force_process_and_drop_events(); virtual void release_rendering_thread(); - virtual void make_rendering_thread(); virtual void swap_buffers(); virtual void set_native_icon(const String &p_filename); diff --git a/servers/physics_server_2d_wrap_mt.cpp b/servers/physics_server_2d_wrap_mt.cpp index a23bb5e701..4548bb91cb 100644 --- a/servers/physics_server_2d_wrap_mt.cpp +++ b/servers/physics_server_2d_wrap_mt.cpp @@ -33,7 +33,7 @@ #include "core/os/os.h" void PhysicsServer2DWrapMT::thread_exit() { - exit.set(); + exit = true; } void PhysicsServer2DWrapMT::thread_step(real_t p_delta) { @@ -41,25 +41,18 @@ void PhysicsServer2DWrapMT::thread_step(real_t p_delta) { step_sem.post(); } -void PhysicsServer2DWrapMT::_thread_callback(void *_instance) { - PhysicsServer2DWrapMT *vsmt = reinterpret_cast<PhysicsServer2DWrapMT *>(_instance); - - vsmt->thread_loop(); -} - void PhysicsServer2DWrapMT::thread_loop() { server_thread = Thread::get_caller_id(); physics_server_2d->init(); - exit.clear(); - step_thread_up.set(); - while (!exit.is_set()) { - // flush commands one by one, until exit is requested - command_queue.wait_and_flush(); + command_queue.set_pump_task_id(server_task_id); + while (!exit) { + WorkerThreadPool::get_singleton()->yield(); + command_queue.flush_all(); } - command_queue.flush_all(); // flush all + command_queue.flush_all(); physics_server_2d->finish(); } @@ -70,18 +63,14 @@ void PhysicsServer2DWrapMT::step(real_t p_step) { if (create_thread) { command_queue.push(this, &PhysicsServer2DWrapMT::thread_step, p_step); } else { - command_queue.flush_all(); //flush all pending from other threads + command_queue.flush_all(); // Flush all pending from other threads. physics_server_2d->step(p_step); } } void PhysicsServer2DWrapMT::sync() { if (create_thread) { - if (first_frame) { - first_frame = false; - } else { - step_sem.wait(); //must not wait if a step was not issued - } + step_sem.wait(); } physics_server_2d->sync(); } @@ -96,40 +85,34 @@ void PhysicsServer2DWrapMT::end_sync() { void PhysicsServer2DWrapMT::init() { if (create_thread) { - //OS::get_singleton()->release_rendering_thread(); - thread.start(_thread_callback, this); - while (!step_thread_up.is_set()) { - OS::get_singleton()->delay_usec(1000); - } + exit = false; + server_task_id = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer2DWrapMT::thread_loop), true); + step_sem.post(); } else { physics_server_2d->init(); } } void PhysicsServer2DWrapMT::finish() { - if (thread.is_started()) { + if (create_thread) { command_queue.push(this, &PhysicsServer2DWrapMT::thread_exit); - thread.wait_to_finish(); + if (server_task_id != WorkerThreadPool::INVALID_TASK_ID) { + WorkerThreadPool::get_singleton()->wait_for_task_completion(server_task_id); + server_task_id = WorkerThreadPool::INVALID_TASK_ID; + } } else { physics_server_2d->finish(); } } -PhysicsServer2DWrapMT::PhysicsServer2DWrapMT(PhysicsServer2D *p_contained, bool p_create_thread) : - command_queue(p_create_thread) { +PhysicsServer2DWrapMT::PhysicsServer2DWrapMT(PhysicsServer2D *p_contained, bool p_create_thread) { physics_server_2d = p_contained; create_thread = p_create_thread; - - if (!p_create_thread) { - server_thread = Thread::get_caller_id(); - } else { - server_thread = 0; + if (!create_thread) { + server_thread = Thread::MAIN_ID; } - - main_thread = Thread::get_caller_id(); } PhysicsServer2DWrapMT::~PhysicsServer2DWrapMT() { memdelete(physics_server_2d); - //finish(); } diff --git a/servers/physics_server_2d_wrap_mt.h b/servers/physics_server_2d_wrap_mt.h index 3bebe5df85..5e2b3b4086 100644 --- a/servers/physics_server_2d_wrap_mt.h +++ b/servers/physics_server_2d_wrap_mt.h @@ -32,6 +32,7 @@ #define PHYSICS_SERVER_2D_WRAP_MT_H #include "core/config/project_settings.h" +#include "core/object/worker_thread_pool.h" #include "core/os/thread.h" #include "core/templates/command_queue_mt.h" #include "core/templates/safe_refcount.h" @@ -43,30 +44,27 @@ #define SYNC_DEBUG #endif +#ifdef DEBUG_ENABLED +#define MAIN_THREAD_SYNC_WARN WARN_PRINT("Call to " + String(__FUNCTION__) + " causing PhysicsServer2D synchronizations on every frame. This significantly affects performance."); +#endif + class PhysicsServer2DWrapMT : public PhysicsServer2D { - mutable PhysicsServer2D *physics_server_2d; + mutable PhysicsServer2D *physics_server_2d = nullptr; mutable CommandQueueMT command_queue; - static void _thread_callback(void *_instance); void thread_loop(); - Thread::ID server_thread; - Thread::ID main_thread; - SafeFlag exit; - Thread thread; - SafeFlag step_thread_up; + Thread::ID server_thread = Thread::UNASSIGNED_ID; + WorkerThreadPool::TaskID server_task_id = WorkerThreadPool::INVALID_TASK_ID; + bool exit = false; + Semaphore step_sem; bool create_thread = false; - Semaphore step_sem; void thread_step(real_t p_delta); void thread_exit(); - bool first_frame = true; - - Mutex alloc_mutex; - public: #define ServerName PhysicsServer2D #define ServerNameWrapMT PhysicsServer2DWrapMT @@ -94,7 +92,7 @@ public: //these work well, but should be used from the main thread only bool shape_collide(RID p_shape_A, const Transform2D &p_xform_A, const Vector2 &p_motion_A, RID p_shape_B, const Transform2D &p_xform_B, const Vector2 &p_motion_B, Vector2 *r_results, int p_result_max, int &r_result_count) override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), false); + ERR_FAIL_COND_V(!Thread::is_main_thread(), false); return physics_server_2d->shape_collide(p_shape_A, p_xform_A, p_motion_A, p_shape_B, p_xform_B, p_motion_B, r_results, p_result_max, r_result_count); } @@ -109,18 +107,18 @@ public: // this function only works on physics process, errors and returns null otherwise PhysicsDirectSpaceState2D *space_get_direct_state(RID p_space) override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), nullptr); + ERR_FAIL_COND_V(!Thread::is_main_thread(), nullptr); return physics_server_2d->space_get_direct_state(p_space); } FUNC2(space_set_debug_contacts, RID, int); virtual Vector<Vector2> space_get_contacts(RID p_space) const override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), Vector<Vector2>()); + ERR_FAIL_COND_V(!Thread::is_main_thread(), Vector<Vector2>()); return physics_server_2d->space_get_contacts(p_space); } virtual int space_get_contact_count(RID p_space) const override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), 0); + ERR_FAIL_COND_V(!Thread::is_main_thread(), 0); return physics_server_2d->space_get_contact_count(p_space); } @@ -261,13 +259,13 @@ public: FUNC2(body_set_pickable, RID, bool); bool body_test_motion(RID p_body, const MotionParameters &p_parameters, MotionResult *r_result = nullptr) override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), false); + ERR_FAIL_COND_V(!Thread::is_main_thread(), false); return physics_server_2d->body_test_motion(p_body, p_parameters, r_result); } // this function only works on physics process, errors and returns null otherwise PhysicsDirectBodyState2D *body_get_direct_state(RID p_body) override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), nullptr); + ERR_FAIL_COND_V(!Thread::is_main_thread(), nullptr); return physics_server_2d->body_get_direct_state(p_body); } @@ -338,4 +336,8 @@ public: #endif #undef SYNC_DEBUG +#ifdef DEBUG_ENABLED +#undef MAIN_THREAD_SYNC_WARN +#endif + #endif // PHYSICS_SERVER_2D_WRAP_MT_H diff --git a/servers/physics_server_3d_wrap_mt.cpp b/servers/physics_server_3d_wrap_mt.cpp index feb17cad84..f8f60281a7 100644 --- a/servers/physics_server_3d_wrap_mt.cpp +++ b/servers/physics_server_3d_wrap_mt.cpp @@ -41,22 +41,15 @@ void PhysicsServer3DWrapMT::thread_step(real_t p_delta) { step_sem.post(); } -void PhysicsServer3DWrapMT::_thread_callback(void *_instance) { - PhysicsServer3DWrapMT *vsmt = reinterpret_cast<PhysicsServer3DWrapMT *>(_instance); - - vsmt->thread_loop(); -} - void PhysicsServer3DWrapMT::thread_loop() { server_thread = Thread::get_caller_id(); physics_server_3d->init(); - exit = false; - step_thread_up = true; + command_queue.set_pump_task_id(server_task_id); while (!exit) { - // flush commands one by one, until exit is requested - command_queue.wait_and_flush(); + WorkerThreadPool::get_singleton()->yield(); + command_queue.flush_all(); } command_queue.flush_all(); // flush all @@ -70,18 +63,14 @@ void PhysicsServer3DWrapMT::step(real_t p_step) { if (create_thread) { command_queue.push(this, &PhysicsServer3DWrapMT::thread_step, p_step); } else { - command_queue.flush_all(); //flush all pending from other threads + command_queue.flush_all(); // Flush all pending from other threads. physics_server_3d->step(p_step); } } void PhysicsServer3DWrapMT::sync() { if (create_thread) { - if (first_frame) { - first_frame = false; - } else { - step_sem.wait(); //must not wait if a step was not issued - } + step_sem.wait(); } physics_server_3d->sync(); } @@ -96,40 +85,34 @@ void PhysicsServer3DWrapMT::end_sync() { void PhysicsServer3DWrapMT::init() { if (create_thread) { - //OS::get_singleton()->release_rendering_thread(); - thread.start(_thread_callback, this); - while (!step_thread_up) { - OS::get_singleton()->delay_usec(1000); - } + exit = false; + server_task_id = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer3DWrapMT::thread_loop), true); + step_sem.post(); } else { physics_server_3d->init(); } } void PhysicsServer3DWrapMT::finish() { - if (thread.is_started()) { + if (create_thread) { command_queue.push(this, &PhysicsServer3DWrapMT::thread_exit); - thread.wait_to_finish(); + if (server_task_id != WorkerThreadPool::INVALID_TASK_ID) { + WorkerThreadPool::get_singleton()->wait_for_task_completion(server_task_id); + server_task_id = WorkerThreadPool::INVALID_TASK_ID; + } } else { physics_server_3d->finish(); } } -PhysicsServer3DWrapMT::PhysicsServer3DWrapMT(PhysicsServer3D *p_contained, bool p_create_thread) : - command_queue(p_create_thread) { +PhysicsServer3DWrapMT::PhysicsServer3DWrapMT(PhysicsServer3D *p_contained, bool p_create_thread) { physics_server_3d = p_contained; create_thread = p_create_thread; - - if (!p_create_thread) { - server_thread = Thread::get_caller_id(); - } else { - server_thread = 0; + if (!create_thread) { + server_thread = Thread::MAIN_ID; } - - main_thread = Thread::get_caller_id(); } PhysicsServer3DWrapMT::~PhysicsServer3DWrapMT() { memdelete(physics_server_3d); - //finish(); } diff --git a/servers/physics_server_3d_wrap_mt.h b/servers/physics_server_3d_wrap_mt.h index fc8930977d..22f3ee0e45 100644 --- a/servers/physics_server_3d_wrap_mt.h +++ b/servers/physics_server_3d_wrap_mt.h @@ -32,6 +32,7 @@ #define PHYSICS_SERVER_3D_WRAP_MT_H #include "core/config/project_settings.h" +#include "core/object/worker_thread_pool.h" #include "core/os/thread.h" #include "core/templates/command_queue_mt.h" #include "servers/physics_server_3d.h" @@ -42,30 +43,27 @@ #define SYNC_DEBUG #endif +#ifdef DEBUG_ENABLED +#define MAIN_THREAD_SYNC_WARN WARN_PRINT("Call to " + String(__FUNCTION__) + " causing PhysicsServer3D synchronizations on every frame. This significantly affects performance."); +#endif + class PhysicsServer3DWrapMT : public PhysicsServer3D { - mutable PhysicsServer3D *physics_server_3d; + mutable PhysicsServer3D *physics_server_3d = nullptr; mutable CommandQueueMT command_queue; - static void _thread_callback(void *_instance); void thread_loop(); - Thread::ID server_thread; - Thread::ID main_thread; - volatile bool exit = false; - Thread thread; - volatile bool step_thread_up = false; + Thread::ID server_thread = Thread::UNASSIGNED_ID; + WorkerThreadPool::TaskID server_task_id = WorkerThreadPool::INVALID_TASK_ID; + bool exit = false; + Semaphore step_sem; bool create_thread = false; - Semaphore step_sem; void thread_step(real_t p_delta); void thread_exit(); - bool first_frame = true; - - Mutex alloc_mutex; - public: #define ServerName PhysicsServer3D #define ServerNameWrapMT PhysicsServer3DWrapMT @@ -98,7 +96,7 @@ public: #if 0 //these work well, but should be used from the main thread only bool shape_collide(RID p_shape_A, const Transform &p_xform_A, const Vector3 &p_motion_A, RID p_shape_B, const Transform &p_xform_B, const Vector3 &p_motion_B, Vector3 *r_results, int p_result_max, int &r_result_count) { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), false); + ERR_FAIL_COND_V(!Thread::is_main_thread(), false); return physics_server_3d->shape_collide(p_shape_A, p_xform_A, p_motion_A, p_shape_B, p_xform_B, p_motion_B, r_results, p_result_max, r_result_count); } #endif @@ -113,18 +111,18 @@ public: // this function only works on physics process, errors and returns null otherwise PhysicsDirectSpaceState3D *space_get_direct_state(RID p_space) override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), nullptr); + ERR_FAIL_COND_V(!Thread::is_main_thread(), nullptr); return physics_server_3d->space_get_direct_state(p_space); } FUNC2(space_set_debug_contacts, RID, int); virtual Vector<Vector3> space_get_contacts(RID p_space) const override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), Vector<Vector3>()); + ERR_FAIL_COND_V(!Thread::is_main_thread(), Vector<Vector3>()); return physics_server_3d->space_get_contacts(p_space); } virtual int space_get_contact_count(RID p_space) const override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), 0); + ERR_FAIL_COND_V(!Thread::is_main_thread(), 0); return physics_server_3d->space_get_contact_count(p_space); } @@ -260,13 +258,13 @@ public: FUNC2(body_set_ray_pickable, RID, bool); bool body_test_motion(RID p_body, const MotionParameters &p_parameters, MotionResult *r_result = nullptr) override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), false); + ERR_FAIL_COND_V(!Thread::is_main_thread(), false); return physics_server_3d->body_test_motion(p_body, p_parameters, r_result); } // this function only works on physics process, errors and returns null otherwise PhysicsDirectBodyState3D *body_get_direct_state(RID p_body) override { - ERR_FAIL_COND_V(main_thread != Thread::get_caller_id(), nullptr); + ERR_FAIL_COND_V(!Thread::is_main_thread(), nullptr); return physics_server_3d->body_get_direct_state(p_body); } @@ -411,4 +409,8 @@ public: #endif #undef SYNC_DEBUG +#ifdef DEBUG_ENABLED +#undef MAIN_THREAD_SYNC_WARN +#endif + #endif // PHYSICS_SERVER_3D_WRAP_MT_H diff --git a/servers/rendering/renderer_rd/storage_rd/particles_storage.cpp b/servers/rendering/renderer_rd/storage_rd/particles_storage.cpp index c9c7c53d04..f7b28e7a1e 100644 --- a/servers/rendering/renderer_rd/storage_rd/particles_storage.cpp +++ b/servers/rendering/renderer_rd/storage_rd/particles_storage.cpp @@ -257,7 +257,6 @@ void ParticlesStorage::particles_set_emitting(RID p_particles, bool p_emitting) } bool ParticlesStorage::particles_get_emitting(RID p_particles) { - ERR_FAIL_COND_V_MSG(RSG::threaded, false, "This function should never be used with threaded rendering, as it stalls the renderer."); Particles *particles = particles_owner.get_or_null(p_particles); ERR_FAIL_NULL_V(particles, false); @@ -608,10 +607,6 @@ void ParticlesStorage::particles_request_process(RID p_particles) { } AABB ParticlesStorage::particles_get_current_aabb(RID p_particles) { - if (RSG::threaded) { - WARN_PRINT_ONCE("Calling this function with threaded rendering enabled stalls the renderer, use with care."); - } - const Particles *particles = particles_owner.get_or_null(p_particles); ERR_FAIL_NULL_V(particles, AABB()); @@ -1642,7 +1637,6 @@ Dependency *ParticlesStorage::particles_get_dependency(RID p_particles) const { } bool ParticlesStorage::particles_is_inactive(RID p_particles) const { - ERR_FAIL_COND_V_MSG(RSG::threaded, false, "This function should never be used with threaded rendering, as it stalls the renderer."); const Particles *particles = particles_owner.get_or_null(p_particles); ERR_FAIL_NULL_V(particles, false); return !particles->emitting && particles->inactive; diff --git a/servers/rendering/rendering_server_default.cpp b/servers/rendering/rendering_server_default.cpp index 5bf0ab0ba6..7e5ccee0e3 100644 --- a/servers/rendering/rendering_server_default.cpp +++ b/servers/rendering/rendering_server_default.cpp @@ -69,9 +69,6 @@ void RenderingServerDefault::request_frame_drawn_callback(const Callable &p_call } void RenderingServerDefault::_draw(bool p_swap_buffers, double frame_step) { - //needs to be done before changes is reset to 0, to not force the editor to redraw - RS::get_singleton()->emit_signal(SNAME("frame_pre_draw")); - changes = 0; RSG::rasterizer->begin_frame(frame_step); @@ -220,16 +217,9 @@ void RenderingServerDefault::_finish() { void RenderingServerDefault::init() { if (create_thread) { - print_verbose("RenderingServerWrapMT: Creating render thread"); + print_verbose("RenderingServerWrapMT: Starting render thread"); DisplayServer::get_singleton()->release_rendering_thread(); - if (create_thread) { - thread.start(_thread_callback, this); - print_verbose("RenderingServerWrapMT: Starting render thread"); - } - while (!draw_thread_up.is_set()) { - OS::get_singleton()->delay_usec(1000); - } - print_verbose("RenderingServerWrapMT: Finished render thread"); + server_task_id = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &RenderingServerDefault::_thread_loop), true); } else { _init(); } @@ -238,8 +228,9 @@ void RenderingServerDefault::init() { void RenderingServerDefault::finish() { if (create_thread) { command_queue.push(this, &RenderingServerDefault::_thread_exit); - if (thread.is_started()) { - thread.wait_to_finish(); + if (server_task_id != WorkerThreadPool::INVALID_TASK_ID) { + WorkerThreadPool::get_singleton()->wait_for_task_completion(server_task_id); + server_task_id = WorkerThreadPool::INVALID_TASK_ID; } } else { _finish(); @@ -337,38 +328,29 @@ Size2i RenderingServerDefault::get_maximum_viewport_size() const { } void RenderingServerDefault::_thread_exit() { - exit.set(); + exit = true; } void RenderingServerDefault::_thread_draw(bool p_swap_buffers, double frame_step) { _draw(p_swap_buffers, frame_step); } -void RenderingServerDefault::_thread_flush() { -} - -void RenderingServerDefault::_thread_callback(void *_instance) { - RenderingServerDefault *vsmt = reinterpret_cast<RenderingServerDefault *>(_instance); - - vsmt->_thread_loop(); -} - void RenderingServerDefault::_thread_loop() { server_thread = Thread::get_caller_id(); - DisplayServer::get_singleton()->make_rendering_thread(); - + DisplayServer::get_singleton()->gl_window_make_current(DisplayServer::MAIN_WINDOW_ID); // Move GL to this thread. _init(); - draw_thread_up.set(); - while (!exit.is_set()) { - // flush commands one by one, until exit is requested - command_queue.wait_and_flush(); + command_queue.set_pump_task_id(server_task_id); + while (!exit) { + WorkerThreadPool::get_singleton()->yield(); + command_queue.flush_all(); } - command_queue.flush_all(); // flush all + command_queue.flush_all(); _finish(); + DisplayServer::get_singleton()->release_rendering_thread(); } /* INTERPOLATION */ @@ -384,15 +366,15 @@ void RenderingServerDefault::set_physics_interpolation_enabled(bool p_enabled) { /* EVENT QUEUING */ void RenderingServerDefault::sync() { - if (create_thread) { - command_queue.push_and_sync(this, &RenderingServerDefault::_thread_flush); - } else { - command_queue.flush_all(); //flush all pending from other threads + if (!create_thread) { + command_queue.flush_all(); // Flush all pending from other threads. } } void RenderingServerDefault::draw(bool p_swap_buffers, double frame_step) { ERR_FAIL_COND_MSG(!Thread::is_main_thread(), "Manually triggering the draw function from the RenderingServer can only be done on the main thread. Call this function from the main thread or use call_deferred()."); + // Needs to be done before changes is reset to 0, to not force the editor to redraw. + RS::get_singleton()->emit_signal(SNAME("frame_pre_draw")); if (create_thread) { command_queue.push(this, &RenderingServerDefault::_thread_draw, p_swap_buffers, frame_step); } else { @@ -404,21 +386,14 @@ void RenderingServerDefault::_call_on_render_thread(const Callable &p_callable) p_callable.call(); } -RenderingServerDefault::RenderingServerDefault(bool p_create_thread) : - command_queue(p_create_thread) { +RenderingServerDefault::RenderingServerDefault(bool p_create_thread) { RenderingServer::init(); -#ifdef THREADS_ENABLED create_thread = p_create_thread; if (!create_thread) { - server_thread = Thread::get_caller_id(); - } else { - server_thread = 0; + server_thread = Thread::MAIN_ID; } -#else - create_thread = false; - server_thread = Thread::get_main_id(); -#endif + RSG::threaded = create_thread; RSG::canvas = memnew(RendererCanvasCull); diff --git a/servers/rendering/rendering_server_default.h b/servers/rendering/rendering_server_default.h index c50472c0cd..f94323f198 100644 --- a/servers/rendering/rendering_server_default.h +++ b/servers/rendering/rendering_server_default.h @@ -31,6 +31,7 @@ #ifndef RENDERING_SERVER_DEFAULT_H #define RENDERING_SERVER_DEFAULT_H +#include "core/object/worker_thread_pool.h" #include "core/os/thread.h" #include "core/templates/command_queue_mt.h" #include "core/templates/hash_map.h" @@ -75,22 +76,17 @@ class RenderingServerDefault : public RenderingServer { mutable CommandQueueMT command_queue; - static void _thread_callback(void *_instance); void _thread_loop(); - Thread::ID server_thread = 0; - SafeFlag exit; - Thread thread; - SafeFlag draw_thread_up; - bool create_thread; + Thread::ID server_thread = Thread::UNASSIGNED_ID; + WorkerThreadPool::TaskID server_task_id = WorkerThreadPool::INVALID_TASK_ID; + bool exit = false; + bool create_thread = false; void _thread_draw(bool p_swap_buffers, double frame_step); - void _thread_flush(); void _thread_exit(); - Mutex alloc_mutex; - void _draw(bool p_swap_buffers, double frame_step); void _init(); void _finish(); @@ -127,6 +123,10 @@ public: #define SYNC_DEBUG #endif +#ifdef DEBUG_ENABLED +#define MAIN_THREAD_SYNC_WARN WARN_PRINT("Call to " + String(__FUNCTION__) + " causing RenderingServer synchronizations on every frame. This significantly affects performance."); +#endif + #include "servers/server_wrap_mt_common.h" /* TEXTURE API */ @@ -1013,6 +1013,9 @@ public: #undef ServerName #undef WRITE_ACTION #undef SYNC_DEBUG +#ifdef DEBUG_ENABLED +#undef MAIN_THREAD_SYNC_WARN +#endif virtual uint64_t get_rendering_info(RenderingInfo p_info) override; virtual RenderingDevice::DeviceType get_video_adapter_type() const override; diff --git a/servers/rendering_server.cpp b/servers/rendering_server.cpp index 96d317ebd3..bbe6b1ad0d 100644 --- a/servers/rendering_server.cpp +++ b/servers/rendering_server.cpp @@ -83,25 +83,16 @@ static PackedInt64Array to_int_array(const Vector<ObjectID> &ids) { } PackedInt64Array RenderingServer::_instances_cull_aabb_bind(const AABB &p_aabb, RID p_scenario) const { - if (RSG::threaded) { - WARN_PRINT_ONCE("Using this function with a threaded renderer hurts performance, as it causes a server stall."); - } Vector<ObjectID> ids = instances_cull_aabb(p_aabb, p_scenario); return to_int_array(ids); } PackedInt64Array RenderingServer::_instances_cull_ray_bind(const Vector3 &p_from, const Vector3 &p_to, RID p_scenario) const { - if (RSG::threaded) { - WARN_PRINT_ONCE("Using this function with a threaded renderer hurts performance, as it causes a server stall."); - } Vector<ObjectID> ids = instances_cull_ray(p_from, p_to, p_scenario); return to_int_array(ids); } PackedInt64Array RenderingServer::_instances_cull_convex_bind(const TypedArray<Plane> &p_convex, RID p_scenario) const { - if (RSG::threaded) { - WARN_PRINT_ONCE("Using this function with a threaded renderer hurts performance, as it causes a server stall."); - } Vector<Plane> planes; for (int i = 0; i < p_convex.size(); ++i) { const Variant &v = p_convex[i]; diff --git a/servers/server_wrap_mt_common.h b/servers/server_wrap_mt_common.h index 1a73c97fc7..40867490ca 100644 --- a/servers/server_wrap_mt_common.h +++ b/servers/server_wrap_mt_common.h @@ -31,12 +31,22 @@ #ifndef SERVER_WRAP_MT_COMMON_H #define SERVER_WRAP_MT_COMMON_H +#ifdef DEBIG_ENABLED +#define MAIN_THREAD_SYNC_CHECK \ + if (unlikely(Thread::is_main_thread() && Engine::get_singleton()->notify_frame_server_synced())) { \ + MAIN_THREAD_SYNC_WARN \ + } +#else +#define MAIN_THREAD_SYNC_CHECK +#endif + #define FUNC0R(m_r, m_type) \ virtual m_r m_type() override { \ if (Thread::get_caller_id() != server_thread) { \ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -68,6 +78,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -102,6 +113,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(); \ @@ -113,6 +125,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(); \ @@ -128,6 +141,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -141,6 +155,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -154,6 +169,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1); \ @@ -165,6 +181,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1); \ @@ -199,6 +216,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -212,6 +230,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -225,6 +244,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2); \ @@ -236,6 +256,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2); \ @@ -270,6 +291,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -283,6 +305,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -296,6 +319,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3); \ @@ -307,6 +331,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3); \ @@ -341,6 +366,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, p4, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -354,6 +380,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, p4, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -367,6 +394,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3, p4); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3, p4); \ @@ -378,6 +406,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3, p4); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3, p4); \ @@ -412,6 +441,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -425,6 +455,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -438,6 +469,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3, p4, p5); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3, p4, p5); \ @@ -449,6 +481,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3, p4, p5); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3, p4, p5); \ @@ -483,6 +516,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -496,6 +530,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -509,6 +544,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3, p4, p5, p6); \ @@ -520,6 +556,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3, p4, p5, p6); \ @@ -554,6 +591,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6, p7, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -567,6 +605,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6, p7, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -580,6 +619,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6, p7); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3, p4, p5, p6, p7); \ @@ -591,6 +631,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6, p7); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3, p4, p5, p6, p7); \ @@ -625,6 +666,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6, p7, p8, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -638,6 +680,7 @@ m_r ret; \ command_queue.push_and_ret(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6, p7, p8, &ret); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ return ret; \ } else { \ command_queue.flush_if_pending(); \ @@ -651,6 +694,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6, p7, p8); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3, p4, p5, p6, p7, p8); \ @@ -662,6 +706,7 @@ if (Thread::get_caller_id() != server_thread) { \ command_queue.push_and_sync(server_name, &ServerName::m_type, p1, p2, p3, p4, p5, p6, p7, p8); \ SYNC_DEBUG \ + MAIN_THREAD_SYNC_CHECK \ } else { \ command_queue.flush_if_pending(); \ server_name->m_type(p1, p2, p3, p4, p5, p6, p7, p8); \ diff --git a/tests/core/templates/test_command_queue.h b/tests/core/templates/test_command_queue.h index e94c108694..d2957b5c40 100644 --- a/tests/core/templates/test_command_queue.h +++ b/tests/core/templates/test_command_queue.h @@ -33,6 +33,7 @@ #include "core/config/project_settings.h" #include "core/math/random_number_generator.h" +#include "core/object/worker_thread_pool.h" #include "core/os/os.h" #include "core/os/thread.h" #include "core/templates/command_queue_mt.h" @@ -100,7 +101,7 @@ public: ThreadWork reader_threadwork; ThreadWork writer_threadwork; - CommandQueueMT command_queue = CommandQueueMT(true); + CommandQueueMT command_queue; enum TestMsgType { TEST_MSG_FUNC1_TRANSFORM, @@ -119,6 +120,7 @@ public: bool exit_threads = false; Thread reader_thread; + WorkerThreadPool::TaskID reader_task_id = WorkerThreadPool::INVALID_TASK_ID; Thread writer_thread; int func1_count = 0; @@ -148,11 +150,16 @@ public: void reader_thread_loop() { reader_threadwork.thread_wait_for_work(); while (!exit_threads) { - if (message_count_to_read < 0) { + if (reader_task_id == WorkerThreadPool::INVALID_TASK_ID) { command_queue.flush_all(); - } - for (int i = 0; i < message_count_to_read; i++) { - command_queue.wait_and_flush(); + } else { + if (message_count_to_read < 0) { + command_queue.flush_all(); + } + for (int i = 0; i < message_count_to_read; i++) { + WorkerThreadPool::get_singleton()->yield(); + command_queue.wait_and_flush(); + } } message_count_to_read = 0; @@ -216,8 +223,13 @@ public: sts->writer_thread_loop(); } - void init_threads() { - reader_thread.start(&SharedThreadState::static_reader_thread_loop, this); + void init_threads(bool p_use_thread_pool_sync = false) { + if (p_use_thread_pool_sync) { + reader_task_id = WorkerThreadPool::get_singleton()->add_native_task(&SharedThreadState::static_reader_thread_loop, this, true); + command_queue.set_pump_task_id(reader_task_id); + } else { + reader_thread.start(&SharedThreadState::static_reader_thread_loop, this); + } writer_thread.start(&SharedThreadState::static_writer_thread_loop, this); } void destroy_threads() { @@ -225,16 +237,20 @@ public: reader_threadwork.main_start_work(); writer_threadwork.main_start_work(); - reader_thread.wait_to_finish(); + if (reader_task_id != WorkerThreadPool::INVALID_TASK_ID) { + WorkerThreadPool::get_singleton()->wait_for_task_completion(reader_task_id); + } else { + reader_thread.wait_to_finish(); + } writer_thread.wait_to_finish(); } }; -TEST_CASE("[CommandQueue] Test Queue Basics") { +static void test_command_queue_basic(bool p_use_thread_pool_sync) { const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb"; ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1); SharedThreadState sts; - sts.init_threads(); + sts.init_threads(p_use_thread_pool_sync); sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM); sts.writer_threadwork.main_start_work(); @@ -272,6 +288,14 @@ TEST_CASE("[CommandQueue] Test Queue Basics") { ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING)); } +TEST_CASE("[CommandQueue] Test Queue Basics") { + test_command_queue_basic(false); +} + +TEST_CASE("[CommandQueue] Test Queue Basics with WorkerThreadPool sync.") { + test_command_queue_basic(true); +} + TEST_CASE("[CommandQueue] Test Queue Wrapping to same spot.") { const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb"; ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1); diff --git a/tests/core/threads/test_worker_thread_pool.h b/tests/core/threads/test_worker_thread_pool.h index e9a762b57b..0a0291d11b 100644 --- a/tests/core/threads/test_worker_thread_pool.h +++ b/tests/core/threads/test_worker_thread_pool.h @@ -38,6 +38,7 @@ namespace TestWorkerThreadPool { static LocalVector<SafeNumeric<int>> counter; +static SafeFlag exit; static void static_test(void *p_arg) { counter[(uint64_t)p_arg].increment(); @@ -106,6 +107,72 @@ TEST_CASE("[WorkerThreadPool] Process elements using group tasks") { } } +static void static_test_daemon(void *p_arg) { + while (!exit.is_set()) { + counter[0].add(1); + WorkerThreadPool::get_singleton()->yield(); + } +} + +static void static_busy_task(void *p_arg) { + while (!exit.is_set()) { + OS::get_singleton()->delay_usec(1); + } +} + +static void static_legit_task(void *p_arg) { + *((bool *)p_arg) = counter[0].get() > 0; + counter[1].add(1); +} + +TEST_CASE("[WorkerThreadPool] Run a yielding daemon as the only hope for other tasks to run") { + exit.clear(); + counter.clear(); + counter.resize(2); + + WorkerThreadPool::TaskID daemon_task_id = WorkerThreadPool::get_singleton()->add_native_task(static_test_daemon, nullptr, true); + + int num_threads = WorkerThreadPool::get_singleton()->get_thread_count(); + + // Keep all the other threads busy. + LocalVector<WorkerThreadPool::TaskID> task_ids; + for (int i = 0; i < num_threads - 1; i++) { + task_ids.push_back(WorkerThreadPool::get_singleton()->add_native_task(static_busy_task, nullptr, true)); + } + + LocalVector<WorkerThreadPool::TaskID> legit_task_ids; + LocalVector<bool> legit_task_needed_yield; + int legit_tasks_count = num_threads * 4; + legit_task_needed_yield.resize(legit_tasks_count); + for (int i = 0; i < legit_tasks_count; i++) { + legit_task_needed_yield[i] = false; + task_ids.push_back(WorkerThreadPool::get_singleton()->add_native_task(static_legit_task, &legit_task_needed_yield[i], i >= legit_tasks_count / 2)); + } + + while (counter[1].get() != legit_tasks_count) { + OS::get_singleton()->delay_usec(1); + } + + exit.set(); + for (uint32_t i = 0; i < task_ids.size(); i++) { + WorkerThreadPool::get_singleton()->wait_for_task_completion(task_ids[i]); + } + WorkerThreadPool::get_singleton()->notify_yield_over(daemon_task_id); + WorkerThreadPool::get_singleton()->wait_for_task_completion(daemon_task_id); + + CHECK_MESSAGE(counter[0].get() > 0, "Daemon task should have looped at least once."); + CHECK_MESSAGE(counter[1].get() == legit_tasks_count, "All legit tasks should have been able to run."); + + bool all_needed_yield = true; + for (int i = 0; i < legit_tasks_count; i++) { + if (!legit_task_needed_yield[i]) { + all_needed_yield = false; + break; + } + } + CHECK_MESSAGE(all_needed_yield, "All legit tasks should have needed the daemon yielding to run."); +} + } // namespace TestWorkerThreadPool #endif // TEST_WORKER_THREAD_POOL_H |