diff --git a/lib/handler/retry-handler.js b/lib/handler/retry-handler.js index 7ec506e21eb..7fbc1a6ad77 100644 --- a/lib/handler/retry-handler.js +++ b/lib/handler/retry-handler.js @@ -14,6 +14,28 @@ function calculateRetryAfterHeader (retryAfter) { return isNaN(retryTime) ? 0 : retryTime - Date.now() } +// A stable controller handed to the downstream handler for the lifetime of the +// request. Each transparent retry/resume is a *separate* dispatch with its +// *own* connection controller. Without a stable proxy the downstream body keeps +// flow-controlling the original (now-dead) controller while data flows on the +// new one: backpressure pauses the new connection's controller, but the +// consumer's resume() targets the old one, so the resumed body stalls forever. +// The proxy always forwards to the controller of the currently active connection. +class RetryController { + constructor () { + this.target = null + } + + pause () { this.target?.pause() } + resume () { this.target?.resume() } + abort (reason) { this.target?.abort(reason) } + get paused () { return this.target?.paused ?? false } + get aborted () { return this.target?.aborted ?? false } + get reason () { return this.target?.reason ?? null } + get rawHeaders () { return this.target?.rawHeaders ?? null } + get rawTrailers () { return this.target?.rawTrailers ?? null } +} + class RetryHandler { constructor (opts, { dispatch, handler }) { const { retryOptions, ...dispatchOpts } = opts @@ -70,6 +92,7 @@ class RetryHandler { this.etag = null this.statusCode = null this.headers = null + this.controllerProxy = new RetryController() } onResponseStartWithRetry (controller, statusCode, headers, statusMessage, err) { @@ -77,7 +100,7 @@ class RetryHandler { // Preserve old behavior for status codes that are not eligible for retry if (this.retryOpts.statusCodes.includes(statusCode) === false) { this.headersSent = true - this.handler.onResponseStart?.(controller, statusCode, headers, statusMessage) + this.handler.onResponseStart?.(this.controllerProxy, statusCode, headers, statusMessage) } else { this.error = err } @@ -87,14 +110,14 @@ class RetryHandler { if (isDisturbed(this.opts.body)) { this.headersSent = true - this.handler.onResponseStart?.(controller, statusCode, headers, statusMessage) + this.handler.onResponseStart?.(this.controllerProxy, statusCode, headers, statusMessage) return } function shouldRetry (passedErr) { if (passedErr) { this.headersSent = true - this.handler.onResponseStart?.(controller, statusCode, headers, statusMessage) + this.handler.onResponseStart?.(this.controllerProxy, statusCode, headers, statusMessage) controller.resume() return } @@ -103,6 +126,13 @@ class RetryHandler { controller.resume() } + // The pause()/resume() pair (here and in shouldRetry) acts on THIS + // connection's controller -- never the downstream proxy. We hold this exact + // connection while the retry policy decides (possibly after a timeout) and + // must resume the same one. Routing this through controllerProxy would risk + // resuming a different connection if a later dispatch re-points the proxy in + // between, leaving this one paused forever -- the very stall the proxy exists + // to prevent. controller.pause() this.retryOpts.retry( err, @@ -115,13 +145,19 @@ class RetryHandler { } onRequestStart (controller, context) { + // request.js creates a fresh RequestController per dispatch and passes that + // same instance to every later callback of the dispatch. onRequestStart is + // the first callback (it is where the controller is created), so re-pointing + // the proxy here is enough to keep it on the active connection across every + // transparent retry/resume. + this.controllerProxy.target = controller if (!this.headersSent) { - this.handler.onRequestStart?.(controller, context) + this.handler.onRequestStart?.(this.controllerProxy, context) } } - onRequestUpgrade (controller, statusCode, headers, socket) { - this.handler.onRequestUpgrade?.(controller, statusCode, headers, socket) + onRequestUpgrade (_controller, statusCode, headers, socket) { + this.handler.onRequestUpgrade?.(this.controllerProxy, statusCode, headers, socket) } static [kRetryHandlerDefaultRetry] (err, { state, opts }, cb) { @@ -248,7 +284,7 @@ class RetryHandler { if (range == null) { this.headersSent = true this.handler.onResponseStart?.( - controller, + this.controllerProxy, statusCode, headers, statusMessage @@ -295,7 +331,7 @@ class RetryHandler { this.headersSent = true this.handler.onResponseStart?.( - controller, + this.controllerProxy, statusCode, headers, statusMessage @@ -308,17 +344,17 @@ class RetryHandler { } } - onResponseData (controller, chunk) { + onResponseData (_controller, chunk) { if (this.error) { return } this.start += chunk.length - this.handler.onResponseData?.(controller, chunk) + this.handler.onResponseData?.(this.controllerProxy, chunk) } - onResponseEnd (controller, trailers) { + onResponseEnd (_controller, trailers) { if (this.error && this.retryOpts.throwOnError) { throw this.error } @@ -335,13 +371,13 @@ class RetryHandler { } } this.retryCount = 0 - return this.handler.onResponseEnd?.(controller, trailers) + return this.handler.onResponseEnd?.(this.controllerProxy, trailers) } - this.retry(controller) + this.retry() } - retry (controller) { + retry () { if (this.start !== 0) { const headers = { range: `bytes=${this.start}-${this.end ?? ''}` } @@ -363,23 +399,25 @@ class RetryHandler { this.retryCountCheckpoint = this.retryCount this.dispatch(this.opts, this) } catch (err) { - this.handler.onResponseError?.(controller, err) + this.handler.onResponseError?.(this.controllerProxy, err) } } onResponseError (controller, err) { + // controller is THIS failed connection (not the proxy): we inspect whether + // the consumer aborted it to decide retry-vs-propagate. if (controller?.aborted || isDisturbed(this.opts.body)) { - this.handler.onResponseError?.(controller, err) + this.handler.onResponseError?.(this.controllerProxy, err) return } function shouldRetry (returnedErr) { if (!returnedErr) { - this.retry(controller) + this.retry() return } - this.handler?.onResponseError?.(controller, returnedErr) + this.handler?.onResponseError?.(this.controllerProxy, returnedErr) } // We reconcile in case of a mix between network errors diff --git a/test/client-retry-resume-backpressure.js b/test/client-retry-resume-backpressure.js new file mode 100644 index 00000000000..1e3519ac9f0 --- /dev/null +++ b/test/client-retry-resume-backpressure.js @@ -0,0 +1,127 @@ +'use strict' + +const { tspl } = require('@matteo.collina/tspl') +const { test, after } = require('node:test') +const { createServer } = require('node:http') +const { Writable } = require('node:stream') +const { pipeline } = require('node:stream/promises') +const { once } = require('node:events') +const { Agent, RetryAgent, request } = require('..') + +const TOTAL = 4 * 1024 * 1024 +const PART = 1 * 1024 * 1024 +const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms)) + +// Both tests exercise the RetryHandler controller-proxy fix. A download is +// interrupted mid-body with a retryable error (ECONNRESET); RetryAgent +// transparently resumes it with a Range request on a NEW connection. Before the +// fix the downstream body kept flow-controlling the ORIGINAL (now-dead) +// connection's controller while data/pause flowed on the new one, so: +// - under backpressure the resumed body paused and was never resumed (it hung); +// - an abort hit the dead original instead of the live resumed connection. +// With the fix both follow the active connection. + +// Faithful reproduction of the reported hang. If the bug returns, pipeline() +// never resolves and the test is failed by the runner timeout (the npm scripts +// run borp with --timeout). Note: `node --test --test-timeout=0` disables that +// timeout, so run this under the npm test scripts. +test('RetryAgent resumes a backpressured body after a mid-stream connection drop', async (t) => { + t = tspl(t, { plan: 3 }) + + let requests = 0 + const server = createServer((req, res) => { + requests++ + if (!req.headers.range) { + res.writeHead(200, { 'content-length': String(TOTAL) }) + res.write(Buffer.alloc(PART)) + setTimeout(() => res.socket.resetAndDestroy(), 100) + } else { + const start = Number(/bytes=(\d+)-/.exec(req.headers.range)[1]) + res.writeHead(206, { + 'content-range': `bytes ${start}-${TOTAL - 1}/${TOTAL}`, + 'content-length': String(TOTAL - start) + }) + res.end(Buffer.alloc(TOTAL - start)) + } + }) + after(() => server.close()) + server.listen(0) + await once(server, 'listening') + + const dispatcher = new RetryAgent(new Agent(), { maxRetries: 5, minTimeout: 100, timeoutFactor: 1 }) + after(() => dispatcher.close()) + + const { statusCode, body } = await request(`http://localhost:${server.address().port}`, { dispatcher }) + t.strictEqual(statusCode, 200) + + // Slow consumer -> sustained backpressure on the resumed connection (the trigger). + let received = 0 + const slow = new Writable({ + highWaterMark: 16 * 1024, + write (chunk, enc, cb) { received += chunk.length; setTimeout(cb, 5) } + }) + + await pipeline(body, slow) // before the fix this never resolves + + t.strictEqual(received, TOTAL) + t.ok(requests >= 2, 'the download was actually resumed on a new connection') + + await t.completed +}) + +// Companion check on the abort path. A flowing consumer keeps the resumed +// connection in-flight (no backpressure pause), then aborts. The server-side +// socket of the resumed connection must close: with the fix the abort reaches +// the live connection; before it, it hit the dead original and the resumed +// socket leaked. Bounded so a regression fails fast instead of waiting. +test('RetryAgent aborts the resumed connection (not the dead original) after a drop', async (t) => { + t = tspl(t, { plan: 2 }) + + let resumeSocket = null + let onResume + const resumed = new Promise(resolve => { onResume = resolve }) + + const server = createServer((req, res) => { + if (!req.headers.range) { + res.writeHead(200, { 'content-length': String(TOTAL) }) + res.write(Buffer.alloc(PART)) + setTimeout(() => res.socket.resetAndDestroy(), 100) + } else { + resumeSocket = res.socket + const start = Number(/bytes=(\d+)-/.exec(req.headers.range)[1]) + res.writeHead(206, { + 'content-range': `bytes ${start}-${TOTAL - 1}/${TOTAL}`, + 'content-length': String(TOTAL - start) + }) + res.write(Buffer.alloc(64 * 1024)) // keep the resumed connection in-flight + onResume() + } + }) + after(() => server.close()) + server.listen(0) + await once(server, 'listening') + + const dispatcher = new RetryAgent(new Agent(), { maxRetries: 5, minTimeout: 100, timeoutFactor: 1 }) + after(() => dispatcher.destroy()) + + const ac = new AbortController() + const { statusCode, body } = await request(`http://localhost:${server.address().port}`, { dispatcher, signal: ac.signal }) + t.strictEqual(statusCode, 200) + + body.on('data', () => {}).on('error', () => {}) // flowing consumer; swallow the abort error + + await resumed // downloading on the resumed (206) connection now + await sleep(50) // let a little data flow on it + + const closed = once(resumeSocket, 'close') // attach before aborting + ac.abort() + + let timer + const bound = new Promise(resolve => { timer = setTimeout(() => resolve(false), 4000) }) + const wasClosed = await Promise.race([closed.then(() => true), bound]) + clearTimeout(timer) + + t.ok(wasClosed, 'aborting closed the resumed connection, not the dead original') + + await t.completed +}) diff --git a/test/retry-handler-controller-proxy.js b/test/retry-handler-controller-proxy.js new file mode 100644 index 00000000000..04f432664a5 --- /dev/null +++ b/test/retry-handler-controller-proxy.js @@ -0,0 +1,167 @@ +'use strict' + +const { tspl } = require('@matteo.collina/tspl') +const { test } = require('node:test') + +const { RetryHandler } = require('..') + +// These tests pin down the RetryController proxy contract introduced to keep +// flow-control wired to the active connection across transparent retries/resumes. +// Each retry/resume is a separate dispatch with its own connection controller; +// the downstream handler is handed ONE stable proxy that always forwards to the +// controller of the currently active connection. See lib/handler/retry-handler.js. + +const baseOpts = { + method: 'GET', + path: '/', + retryOptions: {} +} + +// Stand-in for the per-dispatch RequestController of an active connection. It +// records the flow-control calls the proxy forwards to it. +function activeConnectionController () { + const calls = [] + return { + calls, + paused: true, + aborted: true, + reason: new Error('boom'), + rawHeaders: ['content-length', '2'], + rawTrailers: ['x-trailer', 'value'], + pause () { calls.push('pause') }, + resume () { calls.push('resume') }, + abort (reason) { calls.push(['abort', reason]) } + } +} + +test('controller proxy returns safe defaults and is a no-op before a connection is active', (t) => { + t = tspl(t, { plan: 6 }) + + const handler = new RetryHandler(baseOpts, { + dispatch: () => {}, + handler: {} + }) + + // No dispatch has happened yet, so the proxy has no active connection to + // forward to. Reads must fall back to safe defaults instead of throwing. + const proxy = handler.controllerProxy + t.strictEqual(proxy.paused, false) + t.strictEqual(proxy.aborted, false) + t.strictEqual(proxy.reason, null) + t.strictEqual(proxy.rawHeaders, null) + t.strictEqual(proxy.rawTrailers, null) + + // Methods must be inert (not throw) while there is nothing to forward to. + t.doesNotThrow(() => { + proxy.pause() + proxy.resume() + proxy.abort(new Error('ignored')) + }) +}) + +test('controller proxy forwards reads/writes to the active connection and stays stable across callbacks', (t) => { + t = tspl(t, { plan: 9 }) + + let downstreamController = null + let upgradeController = null + const upgradeArgs = [] + + const handler = new RetryHandler(baseOpts, { + dispatch: () => {}, + handler: { + onRequestStart (controller) { + downstreamController = controller + }, + onRequestUpgrade (controller, statusCode, headers, socket) { + upgradeController = controller + upgradeArgs.push(statusCode, headers, socket) + } + } + }) + + const connection = activeConnectionController() + + // onRequestStart is the first callback of a dispatch; it re-points the proxy + // at this connection's controller and hands the (stable) proxy downstream. + handler.onRequestStart(connection, {}) + + // The downstream handler must receive the proxy, never the raw per-connection + // controller, so flow-control survives the next resume. + t.notStrictEqual(downstreamController, connection) + + // Reads forward to the active connection's controller. + t.deepStrictEqual(downstreamController.rawHeaders, ['content-length', '2']) + t.deepStrictEqual(downstreamController.rawTrailers, ['x-trailer', 'value']) + t.strictEqual(downstreamController.paused, true) + t.strictEqual(downstreamController.aborted, true) + t.strictEqual(downstreamController.reason, connection.reason) + + // Writes forward to the active connection's controller too. + downstreamController.pause() + downstreamController.resume() + downstreamController.abort('stop') + t.deepStrictEqual(connection.calls, ['pause', 'resume', ['abort', 'stop']]) + + // An upgrade on the same dispatch is forwarded through the very same proxy + // instance (not the raw controller), keeping the downstream wiring stable. + handler.onRequestUpgrade(connection, 101, { upgrade: 'websocket' }, 'SOCKET') + t.strictEqual(upgradeController, downstreamController) + t.deepStrictEqual(upgradeArgs, [101, { upgrade: 'websocket' }, 'SOCKET']) +}) + +test('controller proxy is forwarded downstream on a 206 whose content-range is unparseable', (t) => { + t = tspl(t, { plan: 3 }) + + let startController = null + const startArgs = [] + + const handler = new RetryHandler(baseOpts, { + dispatch: () => {}, + handler: { + onResponseStart (controller, statusCode, headers) { + startController = controller + startArgs.push(statusCode, headers) + } + } + }) + + const connection = activeConnectionController() + handler.onRequestStart(connection, {}) + + // A 206 whose content-range cannot be parsed: the handler cannot set up a + // resume, so it gives up and forwards the response downstream as-is -- still + // through the proxy, not the raw per-connection controller. + handler.onResponseStart(connection, 206, { 'content-range': 'invalid' }, 'Partial Content') + + t.strictEqual(startController, handler.controllerProxy) + t.strictEqual(startArgs[0], 206) + t.deepStrictEqual(startArgs[1], { 'content-range': 'invalid' }) +}) + +test('controller proxy carries a synchronous dispatch failure to the downstream handler', (t) => { + t = tspl(t, { plan: 2 }) + + const dispatchError = new Error('dispatch failed synchronously') + let errController = null + let receivedErr = null + + const handler = new RetryHandler(baseOpts, { + dispatch: () => { throw dispatchError }, + handler: { + onResponseError (controller, err) { + errController = controller + receivedErr = err + } + } + }) + + const connection = activeConnectionController() + handler.onRequestStart(connection, {}) + + // retry() re-dispatches; when that dispatch throws synchronously the error is + // surfaced to the downstream handler through the proxy. + handler.retry() + + t.strictEqual(errController, handler.controllerProxy) + t.strictEqual(receivedErr, dispatchError) +})