summaryrefslogtreecommitdiffstats
path: root/modules/multiplayer/scene_replication_interface.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'modules/multiplayer/scene_replication_interface.cpp')
-rw-r--r--modules/multiplayer/scene_replication_interface.cpp182
1 files changed, 155 insertions, 27 deletions
diff --git a/modules/multiplayer/scene_replication_interface.cpp b/modules/multiplayer/scene_replication_interface.cpp
index 5889b8f5f9..b058bf7a52 100644
--- a/modules/multiplayer/scene_replication_interface.cpp
+++ b/modules/multiplayer/scene_replication_interface.cpp
@@ -138,15 +138,16 @@ void SceneReplicationInterface::on_network_process() {
spawn_queue.clear();
}
- // Process timed syncs.
- uint64_t msec = OS::get_singleton()->get_ticks_msec();
+ // Process syncs.
+ uint64_t usec = OS::get_singleton()->get_ticks_usec();
for (KeyValue<int, PeerInfo> &E : peers_info) {
const HashSet<ObjectID> to_sync = E.value.sync_nodes;
if (to_sync.is_empty()) {
continue; // Nothing to sync
}
uint16_t sync_net_time = ++E.value.last_sent_sync;
- _send_sync(E.key, to_sync, sync_net_time, msec);
+ _send_sync(E.key, to_sync, sync_net_time, usec);
+ _send_delta(E.key, to_sync, usec, E.value.last_watch_usecs);
}
}
@@ -280,6 +281,7 @@ Error SceneReplicationInterface::on_replication_stop(Object *p_obj, Variant p_co
sync_nodes.erase(sid);
for (KeyValue<int, PeerInfo> &E : peers_info) {
E.value.sync_nodes.erase(sid);
+ E.value.last_watch_usecs.erase(sid);
if (sync->get_net_id()) {
E.value.recv_sync_ids.erase(sync->get_net_id());
}
@@ -357,6 +359,7 @@ Error SceneReplicationInterface::_update_sync_visibility(int p_peer, Multiplayer
E.value.sync_nodes.insert(sid);
} else {
E.value.sync_nodes.erase(sid);
+ E.value.last_watch_usecs.erase(sid);
}
}
return OK;
@@ -369,6 +372,7 @@ Error SceneReplicationInterface::_update_sync_visibility(int p_peer, Multiplayer
peers_info[p_peer].sync_nodes.insert(sid);
} else {
peers_info[p_peer].sync_nodes.erase(sid);
+ peers_info[p_peer].last_watch_usecs.erase(sid);
}
return OK;
}
@@ -670,8 +674,126 @@ Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p
return OK;
}
-void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p_synchronizers, uint16_t p_sync_net_time, uint64_t p_msec) {
- MAKE_ROOM(sync_mtu);
+bool SceneReplicationInterface::_verify_synchronizer(int p_peer, MultiplayerSynchronizer *p_sync, uint32_t &r_net_id) {
+ r_net_id = p_sync->get_net_id();
+ if (r_net_id == 0 || (r_net_id & 0x80000000)) {
+ int path_id = 0;
+ bool verified = multiplayer->get_path_cache()->send_object_cache(p_sync, p_peer, path_id);
+ ERR_FAIL_COND_V_MSG(path_id < 0, false, "This should never happen!");
+ if (r_net_id == 0) {
+ // First time path based ID.
+ r_net_id = path_id | 0x80000000;
+ p_sync->set_net_id(r_net_id | 0x80000000);
+ }
+ return verified;
+ }
+ return true;
+}
+
+MultiplayerSynchronizer *SceneReplicationInterface::_find_synchronizer(int p_peer, uint32_t p_net_id) {
+ MultiplayerSynchronizer *sync = nullptr;
+ if (p_net_id & 0x80000000) {
+ sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_peer, p_net_id & 0x7FFFFFFF));
+ } else if (peers_info[p_peer].recv_sync_ids.has(p_net_id)) {
+ const ObjectID &sid = peers_info[p_peer].recv_sync_ids[p_net_id];
+ sync = get_id_as<MultiplayerSynchronizer>(sid);
+ }
+ return sync;
+}
+
+void SceneReplicationInterface::_send_delta(int p_peer, const HashSet<ObjectID> p_synchronizers, uint64_t p_usec, const HashMap<ObjectID, uint64_t> p_last_watch_usecs) {
+ MAKE_ROOM(/* header */ 1 + /* element */ 4 + 8 + 4 + delta_mtu);
+ uint8_t *ptr = packet_cache.ptrw();
+ ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC | (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT);
+ int ofs = 1;
+ for (const ObjectID &oid : p_synchronizers) {
+ MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
+ ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority());
+ uint32_t net_id;
+ if (!_verify_synchronizer(p_peer, sync, net_id)) {
+ continue;
+ }
+ uint64_t last_usec = p_last_watch_usecs.has(oid) ? p_last_watch_usecs[oid] : 0;
+ uint64_t indexes;
+ List<Variant> delta = sync->get_delta_state(p_usec, last_usec, indexes);
+
+ if (!delta.size()) {
+ continue; // Nothing to update.
+ }
+
+ Vector<const Variant *> varp;
+ varp.resize(delta.size());
+ const Variant **vptr = varp.ptrw();
+ int i = 0;
+ for (const Variant &v : delta) {
+ vptr[i] = &v;
+ }
+ int size;
+ Error err = MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), nullptr, size);
+ ERR_CONTINUE_MSG(err != OK, "Unable to encode delta state.");
+
+ ERR_CONTINUE_MSG(size > delta_mtu, vformat("Synchronizer delta bigger than MTU will not be sent (%d > %d): %s", size, delta_mtu, sync->get_path()));
+
+ if (ofs + 4 + 8 + 4 + size > delta_mtu) {
+ // Send what we got, and reset write.
+ _send_raw(packet_cache.ptr(), ofs, p_peer, true);
+ ofs = 1;
+ }
+ if (size) {
+ ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);
+ ofs += encode_uint64(indexes, &ptr[ofs]);
+ ofs += encode_uint32(size, &ptr[ofs]);
+ MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), &ptr[ofs], size);
+ ofs += size;
+ }
+#ifdef DEBUG_ENABLED
+ _profile_node_data("delta_out", oid, size);
+#endif
+ peers_info[p_peer].last_watch_usecs[oid] = p_usec;
+ }
+ if (ofs > 1) {
+ // Got some left over to send.
+ _send_raw(packet_cache.ptr(), ofs, p_peer, true);
+ }
+}
+
+Error SceneReplicationInterface::on_delta_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
+ int ofs = 1;
+ while (ofs + 4 + 8 + 4 < p_buffer_len) {
+ uint32_t net_id = decode_uint32(&p_buffer[ofs]);
+ ofs += 4;
+ uint64_t indexes = decode_uint64(&p_buffer[ofs]);
+ ofs += 8;
+ uint32_t size = decode_uint32(&p_buffer[ofs]);
+ ofs += 4;
+ ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
+ MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
+ Node *node = sync ? sync->get_root_node() : nullptr;
+ if (!sync || sync->get_multiplayer_authority() != p_from || !node) {
+ ofs += size;
+ ERR_CONTINUE_MSG(true, "Ignoring delta for non-authority or invalid synchronizer.");
+ }
+ List<NodePath> props = sync->get_delta_properties(indexes);
+ ERR_FAIL_COND_V(props.size() == 0, ERR_INVALID_DATA);
+ Vector<Variant> vars;
+ vars.resize(props.size());
+ int consumed = 0;
+ Error err = MultiplayerAPI::decode_and_decompress_variants(vars, p_buffer + ofs, size, consumed);
+ ERR_FAIL_COND_V(err != OK, err);
+ ERR_FAIL_COND_V(uint32_t(consumed) != size, ERR_INVALID_DATA);
+ err = MultiplayerSynchronizer::set_state(props, node, vars);
+ ERR_FAIL_COND_V(err != OK, err);
+ ofs += size;
+ sync->emit_signal(SNAME("delta_synchronized"));
+#ifdef DEBUG_ENABLED
+ _profile_node_data("delta_in", sync->get_instance_id(), size);
+#endif
+ }
+ return OK;
+}
+
+void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p_synchronizers, uint16_t p_sync_net_time, uint64_t p_usec) {
+ MAKE_ROOM(/* header */ 3 + /* element */ 4 + 4 + sync_mtu);
uint8_t *ptr = packet_cache.ptrw();
ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC;
int ofs = 1;
@@ -681,26 +803,16 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p
for (const ObjectID &oid : p_synchronizers) {
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority());
- if (!sync->update_outbound_sync_time(p_msec)) {
+ if (!sync->update_outbound_sync_time(p_usec)) {
continue; // nothing to sync.
}
Node *node = sync->get_root_node();
ERR_CONTINUE(!node);
uint32_t net_id = sync->get_net_id();
- if (net_id == 0 || (net_id & 0x80000000)) {
- int path_id = 0;
- bool verified = multiplayer->get_path_cache()->send_object_cache(sync, p_peer, path_id);
- ERR_CONTINUE_MSG(path_id < 0, "This should never happen!");
- if (net_id == 0) {
- // First time path based ID.
- net_id = path_id | 0x80000000;
- sync->set_net_id(net_id | 0x80000000);
- }
- if (!verified) {
- // The path based sync is not yet confirmed, skipping.
- continue;
- }
+ if (!_verify_synchronizer(p_peer, sync, net_id)) {
+ // The path based sync is not yet confirmed, skipping.
+ continue;
}
int size;
Vector<Variant> vars;
@@ -711,7 +823,7 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p
err = MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), nullptr, size);
ERR_CONTINUE_MSG(err != OK, "Unable to encode sync state.");
// TODO Handle single state above MTU.
- ERR_CONTINUE_MSG(size > 3 + 4 + 4 + sync_mtu, vformat("Node states bigger then MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));
+ ERR_CONTINUE_MSG(size > sync_mtu, vformat("Node states bigger than MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));
if (ofs + 4 + 4 + size > sync_mtu) {
// Send what we got, and reset write.
_send_raw(packet_cache.ptr(), ofs, p_peer, false);
@@ -735,6 +847,10 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p
Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
ERR_FAIL_COND_V_MSG(p_buffer_len < 11, ERR_INVALID_DATA, "Invalid sync packet received");
+ bool is_delta = (p_buffer[0] & (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT)) != 0;
+ if (is_delta) {
+ return on_delta_receive(p_from, p_buffer, p_buffer_len);
+ }
uint16_t time = decode_uint16(&p_buffer[1]);
int ofs = 3;
while (ofs + 8 < p_buffer_len) {
@@ -743,13 +859,7 @@ Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_bu
uint32_t size = decode_uint32(&p_buffer[ofs]);
ofs += 4;
ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
- MultiplayerSynchronizer *sync = nullptr;
- if (net_id & 0x80000000) {
- sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_from, net_id & 0x7FFFFFFF));
- } else if (peers_info[p_from].recv_sync_ids.has(net_id)) {
- const ObjectID &sid = peers_info[p_from].recv_sync_ids[net_id];
- sync = get_id_as<MultiplayerSynchronizer>(sid);
- }
+ MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
if (!sync) {
// Not received yet.
ofs += size;
@@ -782,3 +892,21 @@ Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_bu
}
return OK;
}
+
+void SceneReplicationInterface::set_max_sync_packet_size(int p_size) {
+ ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
+ sync_mtu = p_size;
+}
+
+int SceneReplicationInterface::get_max_sync_packet_size() const {
+ return sync_mtu;
+}
+
+void SceneReplicationInterface::set_max_delta_packet_size(int p_size) {
+ ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
+ delta_mtu = p_size;
+}
+
+int SceneReplicationInterface::get_max_delta_packet_size() const {
+ return delta_mtu;
+}