diff options
Diffstat (limited to 'core/object/message_queue.cpp')
| -rw-r--r-- | core/object/message_queue.cpp | 98 |
1 files changed, 51 insertions, 47 deletions
diff --git a/core/object/message_queue.cpp b/core/object/message_queue.cpp index 18ba5d5b30..de71295ee5 100644 --- a/core/object/message_queue.cpp +++ b/core/object/message_queue.cpp @@ -36,7 +36,7 @@ #include "core/object/script_language.h" #ifdef DEV_ENABLED -// Includes sanity checks to ensure that a queue set as a thread singleton override +// Includes safety checks to ensure that a queue set as a thread singleton override // is only ever called from the thread it was set for. #define LOCK_MUTEX \ if (this != MessageQueue::thread_singleton) { \ @@ -222,62 +222,66 @@ void CallQueue::_call_function(const Callable &p_callable, const Variant *p_args } } -Error CallQueue::flush() { - LOCK_MUTEX; - - // Thread overrides are not meant to be flushed, but appended to the main one. - if (this == MessageQueue::thread_singleton) { - if (pages.size() == 0) { - return OK; - } +Error CallQueue::_transfer_messages_to_main_queue() { + if (pages.size() == 0) { + return OK; + } - CallQueue *mq = MessageQueue::main_singleton; - DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters. - - mq->mutex.lock(); - - // Here we're transferring the data from this queue to the main one. - // However, it's very unlikely big amounts of messages will be queued here, - // so PagedArray/Pool would be overkill. Also, in most cases the data will fit - // an already existing page of the main queue. - - // Let's see if our first (likely only) page fits the current target queue page. - uint32_t src_page = 0; - { - if (mq->pages_used) { - uint32_t dst_page = mq->pages_used - 1; - uint32_t dst_offset = mq->page_bytes[dst_page]; - if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) { - memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]); - mq->page_bytes[dst_page] += page_bytes[0]; - src_page++; - } + CallQueue *mq = MessageQueue::main_singleton; + DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters. + + mq->mutex.lock(); + + // Here we're transferring the data from this queue to the main one. + // However, it's very unlikely big amounts of messages will be queued here, + // so PagedArray/Pool would be overkill. Also, in most cases the data will fit + // an already existing page of the main queue. + + // Let's see if our first (likely only) page fits the current target queue page. + uint32_t src_page = 0; + { + if (mq->pages_used) { + uint32_t dst_page = mq->pages_used - 1; + uint32_t dst_offset = mq->page_bytes[dst_page]; + if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) { + memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]); + mq->page_bytes[dst_page] += page_bytes[0]; + src_page++; } } + } - // Any other possibly existing source page needs to be added. + // Any other possibly existing source page needs to be added. - if (mq->pages_used + (pages_used - src_page) > mq->max_pages) { - ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text); - mq->statistics(); - mq->mutex.unlock(); - return ERR_OUT_OF_MEMORY; - } + if (mq->pages_used + (pages_used - src_page) > mq->max_pages) { + ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text); + mq->statistics(); + mq->mutex.unlock(); + return ERR_OUT_OF_MEMORY; + } - for (; src_page < pages_used; src_page++) { - mq->_add_page(); - memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]); - mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page]; - } + for (; src_page < pages_used; src_page++) { + mq->_add_page(); + memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]); + mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page]; + } - mq->mutex.unlock(); + mq->mutex.unlock(); - page_bytes[0] = 0; - pages_used = 1; + page_bytes[0] = 0; + pages_used = 1; - return OK; + return OK; +} + +Error CallQueue::flush() { + // Thread overrides are not meant to be flushed, but appended to the main one. + if (unlikely(this == MessageQueue::thread_singleton)) { + return _transfer_messages_to_main_queue(); } + LOCK_MUTEX; + if (pages.size() == 0) { // Never allocated UNLOCK_MUTEX; @@ -533,7 +537,7 @@ CallQueue::~CallQueue() { if (!allocator_is_custom) { memdelete(allocator); } - // This is done here to avoid a circular dependency between the sanity checks and the thread singleton pointer. + // This is done here to avoid a circular dependency between the safety checks and the thread singleton pointer. if (this == MessageQueue::thread_singleton) { MessageQueue::thread_singleton = nullptr; } |
