diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 9e0ad63aa..38e12a3c6 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -1920,6 +1920,7 @@ export const createExecutor = { * facades (e.g. a content-addressed spec blob) instead of inlining them * in `config`. */ readonly storage: TStore; + readonly httpClientLayer: Layer.Layer; /** The connection whose tools are being resolved. */ readonly connection: ConnectionRef; /** Which of the integration's declared auth methods the connection binds diff --git a/packages/plugins/mcp/src/sdk/connection.ts b/packages/plugins/mcp/src/sdk/connection.ts index f629c594d..1e1e2567b 100644 --- a/packages/plugins/mcp/src/sdk/connection.ts +++ b/packages/plugins/mcp/src/sdk/connection.ts @@ -2,8 +2,10 @@ import type { OAuthClientProvider } from "@modelcontextprotocol/sdk/client/auth. import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; +import type { FetchLike } from "@modelcontextprotocol/sdk/shared/transport.js"; import { CfWorkerJsonSchemaValidator } from "@modelcontextprotocol/sdk/validation/cfworker"; -import { Effect, Predicate } from "effect"; +import { Effect, Layer, Predicate, Stream } from "effect"; +import { HttpClient, HttpClientRequest } from "effect/unstable/http"; // NOTE: `StdioClientTransport` is NOT imported eagerly. The upstream module // (`@modelcontextprotocol/sdk/client/stdio.js`) touches `node:child_process` @@ -42,6 +44,7 @@ export type RemoteConnectorInput = Omit< readonly headers?: Record; readonly queryParams?: Record; readonly authProvider?: OAuthClientProvider; + readonly httpClientLayer?: Layer.Layer; }; export type StdioConnectorInput = McpStdioIntegrationConfig; @@ -60,6 +63,100 @@ const buildEndpointUrl = (endpoint: string, queryParams: Record) return url; }; +type HttpMethod = Parameters[0]; +const HTTP_METHODS = new Set([ + "DELETE", + "GET", + "HEAD", + "OPTIONS", + "PATCH", + "POST", + "PUT", +]); + +const httpMethodFrom = (method: string | undefined): HttpMethod => { + const normalized = (method ?? "GET").toUpperCase() as HttpMethod; + return HTTP_METHODS.has(normalized) ? normalized : "POST"; +}; + +const headersFrom = (headers: HeadersInit | undefined): Headers => + headers ? new Headers(headers) : new Headers(); + +const recordFromHeaders = (headers: Headers): Record => + Object.fromEntries(headers.entries()); + +const applyBody = async ( + request: HttpClientRequest.HttpClientRequest, + headers: Headers, + body: BodyInit | null | undefined, +): Promise => { + if (body == null) return request; + const contentType = headers.get("content-type") ?? undefined; + if (typeof body === "string") return HttpClientRequest.bodyText(request, body, contentType); + if (body instanceof URLSearchParams) { + return HttpClientRequest.bodyText( + request, + body.toString(), + contentType ?? "application/x-www-form-urlencoded;charset=UTF-8", + ); + } + if (body instanceof Uint8Array) + return HttpClientRequest.bodyUint8Array(request, body, contentType); + if (body instanceof ArrayBuffer) { + return HttpClientRequest.bodyUint8Array(request, new Uint8Array(body), contentType); + } + const bytes = new Uint8Array(await new Response(body).arrayBuffer()); + return HttpClientRequest.bodyUint8Array(request, bytes, contentType); +}; + +const abortError = (signal: AbortSignal): unknown => { + if (signal.reason !== undefined) return signal.reason; + // oxlint-disable-next-line executor/no-error-constructor -- boundary: Fetch-compatible adapter must reject with an AbortError-shaped value + const error = new Error("The operation was aborted"); + error.name = "AbortError"; + return error; +}; + +const fetchFromHttpClientLayer = ( + httpClientLayer: Layer.Layer, +): FetchLike => { + const execute: FetchLike = async (url, init) => { + const headers = headersFrom(init?.headers); + const requestWithoutBody = HttpClientRequest.make(httpMethodFrom(init?.method))(url, { + headers: recordFromHeaders(headers), + }); + const request = await applyBody(requestWithoutBody, headers, init?.body); + const effect = Effect.gen(function* () { + const client = yield* HttpClient.HttpClient; + const response = yield* client.execute(request); + const responseHeaders = new Headers(); + for (const [key, value] of Object.entries(response.headers)) { + if (value !== undefined) responseHeaders.set(key, value); + } + const body = + response.status === 204 || response.status === 205 || response.status === 304 + ? null + : Stream.toReadableStream(response.stream); + return new Response(body, { + status: response.status, + headers: responseHeaders, + }); + }).pipe(Effect.provide(httpClientLayer)); + const promise = Effect.runPromise(effect); + if (!init?.signal) return promise; + // oxlint-disable-next-line executor/no-promise-reject -- boundary: Fetch-compatible adapter mirrors abort rejection semantics + if (init.signal.aborted) return Promise.reject(abortError(init.signal)); + const aborted = new Promise((_, reject) => { + // oxlint-disable-next-line executor/no-promise-reject -- boundary: Fetch-compatible adapter races the Effect request against AbortSignal + init.signal?.addEventListener("abort", () => reject(abortError(init.signal!)), { + once: true, + }); + }); + return Promise.race([promise, aborted]); + }; + return execute; +}; + // Use the cfworker JSON Schema validator instead of the SDK's default // (Ajv). Ajv compiles schemas via `new Function(...)`, which throws // `Code generation from strings disallowed for this context` when the @@ -157,6 +254,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => { const headers = input.headers ?? {}; const remoteTransport = input.remoteTransport ?? "auto"; const requestInit = Object.keys(headers).length > 0 ? { headers } : undefined; + const fetch = input.httpClientLayer ? fetchFromHttpClientLayer(input.httpClientLayer) : undefined; const endpoint = buildEndpointUrl(input.endpoint, input.queryParams ?? {}); @@ -166,6 +264,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => { new StreamableHTTPClientTransport(endpoint, { requestInit, authProvider: input.authProvider, + fetch, }), }); @@ -175,6 +274,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => { new SSEClientTransport(endpoint, { requestInit, authProvider: input.authProvider, + fetch, }), }); diff --git a/packages/plugins/mcp/src/sdk/plugin.test.ts b/packages/plugins/mcp/src/sdk/plugin.test.ts index 9d0b0cb3d..2e6001bbf 100644 --- a/packages/plugins/mcp/src/sdk/plugin.test.ts +++ b/packages/plugins/mcp/src/sdk/plugin.test.ts @@ -1,6 +1,11 @@ import { describe, expect, it } from "@effect/vitest"; -import { Effect, Option, Predicate, Schema } from "effect"; -import { HttpServerResponse } from "effect/unstable/http"; +import { Effect, Layer, Option, Predicate, Schema } from "effect"; +import { + HttpClient, + HttpClientRequest, + HttpClientResponse, + HttpServerResponse, +} from "effect/unstable/http"; import { AuthTemplateSlug, @@ -17,6 +22,7 @@ import { serveTestHttpApp, } from "@executor-js/sdk/testing"; +import { createMcpConnector } from "./connection"; import { mcpPlugin, userFacingProbeMessage } from "./plugin"; import { McpInvocationError } from "./errors"; import { extractManifestFromListToolsResult, deriveMcpNamespace, joinToolPath } from "./manifest"; @@ -299,6 +305,30 @@ describe("mcpPlugin", () => { }), ); + it.effect("routes remote connector traffic through the provided HttpClient layer", () => + Effect.gen(function* () { + const seen: string[] = []; + const httpClientLayer = Layer.succeed(HttpClient.HttpClient)( + HttpClient.make((request: HttpClientRequest.HttpClientRequest) => { + seen.push(request.url); + return Effect.succeed( + HttpClientResponse.fromWeb(request, new Response("blocked", { status: 403 })), + ); + }), + ); + + const error = yield* createMcpConnector({ + transport: "remote", + endpoint: "https://internal.example/mcp", + remoteTransport: "streamable-http", + httpClientLayer, + }).pipe(Effect.flip); + + expect(Predicate.isTagged(error, "McpConnectionError")).toBe(true); + expect(seen).toEqual(["https://internal.example/mcp"]); + }), + ); + it.effect("integration catalog has no configured MCP integrations initially", () => Effect.gen(function* () { const executor = yield* createExecutor(makeTestConfig({ plugins: [mcpPlugin()] as const })); diff --git a/packages/plugins/mcp/src/sdk/plugin.ts b/packages/plugins/mcp/src/sdk/plugin.ts index dd0228da2..d676efc86 100644 --- a/packages/plugins/mcp/src/sdk/plugin.ts +++ b/packages/plugins/mcp/src/sdk/plugin.ts @@ -469,6 +469,7 @@ const buildConnectorInput = ( values: Record, templateSlug: string | null, allowStdio: boolean, + httpClientLayer?: Layer.Layer, ): Effect.Effect => { if (config.transport === "stdio") { if (!allowStdio) { @@ -512,6 +513,7 @@ const buildConnectorInput = ( queryParams: Object.keys(queryParams).length > 0 ? queryParams : undefined, headers: Object.keys(headers).length > 0 ? headers : undefined, authProvider, + httpClientLayer, }); }; @@ -636,6 +638,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => { endpoint: trimmed, headers: probeHeaders, queryParams: probeQueryParams, + httpClientLayer, }); const result = yield* discoverTools(connector).pipe( @@ -929,7 +932,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => { // Discovery failures (auth not ready, server down) yield an empty tool set // rather than failing — the connection still lands and can be refreshed. // ----------------------------------------------------------------------- - resolveTools: ({ config, connection, template, getValues }) => + resolveTools: ({ config, connection, template, getValues, httpClientLayer }) => Effect.gen(function* () { const parsed = parseMcpIntegrationConfig(config); if (!parsed) return { tools: [] as readonly ToolDef[] }; @@ -945,6 +948,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => { values, template === null ? null : String(template), allowStdio, + httpClientLayer, ).pipe( Effect.map((ci) => createMcpConnector(ci)), Effect.result, @@ -968,7 +972,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => { }), ) as Effect.Effect<{ readonly tools: readonly ToolDef[] }, StorageFailure>, - invokeTool: ({ toolRow, credential, args, elicit }) => + invokeTool: ({ ctx, toolRow, credential, args, elicit }) => Effect.gen(function* () { const parsed = parseMcpIntegrationConfig(credential.config); if (!parsed) { @@ -1013,6 +1017,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => { credential.values, String(credential.template), allowStdio, + options?.httpClientLayer ?? ctx.httpClientLayer, ).pipe(Effect.map((ci) => createMcpConnector(ci))); const raw = yield* invokeMcpTool({ @@ -1087,7 +1092,11 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => { const name = parsed.value.hostname || "mcp"; const slug = deriveMcpNamespace({ endpoint: trimmed }); - const connector = createMcpConnector({ transport: "remote", endpoint: trimmed }); + const connector = createMcpConnector({ + transport: "remote", + endpoint: trimmed, + httpClientLayer, + }); const connected = yield* discoverTools(connector).pipe( Effect.map(() => true), diff --git a/packages/plugins/openapi/src/sdk/spec-blob.test.ts b/packages/plugins/openapi/src/sdk/spec-blob.test.ts index 04fecd120..186050633 100644 --- a/packages/plugins/openapi/src/sdk/spec-blob.test.ts +++ b/packages/plugins/openapi/src/sdk/spec-blob.test.ts @@ -161,6 +161,7 @@ describe("OpenAPI plugin — spec blob storage", () => { }, template: null, storage, + httpClientLayer: FetchHttpClient.layer, getValue: () => Effect.succeed(null), getValues: () => Effect.succeed({}), });