Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 53 additions & 14 deletions lib/mongo/server/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,28 +210,41 @@ def scan!
@mutex.synchronize do
throttle_scan_frequency!

# When the streaming protocol is active the PushMonitor is the
# authoritative SDAM source and this scan only measures RTT on the
# dedicated connection. Per the Server Monitoring spec, an RTT
# command MUST NOT publish events or update the topology. Compute
# this before do_scan, which may (re)connect and change the state.
rtt_only = rtt_measurement_only?

begin
result = do_scan
result = do_scan(publish_heartbeat: !rtt_only)
rescue StandardError => e
run_sdam_flow({}, scan_error: e)
run_sdam_flow({}, scan_error: e, rtt_only: rtt_only)
else
run_sdam_flow(result)
run_sdam_flow(result, rtt_only: rtt_only)
end
end
end

def run_sdam_flow(result, awaited: false, scan_error: nil)
def run_sdam_flow(result, awaited: false, scan_error: nil, rtt_only: false)
@sdam_mutex.synchronize do
old_description = server.description

new_description = Description.new(
server.address,
result,
average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time,
minimum_round_trip_time: server.round_trip_time_calculator.minimum_round_trip_time
)

server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited, scan_error: scan_error)
# An RTT-only measurement (streaming protocol active) must not update
# the topology or publish SDAM events. The RTT it gathered is
# incorporated into the next streaming-hello description via the
# shared RTT calculator. The scheduling below still runs so the
# monitor keeps pacing its checks.
unless rtt_only
new_description = Description.new(
server.address,
result,
average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time,
minimum_round_trip_time: server.round_trip_time_calculator.minimum_round_trip_time
)
server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited, scan_error: scan_error)
end

server.description.tap do |new_description|
unless awaited
Expand Down Expand Up @@ -272,8 +285,12 @@ def pre_stop
server.scan_semaphore.signal
end

def do_scan
monitoring.publish_heartbeat(server) do
def do_scan(publish_heartbeat: true)
if publish_heartbeat
monitoring.publish_heartbeat(server) do
check
end
else
check
end
rescue StandardError => e
Expand All @@ -285,6 +302,28 @@ def do_scan
raise e
end

# Returns whether this scan is only an RTT measurement, which is the case
# when the streaming protocol is active: a dedicated connection is already
# established and the PushMonitor is running as the authoritative SDAM
# source. In the polling protocol there is no running PushMonitor, so the
# connection-reuse check is a real server check and not RTT-only.
#
# @return [ true | false ]
def rtt_measurement_only?
return false if @connection.nil?

# Only suppress the check while the server is in a known state and the
# PushMonitor is the authoritative streaming source. If the server is
# Unknown (e.g. an operation error or a streaming failure just marked
# it so), the polling Monitor must run a full check to recover it
# rather than waiting for the next streaming response - otherwise the
# server can stay Unknown long enough to fail server selection.
return false if server.unknown?

pm = push_monitor
!pm.nil? && pm.running?
end

def check
if @connection && @connection.pid != Process.pid
log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}")
Expand Down
14 changes: 14 additions & 0 deletions lib/mongo/server/push_monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ def do_work
log_prefix: options[:log_prefix],
bg_error_backtrace: options[:bg_error_backtrace])

# The streaming check is the authoritative SDAM source, so its failure
# must mark the server Unknown and clear the pool here. While streaming
# is active the polling Monitor only measures RTT and ignores errors
# (per the Server Monitoring spec), so it can no longer be relied on to
# notice the failure.
monitor.run_sdam_flow({}, scan_error: e, awaited: true)

# If a request failed on a connection, stop push monitoring.
# In case the server is dead we don't want to have two connections
# trying to connect unsuccessfully at the same time.
Expand All @@ -142,6 +149,13 @@ def check
@server_pushing = false
connection = PushMonitor::Connection.new(server.address, options)
connection.connect!
# Identify the streaming connection to the server with a metadata
# handshake (carrying the appName) before streaming, mirroring the
# polling Monitor and the Server Monitoring spec, which establishes
# the monitoring connection with a handshake before issuing the
# awaitable hello. Without this, appName-scoped server behaviour
# never applies to the streaming hello.
connection.handshake!
@connection = connection
end
end
Expand Down
145 changes: 145 additions & 0 deletions spec/mongo/server/monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,151 @@
end
end
end

context 'when streaming is active and the scan is an RTT-only measurement' do
# Steady-state streaming: the Monitor already has an established
# connection (used only for RTT measurement) and a running PushMonitor
# that is the authoritative SDAM source. Per the Server Monitoring spec
# ("Measuring RTT" / "SDAM Monitoring"), clients MUST NOT publish any
# events and MUST NOT update the topology when running an RTT command.

let(:hello_reply) do
{
'isWritablePrimary' => true,
'ok' => 1.0,
'minWireVersion' => 0,
'maxWireVersion' => 21,
}
end

let(:rtt_connection) do
double('monitor connection').tap do |conn|
allow(conn).to receive(:pid).and_return(Process.pid)
allow(conn).to receive(:check_document).and_return({ 'hello' => 1 })
allow(conn).to receive(:dispatch_bytes).and_return(
double('message', documents: [ hello_reply ])
)
allow(conn).to receive(:disconnect!)
end
end

let(:running_push_monitor) do
double('push monitor').tap do |push_monitor|
allow(push_monitor).to receive(:running?).and_return(true)
allow(push_monitor).to receive(:stop!)
end
end

before do
# The server must be in a known state for RTT-only suppression to
# apply; an Unknown server is always given a full recovery check.
server.update_description(Mongo::Server::Description.new(address, hello_reply))
expect(server.description).not_to be_unknown
monitor.instance_variable_set(:@connection, rtt_connection)
monitor.instance_variable_set(:@push_monitor, running_push_monitor)
end

it 'does not run the SDAM flow' do
expect(cluster).not_to receive(:run_sdam_flow)
monitor.scan!
end

it 'does not publish heartbeat events' do
expect(monitor.monitoring).not_to receive(:publish_heartbeat)
monitor.scan!
end
end

context 'when streaming is active but the server is Unknown' do
# Recovery path: even with an established connection and a running
# PushMonitor, an Unknown server must get a full check so it is
# recovered promptly, rather than waiting for the next streaming
# response (which could be up to heartbeatFrequencyMS away and would
# let server selection fail in the meantime).

let(:hello_reply) do
{
'isWritablePrimary' => true,
'ok' => 1.0,
'minWireVersion' => 0,
'maxWireVersion' => 21,
}
end

let(:rtt_connection) do
double('monitor connection').tap do |conn|
allow(conn).to receive(:pid).and_return(Process.pid)
allow(conn).to receive(:check_document).and_return({ 'hello' => 1 })
allow(conn).to receive(:dispatch_bytes).and_return(
double('message', documents: [ hello_reply ])
)
allow(conn).to receive(:disconnect!)
end
end

let(:running_push_monitor) do
double('push monitor').tap do |push_monitor|
allow(push_monitor).to receive(:running?).and_return(true)
allow(push_monitor).to receive(:stop!)
end
end

before do
# Server starts Unknown (monitoring_io is disabled).
expect(server.description).to be_unknown
monitor.instance_variable_set(:@connection, rtt_connection)
monitor.instance_variable_set(:@push_monitor, running_push_monitor)
end

it 'runs the SDAM flow to recover the server' do
expect(cluster).to receive(:run_sdam_flow)
monitor.scan!
end
end

context 'when in the polling protocol with an established connection' do
# Polling protocol (serverMonitoringMode=poll, FaaS, or pre-4.4): there
# is no PushMonitor, so reusing the connection to run hello is a real
# server check, not an RTT-only measurement. It must keep publishing
# events and running the SDAM flow exactly as before. This guards
# against the RTT-only suppression leaking into polling mode.

let(:hello_reply) do
{
'isWritablePrimary' => true,
'ok' => 1.0,
'minWireVersion' => 0,
'maxWireVersion' => 21,
}
end

let(:polling_connection) do
double('monitor connection').tap do |conn|
allow(conn).to receive(:pid).and_return(Process.pid)
allow(conn).to receive(:check_document).and_return({ 'hello' => 1 })
allow(conn).to receive(:dispatch_bytes).and_return(
double('message', documents: [ hello_reply ])
)
allow(conn).to receive(:disconnect!)
end
end

before do
monitor.instance_variable_set(:@connection, polling_connection)
# No PushMonitor exists in polling mode.
expect(monitor.push_monitor).to be_nil
end

it 'runs the SDAM flow' do
expect(cluster).to receive(:run_sdam_flow)
monitor.scan!
end

it 'publishes heartbeat events' do
expect(monitor.monitoring).to receive(:publish_heartbeat).and_call_original
monitor.scan!
end
end
end

# heartbeat interval is now taken out of cluster, monitor has no useful options
Expand Down
34 changes: 34 additions & 0 deletions spec/mongo/server/push_monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@
end.should_not raise_error
end

# The streaming connection must identify itself to the server with a
# metadata handshake (carrying the appName) before streaming, like the
# polling Monitor's connection does. Otherwise appName-scoped server
# behaviour (e.g. failCommand fail points used by the SDAM spec tests)
# never applies to the streaming hello, and the streaming check cannot
# observe monitor errors.
it 'performs a metadata handshake before streaming' do
expect_any_instance_of(Mongo::Server::PushMonitor::Connection)
.to receive(:handshake!).and_call_original

push_monitor.do_work
end

context 'network error during check' do
it 'does not propagate the exception' do
push_monitor
Expand All @@ -89,6 +102,27 @@

push_monitor.running?.should be false
end

# Per the Server Monitoring spec ("Network or command error during server
# check"), the streaming monitor must mark the server Unknown and clear
# the pool when its check fails. In the Ruby driver the PushMonitor owns
# the streaming check, so it must run the SDAM flow with the scan error
# rather than relying on the polling Monitor to notice the failure.
it 'marks the server unknown via the SDAM flow' do
push_monitor

expect(Socket).to receive(:getaddrinfo).and_raise(SocketError.new('Test exception'))

expect(cluster).to receive(:run_sdam_flow) do |_previous_desc, updated_desc, options|
expect(updated_desc).to be_unknown
expect(options[:awaited]).to be true
expect(options[:scan_error]).to be_a(Mongo::Error::SocketError)
end

lambda do
push_monitor.do_work
end.should_not raise_error
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ tests:
failPoint:
configureFailPoint: failCommand
mode:
times: 1
times: 4
data:
failCommands:
- hello
Expand Down Expand Up @@ -182,7 +182,7 @@ tests:
failPoint:
configureFailPoint: failCommand
mode:
times: 1
times: 4
data:
failCommands:
- hello
Expand Down Expand Up @@ -291,7 +291,7 @@ tests:
failPoint:
configureFailPoint: failCommand
mode:
times: 1
times: 4
data:
failCommands:
- hello
Expand Down
Loading