diff --git a/lib/api/readable.js b/lib/api/readable.js index 5ebd04ebe83..61cae82b12f 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -3,6 +3,7 @@ const assert = require('node:assert') const { addAbortListener } = require('node:events') const { Readable } = require('node:stream') +const { StringDecoder } = require('node:string_decoder') const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors') const util = require('../core/util') const { ReadableStreamFrom } = require('../core/util') @@ -15,6 +16,9 @@ const kContentType = Symbol('kContentType') const kContentLength = Symbol('kContentLength') const kUsed = Symbol('kUsed') const kBytesRead = Symbol('kBytesRead') +const kDecoder = Symbol('kDecoder') +const kEncoding = Symbol('kEncoding') +const kDecoderEnded = Symbol('kDecoderEnded') const noop = () => {} @@ -55,6 +59,15 @@ class BodyReadable extends Readable { /** @type {number} */ this[kBytesRead] = 0 + /** @type {StringDecoder|null} */ + this[kDecoder] = null + + /** @type {BufferEncoding|undefined} */ + this[kEncoding] = undefined + + /** @type {boolean} */ + this[kDecoderEnded] = false + /** @type {ReadableStream|null} */ this[kBody] = null @@ -161,11 +174,39 @@ class BodyReadable extends Readable { consumePush(this[kConsume], chunk) return this[kReading] ? super.push(chunk) : true } + + return super.push(chunk) } return super.push(chunk) } + /** + * @param {number} [size] + * @returns {Buffer|string|null} + */ + read (size) { + const chunk = super.read(size) + + if (!this[kDecoder]) { + return chunk + } + + if (chunk !== null) { + return this[kDecoder].write(chunk) + } + + if (this._readableState.endEmitted && !this[kDecoderEnded]) { + this[kDecoderEnded] = true + const tail = this[kDecoder].end() + if (tail !== '') { + return tail + } + } + + return chunk + } + /** * Consumes and returns the body as a string. * @@ -324,10 +365,22 @@ class BodyReadable extends Readable { * @returns {this} */ setEncoding (encoding) { + const state = this._readableState + const buffer = state.buffer.slice() + const bufferIndex = state.bufferIndex + const length = state.length + const ret = super.setEncoding(encoding) if (Buffer.isEncoding(encoding)) { - this._readableState.encoding = encoding + const decoder = new StringDecoder(encoding) + this[kDecoder] = decoder + this[kEncoding] = decoder.encoding + state.buffer = buffer + state.bufferIndex = bufferIndex + state.length = length + state.decoder = null + state.encoding = null } - return this + return ret } } @@ -396,6 +449,11 @@ function consume (stream, type) { reject(rState.errored ?? new TypeError('unusable')) } } else { + if (stream[kEncoding]) { + stream._readableState.decoder = null + stream._readableState.encoding = null + } + queueMicrotask(() => { stream[kConsume] = { type, @@ -432,6 +490,10 @@ function consumeStart (consume) { } const { _readableState: state } = consume.stream + if (consume.stream[kEncoding]) { + state.decoder = null + state.encoding = null + } if (state.bufferIndex) { const start = state.bufferIndex @@ -446,10 +508,10 @@ function consumeStart (consume) { } if (state.endEmitted) { - consumeEnd(this[kConsume], this._readableState.encoding) + consumeEnd(this[kConsume], this[kEncoding]) } else { consume.stream.on('end', function () { - consumeEnd(this[kConsume], this._readableState.encoding) + consumeEnd(this[kConsume], this[kEncoding]) }) } @@ -546,6 +608,9 @@ function consumeEnd (consume, encoding) { * @returns {void} */ function consumePush (consume, chunk) { + if (typeof chunk === 'string') { + chunk = Buffer.from(chunk, consume.stream[kEncoding]) + } consume.length += chunk.length consume.body.push(chunk) } diff --git a/test/client-request.js b/test/client-request.js index 3e154aca6b1..22007c5d947 100644 --- a/test/client-request.js +++ b/test/client-request.js @@ -1494,6 +1494,39 @@ test('request multibyte text with setEncoding', async (t) => { await t.completed }) +test('request multibyte text with async iteration and setEncoding', async (t) => { + t = tspl(t, { plan: 1 }) + + const data = Buffer.from('abc傳def') + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.write(data.subarray(0, 5)) + setTimeout(() => { + res.end(data.subarray(5)) + }, 100) + }) + after(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Client(`http://localhost:${server.address().port}`) + after(client.destroy.bind(client)) + + const { body } = await client.request({ + path: '/', + method: 'GET' + }) + body.setEncoding('utf8') + + let text = '' + for await (const chunk of body) { + text += chunk + } + + t.deepStrictEqual(text, data.toString('utf8')) + }) + + await t.completed +}) + test('request multibyte text with setEncoding', async (t) => { t = tspl(t, { plan: 1 })