From a6b72b47aee1bf1c60f3a4a1a4ad298754af1f25 Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Tue, 23 Jun 2026 20:52:29 +0530 Subject: [PATCH 1/6] feat(execution): add execution observer foundation --- .changeset/execution-observer-foundation.md | 14 + apps/local/src/app.ts | 10 +- apps/local/src/main.ts | 5 +- .../core/api/src/server/execution-stack.ts | 13 +- .../execution/src/engine-observer.test.ts | 81 +++++ packages/core/execution/src/engine.ts | 282 ++++++++++++++++-- .../core/sdk/src/execution-observer.test.ts | 75 +++++ packages/core/sdk/src/execution-observer.ts | 162 ++++++++++ packages/core/sdk/src/executor.ts | 6 + packages/core/sdk/src/index.ts | 22 ++ packages/core/sdk/src/plugin.ts | 8 + 11 files changed, 655 insertions(+), 23 deletions(-) create mode 100644 .changeset/execution-observer-foundation.md create mode 100644 packages/core/execution/src/engine-observer.test.ts create mode 100644 packages/core/sdk/src/execution-observer.test.ts create mode 100644 packages/core/sdk/src/execution-observer.ts diff --git a/.changeset/execution-observer-foundation.md b/.changeset/execution-observer-foundation.md new file mode 100644 index 000000000..4a4098b4b --- /dev/null +++ b/.changeset/execution-observer-foundation.md @@ -0,0 +1,14 @@ +--- +"@executor-js/sdk": minor +"@executor-js/execution": minor +"@executor-js/api": minor +--- + +Add the execution-observer foundation. The execution engine now emits a typed +lifecycle stream (`ExecutionStarted`/`Finished`, `ToolCallStarted`/`Finished`, +`InteractionStarted`/`Resolved`), plugins can subscribe via the new +`plugin.runtime.executionObserver` hook, and `makeExecutionStack` composes every +registered plugin's observer onto the engine. Behaviour is unchanged when no +plugin observes, making this the opt-in seam the execution-history and +execution-metrics plugins build on. Also exposes `Executor.owner` and enriches +the `mcp.execute` span with the run id and trigger. diff --git a/apps/local/src/app.ts b/apps/local/src/app.ts index a9915daf5..09eb0e122 100644 --- a/apps/local/src/app.ts +++ b/apps/local/src/app.ts @@ -9,6 +9,7 @@ import { } from "@executor-js/api/server"; import { createExecutionEngine } from "@executor-js/execution"; import { makeQuickJsExecutor } from "@executor-js/runtime-quickjs"; +import { composeExecutionObservers, type AnyPlugin } from "@executor-js/sdk"; import { getExecutorBundle, type LocalExecutor } from "./executor"; import { makeLocalIdentityLayer } from "./identity"; @@ -50,12 +51,17 @@ import { ErrorCaptureLive } from "./observability"; * `HostConfig`/`CodeExecutorProvider` seams — the fixed executor is the whole * execution model. */ -const localFixedExecutionLayer = (executor: LocalExecutor): Layer.Layer => +const localFixedExecutionLayer = ( + executor: LocalExecutor, + plugins: readonly AnyPlugin[], +): Layer.Layer => Layer.succeed(FixedExecutionProvider)({ executor, engine: createExecutionEngine({ executor, codeExecutor: makeQuickJsExecutor(), + // Local bypasses makeExecutionStack, so compose plugin observers here. + observer: composeExecutionObservers(plugins, executor), }), // The executor IS its own plugin-extension map (`executor[pluginId]`); the // fixed middleware reads `executor[id]` to satisfy each plugin's @@ -86,7 +92,7 @@ export const makeLocalApiHandler = async (token: string): Promise ({ + id: "observer-test" as const, + storage: () => ({}), + staticSources: () => [], +})); + +const makeExecutor = () => createExecutor(makeTestConfig({ plugins: [emptyPlugin()] as const })); + +// A code executor that issues one builtin tool call (tools.search) and then +// completes, enough to exercise the full event sequence. +const toolCallingExecutor: CodeExecutor = { + execute: (_code, invoker) => + invoker + .invoke({ path: "search", args: { query: "anything" } }) + .pipe(Effect.as({ result: "ok", logs: [] } satisfies ExecuteResult), Effect.orDie), +}; + +const collectingObserver = () => { + const events: ExecutionEvent[] = []; + const observer: ExecutionObserver = { + handle: (event) => Effect.sync(() => void events.push(event)), + }; + return { events, observer }; +}; + +describe("execution engine observer emission", () => { + it.effect("emits the full lifecycle for a completed run with a tool call", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const { events, observer } = collectingObserver(); + const engine = createExecutionEngine({ + executor, + codeExecutor: toolCallingExecutor, + observer, + }); + + const result = yield* engine.executeWithPause("noop", { trigger: { kind: "test" } }); + expect(result.status).toBe("completed"); + + // First event opens the run, last closes it; tool calls land in between. + // `.find` with isTagged narrows each result, so the assertions read the + // typed fields directly via optional chaining (no conditional blocks). + const started = events.find((e) => Predicate.isTagged(e, "ExecutionStarted")); + const finished = events.find((e) => Predicate.isTagged(e, "ExecutionFinished")); + const toolStarted = events.find((e) => Predicate.isTagged(e, "ToolCallStarted")); + const toolFinished = events.find((e) => Predicate.isTagged(e, "ToolCallFinished")); + + expect(Predicate.isTagged(events[0], "ExecutionStarted")).toBe(true); + expect(Predicate.isTagged(events[events.length - 1], "ExecutionFinished")).toBe(true); + + expect(started?.trigger?.kind).toBe("test"); + expect(started?.owner.tenant).toBeDefined(); + expect(toolStarted).toBeDefined(); + expect(finished?.status).toBe("completed"); + + // Tool-call events share the run's executionId and carry the path. + expect(toolFinished?.path).toBe("search"); + expect(toolFinished?.status).toBe("completed"); + expect(toolFinished?.executionId).toBe(started?.executionId); + }), + ); + + it.effect("does nothing observable when no observer is configured", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const engine = createExecutionEngine({ executor, codeExecutor: toolCallingExecutor }); + const result = yield* engine.executeWithPause("noop"); + expect(result.status).toBe("completed"); + }), + ); +}); diff --git a/packages/core/execution/src/engine.ts b/packages/core/execution/src/engine.ts index 665548852..9597fb794 100644 --- a/packages/core/execution/src/engine.ts +++ b/packages/core/execution/src/engine.ts @@ -1,5 +1,5 @@ import { Deferred, Effect, Fiber, Predicate, Queue } from "effect"; -import type * as Cause from "effect/Cause"; +import * as Cause from "effect/Cause"; import * as Exit from "effect/Exit"; import type { @@ -8,6 +8,21 @@ import type { ElicitationResponse, ElicitationHandler, ElicitationContext, + ExecutionObserver, + ExecutionTrigger, +} from "@executor-js/sdk/core"; +import { + ExecutionId, + ExecutionInteractionId, + ExecutionToolCallId, + ExecutionFinished, + ExecutionStarted, + InteractionResolved, + InteractionStarted, + ToolCallFinished, + ToolCallStarted, + ignoreExecutionObserverErrors, + noopExecutionObserver, } from "@executor-js/sdk/core"; import { CodeExecutionError } from "@executor-js/codemode-core"; import type { CodeExecutor, ExecuteResult, SandboxToolInvoker } from "@executor-js/codemode-core"; @@ -30,6 +45,21 @@ export type ExecutionEngineConfig; readonly toolDiscoveryProvider?: ToolDiscoveryProvider; + /** Optional sink for execution lifecycle events. Defaults to a no-op, so a + * host that registers no observer pays only for constructing the events. */ + readonly observer?: ExecutionObserver; +}; + +/** Per-run options shared by both execute paths. */ +export type ExecutionRunOptions = { + /** What kicked off this run (e.g. `mcp.tool`, `api.http`); recorded on the + * `ExecutionStarted` event for downstream attribution. */ + readonly trigger?: ExecutionTrigger; +}; + +/** Options for the inline-elicitation execute path. */ +export type InlineExecutionOptions = ExecutionRunOptions & { + readonly onElicitation: ElicitationHandler; }; export type ExecutionResult = @@ -363,7 +393,7 @@ export type ExecutionEngine */ readonly execute: ( code: string, - options: { readonly onElicitation: ElicitationHandler }, + options: InlineExecutionOptions, ) => Effect.Effect; /** @@ -371,7 +401,10 @@ export type ExecutionEngine * Use this when the host doesn't support inline elicitation. * Returns either a completed result or a paused execution that can be resumed. */ - readonly executeWithPause: (code: string) => Effect.Effect; + readonly executeWithPause: ( + code: string, + options?: ExecutionRunOptions, + ) => Effect.Effect; /** * Resume a paused execution. Returns a completed result, a new pause, or @@ -423,6 +456,134 @@ export const createExecutionEngine = ExecutionId.make(`exec_${crypto.randomUUID()}`); + const makeToolCallId = (): ExecutionToolCallId => + ExecutionToolCallId.make(`tc_${crypto.randomUUID()}`); + const makeInteractionId = (): ExecutionInteractionId => + ExecutionInteractionId.make(`ix_${crypto.randomUUID()}`); + + const interactionStatusFromAction = (action: ResumeResponse["action"]) => + action === "accept" ? "accepted" : action === "decline" ? "declined" : "cancelled"; + + const finishFromResult = (executionId: ExecutionId, result: ExecuteResult): ExecutionFinished => + new ExecutionFinished({ + executionId, + owner, + status: result.error ? "failed" : "completed", + result: result.result, + error: result.error, + logs: result.logs, + completedAt: new Date(), + }); + + const finishFromCause = (executionId: ExecutionId, cause: Cause.Cause): ExecutionFinished => + new ExecutionFinished({ + executionId, + owner, + status: "failed", + error: Cause.pretty(cause), + completedAt: new Date(), + }); + + /** Wrap an invoker so each tool call brackets `ToolCallStarted`/`Finished`. */ + const observeToolCalls = ( + executionId: ExecutionId, + inner: SandboxToolInvoker, + ): SandboxToolInvoker => ({ + invoke: (call) => + Effect.gen(function* () { + const toolCallId = makeToolCallId(); + yield* emit( + new ToolCallStarted({ + executionId, + toolCallId, + owner, + path: call.path, + args: call.args, + startedAt: new Date(), + }), + ); + return yield* inner.invoke(call).pipe( + Effect.tap((result) => + emit( + new ToolCallFinished({ + executionId, + toolCallId, + owner, + path: call.path, + status: "completed", + result, + completedAt: new Date(), + }), + ), + ), + Effect.tapCause((cause) => + emit( + new ToolCallFinished({ + executionId, + toolCallId, + owner, + path: call.path, + status: "failed", + error: Cause.pretty(cause), + completedAt: new Date(), + }), + ), + ), + ); + }), + }); + + /** Wrap an inline elicitation handler so it brackets `InteractionStarted`/ + * `Resolved`. The pausable path emits these directly (see below). */ + const observeInlineElicitation = + (executionId: ExecutionId, handler: ElicitationHandler): ElicitationHandler => + (ctx) => + Effect.gen(function* () { + const interactionId = makeInteractionId(); + yield* emit( + new InteractionStarted({ + executionId, + interactionId, + owner, + context: ctx, + startedAt: new Date(), + }), + ); + return yield* handler(ctx).pipe( + Effect.tap((response) => + emit( + new InteractionResolved({ + executionId, + interactionId, + owner, + status: interactionStatusFromAction(response.action), + response, + completedAt: new Date(), + }), + ), + ), + Effect.tapCause((cause) => + emit( + new InteractionResolved({ + executionId, + interactionId, + owner, + status: "failed", + error: Cause.pretty(cause), + completedAt: new Date(), + }), + ), + ), + ); + }); + /** * Race a running fiber against the pause queue. Returns when either * the fiber completes or an elicitation handler fires (whichever @@ -455,12 +616,33 @@ export const createExecutionEngine = >(); @@ -476,6 +658,7 @@ export const createExecutionEngine = = { id, @@ -486,19 +669,59 @@ export const createExecutionEngine = + emit( + new InteractionResolved({ + executionId, + interactionId, + owner, + status: interactionStatusFromAction(response.action), + response, + completedAt: new Date(), + }), + ), + ), + Effect.tapCause((cause) => + emit( + new InteractionResolved({ + executionId, + interactionId, + owner, + status: "failed", + error: Cause.pretty(cause), + completedAt: new Date(), + }), + ), + ), + ); }); - const invoker = makeFullInvoker( - executor, - { onElicitation: elicitationHandler }, - toolDiscoveryProvider, + const invoker = observeToolCalls( + executionId, + makeFullInvoker(executor, { onElicitation: elicitationHandler }, toolDiscoveryProvider), ); fiber = yield* Effect.forkDetach( - codeExecutor.execute(code, invoker).pipe(Effect.withSpan("executor.code.exec")), + codeExecutor.execute(code, invoker).pipe( + Effect.withSpan("executor.code.exec"), + Effect.tap((result) => emit(finishFromResult(executionId, result))), + Effect.tapCause((cause) => emit(finishFromCause(executionId, cause))), + ), ); // When the fiber settles on its own (sandbox timeout, failure) while @@ -586,20 +809,41 @@ export const createExecutionEngine = emit(finishFromResult(executionId, result))), + Effect.tapCause((cause) => emit(finishFromCause(executionId, cause))), ); - return yield* codeExecutor.execute(code, invoker).pipe(Effect.withSpan("executor.code.exec")); }); return { diff --git a/packages/core/sdk/src/execution-observer.test.ts b/packages/core/sdk/src/execution-observer.test.ts new file mode 100644 index 000000000..d18e647ea --- /dev/null +++ b/packages/core/sdk/src/execution-observer.test.ts @@ -0,0 +1,75 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect } from "effect"; + +import { Subject, Tenant } from "./ids"; +import { ExecutionFinished, ExecutionId, composeExecutionObservers, definePlugin } from "./index"; + +const owner = { tenant: Tenant.make("tenant_test"), subject: Subject.make("subject_test") }; + +let calls: string[] = []; + +const observingPlugin = (id: string, asyncBoundary = false) => + definePlugin(() => ({ + id, + storage: () => ({}), + extension: () => ({ label: id }), + runtime: { + executionObserver: (self: { label: string }) => ({ + handle: () => + (asyncBoundary ? Effect.promise(() => Promise.resolve()) : Effect.void).pipe( + Effect.flatMap(() => Effect.sync(() => calls.push(self.label))), + ), + }), + }, + })); + +const failingPlugin = definePlugin(() => ({ + id: "failing" as const, + storage: () => ({}), + extension: () => ({ label: "failing" }), + runtime: { + executionObserver: () => ({ + handle: () => Effect.die("observer failed"), + }), + }, +})); + +const finishedEvent = () => + new ExecutionFinished({ + executionId: ExecutionId.make("exec_test"), + owner, + status: "completed", + result: "ok", + completedAt: new Date(), + }); + +describe("composeExecutionObservers", () => { + it.effect("dispatches observers sequentially and isolates failures", () => + Effect.gen(function* () { + calls = []; + const first = observingPlugin("first", true)(); + const failing = failingPlugin(); + const last = observingPlugin("last")(); + const observer = composeExecutionObservers([first, failing, last] as const, { + first: { label: "first" }, + failing: { label: "failing" }, + last: { label: "last" }, + }); + + // The failing plugin dies mid-dispatch; the others must still observe. + yield* observer.handle(finishedEvent()); + + expect(calls).toEqual(["first", "last"]); + }), + ); + + it.effect("returns a no-op observer when no plugin registers one", () => + Effect.gen(function* () { + const plain = definePlugin(() => ({ id: "plain", storage: () => ({}) }))(); + const observer = composeExecutionObservers([plain] as const, { plain: {} }); + + // No observer registered: handling is a no-op and never throws. + yield* observer.handle(finishedEvent()); + }), + ); +}); diff --git a/packages/core/sdk/src/execution-observer.ts b/packages/core/sdk/src/execution-observer.ts new file mode 100644 index 000000000..a8ba7fa81 --- /dev/null +++ b/packages/core/sdk/src/execution-observer.ts @@ -0,0 +1,162 @@ +import { Data, Effect, Schema } from "effect"; +import * as Cause from "effect/Cause"; + +import type { ElicitationContext, ElicitationResponse } from "./elicitation"; +import type { AnyPlugin, OwnerBinding, PluginExtensions } from "./plugin"; + +/* The execution-observer contract: a pull-model lifecycle stream the engine + * emits as it runs code. Plugins opt in via `plugin.runtime.executionObserver` + * and receive every event; sinks (history, metrics, tracing) are built on top. + * Emission is dispatched to all registered observers with per-observer error + * logging, so an observer can never break an execution. */ + +export const ExecutionId = Schema.String.pipe(Schema.brand("ExecutionId")); +export type ExecutionId = typeof ExecutionId.Type; + +export const ExecutionToolCallId = Schema.String.pipe(Schema.brand("ExecutionToolCallId")); +export type ExecutionToolCallId = typeof ExecutionToolCallId.Type; + +export const ExecutionInteractionId = Schema.String.pipe(Schema.brand("ExecutionInteractionId")); +export type ExecutionInteractionId = typeof ExecutionInteractionId.Type; + +export type ExecutionTrigger = { + readonly kind: string; + readonly metadata?: Record; +}; + +export type ToolCallStatus = "completed" | "failed"; +export type InteractionStatus = "accepted" | "declined" | "cancelled" | "failed"; +export type ExecutionStatus = "completed" | "failed"; + +export class ExecutionStarted extends Data.TaggedClass("ExecutionStarted")<{ + readonly executionId: ExecutionId; + readonly owner: OwnerBinding; + readonly code: string; + readonly trigger?: ExecutionTrigger; + readonly startedAt: Date; +}> {} + +export class ToolCallStarted extends Data.TaggedClass("ToolCallStarted")<{ + readonly executionId: ExecutionId; + readonly toolCallId: ExecutionToolCallId; + readonly owner: OwnerBinding; + readonly path: string; + readonly args: unknown; + readonly startedAt: Date; +}> {} + +export class ToolCallFinished extends Data.TaggedClass("ToolCallFinished")<{ + readonly executionId: ExecutionId; + readonly toolCallId: ExecutionToolCallId; + readonly owner: OwnerBinding; + readonly path: string; + readonly status: ToolCallStatus; + readonly result?: unknown; + readonly error?: string; + readonly completedAt: Date; +}> {} + +export class InteractionStarted extends Data.TaggedClass("InteractionStarted")<{ + readonly executionId: ExecutionId; + readonly interactionId: ExecutionInteractionId; + readonly owner: OwnerBinding; + readonly context: ElicitationContext; + readonly startedAt: Date; +}> {} + +export class InteractionResolved extends Data.TaggedClass("InteractionResolved")<{ + readonly executionId: ExecutionId; + readonly interactionId: ExecutionInteractionId; + readonly owner: OwnerBinding; + readonly status: InteractionStatus; + readonly response?: ElicitationResponse; + readonly error?: string; + readonly completedAt: Date; +}> {} + +export class ExecutionFinished extends Data.TaggedClass("ExecutionFinished")<{ + readonly executionId: ExecutionId; + readonly owner: OwnerBinding; + readonly status: ExecutionStatus; + readonly result?: unknown; + readonly error?: string; + readonly logs?: readonly string[]; + readonly completedAt: Date; +}> {} + +export type ExecutionEvent = + | ExecutionStarted + | ToolCallStarted + | ToolCallFinished + | InteractionStarted + | InteractionResolved + | ExecutionFinished; + +export interface ExecutionObserver { + readonly handle: (event: ExecutionEvent) => Effect.Effect; +} + +export const noopExecutionObserver: ExecutionObserver = { + handle: () => Effect.void, +}; + +const logExecutionObserverFailure = ( + event: ExecutionEvent, + cause: Cause.Cause, + pluginId?: string, +): Effect.Effect => + Effect.logWarning("execution observer failed", { + cause: Cause.pretty(cause), + event: event._tag, + ...(pluginId ? { pluginId } : {}), + }); + +/** Wrap an observer so any failure (defect or expected error) is logged, and + * an observer must never propagate into the execution it observes. */ +export const ignoreExecutionObserverErrors = ( + observer: ExecutionObserver, +): ExecutionObserver => ({ + handle: (event) => + observer + .handle(event) + .pipe(Effect.catchCause((cause) => logExecutionObserverFailure(event, cause))), +}); + +/** Collect every plugin's `runtime.executionObserver` and fan each event to + * all of them, logging per-observer errors. Returns the no-op observer when no + * plugin registers one, the common opt-out case. */ +export const composeExecutionObservers = ( + plugins: TPlugins, + extensions: PluginExtensions, +): ExecutionObserver => { + const observers: { readonly pluginId: string; readonly observer: ExecutionObserver }[] = + []; + + for (const plugin of plugins) { + const observer = plugin.runtime?.executionObserver?.( + extensions[plugin.id as keyof PluginExtensions] as never, + ); + if (observer) { + observers.push({ pluginId: plugin.id, observer }); + } + } + + if (observers.length === 0) { + return noopExecutionObserver; + } + + return { + handle: (event) => + Effect.forEach( + observers, + ({ pluginId, observer }) => + observer + .handle(event) + .pipe( + Effect.catchCause((cause) => logExecutionObserverFailure(event, cause, pluginId)), + ), + // Preserve plugin order so observers see deterministic sequencing. + { discard: true }, + ), + }; +}; diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 38e12a3c6..71d3920be 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -319,6 +319,11 @@ export type Executor = { ) => Effect.Effect; readonly close: () => Effect.Effect; + + /** The (tenant, subject) this executor acts as. Surfaced so engine-level + * machinery (e.g. execution observers) can attribute work to an owner + * without re-threading identity through every call site. */ + readonly owner: OwnerBinding; } & PluginExtensions; export interface ExecutorDb { @@ -3299,6 +3304,7 @@ export const createExecutor = => value as Executor; diff --git a/packages/core/sdk/src/index.ts b/packages/core/sdk/src/index.ts index a252febdf..b71169c47 100644 --- a/packages/core/sdk/src/index.ts +++ b/packages/core/sdk/src/index.ts @@ -168,6 +168,28 @@ export { type InvokeOptions, } from "./elicitation"; +// Execution observers: the engine lifecycle stream history/metrics/tracing build on. +export { + ExecutionId, + ExecutionToolCallId, + ExecutionInteractionId, + ExecutionStarted, + ToolCallStarted, + ToolCallFinished, + InteractionStarted, + InteractionResolved, + ExecutionFinished, + noopExecutionObserver, + ignoreExecutionObserverErrors, + composeExecutionObservers, + type ExecutionTrigger, + type ToolCallStatus, + type InteractionStatus, + type ExecutionStatus, + type ExecutionEvent, + type ExecutionObserver, +} from "./execution-observer"; + // Blob store — the plugin-facing contract (`BlobStore`/`PluginBlobStore`) // plus the platform-neutral backends (`makeFumaBlobStore` default, // `makeInMemoryBlobStore` for tests). Platform-specific backends live with diff --git a/packages/core/sdk/src/plugin.ts b/packages/core/sdk/src/plugin.ts index 157002ceb..a1a9f96f2 100644 --- a/packages/core/sdk/src/plugin.ts +++ b/packages/core/sdk/src/plugin.ts @@ -43,6 +43,7 @@ import type { IntegrationRemovalNotAllowedError, InvalidConnectionInputError, } from "./errors"; +import type { ExecutionObserver } from "./execution-observer"; import type { OAuthService } from "./oauth-client"; import type { CredentialProvider, ProviderEntry } from "./provider"; import type { PluginStorageConfig, PluginStorageFacade } from "./plugin-storage"; @@ -514,6 +515,13 @@ export interface PluginSpec< | ((ctx: PluginCtx) => readonly CredentialProvider[]) | ((ctx: PluginCtx) => Effect.Effect); + /** Runtime hooks invoked while the engine executes code. `executionObserver` + * receives this plugin's extension and returns an observer for every + * {@link ExecutionEvent}, the seam history/metrics sinks build on. */ + readonly runtime?: { + readonly executionObserver?: (self: NoInfer) => ExecutionObserver; + }; + readonly close?: () => Effect.Effect; } From a74821dbd96eec967be5d8782b8c9193c9c9622a Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Tue, 23 Jun 2026 20:54:38 +0530 Subject: [PATCH 2/6] fix(sdk): avoid manual observer tag logging --- packages/core/sdk/src/execution-observer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/sdk/src/execution-observer.ts b/packages/core/sdk/src/execution-observer.ts index a8ba7fa81..7ff38a9eb 100644 --- a/packages/core/sdk/src/execution-observer.ts +++ b/packages/core/sdk/src/execution-observer.ts @@ -107,7 +107,7 @@ const logExecutionObserverFailure = ( ): Effect.Effect => Effect.logWarning("execution observer failed", { cause: Cause.pretty(cause), - event: event._tag, + event: event.constructor.name, ...(pluginId ? { pluginId } : {}), }); From df4389d55d63650e1c1fb0118560ce1deece554c Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 24 Jun 2026 12:35:22 +0530 Subject: [PATCH 3/6] fix(execution): address observer review feedback --- .../core/api/src/server/execution-stack.ts | 4 -- .../execution/src/engine-observer.test.ts | 64 ++++++++++++++++++- packages/core/sdk/src/execution-observer.ts | 15 ++++- 3 files changed, 75 insertions(+), 8 deletions(-) diff --git a/packages/core/api/src/server/execution-stack.ts b/packages/core/api/src/server/execution-stack.ts index dc9c11957..2fbe9f1f2 100644 --- a/packages/core/api/src/server/execution-stack.ts +++ b/packages/core/api/src/server/execution-stack.ts @@ -110,10 +110,6 @@ export const makeExecutionStack = < const codeExecutor = yield* CodeExecutorProvider; const { decorate } = yield* EngineDecorator; - // Compose every registered plugin's runtime.executionObserver, bound to the - // executor's own extensions. Resolves to a no-op when no plugin observes, - // so a stack with no history/metrics plugin is unaffected. `plugins()` is - // the erased AnyPlugin[]; the caller's TPlugins phantom recovers the tuple. const { plugins } = yield* PluginsProvider; // PluginsProvider erases the tuple to AnyPlugin[]; recover the caller's // TPlugins phantom so the extensions arg (the executor) lines up. diff --git a/packages/core/execution/src/engine-observer.test.ts b/packages/core/execution/src/engine-observer.test.ts index 924ec569e..03b0ad435 100644 --- a/packages/core/execution/src/engine-observer.test.ts +++ b/packages/core/execution/src/engine-observer.test.ts @@ -1,7 +1,7 @@ import { describe, expect, it } from "@effect/vitest"; -import { Effect, Predicate } from "effect"; +import { Effect, Predicate, Schema } from "effect"; -import { createExecutor, definePlugin } from "@executor-js/sdk"; +import { createExecutor, definePlugin, ElicitationResponse, tool } from "@executor-js/sdk"; import type { ExecutionEvent, ExecutionObserver } from "@executor-js/sdk"; import { makeTestConfig } from "@executor-js/sdk/testing"; import type { CodeExecutor, ExecuteResult } from "@executor-js/codemode-core"; @@ -14,8 +14,32 @@ const emptyPlugin = definePlugin(() => ({ staticSources: () => [], })); +const approvalPlugin = definePlugin(() => ({ + id: "observer-approval-test" as const, + storage: () => ({}), + staticSources: () => [ + { + id: "approval.ctl", + kind: "control" as const, + name: "Approval Ctl", + tools: [ + tool({ + name: "run", + description: "Requires approval", + annotations: { requiresApproval: true } as const, + inputSchema: Schema.toStandardSchemaV1(Schema.toStandardJSONSchemaV1(Schema.Struct({}))), + execute: () => Effect.succeed("ran"), + }), + ], + }, + ], +})); + const makeExecutor = () => createExecutor(makeTestConfig({ plugins: [emptyPlugin()] as const })); +const makeApprovalExecutor = () => + createExecutor(makeTestConfig({ plugins: [approvalPlugin()] as const })); + // A code executor that issues one builtin tool call (tools.search) and then // completes, enough to exercise the full event sequence. const toolCallingExecutor: CodeExecutor = { @@ -25,6 +49,13 @@ const toolCallingExecutor: CodeExecutor = { .pipe(Effect.as({ result: "ok", logs: [] } satisfies ExecuteResult), Effect.orDie), }; +const approvalCallingExecutor: CodeExecutor = { + execute: (_code, invoker) => + invoker + .invoke({ path: "approval.ctl.run", args: {} }) + .pipe(Effect.as({ result: "ok", logs: [] } satisfies ExecuteResult), Effect.orDie), +}; + const collectingObserver = () => { const events: ExecutionEvent[] = []; const observer: ExecutionObserver = { @@ -70,6 +101,35 @@ describe("execution engine observer emission", () => { }), ); + it.effect("emits inline interaction events when execute handles elicitation", () => + Effect.gen(function* () { + const executor = yield* makeApprovalExecutor(); + const { events, observer } = collectingObserver(); + const engine = createExecutionEngine({ + executor, + codeExecutor: approvalCallingExecutor, + observer, + }); + + const result = yield* engine.execute("noop", { + trigger: { kind: "test" }, + onElicitation: () => Effect.succeed(ElicitationResponse.make({ action: "accept" })), + }); + expect(result.result).toBe("ok"); + + const started = events.find((e) => Predicate.isTagged(e, "ExecutionStarted")); + const interactionStarted = events.find((e) => Predicate.isTagged(e, "InteractionStarted")); + const interactionResolved = events.find((e) => Predicate.isTagged(e, "InteractionResolved")); + + expect(interactionStarted?.executionId).toBe(started?.executionId); + expect(interactionResolved?.executionId).toBe(started?.executionId); + expect(interactionResolved?.interactionId).toBe(interactionStarted?.interactionId); + expect(interactionStarted?.context.request.message).toContain("approval"); + expect(interactionResolved?.status).toBe("accepted"); + expect(interactionResolved?.response?.action).toBe("accept"); + }), + ); + it.effect("does nothing observable when no observer is configured", () => Effect.gen(function* () { const executor = yield* makeExecutor(); diff --git a/packages/core/sdk/src/execution-observer.ts b/packages/core/sdk/src/execution-observer.ts index 7ff38a9eb..3cc397cb0 100644 --- a/packages/core/sdk/src/execution-observer.ts +++ b/packages/core/sdk/src/execution-observer.ts @@ -1,4 +1,4 @@ -import { Data, Effect, Schema } from "effect"; +import { Data, Effect, Predicate, Schema } from "effect"; import * as Cause from "effect/Cause"; import type { ElicitationContext, ElicitationResponse } from "./elicitation"; @@ -100,6 +100,17 @@ export const noopExecutionObserver: ExecutionObserver = { handle: () => Effect.void, }; +type ExecutionEventName = ExecutionEvent["_tag"]; + +const executionEventName = (event: ExecutionEvent): ExecutionEventName => { + if (Predicate.isTagged(event, "ExecutionStarted")) return "ExecutionStarted"; + if (Predicate.isTagged(event, "ToolCallStarted")) return "ToolCallStarted"; + if (Predicate.isTagged(event, "ToolCallFinished")) return "ToolCallFinished"; + if (Predicate.isTagged(event, "InteractionStarted")) return "InteractionStarted"; + if (Predicate.isTagged(event, "InteractionResolved")) return "InteractionResolved"; + return "ExecutionFinished"; +}; + const logExecutionObserverFailure = ( event: ExecutionEvent, cause: Cause.Cause, @@ -107,7 +118,7 @@ const logExecutionObserverFailure = ( ): Effect.Effect => Effect.logWarning("execution observer failed", { cause: Cause.pretty(cause), - event: event.constructor.name, + event: executionEventName(event), ...(pluginId ? { pluginId } : {}), }); From ee310564e745493de83f8cfdeccb3f3943fbd5e8 Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 24 Jun 2026 15:10:05 +0530 Subject: [PATCH 4/6] fix(execution): preserve observer interrupts --- .../core/sdk/src/execution-observer.test.ts | 54 ++++++++++++++++++- packages/core/sdk/src/execution-observer.ts | 13 ++++- 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/packages/core/sdk/src/execution-observer.test.ts b/packages/core/sdk/src/execution-observer.test.ts index d18e647ea..f9cb6b571 100644 --- a/packages/core/sdk/src/execution-observer.test.ts +++ b/packages/core/sdk/src/execution-observer.test.ts @@ -1,8 +1,14 @@ import { describe, expect, it } from "@effect/vitest"; -import { Effect } from "effect"; +import { Cause, Effect, Exit } from "effect"; import { Subject, Tenant } from "./ids"; -import { ExecutionFinished, ExecutionId, composeExecutionObservers, definePlugin } from "./index"; +import { + ExecutionFinished, + ExecutionId, + composeExecutionObservers, + definePlugin, + ignoreExecutionObserverErrors, +} from "./index"; const owner = { tenant: Tenant.make("tenant_test"), subject: Subject.make("subject_test") }; @@ -34,6 +40,17 @@ const failingPlugin = definePlugin(() => ({ }, })); +const interruptingPlugin = definePlugin(() => ({ + id: "interrupting" as const, + storage: () => ({}), + extension: () => ({ label: "interrupting" }), + runtime: { + executionObserver: () => ({ + handle: () => Effect.interrupt, + }), + }, +})); + const finishedEvent = () => new ExecutionFinished({ executionId: ExecutionId.make("exec_test"), @@ -63,6 +80,39 @@ describe("composeExecutionObservers", () => { }), ); + it.effect("preserves interrupts from isolated observers", () => + Effect.gen(function* () { + const observer = ignoreExecutionObserverErrors({ + handle: () => Effect.interrupt, + }); + + const exit = yield* Effect.exit(observer.handle(finishedEvent())); + + expect(Exit.isFailure(exit)).toBe(true); + if (!Exit.isFailure(exit)) return; + expect(Cause.hasInterrupts(exit.cause)).toBe(true); + }), + ); + + it.effect("preserves interrupts from composed plugin observers", () => + Effect.gen(function* () { + calls = []; + const interrupting = interruptingPlugin(); + const last = observingPlugin("last")(); + const observer = composeExecutionObservers([interrupting, last] as const, { + interrupting: { label: "interrupting" }, + last: { label: "last" }, + }); + + const exit = yield* Effect.exit(observer.handle(finishedEvent())); + + expect(Exit.isFailure(exit)).toBe(true); + if (!Exit.isFailure(exit)) return; + expect(Cause.hasInterrupts(exit.cause)).toBe(true); + expect(calls).toEqual([]); + }), + ); + it.effect("returns a no-op observer when no plugin registers one", () => Effect.gen(function* () { const plain = definePlugin(() => ({ id: "plain", storage: () => ({}) }))(); diff --git a/packages/core/sdk/src/execution-observer.ts b/packages/core/sdk/src/execution-observer.ts index 3cc397cb0..8cf01d731 100644 --- a/packages/core/sdk/src/execution-observer.ts +++ b/packages/core/sdk/src/execution-observer.ts @@ -122,6 +122,15 @@ const logExecutionObserverFailure = ( ...(pluginId ? { pluginId } : {}), }); +const handleExecutionObserverCause = ( + event: ExecutionEvent, + cause: Cause.Cause, + pluginId?: string, +): Effect.Effect => + Cause.hasInterrupts(cause) + ? Effect.interrupt + : logExecutionObserverFailure(event, cause, pluginId); + /** Wrap an observer so any failure (defect or expected error) is logged, and * an observer must never propagate into the execution it observes. */ export const ignoreExecutionObserverErrors = ( @@ -130,7 +139,7 @@ export const ignoreExecutionObserverErrors = ( handle: (event) => observer .handle(event) - .pipe(Effect.catchCause((cause) => logExecutionObserverFailure(event, cause))), + .pipe(Effect.catchCause((cause) => handleExecutionObserverCause(event, cause))), }); /** Collect every plugin's `runtime.executionObserver` and fan each event to @@ -164,7 +173,7 @@ export const composeExecutionObservers = observer .handle(event) .pipe( - Effect.catchCause((cause) => logExecutionObserverFailure(event, cause, pluginId)), + Effect.catchCause((cause) => handleExecutionObserverCause(event, cause, pluginId)), ), // Preserve plugin order so observers see deterministic sequencing. { discard: true }, From 6744fcf0b1438850e0d5cfb59137a573870e0a92 Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 24 Jun 2026 16:06:09 +0530 Subject: [PATCH 5/6] fix(execution): clarify observer wrapper contract --- packages/core/execution/src/engine.ts | 4 ++-- .../core/sdk/src/execution-observer.test.ts | 4 ++-- packages/core/sdk/src/execution-observer.ts | 18 ++++++------------ packages/core/sdk/src/index.ts | 2 +- 4 files changed, 11 insertions(+), 17 deletions(-) diff --git a/packages/core/execution/src/engine.ts b/packages/core/execution/src/engine.ts index 9597fb794..c6aba4946 100644 --- a/packages/core/execution/src/engine.ts +++ b/packages/core/execution/src/engine.ts @@ -21,8 +21,8 @@ import { InteractionStarted, ToolCallFinished, ToolCallStarted, - ignoreExecutionObserverErrors, noopExecutionObserver, + wrapExecutionObserver, } from "@executor-js/sdk/core"; import { CodeExecutionError } from "@executor-js/codemode-core"; import type { CodeExecutor, ExecuteResult, SandboxToolInvoker } from "@executor-js/codemode-core"; @@ -458,7 +458,7 @@ export const createExecutionEngine = { it.effect("preserves interrupts from isolated observers", () => Effect.gen(function* () { - const observer = ignoreExecutionObserverErrors({ + const observer = wrapExecutionObserver({ handle: () => Effect.interrupt, }); diff --git a/packages/core/sdk/src/execution-observer.ts b/packages/core/sdk/src/execution-observer.ts index 8cf01d731..e8a34bdbe 100644 --- a/packages/core/sdk/src/execution-observer.ts +++ b/packages/core/sdk/src/execution-observer.ts @@ -1,4 +1,4 @@ -import { Data, Effect, Predicate, Schema } from "effect"; +import { Data, Effect, Schema } from "effect"; import * as Cause from "effect/Cause"; import type { ElicitationContext, ElicitationResponse } from "./elicitation"; @@ -103,12 +103,8 @@ export const noopExecutionObserver: ExecutionObserver = { type ExecutionEventName = ExecutionEvent["_tag"]; const executionEventName = (event: ExecutionEvent): ExecutionEventName => { - if (Predicate.isTagged(event, "ExecutionStarted")) return "ExecutionStarted"; - if (Predicate.isTagged(event, "ToolCallStarted")) return "ToolCallStarted"; - if (Predicate.isTagged(event, "ToolCallFinished")) return "ToolCallFinished"; - if (Predicate.isTagged(event, "InteractionStarted")) return "InteractionStarted"; - if (Predicate.isTagged(event, "InteractionResolved")) return "InteractionResolved"; - return "ExecutionFinished"; + // oxlint-disable-next-line executor/no-manual-tag-check -- boundary: logging uses the Data.TaggedClass discriminant as an event name + return event._tag; }; const logExecutionObserverFailure = ( @@ -131,11 +127,9 @@ const handleExecutionObserverCause = ( ? Effect.interrupt : logExecutionObserverFailure(event, cause, pluginId); -/** Wrap an observer so any failure (defect or expected error) is logged, and - * an observer must never propagate into the execution it observes. */ -export const ignoreExecutionObserverErrors = ( - observer: ExecutionObserver, -): ExecutionObserver => ({ +/** Wrap an observer so non-interrupt failures are logged and isolated while + * interrupt causes still propagate as cancellation. */ +export const wrapExecutionObserver = (observer: ExecutionObserver): ExecutionObserver => ({ handle: (event) => observer .handle(event) diff --git a/packages/core/sdk/src/index.ts b/packages/core/sdk/src/index.ts index b71169c47..da507ddff 100644 --- a/packages/core/sdk/src/index.ts +++ b/packages/core/sdk/src/index.ts @@ -180,8 +180,8 @@ export { InteractionResolved, ExecutionFinished, noopExecutionObserver, - ignoreExecutionObserverErrors, composeExecutionObservers, + wrapExecutionObserver, type ExecutionTrigger, type ToolCallStatus, type InteractionStatus, From 3fb7bab6b12563c8650590eef53d27c5b07a3444 Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 24 Jun 2026 16:32:38 +0530 Subject: [PATCH 6/6] refactor(execution): scope observer dispatch --- packages/core/execution/src/engine.ts | 48 ++++++++++--------- .../core/sdk/src/execution-observer.test.ts | 30 +++++++++--- packages/core/sdk/src/execution-observer.ts | 36 ++++++++++---- packages/core/sdk/src/index.ts | 3 +- 4 files changed, 77 insertions(+), 40 deletions(-) diff --git a/packages/core/execution/src/engine.ts b/packages/core/execution/src/engine.ts index c6aba4946..f44647970 100644 --- a/packages/core/execution/src/engine.ts +++ b/packages/core/execution/src/engine.ts @@ -21,8 +21,9 @@ import { InteractionStarted, ToolCallFinished, ToolCallStarted, + emitExecutionEvent, noopExecutionObserver, - wrapExecutionObserver, + withExecutionObserver, } from "@executor-js/sdk/core"; import { CodeExecutionError } from "@executor-js/codemode-core"; import type { CodeExecutor, ExecuteResult, SandboxToolInvoker } from "@executor-js/codemode-core"; @@ -456,10 +457,9 @@ export const createExecutionEngine = ExecutionId.make(`exec_${crypto.randomUUID()}`); @@ -499,7 +499,7 @@ export const createExecutionEngine = Effect.gen(function* () { const toolCallId = makeToolCallId(); - yield* emit( + yield* emitExecutionEvent( new ToolCallStarted({ executionId, toolCallId, @@ -511,7 +511,7 @@ export const createExecutionEngine = - emit( + emitExecutionEvent( new ToolCallFinished({ executionId, toolCallId, @@ -524,7 +524,7 @@ export const createExecutionEngine = - emit( + emitExecutionEvent( new ToolCallFinished({ executionId, toolCallId, @@ -547,7 +547,7 @@ export const createExecutionEngine = Effect.gen(function* () { const interactionId = makeInteractionId(); - yield* emit( + yield* emitExecutionEvent( new InteractionStarted({ executionId, interactionId, @@ -558,7 +558,7 @@ export const createExecutionEngine = - emit( + emitExecutionEvent( new InteractionResolved({ executionId, interactionId, @@ -570,7 +570,7 @@ export const createExecutionEngine = - emit( + emitExecutionEvent( new InteractionResolved({ executionId, interactionId, @@ -629,7 +629,7 @@ export const createExecutionEngine = - emit( + emitExecutionEvent( new InteractionResolved({ executionId, interactionId, @@ -698,7 +698,7 @@ export const createExecutionEngine = - emit( + emitExecutionEvent( new InteractionResolved({ executionId, interactionId, @@ -719,8 +719,8 @@ export const createExecutionEngine = emit(finishFromResult(executionId, result))), - Effect.tapCause((cause) => emit(finishFromCause(executionId, cause))), + Effect.tap((result) => emitExecutionEvent(finishFromResult(executionId, result))), + Effect.tapCause((cause) => emitExecutionEvent(finishFromCause(executionId, cause))), ), ); @@ -817,7 +817,7 @@ export const createExecutionEngine = emit(finishFromResult(executionId, result))), - Effect.tapCause((cause) => emit(finishFromCause(executionId, cause))), + Effect.tap((result) => emitExecutionEvent(finishFromResult(executionId, result))), + Effect.tapCause((cause) => emitExecutionEvent(finishFromCause(executionId, cause))), ); }); return { - execute: runInlineExecution, - executeWithPause: startPausableExecution, - resume: resumeExecution, + execute: (code, options) => runInlineExecution(code, options).pipe(observeExecution), + executeWithPause: (code, options) => + startPausableExecution(code, options).pipe(observeExecution), + resume: (executionId, response) => + resumeExecution(executionId, response).pipe(observeExecution), getPausedExecution: (executionId) => Effect.sync(() => pausedExecutions.get(executionId) ?? null), getDescription: buildExecuteDescription(executor), diff --git a/packages/core/sdk/src/execution-observer.test.ts b/packages/core/sdk/src/execution-observer.test.ts index 9483fb3ab..c23612bbf 100644 --- a/packages/core/sdk/src/execution-observer.test.ts +++ b/packages/core/sdk/src/execution-observer.test.ts @@ -7,7 +7,8 @@ import { ExecutionId, composeExecutionObservers, definePlugin, - wrapExecutionObserver, + emitExecutionEvent, + withExecutionObserver, } from "./index"; const owner = { tenant: Tenant.make("tenant_test"), subject: Subject.make("subject_test") }; @@ -61,6 +62,19 @@ const finishedEvent = () => }); describe("composeExecutionObservers", () => { + it.effect("emits events to the scoped observer", () => + Effect.gen(function* () { + calls = []; + yield* emitExecutionEvent(finishedEvent()).pipe( + withExecutionObserver({ + handle: () => Effect.sync(() => calls.push("observed")), + }), + ); + + expect(calls).toEqual(["observed"]); + }), + ); + it.effect("dispatches observers sequentially and isolates failures", () => Effect.gen(function* () { calls = []; @@ -80,13 +94,15 @@ describe("composeExecutionObservers", () => { }), ); - it.effect("preserves interrupts from isolated observers", () => + it.effect("preserves interrupts from scoped observers", () => Effect.gen(function* () { - const observer = wrapExecutionObserver({ - handle: () => Effect.interrupt, - }); - - const exit = yield* Effect.exit(observer.handle(finishedEvent())); + const exit = yield* Effect.exit( + emitExecutionEvent(finishedEvent()).pipe( + withExecutionObserver({ + handle: () => Effect.interrupt, + }), + ), + ); expect(Exit.isFailure(exit)).toBe(true); if (!Exit.isFailure(exit)) return; diff --git a/packages/core/sdk/src/execution-observer.ts b/packages/core/sdk/src/execution-observer.ts index e8a34bdbe..93ec67690 100644 --- a/packages/core/sdk/src/execution-observer.ts +++ b/packages/core/sdk/src/execution-observer.ts @@ -1,4 +1,4 @@ -import { Data, Effect, Schema } from "effect"; +import { Context, Data, Effect, Schema } from "effect"; import * as Cause from "effect/Cause"; import type { ElicitationContext, ElicitationResponse } from "./elicitation"; @@ -100,6 +100,11 @@ export const noopExecutionObserver: ExecutionObserver = { handle: () => Effect.void, }; +const currentExecutionObserver = Context.Reference( + "@executor-js/sdk/ExecutionObserver", + { defaultValue: () => noopExecutionObserver }, +); + type ExecutionEventName = ExecutionEvent["_tag"]; const executionEventName = (event: ExecutionEvent): ExecutionEventName => { @@ -127,14 +132,27 @@ const handleExecutionObserverCause = ( ? Effect.interrupt : logExecutionObserverFailure(event, cause, pluginId); -/** Wrap an observer so non-interrupt failures are logged and isolated while - * interrupt causes still propagate as cancellation. */ -export const wrapExecutionObserver = (observer: ExecutionObserver): ExecutionObserver => ({ - handle: (event) => - observer - .handle(event) - .pipe(Effect.catchCause((cause) => handleExecutionObserverCause(event, cause))), -}); +/** Emit an execution lifecycle event to the observer installed in the current + * Effect context. Defaults to a no-op when no observer is installed. */ +export const emitExecutionEvent = (event: ExecutionEvent): Effect.Effect => + Effect.service(currentExecutionObserver).pipe( + Effect.flatMap((observer) => observer.handle(event)), + ); + +/** Install an execution observer for the scoped Effect. Non-interrupt observer + * failures are logged and isolated; interrupt causes still propagate as + * cancellation. */ +export const withExecutionObserver = + (observer: ExecutionObserver) => + (effect: Effect.Effect): Effect.Effect => + effect.pipe( + Effect.provideService(currentExecutionObserver, { + handle: (event) => + observer + .handle(event) + .pipe(Effect.catchCause((cause) => handleExecutionObserverCause(event, cause))), + }), + ); /** Collect every plugin's `runtime.executionObserver` and fan each event to * all of them, logging per-observer errors. Returns the no-op observer when no diff --git a/packages/core/sdk/src/index.ts b/packages/core/sdk/src/index.ts index da507ddff..7700c34be 100644 --- a/packages/core/sdk/src/index.ts +++ b/packages/core/sdk/src/index.ts @@ -181,7 +181,8 @@ export { ExecutionFinished, noopExecutionObserver, composeExecutionObservers, - wrapExecutionObserver, + emitExecutionEvent, + withExecutionObserver, type ExecutionTrigger, type ToolCallStatus, type InteractionStatus,