diff --git a/packages/agent_core/src/network/tcp/tcp_clients.rs b/packages/agent_core/src/network/tcp/tcp_clients.rs index 47443ace..10a47c26 100644 --- a/packages/agent_core/src/network/tcp/tcp_clients.rs +++ b/packages/agent_core/src/network/tcp/tcp_clients.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, num::NonZeroU32, sync::Arc, time::Duration}; +use std::{io, net::SocketAddr, num::NonZeroU32, sync::Arc, time::Duration}; use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; use playit_agent_proto::control_feed::NewClient; @@ -6,7 +6,7 @@ use playit_api_client::api::ProxyProtocol; use serde::Serialize; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - net::TcpStream, + net::{TcpSocket, TcpStream}, sync::mpsc::{Receiver, Sender, channel}, time::Instant, }; @@ -26,6 +26,25 @@ use super::{ tcp_settings::TcpSettings, }; +const TCP_CLAIM_SEND_BUFFER_SIZE: u32 = 8 * 1024 * 1024; + +async fn connect_claim_address(addr: SocketAddr) -> io::Result { + let socket = match addr { + SocketAddr::V4(_) => TcpSocket::new_v4()?, + SocketAddr::V6(_) => TcpSocket::new_v6()?, + }; + + if let Err(error) = socket.set_send_buffer_size(TCP_CLAIM_SEND_BUFFER_SIZE) { + tracing::error!( + ?error, + send_buffer_size = TCP_CLAIM_SEND_BUFFER_SIZE, + "failed to set claim tcp send buffer size" + ); + } + + socket.connect(addr).await +} + fn build_quota(settings: &TcpSettings) -> Quota { let rate = NonZeroU32::new(settings.new_client_ratelimit).unwrap_or_else(|| { tracing::warn!("invalid tcp new client rate limit of 0, clamping to 1"); @@ -251,7 +270,7 @@ impl Worker { _ = cancel.cancelled() => return, res = tokio::time::timeout( Duration::from_secs(8), - TcpStream::connect(details.claim_instructions.address), + connect_claim_address(details.claim_instructions.address), ) => res, };