Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/core/sdk/src/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,7 @@ export const createExecutor = <const TPlugins extends readonly AnyPlugin[] = rea
.resolveTools({
integration: rowToIntegration(integrationRow),
config: decodeJsonColumn(integrationRow.config),
httpClientLayer: runtime.ctx.httpClientLayer,
connection: ref,
template: existingRow ? AuthTemplateSlug.make(existingRow.template) : null,
storage: runtime.storage,
Expand Down
1 change: 1 addition & 0 deletions packages/core/sdk/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ export interface ResolveToolsInput<TStore = unknown> {
* facades (e.g. a content-addressed spec blob) instead of inlining them
* in `config`. */
readonly storage: TStore;
readonly httpClientLayer: Layer.Layer<HttpClient.HttpClient>;
/** The connection whose tools are being resolved. */
readonly connection: ConnectionRef;
/** Which of the integration's declared auth methods the connection binds
Expand Down
102 changes: 101 additions & 1 deletion packages/plugins/mcp/src/sdk/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import type { OAuthClientProvider } from "@modelcontextprotocol/sdk/client/auth.
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import type { FetchLike } from "@modelcontextprotocol/sdk/shared/transport.js";
import { CfWorkerJsonSchemaValidator } from "@modelcontextprotocol/sdk/validation/cfworker";
import { Effect, Predicate } from "effect";
import { Effect, Layer, Predicate, Stream } from "effect";
import { HttpClient, HttpClientRequest } from "effect/unstable/http";

// NOTE: `StdioClientTransport` is NOT imported eagerly. The upstream module
// (`@modelcontextprotocol/sdk/client/stdio.js`) touches `node:child_process`
Expand Down Expand Up @@ -42,6 +44,7 @@ export type RemoteConnectorInput = Omit<
readonly headers?: Record<string, string>;
readonly queryParams?: Record<string, string>;
readonly authProvider?: OAuthClientProvider;
readonly httpClientLayer?: Layer.Layer<HttpClient.HttpClient>;
};

export type StdioConnectorInput = McpStdioIntegrationConfig;
Expand All @@ -60,6 +63,100 @@ const buildEndpointUrl = (endpoint: string, queryParams: Record<string, string>)
return url;
};

type HttpMethod = Parameters<typeof HttpClientRequest.make>[0];
const HTTP_METHODS = new Set<HttpMethod>([
"DELETE",
"GET",
"HEAD",
"OPTIONS",
"PATCH",
"POST",
"PUT",
]);

const httpMethodFrom = (method: string | undefined): HttpMethod => {
const normalized = (method ?? "GET").toUpperCase() as HttpMethod;
return HTTP_METHODS.has(normalized) ? normalized : "POST";
};

const headersFrom = (headers: HeadersInit | undefined): Headers =>
headers ? new Headers(headers) : new Headers();

const recordFromHeaders = (headers: Headers): Record<string, string> =>
Object.fromEntries(headers.entries());

const applyBody = async (
request: HttpClientRequest.HttpClientRequest,
headers: Headers,
body: BodyInit | null | undefined,
): Promise<HttpClientRequest.HttpClientRequest> => {
if (body == null) return request;
const contentType = headers.get("content-type") ?? undefined;
if (typeof body === "string") return HttpClientRequest.bodyText(request, body, contentType);
if (body instanceof URLSearchParams) {
return HttpClientRequest.bodyText(
request,
body.toString(),
contentType ?? "application/x-www-form-urlencoded;charset=UTF-8",
);
}
if (body instanceof Uint8Array)
return HttpClientRequest.bodyUint8Array(request, body, contentType);
if (body instanceof ArrayBuffer) {
return HttpClientRequest.bodyUint8Array(request, new Uint8Array(body), contentType);
}
const bytes = new Uint8Array(await new Response(body).arrayBuffer());
return HttpClientRequest.bodyUint8Array(request, bytes, contentType);
};

const abortError = (signal: AbortSignal): unknown => {
if (signal.reason !== undefined) return signal.reason;
// oxlint-disable-next-line executor/no-error-constructor -- boundary: Fetch-compatible adapter must reject with an AbortError-shaped value
const error = new Error("The operation was aborted");
error.name = "AbortError";
return error;
};

const fetchFromHttpClientLayer = (
httpClientLayer: Layer.Layer<HttpClient.HttpClient>,
): FetchLike => {
const execute: FetchLike = async (url, init) => {
const headers = headersFrom(init?.headers);
const requestWithoutBody = HttpClientRequest.make(httpMethodFrom(init?.method))(url, {
headers: recordFromHeaders(headers),
});
const request = await applyBody(requestWithoutBody, headers, init?.body);
const effect = Effect.gen(function* () {
const client = yield* HttpClient.HttpClient;
const response = yield* client.execute(request);
const responseHeaders = new Headers();
for (const [key, value] of Object.entries(response.headers)) {
if (value !== undefined) responseHeaders.set(key, value);
}
const body =
response.status === 204 || response.status === 205 || response.status === 304
? null
: Stream.toReadableStream(response.stream);
return new Response(body, {
status: response.status,
headers: responseHeaders,
});
}).pipe(Effect.provide(httpClientLayer));
const promise = Effect.runPromise(effect);
if (!init?.signal) return promise;
// oxlint-disable-next-line executor/no-promise-reject -- boundary: Fetch-compatible adapter mirrors abort rejection semantics
if (init.signal.aborted) return Promise.reject(abortError(init.signal));
const aborted = new Promise<never>((_, reject) => {
// oxlint-disable-next-line executor/no-promise-reject -- boundary: Fetch-compatible adapter races the Effect request against AbortSignal
init.signal?.addEventListener("abort", () => reject(abortError(init.signal!)), {
once: true,
});
});
return Promise.race([promise, aborted]);
Comment on lines +145 to +155

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 AbortSignal races the promise but does not cancel the underlying Effect fiber

When signal aborts and aborted wins Promise.race, the function rejects correctly for the caller — but Effect.runPromise(effect) continues executing on its internal fiber until the request completes or fails. For SSE connections, which hold a long-lived stream, this means the stream is still consumed and the HTTP connection is still held open even after the MCP transport has declared the connection aborted. The fix requires spawning a fiber explicitly so it can be interrupted on abort:

const fiber = Effect.runFork(effect);
init.signal?.addEventListener("abort", () => Fiber.interrupt(fiber), { once: true });
return Effect.runPromise(Fiber.join(fiber));

Without interruption, every aborted MCP connection leaks an open HTTP request for its remaining lifetime.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Floating promise rejection when the request wins the race and the signal is later aborted. When promise resolves first, aborted is left pending. If init.signal.abort() is subsequently called (which is the normal SSE teardown path — SSEClientTransport calls controller.abort() via transport.close()), the listener fires, aborted rejects, and there is no handler on it, triggering an unhandledRejection event. Attaching a no-op .catch silences it without affecting the race's already-settled outcome.

Suggested change
return Promise.race([promise, aborted]);
aborted.catch(() => {});
return Promise.race([promise, aborted]);

};
return execute;
};

// Use the cfworker JSON Schema validator instead of the SDK's default
// (Ajv). Ajv compiles schemas via `new Function(...)`, which throws
// `Code generation from strings disallowed for this context` when the
Expand Down Expand Up @@ -157,6 +254,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => {
const headers = input.headers ?? {};
const remoteTransport = input.remoteTransport ?? "auto";
const requestInit = Object.keys(headers).length > 0 ? { headers } : undefined;
const fetch = input.httpClientLayer ? fetchFromHttpClientLayer(input.httpClientLayer) : undefined;

const endpoint = buildEndpointUrl(input.endpoint, input.queryParams ?? {});

Expand All @@ -166,6 +264,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => {
new StreamableHTTPClientTransport(endpoint, {
requestInit,
authProvider: input.authProvider,
fetch,
}),
});

Expand All @@ -175,6 +274,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => {
new SSEClientTransport(endpoint, {
requestInit,
authProvider: input.authProvider,
fetch,
}),
});

Expand Down
34 changes: 32 additions & 2 deletions packages/plugins/mcp/src/sdk/plugin.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { describe, expect, it } from "@effect/vitest";
import { Effect, Option, Predicate, Schema } from "effect";
import { HttpServerResponse } from "effect/unstable/http";
import { Effect, Layer, Option, Predicate, Schema } from "effect";
import {
HttpClient,
HttpClientRequest,
HttpClientResponse,
HttpServerResponse,
} from "effect/unstable/http";

import {
AuthTemplateSlug,
Expand All @@ -17,6 +22,7 @@ import {
serveTestHttpApp,
} from "@executor-js/sdk/testing";

import { createMcpConnector } from "./connection";
import { mcpPlugin, userFacingProbeMessage } from "./plugin";
import { McpInvocationError } from "./errors";
import { extractManifestFromListToolsResult, deriveMcpNamespace, joinToolPath } from "./manifest";
Expand Down Expand Up @@ -299,6 +305,30 @@ describe("mcpPlugin", () => {
}),
);

it.effect("routes remote connector traffic through the provided HttpClient layer", () =>
Effect.gen(function* () {
const seen: string[] = [];
const httpClientLayer = Layer.succeed(HttpClient.HttpClient)(
HttpClient.make((request: HttpClientRequest.HttpClientRequest) => {
seen.push(request.url);
return Effect.succeed(
HttpClientResponse.fromWeb(request, new Response("blocked", { status: 403 })),
);
}),
);

const error = yield* createMcpConnector({
transport: "remote",
endpoint: "https://internal.example/mcp",
remoteTransport: "streamable-http",
httpClientLayer,
}).pipe(Effect.flip);

expect(Predicate.isTagged(error, "McpConnectionError")).toBe(true);
expect(seen).toEqual(["https://internal.example/mcp"]);
}),
);

it.effect("integration catalog has no configured MCP integrations initially", () =>
Effect.gen(function* () {
const executor = yield* createExecutor(makeTestConfig({ plugins: [mcpPlugin()] as const }));
Expand Down
15 changes: 12 additions & 3 deletions packages/plugins/mcp/src/sdk/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ const buildConnectorInput = (
values: Record<string, string | null>,
templateSlug: string | null,
allowStdio: boolean,
httpClientLayer?: Layer.Layer<HttpClient.HttpClient>,
): Effect.Effect<ConnectorInput, McpConnectionError> => {
if (config.transport === "stdio") {
if (!allowStdio) {
Expand Down Expand Up @@ -512,6 +513,7 @@ const buildConnectorInput = (
queryParams: Object.keys(queryParams).length > 0 ? queryParams : undefined,
headers: Object.keys(headers).length > 0 ? headers : undefined,
authProvider,
httpClientLayer,
});
};

Expand Down Expand Up @@ -636,6 +638,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
endpoint: trimmed,
headers: probeHeaders,
queryParams: probeQueryParams,
httpClientLayer,
});

const result = yield* discoverTools(connector).pipe(
Expand Down Expand Up @@ -929,7 +932,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
// Discovery failures (auth not ready, server down) yield an empty tool set
// rather than failing — the connection still lands and can be refreshed.
// -----------------------------------------------------------------------
resolveTools: ({ config, connection, template, getValues }) =>
resolveTools: ({ config, connection, template, getValues, httpClientLayer }) =>
Effect.gen(function* () {
const parsed = parseMcpIntegrationConfig(config);
if (!parsed) return { tools: [] as readonly ToolDef[] };
Expand All @@ -945,6 +948,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
values,
template === null ? null : String(template),
allowStdio,
httpClientLayer,
).pipe(
Effect.map((ci) => createMcpConnector(ci)),
Effect.result,
Expand All @@ -968,7 +972,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
}),
) as Effect.Effect<{ readonly tools: readonly ToolDef[] }, StorageFailure>,

invokeTool: ({ toolRow, credential, args, elicit }) =>
invokeTool: ({ ctx, toolRow, credential, args, elicit }) =>
Effect.gen(function* () {
const parsed = parseMcpIntegrationConfig(credential.config);
if (!parsed) {
Expand Down Expand Up @@ -1013,6 +1017,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
credential.values,
String(credential.template),
allowStdio,
options?.httpClientLayer ?? ctx.httpClientLayer,
).pipe(Effect.map((ci) => createMcpConnector(ci)));

const raw = yield* invokeMcpTool({
Expand Down Expand Up @@ -1087,7 +1092,11 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
const name = parsed.value.hostname || "mcp";
const slug = deriveMcpNamespace({ endpoint: trimmed });

const connector = createMcpConnector({ transport: "remote", endpoint: trimmed });
const connector = createMcpConnector({
transport: "remote",
endpoint: trimmed,
httpClientLayer,
});

const connected = yield* discoverTools(connector).pipe(
Effect.map(() => true),
Expand Down
1 change: 1 addition & 0 deletions packages/plugins/openapi/src/sdk/spec-blob.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ describe("OpenAPI plugin — spec blob storage", () => {
},
template: null,
storage,
httpClientLayer: FetchHttpClient.layer,
getValue: () => Effect.succeed(null),
getValues: () => Effect.succeed({}),
});
Expand Down
Loading