Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 69 additions & 4 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const assert = require('node:assert')
const { addAbortListener } = require('node:events')
const { Readable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors')
const util = require('../core/util')
const { ReadableStreamFrom } = require('../core/util')
Expand All @@ -15,6 +16,9 @@ const kContentType = Symbol('kContentType')
const kContentLength = Symbol('kContentLength')
const kUsed = Symbol('kUsed')
const kBytesRead = Symbol('kBytesRead')
const kDecoder = Symbol('kDecoder')
const kEncoding = Symbol('kEncoding')
const kDecoderEnded = Symbol('kDecoderEnded')

const noop = () => {}

Expand Down Expand Up @@ -55,6 +59,15 @@ class BodyReadable extends Readable {
/** @type {number} */
this[kBytesRead] = 0

/** @type {StringDecoder|null} */
this[kDecoder] = null

/** @type {BufferEncoding|undefined} */
this[kEncoding] = undefined

/** @type {boolean} */
this[kDecoderEnded] = false

/** @type {ReadableStream|null} */
this[kBody] = null

Expand Down Expand Up @@ -161,11 +174,39 @@ class BodyReadable extends Readable {
consumePush(this[kConsume], chunk)
return this[kReading] ? super.push(chunk) : true
}

return super.push(chunk)
}

return super.push(chunk)
}

/**
* @param {number} [size]
* @returns {Buffer|string|null}
*/
read (size) {
const chunk = super.read(size)

if (!this[kDecoder]) {
return chunk
}

if (chunk !== null) {
return this[kDecoder].write(chunk)
}

if (this._readableState.endEmitted && !this[kDecoderEnded]) {
this[kDecoderEnded] = true
const tail = this[kDecoder].end()
if (tail !== '') {
return tail
}
}

return chunk
}

/**
* Consumes and returns the body as a string.
*
Expand Down Expand Up @@ -324,10 +365,22 @@ class BodyReadable extends Readable {
* @returns {this}
*/
setEncoding (encoding) {
const state = this._readableState
const buffer = state.buffer.slice()
const bufferIndex = state.bufferIndex
const length = state.length
const ret = super.setEncoding(encoding)
if (Buffer.isEncoding(encoding)) {
this._readableState.encoding = encoding
const decoder = new StringDecoder(encoding)
this[kDecoder] = decoder
this[kEncoding] = decoder.encoding
state.buffer = buffer
state.bufferIndex = bufferIndex
state.length = length
state.decoder = null
state.encoding = null
}
return this
return ret
}
}

Expand Down Expand Up @@ -396,6 +449,11 @@ function consume (stream, type) {
reject(rState.errored ?? new TypeError('unusable'))
}
} else {
if (stream[kEncoding]) {
stream._readableState.decoder = null
stream._readableState.encoding = null
}

queueMicrotask(() => {
stream[kConsume] = {
type,
Expand Down Expand Up @@ -432,6 +490,10 @@ function consumeStart (consume) {
}

const { _readableState: state } = consume.stream
if (consume.stream[kEncoding]) {
state.decoder = null
state.encoding = null
}

if (state.bufferIndex) {
const start = state.bufferIndex
Expand All @@ -446,10 +508,10 @@ function consumeStart (consume) {
}

if (state.endEmitted) {
consumeEnd(this[kConsume], this._readableState.encoding)
consumeEnd(this[kConsume], this[kEncoding])
} else {
consume.stream.on('end', function () {
consumeEnd(this[kConsume], this._readableState.encoding)
consumeEnd(this[kConsume], this[kEncoding])
})
}

Expand Down Expand Up @@ -546,6 +608,9 @@ function consumeEnd (consume, encoding) {
* @returns {void}
*/
function consumePush (consume, chunk) {
if (typeof chunk === 'string') {
chunk = Buffer.from(chunk, consume.stream[kEncoding])
}
consume.length += chunk.length
consume.body.push(chunk)
}
Expand Down
33 changes: 33 additions & 0 deletions test/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,39 @@ test('request multibyte text with setEncoding', async (t) => {
await t.completed
})

test('request multibyte text with async iteration and setEncoding', async (t) => {
t = tspl(t, { plan: 1 })

const data = Buffer.from('abc傳def')
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.write(data.subarray(0, 5))
setTimeout(() => {
res.end(data.subarray(5))
}, 100)
})
after(server.close.bind(server))

server.listen(0, async () => {
const client = new Client(`http://localhost:${server.address().port}`)
after(client.destroy.bind(client))

const { body } = await client.request({
path: '/',
method: 'GET'
})
body.setEncoding('utf8')

let text = ''
for await (const chunk of body) {
text += chunk
}

t.deepStrictEqual(text, data.toString('utf8'))
})

await t.completed
})

test('request multibyte text with setEncoding', async (t) => {
t = tspl(t, { plan: 1 })

Expand Down
Loading