Skip to content

Implement multipart for netty-future, netty-cats & netty-zio#5315

Open
ajozwik wants to merge 1 commit into
softwaremill:masterfrom
ajozwik:netty-multipart-future-4851
Open

Implement multipart for netty-future, netty-cats & netty-zio#5315
ajozwik wants to merge 1 commit into
softwaremill:masterfrom
ajozwik:netty-multipart-future-4851

Conversation

@ajozwik

@ajozwik ajozwik commented Jun 14, 2026

Copy link
Copy Markdown
Contributor

Proposal of multipart implementation for netty - see #4851 - only multipart.

Implementation based on nettyServerSync implementation - but without OxStreams.

According to Note: Netty's multipart decoder does not expose other part headers, nor other disposition params. the test with netty should ignoring part headers - partOtherHeaderSupport = false

Issue for support headers has been requested: FileUpload or HttpData should contain HTTP-Headers

What is missing?

  • Part headers if netty api expose it
  • Configuration for multipart temp directory and minimum disk size
  • Streaming - but it is not part of multipart.

For sync implementation multipart depends on OxStreams - what is not needed and can be moved to other submodule.

@ajozwik ajozwik force-pushed the netty-multipart-future-4851 branch 3 times, most recently from 78ea9a8 to 8d9e390 Compare June 15, 2026 15:54
@ajozwik

ajozwik commented Jun 15, 2026

Copy link
Copy Markdown
Contributor Author

PR is ready. It can contains more changes - just on master, because I squashed commits with just merged (to master) changes ....

Main part is based on MonadSubscriber - implementation of Subscriber - who subscribe to Netty publisher. In implementation java.util.concurrent.Condition is used.

@ajozwik ajozwik force-pushed the netty-multipart-future-4851 branch from 3f3a735 to 265995f Compare June 16, 2026 11:55
@ajozwik ajozwik force-pushed the netty-multipart-future-4851 branch from 265995f to 03fedda Compare June 16, 2026 15:02
@adamw

adamw commented Jun 16, 2026

Copy link
Copy Markdown
Member

Code Review

Overview

This PR adds multipart request body support to the three async Netty backends (netty-future, netty-cats, netty-zio), which previously threw UnsupportedOperationException("Multipart requests are not supported"). It does so largely by extracting and generalizing the multipart logic that already existed in the sync backend (which depended on OxStreams) into a reusable, effect-polymorphic hierarchy:

Request side (decoding multipart):

  • NettyRequestBodyWithMultipart — holds the Netty HttpDataFactory (temp-dir / min-disk-size config).
  • NettyRequestBodyWithMultipartF[F, S] — generic publisherToMultipart + writeBytesToFile implemented via CompletableFuture / scala-java8-compat. Each backend supplies a listMonadToMonadOfList (Future.sequence / _.sequence / ZIO.collectAll).
  • NettyStreamingRequestBody changed from a trait to an abstract class extending the new F base; sync now extends the non-F base.

Response side (encoding multipart):

  • NettyToResponseBodyCommon / NettyToResponseBodyBase — pulls the fromRawValue + MultipartEntityBuilder logic (previously duplicated in NettySyncToResponseBody) into shared traits used by all backends.

Tests: Enables multipart = true for the Future backend and flips partOtherHeaderSupport = false for all three (Netty's decoder doesn't expose per-part headers — tracked upstream in netty/netty#15445).

The de-duplication is the strongest part of this PR — the response-body multipart logic was genuinely copy-pasted before, and consolidating it is a clear win.

🔴 Significant concerns

1. Lost-wakeup deadlock / NPE race in publisherToMultipart

In NettyRequestBodyWithMultipartF:

nettyRequest.subscribe(subscriber)   // runs on a Netty event-loop thread
locked {
  condition.await()
  result.get()
}

where the subscriber calls release()locked(condition.signal()) on onComplete/onError.

There's a classic lost-wakeup race: subscribe(...) may run the subscriber to completion on the Netty I/O thread before the calling thread acquires the lock and reaches condition.await(). If signal() fires before anyone is await()-ing, the wakeup is lost and the calling thread blocks forever.

Even when it doesn't deadlock, condition.await() is not guarded by a while (!done) loop, so a spurious wakeup can return early, after which result.get() returns null → NPE. Conditions should always be awaited in a loop that re-checks a state predicate.

Suggested: set a done flag (or check result.get() != null) under the lock and loop: while (result.get() == null) condition.await(), with the subscriber setting the result before signalling, under the same lock.

2. Blocking a thread in async/non-blocking backends

Because NettyStreamingRequestBody (cats & zio) now extends NettyRequestBodyWithMultipartF, this synchronous condition.await() blocks the thread that invokes publisherToMultipart for cats-effect and ZIO as well as Future. Blocking a compute/event-loop thread defeats the purpose of those runtimes and risks thread-pool starvation under load.

The idiomatic approach is to thread the result through the effect system instead of a lock+condition — e.g. complete a Deferred (cats), a Promise (zio), or a scala.concurrent.Promise (Future) from the subscriber callbacks, and return that. That also makes concern #1 disappear.

It's worth confirming whether the passing tests exercise concurrency at all, or just sequential single-request cases that happen to dodge the race.

🟡 Minor issues

  • Formatting likely fails scalafmt CI. NettyToResponseBody.scala has non-standard reformatting: the Scaladoc block was re-aligned from * to *, and method parameters use a deep continuation indent that looks like IntelliJ's default rather than the repo's scalafmt. Worth running sbt scalafmtAll before merge.
  • Stray blank lines introduced in NettySyncRequestBody.scala (double blank after the extends clause) and NettySyncToResponseBody.scala.
  • scala.compat.java8.FutureConverters is deprecated in favor of scala.jdk.FutureConverters on 2.13; since netty-future still cross-builds 2.12 this is probably deliberate — a short comment would help.
  • Unrelated cleanups bundled in: URLURI, NioEventLoopGroupEventLoopGroup, SAM conversions for Sleeper, removed unused imports, the new testSettings warning suppression. All reasonable, but they enlarge the diff and mix concerns with the feature.
  • part.fileName.get in rawValueToContentBody (moved, not new) will throw if a ByteArrayBody/InputStreamBody part has no filename. Pre-existing behavior carried over from sync, so not a regression, but worth hardening since this path now serves more backends.

✅ Strengths

  • Removes real duplication (response multipart logic was copy-pasted in sync).
  • Cleanly generalizes over effect types via a single listMonadToMonadOfList hook per backend.
  • maxBytes enforcement and decoder cleanup (RawValue.cleanup = decoder.destroy(), httpContent.release() after decoder.offer) are handled — relevant given this repo's history of Netty ByteBuf leaks.
  • Honest scoping in the description: part headers, temp-dir config, and streaming are explicitly called out as not-yet-done.

Recommendation

The feature direction and refactoring are good, but the lock/condition.await() mechanism in NettyRequestBodyWithMultipartF (#1 + #2) is the crux: it has a real lost-wakeup/NPE race and blocks async runtimes. Reworking it to complete a promise/deferred from the subscriber (rather than blocking on a condition) would address both, plus a scalafmt pass. As a follow-up, the sync backend's multipart need no longer depend on OxStreams and could move to a shared module.

🤖 Generated with Claude Code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants