diff options
author | Pedro J. Estébanez <pedrojrulez@gmail.com> | 2023-12-29 01:27:17 +0100 |
---|---|---|
committer | Pedro J. Estébanez <pedrojrulez@gmail.com> | 2024-01-08 12:45:43 +0100 |
commit | ae418f9469937b9a438a509bf359da9039cbee37 (patch) | |
tree | 0fbda5527df158c74ab9fb041413d3278bbf5754 /core/templates/command_queue_mt.h | |
parent | 9444d297ed0b1dbc7c05fa0bf2e06241335f5057 (diff) | |
download | redot-engine-ae418f9469937b9a438a509bf359da9039cbee37.tar.gz |
WorkerThreadPool: Avoid deadlocks when CommandQueueMT is involved
This commit lets CommandQueueMT play nicely with the WorkerThreadPool to avoid
non-progressable situations caused by an interdependence between both. While a
command queue is being flushed, it allows the WTP to release its lock while tasks
are being awaited so they can make progress in case they need in turn to post
to the command queue.
Diffstat (limited to 'core/templates/command_queue_mt.h')
-rw-r--r-- | core/templates/command_queue_mt.h | 50 |
1 files changed, 31 insertions, 19 deletions
diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index 7e480653ac..b1010f7f43 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -31,6 +31,7 @@ #ifndef COMMAND_QUEUE_MT_H #define COMMAND_QUEUE_MT_H +#include "core/object/worker_thread_pool.h" #include "core/os/memory.h" #include "core/os/mutex.h" #include "core/os/semaphore.h" @@ -306,15 +307,15 @@ class CommandQueueMT { struct CommandBase { virtual void call() = 0; - virtual void post() {} - virtual ~CommandBase() {} + virtual SyncSemaphore *get_sync_semaphore() { return nullptr; } + virtual ~CommandBase() = default; // Won't be called. }; struct SyncCommand : public CommandBase { SyncSemaphore *sync_sem = nullptr; - virtual void post() override { - sync_sem->sem.post(); + virtual SyncSemaphore *get_sync_semaphore() override { + return sync_sem; } }; @@ -340,6 +341,7 @@ class CommandQueueMT { SyncSemaphore sync_sems[SYNC_SEMAPHORES]; Mutex mutex; Semaphore *sync = nullptr; + uint64_t flush_read_ptr = 0; template <class T> T *allocate() { @@ -362,31 +364,41 @@ class CommandQueueMT { void _flush() { lock(); - uint64_t read_ptr = 0; - uint64_t limit = command_mem.size(); - - while (read_ptr < limit) { - uint64_t size = *(uint64_t *)&command_mem[read_ptr]; - read_ptr += 8; - CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[read_ptr]); - - cmd->call(); //execute the function - cmd->post(); //release in case it needs sync/ret - cmd->~CommandBase(); //should be done, so erase the command - - read_ptr += size; + WorkerThreadPool::thread_enter_command_queue_mt_flush(this); + while (flush_read_ptr < command_mem.size()) { + uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr]; + flush_read_ptr += 8; + CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]); + + SyncSemaphore *sync_sem = cmd->get_sync_semaphore(); + cmd->call(); + if (sync_sem) { + sync_sem->sem.post(); // Release in case it needs sync/ret. + } + + if (unlikely(flush_read_ptr == 0)) { + // A reentrant call flushed. + DEV_ASSERT(command_mem.is_empty()); + unlock(); + return; + } + + flush_read_ptr += size; } + WorkerThreadPool::thread_exit_command_queue_mt_flush(); command_mem.clear(); + flush_read_ptr = 0; unlock(); } - void lock(); - void unlock(); void wait_for_flush(); SyncSemaphore *_alloc_sync_sem(); public: + void lock(); + void unlock(); + /* NORMAL PUSH COMMANDS */ DECL_PUSH(0) SPACE_SEP_LIST(DECL_PUSH, 15) |