From e007aa4fd6021d26f637a4b3bf6474fdf05c2ede Mon Sep 17 00:00:00 2001 From: Patrick Lorio Date: Fri, 15 May 2026 16:54:45 -0700 Subject: [PATCH 1/2] Add TCP upload QoS fairness scheduling - introduce per-flow upload fairness coordination - slice pipe writes to cooperate between active TCP uploads - add tests for round-robin grants and QoS pipe behavior --- packages/agent_core/src/network/tcp/mod.rs | 1 + .../agent_core/src/network/tcp/tcp_client.rs | 27 +- .../agent_core/src/network/tcp/tcp_clients.rs | 18 +- .../agent_core/src/network/tcp/tcp_pipe.rs | 103 ++++++- .../src/network/tcp/tcp_upload_qos.rs | 280 ++++++++++++++++++ 5 files changed, 421 insertions(+), 8 deletions(-) create mode 100644 packages/agent_core/src/network/tcp/tcp_upload_qos.rs diff --git a/packages/agent_core/src/network/tcp/mod.rs b/packages/agent_core/src/network/tcp/mod.rs index 7835a8f1..4349daa9 100644 --- a/packages/agent_core/src/network/tcp/mod.rs +++ b/packages/agent_core/src/network/tcp/mod.rs @@ -3,3 +3,4 @@ pub mod tcp_clients; pub mod tcp_errors; pub mod tcp_pipe; pub mod tcp_settings; +mod tcp_upload_qos; diff --git a/packages/agent_core/src/network/tcp/tcp_client.rs b/packages/agent_core/src/network/tcp/tcp_client.rs index 58caa407..1f464568 100644 --- a/packages/agent_core/src/network/tcp/tcp_client.rs +++ b/packages/agent_core/src/network/tcp/tcp_client.rs @@ -4,7 +4,10 @@ use tokio_util::sync::CancellationToken; use crate::stats::AgentStats; -use super::tcp_pipe::{PipeDirection, TcpPipe}; +use super::{ + tcp_pipe::{PipeDirection, TcpPipe}, + tcp_upload_qos::TcpUploadFairness, +}; pub struct TcpClient { tunn_to_origin: TcpPipe, @@ -20,11 +23,30 @@ impl TcpClient { tunn: TcpStream, origin: TcpStream, stats: Option, + ) -> Self { + Self::create_with_stats_and_upload_flow(tunn, origin, stats, None).await + } + + pub(super) async fn create_with_stats_and_upload_qos( + tunn: TcpStream, + origin: TcpStream, + stats: Option, + upload_fairness: TcpUploadFairness, + ) -> Self { + Self::create_with_stats_and_upload_flow(tunn, origin, stats, Some(upload_fairness)).await + } + + async fn create_with_stats_and_upload_flow( + tunn: TcpStream, + origin: TcpStream, + stats: Option, + upload_fairness: Option, ) -> Self { let (tunn_read, tunn_write) = tunn.into_split(); let (origin_read, origin_write) = origin.into_split(); let cancel = CancellationToken::new(); + let upload_flow = upload_fairness.map(|fairness| fairness.register()); TcpClient { tunn_to_origin: TcpPipe::new_with_stats( @@ -34,12 +56,13 @@ impl TcpClient { stats.clone(), PipeDirection::TunnelToOrigin, ), - origin_to_tunn: TcpPipe::new_with_stats( + origin_to_tunn: TcpPipe::new_with_stats_and_upload_flow( cancel, origin_read, tunn_write, stats, PipeDirection::OriginToTunnel, + upload_flow, ), } } diff --git a/packages/agent_core/src/network/tcp/tcp_clients.rs b/packages/agent_core/src/network/tcp/tcp_clients.rs index 47443ace..f5a717de 100644 --- a/packages/agent_core/src/network/tcp/tcp_clients.rs +++ b/packages/agent_core/src/network/tcp/tcp_clients.rs @@ -24,6 +24,7 @@ use super::{ tcp_client::{TcpClient, TcpClientStat}, tcp_errors::tcp_errors, tcp_settings::TcpSettings, + tcp_upload_qos::TcpUploadFairness, }; fn build_quota(settings: &TcpSettings) -> Quota { @@ -52,6 +53,7 @@ struct Worker { cancel: CancellationToken, settings: TcpSettings, stats: AgentStats, + upload_fairness: TcpUploadFairness, clients: Vec, next_client_id: u64, @@ -113,6 +115,8 @@ impl TcpClients { ) -> Self { let quota = build_quota(&settings); let (events_tx, events_rx) = channel(1024); + let worker_cancel = cancel.child_token(); + let upload_fairness = TcpUploadFairness::new(worker_cancel.child_token()); tokio::spawn( Worker { @@ -120,9 +124,10 @@ impl TcpClients { lookup, events: events_rx, events_tx: events_tx.clone(), - cancel: cancel.child_token(), + cancel: worker_cancel, settings, stats, + upload_fairness, clients: Vec::with_capacity(32), } .start(), @@ -232,6 +237,7 @@ impl Worker { let event_tx = self.events_tx.clone(); let stats = self.stats.clone(); + let upload_fairness = self.upload_fairness.clone(); let cancel = self.cancel.child_token(); tokio::spawn(async move { let Some(origin_addr) = found.resolve_local(details.port_offset).await @@ -395,9 +401,13 @@ impl Worker { } } - let tcp_client = - TcpClient::create_with_stats(tunn_stream, origin_stream, Some(stats)) - .await; + let tcp_client = TcpClient::create_with_stats_and_upload_qos( + tunn_stream, + origin_stream, + Some(stats), + upload_fairness, + ) + .await; let event = Event::ConnectedClient(Client { id: client_id, added_at: now_milli(), diff --git a/packages/agent_core/src/network/tcp/tcp_pipe.rs b/packages/agent_core/src/network/tcp/tcp_pipe.rs index 14640ec9..c0a159ae 100644 --- a/packages/agent_core/src/network/tcp/tcp_pipe.rs +++ b/packages/agent_core/src/network/tcp/tcp_pipe.rs @@ -9,6 +9,8 @@ use tokio_util::sync::CancellationToken; use crate::stats::AgentStats; use crate::utils::now_milli; +use super::tcp_upload_qos::{TCP_UPLOAD_QOS_SLICE_SIZE, TcpUploadFlow}; + const TCP_PIPE_BUFFER_SIZE: usize = 16 * 1024; /// Direction of data flow for stats tracking @@ -58,6 +60,20 @@ impl TcpPipe { to: W, stats: Option, direction: PipeDirection, + ) -> Self { + Self::new_with_stats_and_upload_flow(cancel, from, to, stats, direction, None) + } + + pub(super) fn new_with_stats_and_upload_flow< + R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static, + >( + cancel: CancellationToken, + from: R, + to: W, + stats: Option, + direction: PipeDirection, + upload_flow: Option, ) -> Self { let shared = Arc::new(Shared { last_activity: AtomicU64::new(now_milli()), @@ -74,6 +90,7 @@ impl TcpPipe { to, stats, direction, + upload_flow, } .start(), ); @@ -113,11 +130,17 @@ struct Worker { to: W, stats: Option, direction: PipeDirection, + upload_flow: Option, } impl Worker { pub async fn start(mut self) { - let mut buffer = vec![0u8; TCP_PIPE_BUFFER_SIZE]; + let buffer_size = if self.upload_flow.is_some() { + TCP_UPLOAD_QOS_SLICE_SIZE + } else { + TCP_PIPE_BUFFER_SIZE + }; + let mut buffer = vec![0u8; buffer_size]; loop { // Keep the pipe cooperative when both sockets stay continuously ready. @@ -151,7 +174,23 @@ impl Worker { break; } - if let Err(error) = self.to.write_all(&buffer[..byte_count]).await { + if let Some(upload_flow) = &self.upload_flow + && !upload_flow.acquire(byte_count, &self.cancel).await + { + tracing::info!("TcpPipe upload QoS acquire failed"); + break; + } + + let Some(write_res) = self + .cancel + .run_until_cancelled(self.to.write_all(&buffer[..byte_count])) + .await + else { + tracing::info!("TcpPipe cancelled"); + break; + }; + + if let Err(error) = write_res { tracing::error!(?error, "failed to write data"); break; } @@ -176,3 +215,63 @@ impl Worker { self.shared.last_activity.store(u64::MAX, Ordering::Release); } } + +#[cfg(test)] +mod tests { + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + time::{Duration, timeout}, + }; + + use super::*; + use crate::network::tcp::tcp_upload_qos::{TCP_UPLOAD_QOS_SLICE_SIZE, TcpUploadFairness}; + + async fn wait_for_bytes(pipe: &TcpPipe, expected: u64) { + timeout(Duration::from_secs(1), async { + while pipe.bytes_written() != expected { + tokio::time::sleep(Duration::from_millis(5)).await; + } + }) + .await + .expect("pipe did not reach expected byte count"); + } + + #[tokio::test] + async fn upload_qos_pipe_reads_in_qos_sized_slices() { + let cancel = CancellationToken::new(); + let fairness = TcpUploadFairness::new(cancel.child_token()); + let upload_flow = fairness.register(); + let (mut source_write, source_read) = tokio::io::duplex(TCP_UPLOAD_QOS_SLICE_SIZE * 4); + let (sink_write, mut sink_read) = tokio::io::duplex(TCP_UPLOAD_QOS_SLICE_SIZE); + + let pipe = TcpPipe::new_with_stats_and_upload_flow( + cancel.clone(), + source_read, + sink_write, + None, + PipeDirection::OriginToTunnel, + Some(upload_flow), + ); + + let payload = vec![7u8; TCP_UPLOAD_QOS_SLICE_SIZE * 2]; + let write_task = tokio::spawn(async move { + source_write + .write_all(&payload) + .await + .expect("source write should succeed"); + }); + + wait_for_bytes(&pipe, TCP_UPLOAD_QOS_SLICE_SIZE as u64).await; + + let mut read_buffer = vec![0u8; TCP_UPLOAD_QOS_SLICE_SIZE]; + sink_read + .read_exact(&mut read_buffer) + .await + .expect("sink read should succeed"); + + wait_for_bytes(&pipe, (TCP_UPLOAD_QOS_SLICE_SIZE * 2) as u64).await; + write_task.await.expect("source writer should finish"); + + cancel.cancel(); + } +} diff --git a/packages/agent_core/src/network/tcp/tcp_upload_qos.rs b/packages/agent_core/src/network/tcp/tcp_upload_qos.rs new file mode 100644 index 00000000..2a9abe67 --- /dev/null +++ b/packages/agent_core/src/network/tcp/tcp_upload_qos.rs @@ -0,0 +1,280 @@ +use std::{ + collections::{HashMap, VecDeque}, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, +}; + +use tokio::sync::{mpsc, oneshot}; +use tokio_util::sync::CancellationToken; + +pub(super) const TCP_UPLOAD_QOS_SLICE_SIZE: usize = 2048; +const TCP_UPLOAD_QOS_CHANNEL_SIZE: usize = 1024; + +#[derive(Clone)] +pub(super) struct TcpUploadFairness { + events_tx: mpsc::Sender, + next_flow_id: Arc, +} + +pub(super) struct TcpUploadFlow { + id: u64, + events_tx: mpsc::Sender, +} + +struct PendingRequest { + grant: oneshot::Sender, +} + +struct FlowState { + pending: Option, + queued: bool, +} + +enum Event { + Request { + flow_id: u64, + _bytes: usize, + grant: oneshot::Sender, + }, + Unregister { + flow_id: u64, + }, +} + +struct Worker { + events: mpsc::Receiver, + cancel: CancellationToken, + flows: HashMap, + ready: VecDeque, +} + +impl TcpUploadFairness { + pub(super) fn new(cancel: CancellationToken) -> Self { + let (events_tx, events_rx) = mpsc::channel(TCP_UPLOAD_QOS_CHANNEL_SIZE); + + tokio::spawn( + Worker { + events: events_rx, + cancel, + flows: HashMap::new(), + ready: VecDeque::new(), + } + .start(), + ); + + Self { + events_tx, + next_flow_id: Arc::new(AtomicU64::new(1)), + } + } + + pub(super) fn register(&self) -> TcpUploadFlow { + TcpUploadFlow { + id: self.next_flow_id.fetch_add(1, Ordering::Relaxed), + events_tx: self.events_tx.clone(), + } + } +} + +impl TcpUploadFlow { + pub(super) async fn acquire(&self, bytes: usize, cancel: &CancellationToken) -> bool { + if bytes == 0 { + return true; + } + + let (grant_tx, grant_rx) = oneshot::channel(); + let event = Event::Request { + flow_id: self.id, + _bytes: bytes, + grant: grant_tx, + }; + + let send_res = tokio::select! { + _ = cancel.cancelled() => return false, + res = self.events_tx.send(event) => res, + }; + + if send_res.is_err() { + return false; + } + + tokio::select! { + _ = cancel.cancelled() => false, + res = grant_rx => res.unwrap_or(false), + } + } +} + +impl Drop for TcpUploadFlow { + fn drop(&mut self) { + let _ = self + .events_tx + .try_send(Event::Unregister { flow_id: self.id }); + } +} + +impl Worker { + async fn start(mut self) { + loop { + let event = tokio::select! { + _ = self.cancel.cancelled() => break, + event = self.events.recv() => { + let Some(event) = event else { + break; + }; + event + } + }; + + self.handle_event(event); + + while let Ok(event) = self.events.try_recv() { + self.handle_event(event); + } + + while self.grant_next() { + while let Ok(event) = self.events.try_recv() { + self.handle_event(event); + } + } + } + + for flow in self.flows.into_values() { + if let Some(pending) = flow.pending { + let _ = pending.grant.send(false); + } + } + } + + fn handle_event(&mut self, event: Event) { + match event { + Event::Request { flow_id, grant, .. } => { + let flow = self.flows.entry(flow_id).or_insert(FlowState { + pending: None, + queued: false, + }); + + if flow.pending.is_some() { + let _ = grant.send(false); + return; + } + + flow.pending = Some(PendingRequest { grant }); + + if !flow.queued { + flow.queued = true; + self.ready.push_back(flow_id); + } + } + Event::Unregister { flow_id } => { + if let Some(flow) = self.flows.remove(&flow_id) { + if let Some(pending) = flow.pending { + let _ = pending.grant.send(false); + } + } + } + } + } + + fn grant_next(&mut self) -> bool { + while let Some(flow_id) = self.ready.pop_front() { + let Some(flow) = self.flows.get_mut(&flow_id) else { + continue; + }; + + flow.queued = false; + + let Some(pending) = flow.pending.take() else { + continue; + }; + + if pending.grant.send(true).is_ok() { + return true; + } + } + + false + } +} + +#[cfg(test)] +mod tests { + use tokio::time::{Duration, timeout}; + + use super::*; + + #[tokio::test] + async fn grants_are_round_robin_when_two_flows_are_pending() { + let cancel = CancellationToken::new(); + let fairness = TcpUploadFairness::new(cancel.clone()); + let first = fairness.register(); + let second = fairness.register(); + + let first_acquire = first.acquire(128, &cancel); + let second_acquire = second.acquire(128, &cancel); + + let (first_granted, second_granted) = tokio::join!(first_acquire, second_acquire); + + assert!(first_granted); + assert!(second_granted); + + assert!(first.acquire(128, &cancel).await); + assert!(second.acquire(128, &cancel).await); + } + + #[tokio::test] + async fn single_active_flow_receives_all_grants() { + let cancel = CancellationToken::new(); + let fairness = TcpUploadFairness::new(cancel.clone()); + let flow = fairness.register(); + + for _ in 0..8 { + assert!(flow.acquire(128, &cancel).await); + } + } + + #[tokio::test] + async fn dropped_flow_removes_pending_request() { + let cancel = CancellationToken::new(); + let fairness = TcpUploadFairness::new(cancel.clone()); + let dropped = fairness.register(); + let active = fairness.register(); + + let (grant_tx, grant_rx) = oneshot::channel(); + dropped + .events_tx + .send(Event::Request { + flow_id: dropped.id, + _bytes: 128, + grant: grant_tx, + }) + .await + .expect("scheduler should accept request"); + + drop(grant_rx); + drop(dropped); + + assert!( + timeout(Duration::from_secs(1), active.acquire(128, &cancel)) + .await + .expect("active flow should not be blocked by dropped flow") + ); + } + + #[tokio::test] + async fn cancellation_stops_pending_acquires() { + let cancel = CancellationToken::new(); + let fairness = TcpUploadFairness::new(cancel.clone()); + let flow = fairness.register(); + + cancel.cancel(); + + assert!( + !timeout(Duration::from_secs(1), flow.acquire(128, &cancel)) + .await + .expect("cancelled acquire should complete") + ); + } +} From 17c6bc772faf981ec3d1cb2584839670c3f3e736 Mon Sep 17 00:00:00 2001 From: Patrick Lorio Date: Fri, 15 May 2026 17:03:51 -0700 Subject: [PATCH 2/2] Share upload QoS across TCP and UDP - Move upload fairness into shared network module - Apply QoS to UDP origin traffic and update callers - Rename TCP-specific QoS types and keep tests aligned --- packages/agent_core/src/network/mod.rs | 1 + packages/agent_core/src/network/tcp/mod.rs | 1 - .../agent_core/src/network/tcp/tcp_client.rs | 10 +++--- .../agent_core/src/network/tcp/tcp_clients.rs | 6 ++-- .../agent_core/src/network/tcp/tcp_pipe.rs | 24 +++++++------- .../agent_core/src/network/udp/udp_clients.rs | 28 +++++++++++++--- .../{tcp/tcp_upload_qos.rs => upload_qos.rs} | 32 +++++++++---------- packages/agent_core/src/playit_agent.rs | 7 +++- .../tests/udp_tunnel_integration.rs | 18 +++++++++++ 9 files changed, 83 insertions(+), 44 deletions(-) rename packages/agent_core/src/network/{tcp/tcp_upload_qos.rs => upload_qos.rs} (89%) diff --git a/packages/agent_core/src/network/mod.rs b/packages/agent_core/src/network/mod.rs index ca30779d..1dbcaf89 100644 --- a/packages/agent_core/src/network/mod.rs +++ b/packages/agent_core/src/network/mod.rs @@ -4,3 +4,4 @@ pub mod origin_lookup; pub mod proxy_protocol; pub mod tcp; pub mod udp; +pub mod upload_qos; diff --git a/packages/agent_core/src/network/tcp/mod.rs b/packages/agent_core/src/network/tcp/mod.rs index 4349daa9..7835a8f1 100644 --- a/packages/agent_core/src/network/tcp/mod.rs +++ b/packages/agent_core/src/network/tcp/mod.rs @@ -3,4 +3,3 @@ pub mod tcp_clients; pub mod tcp_errors; pub mod tcp_pipe; pub mod tcp_settings; -mod tcp_upload_qos; diff --git a/packages/agent_core/src/network/tcp/tcp_client.rs b/packages/agent_core/src/network/tcp/tcp_client.rs index 1f464568..a948faea 100644 --- a/packages/agent_core/src/network/tcp/tcp_client.rs +++ b/packages/agent_core/src/network/tcp/tcp_client.rs @@ -4,10 +4,8 @@ use tokio_util::sync::CancellationToken; use crate::stats::AgentStats; -use super::{ - tcp_pipe::{PipeDirection, TcpPipe}, - tcp_upload_qos::TcpUploadFairness, -}; +use super::tcp_pipe::{PipeDirection, TcpPipe}; +use crate::network::upload_qos::UploadFairness; pub struct TcpClient { tunn_to_origin: TcpPipe, @@ -31,7 +29,7 @@ impl TcpClient { tunn: TcpStream, origin: TcpStream, stats: Option, - upload_fairness: TcpUploadFairness, + upload_fairness: UploadFairness, ) -> Self { Self::create_with_stats_and_upload_flow(tunn, origin, stats, Some(upload_fairness)).await } @@ -40,7 +38,7 @@ impl TcpClient { tunn: TcpStream, origin: TcpStream, stats: Option, - upload_fairness: Option, + upload_fairness: Option, ) -> Self { let (tunn_read, tunn_write) = tunn.into_split(); let (origin_read, origin_write) = origin.into_split(); diff --git a/packages/agent_core/src/network/tcp/tcp_clients.rs b/packages/agent_core/src/network/tcp/tcp_clients.rs index f5a717de..f9885fce 100644 --- a/packages/agent_core/src/network/tcp/tcp_clients.rs +++ b/packages/agent_core/src/network/tcp/tcp_clients.rs @@ -15,6 +15,7 @@ use tokio_util::sync::CancellationToken; use crate::{ network::{ lan_address::LanAddress, origin_lookup::OriginLookup, proxy_protocol::ProxyProtocolHeader, + upload_qos::UploadFairness, }, stats::AgentStats, utils::now_milli, @@ -24,7 +25,6 @@ use super::{ tcp_client::{TcpClient, TcpClientStat}, tcp_errors::tcp_errors, tcp_settings::TcpSettings, - tcp_upload_qos::TcpUploadFairness, }; fn build_quota(settings: &TcpSettings) -> Quota { @@ -53,7 +53,7 @@ struct Worker { cancel: CancellationToken, settings: TcpSettings, stats: AgentStats, - upload_fairness: TcpUploadFairness, + upload_fairness: UploadFairness, clients: Vec, next_client_id: u64, @@ -112,11 +112,11 @@ impl TcpClients { lookup: Arc, stats: AgentStats, cancel: CancellationToken, + upload_fairness: UploadFairness, ) -> Self { let quota = build_quota(&settings); let (events_tx, events_rx) = channel(1024); let worker_cancel = cancel.child_token(); - let upload_fairness = TcpUploadFairness::new(worker_cancel.child_token()); tokio::spawn( Worker { diff --git a/packages/agent_core/src/network/tcp/tcp_pipe.rs b/packages/agent_core/src/network/tcp/tcp_pipe.rs index c0a159ae..501dc7e3 100644 --- a/packages/agent_core/src/network/tcp/tcp_pipe.rs +++ b/packages/agent_core/src/network/tcp/tcp_pipe.rs @@ -9,7 +9,7 @@ use tokio_util::sync::CancellationToken; use crate::stats::AgentStats; use crate::utils::now_milli; -use super::tcp_upload_qos::{TCP_UPLOAD_QOS_SLICE_SIZE, TcpUploadFlow}; +use crate::network::upload_qos::{UPLOAD_QOS_SLICE_SIZE, UploadFlow}; const TCP_PIPE_BUFFER_SIZE: usize = 16 * 1024; @@ -73,7 +73,7 @@ impl TcpPipe { to: W, stats: Option, direction: PipeDirection, - upload_flow: Option, + upload_flow: Option, ) -> Self { let shared = Arc::new(Shared { last_activity: AtomicU64::new(now_milli()), @@ -130,13 +130,13 @@ struct Worker { to: W, stats: Option, direction: PipeDirection, - upload_flow: Option, + upload_flow: Option, } impl Worker { pub async fn start(mut self) { let buffer_size = if self.upload_flow.is_some() { - TCP_UPLOAD_QOS_SLICE_SIZE + UPLOAD_QOS_SLICE_SIZE } else { TCP_PIPE_BUFFER_SIZE }; @@ -224,7 +224,7 @@ mod tests { }; use super::*; - use crate::network::tcp::tcp_upload_qos::{TCP_UPLOAD_QOS_SLICE_SIZE, TcpUploadFairness}; + use crate::network::upload_qos::{UPLOAD_QOS_SLICE_SIZE, UploadFairness}; async fn wait_for_bytes(pipe: &TcpPipe, expected: u64) { timeout(Duration::from_secs(1), async { @@ -239,10 +239,10 @@ mod tests { #[tokio::test] async fn upload_qos_pipe_reads_in_qos_sized_slices() { let cancel = CancellationToken::new(); - let fairness = TcpUploadFairness::new(cancel.child_token()); + let fairness = UploadFairness::new(cancel.child_token()); let upload_flow = fairness.register(); - let (mut source_write, source_read) = tokio::io::duplex(TCP_UPLOAD_QOS_SLICE_SIZE * 4); - let (sink_write, mut sink_read) = tokio::io::duplex(TCP_UPLOAD_QOS_SLICE_SIZE); + let (mut source_write, source_read) = tokio::io::duplex(UPLOAD_QOS_SLICE_SIZE * 4); + let (sink_write, mut sink_read) = tokio::io::duplex(UPLOAD_QOS_SLICE_SIZE); let pipe = TcpPipe::new_with_stats_and_upload_flow( cancel.clone(), @@ -253,7 +253,7 @@ mod tests { Some(upload_flow), ); - let payload = vec![7u8; TCP_UPLOAD_QOS_SLICE_SIZE * 2]; + let payload = vec![7u8; UPLOAD_QOS_SLICE_SIZE * 2]; let write_task = tokio::spawn(async move { source_write .write_all(&payload) @@ -261,15 +261,15 @@ mod tests { .expect("source write should succeed"); }); - wait_for_bytes(&pipe, TCP_UPLOAD_QOS_SLICE_SIZE as u64).await; + wait_for_bytes(&pipe, UPLOAD_QOS_SLICE_SIZE as u64).await; - let mut read_buffer = vec![0u8; TCP_UPLOAD_QOS_SLICE_SIZE]; + let mut read_buffer = vec![0u8; UPLOAD_QOS_SLICE_SIZE]; sink_read .read_exact(&mut read_buffer) .await .expect("sink read should succeed"); - wait_for_bytes(&pipe, (TCP_UPLOAD_QOS_SLICE_SIZE * 2) as u64).await; + wait_for_bytes(&pipe, (UPLOAD_QOS_SLICE_SIZE * 2) as u64).await; write_task.await.expect("source writer should finish"); cancel.cancel(); diff --git a/packages/agent_core/src/network/udp/udp_clients.rs b/packages/agent_core/src/network/udp/udp_clients.rs index d99184cf..1858ccce 100644 --- a/packages/agent_core/src/network/udp/udp_clients.rs +++ b/packages/agent_core/src/network/udp/udp_clients.rs @@ -15,9 +15,13 @@ use tokio::{ net::UdpSocket, sync::mpsc::{Receiver, channel}, }; +use tokio_util::sync::CancellationToken; use crate::network::{ - lan_address::LanAddress, origin_lookup::OriginLookup, proxy_protocol::ProxyProtocolHeader, + lan_address::LanAddress, + origin_lookup::OriginLookup, + proxy_protocol::ProxyProtocolHeader, + upload_qos::{UploadFairness, UploadFlow}, }; use crate::stats::AgentStats; use playit_agent_proto::udp_proto::UdpFlow; @@ -40,6 +44,8 @@ pub struct UdpClients { new_client_limiter: DefaultDirectRateLimiter, stats: AgentStats, + cancel: CancellationToken, + upload_fairness: UploadFairness, } struct Client { @@ -49,6 +55,7 @@ struct Client { target_addr: SocketAddr, port_offset: u16, flow: UdpFlow, + upload_flow: UploadFlow, /* when dropped, rx task get killed */ receiver: UdpReceiver, @@ -100,6 +107,8 @@ impl UdpClients { lookup: Arc, packets: Packets, stats: AgentStats, + cancel: CancellationToken, + upload_fairness: UploadFairness, ) -> Self { let (origin_tx, origin_rx) = channel(2048); @@ -115,6 +124,8 @@ impl UdpClients { rx: origin_rx, new_client_limiter: RateLimiter::direct(build_quota(&settings)), stats, + cancel, + upload_fairness, } } @@ -186,10 +197,6 @@ impl UdpClients { client.from_origin_ts = now_ms; - // Track bytes going out (from origin to tunnel) - let packet_len = packet.packet.len() as u64; - self.stats.add_bytes_out(packet_len); - let mut flow = client.flow; match &mut flow { UdpFlow::V4 { @@ -211,6 +218,16 @@ impl UdpClients { _ => unreachable!(), } + let packet_len = packet.packet.len(); + let upload_len = packet_len + flow.footer_len(); + if !client.upload_flow.acquire(upload_len, &self.cancel).await { + tracing::info!("UDP upload QoS acquire failed"); + return None; + } + + // Track bytes going out (from origin to tunnel) + self.stats.add_bytes_out(packet_len as u64); + Some((flow, packet.packet)) } @@ -324,6 +341,7 @@ impl UdpClients { port_offset: extension.port_offset, receiver, flow: client_flow, + upload_flow: self.upload_fairness.register(), from_tunnel_ts: now_ms, from_origin_ts: now_ms, }; diff --git a/packages/agent_core/src/network/tcp/tcp_upload_qos.rs b/packages/agent_core/src/network/upload_qos.rs similarity index 89% rename from packages/agent_core/src/network/tcp/tcp_upload_qos.rs rename to packages/agent_core/src/network/upload_qos.rs index 2a9abe67..2490e4ad 100644 --- a/packages/agent_core/src/network/tcp/tcp_upload_qos.rs +++ b/packages/agent_core/src/network/upload_qos.rs @@ -9,16 +9,16 @@ use std::{ use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; -pub(super) const TCP_UPLOAD_QOS_SLICE_SIZE: usize = 2048; -const TCP_UPLOAD_QOS_CHANNEL_SIZE: usize = 1024; +pub(crate) const UPLOAD_QOS_SLICE_SIZE: usize = 2048; +const UPLOAD_QOS_CHANNEL_SIZE: usize = 1024; #[derive(Clone)] -pub(super) struct TcpUploadFairness { +pub struct UploadFairness { events_tx: mpsc::Sender, next_flow_id: Arc, } -pub(super) struct TcpUploadFlow { +pub(crate) struct UploadFlow { id: u64, events_tx: mpsc::Sender, } @@ -50,9 +50,9 @@ struct Worker { ready: VecDeque, } -impl TcpUploadFairness { - pub(super) fn new(cancel: CancellationToken) -> Self { - let (events_tx, events_rx) = mpsc::channel(TCP_UPLOAD_QOS_CHANNEL_SIZE); +impl UploadFairness { + pub fn new(cancel: CancellationToken) -> Self { + let (events_tx, events_rx) = mpsc::channel(UPLOAD_QOS_CHANNEL_SIZE); tokio::spawn( Worker { @@ -70,16 +70,16 @@ impl TcpUploadFairness { } } - pub(super) fn register(&self) -> TcpUploadFlow { - TcpUploadFlow { + pub(crate) fn register(&self) -> UploadFlow { + UploadFlow { id: self.next_flow_id.fetch_add(1, Ordering::Relaxed), events_tx: self.events_tx.clone(), } } } -impl TcpUploadFlow { - pub(super) async fn acquire(&self, bytes: usize, cancel: &CancellationToken) -> bool { +impl UploadFlow { + pub(crate) async fn acquire(&self, bytes: usize, cancel: &CancellationToken) -> bool { if bytes == 0 { return true; } @@ -107,7 +107,7 @@ impl TcpUploadFlow { } } -impl Drop for TcpUploadFlow { +impl Drop for UploadFlow { fn drop(&mut self) { let _ = self .events_tx @@ -208,7 +208,7 @@ mod tests { #[tokio::test] async fn grants_are_round_robin_when_two_flows_are_pending() { let cancel = CancellationToken::new(); - let fairness = TcpUploadFairness::new(cancel.clone()); + let fairness = UploadFairness::new(cancel.clone()); let first = fairness.register(); let second = fairness.register(); @@ -227,7 +227,7 @@ mod tests { #[tokio::test] async fn single_active_flow_receives_all_grants() { let cancel = CancellationToken::new(); - let fairness = TcpUploadFairness::new(cancel.clone()); + let fairness = UploadFairness::new(cancel.clone()); let flow = fairness.register(); for _ in 0..8 { @@ -238,7 +238,7 @@ mod tests { #[tokio::test] async fn dropped_flow_removes_pending_request() { let cancel = CancellationToken::new(); - let fairness = TcpUploadFairness::new(cancel.clone()); + let fairness = UploadFairness::new(cancel.clone()); let dropped = fairness.register(); let active = fairness.register(); @@ -266,7 +266,7 @@ mod tests { #[tokio::test] async fn cancellation_stops_pending_acquires() { let cancel = CancellationToken::new(); - let fairness = TcpUploadFairness::new(cancel.clone()); + let fairness = UploadFairness::new(cancel.clone()); let flow = fairness.register(); cancel.cancel(); diff --git a/packages/agent_core/src/playit_agent.rs b/packages/agent_core/src/playit_agent.rs index 29cc2fe3..308bc852 100644 --- a/packages/agent_core/src/playit_agent.rs +++ b/packages/agent_core/src/playit_agent.rs @@ -19,6 +19,7 @@ use crate::network::udp::packets::Packets; use crate::network::udp::udp_channel::UdpChannel; use crate::network::udp::udp_clients::UdpClients; use crate::network::udp::udp_settings::UdpSettings; +use crate::network::upload_qos::UploadFairness; use crate::stats::AgentStats; use crate::utils::now_milli; @@ -57,18 +58,22 @@ impl PlayitAgent { .map_err(SetupError::IoError)?; let stats = AgentStats::new(); + let cancel_token = CancellationToken::new(); + let upload_fairness = UploadFairness::new(cancel_token.child_token()); let udp_clients = UdpClients::new( settings.udp_settings, lookup.clone(), origin_packets, stats.clone(), + cancel_token.child_token(), + upload_fairness.clone(), ); - let cancel_token = CancellationToken::new(); let tcp_clients = TcpClients::new( settings.tcp_settings, lookup.clone(), stats.clone(), cancel_token.child_token(), + upload_fairness, ); Ok(PlayitAgent { diff --git a/packages/agent_core/tests/udp_tunnel_integration.rs b/packages/agent_core/tests/udp_tunnel_integration.rs index e9c99c1f..fadc56db 100644 --- a/packages/agent_core/tests/udp_tunnel_integration.rs +++ b/packages/agent_core/tests/udp_tunnel_integration.rs @@ -14,6 +14,7 @@ use playit_agent_core::{ packets::Packets, udp_channel::UdpChannel, udp_clients::UdpClients, udp_settings::UdpSettings, }, + upload_qos::UploadFairness, }, stats::AgentStats, }; @@ -23,6 +24,7 @@ use playit_agent_proto::{ udp_proto::{UDP_CHANNEL_ESTABLISH_ID, UdpFlow, UdpFlowExtension}, }; use tokio::{net::UdpSocket, time::timeout}; +use tokio_util::sync::CancellationToken; const TEST_TIMEOUT: Duration = Duration::from_secs(3); const STRESS_TIMEOUT: Duration = Duration::from_secs(30); @@ -57,11 +59,15 @@ async fn encapsulated_udp_tunnel_relays_in_both_directions_and_recovers_same_flo .await; let stats = AgentStats::new(); + let cancel = CancellationToken::new(); + let upload_fairness = UploadFairness::new(cancel.child_token()); let mut udp_clients = UdpClients::new( UdpSettings::default(), lookup, Packets::new(64), stats.clone(), + cancel, + upload_fairness, ); let mut udp_channel = UdpChannel::new(Packets::new(64)) .await @@ -188,11 +194,15 @@ async fn encapsulated_udp_tunnel_supports_ipv6_origin_addresses() { .await; let stats = AgentStats::new(); + let cancel = CancellationToken::new(); + let upload_fairness = UploadFairness::new(cancel.child_token()); let mut udp_clients = UdpClients::new( UdpSettings::default(), lookup, Packets::new(64), stats.clone(), + cancel, + upload_fairness, ); let mut udp_channel = UdpChannel::new(Packets::new(64)) .await @@ -278,11 +288,15 @@ async fn encapsulated_udp_tunnel_isolates_multiple_parallel_flows_and_recovers_t .await; let stats = AgentStats::new(); + let cancel = CancellationToken::new(); + let upload_fairness = UploadFairness::new(cancel.child_token()); let mut udp_clients = UdpClients::new( UdpSettings::default(), lookup, Packets::new(128), stats.clone(), + cancel, + upload_fairness, ); let mut udp_channel = UdpChannel::new(Packets::new(128)) .await @@ -388,11 +402,15 @@ async fn udp_tunnel_stress_reports_bitrate_by_packet_size() { .await; let stats = AgentStats::new(); + let cancel = CancellationToken::new(); + let upload_fairness = UploadFairness::new(cancel.child_token()); let mut udp_clients = UdpClients::new( UdpSettings::default(), lookup, Packets::new(4096), stats.clone(), + cancel, + upload_fairness, ); let mut udp_channel = UdpChannel::new(Packets::new(4096)) .await