From 5f50cd2b9c02ea23e27fa324bcb449849a5598c4 Mon Sep 17 00:00:00 2001 From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com> Date: Sun, 28 Jun 2026 16:22:16 -0700 Subject: [PATCH] feat(cli): make executor mcp ensure a durable daemon and bridge to it executor mcp no longer starts a server in-process. It ensures a durable detached daemon and bridges stdio JSON-RPC to that owner over HTTP. Concurrent cold starts run a race-safe election: one process becomes the owner and the rest wait for its manifest and attach instead of failing. The owner's lifetime is independent of any MCP client, so many clients, the web UI, and the desktop app share one local server. This builds on the merged start-lock primitive so no client ever owns the database, replacing the earlier approach where the first mcp process started a server in-process and bridged to itself. Adds a cold-start election probe across the cli VM targets plus a local attach stress test; the one-winner, rest-attach behavior is verified on macOS, Linux, and Windows. --- apps/cli/package.json | 1 + apps/cli/src/main.ts | 402 +++++++++++------- bun.lock | 1 + e2e/cli/election-cold-start.test.ts | 160 +++++++ e2e/cli/election-cold-start.win.ps1 | 56 +++ .../cli-mcp-daemon-attach-stress.test.ts | 342 +++++++++++++++ 6 files changed, 800 insertions(+), 162 deletions(-) create mode 100644 e2e/cli/election-cold-start.test.ts create mode 100644 e2e/cli/election-cold-start.win.ps1 create mode 100644 e2e/local/cli-mcp-daemon-attach-stress.test.ts diff --git a/apps/cli/package.json b/apps/cli/package.json index b45e7d11c..607a73322 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -30,6 +30,7 @@ "@executor-js/runtime-quickjs": "workspace:*", "@executor-js/sdk": "workspace:*", "@jitl/quickjs-wasmfile-release-sync": "catalog:", + "@modelcontextprotocol/sdk": "^1.29.0", "@sentry/bun": "^10.57.0", "effect": "catalog:", "quickjs-emscripten": "catalog:" diff --git a/apps/cli/src/main.ts b/apps/cli/src/main.ts index be8955166..b4fed3c79 100644 --- a/apps/cli/src/main.ts +++ b/apps/cli/src/main.ts @@ -63,6 +63,9 @@ import type { PlatformError } from "effect/PlatformError"; import * as Effect from "effect/Effect"; import * as Option from "effect/Option"; import * as Cause from "effect/Cause"; +import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; +import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; +import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js"; import { ExecutorApi, checkForUpdate } from "@executor-js/api"; import { @@ -84,15 +87,12 @@ import { } from "./device-login"; import { startServer, - runMcpStdioServer, - getExecutor, rotateLocalAuthToken, localAuthTokenPath, findDataDirOwnershipHeld, type ServerInstance, type StartServerOptions, } from "@executor-js/local"; -import { makeQuickJsExecutor } from "@executor-js/runtime-quickjs"; import { fetchIntegrations } from "./integrations"; import { buildDaemonSpawnSpec, @@ -501,77 +501,134 @@ const waitForDaemonStartupTarget = (input: { // Serialize daemon startup behind a filesystem lock so concurrent CLI invocations don't // each spawn their own daemon. The post-lock pointer recheck catches the case where // another invocation finished bootstrapping while we were waiting for the lock. -const spawnAndWaitForDaemon = (input: { +// A storm of concurrent cold starts should elect ONE owner with the rest +// attaching, never N-1 hard failures. Each attempt either wins the per-scope +// start lock and spawns the daemon, or waits for the current holder's manifest. +// Re-acquiring after a wait timeout is what recovers the one window the wait +// alone cannot: acquireDaemonStartLock reclaims a STALE lock at acquisition, so +// a holder that took the lock then died BEFORE spawning (no daemon, no manifest) +// is recovered when a loser loops back and re-acquires. +const MAX_DAEMON_ELECTION_ATTEMPTS = 3; + +/** The stable message acquireDaemonStartLock fails with on genuine lock + * contention. Only this should be treated as "another process is electing"; + * any other failure (e.g. an unwritable data dir) must propagate, not masquerade + * as a race the caller should wait out. */ +const isStartLockContention = (error: Error): boolean => + error.message.includes("Another daemon startup is already in progress"); + +const spawnDaemonAsLockHolder = (input: { host: string; scopeId: string; preferredPort: number; allowedHosts: ReadonlyArray; }): Effect.Effect => Effect.gen(function* () { - const lock = yield* acquireDaemonStartLock({ hostname: input.host, scopeId: input.scopeId }); - - try { - const existing = yield* readDaemonPointer({ hostname: input.host, scopeId: input.scopeId }); - if (existing && isPidAlive(existing.pid)) { - const existingUrl = daemonBaseUrl(existing.hostname, existing.port); - if (yield* isServerReachable(existingUrl)) { - return existingUrl; - } + const existing = yield* readDaemonPointer({ hostname: input.host, scopeId: input.scopeId }); + if (existing && isPidAlive(existing.pid)) { + const existingUrl = daemonBaseUrl(existing.hostname, existing.port); + if (yield* isServerReachable(existingUrl)) { + return existingUrl; } + } - const selectedPort = yield* chooseDaemonPort({ - preferredPort: input.preferredPort, - hostname: input.host, - }); + const selectedPort = yield* chooseDaemonPort({ + preferredPort: input.preferredPort, + hostname: input.host, + }); - if (selectedPort !== input.preferredPort) { - console.error( - `Port ${input.preferredPort} is in use. Starting daemon on available port ${selectedPort} instead.`, - ); - } + if (selectedPort !== input.preferredPort) { + console.error( + `Port ${input.preferredPort} is in use. Starting daemon on available port ${selectedPort} instead.`, + ); + } - const spec = yield* Effect.try({ - try: () => - buildDaemonSpawnSpec({ - port: selectedPort, - hostname: input.host, - isDevMode, - scriptPath: script, - executablePath: process.execPath, - allowedHosts: input.allowedHosts, - }), - catch: (cause) => - cause instanceof Error - ? cause - : new Error(`Failed to build daemon command: ${String(cause)}`), - }); + const spec = yield* Effect.try({ + try: () => + buildDaemonSpawnSpec({ + port: selectedPort, + hostname: input.host, + isDevMode, + scriptPath: script, + executablePath: process.execPath, + allowedHosts: input.allowedHosts, + }), + catch: (cause) => + cause instanceof Error + ? cause + : new Error(`Failed to build daemon command: ${String(cause)}`), + }); - const startBaseUrl = daemonBaseUrl(input.host, selectedPort); - console.error(`Starting daemon on ${input.host}:${selectedPort}...`); - const child = yield* spawnDetached({ - command: spec.command, - args: spec.args, - env: process.env, - }); + const startBaseUrl = daemonBaseUrl(input.host, selectedPort); + console.error(`Starting daemon on ${input.host}:${selectedPort}...`); + const child = yield* spawnDetached({ + command: spec.command, + args: spec.args, + env: process.env, + }); - const readyBaseUrl = yield* waitForDaemonStartupTarget({ requestedBaseUrl: startBaseUrl }); + const readyBaseUrl = yield* waitForDaemonStartupTarget({ requestedBaseUrl: startBaseUrl }); - if (!readyBaseUrl) { - yield* terminateSpawnedDetachedProcess(child).pipe(Effect.ignore); - return yield* Effect.fail( - new Error( - [ - `Daemon did not become reachable at ${startBaseUrl} and no reachable local server manifest appeared within ${DAEMON_BOOT_TIMEOUT_MS}ms.`, - `Run in foreground to inspect logs: ${cliPrefix} daemon run --foreground --port ${selectedPort} --hostname ${input.host}`, - ].join("\n"), - ), + if (!readyBaseUrl) { + yield* terminateSpawnedDetachedProcess(child).pipe(Effect.ignore); + return yield* Effect.fail( + new Error( + [ + `Daemon did not become reachable at ${startBaseUrl} and no reachable local server manifest appeared within ${DAEMON_BOOT_TIMEOUT_MS}ms.`, + `Run in foreground to inspect logs: ${cliPrefix} daemon run --foreground --port ${selectedPort} --hostname ${input.host}`, + ].join("\n"), + ), + ); + } + + return readyBaseUrl; + }); + +const spawnAndWaitForDaemon = (input: { + host: string; + scopeId: string; + preferredPort: number; + allowedHosts: ReadonlyArray; +}): Effect.Effect => + Effect.gen(function* () { + const requestedBaseUrl = daemonBaseUrl(input.host, input.preferredPort); + + for (let attempt = 1; attempt <= MAX_DAEMON_ELECTION_ATTEMPTS; attempt++) { + const acquired = yield* acquireDaemonStartLock({ + hostname: input.host, + scopeId: input.scopeId, + }).pipe( + Effect.map((lock) => ({ held: true as const, lock })), + Effect.catch((error) => + isStartLockContention(error) + ? Effect.succeed({ held: false as const, lock: null }) + : Effect.fail(error), + ), + ); + + if (acquired.held) { + const lock = acquired.lock; + return yield* spawnDaemonAsLockHolder(input).pipe( + Effect.ensuring(releaseDaemonStartLock(lock).pipe(Effect.ignore)), ); } - return readyBaseUrl; - } finally { - yield* releaseDaemonStartLock(lock).pipe(Effect.ignore); + // Lost the lock: wait for the current holder to advertise a manifest. + const ready = yield* waitForDaemonStartupTarget({ requestedBaseUrl }); + if (ready) return ready; + // Timed out with no manifest. The holder may have died mid-startup; loop to + // re-acquire, which reclaims its now-stale lock. } + + return yield* Effect.fail( + new Error( + [ + `Could not elect or attach to a local Executor daemon after ${MAX_DAEMON_ELECTION_ATTEMPTS} attempts.`, + "A daemon startup may be stuck. Stop any partial daemon and retry, or run it in the foreground:", + `${cliPrefix} daemon run --foreground --port ${input.preferredPort} --hostname ${input.host}`, + ].join("\n"), + ), + ); }); // Auto-start a local daemon on demand so commands like `executor call` work without the @@ -1254,125 +1311,146 @@ const runBackgroundDaemonStart = (input: { }).pipe(Effect.mapError(toError)); // --------------------------------------------------------------------------- -// Stdio MCP session +// Stdio MCP session: a pure stdio <-> HTTP bridge to the owning local daemon. // --------------------------------------------------------------------------- -const withStdoutReroutedToStderr = async (body: () => Promise): Promise => { - const originalWrite = process.stdout.write; - const originalLog = console.log; - const originalInfo = console.info; - const originalDebug = console.debug; - const stderrWrite = process.stderr.write.bind(process.stderr); +const mcpUrlForActiveLocalServer = ( + connection: ExecutorServerConnection, + elicitationMode: "browser" | "model", +): URL => { + const url = new URL("/mcp", connection.origin); + if (elicitationMode === "browser") { + url.searchParams.set("elicitation_mode", "browser"); + } + return url; +}; + +/** + * Bridge a stdio MCP client to a local server's HTTP `/mcp` endpoint. `executor + * mcp` owns NO database: it forwards JSON-RPC between the client's stdin/stdout + * and the daemon over Streamable HTTP. That keeps any number of MCP clients (plus + * the web UI and the desktop app) attached to the single owning daemon at once, + * and means a transient MCP client exiting never takes the server down. Resolves + * when stdin closes or the daemon connection drops; close is best-effort. + */ +const runMcpHttpBridge = async (input: { + readonly manifest: ExecutorLocalServerManifest; + readonly elicitationMode: "browser" | "model"; +}): Promise => { + const stdio = new StdioServerTransport(); + const authorization = getExecutorServerAuthorizationHeader(input.manifest.connection); + const http = new StreamableHTTPClientTransport( + mcpUrlForActiveLocalServer(input.manifest.connection, input.elicitationMode), + authorization ? { requestInit: { headers: { Authorization: authorization } } } : undefined, + ); + + let finished = false; + let closing = false; + let closePromise: Promise | null = null; + let resolveExit: () => void = () => {}; + const waitForExit = new Promise((resolve) => { + resolveExit = resolve; + }); + + const finish = () => { + if (finished) return; + finished = true; + process.off("SIGINT", shutdown); + process.off("SIGTERM", shutdown); + process.stdin.off("end", shutdown); + process.stdin.off("close", shutdown); + resolveExit(); + }; + + const closeBoth = (): Promise => { + if (!closePromise) { + closing = true; + closePromise = Promise.allSettled([stdio.close(), http.close()]).then(() => undefined); + } + return closePromise; + }; - process.stdout.write = ((...args: Parameters) => - stderrWrite(...args)) as typeof process.stdout.write; - console.log = console.error.bind(console); - console.info = console.error.bind(console); - console.debug = console.error.bind(console); + function shutdown() { + finish(); + void closeBoth(); + } + + const isAbortDuringClose = (error: Error): boolean => + error.name === "AbortError" || error.message.toLowerCase().includes("aborted"); + + const reportError = (context: string, cause: unknown) => { + const error = toError(cause); + if (closing && isAbortDuringClose(error)) return; + console.error(`Executor MCP bridge ${context}: ${error.message}`); + }; + + const forwardMessage = + (send: (message: JSONRPCMessage) => Promise, context: string) => + (message: JSONRPCMessage) => { + void send(message).then(undefined, (cause: unknown) => { + reportError(context, cause); + shutdown(); + }); + }; + + process.once("SIGINT", shutdown); + process.once("SIGTERM", shutdown); + process.stdin.once("end", shutdown); + process.stdin.once("close", shutdown); + + stdio.onclose = shutdown; + http.onclose = shutdown; + stdio.onerror = (error) => reportError("stdio transport error", error); + http.onerror = (error) => reportError("daemon transport error", error); + stdio.onmessage = forwardMessage((message) => http.send(message), "failed to send to daemon"); + http.onmessage = forwardMessage((message) => stdio.send(message), "failed to send to stdio"); try { - return await body(); + await http.start(); + await stdio.start(); + await waitForExit; } finally { - process.stdout.write = originalWrite; - console.log = originalLog; - console.info = originalInfo; - console.debug = originalDebug; + finish(); + await closeBoth(); } }; const runStdioMcpSession = (input: { readonly elicitationMode: "browser" | "model" }) => Effect.gen(function* () { - // No process-level startup lock: the DB ownership lock inside startServer - // (openOwnedLocalDatabase) is the real gate. assertNoOtherActiveLocalServer - // is a friendly fast-path that may race without being unsafe. - yield* assertNoOtherActiveLocalServer(); - const web = yield* Effect.tryPromise({ - try: () => - withStdoutReroutedToStderr(async () => { - const host = "127.0.0.1"; - const port = await Effect.runPromise( - chooseDaemonPort({ preferredPort: DEFAULT_PORT, hostname: host }), - ); - const baseUrl = `http://localhost:${port}`; - const restoreWebBaseUrl = installDefaultExecutorWebBaseUrl(baseUrl); - - try { - const executor = await getExecutor(); - const server = await startServer({ - port, - hostname: host, - embeddedWebUI, - }); - const serverBaseUrl = `http://localhost:${server.port}`; - return { executor, server, baseUrl: serverBaseUrl, restoreWebBaseUrl }; - } catch (cause) { - restoreWebBaseUrl(); - throw cause; - } - }), - catch: (cause) => cause, - }).pipe( - Effect.catch((cause) => { - const ownership = findDataDirOwnershipHeld(cause); - if (!ownership) return Effect.fail(toError(cause)); - return Effect.gen(function* () { - const manifest = yield* readReachableLocalServerHint(); - const path = yield* PlatformPath.Path; - const dataDir = resolveExecutorDataDir(path); - if (manifest) { - return yield* Effect.fail( - new Error( - [ - `A local Executor server already owns ${dataDir} at ${manifest.connection.origin}.`, - "The stdio MCP server needs exclusive access to the local database.", - `Use the running HTTP MCP endpoint instead: ${manifest.connection.origin}/mcp`, - ].join("\n"), - ), - ); - } - return yield* Effect.fail( - new Error( - [ - "Executor data directory is owned by another live process, but no reachable local server was advertised.", - `Data directory: ${dataDir}`, - `Ownership lock: ${ownership.lockPath}`, - "Wait for the existing process to finish starting, or stop it and retry.", - ].join("\n"), - ), - ); - }); - }), - ); - yield* publishLocalServerManifest({ - kind: "foreground", - connection: normalizeExecutorServerConnection({ - kind: "http", - origin: web.baseUrl, - displayName: "CLI MCP", - auth: { kind: "bearer", token: web.server.authToken }, - }), - }); - - try { + // `executor mcp` never owns the local database. If a local server is already + // running, bridge this stdio client to it; otherwise ensure a durable + // background daemon is up and bridge to that. ensureDaemon is the race-safe + // election: concurrent cold starts elect one owner and the losers wait for + // its manifest (waitForDaemonStartupTarget) rather than failing. Bridging + // means many MCP clients, the web UI, and the desktop app share one owner, + // and that owner's lifetime is never tied to a transient MCP client. + const active = yield* readActiveLocalServerManifest(); + if (active) { yield* Effect.promise(() => - runMcpStdioServer({ - executor: web.executor, - codeExecutor: makeQuickJsExecutor(), - elicitationMode: - input.elicitationMode === "browser" - ? { - mode: "browser" as const, - approvalUrl: (executionId) => - `${web.baseUrl}/resume/${encodeURIComponent(executionId)}`, - } - : { mode: input.elicitationMode }, - }), + runMcpHttpBridge({ manifest: active, elicitationMode: input.elicitationMode }), + ); + return; + } + + // No reachable owner yet: ensure one. If we lose the election (another + // process became owner first), ensureDaemon may fail, but the winner's + // manifest is then reachable, so re-read it and bridge to that instead. + const elected = yield* ensureDaemon(DEFAULT_BASE_URL).pipe( + Effect.flatMap(() => readActiveLocalServerManifest()), + Effect.catch((error) => + readActiveLocalServerManifest().pipe( + Effect.flatMap((manifest) => (manifest ? Effect.succeed(manifest) : Effect.fail(error))), + ), + ), + ); + if (!elected) { + return yield* Effect.fail( + new Error("The local Executor daemon started but did not advertise a reachable manifest."), ); - } finally { - web.restoreWebBaseUrl(); - yield* Effect.promise(() => web.server.stop()); - yield* removeLocalServerManifestIfOwnedBy({ pid: process.pid }).pipe(Effect.ignore); } + yield* Effect.promise(() => + runMcpHttpBridge({ manifest: elected, elicitationMode: input.elicitationMode }), + ); }); const scope = Options.string("scope").pipe( diff --git a/bun.lock b/bun.lock index 2bf0851f7..dcb4fed28 100644 --- a/bun.lock +++ b/bun.lock @@ -42,6 +42,7 @@ "@executor-js/runtime-quickjs": "workspace:*", "@executor-js/sdk": "workspace:*", "@jitl/quickjs-wasmfile-release-sync": "catalog:", + "@modelcontextprotocol/sdk": "^1.29.0", "@sentry/bun": "^10.57.0", "effect": "catalog:", "quickjs-emscripten": "catalog:", diff --git a/e2e/cli/election-cold-start.test.ts b/e2e/cli/election-cold-start.test.ts new file mode 100644 index 000000000..a1a4a7030 --- /dev/null +++ b/e2e/cli/election-cold-start.test.ts @@ -0,0 +1,160 @@ +// Cross-OS proof of the Phase 1 daemon election (apps/cli/src/main.ts). On a real +// guest OS, fire N `executor` clients at a COLD data dir simultaneously: each +// auto-starts a daemon via resolveExecutorServerConnection -> ensureDaemon -> +// spawnAndWaitForDaemon (the election). Before Phase 1 exactly one client won and +// the other N-1 hard-failed on the start-lock; the fix makes the losers wait for +// the winner's manifest and attach. So the assertion is simply: ALL N succeed, +// exactly one owner is elected, and it is reachable. A second (warm) wave proves +// the steady-state attach path. Uses a separate data dir from the service daemon +// that globalsetup installed on ~/.executor, so the two never collide. +// +// Runs on the cli-* VM targets (cli-linux / cli-macos on local tart; cli-windows +// on EC2). Drive everything over the same SSH the other cli tests use. +import { execFile } from "node:child_process"; +import { promisify } from "node:util"; + +import { expect, it } from "@effect/vitest"; + +const execFileAsync = promisify(execFile); + +const SSH_OPTS = [ + "-o", + "StrictHostKeyChecking=no", + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + "ConnectTimeout=15", +] as const; + +type GuestOs = "macos" | "linux" | "windows"; + +const guestOs = (): GuestOs => { + const os = process.env.E2E_VM_OS; + if (os === "macos" || os === "linux" || os === "windows") return os; + throw new Error(`Unsupported E2E_VM_OS: ${os ?? ""}`); +}; + +const sshInvocation = (command: string): { command: string; args: ReadonlyArray } => { + const host = process.env.E2E_CLI_VM_HOST; + if (!host) throw new Error("E2E_CLI_VM_HOST is not set"); + const os = guestOs(); + const wrapped = + os === "linux" ? `export XDG_RUNTIME_DIR=/run/user/$(id -u); ${command}` : command; + const keyPath = process.env.E2E_CLI_SSH_KEY; + const user = os === "windows" ? "Administrator" : "admin"; + return keyPath + ? { command: "ssh", args: ["-i", keyPath, ...SSH_OPTS, `${user}@${host}`, wrapped] } + : { + command: process.env.E2E_SSHPASS_BIN ?? "/opt/homebrew/bin/sshpass", + args: ["-p", "admin", "ssh", ...SSH_OPTS, `${user}@${host}`, wrapped], + }; +}; + +const ssh = async (command: string): Promise<{ stdout: string; stderr: string; code: number }> => { + const invocation = sshInvocation(command); + try { + const { stdout, stderr } = await execFileAsync(invocation.command, [...invocation.args], { + maxBuffer: 64 * 1024 * 1024, + }); + return { stdout, stderr, code: 0 }; + } catch (error) { + const err = error as { stdout?: string; stderr?: string; code?: number }; + return { + stdout: err.stdout ?? "", + stderr: err.stderr ?? "", + code: typeof err.code === "number" ? err.code : 1, + }; + } +}; + +const exePath = (): string => { + const dir = process.env.E2E_CLI_BIN_DIR ?? (guestOs() === "windows" ? "C:/ed" : "~/ed"); + return guestOs() === "windows" ? `${dir}/executor.exe` : `${dir}/executor`; +}; + +const CLIENTS = 6; + +// One concurrent wave of N cold/warm clients against data dir D, emitting a +// single parseable summary line. Unix (linux+macos) flavor; the election logic +// under test is OS-agnostic, the shell around it is not. +const unixWaveScript = (exe: string, dir: string, n: number): string => + [ + "set -u", + `EXE=${exe}`, + `D=${dir}`, + // `timeout` is GNU coreutils: present on Linux, absent on the macOS guest + // (where it would be `gtimeout`). Fall back to no wrapper; the test's own + // 300s budget and `tools search` self-completing are the backstops. + "TO=$(command -v timeout 2>/dev/null || command -v gtimeout 2>/dev/null || true)", + "run_client() {", + ' if [ -n "$TO" ]; then', + ` "$TO" 120 env EXECUTOR_DATA_DIR="$D" EXECUTOR_SCOPE_DIR="$D" "$EXE" tools search "probe-$1" >"$D/out-$1" 2>&1`, + " else", + ` env EXECUTOR_DATA_DIR="$D" EXECUTOR_SCOPE_DIR="$D" "$EXE" tools search "probe-$1" >"$D/out-$1" 2>&1`, + " fi", + ' echo $? >"$D/rc-$1"', + "}", + `i=1; while [ $i -le ${n} ]; do run_client $i & i=$((i+1)); done; wait`, + "ok=0; spawned=0; i=1", + `while [ $i -le ${n} ]; do`, + ' rc=$(cat "$D/rc-$i" 2>/dev/null || echo X); [ "$rc" = "0" ] && ok=$((ok+1))', + ' grep -q "Starting daemon" "$D/out-$i" 2>/dev/null && spawned=$((spawned+1))', + " i=$((i+1)); done", + `manifests=$(ls "$D"/daemon-active-* 2>/dev/null | wc -l | tr -d ' ')`, + `port=$(cat "$D"/daemon-localhost-*.json 2>/dev/null | sed -n 's/.*"port":[ ]*\\([0-9]*\\).*/\\1/p' | head -1)`, + `health=$(curl -s -o /dev/null -w '%{http_code}' --max-time 5 "http://localhost:$port/api/health" 2>/dev/null || echo 000)`, + `echo "PROBE_SUMMARY ok=$ok n=${n} spawned=$spawned manifests=$manifests port=$port health=$health"`, + ].join("\n"); + +const parseSummary = (stdout: string): Record => { + const line = stdout.split("\n").find((l) => l.includes("PROBE_SUMMARY")); + if (!line) throw new Error(`no PROBE_SUMMARY in:\n${stdout}`); + const out: Record = {}; + for (const tok of line.replace("PROBE_SUMMARY", "").trim().split(/\s+/)) { + const [k, v] = tok.split("="); + if (k) out[k] = v ?? ""; + } + return out; +}; + +it("Cold-start election: N simultaneous clients all attach to one elected daemon", async () => { + if (guestOs() === "windows") { + // This SSH/unix-shell flavor does not cover the Windows guest. The same + // election is proven on real Windows by election-cold-start.win.ps1, run by + // hand against the long-lived dockur (QEMU) Windows guest (no EC2 spend): + // cold ok=6 n=6 spawned=1 ... / warm ok=6 n=6 spawned=0 ... (one winner, rest attach). + console.warn( + "election-cold-start: see election-cold-start.win.ps1 for the Windows proof; skipping", + ); + return; + } + const exe = exePath(); + const dir = "/tmp/election-probe"; + await ssh(`rm -rf ${dir} && mkdir -p ${dir}`); + + // WAVE 1: cold. No daemon owns `dir`. All N race the start-lock. + const cold = await ssh(unixWaveScript(exe, dir, CLIENTS)); + const c = parseSummary(cold.stdout); + console.log(`[cold] ${cold.stdout.split("\n").find((l) => l.includes("PROBE_SUMMARY"))}`); + expect(Number(c.ok), `all ${CLIENTS} cold clients succeeded (stdout:\n${cold.stdout})`).toBe( + CLIENTS, + ); + expect(Number(c.manifests), "exactly one daemon was elected").toBe(1); + expect(c.health, "the elected daemon answers /api/health").toBe("200"); + + // WAVE 2: warm. A daemon now owns `dir`; every client should attach. + const warm = await ssh(unixWaveScript(exe, dir, CLIENTS)); + const w = parseSummary(warm.stdout); + console.log(`[warm] ${warm.stdout.split("\n").find((l) => l.includes("PROBE_SUMMARY"))}`); + expect(Number(w.ok), `all ${CLIENTS} warm clients attached (stdout:\n${warm.stdout})`).toBe( + CLIENTS, + ); + expect(Number(w.manifests), "still exactly one daemon").toBe(1); + expect(w.health, "daemon still reachable").toBe("200"); + + // Leave the guest clean: stop only the daemon we elected (not the service + // daemon globalsetup installed on ~/.executor). + await ssh( + `pid=$(cat ${dir}/daemon-active-* 2>/dev/null | sed -n 's/.*"pid":[ ]*\\([0-9]*\\).*/\\1/p' | head -1); [ -n "$pid" ] && kill "$pid" 2>/dev/null; rm -rf ${dir}; true`, + ); +}, 300_000); diff --git a/e2e/cli/election-cold-start.win.ps1 b/e2e/cli/election-cold-start.win.ps1 new file mode 100644 index 000000000..53e78164e --- /dev/null +++ b/e2e/cli/election-cold-start.win.ps1 @@ -0,0 +1,56 @@ +# Windows companion to election-cold-start.test.ts: the Phase 1 daemon election, +# proven on real Windows. The vitest cli-windows target provisions via EC2; this +# script is the same probe, run by hand against any reachable Windows guest (e.g. +# a long-lived dockur/QEMU box) when avoiding cloud spend. +# +# How to run: +# 1. cross-build the binary: cd apps/cli; bun run src/build.ts binary --target executor-windows-x64 +# 2. push the bin dir to the guest C:\ed (scp the dist bin dir to Administrator@:C:/ed/) +# 3. push + run this script: ssh Administrator@ 'powershell -ExecutionPolicy Bypass -File C:\ed\probe.ps1' +# +# Expected (matches Linux/macOS): one winner spawns, the rest attach, all succeed. +# PROBE_SUMMARY-cold ok=6 n=6 spawned=1 manifests=1 port=4788 health=200 +# PROBE_SUMMARY-warm ok=6 n=6 spawned=0 manifests=1 port=4788 health=200 +param([int]$N = 6) +$Exe = "C:\ed\executor.exe" +$D = "C:\Users\administrator\eprobe" +$ErrorActionPreference = "SilentlyContinue" + +function Run-Wave($label) { + $procs = @() + for ($i = 1; $i -le $N; $i++) { + $o = "$D\out-$label-$i.txt"; $e = "$D\err-$label-$i.txt" + $p = Start-Process -FilePath $Exe -ArgumentList "tools", "search", "p$i" ` + -PassThru -NoNewWindow -RedirectStandardOutput $o -RedirectStandardError $e + $procs += $p + } + foreach ($p in $procs) { [void]$p.WaitForExit(120000) } + # Success signal is the round-trip result in the client's stdout, not the + # Start-Process ExitCode (which comes back null under -NoNewWindow on PS 5.1). + # The election winner prints "Starting daemon" to STDERR; losers attach silently. + $ok = 0; $spawned = 0 + for ($i = 1; $i -le $N; $i++) { + if (Select-String -Path "$D\out-$label-$i.txt" -Pattern '"total"' -Quiet) { $ok++ } + $sp = (Select-String -Path "$D\out-$label-$i.txt" -Pattern "Starting daemon" -Quiet) ` + -or (Select-String -Path "$D\err-$label-$i.txt" -Pattern "Starting daemon" -Quiet) + if ($sp) { $spawned++ } + } + $manifests = (Get-ChildItem "$D\daemon-active-*" -EA SilentlyContinue | Measure-Object).Count + $port = "" + $pf = Get-ChildItem "$D\daemon-localhost-*.json" -EA SilentlyContinue | Select-Object -First 1 + if ($pf) { $port = (Get-Content $pf.FullName -Raw | ConvertFrom-Json).port } + $health = "000" + if ($port) { + try { $health = [string](Invoke-WebRequest -UseBasicParsing -TimeoutSec 5 "http://localhost:$port/api/health").StatusCode } catch { $health = "ERR" } + } + Write-Output "PROBE_SUMMARY-$label ok=$ok n=$N spawned=$spawned manifests=$manifests port=$port health=$health" +} + +Remove-Item -Recurse -Force $D -EA SilentlyContinue +New-Item -ItemType Directory -Force -Path $D | Out-Null +$env:EXECUTOR_DATA_DIR = $D +$env:EXECUTOR_SCOPE_DIR = $D +Run-Wave "cold" +Run-Wave "warm" +$am = Get-ChildItem "$D\daemon-active-*" -EA SilentlyContinue | Select-Object -First 1 +if ($am) { $dp = (Get-Content $am.FullName -Raw | ConvertFrom-Json).pid; if ($dp) { Stop-Process -Id $dp -Force -EA SilentlyContinue } } diff --git a/e2e/local/cli-mcp-daemon-attach-stress.test.ts b/e2e/local/cli-mcp-daemon-attach-stress.test.ts new file mode 100644 index 000000000..95daad7cf --- /dev/null +++ b/e2e/local/cli-mcp-daemon-attach-stress.test.ts @@ -0,0 +1,342 @@ +// Stress for PR 1033 (`executor mcp` attaches to the active local daemon). The +// happy path is covered by cli-mcp-daemon-attach.test.ts; this hammers the +// concurrency the bridge introduced: +// +// A. ATTACH STORM — one daemon, many stdio bridges at once, each firing a +// burst of execute() calls. Stresses the StdioServerTransport ↔ +// StreamableHTTPClientTransport forwarding under load and the daemon's MCP +// session handling. A dropped/misrouted JSON-RPC reply shows up as a wrong +// or missing result. +// B. COLD-START RACE — NO daemon, many `executor mcp` started simultaneously. +// They race acquireLocalServerStartLock + the double-check +// readActiveLocalServerManifest the PR added: exactly one must start a +// server, the rest must bridge to it. If the guard is wrong, the losers +// trip the data-dir singleton ("another active local server") or collide on +// a port — surfacing as client failures. +// C. KILL UNDER LOAD — kill the daemon mid-bridge; the http transport's +// onclose must tear the bridge down so the `executor mcp` process exits +// instead of hanging (a leaked stdio child per crashed daemon would be the +// bug). +import { expect } from "@effect/vitest"; +import { Client } from "@modelcontextprotocol/sdk/client/index.js"; +import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; +import { Effect } from "effect"; +import { mkdtempSync, readdirSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { fileURLToPath } from "node:url"; +import type { Subprocess } from "bun"; + +import { scenario } from "../src/scenario"; + +const repoRoot = fileURLToPath(new URL("../../", import.meta.url)); +const testScope = join(repoRoot, "apps/local"); +// Generous: a dev-mode daemon boots a Vite dev server, slow under machine load. +const readyTimeoutMs = 150_000; + +type DaemonProc = Subprocess<"ignore", "pipe", "pipe">; + +const waitForDaemonReady = ( + proc: DaemonProc, +): Promise<{ readonly port: number; readonly stderr: () => string }> => + // oxlint-disable-next-line executor/no-promise-reject -- boundary: local e2e watches a real daemon process + new Promise((resolveReady, rejectReady) => { + let stdoutBuffer = ""; + let stderrBuffer = ""; + let settled = false; + const decoder = new TextDecoder(); + const stdout = proc.stdout.getReader(); + const stderr = proc.stderr.getReader(); + const deadline = setTimeout(() => { + if (settled) return; + settled = true; + // oxlint-disable-next-line executor/no-promise-reject, executor/no-error-constructor -- boundary: captured daemon stderr + rejectReady(new Error(`daemon did not announce ready: ${stderrBuffer}`)); + }, readyTimeoutMs); + void (async () => { + while (true) { + const { value, done } = await stderr.read(); + if (done) return; + stderrBuffer += decoder.decode(value); + } + })(); + void (async () => { + while (true) { + const { value, done } = await stdout.read(); + if (done) { + if (!settled) { + settled = true; + clearTimeout(deadline); + // oxlint-disable-next-line executor/no-promise-reject, executor/no-error-constructor -- boundary: captured daemon stderr + rejectReady(new Error(`daemon stdout closed before ready: ${stderrBuffer}`)); + } + return; + } + stdoutBuffer += decoder.decode(value); + const match = /Daemon ready on http:\/\/(?:\[[^\]]+\]|[^:\s]+):(\d+)/.exec(stdoutBuffer); + if (match) { + settled = true; + clearTimeout(deadline); + resolveReady({ port: Number(match[1]), stderr: () => stderrBuffer }); + return; + } + } + })(); + }); + +const spawnDaemon = (dataDir: string): DaemonProc => + Bun.spawn( + [ + "bun", + "run", + "dev:cli", + "daemon", + "run", + "--foreground", + "--port", + "0", + "--hostname", + "127.0.0.1", + "--scope", + testScope, + ], + { + cwd: repoRoot, + env: { ...process.env, EXECUTOR_DATA_DIR: dataDir }, + stdin: "ignore", + stdout: "pipe", + stderr: "pipe", + }, + ); + +const stopProc = async (proc: DaemonProc): Promise => { + if (proc.exitCode !== null) return; + proc.kill("SIGTERM"); + await Promise.race([proc.exited, Bun.sleep(3000)]); + if (proc.exitCode === null) proc.kill("SIGKILL"); +}; + +const startForegroundDaemon = (dataDir: string) => + Effect.acquireRelease( + Effect.gen(function* () { + const proc = spawnDaemon(dataDir); + const ready = yield* Effect.promise(() => waitForDaemonReady(proc)).pipe( + Effect.tapError(() => Effect.promise(() => stopProc(proc))), + ); + return { proc, port: ready.port, stderr: ready.stderr }; + }), + ({ proc }) => Effect.promise(() => stopProc(proc)), + ); + +interface ClientReport { + readonly id: number; + readonly ok: boolean; + readonly tools: number; + readonly results: ReadonlyArray; + readonly error?: string; +} + +/** One `executor mcp` stdio bridge: connect, list tools, fire `callCount` + * execute() calls (each computing a unique value), collect the text results, + * always close. Never throws — failures are captured in the report so the + * scenario can assert across the whole fleet. */ +const runOneClient = async ( + id: number, + dataDir: string, + callCount: number, +): Promise => { + const transport = new StdioClientTransport({ + command: "bun", + args: ["run", "dev:cli", "mcp", "--scope", testScope], + cwd: repoRoot, + env: { ...process.env, EXECUTOR_DATA_DIR: dataDir }, + stderr: "pipe", + }); + const client = new Client({ name: `stress-client-${id}`, version: "1.0.0" }); + const results: string[] = []; + let errBuf = ""; + transport.stderr?.on("data", (d: Buffer) => { + errBuf += d.toString(); + }); + // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: capture per-client failure for fleet-wide assertions + try { + await client.connect(transport); + const { tools } = await client.listTools(); + for (let i = 0; i < callCount; i++) { + // A value unique to (client, call) so a misrouted reply is detectable. + const expr = `return ${id} * 1000 + ${i}`; + const r = await client.callTool({ name: "execute", arguments: { code: expr } }); + const text = (r.content as Array<{ type: string; text: string }>)[0]?.text ?? ""; + results.push(text); + } + return { id, ok: true, tools: tools.length, results }; + } catch (error) { + return { + id, + ok: false, + tools: 0, + results, + error: + (error instanceof Error ? error.message : String(error)) + + (errBuf + ? `\n stderr: ${errBuf.trim().split("\n").slice(-6).join("\n ")}` + : ""), + }; + } finally { + await transport.close().catch(() => undefined); + } +}; + +/** `executor mcp` now ensures a DURABLE (detached) daemon and bridges to it, so a + * cold-start scenario leaves that daemon running. Stop it before removing the + * data dir so the test never leaks an orphan daemon. */ +const stopAutoSpawnedDaemon = (dataDir: string): void => { + try { + const manifest = JSON.parse( + readFileSync(join(dataDir, "server-control", "server.json"), "utf8"), + ) as { pid?: number }; + if (manifest.pid) process.kill(manifest.pid, "SIGTERM"); + } catch { + // no manifest (no daemon spawned) — nothing to stop. + } +}; + +const withTempData = Effect.acquireRelease( + Effect.sync(() => { + const root = mkdtempSync(join(tmpdir(), "executor-mcp-stress-")); + return join(root, "data"); + }), + (dataDir) => + Effect.sync(() => { + stopAutoSpawnedDaemon(dataDir); + rmSync(join(dataDir, ".."), { recursive: true, force: true }); + }), +); + +const summarize = (label: string, reports: ReadonlyArray): void => { + const ok = reports.filter((r) => r.ok).length; + const calls = reports.reduce((n, r) => n + r.results.length, 0); + const failures = reports.filter((r) => !r.ok).map((r) => `#${r.id}: ${r.error}`); + // eslint-disable-next-line no-console + console.log( + `[stress:${label}] clients ${ok}/${reports.length} ok, ${calls} execute calls` + + (failures.length ? `\n failures:\n ${failures.join("\n ")}` : ""), + ); +}; + +const CONCURRENCY = Number(process.env.E2E_MCP_STRESS_CLIENTS ?? "10"); +const CALLS = Number(process.env.E2E_MCP_STRESS_CALLS ?? "5"); + +scenario( + "Local CLI MCP · an attach storm of concurrent stdio bridges all execute correctly", + { timeout: 240_000 }, + Effect.gen(function* () { + const dataDir = yield* withTempData; + const daemon = yield* startForegroundDaemon(dataDir); + + // Many bridges attach to the one daemon at once and each fires a burst. + const reports = yield* Effect.promise(() => + Promise.all(Array.from({ length: CONCURRENCY }, (_, id) => runOneClient(id, dataDir, CALLS))), + ); + summarize("attach-storm", reports); + + for (const report of reports) { + expect(report.ok, `client #${report.id} failed: ${report.error ?? ""}`).toBe(true); + expect(report.tools, `client #${report.id} listed tools`).toBeGreaterThan(0); + // Every reply is the value THIS client asked for — no cross-talk between + // concurrent bridges sharing the daemon. + report.results.forEach((text, i) => { + expect(text, `client #${report.id} call ${i} got its own result`).toContain( + String(report.id * 1000 + i), + ); + }); + } + expect(daemon.proc.exitCode, `daemon survived the storm:\n${daemon.stderr()}`).toBeNull(); + }).pipe(Effect.scoped), +); + +scenario( + "Local CLI MCP · a cold-start race elects exactly one server and every client attaches", + { timeout: 240_000 }, + Effect.gen(function* () { + const dataDir = yield* withTempData; + + // No daemon: launch the whole fleet simultaneously so they race the + // start-lock + double-check. Exactly one should start a server; the rest + // must bridge to it. A broken guard trips the singleton or a port collision. + const reports = yield* Effect.promise(() => + Promise.all(Array.from({ length: CONCURRENCY }, (_, id) => runOneClient(id, dataDir, CALLS))), + ); + summarize("cold-start-race", reports); + + for (const report of reports) { + expect(report.ok, `client #${report.id} lost the race: ${report.error ?? ""}`).toBe(true); + report.results.forEach((text, i) => { + expect(text, `client #${report.id} call ${i}`).toContain(String(report.id * 1000 + i)); + }); + } + + // Exactly one elected owner: a single server.json manifest, not N competing + // servers. (The bridges own no DB and write no manifest of their own.) + const controlDir = join(dataDir, "server-control"); + const serverManifests = (() => { + // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: dir read + try { + return readdirSync(controlDir).filter((f) => f === "server.json"); + } catch { + return []; + } + })(); + expect(serverManifests.length, "exactly one elected server manifest").toBe(1); + }).pipe(Effect.scoped), +); + +scenario( + "Local CLI MCP · killing the daemon under load tears bridges down without hanging", + { timeout: 180_000 }, + Effect.gen(function* () { + const dataDir = yield* withTempData; + const daemon = yield* startForegroundDaemon(dataDir); + + // Attach a bridge, prove it works, then SIGKILL the daemon out from under it + // and confirm a subsequent call fails fast (the bridge tore down) rather than + // hanging until the test times out. + const transport = new StdioClientTransport({ + command: "bun", + args: ["run", "dev:cli", "mcp", "--scope", testScope], + cwd: repoRoot, + env: { ...process.env, EXECUTOR_DATA_DIR: dataDir }, + stderr: "pipe", + }); + const client = new Client({ name: "stress-kill", version: "1.0.0" }); + yield* Effect.acquireRelease( + Effect.promise(() => client.connect(transport)), + () => Effect.promise(() => transport.close().catch(() => undefined)), + ); + + const first = yield* Effect.promise(() => + client.callTool({ name: "execute", arguments: { code: "return 1 + 1" } }), + ); + expect((first.content as Array<{ text: string }>)[0]?.text, "bridge works pre-kill").toContain( + "2", + ); + + daemon.proc.kill("SIGKILL"); + yield* Effect.promise(() => Promise.race([daemon.proc.exited, Bun.sleep(3000)])); + + // The next call must settle (reject) quickly — a 10s bound well under the + // scenario timeout catches a hang. + const settled = yield* Effect.promise(() => + Promise.race([ + client + .callTool({ name: "execute", arguments: { code: "return 3" } }) + .then(() => "resolved" as const) + .catch(() => "rejected" as const), + Bun.sleep(10_000).then(() => "timeout" as const), + ]), + ); + // eslint-disable-next-line no-console + console.log(`[stress:kill-under-load] post-kill call → ${settled}`); + expect(settled, "a call after the daemon dies must not hang").not.toBe("timeout"); + }).pipe(Effect.scoped), +);