Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/cli/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1494,7 +1494,7 @@ const mcpCommand = Command.make(
{
scope,
elicitationMode: Options.choice("elicitation-mode", ["browser", "model"] as const)
.pipe(Options.withDefault("browser"))
.pipe(Options.withDefault("model"))
.pipe(
Options.withDescription(
"Choose the stdio approval flow: browser approval or a CLI resume tool exposed to the model.",
Expand Down
65 changes: 46 additions & 19 deletions apps/cloud/src/mcp-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import { DurableObject, env } from "cloudflare:workers";
import { createTraceState } from "@opentelemetry/api";
import { Cause, Data, Effect, Layer } from "effect";
import { Cause, Data, Deferred, Effect, Layer } from "effect";
import * as OtelTracer from "@effect/opentelemetry/Tracer";
import type * as Tracer from "effect/Tracer";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
Expand Down Expand Up @@ -264,6 +264,7 @@ export class McpSessionDO extends DurableObject {
private sessionMeta: SessionMeta | null = null;
private transportJsonResponseMode: boolean | null = null;
private approvalResponses = new Map<string, ResumeResponse>();
private approvalWaiters = new Map<string, Deferred.Deferred<ResumeResponse>>();
// Updated at the start of each `handleRequest` so the host-mcp server's
// `parentSpan` getter — invoked by the MCP SDK's deferred tool callbacks
// after `transport.handleRequest()` has already returned its streaming
Expand Down Expand Up @@ -363,29 +364,15 @@ export class McpSessionDO extends DurableObject {
// `Effect.runPromise(engine.getDescription)` at its async
// MCP-SDK boundary and orphan the sub-span.
const description = yield* buildExecuteDescription(executor);
const sessionElicitationMode =
sessionMeta.elicitationMode ?? (sessionMeta.allowModelResume ? "model" : "browser");
const sessionElicitationMode = sessionMeta.elicitationMode ?? "model";
const mcpServer = yield* createExecutorMcpServer({
engine,
description,
parentSpan: () => self.currentRequestSpan ?? undefined,
debug: env.EXECUTOR_MCP_DEBUG === "true",
browserApprovalStore: {
takeResponse: (executionId) =>
Effect.promise(async () => {
const memoryResponse = self.approvalResponses.get(executionId);
if (memoryResponse) {
self.approvalResponses.delete(executionId);
await self.ctx.storage.delete(approvalResponseKey(executionId));
return memoryResponse;
}
const stored = await self.ctx.storage.get<ResumeResponse>(
approvalResponseKey(executionId),
);
if (!stored) return null;
await self.ctx.storage.delete(approvalResponseKey(executionId));
return stored;
}),
takeResponse: (executionId) => self.takeApprovalResponse(executionId),
waitForResponse: (executionId) => self.waitForApprovalResponse(executionId),
},
elicitationMode:
sessionElicitationMode === "browser"
Expand Down Expand Up @@ -595,7 +582,7 @@ export class McpSessionDO extends DurableObject {
return yield* resolveSessionMeta(
token.organizationId,
token.userId,
token.elicitationMode ?? (token.allowModelResume ? "model" : "browser"),
token.elicitationMode ?? "model",
).pipe(
Effect.provide(makeResolveOrganizationServices(dbHandle)),
Effect.tap((sessionMeta) =>
Expand Down Expand Up @@ -762,6 +749,44 @@ export class McpSessionDO extends DurableObject {
);
}

private takeApprovalResponse(executionId: string): Effect.Effect<ResumeResponse | null> {
const self = this;
return Effect.promise(async () => {
const memoryResponse = self.approvalResponses.get(executionId);
if (memoryResponse) {
self.approvalResponses.delete(executionId);
await self.ctx.storage.delete(approvalResponseKey(executionId));
return memoryResponse;
}
const stored = await self.ctx.storage.get<ResumeResponse>(approvalResponseKey(executionId));
if (!stored) return null;
await self.ctx.storage.delete(approvalResponseKey(executionId));
return stored;
});
}

private waitForApprovalResponse(executionId: string): Effect.Effect<ResumeResponse | null> {
const self = this;
return Effect.gen(function* () {
const existing = yield* self.takeApprovalResponse(executionId);
if (existing) return existing;

const waiter =
self.approvalWaiters.get(executionId) ?? (yield* Deferred.make<ResumeResponse>());
self.approvalWaiters.set(executionId, waiter);
yield* Deferred.await(waiter).pipe(
Effect.ensuring(
Effect.sync(() => {
if (self.approvalWaiters.get(executionId) === waiter) {
self.approvalWaiters.delete(executionId);
}
}),
),
);
return yield* self.takeApprovalResponse(executionId);
});
}

async resumeExecutionForApproval(
executionId: string,
identity: McpSessionApprovalIdentity,
Expand All @@ -784,6 +809,8 @@ export class McpSessionDO extends DurableObject {
yield* Effect.promise(() =>
self.ctx.storage.put(approvalResponseKey(executionId), response),
);
const waiter = self.approvalWaiters.get(executionId);
if (waiter) yield* Deferred.succeed(waiter, response);
return resumeApprovalResult(executionId, response);
}).pipe(
Effect.withSpan("McpSessionDO.resumeExecutionForApproval", {
Expand Down
2 changes: 1 addition & 1 deletion apps/cloud/src/mcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ const readElicitationMode = (request: Request): McpElicitationMode => {
return "model";
}

return "browser";
return "model";
};

/**
Expand Down
88 changes: 88 additions & 0 deletions apps/cloud/src/services/mcp-worker-transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,94 @@ describe("JsonRpcRequestIdQueue", () => {
release();
});

it("does not let a waiting resume block an unrelated same-id tool call", async () => {
const queue = new JsonRpcRequestIdQueue();
let releaseResume!: () => void;
const hungResume = new Promise<void>((resolve) => {
releaseResume = resolve;
});

const resumeStarted = new Promise<void>((resolve) => {
queue.run(
jsonRpcRequest({
jsonrpc: "2.0",
id: 1,
method: "tools/call",
params: { name: "resume", arguments: { executionId: "exec_1" } },
}),
async () => {
resolve();
await hungResume;
},
);
});
await resumeStarted;

const executeDone = await Promise.race([
queue
.run(
jsonRpcRequest({
jsonrpc: "2.0",
id: 1,
method: "tools/call",
params: { name: "execute", arguments: { code: "return 1" } },
}),
async () => "done",
)
.then((v) => ({ kind: "settled" as const, v })),
new Promise<{ kind: "blocked" }>((r) => setTimeout(() => r({ kind: "blocked" }), 200)),
]);

expect(executeDone.kind).toBe("settled");
releaseResume();
});

it("serialises same-id resume calls for the same execution", async () => {
const queue = new JsonRpcRequestIdQueue();
const order: string[] = [];

let releaseFirst!: () => void;
const firstStarted = new Promise<void>((resolve) => {
const firstRunning = new Promise<void>((release) => {
releaseFirst = release;
});
queue.run(
jsonRpcRequest({
jsonrpc: "2.0",
id: 1,
method: "tools/call",
params: { name: "resume", arguments: { executionId: "exec_1" } },
}),
async () => {
order.push("first:start");
resolve();
await firstRunning;
order.push("first:end");
},
);
});
await firstStarted;

const secondDone = queue.run(
jsonRpcRequest({
jsonrpc: "2.0",
id: 1,
method: "tools/call",
params: { name: "resume", arguments: { executionId: "exec_1" } },
}),
async () => {
order.push("second");
},
);

await new Promise((r) => setTimeout(r, 20));
expect(order).toEqual(["first:start"]);

releaseFirst();
await secondDone;
expect(order).toEqual(["first:start", "first:end", "second"]);
});

it("regression: caps wait on a hung previous request and dispatches anyway", async () => {
// Override the timeout for fast CI — the production default is
// PREVIOUS_REQUEST_TIMEOUT_MS (60s) which we cap test-side to 100ms.
Expand Down
33 changes: 30 additions & 3 deletions apps/cloud/src/services/mcp-worker-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ export type McpWorkerTransport = Readonly<{
type JsonRpcLike = {
readonly id?: unknown;
readonly method?: unknown;
readonly params?: unknown;
};

type ToolCallParamsLike = {
readonly name?: unknown;
readonly arguments?: unknown;
};

type ResumeArgumentsLike = {
readonly executionId?: unknown;
};

type HandleRequestResult = {
Expand Down Expand Up @@ -60,7 +70,24 @@ const jsonRpcRequestIdKey = (id: unknown): string | null =>
Option.getOrNull,
);

const extractJsonRpcRequestIdKeys = async (request: Request): Promise<ReadonlyArray<string>> => {
const jsonRpcRequestQueueKey = (message: JsonRpcLike): string | null => {
const idKey = jsonRpcRequestIdKey(message.id);
if (!idKey) return null;

if (message.method !== "tools/call") return idKey;
if (!message.params || typeof message.params !== "object") return idKey;

const params = message.params as ToolCallParamsLike;
if (params.name !== "resume") return idKey;
if (!params.arguments || typeof params.arguments !== "object") return idKey;

const args = params.arguments as ResumeArgumentsLike;
if (typeof args.executionId !== "string" || args.executionId.length === 0) return idKey;

return `${idKey}:tools/call:resume:${args.executionId}`;
};

const extractJsonRpcRequestQueueKeys = async (request: Request): Promise<ReadonlyArray<string>> => {
if (request.method !== "POST") return [];
const contentType = request.headers.get("content-type") ?? "";
if (!contentType.includes("application/json")) return [];
Expand All @@ -74,7 +101,7 @@ const extractJsonRpcRequestIdKeys = async (request: Request): Promise<ReadonlyAr
if (!message || typeof message !== "object") return [];
const rpc = message as JsonRpcLike;
if (typeof rpc.method !== "string") return [];
const key = jsonRpcRequestIdKey(rpc.id);
const key = jsonRpcRequestQueueKey(rpc);
return key ? [key] : [];
});
};
Expand All @@ -97,7 +124,7 @@ export class JsonRpcRequestIdQueue {
}

async run<A>(request: Request, run: () => Promise<A>): Promise<A> {
const ids = [...new Set(await extractJsonRpcRequestIdKeys(request))];
const ids = [...new Set(await extractJsonRpcRequestQueueKeys(request))];
if (ids.length === 0) return await run();

const previous = ids.map((id) => this.inFlight.get(id)).filter(Predicate.isNotUndefined);
Expand Down
2 changes: 1 addition & 1 deletion apps/cloud/src/web/pages/setup-mcp.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export const SetupMcpPage = () => {
const navigate = useNavigate();
const [origin, setOrigin] = useState<string | null>(null);
const [advancedOpen, setAdvancedOpen] = useState(false);
const [elicitationMode, setElicitationMode] = useState<McpElicitationMode>("browser");
const [elicitationMode, setElicitationMode] = useState<McpElicitationMode>("model");

useEffect(() => {
setOrigin(window.location.origin);
Expand Down
18 changes: 13 additions & 5 deletions apps/local/src/server/mcp-browser-resume.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ const readApproval = (structured: unknown): { readonly executionId: string; read
expect(record).not.toHaveProperty("interaction");
expect(typeof record.executionId).toBe("string");
expect(typeof record.approvalUrl).toBe("string");
expect(record.resumePrompt).toBe(
"Return text to the user telling them to approve the action at this approvalUrl. Only after you have prompted the user, call the `resume` tool with this executionId; `resume` will wait for the user's browser decision.",
);
const { executionId, approvalUrl } = record as {
readonly executionId: string;
readonly approvalUrl: string;
Expand All @@ -166,7 +169,10 @@ describe("local MCP browser approval resume", () => {
{ name: "browser-resume-test-client", version: "1.0.0" },
{ capabilities: {} },
);
const transport = new StreamableHTTPClientTransport(new URL("/mcp", TEST_BASE_URL), { fetch });
const transport = new StreamableHTTPClientTransport(
new URL("/mcp?elicitation_mode=browser", TEST_BASE_URL),
{ fetch },
);

await mcpClient.connect(transport);

Expand Down Expand Up @@ -231,6 +237,11 @@ const approveInBrowserThenResume = async (
const sessionId = approval.url.searchParams.get("mcp_session_id");
expect(sessionId).not.toBeNull();

const resume = client.callTool({
name: "resume",
arguments: { executionId: approval.executionId },
});

const approvalResponse = await fetch(
new URL(
`/api/mcp-sessions/${encodeURIComponent(sessionId!)}/executions/${encodeURIComponent(approval.executionId)}/resume`,
Expand All @@ -247,8 +258,5 @@ const approveInBrowserThenResume = async (
);
expect(approvalResponse.status).toBe(200);

return await client.callTool({
name: "resume",
arguments: { executionId: approval.executionId },
});
return await resume;
};
Loading
Loading