Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 56 additions & 18 deletions lib/handler/retry-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,28 @@ function calculateRetryAfterHeader (retryAfter) {
return isNaN(retryTime) ? 0 : retryTime - Date.now()
}

// A stable controller handed to the downstream handler for the lifetime of the
// request. Each transparent retry/resume is a *separate* dispatch with its
// *own* connection controller. Without a stable proxy the downstream body keeps
// flow-controlling the original (now-dead) controller while data flows on the
// new one: backpressure pauses the new connection's controller, but the
// consumer's resume() targets the old one, so the resumed body stalls forever.
// The proxy always forwards to the controller of the currently active connection.
class RetryController {
constructor () {
this.target = null
}

pause () { this.target?.pause() }
resume () { this.target?.resume() }
abort (reason) { this.target?.abort(reason) }
get paused () { return this.target?.paused ?? false }
get aborted () { return this.target?.aborted ?? false }
get reason () { return this.target?.reason ?? null }
get rawHeaders () { return this.target?.rawHeaders ?? null }
get rawTrailers () { return this.target?.rawTrailers ?? null }
}

class RetryHandler {
constructor (opts, { dispatch, handler }) {
const { retryOptions, ...dispatchOpts } = opts
Expand Down Expand Up @@ -70,14 +92,15 @@ class RetryHandler {
this.etag = null
this.statusCode = null
this.headers = null
this.controllerProxy = new RetryController()
}

onResponseStartWithRetry (controller, statusCode, headers, statusMessage, err) {
if (this.retryOpts.throwOnError) {
// Preserve old behavior for status codes that are not eligible for retry
if (this.retryOpts.statusCodes.includes(statusCode) === false) {
this.headersSent = true
this.handler.onResponseStart?.(controller, statusCode, headers, statusMessage)
this.handler.onResponseStart?.(this.controllerProxy, statusCode, headers, statusMessage)
} else {
this.error = err
}
Expand All @@ -87,14 +110,14 @@ class RetryHandler {

if (isDisturbed(this.opts.body)) {
this.headersSent = true
this.handler.onResponseStart?.(controller, statusCode, headers, statusMessage)
this.handler.onResponseStart?.(this.controllerProxy, statusCode, headers, statusMessage)
return
}

function shouldRetry (passedErr) {
if (passedErr) {
this.headersSent = true
this.handler.onResponseStart?.(controller, statusCode, headers, statusMessage)
this.handler.onResponseStart?.(this.controllerProxy, statusCode, headers, statusMessage)
controller.resume()
return
}
Expand All @@ -103,6 +126,13 @@ class RetryHandler {
controller.resume()
}

// The pause()/resume() pair (here and in shouldRetry) acts on THIS
// connection's controller -- never the downstream proxy. We hold this exact
// connection while the retry policy decides (possibly after a timeout) and
// must resume the same one. Routing this through controllerProxy would risk
// resuming a different connection if a later dispatch re-points the proxy in
// between, leaving this one paused forever -- the very stall the proxy exists
// to prevent.
controller.pause()
this.retryOpts.retry(
err,
Expand All @@ -115,13 +145,19 @@ class RetryHandler {
}

onRequestStart (controller, context) {
// request.js creates a fresh RequestController per dispatch and passes that
// same instance to every later callback of the dispatch. onRequestStart is
// the first callback (it is where the controller is created), so re-pointing
// the proxy here is enough to keep it on the active connection across every
// transparent retry/resume.
this.controllerProxy.target = controller
if (!this.headersSent) {
this.handler.onRequestStart?.(controller, context)
this.handler.onRequestStart?.(this.controllerProxy, context)
}
}

onRequestUpgrade (controller, statusCode, headers, socket) {
this.handler.onRequestUpgrade?.(controller, statusCode, headers, socket)
onRequestUpgrade (_controller, statusCode, headers, socket) {
this.handler.onRequestUpgrade?.(this.controllerProxy, statusCode, headers, socket)
}

static [kRetryHandlerDefaultRetry] (err, { state, opts }, cb) {
Expand Down Expand Up @@ -248,7 +284,7 @@ class RetryHandler {
if (range == null) {
this.headersSent = true
this.handler.onResponseStart?.(
controller,
this.controllerProxy,
statusCode,
headers,
statusMessage
Expand Down Expand Up @@ -295,7 +331,7 @@ class RetryHandler {

this.headersSent = true
this.handler.onResponseStart?.(
controller,
this.controllerProxy,
statusCode,
headers,
statusMessage
Expand All @@ -308,17 +344,17 @@ class RetryHandler {
}
}

onResponseData (controller, chunk) {
onResponseData (_controller, chunk) {
if (this.error) {
return
}

this.start += chunk.length

this.handler.onResponseData?.(controller, chunk)
this.handler.onResponseData?.(this.controllerProxy, chunk)
}

onResponseEnd (controller, trailers) {
onResponseEnd (_controller, trailers) {
if (this.error && this.retryOpts.throwOnError) {
throw this.error
}
Expand All @@ -335,13 +371,13 @@ class RetryHandler {
}
}
this.retryCount = 0
return this.handler.onResponseEnd?.(controller, trailers)
return this.handler.onResponseEnd?.(this.controllerProxy, trailers)
}

this.retry(controller)
this.retry()
}

retry (controller) {
retry () {
if (this.start !== 0) {
const headers = { range: `bytes=${this.start}-${this.end ?? ''}` }

Expand All @@ -363,23 +399,25 @@ class RetryHandler {
this.retryCountCheckpoint = this.retryCount
this.dispatch(this.opts, this)
} catch (err) {
this.handler.onResponseError?.(controller, err)
this.handler.onResponseError?.(this.controllerProxy, err)
}
}

onResponseError (controller, err) {
// controller is THIS failed connection (not the proxy): we inspect whether
// the consumer aborted it to decide retry-vs-propagate.
if (controller?.aborted || isDisturbed(this.opts.body)) {
this.handler.onResponseError?.(controller, err)
this.handler.onResponseError?.(this.controllerProxy, err)
return
}

function shouldRetry (returnedErr) {
if (!returnedErr) {
this.retry(controller)
this.retry()
return
}

this.handler?.onResponseError?.(controller, returnedErr)
this.handler?.onResponseError?.(this.controllerProxy, returnedErr)
}

// We reconcile in case of a mix between network errors
Expand Down
127 changes: 127 additions & 0 deletions test/client-retry-resume-backpressure.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
'use strict'

const { tspl } = require('@matteo.collina/tspl')
const { test, after } = require('node:test')
const { createServer } = require('node:http')
const { Writable } = require('node:stream')
const { pipeline } = require('node:stream/promises')
const { once } = require('node:events')
const { Agent, RetryAgent, request } = require('..')

const TOTAL = 4 * 1024 * 1024
const PART = 1 * 1024 * 1024
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms))

// Both tests exercise the RetryHandler controller-proxy fix. A download is
// interrupted mid-body with a retryable error (ECONNRESET); RetryAgent
// transparently resumes it with a Range request on a NEW connection. Before the
// fix the downstream body kept flow-controlling the ORIGINAL (now-dead)
// connection's controller while data/pause flowed on the new one, so:
// - under backpressure the resumed body paused and was never resumed (it hung);
// - an abort hit the dead original instead of the live resumed connection.
// With the fix both follow the active connection.

// Faithful reproduction of the reported hang. If the bug returns, pipeline()
// never resolves and the test is failed by the runner timeout (the npm scripts
// run borp with --timeout). Note: `node --test --test-timeout=0` disables that
// timeout, so run this under the npm test scripts.
test('RetryAgent resumes a backpressured body after a mid-stream connection drop', async (t) => {
t = tspl(t, { plan: 3 })

let requests = 0
const server = createServer((req, res) => {
requests++
if (!req.headers.range) {
res.writeHead(200, { 'content-length': String(TOTAL) })
res.write(Buffer.alloc(PART))
setTimeout(() => res.socket.resetAndDestroy(), 100)
} else {
const start = Number(/bytes=(\d+)-/.exec(req.headers.range)[1])
res.writeHead(206, {
'content-range': `bytes ${start}-${TOTAL - 1}/${TOTAL}`,
'content-length': String(TOTAL - start)
})
res.end(Buffer.alloc(TOTAL - start))
}
})
after(() => server.close())
server.listen(0)
await once(server, 'listening')

const dispatcher = new RetryAgent(new Agent(), { maxRetries: 5, minTimeout: 100, timeoutFactor: 1 })
after(() => dispatcher.close())

const { statusCode, body } = await request(`http://localhost:${server.address().port}`, { dispatcher })
t.strictEqual(statusCode, 200)

// Slow consumer -> sustained backpressure on the resumed connection (the trigger).
let received = 0
const slow = new Writable({
highWaterMark: 16 * 1024,
write (chunk, enc, cb) { received += chunk.length; setTimeout(cb, 5) }
})

await pipeline(body, slow) // before the fix this never resolves

t.strictEqual(received, TOTAL)
t.ok(requests >= 2, 'the download was actually resumed on a new connection')

await t.completed
})

// Companion check on the abort path. A flowing consumer keeps the resumed
// connection in-flight (no backpressure pause), then aborts. The server-side
// socket of the resumed connection must close: with the fix the abort reaches
// the live connection; before it, it hit the dead original and the resumed
// socket leaked. Bounded so a regression fails fast instead of waiting.
test('RetryAgent aborts the resumed connection (not the dead original) after a drop', async (t) => {
t = tspl(t, { plan: 2 })

let resumeSocket = null
let onResume
const resumed = new Promise(resolve => { onResume = resolve })

const server = createServer((req, res) => {
if (!req.headers.range) {
res.writeHead(200, { 'content-length': String(TOTAL) })
res.write(Buffer.alloc(PART))
setTimeout(() => res.socket.resetAndDestroy(), 100)
} else {
resumeSocket = res.socket
const start = Number(/bytes=(\d+)-/.exec(req.headers.range)[1])
res.writeHead(206, {
'content-range': `bytes ${start}-${TOTAL - 1}/${TOTAL}`,
'content-length': String(TOTAL - start)
})
res.write(Buffer.alloc(64 * 1024)) // keep the resumed connection in-flight
onResume()
}
})
after(() => server.close())
server.listen(0)
await once(server, 'listening')

const dispatcher = new RetryAgent(new Agent(), { maxRetries: 5, minTimeout: 100, timeoutFactor: 1 })
after(() => dispatcher.destroy())

const ac = new AbortController()
const { statusCode, body } = await request(`http://localhost:${server.address().port}`, { dispatcher, signal: ac.signal })
t.strictEqual(statusCode, 200)

body.on('data', () => {}).on('error', () => {}) // flowing consumer; swallow the abort error

await resumed // downloading on the resumed (206) connection now
await sleep(50) // let a little data flow on it

const closed = once(resumeSocket, 'close') // attach before aborting
ac.abort()

let timer
const bound = new Promise(resolve => { timer = setTimeout(() => resolve(false), 4000) })
const wasClosed = await Promise.race([closed.then(() => true), bound])
clearTimeout(timer)

t.ok(wasClosed, 'aborting closed the resumed connection, not the dead original')

await t.completed
})
Loading