diff options
author | Spartan322 <Megacake1234@gmail.com> | 2024-11-12 13:46:08 -0500 |
---|---|---|
committer | Spartan322 <Megacake1234@gmail.com> | 2024-11-12 13:46:59 -0500 |
commit | 3a73c6ebd18bff0fa125be58d3ac9c7a63bab61d (patch) | |
tree | c7341bd56c977259578b127886c9a88eeef11820 /modules/websocket | |
parent | 5094c2a5f7d506b0e685120f14d1df42e1e9d495 (diff) | |
parent | cb411fa960f0b7fdbd97dcdb4c90f9346360ee0e (diff) | |
download | redot-engine-3a73c6ebd18bff0fa125be58d3ac9c7a63bab61d.tar.gz |
Merge commit godotengine/godot@cb411fa960f0b7fdbd97dcdb4c90f9346360ee0e
Diffstat (limited to 'modules/websocket')
-rw-r--r-- | modules/websocket/doc_classes/WebSocketMultiplayerPeer.xml | 2 | ||||
-rw-r--r-- | modules/websocket/doc_classes/WebSocketPeer.xml | 6 | ||||
-rw-r--r-- | modules/websocket/packet_buffer.h | 8 | ||||
-rw-r--r-- | modules/websocket/websocket_peer.cpp | 14 | ||||
-rw-r--r-- | modules/websocket/websocket_peer.h | 6 | ||||
-rw-r--r-- | modules/websocket/wsl_peer.cpp | 123 | ||||
-rw-r--r-- | modules/websocket/wsl_peer.h | 17 |
7 files changed, 154 insertions, 22 deletions
diff --git a/modules/websocket/doc_classes/WebSocketMultiplayerPeer.xml b/modules/websocket/doc_classes/WebSocketMultiplayerPeer.xml index 0978e1fcee..b2e1cb345b 100644 --- a/modules/websocket/doc_classes/WebSocketMultiplayerPeer.xml +++ b/modules/websocket/doc_classes/WebSocketMultiplayerPeer.xml @@ -60,7 +60,7 @@ <member name="inbound_buffer_size" type="int" setter="set_inbound_buffer_size" getter="get_inbound_buffer_size" default="65535"> The inbound buffer size for connected peers. See [member WebSocketPeer.inbound_buffer_size] for more details. </member> - <member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="2048"> + <member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="4096"> The maximum number of queued packets for connected peers. See [member WebSocketPeer.max_queued_packets] for more details. </member> <member name="outbound_buffer_size" type="int" setter="set_outbound_buffer_size" getter="get_outbound_buffer_size" default="65535"> diff --git a/modules/websocket/doc_classes/WebSocketPeer.xml b/modules/websocket/doc_classes/WebSocketPeer.xml index 238dd30536..d329e21b88 100644 --- a/modules/websocket/doc_classes/WebSocketPeer.xml +++ b/modules/websocket/doc_classes/WebSocketPeer.xml @@ -155,10 +155,14 @@ The extra HTTP headers to be sent during the WebSocket handshake. [b]Note:[/b] Not supported in Web exports due to browsers' restrictions. </member> + <member name="heartbeat_interval" type="float" setter="set_heartbeat_interval" getter="get_heartbeat_interval" default="0.0"> + The interval (in seconds) at which the peer will automatically send WebSocket "ping" control frames. When set to [code]0[/code], no "ping" control frames will be sent. + [b]Note:[/b] Has no effect in Web exports due to browser restrictions. + </member> <member name="inbound_buffer_size" type="int" setter="set_inbound_buffer_size" getter="get_inbound_buffer_size" default="65535"> The size of the input buffer in bytes (roughly the maximum amount of memory that will be allocated for the inbound packets). </member> - <member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="2048"> + <member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="4096"> The maximum amount of packets that will be allowed in the queues (both inbound and outbound). </member> <member name="outbound_buffer_size" type="int" setter="set_outbound_buffer_size" getter="get_outbound_buffer_size" default="65535"> diff --git a/modules/websocket/packet_buffer.h b/modules/websocket/packet_buffer.h index 41975e34d4..cb7fd02e30 100644 --- a/modules/websocket/packet_buffer.h +++ b/modules/websocket/packet_buffer.h @@ -106,6 +106,14 @@ public: return _queued; } + int payload_space_left() const { + return _payload.space_left(); + } + + int packets_space_left() const { + return _packets.size() - _queued; + } + void clear() { _payload.resize(0); _packets.resize(0); diff --git a/modules/websocket/websocket_peer.cpp b/modules/websocket/websocket_peer.cpp index 91fdb2470a..724c73816f 100644 --- a/modules/websocket/websocket_peer.cpp +++ b/modules/websocket/websocket_peer.cpp @@ -72,6 +72,9 @@ void WebSocketPeer::_bind_methods() { ClassDB::bind_method(D_METHOD("set_max_queued_packets", "buffer_size"), &WebSocketPeer::set_max_queued_packets); ClassDB::bind_method(D_METHOD("get_max_queued_packets"), &WebSocketPeer::get_max_queued_packets); + ClassDB::bind_method(D_METHOD("set_heartbeat_interval", "interval"), &WebSocketPeer::set_heartbeat_interval); + ClassDB::bind_method(D_METHOD("get_heartbeat_interval"), &WebSocketPeer::get_heartbeat_interval); + ADD_PROPERTY(PropertyInfo(Variant::PACKED_STRING_ARRAY, "supported_protocols"), "set_supported_protocols", "get_supported_protocols"); ADD_PROPERTY(PropertyInfo(Variant::PACKED_STRING_ARRAY, "handshake_headers"), "set_handshake_headers", "get_handshake_headers"); @@ -80,6 +83,8 @@ void WebSocketPeer::_bind_methods() { ADD_PROPERTY(PropertyInfo(Variant::INT, "max_queued_packets"), "set_max_queued_packets", "get_max_queued_packets"); + ADD_PROPERTY(PropertyInfo(Variant::INT, "heartbeat_interval"), "set_heartbeat_interval", "get_heartbeat_interval"); + BIND_ENUM_CONSTANT(WRITE_MODE_TEXT); BIND_ENUM_CONSTANT(WRITE_MODE_BINARY); @@ -153,3 +158,12 @@ void WebSocketPeer::set_max_queued_packets(int p_max_queued_packets) { int WebSocketPeer::get_max_queued_packets() const { return max_queued_packets; } + +double WebSocketPeer::get_heartbeat_interval() const { + return heartbeat_interval_msec / 1000.0; +} + +void WebSocketPeer::set_heartbeat_interval(double p_interval) { + ERR_FAIL_COND(p_interval < 0); + heartbeat_interval_msec = p_interval * 1000.0; +} diff --git a/modules/websocket/websocket_peer.h b/modules/websocket/websocket_peer.h index 609e3fd6b4..e69c8fa756 100644 --- a/modules/websocket/websocket_peer.h +++ b/modules/websocket/websocket_peer.h @@ -73,7 +73,8 @@ protected: int outbound_buffer_size = DEFAULT_BUFFER_SIZE; int inbound_buffer_size = DEFAULT_BUFFER_SIZE; - int max_queued_packets = 2048; + int max_queued_packets = 4096; + uint64_t heartbeat_interval_msec = 0; public: static WebSocketPeer *create(bool p_notify_postinitialize = true) { @@ -119,6 +120,9 @@ public: void set_max_queued_packets(int p_max_queued_packets); int get_max_queued_packets() const; + double get_heartbeat_interval() const; + void set_heartbeat_interval(double p_interval); + WebSocketPeer(); ~WebSocketPeer(); }; diff --git a/modules/websocket/wsl_peer.cpp b/modules/websocket/wsl_peer.cpp index 38de729eb6..f5cfad597e 100644 --- a/modules/websocket/wsl_peer.cpp +++ b/modules/websocket/wsl_peer.cpp @@ -297,6 +297,7 @@ Error WSLPeer::_do_server_handshake() { resolver.stop(); // Response sent, initialize wslay context. wslay_event_context_server_init(&wsl_ctx, &_wsl_callbacks, this); + wslay_event_config_set_no_buffering(wsl_ctx, 1); wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size); in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets); packet_buffer.resize(inbound_buffer_size); @@ -405,6 +406,7 @@ void WSLPeer::_do_client_handshake() { ERR_FAIL_MSG("Invalid response headers."); } wslay_event_context_client_init(&wsl_ctx, &_wsl_callbacks, this); + wslay_event_config_set_no_buffering(wsl_ctx, 1); wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size); in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets); packet_buffer.resize(inbound_buffer_size); @@ -570,8 +572,15 @@ ssize_t WSLPeer::_wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data, wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); return -1; } + // Make sure we don't read more than what our buffer can hold. + size_t buffer_limit = MIN(peer->in_buffer.payload_space_left(), peer->in_buffer.packets_space_left() * 2); // The minimum size of a websocket message is 2 bytes. + size_t to_read = MIN(len, buffer_limit); + if (to_read == 0) { + wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); + return -1; + } int read = 0; - Error err = conn->get_partial_data(data, len, read); + Error err = conn->get_partial_data(data, to_read, read); if (err != OK) { print_verbose("Websocket get data error: " + itos(err) + ", read (should be 0!): " + itos(read)); wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); @@ -584,6 +593,37 @@ ssize_t WSLPeer::_wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data, return read; } +void WSLPeer::_wsl_recv_start_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_start_arg *arg, void *user_data) { + WSLPeer *peer = (WSLPeer *)user_data; + uint8_t op = arg->opcode; + if (op == WSLAY_TEXT_FRAME || op == WSLAY_BINARY_FRAME) { + // Get ready to process a data package. + PendingMessage &pm = peer->pending_message; + pm.opcode = op; + pm.payload_size = arg->payload_length; + } +} + +void WSLPeer::_wsl_frame_recv_chunk_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_chunk_arg *arg, void *user_data) { + WSLPeer *peer = (WSLPeer *)user_data; + PendingMessage &pm = peer->pending_message; + if (pm.opcode != 0) { + // Only write the payload. + peer->in_buffer.write_packet(arg->data, arg->data_length, nullptr); + } +} + +void WSLPeer::_wsl_frame_recv_end_callback(wslay_event_context_ptr ctx, void *user_data) { + WSLPeer *peer = (WSLPeer *)user_data; + PendingMessage &pm = peer->pending_message; + if (pm.opcode != 0) { + // Only write the packet (since it's now completed). + uint8_t is_string = pm.opcode == WSLAY_TEXT_FRAME ? 1 : 0; + peer->in_buffer.write_packet(nullptr, pm.payload_size, &is_string); + pm.clear(); + } +} + ssize_t WSLPeer::_wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data) { WSLPeer *peer = (WSLPeer *)user_data; Ref<StreamPeer> conn = peer->connection; @@ -629,25 +669,19 @@ void WSLPeer::_wsl_msg_recv_callback(wslay_event_context_ptr ctx, const struct w return; } - if (peer->ready_state == STATE_CLOSING) { - return; - } - - if (op == WSLAY_TEXT_FRAME || op == WSLAY_BINARY_FRAME) { - // Message. - uint8_t is_string = arg->opcode == WSLAY_TEXT_FRAME ? 1 : 0; - peer->in_buffer.write_packet(arg->msg, arg->msg_length, &is_string); + if (op == WSLAY_PONG) { + peer->heartbeat_waiting = false; } - // Ping or pong. + // Ping, or message (already parsed in chunks). } wslay_event_callbacks WSLPeer::_wsl_callbacks = { _wsl_recv_callback, _wsl_send_callback, _wsl_genmask_callback, - nullptr, /* on_frame_recv_start_callback */ - nullptr, /* on_frame_recv_callback */ - nullptr, /* on_frame_recv_end_callback */ + _wsl_recv_start_callback, + _wsl_frame_recv_chunk_callback, + _wsl_frame_recv_end_callback, _wsl_msg_recv_callback }; @@ -682,7 +716,31 @@ void WSLPeer::poll() { if (ready_state == STATE_OPEN || ready_state == STATE_CLOSING) { ERR_FAIL_NULL(wsl_ctx); + uint64_t ticks = OS::get_singleton()->get_ticks_msec(); int err = 0; + if (heartbeat_interval_msec != 0 && ticks - last_heartbeat > heartbeat_interval_msec && ready_state == STATE_OPEN) { + if (heartbeat_waiting) { + wslay_event_context_free(wsl_ctx); + wsl_ctx = nullptr; + close(-1); + return; + } + heartbeat_waiting = true; + struct wslay_event_msg msg; + msg.opcode = WSLAY_PING; + msg.msg = nullptr; + msg.msg_length = 0; + err = wslay_event_queue_msg(wsl_ctx, &msg); + if (err == 0) { + last_heartbeat = ticks; + } else { + print_verbose("Websocket (wslay) failed to send ping: " + itos(err)); + wslay_event_context_free(wsl_ctx); + wsl_ctx = nullptr; + close(-1); + return; + } + } if ((err = wslay_event_recv(wsl_ctx)) != 0 || (err = wslay_event_send(wsl_ctx)) != 0) { // Error close. print_verbose("Websocket (wslay) poll error: " + itos(err)); @@ -691,12 +749,37 @@ void WSLPeer::poll() { close(-1); return; } - if (wslay_event_get_close_sent(wsl_ctx) && wslay_event_get_close_received(wsl_ctx)) { - // Clean close. - wslay_event_context_free(wsl_ctx); - wsl_ctx = nullptr; - close(-1); - return; + if (wslay_event_get_close_sent(wsl_ctx)) { + if (wslay_event_get_close_received(wsl_ctx)) { + // Clean close. + wslay_event_context_free(wsl_ctx); + wsl_ctx = nullptr; + close(-1); + return; + } else if (!wslay_event_get_read_enabled(wsl_ctx)) { + // Some protocol error caused wslay to stop processing incoming events, we'll never receive a close from the other peer. + close_code = wslay_event_get_status_code_sent(wsl_ctx); + switch (close_code) { + case WSLAY_CODE_MESSAGE_TOO_BIG: + close_reason = "Message too big"; + break; + case WSLAY_CODE_PROTOCOL_ERROR: + close_reason = "Protocol error"; + break; + case WSLAY_CODE_ABNORMAL_CLOSURE: + close_reason = "Abnormal closure"; + break; + case WSLAY_CODE_INVALID_FRAME_PAYLOAD_DATA: + close_reason = "Invalid frame payload data"; + break; + default: + close_reason = "Unknown"; + } + wslay_event_context_free(wsl_ctx); + wsl_ctx = nullptr; + close(-1); + return; + } } } } @@ -783,8 +866,10 @@ void WSLPeer::close(int p_code, String p_reason) { } } + heartbeat_waiting = false; in_buffer.clear(); packet_buffer.resize(0); + pending_message.clear(); } IPAddress WSLPeer::get_connected_host() const { diff --git a/modules/websocket/wsl_peer.h b/modules/websocket/wsl_peer.h index 0cc46adae7..677733005e 100644 --- a/modules/websocket/wsl_peer.h +++ b/modules/websocket/wsl_peer.h @@ -55,6 +55,10 @@ private: // Callbacks. static ssize_t _wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data, size_t len, int flags, void *user_data); + static void _wsl_recv_start_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_start_arg *arg, void *user_data); + static void _wsl_frame_recv_chunk_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_chunk_arg *arg, void *user_data); + static void _wsl_frame_recv_end_callback(wslay_event_context_ptr ctx, void *user_data); + static ssize_t _wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data); static int _wsl_genmask_callback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, void *user_data); static void _wsl_msg_recv_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg *arg, void *user_data); @@ -82,6 +86,16 @@ private: Resolver() {} }; + struct PendingMessage { + size_t payload_size = 0; + uint8_t opcode = 0; + + void clear() { + payload_size = 0; + opcode = 0; + } + }; + Resolver resolver; // WebSocket connection state. @@ -101,6 +115,9 @@ private: int close_code = -1; String close_reason; uint8_t was_string = 0; + uint64_t last_heartbeat = 0; + bool heartbeat_waiting = false; + PendingMessage pending_message; // WebSocket configuration. bool use_tls = true; |