From 0d0215de660b25398381f8fcde4be21c95557b1e Mon Sep 17 00:00:00 2001 From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com> Date: Mon, 22 Jun 2026 09:09:10 -0700 Subject: [PATCH] Add persistent code execution cells --- apps/cloud/src/api/protected.test.ts | 10 + apps/cloud/src/engine/execution-usage.ts | 6 + .../codemode-persistent-cells.test.ts | 259 ++++++++++++++++ packages/core/api/src/executions/api.ts | 55 +++- packages/core/api/src/handlers/executions.ts | 92 ++++++ packages/core/execution/src/engine.ts | 282 +++++++++++++++++- packages/core/execution/src/index.ts | 5 + packages/core/execution/src/promise.ts | 25 +- packages/hosts/mcp/src/tool-server.test.ts | 15 + packages/hosts/mcp/src/tool-server.ts | 160 ++++++++++ 10 files changed, 900 insertions(+), 9 deletions(-) create mode 100644 e2e/scenarios/codemode-persistent-cells.test.ts diff --git a/apps/cloud/src/api/protected.test.ts b/apps/cloud/src/api/protected.test.ts index 5a1a02283..6f8dfbffd 100644 --- a/apps/cloud/src/api/protected.test.ts +++ b/apps/cloud/src/api/protected.test.ts @@ -11,6 +11,16 @@ const makeBaseEngine = (): ExecutionEngine => status: "completed", result: { result: "ok", logs: [] }, }), + startCell: () => + Effect.succeed({ + status: "completed", + cellId: "cell_test", + cursor: 1, + events: [], + result: { result: "ok", logs: [] }, + }), + waitCell: () => Effect.succeed(null), + terminateCell: () => Effect.succeed(null), resume: () => Effect.succeed({ status: "completed", diff --git a/apps/cloud/src/engine/execution-usage.ts b/apps/cloud/src/engine/execution-usage.ts index cd4489984..403752e1a 100644 --- a/apps/cloud/src/engine/execution-usage.ts +++ b/apps/cloud/src/engine/execution-usage.ts @@ -16,6 +16,12 @@ export const withExecutionUsageTracking = ( engine .executeWithPause(code) .pipe(Effect.tap(() => Effect.sync(() => trackUsage(organizationId)))), + startCell: (code, options) => + engine + .startCell(code, options) + .pipe(Effect.tap(() => Effect.sync(() => trackUsage(organizationId)))), + waitCell: (cellId, options) => engine.waitCell(cellId, options), + terminateCell: (cellId) => engine.terminateCell(cellId), // resume doesn't count as usage resume: (executionId, response) => engine.resume(executionId, response), getPausedExecution: (executionId) => engine.getPausedExecution(executionId), diff --git a/e2e/scenarios/codemode-persistent-cells.test.ts b/e2e/scenarios/codemode-persistent-cells.test.ts new file mode 100644 index 000000000..cab25994b --- /dev/null +++ b/e2e/scenarios/codemode-persistent-cells.test.ts @@ -0,0 +1,259 @@ +import { expect } from "@effect/vitest"; +import { Effect } from "effect"; + +import type { McpCallResult } from "../src/surfaces/mcp"; +import { scenario } from "../src/scenario"; +import { Mcp, Target } from "../src/services"; + +type CellEvent = { + readonly type?: unknown; + readonly item?: { + readonly type?: unknown; + readonly content?: { + readonly type?: unknown; + readonly text?: unknown; + }; + readonly notification?: { + readonly message?: unknown; + readonly data?: unknown; + }; + }; +}; + +type CellObservation = { + readonly status?: unknown; + readonly cellId?: unknown; + readonly cursor?: unknown; + readonly events?: readonly CellEvent[]; + readonly result?: { + readonly result?: unknown; + readonly output?: readonly unknown[]; + }; +}; + +const cellObservation = (result: McpCallResult): CellObservation => { + const structured = (result.raw as { readonly structuredContent?: unknown }).structuredContent; + expect(structured, `cell call returned structured content: ${result.text.slice(0, 300)}`).toEqual( + expect.any(Object), + ); + return structured as CellObservation; +}; + +const eventTypes = (observation: CellObservation): readonly unknown[] => + observation.events?.map((event) => event.type) ?? []; + +const eventsOf = (observation: CellObservation): readonly CellEvent[] => observation.events ?? []; + +const textOutput = (events: readonly CellEvent[]): readonly string[] => + events.flatMap((event) => + event.type === "output" && + event.item?.type === "content" && + event.item.content?.type === "text" && + typeof event.item.content.text === "string" + ? [event.item.content.text] + : [], + ); + +const notificationOutput = (events: readonly CellEvent[]): readonly string[] => + events.flatMap((event) => + event.type === "output" && + event.item?.type === "notification" && + typeof event.item.notification?.message === "string" + ? [event.item.notification.message] + : [], + ); + +const requireString = (value: unknown, label: string): string => { + expect(value, label).toEqual(expect.any(String)); + return value as string; +}; + +const requireNumber = (value: unknown, label: string): number => { + expect(value, label).toEqual(expect.any(Number)); + return value as number; +}; + +scenario( + "Codemode ยท persistent cells yield, wait, notify, and terminate through MCP", + { timeout: 180_000 }, + Effect.gen(function* () { + const target = yield* Target; + const mcp = yield* Mcp; + const identity = yield* target.newIdentity(); + const session = mcp.session(identity); + + const tools = yield* session.listTools(); + expect(tools, "persistent cell tools are advertised").toEqual( + expect.arrayContaining(["execute_cell", "wait_cell", "terminate_cell"]), + ); + + const first = yield* session.call("execute_cell", { + yieldAfterMs: 250, + code: [ + 'text("phase 1");', + "await yield_control();", + 'text("phase 2");', + "await yieldControl();", + 'notify({ message: "cell almost done", data: { phase: 3 } });', + 'text("phase 3");', + 'return { cell: "done" };', + ].join("\n"), + }); + expect(first.ok, `execute_cell starts and reaches the first yield: ${first.text}`).toBe(true); + + const firstCell = cellObservation(first); + expect(firstCell.status, "the first observation leaves the cell running").toBe("running"); + const cellId = requireString(firstCell.cellId, "execute_cell returns a reusable cell id"); + let cursor = requireNumber(firstCell.cursor, "execute_cell returns an event cursor"); + const firstEvents: CellEvent[] = [...eventsOf(firstCell)]; + for ( + let attempt = 0; + attempt < 2 && !firstEvents.some((event) => event.type === "yielded"); + attempt++ + ) { + const yielded = yield* session.call("wait_cell", { + cellId, + after: cursor, + timeoutMs: 5_000, + }); + expect(yielded.ok, `wait_cell observes the first yield: ${yielded.text}`).toBe(true); + const yieldedCell = cellObservation(yielded); + cursor = requireNumber(yieldedCell.cursor, "wait_cell advances the cursor"); + firstEvents.push(...eventsOf(yieldedCell)); + } + expect( + firstEvents.map((event) => event.type), + "the first phase includes output and an explicit yield checkpoint", + ).toEqual(expect.arrayContaining(["output", "yielded"])); + expect(textOutput(firstEvents), "phase 1 output is visible immediately").toContain("phase 1"); + + const secondEvents: CellEvent[] = []; + for (let attempt = 0; attempt < 4; attempt++) { + const next = yield* session.call("wait_cell", { + cellId, + after: cursor, + timeoutMs: 5_000, + }); + expect(next.ok, `wait_cell observes phase 2 progress: ${next.text}`).toBe(true); + const nextCell = cellObservation(next); + cursor = requireNumber(nextCell.cursor, "wait_cell advances the phase 2 cursor"); + secondEvents.push(...eventsOf(nextCell)); + if ( + textOutput(secondEvents).includes("phase 2") && + secondEvents.some((event) => event.type === "yielded") + ) { + break; + } + } + expect(textOutput(secondEvents), "phase 2 output is delivered incrementally").toContain( + "phase 2", + ); + expect( + secondEvents.map((event) => event.type), + "phase 2 includes another explicit yield checkpoint", + ).toContain("yielded"); + + const finalEvents: CellEvent[] = []; + let completedCell: CellObservation | undefined; + for (let attempt = 0; attempt < 5; attempt++) { + const next = yield* session.call("wait_cell", { + cellId, + after: cursor, + timeoutMs: 5_000, + }); + expect(next.ok, `wait_cell observes completion progress: ${next.text}`).toBe(true); + const nextCell = cellObservation(next); + cursor = requireNumber(nextCell.cursor, "wait_cell advances the completion cursor"); + finalEvents.push(...eventsOf(nextCell)); + if (nextCell.status === "completed") { + completedCell = nextCell; + break; + } + } + expect(completedCell, "wait_cell eventually observes the completed cell").toEqual( + expect.any(Object), + ); + + const observedCompletedCell = completedCell as CellObservation; + expect(observedCompletedCell.status, "the final observation is completed").toBe("completed"); + expect( + finalEvents.map((event) => event.type), + "completion includes final outputs and terminal event", + ).toEqual(expect.arrayContaining(["output", "completed"])); + expect( + notificationOutput(finalEvents), + "notifications are emitted as structured events", + ).toContain("cell almost done"); + expect(textOutput(finalEvents), "phase 3 output is visible before completion").toContain( + "phase 3", + ); + expect( + observedCompletedCell.result?.result, + "the returned value is preserved on completion", + ).toEqual({ cell: "done" }); + + const timerRun = yield* session.call("execute_cell", { + yieldAfterMs: 10, + code: [ + "await new Promise((resolve) => setTimeout(resolve, 50));", + 'text("timer fired");', + 'return { timer: "done" };', + ].join("\n"), + }); + expect(timerRun.ok, `execute_cell starts a timer-backed cell: ${timerRun.text}`).toBe(true); + let timerCell = cellObservation(timerRun); + const timerCellId = requireString(timerCell.cellId, "timer cell id is reusable"); + let timerCursor = requireNumber(timerCell.cursor, "timer cell returns an event cursor"); + const timerEvents: CellEvent[] = [...eventsOf(timerCell)]; + for (let attempt = 0; attempt < 5 && timerCell.status !== "completed"; attempt++) { + const next = yield* session.call("wait_cell", { + cellId: timerCellId, + after: timerCursor, + timeoutMs: 5_000, + }); + expect(next.ok, `wait_cell observes timer-backed completion: ${next.text}`).toBe(true); + timerCell = cellObservation(next); + timerCursor = requireNumber(timerCell.cursor, "wait_cell advances the timer cursor"); + timerEvents.push(...eventsOf(timerCell)); + } + expect(timerCell.status, "timer-backed cells can complete after the first observation").toBe( + "completed", + ); + expect(textOutput(timerEvents), "timer output is visible on completion").toContain( + "timer fired", + ); + expect(timerCell.result?.result, "timer cell return value is preserved").toEqual({ + timer: "done", + }); + + const running = yield* session.call("execute_cell", { + yieldAfterMs: 250, + code: [ + "let i = 0;", + "while (true) {", + " text(`loop ${i}`);", + " i += 1;", + " await yield_control();", + "}", + ].join("\n"), + }); + expect(running.ok, `execute_cell starts a cooperative long-running cell: ${running.text}`).toBe( + true, + ); + const runningCell = cellObservation(running); + expect(runningCell.status, "the loop cell is running after its first yield").toBe("running"); + const runningCellId = requireString(runningCell.cellId, "loop cell id is reusable"); + + const terminated = yield* session.call("terminate_cell", { + cellId: runningCellId, + }); + expect(terminated.ok, `terminate_cell returns a terminal observation: ${terminated.text}`).toBe( + true, + ); + const terminatedCell = cellObservation(terminated); + expect(terminatedCell.status, "terminate_cell marks the cell terminated").toBe("terminated"); + expect(eventTypes(terminatedCell), "the termination event is visible to clients").toContain( + "terminated", + ); + }), +); diff --git a/packages/core/api/src/executions/api.ts b/packages/core/api/src/executions/api.ts index ad6eb4067..8d609e4e4 100644 --- a/packages/core/api/src/executions/api.ts +++ b/packages/core/api/src/executions/api.ts @@ -11,6 +11,11 @@ const ExecuteRequest = Schema.Struct({ code: Schema.String, }); +const StartCellRequest = Schema.Struct({ + code: Schema.String, + yieldAfterMs: Schema.optional(Schema.Number), +}); + const CompletedResult = Schema.Struct({ status: Schema.Literal("completed"), text: Schema.String, @@ -26,6 +31,15 @@ const PausedResult = Schema.Struct({ const ExecuteResponse = Schema.Union([CompletedResult, PausedResult]); +const CellObservation = Schema.Struct({ + status: Schema.Literals(["running", "completed", "failed", "terminated"]), + cellId: Schema.String, + cursor: Schema.Number, + events: Schema.Array(Schema.Unknown), + result: Schema.optional(Schema.Unknown), + error: Schema.optional(Schema.String), +}); + const ResumeRequest = Schema.Struct({ action: Schema.Literals(["accept", "decline", "cancel"]), content: Schema.optional(Schema.Unknown), @@ -47,19 +61,17 @@ const ExecutionNotFoundError = Schema.TaggedStruct("ExecutionNotFoundError", { // --------------------------------------------------------------------------- const ExecutionParams = { executionId: Schema.String }; +const CellParams = { cellId: Schema.String }; +const CellWaitQuery = Schema.Struct({ + after: Schema.optional(Schema.String), + timeoutMs: Schema.optional(Schema.String), +}); // --------------------------------------------------------------------------- // Group // --------------------------------------------------------------------------- export const ExecutionsApi = HttpApiGroup.make("executions") - .add( - HttpApiEndpoint.get("getPaused", "/executions/:executionId", { - params: ExecutionParams, - success: PausedExecutionInfo, - error: [InternalError, ExecutionNotFoundError], - }), - ) .add( HttpApiEndpoint.post("execute", "/executions", { payload: ExecuteRequest, @@ -67,6 +79,35 @@ export const ExecutionsApi = HttpApiGroup.make("executions") error: InternalError, }), ) + .add( + HttpApiEndpoint.post("startCell", "/execution-cells", { + payload: StartCellRequest, + success: CellObservation, + error: InternalError, + }), + ) + .add( + HttpApiEndpoint.get("waitCell", "/execution-cells/:cellId", { + params: CellParams, + query: CellWaitQuery, + success: CellObservation, + error: [InternalError, ExecutionNotFoundError], + }), + ) + .add( + HttpApiEndpoint.post("terminateCell", "/execution-cells/:cellId/terminate", { + params: CellParams, + success: CellObservation, + error: [InternalError, ExecutionNotFoundError], + }), + ) + .add( + HttpApiEndpoint.get("getPaused", "/executions/:executionId", { + params: ExecutionParams, + success: PausedExecutionInfo, + error: [InternalError, ExecutionNotFoundError], + }), + ) .add( HttpApiEndpoint.post("resume", "/executions/:executionId/resume", { params: ExecutionParams, diff --git a/packages/core/api/src/handlers/executions.ts b/packages/core/api/src/handlers/executions.ts index 86e1c17ac..d8e34a9e2 100644 --- a/packages/core/api/src/handlers/executions.ts +++ b/packages/core/api/src/handlers/executions.ts @@ -14,6 +14,53 @@ class ExecutionNotFoundError extends Schema.TaggedErrorClass { + if (value === undefined) return undefined; + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : undefined; +}; + +const isJsonRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null && !Array.isArray(value); + +const toJsonCompatible = (value: unknown): unknown => { + if ( + value === null || + typeof value === "string" || + typeof value === "number" || + typeof value === "boolean" + ) { + return value; + } + if ( + typeof value === "undefined" || + typeof value === "function" || + typeof value === "symbol" || + typeof value === "bigint" + ) { + return undefined; + } + if (Array.isArray(value)) { + return value.map((item) => toJsonCompatible(item) ?? null); + } + if (value instanceof Date) { + return value.toISOString(); + } + if (isJsonRecord(value)) { + const result: Record = {}; + for (const [key, item] of Object.entries(value)) { + const converted = toJsonCompatible(item); + if (converted !== undefined) { + result[key] = converted; + } + } + return result; + } + return undefined; +}; + +const jsonSafe = (value: A): A => toJsonCompatible(value) as A; + export const ExecutionsHandlers = HttpApiBuilder.group(ExecutorApi, "executions", (handlers) => handlers .handle("getPaused", ({ params: path }) => @@ -55,6 +102,51 @@ export const ExecutionsHandlers = HttpApiBuilder.group(ExecutorApi, "executions" }), ), ) + .handle("startCell", ({ payload }) => + capture( + Effect.gen(function* () { + const engine = yield* ExecutionEngineService; + return yield* captureEngineError( + engine.startCell(payload.code, { + yieldAfterMs: payload.yieldAfterMs, + }), + ).pipe(Effect.map(jsonSafe)); + }), + ), + ) + .handle("waitCell", ({ params: path, query }) => + capture( + Effect.gen(function* () { + const engine = yield* ExecutionEngineService; + const result = yield* captureEngineError( + engine.waitCell(path.cellId, { + after: parseOptionalNumber(query.after), + timeoutMs: parseOptionalNumber(query.timeoutMs), + }), + ); + + if (!result) { + return yield* new ExecutionNotFoundError({ executionId: path.cellId }); + } + + return jsonSafe(result); + }), + ), + ) + .handle("terminateCell", ({ params: path }) => + capture( + Effect.gen(function* () { + const engine = yield* ExecutionEngineService; + const result = yield* engine.terminateCell(path.cellId); + + if (!result) { + return yield* new ExecutionNotFoundError({ executionId: path.cellId }); + } + + return jsonSafe(result); + }), + ), + ) .handle("resume", ({ params: path, payload }) => capture( Effect.gen(function* () { diff --git a/packages/core/execution/src/engine.ts b/packages/core/execution/src/engine.ts index 24a5a0440..1f3db5a31 100644 --- a/packages/core/execution/src/engine.ts +++ b/packages/core/execution/src/engine.ts @@ -1,5 +1,5 @@ import { Deferred, Effect, Fiber, Predicate, Queue } from "effect"; -import type * as Cause from "effect/Cause"; +import * as Cause from "effect/Cause"; import * as Exit from "effect/Exit"; import type { @@ -12,6 +12,7 @@ import type { import { CodeExecutionError } from "@executor-js/codemode-core"; import type { CodeExecutor, + ExecuteOutputItem, ExecuteNotification, ExecuteResult, SandboxToolInvoker, @@ -41,6 +42,50 @@ export type ExecutionResult = | { readonly status: "completed"; readonly result: ExecuteResult } | { readonly status: "paused"; readonly execution: PausedExecution }; +export type ExecutionCellStatus = "running" | "completed" | "failed" | "terminated"; + +type ExecutionCellEventPayload = + | { + readonly type: "output"; + readonly item: ExecuteOutputItem; + } + | { + readonly type: "yielded"; + } + | { + readonly type: "completed"; + readonly result: ExecuteResult; + } + | { + readonly type: "failed"; + readonly error: string; + } + | { + readonly type: "terminated"; + }; + +export type ExecutionCellEvent = ExecutionCellEventPayload & { + readonly id: number; +}; + +export type ExecutionCellObservation = { + readonly status: ExecutionCellStatus; + readonly cellId: string; + readonly cursor: number; + readonly events: readonly ExecutionCellEvent[]; + readonly result?: ExecuteResult; + readonly error?: string; +}; + +export type StartExecutionCellOptions = { + readonly yieldAfterMs?: number; +}; + +export type WaitExecutionCellOptions = { + readonly after?: number; + readonly timeoutMs?: number; +}; + export type PausedExecution = { readonly id: string; readonly elicitationContext: ElicitationContext; @@ -53,6 +98,25 @@ type InternalPausedExecution = PausedExecution & { readonly pauseQueue: Queue.Queue>; }; +type InternalExecutionCell = { + readonly id: string; + events: ExecutionCellEvent[]; + nextEventId: number; + status: ExecutionCellStatus; + result?: ExecuteResult; + error?: string; + fiber?: Fiber.Fiber; + readonly waiters: Set>; + readonly yieldContinuations: Set<{ + readonly resume: () => void; + readonly terminate: () => void; + }>; +}; + +type CellObservationMode = "any-event" | "yield-or-terminal"; + +const CELL_TERMINATED_MESSAGE = "Execution cell terminated"; + export type ResumeResponse = { readonly action: "accept" | "decline" | "cancel"; readonly content?: Record; @@ -405,6 +469,30 @@ export type ExecutionEngine */ readonly getPausedExecution: (executionId: string) => Effect.Effect; + /** + * Start a persistent execution cell. The cell keeps running after the first + * observation when it yields or exceeds `yieldAfterMs`. + */ + readonly startCell: ( + code: string, + options?: StartExecutionCellOptions, + ) => Effect.Effect; + + /** + * Wait for new cell events after `after`. Returns null when the cell id is + * unknown. + */ + readonly waitCell: ( + cellId: string, + options?: WaitExecutionCellOptions, + ) => Effect.Effect; + + /** + * Stop a running cell and return its current observation. Returns null when + * the cell id is unknown. + */ + readonly terminateCell: (cellId: string) => Effect.Effect; + /** * Get the dynamic tool description (workflow + namespaces). */ @@ -416,6 +504,8 @@ export const createExecutionEngine = => { const { executor, codeExecutor, toolDiscoveryProvider = defaultToolDiscoveryProvider } = config; const pausedExecutions = new Map>(); + const executionCells = new Map>(); + const runSync = Effect.runSync; // Outcomes of executions that already settled (resumed to completion, hit a // new pause, or died while paused). MCP clients retry `resume` when a // response gets lost in transit; without this cache the retry of an @@ -466,6 +556,193 @@ export const createExecutionEngine = ): void => { + for (const waiter of cell.waiters) { + runSync(Deferred.succeed(waiter, undefined)); + } + cell.waiters.clear(); + }; + + const appendCellEvent = ( + cell: InternalExecutionCell, + event: ExecutionCellEventPayload, + ): ExecutionCellEvent => { + const recorded = { ...event, id: cell.nextEventId++ } as ExecutionCellEvent; + cell.events.push(recorded); + notifyCellWaiters(cell); + return recorded; + }; + + const observeCell = (cell: InternalExecutionCell, after = 0): ExecutionCellObservation => { + const events = cell.events.filter((event) => event.id > after); + const yielded = events.some((event) => event.type === "yielded"); + if (yielded) { + for (const continuation of cell.yieldContinuations) { + continuation.resume(); + } + cell.yieldContinuations.clear(); + } + const cursor = Math.max(after, cell.events.at(-1)?.id ?? after); + return { + status: cell.status, + cellId: cell.id, + cursor, + events, + ...(cell.result !== undefined ? { result: cell.result } : {}), + ...(cell.error !== undefined ? { error: cell.error } : {}), + }; + }; + + const waitForCellObservation = ( + cell: InternalExecutionCell, + options: WaitExecutionCellOptions = {}, + mode: CellObservationMode = "any-event", + ): Effect.Effect => { + const after = options.after ?? 0; + const timeoutMs = Math.max(0, Math.floor(options.timeoutMs ?? 30_000)); + return Effect.gen(function* () { + const startedAt = Date.now(); + while (true) { + const current = observeCell(cell, after); + const hasYieldOrTerminalEvent = current.events.some((event) => event.type !== "output"); + const ready = + mode === "any-event" + ? current.events.length > 0 || cell.status !== "running" + : hasYieldOrTerminalEvent || cell.status !== "running"; + if (ready || timeoutMs === 0) return current; + + const remainingMs = timeoutMs - (Date.now() - startedAt); + if (remainingMs <= 0) return current; + + const waiter = yield* Deferred.make(); + cell.waiters.add(waiter); + yield* Deferred.await(waiter).pipe( + Effect.timeoutOrElse({ + duration: `${remainingMs} millis`, + orElse: () => Effect.void, + }), + Effect.ensuring(Effect.sync(() => cell.waiters.delete(waiter))), + ); + } + }); + }; + + const startExecutionCell = Effect.fn("mcp.execute.cell.start")(function* ( + code: string, + options: StartExecutionCellOptions = {}, + ) { + yield* Effect.annotateCurrentSpan({ + "mcp.execute.mode": "cell", + "mcp.execute.code_length": code.length, + }); + + const cell: InternalExecutionCell = { + id: `cell_${crypto.randomUUID()}`, + events: [], + nextEventId: 1, + status: "running", + waiters: new Set(), + yieldContinuations: new Set(), + }; + executionCells.set(cell.id, cell); + + const baseInvoker = makeFullInvoker( + executor, + { + onElicitation: () => Effect.succeed({ action: "cancel" as const }), + }, + toolDiscoveryProvider, + ); + const invoker: SandboxToolInvoker = { + invoke: (input) => baseInvoker.invoke(input), + }; + + const fiber = yield* Effect.forkDetach( + codeExecutor + .execute(code, invoker, { + onOutput: (item) => { + if (cell.status !== "running") return; + appendCellEvent(cell, { type: "output", item }); + }, + onYield: () => + new Promise((resolve, reject) => { + if (cell.status !== "running") { + // oxlint-disable-next-line executor/no-promise-reject -- boundary: sandbox runtimes observe rejected yield promises as cooperative cell termination + reject(CELL_TERMINATED_MESSAGE); + return; + } + appendCellEvent(cell, { type: "yielded" }); + cell.yieldContinuations.add({ + resume: resolve, + terminate: () => { + // oxlint-disable-next-line executor/no-promise-reject -- boundary: sandbox runtimes observe rejected yield promises as cooperative cell termination + reject(CELL_TERMINATED_MESSAGE); + }, + }); + }), + }) + .pipe(Effect.withSpan("executor.code.exec")), + ); + cell.fiber = fiber; + + yield* Effect.forkDetach( + Fiber.await(fiber).pipe( + Effect.flatMap((exit) => + Effect.sync(() => { + if (cell.status === "terminated") return; + if (Exit.isSuccess(exit)) { + cell.result = exit.value; + cell.status = exit.value.error ? "failed" : "completed"; + appendCellEvent(cell, { type: "completed", result: exit.value }); + return; + } + const error = Cause.pretty(exit.cause); + cell.error = error; + cell.status = "failed"; + appendCellEvent(cell, { type: "failed", error }); + }), + ), + ), + ); + + return yield* waitForCellObservation( + cell, + { + after: 0, + timeoutMs: options.yieldAfterMs ?? 1_000, + }, + "yield-or-terminal", + ); + }); + + const waitExecutionCell = Effect.fn("mcp.execute.cell.wait")(function* ( + cellId: string, + options: WaitExecutionCellOptions = {}, + ) { + const cell = executionCells.get(cellId); + if (!cell) return null; + return yield* waitForCellObservation(cell, options); + }); + + const terminateExecutionCell = Effect.fn("mcp.execute.cell.terminate")(function* ( + cellId: string, + ) { + const cell = executionCells.get(cellId); + if (!cell) return null; + if (cell.status === "running") { + cell.status = "terminated"; + appendCellEvent(cell, { type: "terminated" }); + for (const continuation of cell.yieldContinuations) { + continuation.terminate(); + } + cell.yieldContinuations.clear(); + if (cell.fiber) { + yield* Fiber.interrupt(cell.fiber).pipe(Effect.ignore); + } + } + return observeCell(cell); + }); + /** * Start an execution in pause/resume mode. * @@ -625,6 +902,9 @@ export const createExecutionEngine = Effect.sync(() => pausedExecutions.get(executionId) ?? null), + startCell: startExecutionCell, + waitCell: waitExecutionCell, + terminateCell: terminateExecutionCell, getDescription: buildExecuteDescription(executor), }; }; diff --git a/packages/core/execution/src/index.ts b/packages/core/execution/src/index.ts index 18955fc67..ca4e5b89e 100644 --- a/packages/core/execution/src/index.ts +++ b/packages/core/execution/src/index.ts @@ -2,11 +2,16 @@ export { createExecutionEngine, formatExecuteResult, formatPausedExecution, + type ExecutionCellEvent, + type ExecutionCellObservation, + type ExecutionCellStatus, type ExecutionEngine, type ExecutionEngineConfig, type ExecutionResult, type PausedExecution, type ResumeResponse, + type StartExecutionCellOptions, + type WaitExecutionCellOptions, } from "./engine"; export { buildExecuteDescription } from "./description"; diff --git a/packages/core/execution/src/promise.ts b/packages/core/execution/src/promise.ts index cf57304a5..45fe75c3e 100644 --- a/packages/core/execution/src/promise.ts +++ b/packages/core/execution/src/promise.ts @@ -23,10 +23,13 @@ import type { CodeExecutionError, CodeExecutor, ExecuteResult } from "@executor- import { createExecutionEngine as createEffectExecutionEngine, + type ExecutionCellObservation, type ExecutionEngine as EffectExecutionEngine, type ExecutionResult, type PausedExecution, type ResumeResponse, + type StartExecutionCellOptions, + type WaitExecutionCellOptions, } from "./engine"; export type ElicitationHandler = (ctx: ElicitationContext) => Promise; @@ -47,6 +50,15 @@ export type ExecutionEngine = { response: ResumeResponse, ) => Promise; readonly getPausedExecution: (executionId: string) => Promise; + readonly startCell: ( + code: string, + options?: StartExecutionCellOptions, + ) => Promise; + readonly waitCell: ( + cellId: string, + options?: WaitExecutionCellOptions, + ) => Promise; + readonly terminateCell: (cellId: string) => Promise; readonly getDescription: () => Promise; }; @@ -152,6 +164,9 @@ export const toPromiseExecutionEngine = ( executeWithPause: (code) => Effect.runPromise(engine.executeWithPause(code)), resume: (executionId, response) => Effect.runPromise(engine.resume(executionId, response)), getPausedExecution: (executionId) => Effect.runPromise(engine.getPausedExecution(executionId)), + startCell: (code, options) => Effect.runPromise(engine.startCell(code, options)), + waitCell: (cellId, options) => Effect.runPromise(engine.waitCell(cellId, options)), + terminateCell: (cellId) => Effect.runPromise(engine.terminateCell(cellId)), getDescription: () => Effect.runPromise(engine.getDescription), }); @@ -171,7 +186,15 @@ export const createExecutionEngine = (overrides: { execute?: ExecutionEngine["execute"]; executeWithPause?: ExecutionEngine["executeWithPause"]; + startCell?: ExecutionEngine["startCell"]; + waitCell?: ExecutionEngine["waitCell"]; + terminateCell?: ExecutionEngine["terminateCell"]; resume?: ExecutionEngine["resume"]; description?: string; }): ExecutionEngine => ({ @@ -36,6 +39,18 @@ const makeStubEngine = (overrides: { executeWithPause: overrides.executeWithPause ?? (() => Effect.succeed({ status: "completed", result: { result: "default" } })), + startCell: + overrides.startCell ?? + (() => + Effect.succeed({ + status: "completed", + cellId: "cell_test", + cursor: 1, + events: [], + result: { result: "default" }, + })), + waitCell: overrides.waitCell ?? (() => Effect.succeed(null)), + terminateCell: overrides.terminateCell ?? (() => Effect.succeed(null)), resume: overrides.resume ?? (() => Effect.succeed(null)), getPausedExecution: () => Effect.succeed(null), getDescription: Effect.succeed(overrides.description ?? "test executor"), diff --git a/packages/hosts/mcp/src/tool-server.ts b/packages/hosts/mcp/src/tool-server.ts index fc17fc3fc..0b37e0905 100644 --- a/packages/hosts/mcp/src/tool-server.ts +++ b/packages/hosts/mcp/src/tool-server.ts @@ -21,6 +21,8 @@ import type { import type * as Tracer from "effect/Tracer"; import { createExecutionEngine, + type ExecutionCellEvent, + type ExecutionCellObservation, formatExecuteResult, formatPausedExecution, type ExecutionEngine, @@ -437,6 +439,43 @@ const toMcpResult = (result: FormattedExecuteInput): McpToolResult => { }; }; +const cellEventContent = (event: ExecutionCellEvent): ContentBlock[] => { + if (event.type === "output") return outputItemContent(event.item); + if (event.type === "yielded") { + return [{ type: "text", text: `Execution yielded at cursor ${event.id}.` }]; + } + if (event.type === "completed") { + return [{ type: "text", text: formatExecuteResult(event.result).text }]; + } + if (event.type === "failed") { + return [{ type: "text", text: `Execution failed:\n${event.error}` }]; + } + return [{ type: "text", text: "Execution terminated." }]; +}; + +const toMcpCellResult = (observation: ExecutionCellObservation): McpToolResult => { + const content = observation.events.flatMap(cellEventContent); + if (content.length === 0) { + content.push({ + type: "text", + text: `Execution cell ${observation.cellId} is ${observation.status}. cursor=${observation.cursor}`, + }); + } + + return { + content, + structuredContent: { + status: observation.status, + cellId: observation.cellId, + cursor: observation.cursor, + events: observation.events, + ...(observation.result !== undefined ? { result: observation.result } : {}), + ...(observation.error !== undefined ? { error: observation.error } : {}), + }, + isError: observation.status === "failed" || undefined, + }; +}; + const toMcpPausedResult = (formatted: ReturnType): McpToolResult => ({ content: [{ type: "text", text: formatted.text }], structuredContent: formatted.structured, @@ -526,6 +565,20 @@ const missingExecutionResult = (executionId: string): McpToolResult => ({ isError: true, }); +const missingCellResult = (cellId: string): McpToolResult => ({ + content: [ + { + type: "text" as const, + text: `No execution cell: ${cellId}. The cell expired, was terminated, or was lost when its session restarted.`, + }, + ], + structuredContent: { + status: "cell_not_found", + cellId, + }, + isError: true, +}); + const JsonObjectFromString = Schema.fromJsonString(Schema.Record(Schema.String, Schema.Unknown)); const decodeJsonObjectString = Schema.decodeUnknownOption(JsonObjectFromString); @@ -631,6 +684,54 @@ export const createExecutorMcpServer = ( }), ); + const executeCell = ( + code: string, + yieldAfterMs: number | undefined, + ): Effect.Effect => + Effect.gen(function* () { + const observation = yield* engine.startCell(code, { yieldAfterMs }); + return toMcpCellResult(observation); + }).pipe( + Effect.withSpan("mcp.host.tool.execute_cell", { + attributes: { + "mcp.tool.name": "execute_cell", + "mcp.execute.code_length": code.length, + }, + }), + ); + + const waitCell = ( + cellId: string, + after: number | undefined, + timeoutMs: number | undefined, + ): Effect.Effect => + Effect.gen(function* () { + const observation = yield* engine.waitCell(cellId, { after, timeoutMs }); + if (!observation) return missingCellResult(cellId); + return toMcpCellResult(observation); + }).pipe( + Effect.withSpan("mcp.host.tool.wait_cell", { + attributes: { + "mcp.tool.name": "wait_cell", + "mcp.execute.cell_id": cellId, + }, + }), + ); + + const terminateCell = (cellId: string): Effect.Effect => + Effect.gen(function* () { + const observation = yield* engine.terminateCell(cellId); + if (!observation) return missingCellResult(cellId); + return toMcpCellResult(observation); + }).pipe( + Effect.withSpan("mcp.host.tool.terminate_cell", { + attributes: { + "mcp.tool.name": "terminate_cell", + "mcp.execute.cell_id": cellId, + }, + }), + ); + const resumeExecution = ( executionId: string, action: "accept" | "decline" | "cancel", @@ -749,6 +850,65 @@ export const createExecutorMcpServer = ( }), ); + yield* Effect.sync(() => + server.registerTool( + "execute_cell", + { + description: [ + "Start a persistent execution cell. Use this for code that yields, runs longer than one request, or needs incremental observation.", + "The code can call `await yield_control()` or `await yieldControl()` to hand back currently emitted output and continue after the next wait.", + "Call `wait_cell` with the returned cellId and cursor to observe new events. Call `terminate_cell` to stop it.", + ].join("\n"), + inputSchema: { + code: z.string().trim().min(1), + yieldAfterMs: z.number().optional(), + }, + }, + ({ code, yieldAfterMs }) => runToolEffect(executeCell(code, yieldAfterMs)), + ), + ).pipe( + Effect.withSpan("mcp.host.register_tool", { + attributes: { "mcp.tool.name": "execute_cell" }, + }), + ); + + yield* Effect.sync(() => + server.registerTool( + "wait_cell", + { + description: + "Wait for new events from a persistent execution cell. Pass the cursor returned by execute_cell or the previous wait_cell call.", + inputSchema: { + cellId: z.string(), + after: z.number().optional(), + timeoutMs: z.number().optional(), + }, + }, + ({ cellId, after, timeoutMs }) => runToolEffect(waitCell(cellId, after, timeoutMs)), + ), + ).pipe( + Effect.withSpan("mcp.host.register_tool", { + attributes: { "mcp.tool.name": "wait_cell" }, + }), + ); + + yield* Effect.sync(() => + server.registerTool( + "terminate_cell", + { + description: "Terminate a persistent execution cell.", + inputSchema: { + cellId: z.string(), + }, + }, + ({ cellId }) => runToolEffect(terminateCell(cellId)), + ), + ).pipe( + Effect.withSpan("mcp.host.register_tool", { + attributes: { "mcp.tool.name": "terminate_cell" }, + }), + ); + yield* Effect.sync(() => { if (elicitationMode.mode === "native") { return undefined;