summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYuri Sizov <yuris@humnom.net>2023-07-14 18:49:08 +0200
committerYuri Sizov <yuris@humnom.net>2023-07-14 18:49:08 +0200
commitacd34153ba7329e95281cd93a6afa5ca6aae58a7 (patch)
tree191225f0fe001be55f652ed58feff34e70ccba2a
parent7e9d63ebd97fb77c5e1cac4f23fc2c00dff6a321 (diff)
parentc85beb8106a1e6633ee2156c524d5f000f3e12d4 (diff)
downloadredot-engine-acd34153ba7329e95281cd93a6afa5ca6aae58a7.tar.gz
Merge pull request #78612 from RandomShaper/mq_flush_ref
Refactor CallQueue flushing for clarity
-rw-r--r--core/object/message_queue.cpp94
-rw-r--r--core/object/message_queue.h2
2 files changed, 51 insertions, 45 deletions
diff --git a/core/object/message_queue.cpp b/core/object/message_queue.cpp
index 18ba5d5b30..506f8291eb 100644
--- a/core/object/message_queue.cpp
+++ b/core/object/message_queue.cpp
@@ -222,62 +222,66 @@ void CallQueue::_call_function(const Callable &p_callable, const Variant *p_args
}
}
-Error CallQueue::flush() {
- LOCK_MUTEX;
-
- // Thread overrides are not meant to be flushed, but appended to the main one.
- if (this == MessageQueue::thread_singleton) {
- if (pages.size() == 0) {
- return OK;
- }
+Error CallQueue::_transfer_messages_to_main_queue() {
+ if (pages.size() == 0) {
+ return OK;
+ }
- CallQueue *mq = MessageQueue::main_singleton;
- DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters.
-
- mq->mutex.lock();
-
- // Here we're transferring the data from this queue to the main one.
- // However, it's very unlikely big amounts of messages will be queued here,
- // so PagedArray/Pool would be overkill. Also, in most cases the data will fit
- // an already existing page of the main queue.
-
- // Let's see if our first (likely only) page fits the current target queue page.
- uint32_t src_page = 0;
- {
- if (mq->pages_used) {
- uint32_t dst_page = mq->pages_used - 1;
- uint32_t dst_offset = mq->page_bytes[dst_page];
- if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) {
- memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]);
- mq->page_bytes[dst_page] += page_bytes[0];
- src_page++;
- }
+ CallQueue *mq = MessageQueue::main_singleton;
+ DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters.
+
+ mq->mutex.lock();
+
+ // Here we're transferring the data from this queue to the main one.
+ // However, it's very unlikely big amounts of messages will be queued here,
+ // so PagedArray/Pool would be overkill. Also, in most cases the data will fit
+ // an already existing page of the main queue.
+
+ // Let's see if our first (likely only) page fits the current target queue page.
+ uint32_t src_page = 0;
+ {
+ if (mq->pages_used) {
+ uint32_t dst_page = mq->pages_used - 1;
+ uint32_t dst_offset = mq->page_bytes[dst_page];
+ if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) {
+ memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]);
+ mq->page_bytes[dst_page] += page_bytes[0];
+ src_page++;
}
}
+ }
- // Any other possibly existing source page needs to be added.
+ // Any other possibly existing source page needs to be added.
- if (mq->pages_used + (pages_used - src_page) > mq->max_pages) {
- ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text);
- mq->statistics();
- mq->mutex.unlock();
- return ERR_OUT_OF_MEMORY;
- }
+ if (mq->pages_used + (pages_used - src_page) > mq->max_pages) {
+ ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text);
+ mq->statistics();
+ mq->mutex.unlock();
+ return ERR_OUT_OF_MEMORY;
+ }
- for (; src_page < pages_used; src_page++) {
- mq->_add_page();
- memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]);
- mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page];
- }
+ for (; src_page < pages_used; src_page++) {
+ mq->_add_page();
+ memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]);
+ mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page];
+ }
- mq->mutex.unlock();
+ mq->mutex.unlock();
- page_bytes[0] = 0;
- pages_used = 1;
+ page_bytes[0] = 0;
+ pages_used = 1;
- return OK;
+ return OK;
+}
+
+Error CallQueue::flush() {
+ // Thread overrides are not meant to be flushed, but appended to the main one.
+ if (unlikely(this == MessageQueue::thread_singleton)) {
+ return _transfer_messages_to_main_queue();
}
+ LOCK_MUTEX;
+
if (pages.size() == 0) {
// Never allocated
UNLOCK_MUTEX;
diff --git a/core/object/message_queue.h b/core/object/message_queue.h
index 9f567e4dd0..c2f4ad1643 100644
--- a/core/object/message_queue.h
+++ b/core/object/message_queue.h
@@ -98,6 +98,8 @@ private:
}
}
+ Error _transfer_messages_to_main_queue();
+
void _add_page();
void _call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error);