From 8a5956108332133dd44264c15839ca7dd4c72414 Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Mon, 1 Jun 2026 06:45:55 -0500 Subject: [PATCH 1/3] feat: add support for Server-Sent Events with eventStream method and ServerSentEvent class --- src/Http/Response.php | 44 ++++++++++++++ src/Http/ServerSentEvent.php | 61 ++++++++++++++++++++ src/Testing/Concerns/InteractWithHeaders.php | 14 +++++ tests/Feature/RequestTest.php | 60 +++++++++++++++++++ tests/Unit/Http/ResponseTest.php | 40 +++++++++++++ 5 files changed, 219 insertions(+) create mode 100644 src/Http/ServerSentEvent.php diff --git a/src/Http/Response.php b/src/Http/Response.php index 4862b940..e08c251b 100644 --- a/src/Http/Response.php +++ b/src/Http/Response.php @@ -4,6 +4,7 @@ namespace Phenix\Http; +use Amp\ByteStream\ReadableIterableStream; use Amp\ByteStream\ReadableStream; use Amp\Http\Server\Response as ServerResponse; use Amp\Http\Server\Trailers; @@ -75,6 +76,27 @@ public function redirect(string $location, HttpStatus $status = HttpStatus::FOUN return $this; } + /** + * @param iterable $events + */ + public function eventStream( + iterable $events, + HttpStatus $status = HttpStatus::OK, + array $headers = [] + ): self { + $this->body = new ReadableIterableStream($this->formatEventStream($events)); + $this->status = $status; + $this->headers = [ + ...[ + 'content-type' => 'text/event-stream; charset=utf-8', + 'cache-control' => 'no-cache', + ], + ...$headers, + ]; + + return $this; + } + public function send(): ServerResponse { return new ServerResponse( @@ -84,4 +106,26 @@ public function send(): ServerResponse $this->trailers ); } + + /** + * @param iterable $events + * @return iterable + */ + protected function formatEventStream(iterable $events): iterable + { + foreach ($events as $event) { + yield $event instanceof ServerSentEvent + ? $event->toString() + : $this->normalizeEventFrame($event); + } + } + + protected function normalizeEventFrame(string $event): string + { + if (str_ends_with($event, "\n\n") || str_ends_with($event, "\r\n\r\n")) { + return $event; + } + + return rtrim($event, "\r\n") . "\n\n"; + } } diff --git a/src/Http/ServerSentEvent.php b/src/Http/ServerSentEvent.php new file mode 100644 index 00000000..e29f5155 --- /dev/null +++ b/src/Http/ServerSentEvent.php @@ -0,0 +1,61 @@ +toString(); + } + + public function toString(): string + { + $lines = []; + + if ($this->comment !== null) { + foreach ($this->lines($this->comment) as $line) { + $lines[] = ": {$line}"; + } + } + + if ($this->event !== null) { + $lines[] = "event: {$this->event}"; + } + + if ($this->id !== null) { + $lines[] = "id: {$this->id}"; + } + + if ($this->retry !== null) { + $lines[] = "retry: {$this->retry}"; + } + + foreach ($this->lines($this->data) as $line) { + $lines[] = "data: {$line}"; + } + + return implode("\n", $lines) . "\n\n"; + } + + /** + * @return array + */ + private function lines(string $value): array + { + return explode("\n", str_replace(["\r\n", "\r"], "\n", $value)); + } +} diff --git a/src/Testing/Concerns/InteractWithHeaders.php b/src/Testing/Concerns/InteractWithHeaders.php index 354d5e3b..e3776da3 100644 --- a/src/Testing/Concerns/InteractWithHeaders.php +++ b/src/Testing/Concerns/InteractWithHeaders.php @@ -87,4 +87,18 @@ public function assertIsPlainText(): self return $this; } + + public function assertIsEventStream(): self + { + $contentType = $this->response->getHeader('content-type'); + + Assert::assertNotNull($contentType, $this->missingHeaderMessage); + Assert::assertStringContainsString( + 'text/event-stream', + $contentType, + 'Response does not have an event stream content type.' + ); + + return $this; + } } diff --git a/tests/Feature/RequestTest.php b/tests/Feature/RequestTest.php index 24e3a351..013ca99d 100644 --- a/tests/Feature/RequestTest.php +++ b/tests/Feature/RequestTest.php @@ -13,6 +13,7 @@ use Phenix\Http\Constants\HttpStatus; use Phenix\Http\Request; use Phenix\Http\Response; +use Phenix\Http\ServerSentEvent; use Phenix\Testing\TestResponse; use Tests\Feature\Requests\LimitedBodyRequest; use Tests\Feature\Requests\LimitedStreamedRequest; @@ -219,6 +220,65 @@ ->assertBodyContains('plain text'); }); +it('can send server sent events', function (): void { + Route::get('/events', function (): Response { + return response()->eventStream((function (): iterable { + for ($index = 0; $index < 3; $index++) { + yield new ServerSentEvent( + data: "Event {$index}", + event: 'notification' + ); + } + })()); + }); + + $this->app->run(); + + $this->get('/events') + ->assertOk() + ->assertIsEventStream() + ->assertBodyContains([ + "event: notification\ndata: Event 0\n\n", + "event: notification\ndata: Event 2\n\n", + ]); +}); + +it('can resume server sent events using last event id', function (): void { + Route::get('/resumable-events', function (Request $request): Response { + $lastEventId = $request->getHeader('Last-Event-ID'); + $start = $lastEventId === null ? 0 : ((int) str_replace('event-', '', $lastEventId)) + 1; + + return response()->eventStream((function () use ($start): iterable { + for ($index = $start; $index < 4; $index++) { + yield new ServerSentEvent( + data: "Event {$index}", + event: 'notification', + id: "event-{$index}" + ); + } + })()); + }); + + $this->app->run(); + + $response = $this->get('/resumable-events', [ + 'Last-Event-ID' => 'event-1', + ]); + + $response + ->assertOk() + ->assertIsEventStream() + ->assertBodyContains([ + "id: event-2\n", + "data: Event 2\n\n", + "id: event-3\n", + "data: Event 3\n\n", + ]); + + expect($response->getBody())->not->toContain('id: event-0') + ->and($response->getBody())->not->toContain('id: event-1'); +}); + it('can assert json contains', function (): void { Route::get('/api/user', function (): Response { return response()->json([ diff --git a/tests/Unit/Http/ResponseTest.php b/tests/Unit/Http/ResponseTest.php index 051ec239..908d65e3 100644 --- a/tests/Unit/Http/ResponseTest.php +++ b/tests/Unit/Http/ResponseTest.php @@ -5,6 +5,7 @@ use Amp\Http\Server\Response as ServerResponse; use Phenix\Data\Collection; use Phenix\Http\Response; +use Phenix\Http\ServerSentEvent; it('responds plain text', function () { $response = new Response(); @@ -41,3 +42,42 @@ expect($serverResponse)->toBeInstanceOf(ServerResponse::class); expect($serverResponse->getBody()->read())->toContain(json_encode($data)); }); + +it('responds event streams from raw frames', function () { + $response = new Response(); + + $serverResponse = $response->eventStream([ + "event: notification\ndata: Event 0", + "event: notification\ndata: Event 1\n\n", + ])->send(); + + expect($serverResponse)->toBeInstanceOf(ServerResponse::class); + expect($serverResponse->getHeader('Content-Type'))->toBe('text/event-stream; charset=utf-8'); + expect($serverResponse->getHeader('Cache-Control'))->toBe('no-cache'); + expect($serverResponse->getBody()->read())->toBe("event: notification\ndata: Event 0\n\n"); + expect($serverResponse->getBody()->read())->toBe("event: notification\ndata: Event 1\n\n"); +}); + +it('responds event streams from server sent events', function () { + $response = new Response(); + + $serverResponse = $response->eventStream([ + new ServerSentEvent( + data: "First line\nSecond line", + event: 'notification', + id: 'event-1', + retry: 500, + comment: 'initial event' + ), + ])->send(); + + expect($serverResponse)->toBeInstanceOf(ServerResponse::class); + expect($serverResponse->getBody()->read())->toBe( + ": initial event\n" + . "event: notification\n" + . "id: event-1\n" + . "retry: 500\n" + . "data: First line\n" + . "data: Second line\n\n" + ); +}); From 7bbee5cec03f88ba95453ed3c64010051f602ec3 Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Mon, 1 Jun 2026 08:06:11 -0500 Subject: [PATCH 2/3] feat: enhance eventStream method to accept closures and add resolveEventStream for iterable validation --- src/Http/Response.php | 27 ++++++++++++++++++++++++--- tests/Feature/RequestTest.php | 8 ++++---- tests/Unit/Http/ResponseTest.php | 26 ++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/src/Http/Response.php b/src/Http/Response.php index e08c251b..a4bb266b 100644 --- a/src/Http/Response.php +++ b/src/Http/Response.php @@ -8,6 +8,8 @@ use Amp\ByteStream\ReadableStream; use Amp\Http\Server\Response as ServerResponse; use Amp\Http\Server\Trailers; +use Closure; +use InvalidArgumentException; use Phenix\Contracts\Arrayable; use Phenix\Facades\View; use Phenix\Http\Constants\HttpStatus; @@ -77,14 +79,14 @@ public function redirect(string $location, HttpStatus $status = HttpStatus::FOUN } /** - * @param iterable $events + * @param Closure(): iterable|iterable $events */ public function eventStream( - iterable $events, + Closure|iterable $events, HttpStatus $status = HttpStatus::OK, array $headers = [] ): self { - $this->body = new ReadableIterableStream($this->formatEventStream($events)); + $this->body = new ReadableIterableStream($this->formatEventStream($this->resolveEventStream($events))); $this->status = $status; $this->headers = [ ...[ @@ -107,6 +109,25 @@ public function send(): ServerResponse ); } + /** + * @param Closure(): iterable|iterable $events + * @return iterable + */ + protected function resolveEventStream(Closure|iterable $events): iterable + { + if (! $events instanceof Closure) { + return $events; + } + + $events = $events(); + + if (! is_iterable($events)) { + throw new InvalidArgumentException('The event stream closure must return an iterable.'); + } + + return $events; + } + /** * @param iterable $events * @return iterable diff --git a/tests/Feature/RequestTest.php b/tests/Feature/RequestTest.php index 013ca99d..926e17dd 100644 --- a/tests/Feature/RequestTest.php +++ b/tests/Feature/RequestTest.php @@ -222,14 +222,14 @@ it('can send server sent events', function (): void { Route::get('/events', function (): Response { - return response()->eventStream((function (): iterable { + return response()->eventStream(function (): iterable { for ($index = 0; $index < 3; $index++) { yield new ServerSentEvent( data: "Event {$index}", event: 'notification' ); } - })()); + }); }); $this->app->run(); @@ -248,7 +248,7 @@ $lastEventId = $request->getHeader('Last-Event-ID'); $start = $lastEventId === null ? 0 : ((int) str_replace('event-', '', $lastEventId)) + 1; - return response()->eventStream((function () use ($start): iterable { + return response()->eventStream(function () use ($start): iterable { for ($index = $start; $index < 4; $index++) { yield new ServerSentEvent( data: "Event {$index}", @@ -256,7 +256,7 @@ id: "event-{$index}" ); } - })()); + }); }); $this->app->run(); diff --git a/tests/Unit/Http/ResponseTest.php b/tests/Unit/Http/ResponseTest.php index 908d65e3..c8be0e5e 100644 --- a/tests/Unit/Http/ResponseTest.php +++ b/tests/Unit/Http/ResponseTest.php @@ -81,3 +81,29 @@ . "data: Second line\n\n" ); }); + +it('responds event streams from closures', function () { + $response = new Response(); + + $serverResponse = $response->eventStream(function (): iterable { + yield new ServerSentEvent( + data: 'Event 0', + event: 'notification', + id: 'event-0' + ); + })->send(); + + expect($serverResponse)->toBeInstanceOf(ServerResponse::class); + expect($serverResponse->getBody()->read())->toBe( + "event: notification\n" + . "id: event-0\n" + . "data: Event 0\n\n" + ); +}); + +it('rejects event stream closures that do not return iterables', function () { + $response = new Response(); + + expect(fn (): Response => $response->eventStream(fn (): string => 'invalid')) + ->toThrow(InvalidArgumentException::class, 'The event stream closure must return an iterable.'); +}); From 33e18521bfdef2bc3ce347bd884ef99fd8066bdd Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Mon, 1 Jun 2026 08:09:13 -0500 Subject: [PATCH 3/3] fix: cast ServerSentEvent to string in formatEventStream method --- src/Http/Response.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Http/Response.php b/src/Http/Response.php index a4bb266b..d81a67c2 100644 --- a/src/Http/Response.php +++ b/src/Http/Response.php @@ -136,7 +136,7 @@ protected function formatEventStream(iterable $events): iterable { foreach ($events as $event) { yield $event instanceof ServerSentEvent - ? $event->toString() + ? (string) $event : $this->normalizeEventFrame($event); } }