Skip to content

Fixes for new interop tests#28

Open
sqt wants to merge 21 commits into
mainfrom
fixes_for_new_interop_tests
Open

Fixes for new interop tests#28
sqt wants to merge 21 commits into
mainfrom
fixes_for_new_interop_tests

Conversation

@sqt

@sqt sqt commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

No description provided.

@greptile-apps

greptile-apps Bot commented Jun 12, 2026

Copy link
Copy Markdown

Greptile Summary

This PR overhauls coherent-set delivery to fix interop failures with RTI Connext and OpenDDS: it switches from an end-marker scheme (coherent_set_sn = last_sn) to a CS-transition scheme (coherent_set_sn = first_sn), adds heartbeat-driven WIP flushing via flush_target_sn, replaces the stale coherent_writer_seen bool with the properly-cleared coherent_writer_guids set, and switches wait_for_acknowledgments from a busy-poll to a condvar wait. It also fixes DEADLINE/LIFESPAN wire encoding (RTPS fraction → DDS nanoseconds), corrects PID values for group_coherent_set/group_seq_num, adds PID_GROUP_GUID for GROUP-scope publishers, and improves fragmented-sample delivery pacing for reliable readers.

  • CS-transition coherent flush + flush_target_sn: Replaces the previous end-marker detection; a deferred HB callback completes sets that finish without a subsequent CS transition, guarding the race where the HB arrives before the last DATA datagram.
  • waitAllAcked condvar: Eliminates the 1 ms busy-poll loop; ack_cond.broadcast() is now fired in both handleAckNack and removeMatchedReader.
  • SEDP wire-format fixes: DEADLINE and LIFESPAN PIDs now encode/decode DDS Duration_t (direct nanoseconds) rather than the RTPS 1/2^32-fraction format; PID numbers for group inline-QoS are corrected to match the spec.

Confidence Score: 3/5

The coherent-set rework corrects several real interop failures but introduces a new correctness gap in the flush-target mechanism and leaves a liveness hole in the GROUP begin_access gate.

Two issues on the coherent delivery path: (1) A heartbeat from the just-completed set N can arrive at the reader after the first sample of set N+1 triggers a CS transition, populate flush_target_sn with a value smaller than all SNs in set N+1, and cause the still-incomplete set N+1 to be committed as soon as any further sample arrives. (2) The coherent_writer_guids branch in begin_access blocks GROUP delivery from all other readers whenever a coherent writer is still matched but has gone idle between sets; if such a writer never sends again, committed sets from sibling readers are withheld indefinitely.

src/dcps/reader.zig (onHeartbeatCb stale-HB guard) and src/dcps/subscriber.zig (coherent_writer_guids idle-writer liveness)

Important Files Changed

Filename Overview
src/dcps/reader.zig Major rework: replaces end-marker coherent-set detection with CS-transition detection; adds CoherentWipEntry with flush_target_sn; introduces onHeartbeatCb for deferred WIP flush; adds LIFESPAN expiry tracking per sample. New stale-HB bug where endCoherent HB from set N can prematurely flush set N+1's WIP after a CS transition.
src/dcps/subscriber.zig Replaces wip_count-based GROUP gate with coherent_writer_guids + pending length check; fixes writer-departure stall but introduces a new liveness gap when a coherent writer stays matched while idle.
src/rtps/writer_sm.zig Replaces busy-poll wait_for_acknowledgments with condvar-based waitAllAcked; adds ack_cond.broadcast() in removeMatchedReader and handleAckNack; adds GAP alongside HB for KEEP_LAST eviction; primes reliable readers with frag-1 + HEARTBEAT_FRAG for fragmented history. Logic appears sound.
src/dcps/participant.zig Adds LIFESPAN QoS propagation from writer announcement to MatchedWriterInfo; derives publisher group GUID for GROUP-scope coherent publishers (PID_GROUP_GUID); fixes DISPOSED_UNREGISTERED (0x3) status-info priority so unregistered is checked before disposed.
src/discovery/sedp.zig Switches DEADLINE and LIFESPAN wire encoding from RTPS binary-fraction format to DDS Duration_t (direct nanoseconds); adds PID_GROUP_GUID encoding; adds guidFromDisposalPayload fallback when PID_KEY_HASH is absent. Tests updated accordingly.
src/rtps/message/submessage.zig Corrects PID values for group_coherent_set (0x0063→0x0039) and group_seq_num (0x0064→0x0038) to match DDS-RTPS spec and RTI Connext interop.
src/rtps/reader_sm.zig Adds on_heartbeat callback to DataCallback and fires it after processing a valid non-duplicate HEARTBEAT from a matched writer, enabling the DDS layer to flush coherent WIPs.
src/dcps/publisher.zig Passes publisher_handle to announce_writer so the participant can derive the group GUID; changes coherent flush mode to coherent_only for INSTANCE/TOPIC scope (was incorrectly .full).

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant W as StatefulWriter
    participant R as StatefulReader
    participant DR as DataReaderImpl
    participant SUB as SubscriberImpl

    Note over W,SUB: Coherent set flush — CS-transition + HB-driven path

    W->>R: "DATA SN=1 (coherent_set_sn=1)"
    R->>DR: "onDataCb(SN=1, cs=1) — WIP created"
    W->>R: "DATA SN=2 (coherent_set_sn=1)"
    R->>DR: "onDataCb(SN=2, cs=1) — WIP highest=2"
    W->>R: "HEARTBEAT last_sn=2"
    R->>DR: "onHeartbeatCb — highest>=last_sn → flush → coherent_committed"
    DR-->>SUB: DATA_AVAILABLE_STATUS set

    Note over W,SUB: HB-before-DATA race (flush_target_sn path)
    W->>R: "DATA SN=3 (coherent_set_sn=3)"
    R->>DR: "onDataCb(SN=3, cs=3) — new WIP"
    W->>R: "HEARTBEAT last_sn=4 arrives before SN=4"
    R->>DR: "onHeartbeatCb — highest(3)<4 → flush_target_sn=4"
    W->>R: "DATA SN=4 (coherent_set_sn=3)"
    R->>DR: "onDataCb(SN=4) — highest=4>=flush_target_sn=4 → flush"

    Note over W,SUB: begin_access GROUP commit gate
    SUB->>DR: check all readers coherent_committed_ready
    DR-->>SUB: "all_ready=true → commitCoherentPendingLocked"
    SUB-->>SUB: DATA_AVAILABLE fired per reader
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant W as StatefulWriter
    participant R as StatefulReader
    participant DR as DataReaderImpl
    participant SUB as SubscriberImpl

    Note over W,SUB: Coherent set flush — CS-transition + HB-driven path

    W->>R: "DATA SN=1 (coherent_set_sn=1)"
    R->>DR: "onDataCb(SN=1, cs=1) — WIP created"
    W->>R: "DATA SN=2 (coherent_set_sn=1)"
    R->>DR: "onDataCb(SN=2, cs=1) — WIP highest=2"
    W->>R: "HEARTBEAT last_sn=2"
    R->>DR: "onHeartbeatCb — highest>=last_sn → flush → coherent_committed"
    DR-->>SUB: DATA_AVAILABLE_STATUS set

    Note over W,SUB: HB-before-DATA race (flush_target_sn path)
    W->>R: "DATA SN=3 (coherent_set_sn=3)"
    R->>DR: "onDataCb(SN=3, cs=3) — new WIP"
    W->>R: "HEARTBEAT last_sn=4 arrives before SN=4"
    R->>DR: "onHeartbeatCb — highest(3)<4 → flush_target_sn=4"
    W->>R: "DATA SN=4 (coherent_set_sn=3)"
    R->>DR: "onDataCb(SN=4) — highest=4>=flush_target_sn=4 → flush"

    Note over W,SUB: begin_access GROUP commit gate
    SUB->>DR: check all readers coherent_committed_ready
    DR-->>SUB: "all_ready=true → commitCoherentPendingLocked"
    SUB-->>SUB: DATA_AVAILABLE fired per reader
Loading

Comments Outside Diff (1)

  1. src/dcps/reader.zig, line 429-441 (link)

    P1 Stale heartbeat from prior coherent set can prematurely flush current-set WIP

    When a CS transition occurs in onDataCb, the new WIP entry is created with flush_target_sn = null. If the endCoherent heartbeat from set N (with last_sn = N_end) arrives after the first sample of set N+1 (which triggered the CS transition), onHeartbeatCb finds the new WIP entry and records flush_target_sn = N_end. Because all SNs in set N+1 are strictly greater than N_end, the condition highest_sn >= flush_target_sn becomes true after only one or two samples of the new set arrive — causing the still-incomplete set N+1 to be committed.

    Concrete sequence: set 1 = {SN1, SN2, SN3} (CS=1), set 2 = {SN4, SN5, SN6} (CS=4). SN4 arrives first → CS transition, new WIP (cs=4, flush_target_sn=null). Delayed endCoherent HB with last_sn=3 arrives → flush_target_sn = 3. SN5 arrives → highest_sn = 5 ≥ 3 → set 2 is committed with only {SN4, SN5}, missing SN6.

    The guard needed is to discard any incoming HB whose last_sn is less than entry.cs (the first SN of the current set), since such a HB belongs to a previous set and is no longer relevant.

Reviews (18): Last reviewed commit: "resolving review comments" | Re-trigger Greptile

Comment thread src/dcps/subscriber.zig Outdated
@codecov

codecov Bot commented Jun 12, 2026

Copy link
Copy Markdown

Comment thread src/rtps/writer_sm.zig
Comment thread src/dcps/reader.zig Outdated
Comment thread src/dcps/reader.zig Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant