diff options
Diffstat (limited to 'core/templates/command_queue_mt.h')
-rw-r--r-- | core/templates/command_queue_mt.h | 73 |
1 files changed, 30 insertions, 43 deletions
diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index a4ac338bed..c149861467 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -32,9 +32,9 @@ #define COMMAND_QUEUE_MT_H #include "core/object/worker_thread_pool.h" +#include "core/os/condition_variable.h" #include "core/os/memory.h" #include "core/os/mutex.h" -#include "core/os/semaphore.h" #include "core/string/print_string.h" #include "core/templates/local_vector.h" #include "core/templates/simple_type.h" @@ -251,14 +251,14 @@ #define DECL_PUSH(N) \ template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \ void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \ - CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \ + MutexLock mlock(mutex); \ + CMD_TYPE(N) *cmd = allocate<CMD_TYPE(N)>(); \ cmd->instance = p_instance; \ cmd->method = p_method; \ SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \ if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \ WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \ } \ - unlock(); \ } #define CMD_RET_TYPE(N) CommandRet##N<T, M, COMMA_SEP_LIST(TYPE_ARG, N) COMMA(N) R> @@ -266,19 +266,17 @@ #define DECL_PUSH_AND_RET(N) \ template <typename T, typename M, COMMA_SEP_LIST(TYPE_PARAM, N) COMMA(N) typename R> \ void push_and_ret(T *p_instance, M p_method, COMMA_SEP_LIST(PARAM, N) COMMA(N) R *r_ret) { \ - SyncSemaphore *ss = _alloc_sync_sem(); \ - CMD_RET_TYPE(N) *cmd = allocate_and_lock<CMD_RET_TYPE(N)>(); \ + MutexLock mlock(mutex); \ + CMD_RET_TYPE(N) *cmd = allocate<CMD_RET_TYPE(N)>(); \ cmd->instance = p_instance; \ cmd->method = p_method; \ SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \ cmd->ret = r_ret; \ - cmd->sync_sem = ss; \ if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \ WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \ } \ - unlock(); \ - ss->sem.wait(); \ - ss->in_use = false; \ + sync_tail++; \ + _wait_for_sync(mlock); \ } #define CMD_SYNC_TYPE(N) CommandSync##N<T, M COMMA(N) COMMA_SEP_LIST(TYPE_ARG, N)> @@ -286,39 +284,31 @@ #define DECL_PUSH_AND_SYNC(N) \ template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \ void push_and_sync(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \ - SyncSemaphore *ss = _alloc_sync_sem(); \ - CMD_SYNC_TYPE(N) *cmd = allocate_and_lock<CMD_SYNC_TYPE(N)>(); \ + MutexLock mlock(mutex); \ + CMD_SYNC_TYPE(N) *cmd = allocate<CMD_SYNC_TYPE(N)>(); \ cmd->instance = p_instance; \ cmd->method = p_method; \ SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \ - cmd->sync_sem = ss; \ if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \ WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \ } \ - unlock(); \ - ss->sem.wait(); \ - ss->in_use = false; \ + sync_tail++; \ + _wait_for_sync(mlock); \ } #define MAX_CMD_PARAMS 15 class CommandQueueMT { - struct SyncSemaphore { - Semaphore sem; - bool in_use = false; - }; - struct CommandBase { + bool sync = false; virtual void call() = 0; - virtual SyncSemaphore *get_sync_semaphore() { return nullptr; } virtual ~CommandBase() = default; // Won't be called. }; struct SyncCommand : public CommandBase { - SyncSemaphore *sync_sem = nullptr; - - virtual SyncSemaphore *get_sync_semaphore() override { - return sync_sem; + virtual void call() override {} + SyncCommand() { + sync = true; } }; @@ -340,9 +330,11 @@ class CommandQueueMT { SYNC_SEMAPHORES = 8 }; + BinaryMutex mutex; LocalVector<uint8_t> command_mem; - SyncSemaphore sync_sems[SYNC_SEMAPHORES]; - Mutex mutex; + ConditionVariable sync_cond_var; + uint32_t sync_head = 0; + uint32_t sync_tail = 0; WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID; uint64_t flush_read_ptr = 0; @@ -357,32 +349,23 @@ class CommandQueueMT { return cmd; } - template <typename T> - T *allocate_and_lock() { - lock(); - T *ret = allocate<T>(); - return ret; - } - void _flush() { - lock(); - if (unlikely(flush_read_ptr)) { // Re-entrant call. - unlock(); return; } + lock(); + WorkerThreadPool::thread_enter_command_queue_mt_flush(this); while (flush_read_ptr < command_mem.size()) { uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr]; flush_read_ptr += 8; CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]); - - SyncSemaphore *sync_sem = cmd->get_sync_semaphore(); cmd->call(); - if (sync_sem) { - sync_sem->sem.post(); // Release in case it needs sync/ret. + if (unlikely(cmd->sync)) { + sync_head++; + sync_cond_var.notify_all(); } flush_read_ptr += size; @@ -394,8 +377,12 @@ class CommandQueueMT { unlock(); } - void wait_for_flush(); - SyncSemaphore *_alloc_sync_sem(); + _FORCE_INLINE_ void _wait_for_sync(MutexLock<BinaryMutex> &p_lock) { + uint32_t sync_head_goal = sync_tail; + do { + sync_cond_var.wait(p_lock); + } while (sync_head != sync_head_goal); // Can't use lower-than because of wraparound. + } public: void lock(); |