diff options
author | Pedro J. Estébanez <pedrojrulez@gmail.com> | 2024-04-09 17:26:45 +0200 |
---|---|---|
committer | Pedro J. Estébanez <pedrojrulez@gmail.com> | 2024-04-10 18:47:42 +0200 |
commit | 1b104ffcd8bc4573924754552508f5416573a7a1 (patch) | |
tree | dbe714f1a13ca22d34404a1e297ba0859b47b930 /tests/core | |
parent | 71facbaa882494f5c3dccf6a799ce84dcd12e4c9 (diff) | |
download | redot-engine-1b104ffcd8bc4573924754552508f5416573a7a1.tar.gz |
WorkerThreadPool: Support daemon-like tasks (via yield semantics)
Diffstat (limited to 'tests/core')
-rw-r--r-- | tests/core/templates/test_command_queue.h | 44 | ||||
-rw-r--r-- | tests/core/threads/test_worker_thread_pool.h | 67 |
2 files changed, 101 insertions, 10 deletions
diff --git a/tests/core/templates/test_command_queue.h b/tests/core/templates/test_command_queue.h index e94c108694..d2957b5c40 100644 --- a/tests/core/templates/test_command_queue.h +++ b/tests/core/templates/test_command_queue.h @@ -33,6 +33,7 @@ #include "core/config/project_settings.h" #include "core/math/random_number_generator.h" +#include "core/object/worker_thread_pool.h" #include "core/os/os.h" #include "core/os/thread.h" #include "core/templates/command_queue_mt.h" @@ -100,7 +101,7 @@ public: ThreadWork reader_threadwork; ThreadWork writer_threadwork; - CommandQueueMT command_queue = CommandQueueMT(true); + CommandQueueMT command_queue; enum TestMsgType { TEST_MSG_FUNC1_TRANSFORM, @@ -119,6 +120,7 @@ public: bool exit_threads = false; Thread reader_thread; + WorkerThreadPool::TaskID reader_task_id = WorkerThreadPool::INVALID_TASK_ID; Thread writer_thread; int func1_count = 0; @@ -148,11 +150,16 @@ public: void reader_thread_loop() { reader_threadwork.thread_wait_for_work(); while (!exit_threads) { - if (message_count_to_read < 0) { + if (reader_task_id == WorkerThreadPool::INVALID_TASK_ID) { command_queue.flush_all(); - } - for (int i = 0; i < message_count_to_read; i++) { - command_queue.wait_and_flush(); + } else { + if (message_count_to_read < 0) { + command_queue.flush_all(); + } + for (int i = 0; i < message_count_to_read; i++) { + WorkerThreadPool::get_singleton()->yield(); + command_queue.wait_and_flush(); + } } message_count_to_read = 0; @@ -216,8 +223,13 @@ public: sts->writer_thread_loop(); } - void init_threads() { - reader_thread.start(&SharedThreadState::static_reader_thread_loop, this); + void init_threads(bool p_use_thread_pool_sync = false) { + if (p_use_thread_pool_sync) { + reader_task_id = WorkerThreadPool::get_singleton()->add_native_task(&SharedThreadState::static_reader_thread_loop, this, true); + command_queue.set_pump_task_id(reader_task_id); + } else { + reader_thread.start(&SharedThreadState::static_reader_thread_loop, this); + } writer_thread.start(&SharedThreadState::static_writer_thread_loop, this); } void destroy_threads() { @@ -225,16 +237,20 @@ public: reader_threadwork.main_start_work(); writer_threadwork.main_start_work(); - reader_thread.wait_to_finish(); + if (reader_task_id != WorkerThreadPool::INVALID_TASK_ID) { + WorkerThreadPool::get_singleton()->wait_for_task_completion(reader_task_id); + } else { + reader_thread.wait_to_finish(); + } writer_thread.wait_to_finish(); } }; -TEST_CASE("[CommandQueue] Test Queue Basics") { +static void test_command_queue_basic(bool p_use_thread_pool_sync) { const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb"; ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1); SharedThreadState sts; - sts.init_threads(); + sts.init_threads(p_use_thread_pool_sync); sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM); sts.writer_threadwork.main_start_work(); @@ -272,6 +288,14 @@ TEST_CASE("[CommandQueue] Test Queue Basics") { ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING)); } +TEST_CASE("[CommandQueue] Test Queue Basics") { + test_command_queue_basic(false); +} + +TEST_CASE("[CommandQueue] Test Queue Basics with WorkerThreadPool sync.") { + test_command_queue_basic(true); +} + TEST_CASE("[CommandQueue] Test Queue Wrapping to same spot.") { const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb"; ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1); diff --git a/tests/core/threads/test_worker_thread_pool.h b/tests/core/threads/test_worker_thread_pool.h index e9a762b57b..0a0291d11b 100644 --- a/tests/core/threads/test_worker_thread_pool.h +++ b/tests/core/threads/test_worker_thread_pool.h @@ -38,6 +38,7 @@ namespace TestWorkerThreadPool { static LocalVector<SafeNumeric<int>> counter; +static SafeFlag exit; static void static_test(void *p_arg) { counter[(uint64_t)p_arg].increment(); @@ -106,6 +107,72 @@ TEST_CASE("[WorkerThreadPool] Process elements using group tasks") { } } +static void static_test_daemon(void *p_arg) { + while (!exit.is_set()) { + counter[0].add(1); + WorkerThreadPool::get_singleton()->yield(); + } +} + +static void static_busy_task(void *p_arg) { + while (!exit.is_set()) { + OS::get_singleton()->delay_usec(1); + } +} + +static void static_legit_task(void *p_arg) { + *((bool *)p_arg) = counter[0].get() > 0; + counter[1].add(1); +} + +TEST_CASE("[WorkerThreadPool] Run a yielding daemon as the only hope for other tasks to run") { + exit.clear(); + counter.clear(); + counter.resize(2); + + WorkerThreadPool::TaskID daemon_task_id = WorkerThreadPool::get_singleton()->add_native_task(static_test_daemon, nullptr, true); + + int num_threads = WorkerThreadPool::get_singleton()->get_thread_count(); + + // Keep all the other threads busy. + LocalVector<WorkerThreadPool::TaskID> task_ids; + for (int i = 0; i < num_threads - 1; i++) { + task_ids.push_back(WorkerThreadPool::get_singleton()->add_native_task(static_busy_task, nullptr, true)); + } + + LocalVector<WorkerThreadPool::TaskID> legit_task_ids; + LocalVector<bool> legit_task_needed_yield; + int legit_tasks_count = num_threads * 4; + legit_task_needed_yield.resize(legit_tasks_count); + for (int i = 0; i < legit_tasks_count; i++) { + legit_task_needed_yield[i] = false; + task_ids.push_back(WorkerThreadPool::get_singleton()->add_native_task(static_legit_task, &legit_task_needed_yield[i], i >= legit_tasks_count / 2)); + } + + while (counter[1].get() != legit_tasks_count) { + OS::get_singleton()->delay_usec(1); + } + + exit.set(); + for (uint32_t i = 0; i < task_ids.size(); i++) { + WorkerThreadPool::get_singleton()->wait_for_task_completion(task_ids[i]); + } + WorkerThreadPool::get_singleton()->notify_yield_over(daemon_task_id); + WorkerThreadPool::get_singleton()->wait_for_task_completion(daemon_task_id); + + CHECK_MESSAGE(counter[0].get() > 0, "Daemon task should have looped at least once."); + CHECK_MESSAGE(counter[1].get() == legit_tasks_count, "All legit tasks should have been able to run."); + + bool all_needed_yield = true; + for (int i = 0; i < legit_tasks_count; i++) { + if (!legit_task_needed_yield[i]) { + all_needed_yield = false; + break; + } + } + CHECK_MESSAGE(all_needed_yield, "All legit tasks should have needed the daemon yielding to run."); +} + } // namespace TestWorkerThreadPool #endif // TEST_WORKER_THREAD_POOL_H |