diff --git a/lib/mongo/server/monitor.rb b/lib/mongo/server/monitor.rb index 572484ed15..75f3d51e79 100644 --- a/lib/mongo/server/monitor.rb +++ b/lib/mongo/server/monitor.rb @@ -210,28 +210,41 @@ def scan! @mutex.synchronize do throttle_scan_frequency! + # When the streaming protocol is active the PushMonitor is the + # authoritative SDAM source and this scan only measures RTT on the + # dedicated connection. Per the Server Monitoring spec, an RTT + # command MUST NOT publish events or update the topology. Compute + # this before do_scan, which may (re)connect and change the state. + rtt_only = rtt_measurement_only? + begin - result = do_scan + result = do_scan(publish_heartbeat: !rtt_only) rescue StandardError => e - run_sdam_flow({}, scan_error: e) + run_sdam_flow({}, scan_error: e, rtt_only: rtt_only) else - run_sdam_flow(result) + run_sdam_flow(result, rtt_only: rtt_only) end end end - def run_sdam_flow(result, awaited: false, scan_error: nil) + def run_sdam_flow(result, awaited: false, scan_error: nil, rtt_only: false) @sdam_mutex.synchronize do old_description = server.description - new_description = Description.new( - server.address, - result, - average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time, - minimum_round_trip_time: server.round_trip_time_calculator.minimum_round_trip_time - ) - - server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited, scan_error: scan_error) + # An RTT-only measurement (streaming protocol active) must not update + # the topology or publish SDAM events. The RTT it gathered is + # incorporated into the next streaming-hello description via the + # shared RTT calculator. The scheduling below still runs so the + # monitor keeps pacing its checks. + unless rtt_only + new_description = Description.new( + server.address, + result, + average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time, + minimum_round_trip_time: server.round_trip_time_calculator.minimum_round_trip_time + ) + server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited, scan_error: scan_error) + end server.description.tap do |new_description| unless awaited @@ -272,8 +285,12 @@ def pre_stop server.scan_semaphore.signal end - def do_scan - monitoring.publish_heartbeat(server) do + def do_scan(publish_heartbeat: true) + if publish_heartbeat + monitoring.publish_heartbeat(server) do + check + end + else check end rescue StandardError => e @@ -285,6 +302,28 @@ def do_scan raise e end + # Returns whether this scan is only an RTT measurement, which is the case + # when the streaming protocol is active: a dedicated connection is already + # established and the PushMonitor is running as the authoritative SDAM + # source. In the polling protocol there is no running PushMonitor, so the + # connection-reuse check is a real server check and not RTT-only. + # + # @return [ true | false ] + def rtt_measurement_only? + return false if @connection.nil? + + # Only suppress the check while the server is in a known state and the + # PushMonitor is the authoritative streaming source. If the server is + # Unknown (e.g. an operation error or a streaming failure just marked + # it so), the polling Monitor must run a full check to recover it + # rather than waiting for the next streaming response - otherwise the + # server can stay Unknown long enough to fail server selection. + return false if server.unknown? + + pm = push_monitor + !pm.nil? && pm.running? + end + def check if @connection && @connection.pid != Process.pid log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}") diff --git a/lib/mongo/server/push_monitor.rb b/lib/mongo/server/push_monitor.rb index 280565416f..a2db56504a 100644 --- a/lib/mongo/server/push_monitor.rb +++ b/lib/mongo/server/push_monitor.rb @@ -118,6 +118,13 @@ def do_work log_prefix: options[:log_prefix], bg_error_backtrace: options[:bg_error_backtrace]) + # The streaming check is the authoritative SDAM source, so its failure + # must mark the server Unknown and clear the pool here. While streaming + # is active the polling Monitor only measures RTT and ignores errors + # (per the Server Monitoring spec), so it can no longer be relied on to + # notice the failure. + monitor.run_sdam_flow({}, scan_error: e, awaited: true) + # If a request failed on a connection, stop push monitoring. # In case the server is dead we don't want to have two connections # trying to connect unsuccessfully at the same time. @@ -142,6 +149,13 @@ def check @server_pushing = false connection = PushMonitor::Connection.new(server.address, options) connection.connect! + # Identify the streaming connection to the server with a metadata + # handshake (carrying the appName) before streaming, mirroring the + # polling Monitor and the Server Monitoring spec, which establishes + # the monitoring connection with a handshake before issuing the + # awaitable hello. Without this, appName-scoped server behaviour + # never applies to the streaming hello. + connection.handshake! @connection = connection end end diff --git a/spec/mongo/server/monitor_spec.rb b/spec/mongo/server/monitor_spec.rb index 2978ae3bff..562085bcae 100644 --- a/spec/mongo/server/monitor_spec.rb +++ b/spec/mongo/server/monitor_spec.rb @@ -122,6 +122,151 @@ end end end + + context 'when streaming is active and the scan is an RTT-only measurement' do + # Steady-state streaming: the Monitor already has an established + # connection (used only for RTT measurement) and a running PushMonitor + # that is the authoritative SDAM source. Per the Server Monitoring spec + # ("Measuring RTT" / "SDAM Monitoring"), clients MUST NOT publish any + # events and MUST NOT update the topology when running an RTT command. + + let(:hello_reply) do + { + 'isWritablePrimary' => true, + 'ok' => 1.0, + 'minWireVersion' => 0, + 'maxWireVersion' => 21, + } + end + + let(:rtt_connection) do + double('monitor connection').tap do |conn| + allow(conn).to receive(:pid).and_return(Process.pid) + allow(conn).to receive(:check_document).and_return({ 'hello' => 1 }) + allow(conn).to receive(:dispatch_bytes).and_return( + double('message', documents: [ hello_reply ]) + ) + allow(conn).to receive(:disconnect!) + end + end + + let(:running_push_monitor) do + double('push monitor').tap do |push_monitor| + allow(push_monitor).to receive(:running?).and_return(true) + allow(push_monitor).to receive(:stop!) + end + end + + before do + # The server must be in a known state for RTT-only suppression to + # apply; an Unknown server is always given a full recovery check. + server.update_description(Mongo::Server::Description.new(address, hello_reply)) + expect(server.description).not_to be_unknown + monitor.instance_variable_set(:@connection, rtt_connection) + monitor.instance_variable_set(:@push_monitor, running_push_monitor) + end + + it 'does not run the SDAM flow' do + expect(cluster).not_to receive(:run_sdam_flow) + monitor.scan! + end + + it 'does not publish heartbeat events' do + expect(monitor.monitoring).not_to receive(:publish_heartbeat) + monitor.scan! + end + end + + context 'when streaming is active but the server is Unknown' do + # Recovery path: even with an established connection and a running + # PushMonitor, an Unknown server must get a full check so it is + # recovered promptly, rather than waiting for the next streaming + # response (which could be up to heartbeatFrequencyMS away and would + # let server selection fail in the meantime). + + let(:hello_reply) do + { + 'isWritablePrimary' => true, + 'ok' => 1.0, + 'minWireVersion' => 0, + 'maxWireVersion' => 21, + } + end + + let(:rtt_connection) do + double('monitor connection').tap do |conn| + allow(conn).to receive(:pid).and_return(Process.pid) + allow(conn).to receive(:check_document).and_return({ 'hello' => 1 }) + allow(conn).to receive(:dispatch_bytes).and_return( + double('message', documents: [ hello_reply ]) + ) + allow(conn).to receive(:disconnect!) + end + end + + let(:running_push_monitor) do + double('push monitor').tap do |push_monitor| + allow(push_monitor).to receive(:running?).and_return(true) + allow(push_monitor).to receive(:stop!) + end + end + + before do + # Server starts Unknown (monitoring_io is disabled). + expect(server.description).to be_unknown + monitor.instance_variable_set(:@connection, rtt_connection) + monitor.instance_variable_set(:@push_monitor, running_push_monitor) + end + + it 'runs the SDAM flow to recover the server' do + expect(cluster).to receive(:run_sdam_flow) + monitor.scan! + end + end + + context 'when in the polling protocol with an established connection' do + # Polling protocol (serverMonitoringMode=poll, FaaS, or pre-4.4): there + # is no PushMonitor, so reusing the connection to run hello is a real + # server check, not an RTT-only measurement. It must keep publishing + # events and running the SDAM flow exactly as before. This guards + # against the RTT-only suppression leaking into polling mode. + + let(:hello_reply) do + { + 'isWritablePrimary' => true, + 'ok' => 1.0, + 'minWireVersion' => 0, + 'maxWireVersion' => 21, + } + end + + let(:polling_connection) do + double('monitor connection').tap do |conn| + allow(conn).to receive(:pid).and_return(Process.pid) + allow(conn).to receive(:check_document).and_return({ 'hello' => 1 }) + allow(conn).to receive(:dispatch_bytes).and_return( + double('message', documents: [ hello_reply ]) + ) + allow(conn).to receive(:disconnect!) + end + end + + before do + monitor.instance_variable_set(:@connection, polling_connection) + # No PushMonitor exists in polling mode. + expect(monitor.push_monitor).to be_nil + end + + it 'runs the SDAM flow' do + expect(cluster).to receive(:run_sdam_flow) + monitor.scan! + end + + it 'publishes heartbeat events' do + expect(monitor.monitoring).to receive(:publish_heartbeat).and_call_original + monitor.scan! + end + end end # heartbeat interval is now taken out of cluster, monitor has no useful options diff --git a/spec/mongo/server/push_monitor_spec.rb b/spec/mongo/server/push_monitor_spec.rb index cab0d41065..56a15fd231 100644 --- a/spec/mongo/server/push_monitor_spec.rb +++ b/spec/mongo/server/push_monitor_spec.rb @@ -67,6 +67,19 @@ end.should_not raise_error end + # The streaming connection must identify itself to the server with a + # metadata handshake (carrying the appName) before streaming, like the + # polling Monitor's connection does. Otherwise appName-scoped server + # behaviour (e.g. failCommand fail points used by the SDAM spec tests) + # never applies to the streaming hello, and the streaming check cannot + # observe monitor errors. + it 'performs a metadata handshake before streaming' do + expect_any_instance_of(Mongo::Server::PushMonitor::Connection) + .to receive(:handshake!).and_call_original + + push_monitor.do_work + end + context 'network error during check' do it 'does not propagate the exception' do push_monitor @@ -89,6 +102,27 @@ push_monitor.running?.should be false end + + # Per the Server Monitoring spec ("Network or command error during server + # check"), the streaming monitor must mark the server Unknown and clear + # the pool when its check fails. In the Ruby driver the PushMonitor owns + # the streaming check, so it must run the SDAM flow with the scan error + # rather than relying on the polling Monitor to notice the failure. + it 'marks the server unknown via the SDAM flow' do + push_monitor + + expect(Socket).to receive(:getaddrinfo).and_raise(SocketError.new('Test exception')) + + expect(cluster).to receive(:run_sdam_flow) do |_previous_desc, updated_desc, options| + expect(updated_desc).to be_unknown + expect(options[:awaited]).to be true + expect(options[:scan_error]).to be_a(Mongo::Error::SocketError) + end + + lambda do + push_monitor.do_work + end.should_not raise_error + end end end end diff --git a/spec/spec_tests/data/sdam_unified/interruptInUse-pool-clear.yml b/spec/spec_tests/data/sdam_unified/interruptInUse-pool-clear.yml index 9293a986d5..67cd7d3ae3 100644 --- a/spec/spec_tests/data/sdam_unified/interruptInUse-pool-clear.yml +++ b/spec/spec_tests/data/sdam_unified/interruptInUse-pool-clear.yml @@ -81,7 +81,7 @@ tests: failPoint: configureFailPoint: failCommand mode: - times: 1 + times: 4 data: failCommands: - hello @@ -182,7 +182,7 @@ tests: failPoint: configureFailPoint: failCommand mode: - times: 1 + times: 4 data: failCommands: - hello @@ -291,7 +291,7 @@ tests: failPoint: configureFailPoint: failCommand mode: - times: 1 + times: 4 data: failCommands: - hello