diff --git a/apps/cli/package.json b/apps/cli/package.json index b45e7d11c..607a73322 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -30,6 +30,7 @@ "@executor-js/runtime-quickjs": "workspace:*", "@executor-js/sdk": "workspace:*", "@jitl/quickjs-wasmfile-release-sync": "catalog:", + "@modelcontextprotocol/sdk": "^1.29.0", "@sentry/bun": "^10.57.0", "effect": "catalog:", "quickjs-emscripten": "catalog:" diff --git a/apps/cli/src/main.ts b/apps/cli/src/main.ts index be8955166..b4fed3c79 100644 --- a/apps/cli/src/main.ts +++ b/apps/cli/src/main.ts @@ -63,6 +63,9 @@ import type { PlatformError } from "effect/PlatformError"; import * as Effect from "effect/Effect"; import * as Option from "effect/Option"; import * as Cause from "effect/Cause"; +import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; +import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; +import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js"; import { ExecutorApi, checkForUpdate } from "@executor-js/api"; import { @@ -84,15 +87,12 @@ import { } from "./device-login"; import { startServer, - runMcpStdioServer, - getExecutor, rotateLocalAuthToken, localAuthTokenPath, findDataDirOwnershipHeld, type ServerInstance, type StartServerOptions, } from "@executor-js/local"; -import { makeQuickJsExecutor } from "@executor-js/runtime-quickjs"; import { fetchIntegrations } from "./integrations"; import { buildDaemonSpawnSpec, @@ -501,77 +501,134 @@ const waitForDaemonStartupTarget = (input: { // Serialize daemon startup behind a filesystem lock so concurrent CLI invocations don't // each spawn their own daemon. The post-lock pointer recheck catches the case where // another invocation finished bootstrapping while we were waiting for the lock. -const spawnAndWaitForDaemon = (input: { +// A storm of concurrent cold starts should elect ONE owner with the rest +// attaching, never N-1 hard failures. 
Each attempt either wins the per-scope +// start lock and spawns the daemon, or waits for the current holder's manifest. +// Re-acquiring after a wait timeout is what recovers the one window the wait +// alone cannot: acquireDaemonStartLock reclaims a STALE lock at acquisition, so +// a holder that took the lock then died BEFORE spawning (no daemon, no manifest) +// is recovered when a loser loops back and re-acquires. +const MAX_DAEMON_ELECTION_ATTEMPTS = 3; + +/** The stable message acquireDaemonStartLock fails with on genuine lock + * contention. Only this should be treated as "another process is electing"; + * any other failure (e.g. an unwritable data dir) must propagate, not masquerade + * as a race the caller should wait out. */ +const isStartLockContention = (error: Error): boolean => + error.message.includes("Another daemon startup is already in progress"); + +const spawnDaemonAsLockHolder = (input: { host: string; scopeId: string; preferredPort: number; allowedHosts: ReadonlyArray; }): Effect.Effect => Effect.gen(function* () { - const lock = yield* acquireDaemonStartLock({ hostname: input.host, scopeId: input.scopeId }); - - try { - const existing = yield* readDaemonPointer({ hostname: input.host, scopeId: input.scopeId }); - if (existing && isPidAlive(existing.pid)) { - const existingUrl = daemonBaseUrl(existing.hostname, existing.port); - if (yield* isServerReachable(existingUrl)) { - return existingUrl; - } + const existing = yield* readDaemonPointer({ hostname: input.host, scopeId: input.scopeId }); + if (existing && isPidAlive(existing.pid)) { + const existingUrl = daemonBaseUrl(existing.hostname, existing.port); + if (yield* isServerReachable(existingUrl)) { + return existingUrl; } + } - const selectedPort = yield* chooseDaemonPort({ - preferredPort: input.preferredPort, - hostname: input.host, - }); + const selectedPort = yield* chooseDaemonPort({ + preferredPort: input.preferredPort, + hostname: input.host, + }); - if (selectedPort !== 
input.preferredPort) { - console.error( - `Port ${input.preferredPort} is in use. Starting daemon on available port ${selectedPort} instead.`, - ); - } + if (selectedPort !== input.preferredPort) { + console.error( + `Port ${input.preferredPort} is in use. Starting daemon on available port ${selectedPort} instead.`, + ); + } - const spec = yield* Effect.try({ - try: () => - buildDaemonSpawnSpec({ - port: selectedPort, - hostname: input.host, - isDevMode, - scriptPath: script, - executablePath: process.execPath, - allowedHosts: input.allowedHosts, - }), - catch: (cause) => - cause instanceof Error - ? cause - : new Error(`Failed to build daemon command: ${String(cause)}`), - }); + const spec = yield* Effect.try({ + try: () => + buildDaemonSpawnSpec({ + port: selectedPort, + hostname: input.host, + isDevMode, + scriptPath: script, + executablePath: process.execPath, + allowedHosts: input.allowedHosts, + }), + catch: (cause) => + cause instanceof Error + ? cause + : new Error(`Failed to build daemon command: ${String(cause)}`), + }); - const startBaseUrl = daemonBaseUrl(input.host, selectedPort); - console.error(`Starting daemon on ${input.host}:${selectedPort}...`); - const child = yield* spawnDetached({ - command: spec.command, - args: spec.args, - env: process.env, - }); + const startBaseUrl = daemonBaseUrl(input.host, selectedPort); + console.error(`Starting daemon on ${input.host}:${selectedPort}...`); + const child = yield* spawnDetached({ + command: spec.command, + args: spec.args, + env: process.env, + }); - const readyBaseUrl = yield* waitForDaemonStartupTarget({ requestedBaseUrl: startBaseUrl }); + const readyBaseUrl = yield* waitForDaemonStartupTarget({ requestedBaseUrl: startBaseUrl }); - if (!readyBaseUrl) { - yield* terminateSpawnedDetachedProcess(child).pipe(Effect.ignore); - return yield* Effect.fail( - new Error( - [ - `Daemon did not become reachable at ${startBaseUrl} and no reachable local server manifest appeared within 
${DAEMON_BOOT_TIMEOUT_MS}ms.`, - `Run in foreground to inspect logs: ${cliPrefix} daemon run --foreground --port ${selectedPort} --hostname ${input.host}`, - ].join("\n"), - ), + if (!readyBaseUrl) { + yield* terminateSpawnedDetachedProcess(child).pipe(Effect.ignore); + return yield* Effect.fail( + new Error( + [ + `Daemon did not become reachable at ${startBaseUrl} and no reachable local server manifest appeared within ${DAEMON_BOOT_TIMEOUT_MS}ms.`, + `Run in foreground to inspect logs: ${cliPrefix} daemon run --foreground --port ${selectedPort} --hostname ${input.host}`, + ].join("\n"), + ), + ); + } + + return readyBaseUrl; + }); + +const spawnAndWaitForDaemon = (input: { + host: string; + scopeId: string; + preferredPort: number; + allowedHosts: ReadonlyArray; +}): Effect.Effect => + Effect.gen(function* () { + const requestedBaseUrl = daemonBaseUrl(input.host, input.preferredPort); + + for (let attempt = 1; attempt <= MAX_DAEMON_ELECTION_ATTEMPTS; attempt++) { + const acquired = yield* acquireDaemonStartLock({ + hostname: input.host, + scopeId: input.scopeId, + }).pipe( + Effect.map((lock) => ({ held: true as const, lock })), + Effect.catch((error) => + isStartLockContention(error) + ? Effect.succeed({ held: false as const, lock: null }) + : Effect.fail(error), + ), + ); + + if (acquired.held) { + const lock = acquired.lock; + return yield* spawnDaemonAsLockHolder(input).pipe( + Effect.ensuring(releaseDaemonStartLock(lock).pipe(Effect.ignore)), ); } - return readyBaseUrl; - } finally { - yield* releaseDaemonStartLock(lock).pipe(Effect.ignore); + // Lost the lock: wait for the current holder to advertise a manifest. + const ready = yield* waitForDaemonStartupTarget({ requestedBaseUrl }); + if (ready) return ready; + // Timed out with no manifest. The holder may have died mid-startup; loop to + // re-acquire, which reclaims its now-stale lock. 
} + + return yield* Effect.fail( + new Error( + [ + `Could not elect or attach to a local Executor daemon after ${MAX_DAEMON_ELECTION_ATTEMPTS} attempts.`, + "A daemon startup may be stuck. Stop any partial daemon and retry, or run it in the foreground:", + `${cliPrefix} daemon run --foreground --port ${input.preferredPort} --hostname ${input.host}`, + ].join("\n"), + ), + ); }); // Auto-start a local daemon on demand so commands like `executor call` work without the @@ -1254,125 +1311,146 @@ const runBackgroundDaemonStart = (input: { }).pipe(Effect.mapError(toError)); // --------------------------------------------------------------------------- -// Stdio MCP session +// Stdio MCP session: a pure stdio <-> HTTP bridge to the owning local daemon. // --------------------------------------------------------------------------- -const withStdoutReroutedToStderr = async (body: () => Promise): Promise => { - const originalWrite = process.stdout.write; - const originalLog = console.log; - const originalInfo = console.info; - const originalDebug = console.debug; - const stderrWrite = process.stderr.write.bind(process.stderr); +const mcpUrlForActiveLocalServer = ( + connection: ExecutorServerConnection, + elicitationMode: "browser" | "model", +): URL => { + const url = new URL("/mcp", connection.origin); + if (elicitationMode === "browser") { + url.searchParams.set("elicitation_mode", "browser"); + } + return url; +}; + +/** + * Bridge a stdio MCP client to a local server's HTTP `/mcp` endpoint. `executor + * mcp` owns NO database: it forwards JSON-RPC between the client's stdin/stdout + * and the daemon over Streamable HTTP. That keeps any number of MCP clients (plus + * the web UI and the desktop app) attached to the single owning daemon at once, + * and means a transient MCP client exiting never takes the server down. Resolves + * when stdin closes or the daemon connection drops; close is best-effort. 
+ */ +const runMcpHttpBridge = async (input: { + readonly manifest: ExecutorLocalServerManifest; + readonly elicitationMode: "browser" | "model"; +}): Promise => { + const stdio = new StdioServerTransport(); + const authorization = getExecutorServerAuthorizationHeader(input.manifest.connection); + const http = new StreamableHTTPClientTransport( + mcpUrlForActiveLocalServer(input.manifest.connection, input.elicitationMode), + authorization ? { requestInit: { headers: { Authorization: authorization } } } : undefined, + ); + + let finished = false; + let closing = false; + let closePromise: Promise | null = null; + let resolveExit: () => void = () => {}; + const waitForExit = new Promise((resolve) => { + resolveExit = resolve; + }); + + const finish = () => { + if (finished) return; + finished = true; + process.off("SIGINT", shutdown); + process.off("SIGTERM", shutdown); + process.stdin.off("end", shutdown); + process.stdin.off("close", shutdown); + resolveExit(); + }; + + const closeBoth = (): Promise => { + if (!closePromise) { + closing = true; + closePromise = Promise.allSettled([stdio.close(), http.close()]).then(() => undefined); + } + return closePromise; + }; - process.stdout.write = ((...args: Parameters) => - stderrWrite(...args)) as typeof process.stdout.write; - console.log = console.error.bind(console); - console.info = console.error.bind(console); - console.debug = console.error.bind(console); + function shutdown() { + finish(); + void closeBoth(); + } + + const isAbortDuringClose = (error: Error): boolean => + error.name === "AbortError" || error.message.toLowerCase().includes("aborted"); + + const reportError = (context: string, cause: unknown) => { + const error = toError(cause); + if (closing && isAbortDuringClose(error)) return; + console.error(`Executor MCP bridge ${context}: ${error.message}`); + }; + + const forwardMessage = + (send: (message: JSONRPCMessage) => Promise, context: string) => + (message: JSONRPCMessage) => { + void 
send(message).then(undefined, (cause: unknown) => { + reportError(context, cause); + shutdown(); + }); + }; + + process.once("SIGINT", shutdown); + process.once("SIGTERM", shutdown); + process.stdin.once("end", shutdown); + process.stdin.once("close", shutdown); + + stdio.onclose = shutdown; + http.onclose = shutdown; + stdio.onerror = (error) => reportError("stdio transport error", error); + http.onerror = (error) => reportError("daemon transport error", error); + stdio.onmessage = forwardMessage((message) => http.send(message), "failed to send to daemon"); + http.onmessage = forwardMessage((message) => stdio.send(message), "failed to send to stdio"); try { - return await body(); + await http.start(); + await stdio.start(); + await waitForExit; } finally { - process.stdout.write = originalWrite; - console.log = originalLog; - console.info = originalInfo; - console.debug = originalDebug; + finish(); + await closeBoth(); } }; const runStdioMcpSession = (input: { readonly elicitationMode: "browser" | "model" }) => Effect.gen(function* () { - // No process-level startup lock: the DB ownership lock inside startServer - // (openOwnedLocalDatabase) is the real gate. assertNoOtherActiveLocalServer - // is a friendly fast-path that may race without being unsafe. 
- yield* assertNoOtherActiveLocalServer(); - const web = yield* Effect.tryPromise({ - try: () => - withStdoutReroutedToStderr(async () => { - const host = "127.0.0.1"; - const port = await Effect.runPromise( - chooseDaemonPort({ preferredPort: DEFAULT_PORT, hostname: host }), - ); - const baseUrl = `http://localhost:${port}`; - const restoreWebBaseUrl = installDefaultExecutorWebBaseUrl(baseUrl); - - try { - const executor = await getExecutor(); - const server = await startServer({ - port, - hostname: host, - embeddedWebUI, - }); - const serverBaseUrl = `http://localhost:${server.port}`; - return { executor, server, baseUrl: serverBaseUrl, restoreWebBaseUrl }; - } catch (cause) { - restoreWebBaseUrl(); - throw cause; - } - }), - catch: (cause) => cause, - }).pipe( - Effect.catch((cause) => { - const ownership = findDataDirOwnershipHeld(cause); - if (!ownership) return Effect.fail(toError(cause)); - return Effect.gen(function* () { - const manifest = yield* readReachableLocalServerHint(); - const path = yield* PlatformPath.Path; - const dataDir = resolveExecutorDataDir(path); - if (manifest) { - return yield* Effect.fail( - new Error( - [ - `A local Executor server already owns ${dataDir} at ${manifest.connection.origin}.`, - "The stdio MCP server needs exclusive access to the local database.", - `Use the running HTTP MCP endpoint instead: ${manifest.connection.origin}/mcp`, - ].join("\n"), - ), - ); - } - return yield* Effect.fail( - new Error( - [ - "Executor data directory is owned by another live process, but no reachable local server was advertised.", - `Data directory: ${dataDir}`, - `Ownership lock: ${ownership.lockPath}`, - "Wait for the existing process to finish starting, or stop it and retry.", - ].join("\n"), - ), - ); - }); - }), - ); - yield* publishLocalServerManifest({ - kind: "foreground", - connection: normalizeExecutorServerConnection({ - kind: "http", - origin: web.baseUrl, - displayName: "CLI MCP", - auth: { kind: "bearer", token: 
web.server.authToken }, - }), - }); - - try { + // `executor mcp` never owns the local database. If a local server is already + // running, bridge this stdio client to it; otherwise ensure a durable + // background daemon is up and bridge to that. ensureDaemon is the race-safe + // election: concurrent cold starts elect one owner and the losers wait for + // its manifest (waitForDaemonStartupTarget) rather than failing. Bridging + // means many MCP clients, the web UI, and the desktop app share one owner, + // and that owner's lifetime is never tied to a transient MCP client. + const active = yield* readActiveLocalServerManifest(); + if (active) { yield* Effect.promise(() => - runMcpStdioServer({ - executor: web.executor, - codeExecutor: makeQuickJsExecutor(), - elicitationMode: - input.elicitationMode === "browser" - ? { - mode: "browser" as const, - approvalUrl: (executionId) => - `${web.baseUrl}/resume/${encodeURIComponent(executionId)}`, - } - : { mode: input.elicitationMode }, - }), + runMcpHttpBridge({ manifest: active, elicitationMode: input.elicitationMode }), + ); + return; + } + + // No reachable owner yet: ensure one. If we lose the election (another + // process became owner first), ensureDaemon may fail, but the winner's + // manifest is then reachable, so re-read it and bridge to that instead. + const elected = yield* ensureDaemon(DEFAULT_BASE_URL).pipe( + Effect.flatMap(() => readActiveLocalServerManifest()), + Effect.catch((error) => + readActiveLocalServerManifest().pipe( + Effect.flatMap((manifest) => (manifest ? 
Effect.succeed(manifest) : Effect.fail(error))), + ), + ), + ); + if (!elected) { + return yield* Effect.fail( + new Error("The local Executor daemon started but did not advertise a reachable manifest."), ); - } finally { - web.restoreWebBaseUrl(); - yield* Effect.promise(() => web.server.stop()); - yield* removeLocalServerManifestIfOwnedBy({ pid: process.pid }).pipe(Effect.ignore); } + yield* Effect.promise(() => + runMcpHttpBridge({ manifest: elected, elicitationMode: input.elicitationMode }), + ); }); const scope = Options.string("scope").pipe( diff --git a/bun.lock b/bun.lock index 2bf0851f7..dcb4fed28 100644 --- a/bun.lock +++ b/bun.lock @@ -42,6 +42,7 @@ "@executor-js/runtime-quickjs": "workspace:*", "@executor-js/sdk": "workspace:*", "@jitl/quickjs-wasmfile-release-sync": "catalog:", + "@modelcontextprotocol/sdk": "^1.29.0", "@sentry/bun": "^10.57.0", "effect": "catalog:", "quickjs-emscripten": "catalog:", diff --git a/e2e/cli/election-cold-start.test.ts b/e2e/cli/election-cold-start.test.ts new file mode 100644 index 000000000..a1a4a7030 --- /dev/null +++ b/e2e/cli/election-cold-start.test.ts @@ -0,0 +1,160 @@ +// Cross-OS proof of the Phase 1 daemon election (apps/cli/src/main.ts). On a real +// guest OS, fire N `executor` clients at a COLD data dir simultaneously: each +// auto-starts a daemon via resolveExecutorServerConnection -> ensureDaemon -> +// spawnAndWaitForDaemon (the election). Before Phase 1 exactly one client won and +// the other N-1 hard-failed on the start-lock; the fix makes the losers wait for +// the winner's manifest and attach. So the assertion is simply: ALL N succeed, +// exactly one owner is elected, and it is reachable. A second (warm) wave proves +// the steady-state attach path. Uses a separate data dir from the service daemon +// that globalsetup installed on ~/.executor, so the two never collide. +// +// Runs on the cli-* VM targets (cli-linux / cli-macos on local tart; cli-windows +// on EC2). 
Drive everything over the same SSH the other cli tests use. +import { execFile } from "node:child_process"; +import { promisify } from "node:util"; + +import { expect, it } from "@effect/vitest"; + +const execFileAsync = promisify(execFile); + +const SSH_OPTS = [ + "-o", + "StrictHostKeyChecking=no", + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + "ConnectTimeout=15", +] as const; + +type GuestOs = "macos" | "linux" | "windows"; + +const guestOs = (): GuestOs => { + const os = process.env.E2E_VM_OS; + if (os === "macos" || os === "linux" || os === "windows") return os; + throw new Error(`Unsupported E2E_VM_OS: ${os ?? ""}`); +}; + +const sshInvocation = (command: string): { command: string; args: ReadonlyArray } => { + const host = process.env.E2E_CLI_VM_HOST; + if (!host) throw new Error("E2E_CLI_VM_HOST is not set"); + const os = guestOs(); + const wrapped = + os === "linux" ? `export XDG_RUNTIME_DIR=/run/user/$(id -u); ${command}` : command; + const keyPath = process.env.E2E_CLI_SSH_KEY; + const user = os === "windows" ? "Administrator" : "admin"; + return keyPath + ? { command: "ssh", args: ["-i", keyPath, ...SSH_OPTS, `${user}@${host}`, wrapped] } + : { + command: process.env.E2E_SSHPASS_BIN ?? "/opt/homebrew/bin/sshpass", + args: ["-p", "admin", "ssh", ...SSH_OPTS, `${user}@${host}`, wrapped], + }; +}; + +const ssh = async (command: string): Promise<{ stdout: string; stderr: string; code: number }> => { + const invocation = sshInvocation(command); + try { + const { stdout, stderr } = await execFileAsync(invocation.command, [...invocation.args], { + maxBuffer: 64 * 1024 * 1024, + }); + return { stdout, stderr, code: 0 }; + } catch (error) { + const err = error as { stdout?: string; stderr?: string; code?: number }; + return { + stdout: err.stdout ?? "", + stderr: err.stderr ?? "", + code: typeof err.code === "number" ? err.code : 1, + }; + } +}; + +const exePath = (): string => { + const dir = process.env.E2E_CLI_BIN_DIR ?? (guestOs() === "windows" ? 
"C:/ed" : "~/ed"); + return guestOs() === "windows" ? `${dir}/executor.exe` : `${dir}/executor`; +}; + +const CLIENTS = 6; + +// One concurrent wave of N cold/warm clients against data dir D, emitting a +// single parseable summary line. Unix (linux+macos) flavor; the election logic +// under test is OS-agnostic, the shell around it is not. +const unixWaveScript = (exe: string, dir: string, n: number): string => + [ + "set -u", + `EXE=${exe}`, + `D=${dir}`, + // `timeout` is GNU coreutils: present on Linux, absent on the macOS guest + // (where it would be `gtimeout`). Fall back to no wrapper; the test's own + // 300s budget and `tools search` self-completing are the backstops. + "TO=$(command -v timeout 2>/dev/null || command -v gtimeout 2>/dev/null || true)", + "run_client() {", + ' if [ -n "$TO" ]; then', + ` "$TO" 120 env EXECUTOR_DATA_DIR="$D" EXECUTOR_SCOPE_DIR="$D" "$EXE" tools search "probe-$1" >"$D/out-$1" 2>&1`, + " else", + ` env EXECUTOR_DATA_DIR="$D" EXECUTOR_SCOPE_DIR="$D" "$EXE" tools search "probe-$1" >"$D/out-$1" 2>&1`, + " fi", + ' echo $? 
>"$D/rc-$1"', + "}", + `i=1; while [ $i -le ${n} ]; do run_client $i & i=$((i+1)); done; wait`, + "ok=0; spawned=0; i=1", + `while [ $i -le ${n} ]; do`, + ' rc=$(cat "$D/rc-$i" 2>/dev/null || echo X); [ "$rc" = "0" ] && ok=$((ok+1))', + ' grep -q "Starting daemon" "$D/out-$i" 2>/dev/null && spawned=$((spawned+1))', + " i=$((i+1)); done", + `manifests=$(ls "$D"/daemon-active-* 2>/dev/null | wc -l | tr -d ' ')`, + `port=$(cat "$D"/daemon-localhost-*.json 2>/dev/null | sed -n 's/.*"port":[ ]*\\([0-9]*\\).*/\\1/p' | head -1)`, + `health=$(curl -s -o /dev/null -w '%{http_code}' --max-time 5 "http://localhost:$port/api/health" 2>/dev/null || echo 000)`, + `echo "PROBE_SUMMARY ok=$ok n=${n} spawned=$spawned manifests=$manifests port=$port health=$health"`, + ].join("\n"); + +const parseSummary = (stdout: string): Record => { + const line = stdout.split("\n").find((l) => l.includes("PROBE_SUMMARY")); + if (!line) throw new Error(`no PROBE_SUMMARY in:\n${stdout}`); + const out: Record = {}; + for (const tok of line.replace("PROBE_SUMMARY", "").trim().split(/\s+/)) { + const [k, v] = tok.split("="); + if (k) out[k] = v ?? ""; + } + return out; +}; + +it("Cold-start election: N simultaneous clients all attach to one elected daemon", async () => { + if (guestOs() === "windows") { + // This SSH/unix-shell flavor does not cover the Windows guest. The same + // election is proven on real Windows by election-cold-start.win.ps1, run by + // hand against the long-lived dockur (QEMU) Windows guest (no EC2 spend): + // cold ok=6 n=6 spawned=1 ... / warm ok=6 n=6 spawned=0 ... (one winner, rest attach). + console.warn( + "election-cold-start: see election-cold-start.win.ps1 for the Windows proof; skipping", + ); + return; + } + const exe = exePath(); + const dir = "/tmp/election-probe"; + await ssh(`rm -rf ${dir} && mkdir -p ${dir}`); + + // WAVE 1: cold. No daemon owns `dir`. All N race the start-lock. 
+ const cold = await ssh(unixWaveScript(exe, dir, CLIENTS)); + const c = parseSummary(cold.stdout); + console.log(`[cold] ${cold.stdout.split("\n").find((l) => l.includes("PROBE_SUMMARY"))}`); + expect(Number(c.ok), `all ${CLIENTS} cold clients succeeded (stdout:\n${cold.stdout})`).toBe( + CLIENTS, + ); + expect(Number(c.manifests), "exactly one daemon was elected").toBe(1); + expect(c.health, "the elected daemon answers /api/health").toBe("200"); + + // WAVE 2: warm. A daemon now owns `dir`; every client should attach. + const warm = await ssh(unixWaveScript(exe, dir, CLIENTS)); + const w = parseSummary(warm.stdout); + console.log(`[warm] ${warm.stdout.split("\n").find((l) => l.includes("PROBE_SUMMARY"))}`); + expect(Number(w.ok), `all ${CLIENTS} warm clients attached (stdout:\n${warm.stdout})`).toBe( + CLIENTS, + ); + expect(Number(w.manifests), "still exactly one daemon").toBe(1); + expect(w.health, "daemon still reachable").toBe("200"); + + // Leave the guest clean: stop only the daemon we elected (not the service + // daemon globalsetup installed on ~/.executor). + await ssh( + `pid=$(cat ${dir}/daemon-active-* 2>/dev/null | sed -n 's/.*"pid":[ ]*\\([0-9]*\\).*/\\1/p' | head -1); [ -n "$pid" ] && kill "$pid" 2>/dev/null; rm -rf ${dir}; true`, + ); +}, 300_000); diff --git a/e2e/cli/election-cold-start.win.ps1 b/e2e/cli/election-cold-start.win.ps1 new file mode 100644 index 000000000..53e78164e --- /dev/null +++ b/e2e/cli/election-cold-start.win.ps1 @@ -0,0 +1,56 @@ +# Windows companion to election-cold-start.test.ts: the Phase 1 daemon election, +# proven on real Windows. The vitest cli-windows target provisions via EC2; this +# script is the same probe, run by hand against any reachable Windows guest (e.g. +# a long-lived dockur/QEMU box) when avoiding cloud spend. +# +# How to run: +# 1. cross-build the binary: cd apps/cli; bun run src/build.ts binary --target executor-windows-x64 +# 2. 
push the bin dir to the guest C:\ed (scp the dist bin dir to Administrator@:C:/ed/) +# 3. push + run this script: ssh Administrator@ 'powershell -ExecutionPolicy Bypass -File C:\ed\probe.ps1' +# +# Expected (matches Linux/macOS): one winner spawns, the rest attach, all succeed. +# PROBE_SUMMARY-cold ok=6 n=6 spawned=1 manifests=1 port=4788 health=200 +# PROBE_SUMMARY-warm ok=6 n=6 spawned=0 manifests=1 port=4788 health=200 +param([int]$N = 6) +$Exe = "C:\ed\executor.exe" +$D = "C:\Users\administrator\eprobe" +$ErrorActionPreference = "SilentlyContinue" + +function Run-Wave($label) { + $procs = @() + for ($i = 1; $i -le $N; $i++) { + $o = "$D\out-$label-$i.txt"; $e = "$D\err-$label-$i.txt" + $p = Start-Process -FilePath $Exe -ArgumentList "tools", "search", "p$i" ` + -PassThru -NoNewWindow -RedirectStandardOutput $o -RedirectStandardError $e + $procs += $p + } + foreach ($p in $procs) { [void]$p.WaitForExit(120000) } + # Success signal is the round-trip result in the client's stdout, not the + # Start-Process ExitCode (which comes back null under -NoNewWindow on PS 5.1). + # The election winner prints "Starting daemon" to STDERR; losers attach silently. 
+ $ok = 0; $spawned = 0 + for ($i = 1; $i -le $N; $i++) { + if (Select-String -Path "$D\out-$label-$i.txt" -Pattern '"total"' -Quiet) { $ok++ } + $sp = (Select-String -Path "$D\out-$label-$i.txt" -Pattern "Starting daemon" -Quiet) ` + -or (Select-String -Path "$D\err-$label-$i.txt" -Pattern "Starting daemon" -Quiet) + if ($sp) { $spawned++ } + } + $manifests = (Get-ChildItem "$D\daemon-active-*" -EA SilentlyContinue | Measure-Object).Count + $port = "" + $pf = Get-ChildItem "$D\daemon-localhost-*.json" -EA SilentlyContinue | Select-Object -First 1 + if ($pf) { $port = (Get-Content $pf.FullName -Raw | ConvertFrom-Json).port } + $health = "000" + if ($port) { + try { $health = [string](Invoke-WebRequest -UseBasicParsing -TimeoutSec 5 "http://localhost:$port/api/health").StatusCode } catch { $health = "ERR" } + } + Write-Output "PROBE_SUMMARY-$label ok=$ok n=$N spawned=$spawned manifests=$manifests port=$port health=$health" +} + +Remove-Item -Recurse -Force $D -EA SilentlyContinue +New-Item -ItemType Directory -Force -Path $D | Out-Null +$env:EXECUTOR_DATA_DIR = $D +$env:EXECUTOR_SCOPE_DIR = $D +Run-Wave "cold" +Run-Wave "warm" +$am = Get-ChildItem "$D\daemon-active-*" -EA SilentlyContinue | Select-Object -First 1 +if ($am) { $dp = (Get-Content $am.FullName -Raw | ConvertFrom-Json).pid; if ($dp) { Stop-Process -Id $dp -Force -EA SilentlyContinue } } diff --git a/e2e/local/cli-mcp-daemon-attach-stress.test.ts b/e2e/local/cli-mcp-daemon-attach-stress.test.ts new file mode 100644 index 000000000..95daad7cf --- /dev/null +++ b/e2e/local/cli-mcp-daemon-attach-stress.test.ts @@ -0,0 +1,342 @@ +// Stress for PR 1033 (`executor mcp` attaches to the active local daemon). The +// happy path is covered by cli-mcp-daemon-attach.test.ts; this hammers the +// concurrency the bridge introduced: +// +// A. ATTACH STORM — one daemon, many stdio bridges at once, each firing a +// burst of execute() calls. 
Stresses the StdioServerTransport ↔
+//    StreamableHTTPClientTransport forwarding under load and the daemon's MCP
+//    session handling. A dropped/misrouted JSON-RPC reply shows up as a wrong
+//    or missing result.
+// B. COLD-START RACE — NO daemon, many `executor mcp` started simultaneously.
+//    They race acquireLocalServerStartLock + the double-check
+//    readActiveLocalServerManifest the PR added: exactly one must start a
+//    server, the rest must bridge to it. If the guard is wrong, the losers
+//    trip the data-dir singleton ("another active local server") or collide on
+//    a port — surfacing as client failures.
+// C. KILL UNDER LOAD — kill the daemon mid-bridge; the http transport's
+//    onclose must tear the bridge down so the `executor mcp` process exits
+//    instead of hanging (a leaked stdio child per crashed daemon would be the
+//    bug).
+import { expect } from "@effect/vitest";
+import { Client } from "@modelcontextprotocol/sdk/client/index.js";
+import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
+import { Effect } from "effect";
+import { mkdtempSync, readFileSync, readdirSync, rmSync } from "node:fs";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+import { fileURLToPath } from "node:url";
+import type { Subprocess } from "bun";
+
+import { scenario } from "../src/scenario";
+
+const repoRoot = fileURLToPath(new URL("../../", import.meta.url));
+const testScope = join(repoRoot, "apps/local");
+// Generous: a dev-mode daemon boots a Vite dev server, slow under machine load.
+const readyTimeoutMs = 150_000;
+
+type DaemonProc = Subprocess<"ignore", "pipe", "pipe">;
+
+/**
+ * Watches a freshly spawned daemon until it prints its ready banner.
+ *
+ * Resolves with the port parsed from the `Daemon ready on http://<host>:<port>`
+ * line plus an accessor for the stderr captured so far. Rejects when stdout
+ * closes before the banner appears or when `readyTimeoutMs` elapses; both
+ * errors embed the captured stderr.
+ */
+const waitForDaemonReady = (
+  proc: DaemonProc,
+): Promise<{ readonly port: number; readonly stderr: () => string }> =>
+  // oxlint-disable-next-line executor/no-promise-reject -- boundary: local e2e watches a real daemon process
+  new Promise((resolveReady, rejectReady) => {
+    let stdoutBuffer = "";
+    let stderrBuffer = "";
+    let settled = false;
+    // One streaming decoder per pipe, so a multi-byte character split across
+    // two chunks is decoded intact instead of turning into replacement chars.
+    const stdoutDecoder = new TextDecoder();
+    const stderrDecoder = new TextDecoder();
+    const stdout = proc.stdout.getReader();
+    const stderr = proc.stderr.getReader();
+    const deadline = setTimeout(() => {
+      if (settled) return;
+      settled = true;
+      // oxlint-disable-next-line executor/no-promise-reject, executor/no-error-constructor -- boundary: captured daemon stderr
+      rejectReady(new Error(`daemon did not announce ready: ${stderrBuffer}`));
+    }, readyTimeoutMs);
+    // stderr is collected for the whole life of the process so later
+    // assertions can print it.
+    void (async () => {
+      while (true) {
+        const { value, done } = await stderr.read();
+        if (done) return;
+        stderrBuffer += stderrDecoder.decode(value, { stream: true });
+      }
+    })();
+    void (async () => {
+      while (true) {
+        const { value, done } = await stdout.read();
+        if (done) {
+          if (!settled) {
+            settled = true;
+            clearTimeout(deadline);
+            // oxlint-disable-next-line executor/no-promise-reject, executor/no-error-constructor -- boundary: captured daemon stderr
+            rejectReady(new Error(`daemon stdout closed before ready: ${stderrBuffer}`));
+          }
+          return;
+        }
+        // After the outcome is decided keep draining (and discarding) stdout:
+        // an unread pipe eventually fills and would stall a daemon that keeps
+        // logging while the scenarios are still driving it.
+        if (settled) continue;
+        stdoutBuffer += stdoutDecoder.decode(value, { stream: true });
+        const match = /Daemon ready on http:\/\/(?:\[[^\]]+\]|[^:\s]+):(\d+)/.exec(stdoutBuffer);
+        if (match) {
+          settled = true;
+          clearTimeout(deadline);
+          resolveReady({ port: Number(match[1]), stderr: () => stderrBuffer });
+        }
+      }
+    })();
+  });
+
+/**
+ * Spawns `bun run dev:cli daemon run --foreground` from the repo root on an
+ * ephemeral port (`--port 0`) bound to 127.0.0.1, scoped to `testScope` and
+ * storing its state under `dataDir` (via `EXECUTOR_DATA_DIR`).
+ */
+const spawnDaemon = (dataDir: string): DaemonProc =>
+  Bun.spawn(
+    [
+      "bun",
+      "run",
+      "dev:cli",
+      "daemon",
+      "run",
+      "--foreground",
+      "--port",
+      "0",
+      "--hostname",
+      "127.0.0.1",
+      "--scope",
+      testScope,
+    ],
+    {
+      cwd: repoRoot,
+      env: { ...process.env, EXECUTOR_DATA_DIR: dataDir },
+      stdin: "ignore",
+      stdout: "pipe",
+      stderr: "pipe",
+    },
+  );
+
+/**
+ * Stops a daemon process: SIGTERM first, then SIGKILL if it has not exited
+ * within 3s. No-op when the process has already exited.
+ */
+const stopProc = async (proc: DaemonProc): Promise<void> => {
+  if (proc.exitCode !== null) return;
+  proc.kill("SIGTERM");
+  await Promise.race([proc.exited, Bun.sleep(3000)]);
+  if (proc.exitCode !== null) return;
+  proc.kill("SIGKILL");
+  // Give the kill a bounded moment to land so the caller does not delete the
+  // data dir underneath a process that is still exiting.
+  await Promise.race([proc.exited, Bun.sleep(3000)]);
+};
+
+/**
+ * Scoped foreground daemon: spawns it, waits for the ready banner, and stops
+ * it when the enclosing scope closes. Yields the process handle, the port it
+ * announced, and an accessor for its captured stderr.
+ */
+const startForegroundDaemon = (dataDir: string) =>
+  Effect.acquireRelease(
+    Effect.gen(function* () {
+      const proc = spawnDaemon(dataDir);
+      const ready = yield* Effect.promise(() => waitForDaemonReady(proc)).pipe(
+        // Effect.promise reports a rejected promise as a defect, which
+        // tapError never observes. onError runs for every failure cause, so a
+        // daemon that never became ready is still stopped (the release below
+        // is only registered when acquisition succeeds).
+        Effect.onError(() => Effect.promise(() => stopProc(proc))),
+      );
+      return { proc, port: ready.port, stderr: ready.stderr };
+    }),
+    ({ proc }) => Effect.promise(() => stopProc(proc)),
+  );
+
+/** Outcome of one stdio bridge client, successful or not. */
+interface ClientReport {
+  readonly id: number;
+  readonly ok: boolean;
+  /** Number of tools listed; 0 when the client failed. */
+  readonly tools: number;
+  /** Text of each completed execute() reply, in call order. */
+  readonly results: ReadonlyArray<string>;
+  /** Failure message plus the tail of the bridge's stderr, when `ok` is false. */
+  readonly error?: string;
+}
+
+/** One `executor mcp` stdio bridge: connect, list tools, fire `callCount`
+ * execute() calls (each computing a unique value), collect the text results,
+ * always close. Never throws — failures are captured in the report so the
+ * scenario can assert across the whole fleet. */
+const runOneClient = async (
+  id: number,
+  dataDir: string,
+  callCount: number,
+): Promise<ClientReport> => {
+  const transport = new StdioClientTransport({
+    command: "bun",
+    args: ["run", "dev:cli", "mcp", "--scope", testScope],
+    cwd: repoRoot,
+    env: { ...process.env, EXECUTOR_DATA_DIR: dataDir },
+    stderr: "pipe",
+  });
+  const client = new Client({ name: `stress-client-${id}`, version: "1.0.0" });
+  const results: string[] = [];
+  // Bridge stderr is kept only to enrich the error message on failure.
+  let errBuf = "";
+  transport.stderr?.on("data", (d: Buffer) => {
+    errBuf += d.toString();
+  });
+  // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: capture per-client failure for fleet-wide assertions
+  try {
+    await client.connect(transport);
+    const { tools } = await client.listTools();
+    for (let i = 0; i < callCount; i++) {
+      // A value unique to (client, call) so a misrouted reply is detectable.
+      const expr = `return ${id} * 1000 + ${i}`;
+      const r = await client.callTool({ name: "execute", arguments: { code: expr } });
+      const text = (r.content as Array<{ type: string; text: string }>)[0]?.text ?? "";
+      results.push(text);
+    }
+    return { id, ok: true, tools: tools.length, results };
+  } catch (error) {
+    return {
+      id,
+      ok: false,
+      tools: 0,
+      results,
+      error:
+        (error instanceof Error ? error.message : String(error)) +
+        (errBuf
+          ? `\n stderr: ${errBuf.trim().split("\n").slice(-6).join("\n ")}`
+          : ""),
+    };
+  } finally {
+    await transport.close().catch(() => undefined);
+  }
+};
+
+/** `executor mcp` now ensures a DURABLE (detached) daemon and bridges to it, so a
+ * cold-start scenario leaves that daemon running. Stop it before removing the
+ * data dir so the test never leaks an orphan daemon.
+ *
+ * Best-effort: reads the pid from `<dataDir>/server-control/server.json` and
+ * sends it SIGTERM. A missing manifest or an already-dead process is expected
+ * and ignored; anything else is logged, never thrown. */
+const stopAutoSpawnedDaemon = (dataDir: string): void => {
+  // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: best-effort cleanup of a detached daemon
+  try {
+    const manifest: unknown = JSON.parse(
+      readFileSync(join(dataDir, "server-control", "server.json"), "utf8"),
+    );
+    const pid =
+      typeof manifest === "object" && manifest !== null
+        ? (manifest as { pid?: unknown }).pid
+        : undefined;
+    // Only signal a real, positive pid: 0 and negative values address whole
+    // process groups rather than a single process.
+    if (typeof pid === "number" && Number.isInteger(pid) && pid > 0) {
+      process.kill(pid, "SIGTERM");
+    }
+  } catch (error) {
+    const code = (error as { code?: unknown } | null)?.code;
+    // ENOENT: no manifest (no daemon spawned). ESRCH: the daemon is already gone.
+    if (code === "ENOENT" || code === "ESRCH") return;
+    // Anything else would mean a leaked daemon; surface it instead of hiding it.
+    // eslint-disable-next-line no-console
+    console.warn(`[stress] could not stop auto-spawned daemon: ${String(error)}`);
+  }
+};
+
+/**
+ * Scoped temp data dir. Acquire creates `<tmp>/executor-mcp-stress-XXXXXX` and
+ * yields its (not yet created) `data` child path; release stops any daemon
+ * recorded in that data dir and then deletes the whole temp root.
+ */
+const withTempData = Effect.acquireRelease(
+  Effect.sync(() => {
+    const root = mkdtempSync(join(tmpdir(), "executor-mcp-stress-"));
+    return join(root, "data");
+  }),
+  (dataDir) =>
+    Effect.sync(() => {
+      stopAutoSpawnedDaemon(dataDir);
+      rmSync(join(dataDir, ".."), { recursive: true, force: true });
+    }),
+);
+
+/** Logs a one-line fleet summary (clients ok, execute calls completed) followed
+ * by one line per failed client. */
+const summarize = (label: string, reports: ReadonlyArray<ClientReport>): void => {
+  const ok = reports.filter((r) => r.ok).length;
+  const calls = reports.reduce((n, r) => n + r.results.length, 0);
+  const failures = reports.filter((r) => !r.ok).map((r) => `#${r.id}: ${r.error}`);
+  // eslint-disable-next-line no-console
+  console.log(
+    `[stress:${label}] clients ${ok}/${reports.length} ok, ${calls} execute calls` +
+      (failures.length ? `\n failures:\n ${failures.join("\n ")}` : ""),
+  );
+};
+
+/** Reads a positive integer from the environment. Unset, empty, non-numeric,
+ * fractional, zero or negative values fall back to `fallback`, so a bad
+ * override can never shrink the fleet to zero clients and let the per-client
+ * assertions pass vacuously. */
+const positiveIntFromEnv = (name: string, fallback: number): number => {
+  const parsed = Number(process.env[name]);
+  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
+};
+
+const CONCURRENCY = positiveIntFromEnv("E2E_MCP_STRESS_CLIENTS", 10);
+const CALLS = positiveIntFromEnv("E2E_MCP_STRESS_CALLS", 5);
+
+scenario(
+  "Local CLI MCP · an attach storm of concurrent stdio bridges all execute correctly",
+  { timeout: 240_000 },
+  Effect.gen(function* () {
+    const dataDir = yield* withTempData;
+    const daemon = yield* startForegroundDaemon(dataDir);
+
+    // Many bridges attach to the one daemon at once and each fires a burst.
+    const reports = yield* Effect.promise(() =>
+      Promise.all(Array.from({ length: CONCURRENCY }, (_, id) => runOneClient(id, dataDir, CALLS))),
+    );
+    summarize("attach-storm", reports);
+
+    for (const report of reports) {
+      expect(report.ok, `client #${report.id} failed: ${report.error ?? ""}`).toBe(true);
+      expect(report.tools, `client #${report.id} listed tools`).toBeGreaterThan(0);
+      // Every reply is the value THIS client asked for — no cross-talk between
+      // concurrent bridges sharing the daemon.
+      report.results.forEach((text, i) => {
+        expect(text, `client #${report.id} call ${i} got its own result`).toContain(
+          String(report.id * 1000 + i),
+        );
+      });
+    }
+    expect(daemon.proc.exitCode, `daemon survived the storm:\n${daemon.stderr()}`).toBeNull();
+  }).pipe(Effect.scoped),
+);
+
+scenario(
+  "Local CLI MCP · a cold-start race elects exactly one server and every client attaches",
+  { timeout: 240_000 },
+  Effect.gen(function* () {
+    const dataDir = yield* withTempData;
+
+    // No daemon: launch the whole fleet simultaneously so they race the
+    // start-lock + double-check. Exactly one should start a server; the rest
+    // must bridge to it. A broken guard trips the singleton or a port collision.
+    const reports = yield* Effect.promise(() =>
+      Promise.all(Array.from({ length: CONCURRENCY }, (_, id) => runOneClient(id, dataDir, CALLS))),
+    );
+    summarize("cold-start-race", reports);
+
+    for (const report of reports) {
+      expect(report.ok, `client #${report.id} lost the race: ${report.error ?? ""}`).toBe(true);
+      report.results.forEach((text, i) => {
+        expect(text, `client #${report.id} call ${i}`).toContain(String(report.id * 1000 + i));
+      });
+    }
+
+    // Exactly one elected owner: a single server.json manifest, not N competing
+    // servers. (The bridges own no DB and write no manifest of their own.)
+    // NOTE(review): a directory holds at most one entry named "server.json", so
+    // this count can only be 0 or 1 — it proves a manifest exists; competing
+    // servers would have to show up as client failures above.
+    const controlDir = join(dataDir, "server-control");
+    const serverManifests = (() => {
+      // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: dir read
+      try {
+        return readdirSync(controlDir).filter((f) => f === "server.json");
+      } catch {
+        return [];
+      }
+    })();
+    expect(serverManifests.length, "exactly one elected server manifest").toBe(1);
+  }).pipe(Effect.scoped),
+);
+
+scenario(
+  "Local CLI MCP · killing the daemon under load tears bridges down without hanging",
+  { timeout: 180_000 },
+  Effect.gen(function* () {
+    const dataDir = yield* withTempData;
+    const daemon = yield* startForegroundDaemon(dataDir);
+
+    // Attach a bridge, prove it works, then SIGKILL the daemon out from under it
+    // and confirm a subsequent call fails fast (the bridge tore down) rather than
+    // hanging until the test times out.
+    const transport = new StdioClientTransport({
+      command: "bun",
+      args: ["run", "dev:cli", "mcp", "--scope", testScope],
+      cwd: repoRoot,
+      env: { ...process.env, EXECUTOR_DATA_DIR: dataDir },
+      stderr: "pipe",
+    });
+    const client = new Client({ name: "stress-kill", version: "1.0.0" });
+    // Register the close finalizer BEFORE connecting: a release is only
+    // registered when its acquire succeeds, so tying it to connect() itself
+    // would skip the close (and could leak the bridge child) if connect fails.
+    yield* Effect.acquireRelease(
+      Effect.sync(() => transport),
+      (opened) => Effect.promise(() => opened.close().catch(() => undefined)),
+    );
+    yield* Effect.promise(() => client.connect(transport));
+
+    const first = yield* Effect.promise(() =>
+      client.callTool({ name: "execute", arguments: { code: "return 1 + 1" } }),
+    );
+    expect((first.content as Array<{ text: string }>)[0]?.text, "bridge works pre-kill").toContain(
+      "2",
+    );
+
+    daemon.proc.kill("SIGKILL");
+    yield* Effect.promise(() => Promise.race([daemon.proc.exited, Bun.sleep(3000)]));
+
+    // The next call must settle (reject) quickly — a 10s bound well under the
+    // scenario timeout catches a hang.
+    const settled = yield* Effect.promise(() =>
+      Promise.race([
+        client
+          .callTool({ name: "execute", arguments: { code: "return 3" } })
+          .then(() => "resolved" as const)
+          .catch(() => "rejected" as const),
+        Bun.sleep(10_000).then(() => "timeout" as const),
+      ]),
+    );
+    // eslint-disable-next-line no-console
+    console.log(`[stress:kill-under-load] post-kill call → ${settled}`);
+    expect(settled, "a call after the daemon dies must not hang").not.toBe("timeout");
+  }).pipe(Effect.scoped),
+);