diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cfd8e6..cd60424 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,17 +8,15 @@ In this document, all notable changes are listed, including bug fixes, breaking - breaking: `NetworkStats::kbps_sent` has been removed; `network_stats()` now reports queue length, RTT, and frame-advantage data only ### Bug fixes +- fix: lockstep mode (`max_prediction = 0`) now requires only one-way latency worth of input delay instead of round-trip; `advance_frame` now unconditionally advances the input queue (keeping packets flowing) while the game frame only steps forward when all players have confirmed inputs, so a single network one-way trip is sufficient to confirm and advance each game frame (fixes [#116](https://github.com/gschup/ggrs/issues/116)) +- fix: desync detection no longer panics with sparse saving when the configured checksum interval frame was not saved exactly (fixes [#107](https://github.com/gschup/ggrs/issues/107)) - fix: malformed input packets are now discarded with warnings instead of panicking when connection status lengths, start frames, compressed payloads, or decoded player input shapes are invalid -- fix: input packets with a start frame ahead of the last received frame are no longer rejected; the gap check was incorrectly blocking the first packet from a peer using input delay, freezing the session -- fix: increasing input delay mid-session now sends generated gap-fill inputs to remote peers, preventing sessions from freezing after the delay change -- fix: input delay changes now work when multiple local players share one endpoint; outgoing local inputs are queued by effective frame until every local player has input for that frame +- fix: increasing input delay mid-session now sends generated gap-fill inputs to remote peers, and the first outgoing packet from a peer using input delay is no longer incorrectly rejected; both issues would freeze the session +- fix: input delay changes now work when multiple local players share one endpoint; outgoing local inputs are queued by effective frame until every local player has input for that frame, and the queue correctly finds the first complete frame rather than assuming the earliest buffered frame is complete - fix: `SessionBuilder::with_num_players()` now revalidates already-registered handles, so changing the player count after adding players cannot leave invalid local, remote, or spectator handles in the builder - fix: `SessionBuilder::start_p2p_session()` now rejects `DesyncDetection::On { interval: 0 }`, which cannot produce a valid checksum reporting cadence -- fix: spectator catch-up no longer attempts to read past `SPECTATOR_BUFFER_SIZE` frames when the spectator is very far behind; `frames_to_advance` is now also bounded by the buffer capacity -- fix: spectator catch-up now caps the number of frames advanced to the number of confirmed frames available from the host, so `catchup_speed` can safely exceed `max_frames_behind` +- fix: spectator catch-up no longer attempts to read past `SPECTATOR_BUFFER_SIZE` frames when the spectator is very far behind, and now caps the number of frames advanced to the number of confirmed frames available from the host so `catchup_speed` can safely exceed `max_frames_behind` - fix: `P2PSession` no longer leaks memory in all-local-player sessions; outgoing inputs were queued into an unbounded buffer that was never drained when there were no remote peers -- fix: `P2PSession` with multiple local players at different input delays no longer stalls on the first frame; the outgoing queue now scans for the first complete frame rather than assuming the earliest buffered frame has inputs from all players -- fix: desync detection no longer panics with sparse saving when the configured checksum interval frame was not saved exactly (fixes [#107](https://github.com/gschup/ggrs/issues/107)) ### Improvements - feat: `P2PSession::set_input_delay()` allows changing the input delay for a local player mid-session (closes [#106](https://github.com/gschup/ggrs/issues/106)) diff --git a/docs/main-loop.md b/docs/main-loop.md index 6f3560b..6a963d3 100644 --- a/docs/main-loop.md +++ b/docs/main-loop.md @@ -70,9 +70,12 @@ while time_since_last_frame >= fps_delta { for request in requests { handle_ggrs_request(request); } + // In lockstep mode, requests may be empty — remote inputs have not + // arrived yet. This is not an error; the session will advance on the + // next tick once poll_remote_clients delivers the missing inputs. } Err(GgrsError::PredictionThreshold) => { - // remote peer is too far behind; skip this frame + // remote peer is too far behind; skip this frame (rollback mode only) } Err(e) => return Err(e), } diff --git a/docs/sessions.md b/docs/sessions.md index a3d1200..15e4925 100644 --- a/docs/sessions.md +++ b/docs/sessions.md @@ -69,8 +69,8 @@ let mut session = SessionBuilder::::new() |---|---|---| | `with_num_players(n)` | 2 | Total number of players (not counting spectators). Must be at least 1. Revalidates already-added handles. | | `with_fps(fps)` | 60 | Expected update frequency. Used for frame synchronization heuristics. | -| `with_input_delay(n)` | 0 | Frames of artificial delay applied to local input. Reduces rollbacks at the cost of added latency. | -| `with_max_prediction_window(n)` | 8 | Maximum frames GGRS will predict ahead. Set to `0` for lockstep mode (no rollbacks, no prediction). | +| `with_input_delay(n)` | 0 | Frames of artificial delay applied to local input. Reduces rollbacks in rollback mode. In lockstep mode, set this to at least `ceil(one_way_latency_in_frames)` so remote inputs arrive before they are needed. | +| `with_max_prediction_window(n)` | 8 | Maximum frames GGRS will predict ahead. Set to `0` for lockstep mode: no prediction, no rollbacks, game stalls until all remote inputs are confirmed. | | `with_sparse_saving_mode(bool)` | false | Only save state at the last confirmed frame. See [Sparse Saving](sparse-saving.md). | | `with_desync_detection_mode(mode)` | Off | Enable checksum-based desync detection. `DesyncDetection::On` requires an interval higher than 0. See [`DesyncDetection`](https://docs.rs/ggrs/latest/ggrs/enum.DesyncDetection.html). | | `with_disconnect_timeout(duration)` | 2s | How long without packets before a remote peer is disconnected. | diff --git a/src/input_queue.rs b/src/input_queue.rs index 2aa6ef4..92e839f 100644 --- a/src/input_queue.rs +++ b/src/input_queue.rs @@ -68,6 +68,10 @@ impl InputQueue { self.first_incorrect_frame } + pub(crate) fn frame_delay(&self) -> usize { + self.frame_delay + } + /// Changes the frame delay and returns any fill inputs that were implicitly added to bridge the /// gap. The caller is responsible for sending these to remote peers so they see consecutive /// frame numbers. diff --git a/src/sessions/builder.rs b/src/sessions/builder.rs index 8fd7fa3..5cd29f1 100644 --- a/src/sessions/builder.rs +++ b/src/sessions/builder.rs @@ -144,16 +144,14 @@ impl SessionBuilder { /// /// ## Lockstep mode /// - /// As a special case, if you set this to 0, GGRS will run in lockstep mode: - /// * ggrs will only request that you advance the gamestate if the current frame has inputs - /// confirmed from all other clients. - /// * ggrs will never request you to save or roll back the gamestate. + /// Setting this to 0 enables lockstep mode: + /// - `AdvanceFrame` is only emitted once all remote inputs for the current frame are confirmed. + /// - `SaveGameState` and `LoadGameState` are never emitted — no rollback occurs. /// - /// Lockstep mode can significantly reduce the (GGRS) framerate of your game, but may be - /// appropriate for games where a GGRS frame does not correspond to a rendered frame, such as a - /// game where GGRS frames are only advanced once a second; with input delay set to zero, the - /// framerate impact is approximately equivalent to taking the highest latency client and adding - /// its latency to the current time to tick a frame. + /// In lockstep mode the game stalls until remote inputs arrive, so smooth playback requires + /// enough input delay to cover the one-way network trip. Set + /// `with_input_delay(ceil(one_way_latency_in_frames))` — roughly half the round-trip time. + /// The semantics match rollback mode: input pressed on frame `F` takes effect on frame `F + D`. pub fn with_max_prediction_window(mut self, window: usize) -> Self { self.max_prediction = window; self @@ -161,10 +159,13 @@ impl SessionBuilder { /// Change the amount of frames GGRS will delay the inputs for local players. Default is 0. /// - /// Adding a small amount of input delay (typically 2–4 frames) reduces the number of rollbacks - /// by giving remote inputs time to arrive before they are needed. The trade-off is a small - /// but constant increase in perceived input latency. This is usually preferable to frequent - /// rollbacks at higher network latencies. + /// In rollback mode, input delay (typically 2–4 frames) reduces rollbacks by letting remote + /// inputs arrive before they are needed, at the cost of constant perceived latency. + /// + /// In lockstep mode, input delay determines how far ahead the input queue runs relative to + /// the confirmed game frame. Set this to at least `ceil(one_way_latency_in_frames)` — + /// roughly half the round-trip time — so remote inputs arrive before the game needs them + /// and the session can advance without stalling. /// /// There is no enforced upper bound, but values above ~8 frames will produce noticeable /// input lag. Setting this higher than `max_prediction_window` is not recommended — diff --git a/src/sessions/p2p_session.rs b/src/sessions/p2p_session.rs index a5ee15f..db34654 100644 --- a/src/sessions/p2p_session.rs +++ b/src/sessions/p2p_session.rs @@ -7,8 +7,8 @@ use crate::sessions::builder::MAX_EVENT_QUEUE_SIZE; use crate::sync_layer::SyncLayer; use crate::DesyncDetection; use crate::{ - network::protocol::Event, Config, Frame, GgrsEvent, GgrsRequest, NonBlockingSocket, - PlayerHandle, PlayerType, SessionState, NULL_FRAME, + network::protocol::Event, Config, Frame, GgrsEvent, GgrsRequest, InputStatus, + NonBlockingSocket, PlayerHandle, PlayerType, SessionState, NULL_FRAME, }; use tracing::{debug, trace, warn}; @@ -124,6 +124,11 @@ where /// If we receive a disconnect from another client, we have to rollback from that frame on in order to prevent wrong predictions disconnect_frame: Frame, + /// In lockstep mode, the game frame that still needs to be confirmed and emitted. + /// Starts at 0 and increments each time an AdvanceFrame request is pushed to the user. + /// Separate from sync_layer.current_frame(), which advances unconditionally every call. + lockstep_game_frame: Frame, + /// Internal State of the Session. state: SessionState, @@ -216,6 +221,7 @@ impl P2PSession { frames_ahead: 0, sync_layer, disconnect_frame: NULL_FRAME, + lockstep_game_frame: 0, player_reg: players, event_queue: VecDeque::new(), pending_local_inputs: HashMap::new(), @@ -257,17 +263,27 @@ impl P2PSession { Ok(()) } - /// You should call this to notify GGRS that you are ready to advance your gamestate by a single frame. - /// Returns an order-sensitive [`Vec`]. You should fulfill all requests in the exact order they are provided. - /// Failure to do so will cause panics later. + /// Advance the session by one frame. Returns an order-sensitive [`Vec`] that + /// must be fulfilled in exact order. + /// + /// Internally calls [`poll_remote_clients`] before doing anything else, so you do not need to + /// call it separately on the same tick. + /// + /// In **lockstep mode** (`max_prediction = 0`), the returned vec may contain no + /// `AdvanceFrame` request if remote inputs have not yet arrived — the session stalls until + /// all peers confirm the current frame. Call [`poll_remote_clients`] and retry on the next + /// tick. Unlike rollback mode this is not an error; it is the normal wait behaviour. /// /// # Errors - /// - Returns [`InvalidRequest`] if the provided player handle refers to a remote player. - /// - Returns [`NotSynchronized`] if the session is not yet ready to accept input. In this case, you either need to start the session or wait for synchronization between clients. + /// - Returns [`InvalidRequest`] if a local input is missing for any registered local player. + /// - Returns [`NotSynchronized`] if the session is not yet synchronized with remote peers. + /// - Returns [`PredictionThreshold`] (rollback mode only) if the remote peer is too far behind. /// + /// [`poll_remote_clients`]: Self::poll_remote_clients /// [`Vec`]: GgrsRequest /// [`InvalidRequest`]: GgrsError::InvalidRequest /// [`NotSynchronized`]: GgrsError::NotSynchronized + /// [`PredictionThreshold`]: GgrsError::PredictionThreshold pub fn advance_frame(&mut self) -> Result>, GgrsError> { // receive info from remote players, trigger events and send messages self.poll_remote_clients(); @@ -329,25 +345,7 @@ impl P2PSession { // check game consistency and roll back, if necessary if !lockstep { - // the disconnect frame indicates if a rollback is necessary due to a previously - // disconnected player (whose input would have been incorrectly predicted). - let first_incorrect = self - .sync_layer - .check_simulation_consistency(self.disconnect_frame); - // if we have an incorrect frame, then we need to rollback - if first_incorrect != NULL_FRAME { - self.adjust_gamestate(first_incorrect, confirmed_frame, &mut requests); - self.disconnect_frame = NULL_FRAME; - } - - // request gamestate save of current frame - let last_saved = self.sync_layer.last_saved_frame(); - if self.sparse_saving { - self.check_last_saved_state(last_saved, confirmed_frame, &mut requests); - } else { - // without sparse saving, always save the current frame after correcting and rollbacking - requests.push(self.sync_layer.save_current_state()); - } + self.handle_rollback_and_save(confirmed_frame, &mut requests); } /* @@ -372,62 +370,17 @@ impl P2PSession { * INPUTS */ - // register local inputs in the system and send them - for handle in self.player_reg.local_player_handles() { - // we have checked that these all exist - let queued_input = { - let player_input = self - .pending_local_inputs - .get_mut(&handle) - .expect("Missing local input while calling advance_frame()."); - // send the input into the sync layer - let actual_frame = self.sync_layer.add_local_input(handle, *player_input); - player_input.frame = actual_frame; - *player_input - }; - // if the input has not been dropped - if queued_input.frame != NULL_FRAME { - self.local_connect_status[handle].last_frame = queued_input.frame; - self.queue_outgoing_local_input(handle, queued_input); + if lockstep { + // In lockstep mode the input queue may be capped (remote unresponsive): only + // register inputs when the queue has room, so the user re-submits next call with + // the same frame number — matching rollback behaviour at the prediction threshold. + if !self.lockstep_input_queue_at_cap() { + self.register_local_inputs(); } - } - - self.send_ready_outgoing_inputs_to_remotes(); - - /* - * ADVANCE THE STATE - */ - - let can_advance = if lockstep { - // lockstep mode: only advance if the current frame has inputs confirmed from all other - // players. - self.sync_layer.last_confirmed_frame() == self.sync_layer.current_frame() + self.advance_lockstep_frame(&mut requests); } else { - // rollback mode: advance as long as we aren't past our prediction window - let frames_ahead = if self.sync_layer.last_confirmed_frame() == NULL_FRAME { - // we haven't had any frames confirmed, so all frames we've advanced are "ahead" - self.sync_layer.current_frame() - } else { - // we're not at the first frame, so we have to subtract the last confirmed frame - self.sync_layer.current_frame() - self.sync_layer.last_confirmed_frame() - }; - frames_ahead < self.max_prediction as i32 - }; - if can_advance { - // get correct inputs for the current frame - let inputs = self - .sync_layer - .synchronized_inputs(&self.local_connect_status); - // advance the frame count - self.sync_layer.advance_frame(); - // clear the local inputs after advancing the frame to allow new inputs to be ingested - self.pending_local_inputs.clear(); - requests.push(GgrsRequest::AdvanceFrame { inputs }); - } else { - debug!( - "Prediction Threshold reached. Skipping on frame {}", - self.sync_layer.current_frame() - ); + self.register_local_inputs(); + self.advance_rollback_frame(&mut requests); } Ok(requests) @@ -603,6 +556,32 @@ impl P2PSession { confirmed_frame } + /// Returns true if the lockstep input queue has reached its maximum depth and should not + /// accept more inputs until the remote catches up. + fn lockstep_input_queue_at_cap(&self) -> bool { + let depth = self.sync_layer.current_frame() - self.lockstep_game_frame; + let local_handles = self.player_reg.local_player_handles(); + depth > self.sync_layer.min_local_input_delay(&local_handles) as i32 + } + + /// Returns the highest frame confirmed by all remote players only. + /// In lockstep mode this is the gate for advancing a game frame: the local player's + /// status is excluded because local inputs are always registered unconditionally, + /// so including them in the min would cause a deadlock. + fn remote_confirmed_frame(&self) -> Frame { + let remote_handles = self.player_reg.remote_player_handles(); + if remote_handles.is_empty() { + // all-local session: every frame is trivially confirmed + return self.sync_layer.current_frame(); + } + remote_handles + .iter() + .filter(|&&h| !self.local_connect_status[h].disconnected) + .map(|&h| self.local_connect_status[h].last_frame) + .min() + .unwrap_or(NULL_FRAME) + } + /// Returns the current frame of a session. pub fn current_frame(&self) -> Frame { self.sync_layer.current_frame() @@ -641,6 +620,25 @@ impl P2PSession { self.player_reg.num_spectators() } + fn register_local_inputs(&mut self) { + for handle in self.player_reg.local_player_handles() { + let queued_input = { + let player_input = self + .pending_local_inputs + .get_mut(&handle) + .expect("Missing local input while calling advance_frame()."); + let actual_frame = self.sync_layer.add_local_input(handle, *player_input); + player_input.frame = actual_frame; + *player_input + }; + if queued_input.frame != NULL_FRAME { + self.local_connect_status[handle].last_frame = queued_input.frame; + self.queue_outgoing_local_input(handle, queued_input); + } + } + self.send_ready_outgoing_inputs_to_remotes(); + } + fn queue_outgoing_local_input( &mut self, player_handle: PlayerHandle, @@ -805,7 +803,107 @@ impl P2PSession { self.state = SessionState::Running; } + /// Lockstep advance: the input queue always moves forward so packets keep flowing, but + /// the game frame only steps when all remote players have confirmed it. + /// + /// `sync_layer.current_frame` (the input queue frame) advances unconditionally every call. + /// This breaks the bootstrap deadlock: with input_delay D ≥ one-way latency L, side A + /// commits frame D on its first call; the packet reaches side B after L frames, so both + /// sides can confirm game frame 0 after a single one-way trip. `confirmed_inputs` is used + /// instead of `synchronized_inputs` so no prediction ever occurs and no rollback is needed. + fn advance_lockstep_frame(&mut self, requests: &mut Vec>) { + if self.lockstep_input_queue_at_cap() { + let queue_depth = self.sync_layer.current_frame() - self.lockstep_game_frame; + warn!( + "Lockstep input queue cap reached: remote has not confirmed frame {} \ + after {} queued frames. Remote may be unresponsive.", + self.lockstep_game_frame, queue_depth, + ); + } else { + self.sync_layer.advance_frame(); + self.pending_local_inputs.clear(); + } + + let game_frame = self.lockstep_game_frame; + let remote_confirmed = self.remote_confirmed_frame(); + if remote_confirmed >= game_frame { + let inputs = self + .sync_layer + .confirmed_inputs(game_frame, &self.local_connect_status) + .into_iter() + .enumerate() + .map(|(handle, pi)| { + debug_assert_eq!( + pi.frame == NULL_FRAME, + self.local_connect_status[handle].disconnected + && self.local_connect_status[handle].last_frame < game_frame, + "confirmed_inputs returned NULL_FRAME for a connected player or \ + a real frame for a disconnected player (handle {handle})" + ); + if pi.frame == NULL_FRAME { + (pi.input, InputStatus::Disconnected) + } else { + (pi.input, InputStatus::Confirmed) + } + }) + .collect(); + self.lockstep_game_frame += 1; + requests.push(GgrsRequest::AdvanceFrame { inputs }); + } else { + debug!( + "Lockstep stall: waiting for confirmation of frame {} (confirmed up to {})", + game_frame, remote_confirmed, + ); + } + } + + /// Rollback advance: inputs are predicted and corrected on mismatch. The session may run + /// up to `max_prediction` frames ahead of the last confirmed frame. + /// NULL_FRAME for `last_confirmed_frame` is treated as 0 frames ahead so prediction can + /// start immediately from frame 0 without any prior confirmation. + fn advance_rollback_frame(&mut self, requests: &mut Vec>) { + let frames_ahead = if self.sync_layer.last_confirmed_frame() == NULL_FRAME { + self.sync_layer.current_frame() + } else { + self.sync_layer.current_frame() - self.sync_layer.last_confirmed_frame() + }; + if frames_ahead < self.max_prediction as i32 { + let inputs = self + .sync_layer + .synchronized_inputs(&self.local_connect_status); + self.sync_layer.advance_frame(); + self.pending_local_inputs.clear(); + requests.push(GgrsRequest::AdvanceFrame { inputs }); + } else { + debug!( + "Prediction Threshold reached. Skipping on frame {}", + self.sync_layer.current_frame() + ); + } + } + /// Roll back to `min_confirmed` frame and resimulate the game with most up-to-date input data. + fn handle_rollback_and_save( + &mut self, + confirmed_frame: Frame, + requests: &mut Vec>, + ) { + let first_incorrect = self + .sync_layer + .check_simulation_consistency(self.disconnect_frame); + if first_incorrect != NULL_FRAME { + self.adjust_gamestate(first_incorrect, confirmed_frame, requests); + self.disconnect_frame = NULL_FRAME; + } + + let last_saved = self.sync_layer.last_saved_frame(); + if self.sparse_saving { + self.check_last_saved_state(last_saved, confirmed_frame, requests); + } else { + requests.push(self.sync_layer.save_current_state()); + } + } + fn adjust_gamestate( &mut self, first_incorrect: Frame, diff --git a/src/sync_layer.rs b/src/sync_layer.rs index 77afd00..4360f84 100644 --- a/src/sync_layer.rs +++ b/src/sync_layer.rs @@ -219,6 +219,17 @@ impl SyncLayer { self.input_queues[player_handle].set_frame_delay(delay) } + /// Returns the minimum input delay across the given local player handles. + /// Remote queues always have delay 0 and must be excluded; only local queues + /// have a meaningful delay configured by the user. + pub(crate) fn min_local_input_delay(&self, local_handles: &[PlayerHandle]) -> usize { + local_handles + .iter() + .map(|&h| self.input_queues[h].frame_delay()) + .min() + .unwrap_or(0) + } + pub(crate) fn reset_prediction(&mut self) { for i in 0..self.num_players { self.input_queues[i].reset_prediction(); diff --git a/tests/stubs.rs b/tests/stubs.rs index 15c9e1f..58dbc8b 100644 --- a/tests/stubs.rs +++ b/tests/stubs.rs @@ -224,6 +224,34 @@ pub fn make_p2p_sessions( (s1, s2) } +/// Build two connected lockstep `P2PSession`s (`max_prediction = 0`) with the given input delay. +#[allow(dead_code)] +pub fn make_lockstep_sessions( + port1: u16, + port2: u16, + input_delay: usize, +) -> (P2PSession, P2PSession) { + let s1 = SessionBuilder::::new() + .with_max_prediction_window(0) + .with_input_delay(input_delay) + .add_player(PlayerType::Local, 0) + .unwrap() + .add_player(PlayerType::Remote(localhost(port2)), 1) + .unwrap() + .start_p2p_session(UdpNonBlockingSocket::bind_to_port(port1).unwrap()) + .unwrap(); + let s2 = SessionBuilder::::new() + .with_max_prediction_window(0) + .with_input_delay(input_delay) + .add_player(PlayerType::Remote(localhost(port1)), 0) + .unwrap() + .add_player(PlayerType::Local, 1) + .unwrap() + .start_p2p_session(UdpNonBlockingSocket::bind_to_port(port2).unwrap()) + .unwrap(); + (s1, s2) +} + /// Poll both sessions until they reach `Running` state or the sync timeout expires. #[allow(dead_code)] pub fn sync_p2p_sessions(s1: &mut P2PSession, s2: &mut P2PSession) { diff --git a/tests/test_p2p_session.rs b/tests/test_p2p_session.rs index 73a04f1..6452af5 100644 --- a/tests/test_p2p_session.rs +++ b/tests/test_p2p_session.rs @@ -750,3 +750,246 @@ fn test_desync_detection_off_by_default() -> Result<(), GgrsError> { Ok(()) } + +// ── Lockstep mode ───────────────────────────────────────────────────────────── + +// Lockstep stalls when the remote's input has not been received. +// When sess1 calls advance_frame first its internal poll finds nothing (sess2 has not +// sent yet), so sess1 stalls. Only sess1's stall is asserted: sess2 calls advance_frame +// second and its internal poll may already find sess1's just-sent packet (depending on +// OS loopback timing), so sess2's outcome is deliberately not asserted here. +// The full stall→poll→advance round-trip is covered by +// test_lockstep_advances_with_zero_latency_zero_delay. +#[test] +#[serial] +fn test_lockstep_stalls_without_remote_input() -> Result<(), GgrsError> { + let (mut sess1, mut sess2) = stubs::make_lockstep_sessions(7733, 7734, 0); + stubs::sync_p2p_sessions(&mut sess1, &mut sess2); + + // sess1 calls advance_frame before sess2 has sent anything — its internal poll + // finds no remote input, so it must stall. + sess1.add_local_input(0, StubInput { inp: 0 })?; + let requests1 = sess1.advance_frame()?; + + let advanced1 = requests1 + .iter() + .any(|r| matches!(r, ggrs::GgrsRequest::AdvanceFrame { .. })); + + assert!( + !advanced1, + "sess1 should stall when remote input has not arrived" + ); + + // sess2 is not called here; its stall behaviour is covered by + // test_lockstep_advances_with_zero_latency_zero_delay (first iteration). + let _ = sess2; + + Ok(()) +} + +// Lockstep advances every frame with zero latency and zero input delay. +// With input_delay=0, each session must have the other's frame confirmed before it +// can advance. advance_frame sends the local input packet unconditionally before +// evaluating the gate, but poll_remote_clients is what delivers the other side's +// packet and updates confirmed_frame. So the required sequence per frame is: +// 1. Both sessions queue input and call advance_frame — input packets are sent, +// but the gate stalls because the remote input is not yet confirmed. +// 2. Both sessions cross-poll — each now has the other's input confirmed. +// 3. Both sessions call advance_frame again — gate passes, AdvanceFrame emitted. +#[test] +#[serial] +fn test_lockstep_advances_with_zero_latency_zero_delay() -> Result<(), GgrsError> { + let (mut sess1, mut sess2) = stubs::make_lockstep_sessions(7735, 7736, 0); + stubs::sync_p2p_sessions(&mut sess1, &mut sess2); + + let reps = 20; + for i in 0..reps { + // first advance: queues and sends the local input, stalls (remote not confirmed yet) + sess1.add_local_input(0, StubInput { inp: i })?; + sess2.add_local_input(1, StubInput { inp: i })?; + sess1.advance_frame()?; + sess2.advance_frame()?; + + // poll until both sessions have confirmed the other's input, then advance. + // a single poll is not reliable under load (packets may arrive late on loopback), + // so we retry up to the sync timeout. + let deadline = std::time::Instant::now() + stubs::SYNC_TIMEOUT; + let mut advanced1; + let mut advanced2; + loop { + sess1.poll_remote_clients(); + sess2.poll_remote_clients(); + + sess1.add_local_input(0, StubInput { inp: i })?; + sess2.add_local_input(1, StubInput { inp: i })?; + let r1 = sess1.advance_frame()?; + let r2 = sess2.advance_frame()?; + + advanced1 = r1 + .iter() + .any(|r| matches!(r, ggrs::GgrsRequest::AdvanceFrame { .. })); + advanced2 = r2 + .iter() + .any(|r| matches!(r, ggrs::GgrsRequest::AdvanceFrame { .. })); + + if advanced1 && advanced2 { + break; + } + assert!( + std::time::Instant::now() < deadline, + "sess stalled on frame {i} — did not advance within timeout" + ); + } + } + + Ok(()) +} + +// Lockstep with input_delay=1 advances without stalling after the first exchange. +// With input_delay D, each side queues D frames ahead on its first call, so a single +// one-way trip is enough for both sides to confirm and advance every game frame up to D. +#[test] +#[serial] +fn test_lockstep_advances_with_input_delay() -> Result<(), GgrsError> { + let (mut sess1, mut sess2) = stubs::make_lockstep_sessions(7737, 7738, 1); + stubs::sync_p2p_sessions(&mut sess1, &mut sess2); + + let reps = 20; + for i in 0..reps { + // Retry until both sessions emit AdvanceFrame; loopback packet delivery is not + // instantaneous so a single poll may not be enough under load. + let deadline = std::time::Instant::now() + stubs::SYNC_TIMEOUT; + let mut advanced1 = false; + let mut advanced2 = false; + + while !advanced1 || !advanced2 { + sess1.poll_remote_clients(); + sess2.poll_remote_clients(); + + sess1.add_local_input(0, StubInput { inp: i })?; + sess2.add_local_input(1, StubInput { inp: i })?; + let r1 = sess1.advance_frame()?; + let r2 = sess2.advance_frame()?; + + if r1 + .iter() + .any(|r| matches!(r, ggrs::GgrsRequest::AdvanceFrame { .. })) + { + advanced1 = true; + } + if r2 + .iter() + .any(|r| matches!(r, ggrs::GgrsRequest::AdvanceFrame { .. })) + { + advanced2 = true; + } + + assert!( + std::time::Instant::now() < deadline, + "sess stalled on iteration {i} — did not advance within timeout" + ); + } + } + + Ok(()) +} + +// With input_delay >= 1 and a silent remote, two invariants must hold: +// a) No AdvanceFrame is ever emitted — the game must not advance without confirmed inputs. +// b) The input queue must not overflow — it is capped at input_delay frames ahead of the +// game frame, so the 128-slot queue is never exhausted. +#[test] +#[serial] +fn test_lockstep_stalls_with_input_delay_and_no_remote_input() -> Result<(), GgrsError> { + let input_delay = 3; + let (mut sess1, mut sess2) = stubs::make_lockstep_sessions(7739, 7740, input_delay); + stubs::sync_p2p_sessions(&mut sess1, &mut sess2); + + // Drive sess1 well past 128 iterations (the input queue length) without ever + // polling for sess2's packets. The input queue cap must prevent an overflow, + // and the gate must prevent any game advancement. + let mut game_frames_advanced = 0usize; + for _ in 0..200 { + sess1.add_local_input(0, StubInput { inp: 0 })?; + let requests = sess1.advance_frame()?; + if requests + .iter() + .any(|r| matches!(r, ggrs::GgrsRequest::AdvanceFrame { .. })) + { + game_frames_advanced += 1; + } + } + + assert_eq!( + game_frames_advanced, 0, + "sess1 should not advance any game frames without remote input, \ + even though the input queue is running ahead with input_delay={input_delay}" + ); + + // The input queue should have advanced exactly input_delay+1 frames before hitting the + // cap (game_frame=0 is unconfirmed, so the queue stops at depth input_delay+1). + assert_eq!( + sess1.current_frame(), + input_delay as i32 + 1, + "input queue should advance to input_delay+1 frames before capping, \ + got current_frame={} with input_delay={input_delay}", + sess1.current_frame(), + ); + + Ok(()) +} + +// After the input queue cap is hit, the session recovers correctly once the remote +// starts confirming frames again. Both sessions spin for many frames with no +// cross-poll (cap reached, no AdvanceFrame), then cross-poll and both should be able +// to advance the game frame normally. +#[test] +#[serial] +fn test_lockstep_recovers_after_input_queue_cap() -> Result<(), GgrsError> { + let input_delay = 2; + let (mut sess1, mut sess2) = stubs::make_lockstep_sessions(7741, 7742, input_delay); + stubs::sync_p2p_sessions(&mut sess1, &mut sess2); + + // Spin both sessions well past the cap without any cross-poll. + for _ in 0..input_delay + 10 { + sess1.add_local_input(0, StubInput { inp: 0 })?; + sess2.add_local_input(1, StubInput { inp: 0 })?; + sess1.advance_frame()?; + sess2.advance_frame()?; + } + + // Now cross-poll and advance — both should eventually emit AdvanceFrame. + let deadline = std::time::Instant::now() + stubs::SYNC_TIMEOUT; + let mut advanced1 = false; + let mut advanced2 = false; + + while !advanced1 || !advanced2 { + sess1.poll_remote_clients(); + sess2.poll_remote_clients(); + + sess1.add_local_input(0, StubInput { inp: 0 })?; + sess2.add_local_input(1, StubInput { inp: 0 })?; + let r1 = sess1.advance_frame()?; + let r2 = sess2.advance_frame()?; + + if r1 + .iter() + .any(|r| matches!(r, ggrs::GgrsRequest::AdvanceFrame { .. })) + { + advanced1 = true; + } + if r2 + .iter() + .any(|r| matches!(r, ggrs::GgrsRequest::AdvanceFrame { .. })) + { + advanced2 = true; + } + + assert!( + std::time::Instant::now() < deadline, + "session did not recover after input queue cap was hit" + ); + } + + Ok(()) +}