From f99bd60b958b00a0922931bf7e42671c8fca0ee6 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Thu, 25 Jun 2026 16:47:06 +0200 Subject: [PATCH 1/2] RUBY-3822 Stop publishing SDAM events from the Monitor's RTT checks When the streaming protocol is active the PushMonitor streams hello responses as the authoritative SDAM source, and the polling Monitor only measures RTT on its connection. Per the Server Monitoring spec, an RTT command must not publish events or update the topology, and its errors must not mark the server Unknown. Until now the polling Monitor still ran the cluster SDAM flow and published heartbeat events for these RTT checks. Gate the polling Monitor: when a connection is established and the PushMonitor is running, scan! skips heartbeat publishing and the cluster SDAM flow (on both success and error) while still pacing the next check. The initial handshake and reconnect paths are unaffected, since the gate is false when there is no connection yet, so topology discovery stays prompt. Polling mode is unaffected: with no running PushMonitor the gate is always false. Marking a server Unknown on a streaming-check failure now belongs to the PushMonitor, which previously only stopped and relied on the polling Monitor to notice. The PushMonitor connection also performs a metadata handshake before streaming, mirroring the polling Monitor and the spec; without it the server never learns the connection's appName, so appName-scoped behaviour (including the failCommand fail points used by the SDAM spec tests) never applies to the streaming hello. Refresh the stale interruptInUse-pool-clear fixture from the spec (times: 1 -> times: 4). The RTT hello now consumes fail point budget alongside the streaming check, which is exactly why upstream uses times: 4. --- lib/mongo/server/monitor.rb | 59 +++++++++--- lib/mongo/server/push_monitor.rb | 14 +++ spec/mongo/server/monitor_spec.rb | 94 +++++++++++++++++++ spec/mongo/server/push_monitor_spec.rb | 34 +++++++ .../interruptInUse-pool-clear.yml | 6 +- 5 files changed, 190 insertions(+), 17 deletions(-) diff --git a/lib/mongo/server/monitor.rb b/lib/mongo/server/monitor.rb index 572484ed15..1a0304bbb1 100644 --- a/lib/mongo/server/monitor.rb +++ b/lib/mongo/server/monitor.rb @@ -210,28 +210,41 @@ def scan! @mutex.synchronize do throttle_scan_frequency! + # When the streaming protocol is active the PushMonitor is the + # authoritative SDAM source and this scan only measures RTT on the + # dedicated connection. Per the Server Monitoring spec, an RTT + # command MUST NOT publish events or update the topology. Compute + # this before do_scan, which may (re)connect and change the state. + rtt_only = rtt_measurement_only? + begin - result = do_scan + result = do_scan(publish_heartbeat: !rtt_only) rescue StandardError => e - run_sdam_flow({}, scan_error: e) + run_sdam_flow({}, scan_error: e, rtt_only: rtt_only) else - run_sdam_flow(result) + run_sdam_flow(result, rtt_only: rtt_only) end end end - def run_sdam_flow(result, awaited: false, scan_error: nil) + def run_sdam_flow(result, awaited: false, scan_error: nil, rtt_only: false) @sdam_mutex.synchronize do old_description = server.description - new_description = Description.new( - server.address, - result, - average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time, - minimum_round_trip_time: server.round_trip_time_calculator.minimum_round_trip_time - ) - - server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited, scan_error: scan_error) + # An RTT-only measurement (streaming protocol active) must not update + # the topology or publish SDAM events. The RTT it gathered is + # incorporated into the next streaming-hello description via the + # shared RTT calculator. The scheduling below still runs so the + # monitor keeps pacing its checks. + unless rtt_only + new_description = Description.new( + server.address, + result, + average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time, + minimum_round_trip_time: server.round_trip_time_calculator.minimum_round_trip_time + ) + server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited, scan_error: scan_error) + end server.description.tap do |new_description| unless awaited @@ -272,8 +285,12 @@ def pre_stop server.scan_semaphore.signal end - def do_scan - monitoring.publish_heartbeat(server) do + def do_scan(publish_heartbeat: true) + if publish_heartbeat + monitoring.publish_heartbeat(server) do + check + end + else check end rescue StandardError => e @@ -285,6 +302,20 @@ def do_scan raise e end + # Returns whether this scan is only an RTT measurement, which is the case + # when the streaming protocol is active: a dedicated connection is already + # established and the PushMonitor is running as the authoritative SDAM + # source. In the polling protocol there is no running PushMonitor, so the + # connection-reuse check is a real server check and not RTT-only. + # + # @return [ true | false ] + def rtt_measurement_only? + return false if @connection.nil? + + pm = push_monitor + !pm.nil? && pm.running? + end + def check if @connection && @connection.pid != Process.pid log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}") diff --git a/lib/mongo/server/push_monitor.rb b/lib/mongo/server/push_monitor.rb index 280565416f..a2db56504a 100644 --- a/lib/mongo/server/push_monitor.rb +++ b/lib/mongo/server/push_monitor.rb @@ -118,6 +118,13 @@ def do_work log_prefix: options[:log_prefix], bg_error_backtrace: options[:bg_error_backtrace]) + # The streaming check is the authoritative SDAM source, so its failure + # must mark the server Unknown and clear the pool here. While streaming + # is active the polling Monitor only measures RTT and ignores errors + # (per the Server Monitoring spec), so it can no longer be relied on to + # notice the failure. + monitor.run_sdam_flow({}, scan_error: e, awaited: true) + # If a request failed on a connection, stop push monitoring. # In case the server is dead we don't want to have two connections # trying to connect unsuccessfully at the same time. @@ -142,6 +149,13 @@ def check @server_pushing = false connection = PushMonitor::Connection.new(server.address, options) connection.connect! + # Identify the streaming connection to the server with a metadata + # handshake (carrying the appName) before streaming, mirroring the + # polling Monitor and the Server Monitoring spec, which establishes + # the monitoring connection with a handshake before issuing the + # awaitable hello. Without this, appName-scoped server behaviour + # never applies to the streaming hello. + connection.handshake! @connection = connection end end diff --git a/spec/mongo/server/monitor_spec.rb b/spec/mongo/server/monitor_spec.rb index 2978ae3bff..6c69bfeea2 100644 --- a/spec/mongo/server/monitor_spec.rb +++ b/spec/mongo/server/monitor_spec.rb @@ -122,6 +122,100 @@ end end end + + context 'when streaming is active and the scan is an RTT-only measurement' do + # Steady-state streaming: the Monitor already has an established + # connection (used only for RTT measurement) and a running PushMonitor + # that is the authoritative SDAM source. Per the Server Monitoring spec + # ("Measuring RTT" / "SDAM Monitoring"), clients MUST NOT publish any + # events and MUST NOT update the topology when running an RTT command. + + let(:hello_reply) do + { + 'isWritablePrimary' => true, + 'ok' => 1.0, + 'minWireVersion' => 0, + 'maxWireVersion' => 21, + } + end + + let(:rtt_connection) do + double('monitor connection').tap do |conn| + allow(conn).to receive(:pid).and_return(Process.pid) + allow(conn).to receive(:check_document).and_return({ 'hello' => 1 }) + allow(conn).to receive(:dispatch_bytes).and_return( + double('message', documents: [ hello_reply ]) + ) + allow(conn).to receive(:disconnect!) + end + end + + let(:running_push_monitor) do + double('push monitor').tap do |push_monitor| + allow(push_monitor).to receive(:running?).and_return(true) + allow(push_monitor).to receive(:stop!) + end + end + + before do + monitor.instance_variable_set(:@connection, rtt_connection) + monitor.instance_variable_set(:@push_monitor, running_push_monitor) + end + + it 'does not run the SDAM flow' do + expect(cluster).not_to receive(:run_sdam_flow) + monitor.scan! + end + + it 'does not publish heartbeat events' do + expect(monitor.monitoring).not_to receive(:publish_heartbeat) + monitor.scan! + end + end + + context 'when in the polling protocol with an established connection' do + # Polling protocol (serverMonitoringMode=poll, FaaS, or pre-4.4): there + # is no PushMonitor, so reusing the connection to run hello is a real + # server check, not an RTT-only measurement. It must keep publishing + # events and running the SDAM flow exactly as before. This guards + # against the RTT-only suppression leaking into polling mode. + + let(:hello_reply) do + { + 'isWritablePrimary' => true, + 'ok' => 1.0, + 'minWireVersion' => 0, + 'maxWireVersion' => 21, + } + end + + let(:polling_connection) do + double('monitor connection').tap do |conn| + allow(conn).to receive(:pid).and_return(Process.pid) + allow(conn).to receive(:check_document).and_return({ 'hello' => 1 }) + allow(conn).to receive(:dispatch_bytes).and_return( + double('message', documents: [ hello_reply ]) + ) + allow(conn).to receive(:disconnect!) + end + end + + before do + monitor.instance_variable_set(:@connection, polling_connection) + # No PushMonitor exists in polling mode. + expect(monitor.push_monitor).to be_nil + end + + it 'runs the SDAM flow' do + expect(cluster).to receive(:run_sdam_flow) + monitor.scan! + end + + it 'publishes heartbeat events' do + expect(monitor.monitoring).to receive(:publish_heartbeat).and_call_original + monitor.scan! + end + end end # heartbeat interval is now taken out of cluster, monitor has no useful options diff --git a/spec/mongo/server/push_monitor_spec.rb b/spec/mongo/server/push_monitor_spec.rb index cab0d41065..56a15fd231 100644 --- a/spec/mongo/server/push_monitor_spec.rb +++ b/spec/mongo/server/push_monitor_spec.rb @@ -67,6 +67,19 @@ end.should_not raise_error end + # The streaming connection must identify itself to the server with a + # metadata handshake (carrying the appName) before streaming, like the + # polling Monitor's connection does. Otherwise appName-scoped server + # behaviour (e.g. failCommand fail points used by the SDAM spec tests) + # never applies to the streaming hello, and the streaming check cannot + # observe monitor errors. + it 'performs a metadata handshake before streaming' do + expect_any_instance_of(Mongo::Server::PushMonitor::Connection) + .to receive(:handshake!).and_call_original + + push_monitor.do_work + end + context 'network error during check' do it 'does not propagate the exception' do push_monitor @@ -89,6 +102,27 @@ push_monitor.running?.should be false end + + # Per the Server Monitoring spec ("Network or command error during server + # check"), the streaming monitor must mark the server Unknown and clear + # the pool when its check fails. In the Ruby driver the PushMonitor owns + # the streaming check, so it must run the SDAM flow with the scan error + # rather than relying on the polling Monitor to notice the failure. + it 'marks the server unknown via the SDAM flow' do + push_monitor + + expect(Socket).to receive(:getaddrinfo).and_raise(SocketError.new('Test exception')) + + expect(cluster).to receive(:run_sdam_flow) do |_previous_desc, updated_desc, options| + expect(updated_desc).to be_unknown + expect(options[:awaited]).to be true + expect(options[:scan_error]).to be_a(Mongo::Error::SocketError) + end + + lambda do + push_monitor.do_work + end.should_not raise_error + end end end end diff --git a/spec/spec_tests/data/sdam_unified/interruptInUse-pool-clear.yml b/spec/spec_tests/data/sdam_unified/interruptInUse-pool-clear.yml index 9293a986d5..67cd7d3ae3 100644 --- a/spec/spec_tests/data/sdam_unified/interruptInUse-pool-clear.yml +++ b/spec/spec_tests/data/sdam_unified/interruptInUse-pool-clear.yml @@ -81,7 +81,7 @@ tests: failPoint: configureFailPoint: failCommand mode: - times: 1 + times: 4 data: failCommands: - hello @@ -182,7 +182,7 @@ tests: failPoint: configureFailPoint: failCommand mode: - times: 1 + times: 4 data: failCommands: - hello @@ -291,7 +291,7 @@ tests: failPoint: configureFailPoint: failCommand mode: - times: 1 + times: 4 data: failCommands: - hello From bf4014d4a9afc4244ebdd49366640647a35cc827 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Thu, 25 Jun 2026 18:24:32 +0200 Subject: [PATCH 2/2] RUBY-3822 Recover Unknown servers via the polling Monitor in streaming mode The RTT-only gate suppressed the polling Monitor's check whenever the PushMonitor was running, including while the server was Unknown. A server marked Unknown by an operation error or a streaming failure then had to wait for the next streaming response to recover, which could be up to heartbeatFrequencyMS away - long enough for server selection to fail with NoServerAvailable and cascade across later operations. Restrict the suppression to known servers: when the server is Unknown the polling Monitor runs a full check again, restoring the prompt recovery it performed before this change. Steady-state RTT suppression (the ticket's goal) still applies while the server is known and the PushMonitor is the authoritative streaming source. --- lib/mongo/server/monitor.rb | 8 +++++ spec/mongo/server/monitor_spec.rb | 51 +++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/lib/mongo/server/monitor.rb b/lib/mongo/server/monitor.rb index 1a0304bbb1..75f3d51e79 100644 --- a/lib/mongo/server/monitor.rb +++ b/lib/mongo/server/monitor.rb @@ -312,6 +312,14 @@ def do_scan(publish_heartbeat: true) def rtt_measurement_only? return false if @connection.nil? + # Only suppress the check while the server is in a known state and the + # PushMonitor is the authoritative streaming source. If the server is + # Unknown (e.g. an operation error or a streaming failure just marked + # it so), the polling Monitor must run a full check to recover it + # rather than waiting for the next streaming response - otherwise the + # server can stay Unknown long enough to fail server selection. + return false if server.unknown? + pm = push_monitor !pm.nil? && pm.running? end diff --git a/spec/mongo/server/monitor_spec.rb b/spec/mongo/server/monitor_spec.rb index 6c69bfeea2..562085bcae 100644 --- a/spec/mongo/server/monitor_spec.rb +++ b/spec/mongo/server/monitor_spec.rb @@ -158,6 +158,10 @@ end before do + # The server must be in a known state for RTT-only suppression to + # apply; an Unknown server is always given a full recovery check. + server.update_description(Mongo::Server::Description.new(address, hello_reply)) + expect(server.description).not_to be_unknown monitor.instance_variable_set(:@connection, rtt_connection) monitor.instance_variable_set(:@push_monitor, running_push_monitor) end @@ -173,6 +177,53 @@ end end + context 'when streaming is active but the server is Unknown' do + # Recovery path: even with an established connection and a running + # PushMonitor, an Unknown server must get a full check so it is + # recovered promptly, rather than waiting for the next streaming + # response (which could be up to heartbeatFrequencyMS away and would + # let server selection fail in the meantime). + + let(:hello_reply) do + { + 'isWritablePrimary' => true, + 'ok' => 1.0, + 'minWireVersion' => 0, + 'maxWireVersion' => 21, + } + end + + let(:rtt_connection) do + double('monitor connection').tap do |conn| + allow(conn).to receive(:pid).and_return(Process.pid) + allow(conn).to receive(:check_document).and_return({ 'hello' => 1 }) + allow(conn).to receive(:dispatch_bytes).and_return( + double('message', documents: [ hello_reply ]) + ) + allow(conn).to receive(:disconnect!) + end + end + + let(:running_push_monitor) do + double('push monitor').tap do |push_monitor| + allow(push_monitor).to receive(:running?).and_return(true) + allow(push_monitor).to receive(:stop!) + end + end + + before do + # Server starts Unknown (monitoring_io is disabled). + expect(server.description).to be_unknown + monitor.instance_variable_set(:@connection, rtt_connection) + monitor.instance_variable_set(:@push_monitor, running_push_monitor) + end + + it 'runs the SDAM flow to recover the server' do + expect(cluster).to receive(:run_sdam_flow) + monitor.scan! + end + end + context 'when in the polling protocol with an established connection' do # Polling protocol (serverMonitoringMode=poll, FaaS, or pre-4.4): there # is no PushMonitor, so reusing the connection to run hello is a real