Diffstat (limited to 'core')
-rw-r--r--   core/config/engine.cpp                16
-rw-r--r--   core/config/engine.h                   7
-rw-r--r--   core/object/worker_thread_pool.cpp   158
-rw-r--r--   core/object/worker_thread_pool.h      19
-rw-r--r--   core/templates/command_queue_mt.cpp    8
-rw-r--r--   core/templates/command_queue_mt.h     43
6 files changed, 163 insertions, 88 deletions
diff --git a/core/config/engine.cpp b/core/config/engine.cpp
index 9f4bff3779..f2f8aebe8b 100644
--- a/core/config/engine.cpp
+++ b/core/config/engine.cpp
@@ -82,6 +82,17 @@ int Engine::get_audio_output_latency() const {
return _audio_output_latency;
}
+void Engine::increment_frames_drawn() {
+ if (frame_server_synced) {
+ server_syncs++;
+ } else {
+ server_syncs = 0;
+ }
+ frame_server_synced = false;
+
+ frames_drawn++;
+}
+
uint64_t Engine::get_frames_drawn() {
return frames_drawn;
}
@@ -364,6 +375,11 @@ Engine *Engine::get_singleton() {
return singleton;
}
+bool Engine::notify_frame_server_synced() {
+ frame_server_synced = true;
+ return server_syncs > SERVER_SYNC_FRAME_COUNT_WARNING;
+}
+
Engine::Engine() {
singleton = this;
}
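
[Editor's note: a minimal sketch, not part of the patch, of how these two Engine hooks pair up over a frame. A server reports a forced sync via notify_frame_server_synced(), and increment_frames_drawn() advances or resets the consecutive-sync counter once per drawn frame. The surrounding function and the warning text are illustrative assumptions; only the two Engine calls come from this diff.]

#include "core/config/engine.h"

// Illustrative per-frame flow; only the Engine methods are from the patch.
void frame_tick_sketch(bool server_had_to_sync) {
	Engine *engine = Engine::get_singleton();

	if (server_had_to_sync) {
		// A server reports it fully synced with the main thread this frame.
		// Returns true once this has happened for more than
		// SERVER_SYNC_FRAME_COUNT_WARNING consecutive drawn frames.
		if (engine->notify_frame_server_synced()) {
			WARN_PRINT("Servers have been syncing every frame; performance may suffer.");
		}
	}

	// Called once per drawn frame: bumps the sync streak if a sync was
	// reported since the last frame, otherwise resets it to zero.
	engine->increment_frames_drawn();
}
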
diff --git a/core/config/engine.h b/core/config/engine.h
index d1495b36c2..8dece803e3 100644
--- a/core/config/engine.h
+++ b/core/config/engine.h
@@ -91,6 +91,10 @@ private:
String write_movie_path;
String shader_cache_path;
+ static constexpr int SERVER_SYNC_FRAME_COUNT_WARNING = 5;
+ int server_syncs = 0;
+ bool frame_server_synced = false;
+
public:
static Engine *get_singleton();
@@ -179,6 +183,9 @@ public:
bool is_generate_spirv_debug_info_enabled() const;
int32_t get_gpu_index() const;
+ void increment_frames_drawn();
+ bool notify_frame_server_synced();
+
Engine();
virtual ~Engine() {}
};
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp
index ef3d315e4b..c10f491a11 100644
--- a/core/object/worker_thread_pool.cpp
+++ b/core/object/worker_thread_pool.cpp
@@ -35,6 +35,8 @@
#include "core/os/thread_safe.h"
#include "core/templates/command_queue_mt.h"
+WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;
+
void WorkerThreadPool::Task::free_template_userdata() {
ERR_FAIL_NULL(template_userdata);
ERR_FAIL_NULL(native_func_userdata);
@@ -60,11 +62,13 @@ void WorkerThreadPool::_process_task(Task *p_task) {
// its pre-created threads can't have ScriptServer::thread_enter() called on them early.
// Therefore, we do it late at the first opportunity, so in case the task
// about to be run uses scripting, guarantees are held.
+ task_mutex.lock();
if (!curr_thread.ready_for_scripting && ScriptServer::are_languages_initialized()) {
+ task_mutex.unlock();
ScriptServer::thread_enter();
+ task_mutex.lock();
curr_thread.ready_for_scripting = true;
}
- task_mutex.lock();
p_task->pool_thread_index = pool_thread_index;
prev_task = curr_thread.current_task;
curr_thread.current_task = p_task;
@@ -389,83 +393,117 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
task_mutex.unlock();
if (caller_pool_thread) {
- while (true) {
- Task *task_to_process = nullptr;
- {
- MutexLock lock(task_mutex);
- bool was_signaled = caller_pool_thread->signaled;
- caller_pool_thread->signaled = false;
-
- if (task->completed) {
- // This thread was awaken also for some reason, but it's about to exit.
- // Let's find out what may be pending and forward the requests.
- if (!exit_threads && was_signaled) {
- uint32_t to_process = task_queue.first() ? 1 : 0;
- uint32_t to_promote = caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
- if (to_process || to_promote) {
- // This thread must be left alone since it won't loop again.
- caller_pool_thread->signaled = true;
- _notify_threads(caller_pool_thread, to_process, to_promote);
- }
- }
+ _wait_collaboratively(caller_pool_thread, task);
+ task->waiting_pool--;
+ if (task->waiting_pool == 0 && task->waiting_user == 0) {
+ tasks.erase(p_task_id);
+ task_allocator.free(task);
+ }
+ } else {
+ task->done_semaphore.wait();
+ task_mutex.lock();
+ task->waiting_user--;
+ if (task->waiting_pool == 0 && task->waiting_user == 0) {
+ tasks.erase(p_task_id);
+ task_allocator.free(task);
+ }
+ task_mutex.unlock();
+ }
- task->waiting_pool--;
- if (task->waiting_pool == 0 && task->waiting_user == 0) {
- tasks.erase(p_task_id);
- task_allocator.free(task);
- }
+ return OK;
+}
- break;
- }
+void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
+ // Keep processing tasks until the condition to stop waiting is met.
- if (!exit_threads) {
- // This is a thread from the pool. It shouldn't just idle.
- // Let's try to process other tasks while we wait.
+#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)
- if (caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
- if (_try_promote_low_priority_task()) {
- _notify_threads(caller_pool_thread, 1, 0);
- }
+ while (true) {
+ Task *task_to_process = nullptr;
+ {
+ MutexLock lock(task_mutex);
+ bool was_signaled = p_caller_pool_thread->signaled;
+ p_caller_pool_thread->signaled = false;
+
+ if (IS_WAIT_OVER) {
+ p_caller_pool_thread->yield_is_over = false;
+ if (!exit_threads && was_signaled) {
+ // This thread was awaken for some additional reason, but it's about to exit.
+ // Let's find out what may be pending and forward the requests.
+ uint32_t to_process = task_queue.first() ? 1 : 0;
+ uint32_t to_promote = p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
+ if (to_process || to_promote) {
+ // This thread must be left alone since it won't loop again.
+ p_caller_pool_thread->signaled = true;
+ _notify_threads(p_caller_pool_thread, to_process, to_promote);
}
+ }
+
+ break;
+ }
- if (singleton->task_queue.first()) {
- task_to_process = task_queue.first()->self();
- task_queue.remove(task_queue.first());
+ if (!exit_threads) {
+ if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
+ if (_try_promote_low_priority_task()) {
+ _notify_threads(p_caller_pool_thread, 1, 0);
}
+ }
- if (!task_to_process) {
- caller_pool_thread->awaited_task = task;
+ if (singleton->task_queue.first()) {
+ task_to_process = task_queue.first()->self();
+ task_queue.remove(task_queue.first());
+ }
- if (flushing_cmd_queue) {
- flushing_cmd_queue->unlock();
- }
- caller_pool_thread->cond_var.wait(lock);
- if (flushing_cmd_queue) {
- flushing_cmd_queue->lock();
- }
+ if (!task_to_process) {
+ p_caller_pool_thread->awaited_task = p_task;
- DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed);
- caller_pool_thread->awaited_task = nullptr;
+ if (flushing_cmd_queue) {
+ flushing_cmd_queue->unlock();
+ }
+ p_caller_pool_thread->cond_var.wait(lock);
+ if (flushing_cmd_queue) {
+ flushing_cmd_queue->lock();
}
- }
- }
- if (task_to_process) {
- _process_task(task_to_process);
+ DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
+ p_caller_pool_thread->awaited_task = nullptr;
+ }
}
}
- } else {
- task->done_semaphore.wait();
- task_mutex.lock();
- task->waiting_user--;
- if (task->waiting_pool == 0 && task->waiting_user == 0) {
- tasks.erase(p_task_id);
- task_allocator.free(task);
+
+ if (task_to_process) {
+ _process_task(task_to_process);
}
+ }
+}
+
+void WorkerThreadPool::yield() {
+ int th_index = get_thread_index();
+ ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
+ _wait_collaboratively(&threads[th_index], ThreadData::YIELDING);
+}
+
+void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
+ task_mutex.lock();
+ Task **taskp = tasks.getptr(p_task_id);
+ if (!taskp) {
task_mutex.unlock();
+ ERR_FAIL_MSG("Invalid Task ID.");
}
+ Task *task = *taskp;
- return OK;
+#ifdef DEBUG_ENABLED
+ if (task->pool_thread_index == get_thread_index()) {
+ WARN_PRINT("A worker thread is attempting to notify itself. That makes no sense.");
+ }
+#endif
+
+ ThreadData &td = threads[task->pool_thread_index];
+ td.yield_is_over = true;
+ td.signaled = true;
+ td.cond_var.notify_one();
+
+ task_mutex.unlock();
}
WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
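
[Editor's note: the net effect in worker_thread_pool.cpp is that the old inline wait loop of wait_for_task_completion() is factored out into _wait_collaboratively(), which also backs the new yield()/notify_yield_over() pair. Below is a hedged usage sketch of a daemon-like pump task; add_native_task() is assumed as the submission API (it is not part of this diff), and the pump function and state struct are hypothetical.]

// Hypothetical daemon-like task: it parks itself with yield() whenever there
// is nothing to do, and another thread wakes it with notify_yield_over().
struct PumpState {
	bool exit_requested = false;
};

static void _pump_func(void *p_userdata) {
	PumpState *state = (PumpState *)p_userdata;
	while (!state->exit_requested) {
		// While yielded, this worker keeps processing other queued tasks
		// instead of blocking a pool slot.
		WorkerThreadPool::get_singleton()->yield();
		// Woken by notify_yield_over(); handle whatever was enqueued here.
	}
}

// Producer side (any other thread), sketched as comments:
//   PumpState state;
//   WorkerThreadPool::TaskID id =
//           WorkerThreadPool::get_singleton()->add_native_task(&_pump_func, &state, true, "Pump");
//   ... enqueue work, then wake the pump ...
//   WorkerThreadPool::get_singleton()->notify_yield_over(id);
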
diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h
index fdddc9a647..64f24df79f 100644
--- a/core/object/worker_thread_pool.h
+++ b/core/object/worker_thread_pool.h
@@ -107,13 +107,21 @@ private:
BinaryMutex task_mutex;
struct ThreadData {
+ static Task *const YIELDING; // Too bad constexpr doesn't work here.
+
uint32_t index = 0;
Thread thread;
- bool ready_for_scripting = false;
- bool signaled = false;
+ bool ready_for_scripting : 1;
+ bool signaled : 1;
+ bool yield_is_over : 1;
Task *current_task = nullptr;
- Task *awaited_task = nullptr; // Null if not awaiting the condition variable. Special value for idle-waiting.
+ Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
ConditionVariable cond_var;
+
+ ThreadData() :
+ ready_for_scripting(false),
+ signaled(false),
+ yield_is_over(false) {}
};
TightLocalVector<ThreadData> threads;
@@ -177,6 +185,8 @@ private:
}
};
+ void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);
+
protected:
static void _bind_methods();
@@ -196,6 +206,9 @@ public:
bool is_task_completed(TaskID p_task_id) const;
Error wait_for_task_completion(TaskID p_task_id);
+ void yield();
+ void notify_yield_over(TaskID p_task_id);
+
template <typename C, typename M, typename U>
GroupID add_template_group_task(C *p_instance, M p_method, U p_userdata, int p_elements, int p_tasks = -1, bool p_high_priority = false, const String &p_description = String()) {
typedef GroupUserData<C, M, U> GroupUD;
diff --git a/core/templates/command_queue_mt.cpp b/core/templates/command_queue_mt.cpp
index 6ecd75ebc1..0c5c6394a1 100644
--- a/core/templates/command_queue_mt.cpp
+++ b/core/templates/command_queue_mt.cpp
@@ -70,14 +70,8 @@ CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() {
return &sync_sems[idx];
}
-CommandQueueMT::CommandQueueMT(bool p_sync) {
- if (p_sync) {
- sync = memnew(Semaphore);
- }
+CommandQueueMT::CommandQueueMT() {
}
CommandQueueMT::~CommandQueueMT() {
- if (sync) {
- memdelete(sync);
- }
}
diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h
index e26f11d28a..a4ac338bed 100644
--- a/core/templates/command_queue_mt.h
+++ b/core/templates/command_queue_mt.h
@@ -248,16 +248,17 @@
#define CMD_TYPE(N) Command##N<T, M COMMA(N) COMMA_SEP_LIST(TYPE_ARG, N)>
#define CMD_ASSIGN_PARAM(N) cmd->p##N = p##N
-#define DECL_PUSH(N) \
- template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \
- void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \
- CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \
- cmd->instance = p_instance; \
- cmd->method = p_method; \
- SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
- unlock(); \
- if (sync) \
- sync->post(); \
+#define DECL_PUSH(N) \
+ template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \
+ void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \
+ CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \
+ cmd->instance = p_instance; \
+ cmd->method = p_method; \
+ SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
+ if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
+ WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
+ } \
+ unlock(); \
}
#define CMD_RET_TYPE(N) CommandRet##N<T, M, COMMA_SEP_LIST(TYPE_ARG, N) COMMA(N) R>
@@ -272,9 +273,10 @@
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
cmd->ret = r_ret; \
cmd->sync_sem = ss; \
+ if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
+ WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
+ } \
unlock(); \
- if (sync) \
- sync->post(); \
ss->sem.wait(); \
ss->in_use = false; \
}
@@ -290,9 +292,10 @@
cmd->method = p_method; \
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
cmd->sync_sem = ss; \
+ if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
+ WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
+ } \
unlock(); \
- if (sync) \
- sync->post(); \
ss->sem.wait(); \
ss->in_use = false; \
}
@@ -340,7 +343,7 @@ class CommandQueueMT {
LocalVector<uint8_t> command_mem;
SyncSemaphore sync_sems[SYNC_SEMAPHORES];
Mutex mutex;
- Semaphore *sync = nullptr;
+ WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
uint64_t flush_read_ptr = 0;
template <typename T>
@@ -420,12 +423,16 @@ public:
}
void wait_and_flush() {
- ERR_FAIL_NULL(sync);
- sync->wait();
+ ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID);
+ WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id);
_flush();
}
- CommandQueueMT(bool p_sync);
+ void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) {
+ pump_task_id = p_task_id;
+ }
+
+ CommandQueueMT();
~CommandQueueMT();
};
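
[Editor's note: taken together, the command_queue_mt changes replace the optional sync Semaphore with a WorkerThreadPool pump task. Every push now wakes that task through notify_yield_over(), and wait_and_flush() waits on it via wait_for_task_completion(). Below is a rough wiring sketch under the assumption that the owning server drains the queue from a pool task; the server class, pump loop, and exit handling are illustrative, while set_pump_task_id(), flush_all(), yield(), and notify_yield_over() are the APIs touched or used by this patch.]

// Hypothetical server wiring; exit handling omitted for brevity.
class MyServer {
	CommandQueueMT command_queue;
	WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;

	static void _pump(void *p_server) {
		MyServer *server = static_cast<MyServer *>(p_server);
		while (true) {
			server->command_queue.flush_all(); // Drain whatever was pushed.
			WorkerThreadPool::get_singleton()->yield(); // Sleep until a push wakes us.
		}
	}

public:
	void init() {
		// Run the pump inside the pool and tell the queue which task to wake
		// whenever a command is pushed (see the notify_yield_over() calls above).
		pump_task_id = WorkerThreadPool::get_singleton()->add_native_task(&_pump, this, true, "MyServer pump");
		command_queue.set_pump_task_id(pump_task_id);
	}
};
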