diff options
Diffstat (limited to 'modules/multiplayer/scene_replication_interface.cpp')
-rw-r--r-- | modules/multiplayer/scene_replication_interface.cpp | 182 |
1 files changed, 155 insertions, 27 deletions
diff --git a/modules/multiplayer/scene_replication_interface.cpp b/modules/multiplayer/scene_replication_interface.cpp index 5889b8f5f9..b058bf7a52 100644 --- a/modules/multiplayer/scene_replication_interface.cpp +++ b/modules/multiplayer/scene_replication_interface.cpp @@ -138,15 +138,16 @@ void SceneReplicationInterface::on_network_process() { spawn_queue.clear(); } - // Process timed syncs. - uint64_t msec = OS::get_singleton()->get_ticks_msec(); + // Process syncs. + uint64_t usec = OS::get_singleton()->get_ticks_usec(); for (KeyValue<int, PeerInfo> &E : peers_info) { const HashSet<ObjectID> to_sync = E.value.sync_nodes; if (to_sync.is_empty()) { continue; // Nothing to sync } uint16_t sync_net_time = ++E.value.last_sent_sync; - _send_sync(E.key, to_sync, sync_net_time, msec); + _send_sync(E.key, to_sync, sync_net_time, usec); + _send_delta(E.key, to_sync, usec, E.value.last_watch_usecs); } } @@ -280,6 +281,7 @@ Error SceneReplicationInterface::on_replication_stop(Object *p_obj, Variant p_co sync_nodes.erase(sid); for (KeyValue<int, PeerInfo> &E : peers_info) { E.value.sync_nodes.erase(sid); + E.value.last_watch_usecs.erase(sid); if (sync->get_net_id()) { E.value.recv_sync_ids.erase(sync->get_net_id()); } @@ -357,6 +359,7 @@ Error SceneReplicationInterface::_update_sync_visibility(int p_peer, Multiplayer E.value.sync_nodes.insert(sid); } else { E.value.sync_nodes.erase(sid); + E.value.last_watch_usecs.erase(sid); } } return OK; @@ -369,6 +372,7 @@ Error SceneReplicationInterface::_update_sync_visibility(int p_peer, Multiplayer peers_info[p_peer].sync_nodes.insert(sid); } else { peers_info[p_peer].sync_nodes.erase(sid); + peers_info[p_peer].last_watch_usecs.erase(sid); } return OK; } @@ -670,8 +674,126 @@ Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p return OK; } -void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p_synchronizers, uint16_t p_sync_net_time, uint64_t p_msec) { - MAKE_ROOM(sync_mtu); +bool SceneReplicationInterface::_verify_synchronizer(int p_peer, MultiplayerSynchronizer *p_sync, uint32_t &r_net_id) { + r_net_id = p_sync->get_net_id(); + if (r_net_id == 0 || (r_net_id & 0x80000000)) { + int path_id = 0; + bool verified = multiplayer->get_path_cache()->send_object_cache(p_sync, p_peer, path_id); + ERR_FAIL_COND_V_MSG(path_id < 0, false, "This should never happen!"); + if (r_net_id == 0) { + // First time path based ID. + r_net_id = path_id | 0x80000000; + p_sync->set_net_id(r_net_id | 0x80000000); + } + return verified; + } + return true; +} + +MultiplayerSynchronizer *SceneReplicationInterface::_find_synchronizer(int p_peer, uint32_t p_net_id) { + MultiplayerSynchronizer *sync = nullptr; + if (p_net_id & 0x80000000) { + sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_peer, p_net_id & 0x7FFFFFFF)); + } else if (peers_info[p_peer].recv_sync_ids.has(p_net_id)) { + const ObjectID &sid = peers_info[p_peer].recv_sync_ids[p_net_id]; + sync = get_id_as<MultiplayerSynchronizer>(sid); + } + return sync; +} + +void SceneReplicationInterface::_send_delta(int p_peer, const HashSet<ObjectID> p_synchronizers, uint64_t p_usec, const HashMap<ObjectID, uint64_t> p_last_watch_usecs) { + MAKE_ROOM(/* header */ 1 + /* element */ 4 + 8 + 4 + delta_mtu); + uint8_t *ptr = packet_cache.ptrw(); + ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC | (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT); + int ofs = 1; + for (const ObjectID &oid : p_synchronizers) { + MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid); + ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority()); + uint32_t net_id; + if (!_verify_synchronizer(p_peer, sync, net_id)) { + continue; + } + uint64_t last_usec = p_last_watch_usecs.has(oid) ? p_last_watch_usecs[oid] : 0; + uint64_t indexes; + List<Variant> delta = sync->get_delta_state(p_usec, last_usec, indexes); + + if (!delta.size()) { + continue; // Nothing to update. + } + + Vector<const Variant *> varp; + varp.resize(delta.size()); + const Variant **vptr = varp.ptrw(); + int i = 0; + for (const Variant &v : delta) { + vptr[i] = &v; + } + int size; + Error err = MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), nullptr, size); + ERR_CONTINUE_MSG(err != OK, "Unable to encode delta state."); + + ERR_CONTINUE_MSG(size > delta_mtu, vformat("Synchronizer delta bigger than MTU will not be sent (%d > %d): %s", size, delta_mtu, sync->get_path())); + + if (ofs + 4 + 8 + 4 + size > delta_mtu) { + // Send what we got, and reset write. + _send_raw(packet_cache.ptr(), ofs, p_peer, true); + ofs = 1; + } + if (size) { + ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]); + ofs += encode_uint64(indexes, &ptr[ofs]); + ofs += encode_uint32(size, &ptr[ofs]); + MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), &ptr[ofs], size); + ofs += size; + } +#ifdef DEBUG_ENABLED + _profile_node_data("delta_out", oid, size); +#endif + peers_info[p_peer].last_watch_usecs[oid] = p_usec; + } + if (ofs > 1) { + // Got some left over to send. + _send_raw(packet_cache.ptr(), ofs, p_peer, true); + } +} + +Error SceneReplicationInterface::on_delta_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) { + int ofs = 1; + while (ofs + 4 + 8 + 4 < p_buffer_len) { + uint32_t net_id = decode_uint32(&p_buffer[ofs]); + ofs += 4; + uint64_t indexes = decode_uint64(&p_buffer[ofs]); + ofs += 8; + uint32_t size = decode_uint32(&p_buffer[ofs]); + ofs += 4; + ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA); + MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id); + Node *node = sync ? sync->get_root_node() : nullptr; + if (!sync || sync->get_multiplayer_authority() != p_from || !node) { + ofs += size; + ERR_CONTINUE_MSG(true, "Ignoring delta for non-authority or invalid synchronizer."); + } + List<NodePath> props = sync->get_delta_properties(indexes); + ERR_FAIL_COND_V(props.size() == 0, ERR_INVALID_DATA); + Vector<Variant> vars; + vars.resize(props.size()); + int consumed = 0; + Error err = MultiplayerAPI::decode_and_decompress_variants(vars, p_buffer + ofs, size, consumed); + ERR_FAIL_COND_V(err != OK, err); + ERR_FAIL_COND_V(uint32_t(consumed) != size, ERR_INVALID_DATA); + err = MultiplayerSynchronizer::set_state(props, node, vars); + ERR_FAIL_COND_V(err != OK, err); + ofs += size; + sync->emit_signal(SNAME("delta_synchronized")); +#ifdef DEBUG_ENABLED + _profile_node_data("delta_in", sync->get_instance_id(), size); +#endif + } + return OK; +} + +void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p_synchronizers, uint16_t p_sync_net_time, uint64_t p_usec) { + MAKE_ROOM(/* header */ 3 + /* element */ 4 + 4 + sync_mtu); uint8_t *ptr = packet_cache.ptrw(); ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC; int ofs = 1; @@ -681,26 +803,16 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p for (const ObjectID &oid : p_synchronizers) { MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid); ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority()); - if (!sync->update_outbound_sync_time(p_msec)) { + if (!sync->update_outbound_sync_time(p_usec)) { continue; // nothing to sync. } Node *node = sync->get_root_node(); ERR_CONTINUE(!node); uint32_t net_id = sync->get_net_id(); - if (net_id == 0 || (net_id & 0x80000000)) { - int path_id = 0; - bool verified = multiplayer->get_path_cache()->send_object_cache(sync, p_peer, path_id); - ERR_CONTINUE_MSG(path_id < 0, "This should never happen!"); - if (net_id == 0) { - // First time path based ID. - net_id = path_id | 0x80000000; - sync->set_net_id(net_id | 0x80000000); - } - if (!verified) { - // The path based sync is not yet confirmed, skipping. - continue; - } + if (!_verify_synchronizer(p_peer, sync, net_id)) { + // The path based sync is not yet confirmed, skipping. + continue; } int size; Vector<Variant> vars; @@ -711,7 +823,7 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p err = MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), nullptr, size); ERR_CONTINUE_MSG(err != OK, "Unable to encode sync state."); // TODO Handle single state above MTU. - ERR_CONTINUE_MSG(size > 3 + 4 + 4 + sync_mtu, vformat("Node states bigger then MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path())); + ERR_CONTINUE_MSG(size > sync_mtu, vformat("Node states bigger than MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path())); if (ofs + 4 + 4 + size > sync_mtu) { // Send what we got, and reset write. _send_raw(packet_cache.ptr(), ofs, p_peer, false); @@ -735,6 +847,10 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) { ERR_FAIL_COND_V_MSG(p_buffer_len < 11, ERR_INVALID_DATA, "Invalid sync packet received"); + bool is_delta = (p_buffer[0] & (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT)) != 0; + if (is_delta) { + return on_delta_receive(p_from, p_buffer, p_buffer_len); + } uint16_t time = decode_uint16(&p_buffer[1]); int ofs = 3; while (ofs + 8 < p_buffer_len) { @@ -743,13 +859,7 @@ Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_bu uint32_t size = decode_uint32(&p_buffer[ofs]); ofs += 4; ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA); - MultiplayerSynchronizer *sync = nullptr; - if (net_id & 0x80000000) { - sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_from, net_id & 0x7FFFFFFF)); - } else if (peers_info[p_from].recv_sync_ids.has(net_id)) { - const ObjectID &sid = peers_info[p_from].recv_sync_ids[net_id]; - sync = get_id_as<MultiplayerSynchronizer>(sid); - } + MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id); if (!sync) { // Not received yet. ofs += size; @@ -782,3 +892,21 @@ Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_bu } return OK; } + +void SceneReplicationInterface::set_max_sync_packet_size(int p_size) { + ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes."); + sync_mtu = p_size; +} + +int SceneReplicationInterface::get_max_sync_packet_size() const { + return sync_mtu; +} + +void SceneReplicationInterface::set_max_delta_packet_size(int p_size) { + ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes."); + delta_mtu = p_size; +} + +int SceneReplicationInterface::get_max_delta_packet_size() const { + return delta_mtu; +} |