diff --git a/apps/cli/src/main.ts b/apps/cli/src/main.ts index ccfe93ca0..7fb6dc792 100644 --- a/apps/cli/src/main.ts +++ b/apps/cli/src/main.ts @@ -409,6 +409,7 @@ type ExecuteCodeOutcome = readonly status: "paused"; readonly text: string; readonly executionId: string | undefined; + readonly approvalUrl: string | undefined; readonly interaction: | { readonly kind: "url" | "form"; @@ -419,6 +420,11 @@ type ExecuteCodeOutcome = | undefined; }; +const buildResumeApprovalUrl = (baseUrl: string, executionId: string): string => { + const url = new URL(`/resume/${encodeURIComponent(executionId)}`, baseUrl); + return url.toString(); +}; + const executeCode = (input: { baseUrl: string; code: string; @@ -433,10 +439,12 @@ const executeCode = (input: { }); if (response.status === "paused") { + const executionId = extractExecutionId(response.structured); return { status: "paused" as const, text: response.text, - executionId: extractExecutionId(response.structured), + executionId, + approvalUrl: executionId ? buildResumeApprovalUrl(daemonUrl, executionId) : undefined, interaction: extractPausedInteraction(response.structured), }; } @@ -456,6 +464,10 @@ const printExecutionOutcome = (input: { baseUrl: string; outcome: ExecuteCodeOut if (input.outcome.status === "paused") { console.log(input.outcome.text); if (input.outcome.executionId) { + if (input.outcome.approvalUrl) { + console.log("\nApprove in browser:"); + console.log(` ${input.outcome.approvalUrl}`); + } const commandPrefix = `${cliPrefix} resume --execution-id ${input.outcome.executionId} --base-url ${input.baseUrl}`; if (input.outcome.interaction?.kind === "form") { const requestedSchema = input.outcome.interaction.requestedSchema; @@ -464,12 +476,12 @@ const printExecutionOutcome = (input: { baseUrl: string; outcome: ExecuteCodeOut } const template = buildResumeContentTemplate(requestedSchema); const contentArg = shellQuoteArg(JSON.stringify(template)); - console.log("\nResume commands:"); + console.log("\nCLI fallback:"); console.log(` ${commandPrefix} --action accept --content ${contentArg}`); console.log(` ${commandPrefix} --action decline`); console.log(` ${commandPrefix} --action cancel`); } else { - console.log("\nResume command:"); + console.log("\nCLI fallback:"); console.log(` ${commandPrefix} --action accept`); } } @@ -681,11 +693,21 @@ const withStdoutReroutedToStderr = async (body: () => Promise): Promise } }; -const runStdioMcpSession = () => +const runStdioMcpSession = (input: { readonly elicitationMode: "browser" | "model" }) => Effect.gen(function* () { const executor = yield* Effect.promise(() => withStdoutReroutedToStderr(() => getExecutor())); yield* Effect.promise(() => - runMcpStdioServer({ executor, codeExecutor: makeQuickJsExecutor() }), + runMcpStdioServer({ + executor, + codeExecutor: makeQuickJsExecutor(), + elicitationMode: + input.elicitationMode === "browser" + ? { + mode: "browser" as const, + approvalUrl: (executionId) => `/resume/${encodeURIComponent(executionId)}`, + } + : { mode: input.elicitationMode }, + }), ); }); @@ -1206,6 +1228,17 @@ const resumeCommand = Command.make( payload: { action, content: contentObj }, }); + if (result.status === "paused") { + console.log(result.text); + const nextExecutionId = extractExecutionId(result.structured); + if (nextExecutionId) { + console.log(""); + console.log("Approval required:"); + console.log(buildResumeApprovalUrl(daemonUrl, nextExecutionId)); + } + process.exit(0); + } + if (result.isError) { if (shouldPrintVerboseErrors(process.argv)) { console.error(result.text); @@ -1456,11 +1489,23 @@ const daemonCommand = Command.make("daemon").pipe( Command.withDescription("Manage the local daemon"), ); -const mcpCommand = Command.make("mcp", { scope }, ({ scope }) => - Effect.gen(function* () { - applyScope(scope); - yield* runStdioMcpSession(); - }), +const mcpCommand = Command.make( + "mcp", + { + scope, + elicitationMode: Options.choice("elicitation-mode", ["browser", "model"] as const) + .pipe(Options.withDefault("browser")) + .pipe( + Options.withDescription( + "Choose the stdio approval flow: browser approval or a CLI resume tool exposed to the model.", + ), + ), + }, + ({ scope, elicitationMode }) => + Effect.gen(function* () { + applyScope(scope); + yield* runStdioMcpSession({ elicitationMode }); + }), ).pipe(Command.withDescription("Start an MCP server over stdio")); // --------------------------------------------------------------------------- diff --git a/apps/cloud/src/api/execution-usage.ts b/apps/cloud/src/api/execution-usage.ts index fd1e9d8ba..cd4489984 100644 --- a/apps/cloud/src/api/execution-usage.ts +++ b/apps/cloud/src/api/execution-usage.ts @@ -18,5 +18,6 @@ export const withExecutionUsageTracking = ( .pipe(Effect.tap(() => Effect.sync(() => trackUsage(organizationId)))), // resume doesn't count as usage resume: (executionId, response) => engine.resume(executionId, response), + getPausedExecution: (executionId) => engine.getPausedExecution(executionId), getDescription: engine.getDescription, }); diff --git a/apps/cloud/src/api/protected.test.ts b/apps/cloud/src/api/protected.test.ts index 124db4b2e..d6ff28874 100644 --- a/apps/cloud/src/api/protected.test.ts +++ b/apps/cloud/src/api/protected.test.ts @@ -16,6 +16,7 @@ const makeBaseEngine = (): ExecutionEngine => status: "completed", result: { result: "ok", logs: [] }, }), + getPausedExecution: () => Effect.succeed(null), getDescription: Effect.succeed("desc"), }) as ExecutionEngine; diff --git a/apps/cloud/src/auth/api.ts b/apps/cloud/src/auth/api.ts index 4a7523cdb..912488ed5 100644 --- a/apps/cloud/src/auth/api.ts +++ b/apps/cloud/src/auth/api.ts @@ -107,6 +107,55 @@ const CreatedApiKeyResponse = Schema.Struct({ const ApiKeyParams = { apiKeyId: Schema.String }; +const McpSessionExecutionParams = { + mcpSessionId: Schema.String, + executionId: Schema.String, +}; + +const ResumeMcpExecutionBody = Schema.Struct({ + action: Schema.Literals(["accept", "decline", "cancel"]), + content: Schema.optional(Schema.Unknown), +}); + +const McpPausedExecutionResponse = Schema.Struct({ + text: Schema.String, + structured: Schema.Unknown, +}); + +const McpResumeCompletedResponse = Schema.Struct({ + status: Schema.Literal("completed"), + text: Schema.String, + structured: Schema.Unknown, + isError: Schema.Boolean, +}); + +const McpResumePausedResponse = Schema.Struct({ + status: Schema.Literal("paused"), + text: Schema.String, + structured: Schema.Unknown, +}); + +const McpResumeExecutionResponse = Schema.Union([ + McpResumeCompletedResponse, + McpResumePausedResponse, +]); + +export class McpExecutionNotFoundError extends Schema.TaggedErrorClass()( + "McpExecutionNotFoundError", + { + executionId: Schema.String, + }, + { httpApiStatus: 404 }, +) {} + +export class McpSessionForbiddenError extends Schema.TaggedErrorClass()( + "McpSessionForbiddenError", + { + mcpSessionId: Schema.String, + }, + { httpApiStatus: 403 }, +) {} + export const AUTH_PATHS = { login: "/api/auth/login", logout: "/api/auth/logout", @@ -116,6 +165,11 @@ export const AUTH_PATHS = { const AuthErrors = [UserStoreError, WorkOSError] as const; const ApiKeyErrors = [ApiKeyManagementError, NoOrganization, UserStoreError, WorkOSError] as const; +const McpApprovalErrors = [ + NoOrganization, + McpExecutionNotFoundError, + McpSessionForbiddenError, +] as const; /** Public auth endpoints — no authentication required */ export class CloudAuthPublicApi extends HttpApiGroup.make("cloudAuthPublic") @@ -187,4 +241,23 @@ export class CloudAuthApi extends HttpApiGroup.make("cloudAuth") error: ApiKeyErrors, }), ) + .add( + HttpApiEndpoint.get("getMcpPaused", "/mcp-sessions/:mcpSessionId/executions/:executionId", { + params: McpSessionExecutionParams, + success: McpPausedExecutionResponse, + error: McpApprovalErrors, + }), + ) + .add( + HttpApiEndpoint.post( + "resumeMcpExecution", + "/mcp-sessions/:mcpSessionId/executions/:executionId/resume", + { + params: McpSessionExecutionParams, + payload: ResumeMcpExecutionBody, + success: McpResumeExecutionResponse, + error: McpApprovalErrors, + }, + ), + ) .middleware(SessionAuth) {} diff --git a/apps/cloud/src/auth/handlers.ts b/apps/cloud/src/auth/handlers.ts index aac14c361..a48914559 100644 --- a/apps/cloud/src/auth/handlers.ts +++ b/apps/cloud/src/auth/handlers.ts @@ -3,7 +3,13 @@ import { HttpServerResponse } from "effect/unstable/http"; import { Duration, Effect, Predicate } from "effect"; import { setCookie, deleteCookie } from "@tanstack/react-start/server"; -import { AUTH_PATHS, CloudAuthApi, CloudAuthPublicApi } from "./api"; +import { + AUTH_PATHS, + CloudAuthApi, + CloudAuthPublicApi, + McpExecutionNotFoundError, + McpSessionForbiddenError, +} from "./api"; import { NoOrganization, SessionContext } from "./middleware"; import { UserStoreService } from "./context"; import { authorizeOrganization } from "./authorize-organization"; @@ -18,6 +24,7 @@ import { isOverFreeOrganizationLimit, shouldApplyFreeOrganizationLimit, } from "./organization-limits"; +import type { McpSessionApprovalResult, McpSessionResumeApprovalResult } from "../mcp-session"; const COOKIE_OPTIONS = { path: "/", @@ -82,6 +89,45 @@ const requireSessionOrganization = Effect.gen(function* () { return { session, org }; }); +const requireSessionOrganizationId = Effect.gen(function* () { + const session = yield* SessionContext; + if (!session.organizationId) { + return yield* new NoOrganization(); + } + return { + ...session, + organizationId: session.organizationId, + }; +}); + +const getMcpSessionStub = (mcpSessionId: string) => + Effect.try({ + try: () => { + const ns = env.MCP_SESSION; + return ns.get(ns.idFromString(mcpSessionId)); + }, + catch: () => undefined, + }).pipe(Effect.orElseSucceed(() => null)); + +const requireMcpSessionStub = (mcpSessionId: string, executionId: string) => + Effect.gen(function* () { + const stub = yield* getMcpSessionStub(mcpSessionId); + if (!stub) { + return yield* new McpExecutionNotFoundError({ executionId }); + } + return stub; + }); + +const failMcpApprovalResult = ( + result: { readonly status: "not_found" | "forbidden" }, + params: { readonly mcpSessionId: string; readonly executionId: string }, +) => { + if (result.status === "forbidden") { + return Effect.fail(new McpSessionForbiddenError({ mcpSessionId: params.mcpSessionId })); + } + return Effect.fail(new McpExecutionNotFoundError({ executionId: params.executionId })); +}; + const setResponseCookie = ( response: HttpServerResponse.HttpServerResponse, name: string, @@ -447,6 +493,67 @@ export const CloudSessionAuthHandlers = HttpApiBuilder.group( }); }), ) + .handle("getMcpPaused", ({ params }) => + Effect.gen(function* () { + const owner = yield* requireSessionOrganizationId; + const stub = yield* requireMcpSessionStub(params.mcpSessionId, params.executionId); + const result = yield* Effect.promise( + () => + stub.getPausedExecutionForApproval(params.executionId, { + accountId: owner.accountId, + organizationId: owner.organizationId, + }) as Promise, + ); + + if (result.status !== "ok") { + return yield* failMcpApprovalResult(result, params); + } + + return { + text: result.text, + structured: result.structured, + }; + }), + ) + .handle("resumeMcpExecution", ({ params, payload }) => + Effect.gen(function* () { + const owner = yield* requireSessionOrganizationId; + const stub = yield* requireMcpSessionStub(params.mcpSessionId, params.executionId); + const result = yield* Effect.promise( + () => + stub.resumeExecutionForApproval( + params.executionId, + { + accountId: owner.accountId, + organizationId: owner.organizationId, + }, + { + action: payload.action, + content: payload.content as Record | undefined, + }, + ) as Promise, + ); + + if (result.status !== "ok") { + return yield* failMcpApprovalResult(result, params); + } + + if (result.executionStatus === "paused") { + return { + status: "paused" as const, + text: result.text, + structured: result.structured, + }; + } + + return { + status: "completed" as const, + text: result.text, + structured: result.structured, + isError: result.isError ?? false, + }; + }), + ) .handle("revokeApiKey", ({ params }) => Effect.gen(function* () { const { session, org } = yield* requireSessionOrganization; diff --git a/apps/cloud/src/mcp-flow.test.ts b/apps/cloud/src/mcp-flow.test.ts index 58ea8a48f..d395a3a65 100644 --- a/apps/cloud/src/mcp-flow.test.ts +++ b/apps/cloud/src/mcp-flow.test.ts @@ -534,6 +534,37 @@ describe("/mcp session restore", () => { }); describe("McpSessionDO alarm lifecycle", () => { + it("rejects browser approval reads and resumes from a different signed-in user", async () => { + const stub = env.MCP_SESSION.get(env.MCP_SESSION.newUniqueId()); + const sessionMeta = { + organizationId: "org_browser_resume_owner", + organizationName: "Browser Resume Owner", + userId: "user_browser_resume_owner", + }; + + await runInDurableObject(stub, async (_instance, state) => { + await state.storage.put(SESSION_META_KEY, sessionMeta); + await state.storage.put(LAST_ACTIVITY_KEY, Date.now()); + }); + + const attackerIdentity = { + accountId: "user_browser_resume_attacker", + organizationId: sessionMeta.organizationId, + }; + + await expect( + stub.getPausedExecutionForApproval("exec_approval", attackerIdentity), + ).resolves.toEqual({ + status: "forbidden", + }); + + await expect( + stub.resumeExecutionForApproval("exec_approval", attackerIdentity, { action: "accept" }), + ).resolves.toEqual({ + status: "forbidden", + }); + }); + it("keeps a recently active session after a cold-started alarm", async () => { const stub = env.MCP_SESSION.get(env.MCP_SESSION.newUniqueId()); const sessionMeta = { diff --git a/apps/cloud/src/mcp-miniflare.e2e.node.test.ts b/apps/cloud/src/mcp-miniflare.e2e.node.test.ts index 51405d21b..5e52d9b86 100644 --- a/apps/cloud/src/mcp-miniflare.e2e.node.test.ts +++ b/apps/cloud/src/mcp-miniflare.e2e.node.test.ts @@ -327,7 +327,10 @@ const nextOrgId = () => `org_miniflare_${++orgCounter}`; const connectClient = async ( baseUrl: URL, bearer: string, - options: { withElicitation?: boolean } = {}, + options: { + withElicitation?: boolean; + elicitationMode?: "browser" | "model" | "native"; + } = {}, ): Promise => { const client = new Client( { name: "mcp-miniflare-e2e", version: "0.0.1" }, @@ -335,7 +338,10 @@ const connectClient = async ( capabilities: options.withElicitation ? { elicitation: { form: {} } } : {}, }, ); - const transport = new StreamableHTTPClientTransport(new URL("/mcp", baseUrl), { + const endpoint = new URL("/mcp", baseUrl); + if (options.elicitationMode) + endpoint.searchParams.set("elicitation_mode", options.elicitationMode); + const transport = new StreamableHTTPClientTransport(endpoint, { requestInit: { headers: { authorization: `Bearer ${bearer}` } }, }); await client.connect(transport); @@ -756,6 +762,7 @@ layer(TestEnv, { timeout: 60_000 })("cloud MCP over real HTTP (miniflare)", (it) const client = yield* Effect.promise(() => connectClient(baseUrl, makeTestBearer(nextAccountId(), orgId), { withElicitation: true, + elicitationMode: "native", }), ); diff --git a/apps/cloud/src/mcp-session.e2e.node.test.ts b/apps/cloud/src/mcp-session.e2e.node.test.ts index c3552d318..7d7c5c488 100644 --- a/apps/cloud/src/mcp-session.e2e.node.test.ts +++ b/apps/cloud/src/mcp-session.e2e.node.test.ts @@ -92,7 +92,10 @@ const ELICITATION_CAPS: ClientCapabilities = { elicitation: { form: {}, url: {} }, }; -type BuildOptions = { readonly withElicitingPlugin?: boolean }; +type BuildOptions = { + readonly withElicitingPlugin?: boolean; + readonly elicitationMode?: "model" | "native"; +}; const buildScopedExecutor = (scopeId: string, scopeName: string, options: BuildOptions = {}) => Effect.gen(function* () { @@ -134,7 +137,10 @@ const openSession = ( Effect.gen(function* () { const executor = yield* buildScopedExecutor(orgId, `Org ${orgId}`, options); const engine = createExecutionEngine({ executor, codeExecutor: makeQuickJsExecutor() }); - const mcpServer = yield* createExecutorMcpServer({ engine }); + const mcpServer = yield* createExecutorMcpServer({ + engine, + elicitationMode: options.elicitationMode ? { mode: options.elicitationMode } : undefined, + }); const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); const client = new Client( { name: "cloud-e2e-test", version: "1.0.0" }, @@ -205,7 +211,10 @@ describe("cloud MCP session end-to-end", () => { it.effect("bridges a form elicitation from engine to client and back", () => Effect.gen(function* () { - const { client } = yield* openSession(nextOrgId(), { withElicitingPlugin: true }); + const { client } = yield* openSession(nextOrgId(), { + withElicitingPlugin: true, + elicitationMode: "native", + }); client.setRequestHandler(ElicitRequestSchema, async () => ({ action: "accept" as const, diff --git a/apps/cloud/src/mcp-session.ts b/apps/cloud/src/mcp-session.ts index 2759e39d8..f3f7780c9 100644 --- a/apps/cloud/src/mcp-session.ts +++ b/apps/cloud/src/mcp-session.ts @@ -13,7 +13,12 @@ import { drizzle } from "drizzle-orm/postgres-js"; import postgres, { type Sql } from "postgres"; import { createExecutorMcpServer } from "@executor-js/host-mcp"; -import { buildExecuteDescription } from "@executor-js/execution"; +import { + buildExecuteDescription, + formatPausedExecution, + type ExecutionEngine, + type ResumeResponse, +} from "@executor-js/execution"; import type { DrizzleDb, DbServiceShape } from "./services/db"; // Import directly from core-shared-services, NOT from ./api/layers.ts. @@ -37,6 +42,8 @@ import { captureCause } from "./observability"; export type McpSessionInit = { organizationId: string; userId: string; + elicitationMode?: "browser" | "model" | "native"; + allowModelResume?: boolean; }; export type IncomingTraceHeaders = { @@ -45,6 +52,57 @@ export type IncomingTraceHeaders = { readonly baggage?: string; }; +export type McpSessionApprovalIdentity = { + readonly accountId: string; + readonly organizationId: string; +}; + +type McpSessionApprovalErrorResult = + | { readonly status: "not_found" } + | { readonly status: "forbidden" }; + +export type McpSessionApprovalResult = + | { + readonly status: "ok"; + readonly text: string; + readonly structured: Record; + } + | McpSessionApprovalErrorResult; + +export type McpSessionResumeApprovalResult = + | { + readonly status: "ok"; + readonly executionStatus: "completed" | "paused"; + readonly text: string; + readonly structured: Record; + readonly isError?: boolean; + } + | McpSessionApprovalErrorResult; + +const resumeApprovalResult = ( + executionId: string, + response: ResumeResponse, +): Extract => { + const textByAction = { + accept: "I've approved it", + decline: "I've denied it", + cancel: "I've canceled it", + } satisfies Record; + const statusByAction = { + accept: "approved", + decline: "denied", + cancel: "canceled", + } satisfies Record; + + return { + status: "ok", + executionStatus: "completed", + text: textByAction[response.action], + structured: { status: statusByAction[response.action], executionId }, + isError: false, + }; +}; + const HEARTBEAT_MS = 30 * 1000; const SESSION_TIMEOUT_MS = 5 * 60 * 1000; const LONG_LIVED_DB_IDLE_TIMEOUT_SECONDS = 5; @@ -52,6 +110,7 @@ const LONG_LIVED_DB_MAX_LIFETIME_SECONDS = 120; const TRANSPORT_STATE_KEY = "transport"; const SESSION_META_KEY = "session-meta"; const LAST_ACTIVITY_KEY = "last-activity-ms"; +const approvalResponseKey = (executionId: string) => `approval-response:${executionId}`; const INTERNAL_ACCOUNT_ID_HEADER = "x-executor-mcp-account-id"; const INTERNAL_ORGANIZATION_ID_HEADER = "x-executor-mcp-organization-id"; @@ -119,6 +178,8 @@ type SessionMeta = { readonly organizationId: string; readonly organizationName: string; readonly userId: string; + readonly elicitationMode?: "browser" | "model" | "native"; + readonly allowModelResume?: boolean; }; /** @@ -174,6 +235,7 @@ const makeSessionServices = (dbHandle: DbHandle) => makeResolveOrganizationServi const resolveSessionMeta = Effect.fn("McpSessionDO.resolveSessionMeta")(function* ( organizationId: string, userId: string, + elicitationMode: "browser" | "model" | "native", ) { const org = yield* resolveOrganization(organizationId); if (!org) { @@ -183,6 +245,7 @@ const resolveSessionMeta = Effect.fn("McpSessionDO.resolveSessionMeta")(function organizationId: org.id, organizationName: org.name, userId, + elicitationMode, } satisfies SessionMeta; }); @@ -194,11 +257,13 @@ export class McpSessionDO extends DurableObject { private readonly instanceCreatedAt = Date.now(); private mcpServer: McpServer | null = null; private transport: McpWorkerTransport | null = null; + private engine: ExecutionEngine | null = null; private initialized = false; private lastActivityMs = 0; private dbHandle: DbHandle | null = null; private sessionMeta: SessionMeta | null = null; private transportJsonResponseMode: boolean | null = null; + private approvalResponses = new Map(); // Updated at the start of each `handleRequest` so the host-mcp server's // `parentSpan` getter — invoked by the MCP SDK's deferred tool callbacks // after `transport.handleRequest()` has already returned its streaming @@ -298,11 +363,42 @@ export class McpSessionDO extends DurableObject { // `Effect.runPromise(engine.getDescription)` at its async // MCP-SDK boundary and orphan the sub-span. const description = yield* buildExecuteDescription(executor); + const sessionElicitationMode = + sessionMeta.elicitationMode ?? (sessionMeta.allowModelResume ? "model" : "browser"); const mcpServer = yield* createExecutorMcpServer({ engine, description, parentSpan: () => self.currentRequestSpan ?? undefined, debug: env.EXECUTOR_MCP_DEBUG === "true", + browserApprovalStore: { + takeResponse: (executionId) => + Effect.promise(async () => { + const memoryResponse = self.approvalResponses.get(executionId); + if (memoryResponse) { + self.approvalResponses.delete(executionId); + await self.ctx.storage.delete(approvalResponseKey(executionId)); + return memoryResponse; + } + const stored = await self.ctx.storage.get( + approvalResponseKey(executionId), + ); + if (!stored) return null; + await self.ctx.storage.delete(approvalResponseKey(executionId)); + return stored; + }), + }, + elicitationMode: + sessionElicitationMode === "browser" + ? { + mode: "browser" as const, + approvalUrl: (executionId) => { + const origin = env.VITE_PUBLIC_SITE_URL ?? "https://executor.sh"; + const url = new URL(`/resume/${encodeURIComponent(executionId)}`, origin); + url.searchParams.set("mcp_session_id", self.ctx.id.toString()); + return url.toString(); + }, + } + : { mode: sessionElicitationMode }, }).pipe(Effect.withSpan("McpSessionDO.createExecutorMcpServer")); const transport = yield* makeMcpWorkerTransport({ sessionIdGenerator: () => self.ctx.id.toString(), @@ -311,7 +407,7 @@ export class McpSessionDO extends DurableObject { }); self.transportJsonResponseMode = options.enableJsonResponse ?? false; yield* transport.connect(mcpServer); - return { mcpServer, transport }; + return { mcpServer, transport, engine }; }).pipe( Effect.withSpan("McpSessionDO.createRuntime"), Effect.provide(makeSessionServices(options.dbHandle)), @@ -331,6 +427,7 @@ export class McpSessionDO extends DurableObject { yield* Effect.promise(() => mcpServer.close().catch(() => undefined)); self.mcpServer = null; } + self.engine = null; if (self.dbHandle) { const dbHandle = self.dbHandle; yield* Effect.promise(() => dbHandle.end()); @@ -357,10 +454,56 @@ export class McpSessionDO extends DurableObject { self.dbHandle = options.dbHandle; self.mcpServer = runtime.mcpServer; self.transport = runtime.transport; + self.engine = runtime.engine; self.initialized = true; }); } + private ensureRuntimeForApproval(): Effect.Effect { + const self = this; + return Effect.gen(function* () { + if (self.initialized && self.engine) return true; + + const sessionMeta = yield* self.loadSessionMeta(); + if (!sessionMeta) return false; + + yield* self.closeRuntime(); + const dbHandle = makeLongLivedDb(); + yield* self.installRuntime(sessionMeta, { + dbHandle, + enableJsonResponse: true, + }); + yield* Effect.promise(() => self.markActivity()).pipe( + Effect.withSpan("McpSessionDO.markActivity"), + ); + return true; + }).pipe( + Effect.withSpan("McpSessionDO.ensure_runtime_for_approval"), + // oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: DO RPC has no typed Effect channel + Effect.orDie, + ); + } + + private validateApprovalIdentity( + identity: McpSessionApprovalIdentity, + ): Effect.Effect<"ok" | "not_found" | "forbidden"> { + const self = this; + return Effect.gen(function* () { + const sessionMeta = yield* self.loadSessionMeta(); + if (!sessionMeta) return "not_found" as const; + + const matches = + identity.accountId === sessionMeta.userId && + identity.organizationId === sessionMeta.organizationId; + + yield* Effect.annotateCurrentSpan({ + "mcp.session.owner_match": matches, + }); + + return matches ? ("ok" as const) : ("forbidden" as const); + }).pipe(Effect.withSpan("mcp.session.validate_approval_identity")); + } + private restoreRuntimeFromStorage(request: Request): Effect.Effect<"restored" | "missing_meta"> { const self = this; return Effect.gen(function* () { @@ -449,7 +592,11 @@ export class McpSessionDO extends DurableObject { const self = this; return Effect.gen(function* () { const dbHandle = makeEphemeralDb(); - return yield* resolveSessionMeta(token.organizationId, token.userId).pipe( + return yield* resolveSessionMeta( + token.organizationId, + token.userId, + token.elicitationMode ?? (token.allowModelResume ? "model" : "browser"), + ).pipe( Effect.provide(makeResolveOrganizationServices(dbHandle)), Effect.tap((sessionMeta) => Effect.promise(() => self.saveSessionMeta(sessionMeta)).pipe( @@ -507,6 +654,7 @@ export class McpSessionDO extends DurableObject { }); self.mcpServer = runtime.mcpServer; self.transport = runtime.transport; + self.engine = runtime.engine; self.initialized = true; yield* Effect.promise(() => self.markActivity()).pipe( @@ -579,6 +727,76 @@ export class McpSessionDO extends DurableObject { return Effect.runPromise(program); } + async getPausedExecutionForApproval( + executionId: string, + identity: McpSessionApprovalIdentity, + incoming?: IncomingTraceHeaders, + ): Promise { + const self = this; + return Effect.runPromise( + Effect.gen(function* () { + const owner = yield* self.validateApprovalIdentity(identity); + if (owner !== "ok") return { status: owner } as const; + + const restored = yield* self.ensureRuntimeForApproval(); + if (!restored || !self.engine) return { status: "not_found" } as const; + + const paused = yield* self.engine.getPausedExecution(executionId); + if (!paused) return { status: "not_found" } as const; + + const formatted = formatPausedExecution(paused); + return { + status: "ok" as const, + text: formatted.text, + structured: formatted.structured, + }; + }).pipe( + Effect.withSpan("McpSessionDO.getPausedExecutionForApproval", { + attributes: { "mcp.execution.id": executionId }, + }), + (eff) => withIncomingParent(incoming, eff), + Effect.provide(DoTelemetryLive), + // oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: DO RPC exposes Promise results + Effect.orDie, + ), + ); + } + + async resumeExecutionForApproval( + executionId: string, + identity: McpSessionApprovalIdentity, + response: ResumeResponse, + incoming?: IncomingTraceHeaders, + ): Promise { + const self = this; + return Effect.runPromise( + Effect.gen(function* () { + const owner = yield* self.validateApprovalIdentity(identity); + if (owner !== "ok") return { status: owner } as const; + + const restored = yield* self.ensureRuntimeForApproval(); + if (!restored || !self.engine) return { status: "not_found" } as const; + + const paused = yield* self.engine.getPausedExecution(executionId); + if (!paused) return { status: "not_found" } as const; + + self.approvalResponses.set(executionId, response); + yield* Effect.promise(() => + self.ctx.storage.put(approvalResponseKey(executionId), response), + ); + return resumeApprovalResult(executionId, response); + }).pipe( + Effect.withSpan("McpSessionDO.resumeExecutionForApproval", { + attributes: { "mcp.execution.id": executionId }, + }), + (eff) => withIncomingParent(incoming, eff), + Effect.provide(DoTelemetryLive), + // oxlint-disable-next-line executor/no-effect-escape-hatch -- boundary: DO RPC exposes Promise results + Effect.orDie, + ), + ); + } + private dispatchRequest(request: Request): Effect.Effect { const self = this; return Effect.gen(function* () { diff --git a/apps/cloud/src/mcp.ts b/apps/cloud/src/mcp.ts index c3b4983cf..55b46c344 100644 --- a/apps/cloud/src/mcp.ts +++ b/apps/cloud/src/mcp.ts @@ -69,6 +69,8 @@ const CORS_PREFLIGHT_HEADERS = { "access-control-expose-headers": "mcp-session-id", } as const; +const TRUE_QUERY_VALUES = new Set(["1", "true", "yes", "on"]); + const MCP_PATH = "/mcp"; const PROTECTED_RESOURCE_METADATA_PATH = "/.well-known/oauth-protected-resource/mcp"; const PROTECTED_RESOURCE_METADATA_URL = `${RESOURCE_ORIGIN}${PROTECTED_RESOURCE_METADATA_PATH}`; @@ -538,6 +540,25 @@ const withMcpResponseHeaders = (response: Response): Response => { }); }; +type McpElicitationMode = "browser" | "model" | "native"; + +const MCP_ELICITATION_MODES = new Set(["browser", "model", "native"]); + +const readElicitationMode = (request: Request): McpElicitationMode => { + const url = new URL(request.url); + const mode = url.searchParams.get("elicitation_mode"); + if (mode && MCP_ELICITATION_MODES.has(mode as McpElicitationMode)) { + return mode as McpElicitationMode; + } + + const legacyModelResume = url.searchParams.get("allow_model_resume"); + if (legacyModelResume !== null && TRUE_QUERY_VALUES.has(legacyModelResume.toLowerCase())) { + return "model"; + } + + return "browser"; +}; + /** * Forward a request to an existing session DO. Wrapping the DO's `Response` * with `HttpServerResponse.raw` lets streaming bodies (SSE) pass through @@ -630,7 +651,14 @@ const dispatchPost = (request: Request, token: VerifiedToken) => const stub = ns.get(ns.newUniqueId()); const propagation = yield* currentPropagationHeaders(request); yield* Effect.promise(() => - stub.init({ organizationId, userId: token.accountId }, propagation), + stub.init( + { + organizationId, + userId: token.accountId, + elicitationMode: readElicitationMode(request), + }, + propagation, + ), ).pipe( Effect.withSpan("mcp.do.init", { attributes: { "mcp.request.session_id_present": false }, diff --git a/apps/cloud/src/routeTree.gen.ts b/apps/cloud/src/routeTree.gen.ts index ecffa48b2..b4ee0efeb 100644 --- a/apps/cloud/src/routeTree.gen.ts +++ b/apps/cloud/src/routeTree.gen.ts @@ -20,6 +20,7 @@ import { Route as BillingRouteImport } from './routes/billing' import { Route as ApiKeysRouteImport } from './routes/api-keys' import { Route as IndexRouteImport } from './routes/index' import { Route as SourcesNamespaceRouteImport } from './routes/sources.$namespace' +import { Route as ResumeExecutionIdRouteImport } from './routes/resume.$executionId' import { Route as BillingPlansRouteImport } from './routes/billing_.plans' import { Route as SourcesAddPluginKeyRouteImport } from './routes/sources.add.$pluginKey' @@ -78,6 +79,11 @@ const SourcesNamespaceRoute = SourcesNamespaceRouteImport.update({ path: '/sources/$namespace', getParentRoute: () => rootRouteImport, } as any) +const ResumeExecutionIdRoute = ResumeExecutionIdRouteImport.update({ + id: '/resume/$executionId', + path: '/resume/$executionId', + getParentRoute: () => rootRouteImport, +} as any) const BillingPlansRoute = BillingPlansRouteImport.update({ id: '/billing_/plans', path: '/billing/plans', @@ -101,6 +107,7 @@ export interface FileRoutesByFullPath { '/setup-mcp': typeof SetupMcpRoute '/tools': typeof ToolsRoute '/billing/plans': typeof BillingPlansRoute + '/resume/$executionId': typeof ResumeExecutionIdRoute '/sources/$namespace': typeof SourcesNamespaceRoute '/sources/add/$pluginKey': typeof SourcesAddPluginKeyRoute } @@ -116,6 +123,7 @@ export interface FileRoutesByTo { '/setup-mcp': typeof SetupMcpRoute '/tools': typeof ToolsRoute '/billing/plans': typeof BillingPlansRoute + '/resume/$executionId': typeof ResumeExecutionIdRoute '/sources/$namespace': typeof SourcesNamespaceRoute '/sources/add/$pluginKey': typeof SourcesAddPluginKeyRoute } @@ -132,6 +140,7 @@ export interface FileRoutesById { '/setup-mcp': typeof SetupMcpRoute '/tools': typeof ToolsRoute '/billing_/plans': typeof BillingPlansRoute + '/resume/$executionId': typeof ResumeExecutionIdRoute '/sources/$namespace': typeof SourcesNamespaceRoute '/sources/add/$pluginKey': typeof SourcesAddPluginKeyRoute } @@ -149,6 +158,7 @@ export interface FileRouteTypes { | '/setup-mcp' | '/tools' | '/billing/plans' + | '/resume/$executionId' | '/sources/$namespace' | '/sources/add/$pluginKey' fileRoutesByTo: FileRoutesByTo @@ -164,6 +174,7 @@ export interface FileRouteTypes { | '/setup-mcp' | '/tools' | '/billing/plans' + | '/resume/$executionId' | '/sources/$namespace' | '/sources/add/$pluginKey' id: @@ -179,6 +190,7 @@ export interface FileRouteTypes { | '/setup-mcp' | '/tools' | '/billing_/plans' + | '/resume/$executionId' | '/sources/$namespace' | '/sources/add/$pluginKey' fileRoutesById: FileRoutesById @@ -195,6 +207,7 @@ export interface RootRouteChildren { SetupMcpRoute: typeof SetupMcpRoute ToolsRoute: typeof ToolsRoute BillingPlansRoute: typeof BillingPlansRoute + ResumeExecutionIdRoute: typeof ResumeExecutionIdRoute SourcesNamespaceRoute: typeof SourcesNamespaceRoute SourcesAddPluginKeyRoute: typeof SourcesAddPluginKeyRoute } @@ -278,6 +291,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof SourcesNamespaceRouteImport parentRoute: typeof rootRouteImport } + '/resume/$executionId': { + id: '/resume/$executionId' + path: '/resume/$executionId' + fullPath: '/resume/$executionId' + preLoaderRoute: typeof ResumeExecutionIdRouteImport + parentRoute: typeof rootRouteImport + } '/billing_/plans': { id: '/billing_/plans' path: '/billing/plans' @@ -307,6 +327,7 @@ const rootRouteChildren: RootRouteChildren = { SetupMcpRoute: SetupMcpRoute, ToolsRoute: ToolsRoute, BillingPlansRoute: BillingPlansRoute, + ResumeExecutionIdRoute: ResumeExecutionIdRoute, SourcesNamespaceRoute: SourcesNamespaceRoute, SourcesAddPluginKeyRoute: SourcesAddPluginKeyRoute, } diff --git a/apps/cloud/src/routes/resume.$executionId.tsx b/apps/cloud/src/routes/resume.$executionId.tsx new file mode 100644 index 000000000..17dec4d06 --- /dev/null +++ b/apps/cloud/src/routes/resume.$executionId.tsx @@ -0,0 +1,59 @@ +import { useCallback } from "react"; +import { useAtomSet, useAtomValue } from "@effect/atom-react"; +import { Schema } from "effect"; +import { createFileRoute } from "@tanstack/react-router"; +import { + ResumeApprovalPage, + ResumeApprovalPageView, +} from "@executor-js/react/pages/resume-approval"; + +import { mcpPausedExecutionAtom, resumeMcpExecution } from "../web/mcp-resume-atoms"; + +const SearchParams = Schema.toStandardSchemaV1( + Schema.Struct({ + mcp_session_id: Schema.optional(Schema.String), + }), +); + +export const Route = createFileRoute("/resume/$executionId")({ + validateSearch: SearchParams, + component: RouteComponent, +}); + +function RouteComponent() { + const { executionId } = Route.useParams(); + const { mcp_session_id: mcpSessionId } = Route.useSearch(); + if (mcpSessionId) { + return ; + } + return ; +} + +function CloudMcpResumeApproval(props: { executionId: string; mcpSessionId: string }) { + const paused = useAtomValue(mcpPausedExecutionAtom(props.mcpSessionId, props.executionId)); + const doResume = useAtomSet(resumeMcpExecution, { mode: "promiseExit" }); + const resume = useCallback( + ( + executionId: string, + action: "accept" | "decline" | "cancel", + content?: Record, + ) => + doResume({ + params: { + mcpSessionId: props.mcpSessionId, + executionId, + }, + payload: action === "accept" ? { action, content: content ?? {} } : { action }, + }), + [doResume, props.mcpSessionId], + ); + + return ( + + ); +} diff --git a/apps/cloud/src/web/mcp-resume-atoms.ts b/apps/cloud/src/web/mcp-resume-atoms.ts new file mode 100644 index 000000000..8519f5e40 --- /dev/null +++ b/apps/cloud/src/web/mcp-resume-atoms.ts @@ -0,0 +1,9 @@ +import { CloudApiClient } from "./client"; + +export const mcpPausedExecutionAtom = (mcpSessionId: string, executionId: string) => + CloudApiClient.query("cloudAuth", "getMcpPaused", { + params: { mcpSessionId, executionId }, + timeToLive: "5 seconds", + }); + +export const resumeMcpExecution = CloudApiClient.mutation("cloudAuth", "resumeMcpExecution"); diff --git a/apps/cloud/src/web/pages/setup-mcp.tsx b/apps/cloud/src/web/pages/setup-mcp.tsx index 376e0407a..f76b1a663 100644 --- a/apps/cloud/src/web/pages/setup-mcp.tsx +++ b/apps/cloud/src/web/pages/setup-mcp.tsx @@ -2,21 +2,44 @@ import { useEffect, useState } from "react"; import { useNavigate } from "@tanstack/react-router"; import { Button } from "@executor-js/react/components/button"; import { CodeBlock } from "@executor-js/react/components/code-block"; +import { + buildMcpHttpEndpoint, + buildMcpInstallCommand, + type McpElicitationMode, +} from "@executor-js/react/components/mcp-install-card"; import { CopyButton } from "@executor-js/react/components/copy-button"; - -const buildInstallCommand = (endpoint: string): string => - `npx add-mcp ${endpoint} --transport http --name executor`; +import { + Collapsible, + CollapsibleContent, + CollapsibleTrigger, +} from "@executor-js/react/components/collapsible"; +import { NativeSelect, NativeSelectOption } from "@executor-js/react/components/native-select"; export const SetupMcpPage = () => { const navigate = useNavigate(); const [origin, setOrigin] = useState(null); + const [advancedOpen, setAdvancedOpen] = useState(false); + const [elicitationMode, setElicitationMode] = useState("browser"); useEffect(() => { setOrigin(window.location.origin); }, []); - const endpoint = origin ? `${origin}/mcp` : ""; - const command = endpoint ? buildInstallCommand(endpoint) : ""; + const endpoint = origin + ? buildMcpHttpEndpoint({ + origin, + desktop: null, + elicitationMode, + }) + : ""; + const command = origin + ? buildMcpInstallCommand({ + mode: "http", + isDev: false, + origin, + elicitationMode, + }) + : ""; return (
@@ -45,6 +68,39 @@ export const SetupMcpPage = () => {

Paste this into your MCP client config.

+ + + Advanced + + + +
+
+
Resume approvals
+
+ Select how tool approvals are handled for this MCP connection. +
+
+ setElicitationMode(event.target.value as McpElicitationMode)} + aria-label="Elicitation mode" + className="min-w-44" + > + Browser approval + Model resume tool + Native elicitation + +
+
+
+
or diff --git a/apps/local/src/serve.test.ts b/apps/local/src/serve.test.ts index 2f364c42f..9aa471ec0 100644 --- a/apps/local/src/serve.test.ts +++ b/apps/local/src/serve.test.ts @@ -20,6 +20,7 @@ const startTestServer = async (): Promise => { }, mcp: { handleRequest: async () => new Response("ok"), + handleApprovalRequest: async () => new Response("ok"), close: async () => {}, }, }, @@ -78,6 +79,7 @@ describe("startServer network bind auth", () => { }, mcp: { handleRequest: async () => new Response("ok"), + handleApprovalRequest: async () => new Response("ok"), close: async () => {}, }, }, @@ -98,6 +100,7 @@ describe("startServer network bind auth", () => { }, mcp: { handleRequest: async () => new Response("ok"), + handleApprovalRequest: async () => new Response("ok"), close: async () => {}, }, }, diff --git a/apps/local/src/serve.ts b/apps/local/src/serve.ts index 187a3b12d..11aa5e3bd 100644 --- a/apps/local/src/serve.ts +++ b/apps/local/src/serve.ts @@ -168,6 +168,10 @@ export async function startServer(opts: StartServerOptions = {}): Promise ({ + id: "browser-resume-test" as const, + storage: () => ({}), + staticSources: () => [ + { + id: "api", + kind: "in-memory", + name: "API", + tools: [ + { + name: "singleApproval", + description: "Elicit exactly once and return the user's response.", + inputSchema: EmptyInputSchema, + handler: ({ elicit }) => + Effect.gen(function* () { + const response = yield* elicit( + FormElicitation.make({ + message: "Approve browser-resume test call", + requestedSchema: { + type: "object", + properties: { + note: { type: "string", title: "Approval note" }, + }, + required: ["note"], + }, + }), + ); + return { ok: true, response }; + }), + }, + ], + }, + ], +})); + +const makeExecutor = async (tmpDir: string): Promise => { + const plugins = [approvalPlugin()] as const; + const sqlite = await createSqliteFumaDb({ + tables: collectTables(plugins), + namespace: "executor_local_browser_resume_test", + path: join(tmpDir, "data.db"), + }); + const scope = Scope.make({ + id: ScopeId.make(`test-${randomBytes(4).toString("hex")}`), + name: "test", + createdAt: new Date(), + }); + const executor = await Effect.runPromise( + createExecutor({ + scopes: [scope], + db: sqlite.db, + plugins, + onElicitation: "accept-all", + }), + ); + + const close = executor.close; + return { + ...executor, + close: () => + Effect.gen(function* () { + yield* close(); + yield* Effect.promise(() => sqlite.close()); + }), + }; +}; + +const makeMcpFetch = (executor: Executor) => { + const engine = createExecutionEngine({ executor, codeExecutor: makeQuickJsExecutor() }); + const mcp = createMcpRequestHandler({ engine }); + + const fetchImpl: typeof globalThis.fetch = Object.assign( + (input: RequestInfo | URL, init?: RequestInit) => { + const request = input instanceof Request ? input : new Request(input, init); + const url = new URL(request.url); + if (url.pathname.startsWith("/mcp")) return mcp.handleRequest(request); + if (url.pathname.startsWith("/api/mcp-sessions/")) { + return mcp.handleApprovalRequest(request); + } + return Promise.resolve(new Response("Not found", { status: 404 })); + }, + { preconnect: globalThis.fetch.preconnect }, + ); + + return { fetch: fetchImpl, dispose: mcp.close }; +}; + +const readStructuredRecord = (value: unknown): Record => { + expect(typeof value).toBe("object"); + expect(value).not.toBeNull(); + return value as Record; +}; + +const readApproval = (structured: unknown): { readonly executionId: string; readonly url: URL } => { + const record = readStructuredRecord(structured); + expect(record.status).toBe("user_approval_required"); + expect(record).not.toHaveProperty("interaction"); + expect(typeof record.executionId).toBe("string"); + expect(typeof record.approvalUrl).toBe("string"); + const { executionId, approvalUrl } = record as { + readonly executionId: string; + readonly approvalUrl: string; + }; + return { + executionId, + url: new URL(approvalUrl), + }; +}; + +const readResultArray = (structured: unknown): ReadonlyArray => { + const record = readStructuredRecord(structured); + expect(record.status).toBe("completed"); + expect(Array.isArray(record.result)).toBe(true); + return record.result as ReadonlyArray; +}; + +describe("local MCP browser approval resume", () => { + it("continues multiple elicitations from one MCP execute call through the HTTP API path", async () => { + const tmpDir = mkdtempSync(join(tmpdir(), "executor-local-browser-resume-")); + const executor = await makeExecutor(tmpDir); + const { fetch, dispose } = makeMcpFetch(executor); + const mcpClient = new Client( + { name: "browser-resume-test-client", version: "1.0.0" }, + { capabilities: {} }, + ); + const transport = new StreamableHTTPClientTransport(new URL("/mcp", TEST_BASE_URL), { fetch }); + + await mcpClient.connect(transport); + + // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: test owns MCP transports, web handler, and executor lifecycle + try { + const first = await mcpClient.callTool({ + name: "execute", + arguments: { + code: ` + return await Promise.all([ + tools.api.singleApproval({}), + tools.api.singleApproval({}), + tools.api.singleApproval({}) + ]); + `, + }, + }); + + expect(first.isError).toBeFalsy(); + const firstApproval = readApproval(first.structuredContent); + + const second = await approveInBrowserThenResume(fetch, mcpClient, firstApproval); + const secondApproval = readApproval(second.structuredContent); + expect(secondApproval.executionId).not.toBe(firstApproval.executionId); + + const third = await approveInBrowserThenResume(fetch, mcpClient, secondApproval); + const thirdApproval = readApproval(third.structuredContent); + expect(thirdApproval.executionId).not.toBe(firstApproval.executionId); + expect(thirdApproval.executionId).not.toBe(secondApproval.executionId); + + const completed = await approveInBrowserThenResume(fetch, mcpClient, thirdApproval); + const result = readResultArray(completed.structuredContent); + expect(result).toHaveLength(3); + expect(result).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + data: expect.objectContaining({ + response: expect.objectContaining({ + action: "accept", + content: expect.objectContaining({ note: expect.stringContaining("approved-") }), + }), + }), + }), + ]), + ); + } finally { + await mcpClient.close(); + await Effect.runPromise(Effect.ignore(Effect.tryPromise(() => dispose()))); + await Effect.runPromise( + Effect.ignore(Effect.tryPromise(() => Effect.runPromise(executor.close()))), + ); + rmSync(tmpDir, { recursive: true, force: true }); + } + }, 10_000); +}); + +const approveInBrowserThenResume = async ( + fetch: typeof globalThis.fetch, + client: Client, + approval: { readonly executionId: string; readonly url: URL }, +) => { + const sessionId = approval.url.searchParams.get("mcp_session_id"); + expect(sessionId).not.toBeNull(); + + const approvalResponse = await fetch( + new URL( + `/api/mcp-sessions/${encodeURIComponent(sessionId!)}/executions/${encodeURIComponent(approval.executionId)}/resume`, + TEST_BASE_URL, + ), + { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + action: "accept", + content: { note: `approved-${approval.executionId}` }, + }), + }, + ); + expect(approvalResponse.status).toBe(200); + + return await client.callTool({ + name: "resume", + arguments: { executionId: approval.executionId }, + }); +}; diff --git a/apps/local/src/server/mcp.ts b/apps/local/src/server/mcp.ts index b17c800bb..000f05508 100644 --- a/apps/local/src/server/mcp.ts +++ b/apps/local/src/server/mcp.ts @@ -1,9 +1,10 @@ -import { Effect } from "effect"; +import { Effect, Option, Schema } from "effect"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { WebStandardStreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js"; import { createExecutorMcpServer, type ExecutorMcpServerConfig } from "@executor-js/host-mcp"; +import type { ResumeResponse } from "@executor-js/execution"; import { startIntegrationsRefresh } from "./integrations"; @@ -13,6 +14,7 @@ import { startIntegrationsRefresh } from "./integrations"; export type McpRequestHandler = { readonly handleRequest: (request: Request) => Promise; + readonly handleApprovalRequest: (request: Request) => Promise; readonly close: () => Promise; }; @@ -28,6 +30,35 @@ const formatBoundaryError = (error: unknown): unknown => { return error; }; +type McpElicitationMode = "browser" | "model" | "native"; + +const MCP_ELICITATION_MODES = new Set(["browser", "model", "native"]); +const ResumeResponsePayload = Schema.Struct({ + action: Schema.Literals(["accept", "decline", "cancel"]), + content: Schema.optional(Schema.Record(Schema.String, Schema.Unknown)), +}); +const decodeResumeResponsePayload = Schema.decodeUnknownOption(ResumeResponsePayload); + +const readElicitationMode = (request: Request): McpElicitationMode => { + const url = new URL(request.url); + const mode = url.searchParams.get("elicitation_mode"); + if (mode && MCP_ELICITATION_MODES.has(mode as McpElicitationMode)) { + return mode as McpElicitationMode; + } + + return "browser"; +}; + +const approvalUrlForRequest = ( + request: Request, + executionId: string, + sessionId: string | null, +): string => { + const url = new URL(`/resume/${encodeURIComponent(executionId)}`, request.url); + if (sessionId) url.searchParams.set("mcp_session_id", sessionId); + return url.toString(); +}; + const ignoreClose = (close: (() => Promise) | undefined): Promise => close ? Effect.runPromise( @@ -40,15 +71,57 @@ const ignoreClose = (close: (() => Promise) | undefined): Promise => ) : Promise.resolve(); +const approvalRequestPattern = /^\/api\/mcp-sessions\/([^/?#]+)\/executions\/([^/?#]+)\/resume$/; + +const json = (value: unknown, status = 200): Response => + new Response(JSON.stringify(value), { + status, + headers: { "content-type": "application/json" }, + }); + +const readResumeResponse = (request: Request): Promise => + Effect.runPromise( + Effect.tryPromise({ + try: () => request.json(), + catch: () => null, + }).pipe( + Effect.map((raw) => + raw === null ? null : Option.getOrNull(decodeResumeResponsePayload(raw)), + ), + ), + ); + +const resumeApprovalResult = (executionId: string, response: ResumeResponse) => { + const textByAction = { + accept: "I've approved it", + decline: "I've denied it", + cancel: "I've canceled it", + } satisfies Record; + const statusByAction = { + accept: "approved", + decline: "denied", + cancel: "canceled", + } satisfies Record; + + return { + status: "completed", + text: textByAction[response.action], + structured: { status: statusByAction[response.action], executionId }, + isError: false, + }; +}; + export const createMcpRequestHandler = (config: ExecutorMcpServerConfig): McpRequestHandler => { const transports = new Map(); const servers = new Map(); + const approvalResponses = new Map>(); const dispose = async (id: string, opts: { transport?: boolean; server?: boolean } = {}) => { const t = transports.get(id); const s = servers.get(id); transports.delete(id); servers.delete(id); + approvalResponses.delete(id); if (opts.transport) await ignoreClose(t ? () => t.close() : undefined); if (opts.server) await ignoreClose(s ? () => s.close() : undefined); }; @@ -64,10 +137,12 @@ export const createMcpRequestHandler = (config: ExecutorMcpServerConfig): McpReq } let created: McpServer | undefined; + let createdSessionId: string | null = null; const transport = new WebStandardStreamableHTTPServerTransport({ sessionIdGenerator: () => crypto.randomUUID(), enableJsonResponse: true, onsessioninitialized: (sid) => { + createdSessionId = sid; transports.set(sid, transport); if (created) servers.set(sid, created); }, @@ -81,7 +156,30 @@ export const createMcpRequestHandler = (config: ExecutorMcpServerConfig): McpReq // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: MCP SDK handler must return JSON-RPC errors from thrown Promise APIs try { - created = await Effect.runPromise(createExecutorMcpServer(config)); + const elicitationMode = readElicitationMode(request); + created = await Effect.runPromise( + createExecutorMcpServer({ + ...config, + browserApprovalStore: { + takeResponse: (executionId) => + Effect.sync(() => { + if (!createdSessionId) return null; + const sessionApprovals = approvalResponses.get(createdSessionId); + const response = sessionApprovals?.get(executionId) ?? null; + sessionApprovals?.delete(executionId); + return response; + }), + }, + elicitationMode: + elicitationMode === "browser" + ? { + mode: "browser" as const, + approvalUrl: (executionId) => + approvalUrlForRequest(request, executionId, createdSessionId), + } + : { mode: elicitationMode }, + }), + ); await created.connect(transport); const response = await transport.handleRequest(request); @@ -102,6 +200,27 @@ export const createMcpRequestHandler = (config: ExecutorMcpServerConfig): McpReq } }, + handleApprovalRequest: async (request) => { + const url = new URL(request.url); + const match = approvalRequestPattern.exec(url.pathname); + if (!match) return json({ error: "Not found" }, 404); + if (request.method !== "POST") return json({ error: "Method not allowed" }, 405); + + const sessionId = decodeURIComponent(match[1]); + const executionId = decodeURIComponent(match[2]); + if (!servers.has(sessionId)) return json({ error: "MCP session not found" }, 404); + + const response = await readResumeResponse(request); + if (!response) return json({ error: "Invalid approval response" }, 400); + + const sessionApprovals = + approvalResponses.get(sessionId) ?? new Map(); + sessionApprovals.set(executionId, response); + approvalResponses.set(sessionId, sessionApprovals); + + return json(resumeApprovalResult(executionId, response)); + }, + close: async () => { const ids = new Set([...transports.keys(), ...servers.keys()]); await Promise.all([...ids].map((id) => dispose(id, { transport: true, server: true }))); diff --git a/packages/app/src/routeTree.gen.ts b/packages/app/src/routeTree.gen.ts index d238fcfda..706fc9d11 100644 --- a/packages/app/src/routeTree.gen.ts +++ b/packages/app/src/routeTree.gen.ts @@ -15,6 +15,7 @@ import { Route as PoliciesRouteImport } from './routes/policies' import { Route as ConnectionsRouteImport } from './routes/connections' import { Route as IndexRouteImport } from './routes/index' import { Route as SourcesNamespaceRouteImport } from './routes/sources.$namespace' +import { Route as ResumeExecutionIdRouteImport } from './routes/resume.$executionId' import { Route as SourcesAddPluginKeyRouteImport } from './routes/sources.add.$pluginKey' import { Route as PluginsPluginIdSplatRouteImport } from './routes/plugins.$pluginId.$' @@ -48,6 +49,11 @@ const SourcesNamespaceRoute = SourcesNamespaceRouteImport.update({ path: '/sources/$namespace', getParentRoute: () => rootRouteImport, } as any) +const ResumeExecutionIdRoute = ResumeExecutionIdRouteImport.update({ + id: '/resume/$executionId', + path: '/resume/$executionId', + getParentRoute: () => rootRouteImport, +} as any) const SourcesAddPluginKeyRoute = SourcesAddPluginKeyRouteImport.update({ id: '/sources/add/$pluginKey', path: '/sources/add/$pluginKey', @@ -65,6 +71,7 @@ export interface FileRoutesByFullPath { '/policies': typeof PoliciesRoute '/secrets': typeof SecretsRoute '/tools': typeof ToolsRoute + '/resume/$executionId': typeof ResumeExecutionIdRoute '/sources/$namespace': typeof SourcesNamespaceRoute '/plugins/$pluginId/$': typeof PluginsPluginIdSplatRoute '/sources/add/$pluginKey': typeof SourcesAddPluginKeyRoute @@ -75,6 +82,7 @@ export interface FileRoutesByTo { '/policies': typeof PoliciesRoute '/secrets': typeof SecretsRoute '/tools': typeof ToolsRoute + '/resume/$executionId': typeof ResumeExecutionIdRoute '/sources/$namespace': typeof SourcesNamespaceRoute '/plugins/$pluginId/$': typeof PluginsPluginIdSplatRoute '/sources/add/$pluginKey': typeof SourcesAddPluginKeyRoute @@ -86,6 +94,7 @@ export interface FileRoutesById { '/policies': typeof PoliciesRoute '/secrets': typeof SecretsRoute '/tools': typeof ToolsRoute + '/resume/$executionId': typeof ResumeExecutionIdRoute '/sources/$namespace': typeof SourcesNamespaceRoute '/plugins/$pluginId/$': typeof PluginsPluginIdSplatRoute '/sources/add/$pluginKey': typeof SourcesAddPluginKeyRoute @@ -98,6 +107,7 @@ export interface FileRouteTypes { | '/policies' | '/secrets' | '/tools' + | '/resume/$executionId' | '/sources/$namespace' | '/plugins/$pluginId/$' | '/sources/add/$pluginKey' @@ -108,6 +118,7 @@ export interface FileRouteTypes { | '/policies' | '/secrets' | '/tools' + | '/resume/$executionId' | '/sources/$namespace' | '/plugins/$pluginId/$' | '/sources/add/$pluginKey' @@ -118,6 +129,7 @@ export interface FileRouteTypes { | '/policies' | '/secrets' | '/tools' + | '/resume/$executionId' | '/sources/$namespace' | '/plugins/$pluginId/$' | '/sources/add/$pluginKey' @@ -129,6 +141,7 @@ export interface RootRouteChildren { PoliciesRoute: typeof PoliciesRoute SecretsRoute: typeof SecretsRoute ToolsRoute: typeof ToolsRoute + ResumeExecutionIdRoute: typeof ResumeExecutionIdRoute SourcesNamespaceRoute: typeof SourcesNamespaceRoute PluginsPluginIdSplatRoute: typeof PluginsPluginIdSplatRoute SourcesAddPluginKeyRoute: typeof SourcesAddPluginKeyRoute @@ -178,6 +191,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof SourcesNamespaceRouteImport parentRoute: typeof rootRouteImport } + '/resume/$executionId': { + id: '/resume/$executionId' + path: '/resume/$executionId' + fullPath: '/resume/$executionId' + preLoaderRoute: typeof ResumeExecutionIdRouteImport + parentRoute: typeof rootRouteImport + } '/sources/add/$pluginKey': { id: '/sources/add/$pluginKey' path: '/sources/add/$pluginKey' @@ -201,6 +221,7 @@ const rootRouteChildren: RootRouteChildren = { PoliciesRoute: PoliciesRoute, SecretsRoute: SecretsRoute, ToolsRoute: ToolsRoute, + ResumeExecutionIdRoute: ResumeExecutionIdRoute, SourcesNamespaceRoute: SourcesNamespaceRoute, PluginsPluginIdSplatRoute: PluginsPluginIdSplatRoute, SourcesAddPluginKeyRoute: SourcesAddPluginKeyRoute, diff --git a/packages/app/src/routes/resume.$executionId.tsx b/packages/app/src/routes/resume.$executionId.tsx new file mode 100644 index 000000000..32a84347b --- /dev/null +++ b/packages/app/src/routes/resume.$executionId.tsx @@ -0,0 +1,117 @@ +import { useCallback } from "react"; +import { useAtomSet, useAtomValue } from "@effect/atom-react"; +import { Data, Effect, Option, Schema } from "effect"; +import * as Atom from "effect/unstable/reactivity/Atom"; +import { createFileRoute } from "@tanstack/react-router"; +import { + ResumeApprovalPage, + ResumeApprovalPageView, +} from "@executor-js/react/pages/resume-approval"; +import { pausedExecutionAtom } from "@executor-js/react/api/atoms"; +import type { ElicitationAction } from "@executor-js/react/components/elicitation-approval"; + +const SearchParams = Schema.toStandardSchemaV1( + Schema.Struct({ + mcp_session_id: Schema.optional(Schema.String), + }), +); +const LocalMcpResumeCompleted = Schema.Struct({ + status: Schema.Literal("completed"), + text: Schema.String, + structured: Schema.Unknown, + isError: Schema.Boolean, +}); +const LocalMcpResumePaused = Schema.Struct({ + status: Schema.Literal("paused"), + text: Schema.String, + structured: Schema.Unknown, +}); +const LocalMcpResumeResult = Schema.Union([LocalMcpResumeCompleted, LocalMcpResumePaused]); +const decodeLocalMcpResumeResult = Schema.decodeUnknownOption(LocalMcpResumeResult); + +class LocalMcpResumeError extends Data.TaggedError("LocalMcpResumeError")<{ + readonly message: string; +}> {} + +type LocalMcpResumeInput = { + readonly mcpSessionId: string; + readonly executionId: string; + readonly action: ElicitationAction; + readonly content?: Record; +}; + +const resumeLocalMcpExecution = Atom.fn()((input) => + Effect.gen(function* () { + const response = yield* Effect.tryPromise({ + try: () => + fetch( + `/api/mcp-sessions/${encodeURIComponent(input.mcpSessionId)}/executions/${encodeURIComponent(input.executionId)}/resume`, + { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify( + input.action === "accept" + ? { action: input.action, content: input.content ?? {} } + : { action: input.action }, + ), + }, + ), + catch: () => new LocalMcpResumeError({ message: "Failed to submit approval." }), + }); + + if (!response.ok) { + const body = yield* Effect.tryPromise({ + try: () => response.text(), + catch: () => "", + }).pipe(Effect.orElseSucceed(() => "")); + return yield* new LocalMcpResumeError({ + message: body || `Approval request failed (${response.status}).`, + }); + } + + const body = yield* Effect.tryPromise({ + try: () => response.json(), + catch: () => new LocalMcpResumeError({ message: "Approval response was not valid JSON." }), + }); + const result = decodeLocalMcpResumeResult(body); + if (Option.isNone(result)) { + return yield* new LocalMcpResumeError({ + message: "Approval response had an unexpected shape.", + }); + } + return result.value; + }), +); + +export const Route = createFileRoute("/resume/$executionId")({ + validateSearch: SearchParams, + component: RouteComponent, +}); + +function RouteComponent() { + const { executionId } = Route.useParams(); + const { mcp_session_id: mcpSessionId } = Route.useSearch(); + if (mcpSessionId) { + return ; + } + return ; +} + +function LocalMcpResumeApproval(props: { executionId: string; mcpSessionId: string }) { + const paused = useAtomValue(pausedExecutionAtom(props.executionId)); + const doResume = useAtomSet(resumeLocalMcpExecution, { mode: "promiseExit" }); + const resume = useCallback( + (executionId: string, action: ElicitationAction, content?: Record) => + doResume({ mcpSessionId: props.mcpSessionId, executionId, action, content }), + [doResume, props.mcpSessionId], + ); + + return ( + + ); +} diff --git a/packages/core/api/src/executions/api.ts b/packages/core/api/src/executions/api.ts index 81bcac944..ad6eb4067 100644 --- a/packages/core/api/src/executions/api.ts +++ b/packages/core/api/src/executions/api.ts @@ -31,10 +31,11 @@ const ResumeRequest = Schema.Struct({ content: Schema.optional(Schema.Unknown), }); -const ResumeResponse = Schema.Struct({ +const ResumeResponse = Schema.Union([CompletedResult, PausedResult]); + +const PausedExecutionInfo = Schema.Struct({ text: Schema.String, structured: Schema.Unknown, - isError: Schema.Boolean, }); const ExecutionNotFoundError = Schema.TaggedStruct("ExecutionNotFoundError", { @@ -52,6 +53,13 @@ const ExecutionParams = { executionId: Schema.String }; // --------------------------------------------------------------------------- export const ExecutionsApi = HttpApiGroup.make("executions") + .add( + HttpApiEndpoint.get("getPaused", "/executions/:executionId", { + params: ExecutionParams, + success: PausedExecutionInfo, + error: [InternalError, ExecutionNotFoundError], + }), + ) .add( HttpApiEndpoint.post("execute", "/executions", { payload: ExecuteRequest, diff --git a/packages/core/api/src/handlers/executions.ts b/packages/core/api/src/handlers/executions.ts index b6ed076e9..86e1c17ac 100644 --- a/packages/core/api/src/handlers/executions.ts +++ b/packages/core/api/src/handlers/executions.ts @@ -1,13 +1,35 @@ import { HttpApiBuilder } from "effect/unstable/httpapi"; import { Effect } from "effect"; +import { Schema } from "effect"; import { ExecutorApi } from "../api"; import { formatExecuteResult, formatPausedExecution } from "@executor-js/execution"; import { ExecutionEngineService } from "../services"; import { capture, captureEngineError } from "@executor-js/api"; +class ExecutionNotFoundError extends Schema.TaggedErrorClass()( + "ExecutionNotFoundError", + { + executionId: Schema.String, + }, +) {} + export const ExecutionsHandlers = HttpApiBuilder.group(ExecutorApi, "executions", (handlers) => handlers + .handle("getPaused", ({ params: path }) => + capture( + Effect.gen(function* () { + const engine = yield* ExecutionEngineService; + const paused = yield* captureEngineError(engine.getPausedExecution(path.executionId)); + + if (!paused) { + return yield* new ExecutionNotFoundError({ executionId: path.executionId }); + } + + return formatPausedExecution(paused); + }), + ), + ) .handle("execute", ({ payload }) => capture( Effect.gen(function* () { @@ -45,15 +67,13 @@ export const ExecutionsHandlers = HttpApiBuilder.group(ExecutorApi, "executions" ); if (!result) { - return yield* Effect.fail({ - _tag: "ExecutionNotFoundError" as const, - executionId: path.executionId, - }); + return yield* new ExecutionNotFoundError({ executionId: path.executionId }); } if (result.status === "completed") { const formatted = formatExecuteResult(result.result); return { + status: "completed" as const, text: formatted.text, structured: formatted.structured, isError: formatted.isError, @@ -62,9 +82,9 @@ export const ExecutionsHandlers = HttpApiBuilder.group(ExecutorApi, "executions" const formatted = formatPausedExecution(result.execution); return { + status: "paused" as const, text: formatted.text, structured: formatted.structured, - isError: false, }; }), ), diff --git a/packages/core/execution/src/engine.ts b/packages/core/execution/src/engine.ts index 793a1ded4..769cda661 100644 --- a/packages/core/execution/src/engine.ts +++ b/packages/core/execution/src/engine.ts @@ -1,4 +1,4 @@ -import { Deferred, Effect, Fiber, Predicate, Ref } from "effect"; +import { Deferred, Effect, Fiber, Predicate, Queue } from "effect"; import type * as Cause from "effect/Cause"; import type { @@ -42,7 +42,7 @@ export type PausedExecution = { type InternalPausedExecution = PausedExecution & { readonly response: Deferred.Deferred; readonly fiber: Fiber.Fiber; - readonly pauseSignalRef: Ref.Ref>>; + readonly pauseQueue: Queue.Queue>; }; export type ResumeResponse = { @@ -129,6 +129,8 @@ export const formatPausedExecution = ( interaction: { kind: isUrlElicitation ? "url" : "form", message: req.message, + toolId: String(paused.elicitationContext.toolId), + args: paused.elicitationContext.args, ...(isUrlElicitation ? { url: req.url } : {}), ...(isFormElicitation ? { requestedSchema: req.requestedSchema } : {}), }, @@ -334,6 +336,12 @@ export type ExecutionEngine response: ResumeResponse, ) => Effect.Effect; + /** + * Inspect a paused execution without resuming it. Returns null if the id is + * unknown or has already been resumed. + */ + readonly getPausedExecution: (executionId: string) => Effect.Effect; + /** * Get the dynamic tool description (workflow + namespaces). */ @@ -348,7 +356,7 @@ export const createExecutionEngine = , - pauseSignal: Deferred.Deferred>, + pauseQueue: Queue.Queue>, ): Effect.Effect => Effect.raceFirst( Fiber.join(fiber).pipe( Effect.map((result): ExecutionResult => ({ status: "completed", result })), ), - Deferred.await(pauseSignal).pipe( + Queue.take(pauseQueue).pipe( Effect.map((paused): ExecutionResult => ({ status: "paused", execution: paused })), ), ); @@ -385,10 +393,9 @@ export const createExecutionEngine = >()); + // Queue preserves pauses that arrive before the previous approval has + // returned to the caller, which can happen with concurrent tool calls. + const pauseQueue = yield* Queue.unbounded>(); // Will be set once the fiber is forked. let fiber: Fiber.Fiber; @@ -403,12 +410,11 @@ export const createExecutionEngine = >(); - yield* Ref.set(paused.pauseSignalRef, nextSignal); - yield* Deferred.succeed(paused.response, { action: response.action as typeof ElicitationResponse.Type.action, content: response.content, }); - return (yield* awaitCompletionOrPause(paused.fiber, nextSignal)) as ExecutionResult; + return (yield* awaitCompletionOrPause(paused.fiber, paused.pauseQueue)) as ExecutionResult; }); /** @@ -475,6 +474,8 @@ export const createExecutionEngine = + Effect.sync(() => pausedExecutions.get(executionId) ?? null), getDescription: buildExecuteDescription(executor), }; }; diff --git a/packages/core/execution/src/promise.ts b/packages/core/execution/src/promise.ts index 453ed1a9f..d3dd385d2 100644 --- a/packages/core/execution/src/promise.ts +++ b/packages/core/execution/src/promise.ts @@ -47,6 +47,7 @@ export type ExecutionEngine = { executionId: string, response: ResumeResponse, ) => Promise; + readonly getPausedExecution: (executionId: string) => Promise; readonly getDescription: () => Promise; }; @@ -162,6 +163,7 @@ export const toPromiseExecutionEngine = ( ), executeWithPause: (code) => Effect.runPromise(engine.executeWithPause(code)), resume: (executionId, response) => Effect.runPromise(engine.resume(executionId, response)), + getPausedExecution: (executionId) => Effect.runPromise(engine.getPausedExecution(executionId)), getDescription: () => Effect.runPromise(engine.getDescription), }); diff --git a/packages/core/execution/src/tool-invoker.test.ts b/packages/core/execution/src/tool-invoker.test.ts index dca052997..0f79a80ee 100644 --- a/packages/core/execution/src/tool-invoker.test.ts +++ b/packages/core/execution/src/tool-invoker.test.ts @@ -808,6 +808,56 @@ describe("pause/resume with multiple elicitations", () => { { timeout: 10000 }, ); + it.effect( + "resume drains concurrent elicitations that were queued before the first approval", + () => + Effect.gen(function* () { + const executor = yield* makeElicitingExecutor(); + const engine = createExecutionEngine({ executor, codeExecutor }); + + const code = ` + return await Promise.all([ + tools.api.singleApproval({}), + tools.api.singleApproval({}), + tools.api.singleApproval({}) + ]); + `; + + const outcome1 = yield* engine.executeWithPause(code); + expect(outcome1.status).toBe("paused"); + const paused1 = outcome1 as Extract; + + const outcome2 = yield* Effect.race( + engine + .resume(paused1.execution.id, { action: "accept" }) + .pipe(Effect.map((outcome) => ({ kind: "resumed" as const, outcome }))), + Effect.sleep("2 seconds").pipe(Effect.as({ kind: "hung" as const })), + ); + + expect(outcome2.kind).toBe("resumed"); + if (outcome2.kind !== "resumed") return; + expect(outcome2.outcome?.status).toBe("paused"); + const paused2 = outcome2.outcome as Extract< + NonNullable, + { status: "paused" } + >; + + const outcome3 = yield* engine.resume(paused2.execution.id, { action: "accept" }); + expect(outcome3?.status).toBe("paused"); + const paused3 = outcome3 as Extract, { status: "paused" }>; + + const outcome4 = yield* engine.resume(paused3.execution.id, { action: "accept" }); + expect(outcome4?.status).toBe("completed"); + const completed = outcome4 as Extract< + NonNullable, + { status: "completed" } + >; + expect(completed.result.error).toBeUndefined(); + expect(completed.result.result).toHaveLength(3); + }), + { timeout: 10000 }, + ); + // Regression: use separate top-level runPromise calls to match HTTP/CLI // pause/resume, and a single-elicit tool so no later pause can mask a dead // sandbox fiber. diff --git a/packages/hosts/mcp/src/server.test.ts b/packages/hosts/mcp/src/server.test.ts index 606487659..8814352a8 100644 --- a/packages/hosts/mcp/src/server.test.ts +++ b/packages/hosts/mcp/src/server.test.ts @@ -9,7 +9,7 @@ import type * as Cause from "effect/Cause"; import { FormElicitation, ToolId, UrlElicitation } from "@executor-js/sdk"; import type { ExecutionEngine, ExecutionResult } from "@executor-js/execution"; -import { createExecutorMcpServer } from "./server"; +import { createExecutorMcpServer, type ExecutorMcpServerConfig } from "./server"; // --------------------------------------------------------------------------- // Helpers @@ -30,6 +30,7 @@ const makeStubEngine = (overrides: { overrides.executeWithPause ?? (() => Effect.succeed({ status: "completed", result: { result: "default" } })), resume: overrides.resume ?? (() => Effect.succeed(null)), + getPausedExecution: () => Effect.succeed(null), getDescription: Effect.succeed(overrides.description ?? "test executor"), }); @@ -38,8 +39,9 @@ const withClient = async ( engine: ExecutionEngine, capabilities: ClientCapabilities, fn: (client: Client) => Promise, + config?: Pick, "debug" | "elicitationMode" | "browserApprovalStore">, ) => { - const mcpServer = await Effect.runPromise(createExecutorMcpServer({ engine })); + const mcpServer = await Effect.runPromise(createExecutorMcpServer({ engine, ...config })); const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); const client = new Client({ name: "test-client", version: "1.0.0" }, { capabilities }); await mcpServer.connect(serverTransport); @@ -53,6 +55,12 @@ const withClient = async ( } }; +const withNativeClient = async ( + engine: ExecutionEngine, + capabilities: ClientCapabilities, + fn: (client: Client) => Promise, +) => withClient(engine, capabilities, fn, { elicitationMode: { mode: "native" } }); + const ELICITATION_CAPS: ClientCapabilities = { elicitation: { form: {}, url: {} }, }; @@ -97,16 +105,16 @@ const makeElicitingEngine = ( }); // --------------------------------------------------------------------------- -// Client WITH elicitation support (managed / inline path) +// Explicit native elicitation mode // --------------------------------------------------------------------------- -describe("MCP host server — client with elicitation", () => { +describe("MCP host server — native elicitation mode", () => { it("execute tool calls engine.execute and returns result", async () => { const engine = makeStubEngine({ execute: (code) => Effect.succeed({ result: `ran: ${code}` }), }); - await withClient(engine, ELICITATION_CAPS, async (client) => { + await withNativeClient(engine, ELICITATION_CAPS, async (client) => { const result = await client.callTool({ name: "execute", arguments: { code: "1+1" }, @@ -121,7 +129,7 @@ describe("MCP host server — client with elicitation", () => { execute: () => Effect.fail(new TestExecutionError({ message: "Unexpected token ':'" })), }); - await withClient(engine, ELICITATION_CAPS, async (client) => { + await withNativeClient(engine, ELICITATION_CAPS, async (client) => { const result = await client.callTool({ name: "execute", arguments: { code: "const x: any = 1;" }, @@ -141,7 +149,7 @@ describe("MCP host server — client with elicitation", () => { execute: () => Effect.die(new Error("secret internal detail")), }); - await withClient(engine, ELICITATION_CAPS, async (client) => { + await withNativeClient(engine, ELICITATION_CAPS, async (client) => { const result = await client.callTool({ name: "execute", arguments: { code: "run" }, @@ -172,7 +180,7 @@ describe("MCP host server — client with elicitation", () => { (r) => (r.action === "accept" && r.content?.approved ? "approved" : "denied"), ); - await withClient(engine, ELICITATION_CAPS, async (client) => { + await withNativeClient(engine, ELICITATION_CAPS, async (client) => { client.setRequestHandler(ElicitRequestSchema, async () => ({ action: "accept" as const, content: { approved: true }, @@ -192,7 +200,7 @@ describe("MCP host server — client with elicitation", () => { (r) => `action:${r.action}`, ); - await withClient(engine, ELICITATION_CAPS, async (client) => { + await withNativeClient(engine, ELICITATION_CAPS, async (client) => { client.setRequestHandler(ElicitRequestSchema, async () => ({ action: "decline" as const, content: {}, @@ -206,13 +214,68 @@ describe("MCP host server — client with elicitation", () => { }); }); + it("browser approval mode does not auto-switch to native elicitation", async () => { + let approvalUrlCalled = false; + let executeCalled = false; + const engine = makeStubEngine({ + execute: () => + Effect.sync(() => { + executeCalled = true; + return { result: "should-not-run" }; + }), + executeWithPause: () => + Effect.sync(() => { + return makePausedResult( + "exec_browser_1", + FormElicitation.make({ message: "Paused", requestedSchema: {} }), + ); + }), + }); + + await withClient( + engine, + ELICITATION_CAPS, + async (client) => { + client.setRequestHandler(ElicitRequestSchema, async () => ({ + action: "accept" as const, + content: {}, + })); + + const { tools } = await client.listTools(); + expect(tools.map((t) => t.name)).toContain("resume"); + + const result = await client.callTool({ + name: "execute", + arguments: { code: "needs-inline-approval" }, + }); + expect(result.structuredContent).toMatchObject({ + status: "user_approval_required", + executionId: "exec_browser_1", + approvalUrl: "https://executor.test/resume/exec_browser_1", + }); + expect(result.structuredContent).not.toHaveProperty("interaction"); + expect(executeCalled).toBe(false); + expect(approvalUrlCalled).toBe(true); + }, + { + elicitationMode: { + mode: "browser", + approvalUrl: (executionId) => { + approvalUrlCalled = true; + return `https://executor.test/resume/${executionId}`; + }, + }, + }, + ); + }); + it("empty form schema gets wrapped with minimal valid schema", async () => { let receivedSchema: unknown; const engine = makeElicitingEngine( FormElicitation.make({ message: "Just approve", requestedSchema: {} }), ); - await withClient(engine, ELICITATION_CAPS, async (client) => { + await withNativeClient(engine, ELICITATION_CAPS, async (client) => { client.setRequestHandler(ElicitRequestSchema, async (request) => { const params = request.params; if ("requestedSchema" in params) { @@ -239,7 +302,7 @@ describe("MCP host server — client with elicitation", () => { }), ); - await withClient(engine, ELICITATION_CAPS, async (client) => { + await withNativeClient(engine, ELICITATION_CAPS, async (client) => { client.setRequestHandler(ElicitRequestSchema, async (request) => { receivedParams = request.params as Record; return { action: "accept" as const, content: {} }; @@ -266,7 +329,7 @@ describe("MCP host server — client with elicitation", () => { }), }); - await withClient(engine, ELICITATION_CAPS, async (client) => { + await withNativeClient(engine, ELICITATION_CAPS, async (client) => { const result = await client.callTool({ name: "execute", arguments: { code: "bad" }, @@ -276,8 +339,8 @@ describe("MCP host server — client with elicitation", () => { }); }); - it("resume tool is hidden when client supports elicitation", async () => { - await withClient(makeStubEngine({}), ELICITATION_CAPS, async (client) => { + it("resume tool is hidden in native elicitation mode", async () => { + await withNativeClient(makeStubEngine({}), ELICITATION_CAPS, async (client) => { const { tools } = await client.listTools(); const names = tools.map((t) => t.name); expect(names).toContain("execute"); @@ -287,29 +350,29 @@ describe("MCP host server — client with elicitation", () => { }); // --------------------------------------------------------------------------- -// Client with form-only elicitation (uses managed elicitation) +// Client with form-only elicitation in native mode // --------------------------------------------------------------------------- -describe("MCP host server — client with form-only elicitation", () => { - it("resume tool is hidden when client supports form elicitation", async () => { - await withClient(makeStubEngine({}), FORM_ONLY_CAPS, async (client) => { +describe("MCP host server — native form-only elicitation", () => { + it("resume tool is hidden in native mode", async () => { + await withNativeClient(makeStubEngine({}), FORM_ONLY_CAPS, async (client) => { const { tools } = await client.listTools(); expect(tools.map((t) => t.name)).toContain("execute"); expect(tools.map((t) => t.name)).not.toContain("resume"); }); }); - it("uses managed elicitation path when client supports form", async () => { + it("uses native elicitation path when client supports form", async () => { const engine = makeStubEngine({ - execute: (code) => Effect.succeed({ result: `managed: ${code}` }), + execute: (code) => Effect.succeed({ result: `native: ${code}` }), }); - await withClient(engine, FORM_ONLY_CAPS, async (client) => { + await withNativeClient(engine, FORM_ONLY_CAPS, async (client) => { const result = await client.callTool({ name: "execute", arguments: { code: "test" }, }); - expect(result.content).toEqual([{ type: "text", text: "managed: test" }]); + expect(result.content).toEqual([{ type: "text", text: "native: test" }]); }); }); @@ -323,7 +386,7 @@ describe("MCP host server — client with form-only elicitation", () => { }), ); - await withClient(engine, FORM_ONLY_CAPS, async (client) => { + await withNativeClient(engine, FORM_ONLY_CAPS, async (client) => { client.setRequestHandler(ElicitRequestSchema, async (request) => { receivedMessage = typeof request.params.message === "string" ? request.params.message : undefined; @@ -374,7 +437,92 @@ describe("MCP host server — client without elicitation (pause/resume)", () => }); }); - it("paused execution returns interaction metadata with executionId", async () => { + it("resume tool requires user approval by default", async () => { + let resumeCalled = false; + const engine = makeStubEngine({ + resume: () => + Effect.sync(() => { + resumeCalled = true; + return { status: "completed", result: { result: "should-not-run" } }; + }), + }); + + await withClient( + engine, + NO_CAPS, + async (client) => { + const result = await client.callTool({ + name: "resume", + arguments: { executionId: "exec_1" }, + }); + + expect(resumeCalled).toBe(false); + expect(result.isError).toBeFalsy(); + expect(textOf(result)).toContain("User approval required"); + expect(textOf(result)).toContain("https://executor.test/resume/exec_1"); + expect(result.structuredContent).toMatchObject({ + status: "user_approval_required", + executionId: "exec_1", + approvalUrl: "https://executor.test/resume/exec_1", + }); + }, + { + elicitationMode: { + mode: "browser", + approvalUrl: (executionId) => `https://executor.test/resume/${executionId}`, + }, + }, + ); + }); + + it("browser approval mode consumes a user-approved response and returns the resumed result", async () => { + const approved = new Map }>(); + const engine = makeStubEngine({ + resume: (executionId, response) => + Effect.succeed( + executionId === "exec_1" && response.action === "accept" + ? { status: "completed", result: { result: "resumed-after-browser" } } + : null, + ), + }); + + await withClient( + engine, + NO_CAPS, + async (client) => { + const waiting = await client.callTool({ + name: "resume", + arguments: { executionId: "exec_1" }, + }); + expect(waiting.structuredContent).toMatchObject({ + status: "user_approval_required", + executionId: "exec_1", + }); + + approved.set("exec_1", { action: "accept", content: {} }); + const resumed = await client.callTool({ + name: "resume", + arguments: { executionId: "exec_1" }, + }); + expect(resumed.content).toEqual([{ type: "text", text: "resumed-after-browser" }]); + expect(resumed.structuredContent).toMatchObject({ + status: "completed", + result: "resumed-after-browser", + }); + }, + { + elicitationMode: { + mode: "browser", + approvalUrl: (executionId) => `https://executor.test/resume/${executionId}`, + }, + browserApprovalStore: { + takeResponse: (executionId) => Effect.succeed(approved.get(executionId) ?? null), + }, + }, + ); + }); + + it("model resume mode paused execution returns interaction metadata with executionId", async () => { const engine = makeStubEngine({ executeWithPause: () => Effect.succeed( @@ -391,22 +539,27 @@ describe("MCP host server — client without elicitation (pause/resume)", () => ), }); - await withClient(engine, NO_CAPS, async (client) => { - const result = await client.callTool({ - name: "execute", - arguments: { code: "pause-me" }, - }); - expect(textOf(result)).toContain("exec_42"); - expect(textOf(result)).toContain("Need approval"); - expect(result.isError).toBeFalsy(); - - const structured = result.structuredContent as Record; - expect(structured?.executionId).toBe("exec_42"); - expect(structured?.status).toBe("waiting_for_interaction"); - }); + await withClient( + engine, + NO_CAPS, + async (client) => { + const result = await client.callTool({ + name: "execute", + arguments: { code: "pause-me" }, + }); + expect(textOf(result)).toContain("exec_42"); + expect(textOf(result)).toContain("Need approval"); + expect(result.isError).toBeFalsy(); + + const structured = result.structuredContent as Record; + expect(structured?.executionId).toBe("exec_42"); + expect(structured?.status).toBe("waiting_for_interaction"); + }, + { elicitationMode: { mode: "model" } }, + ); }); - it("resume tool completes a paused execution", async () => { + it("resume tool completes a paused execution when model resume is explicitly enabled", async () => { const engine = makeStubEngine({ resume: (executionId, response) => Effect.succeed( @@ -416,14 +569,19 @@ describe("MCP host server — client without elicitation (pause/resume)", () => ), }); - await withClient(engine, NO_CAPS, async (client) => { - const result = await client.callTool({ - name: "resume", - arguments: { executionId: "exec_1", action: "accept", content: "{}" }, - }); - expect(result.content).toEqual([{ type: "text", text: "resumed-ok" }]); - expect(result.isError).toBeFalsy(); - }); + await withClient( + engine, + NO_CAPS, + async (client) => { + const result = await client.callTool({ + name: "resume", + arguments: { executionId: "exec_1", action: "accept", content: "{}" }, + }); + expect(result.content).toEqual([{ type: "text", text: "resumed-ok" }]); + expect(result.isError).toBeFalsy(); + }, + { elicitationMode: { mode: "model" } }, + ); }); it("resume tool passes parsed content to engine", async () => { @@ -436,17 +594,22 @@ describe("MCP host server — client without elicitation (pause/resume)", () => }), }); - await withClient(engine, NO_CAPS, async (client) => { - await client.callTool({ - name: "resume", - arguments: { - executionId: "exec_1", - action: "accept", - content: JSON.stringify({ approved: true, name: "test" }), - }, - }); - expect(receivedContent).toEqual({ approved: true, name: "test" }); - }); + await withClient( + engine, + NO_CAPS, + async (client) => { + await client.callTool({ + name: "resume", + arguments: { + executionId: "exec_1", + action: "accept", + content: JSON.stringify({ approved: true, name: "test" }), + }, + }); + expect(receivedContent).toEqual({ approved: true, name: "test" }); + }, + { elicitationMode: { mode: "model" } }, + ); }); it("resume with empty content passes undefined", async () => { @@ -459,33 +622,43 @@ describe("MCP host server — client without elicitation (pause/resume)", () => }), }); - await withClient(engine, NO_CAPS, async (client) => { - await client.callTool({ - name: "resume", - arguments: { executionId: "exec_1", action: "accept", content: "{}" }, - }); - expect(receivedContent).toBeUndefined(); - }); + await withClient( + engine, + NO_CAPS, + async (client) => { + await client.callTool({ + name: "resume", + arguments: { executionId: "exec_1", action: "accept", content: "{}" }, + }); + expect(receivedContent).toBeUndefined(); + }, + { elicitationMode: { mode: "model" } }, + ); }); it("resume with unknown executionId returns error", async () => { const engine = makeStubEngine({ resume: () => Effect.succeed(null) }); - await withClient(engine, NO_CAPS, async (client) => { - const result = await client.callTool({ - name: "resume", - arguments: { - executionId: "does-not-exist", - action: "accept", - content: "{}", - }, - }); - expect(result.isError).toBe(true); - expect(textOf(result)).toContain("does-not-exist"); - }); + await withClient( + engine, + NO_CAPS, + async (client) => { + const result = await client.callTool({ + name: "resume", + arguments: { + executionId: "does-not-exist", + action: "accept", + content: "{}", + }, + }); + expect(result.isError).toBe(true); + expect(textOf(result)).toContain("does-not-exist"); + }, + { elicitationMode: { mode: "model" } }, + ); }); - it("paused UrlElicitation includes url and kind in structured output", async () => { + it("model resume mode paused UrlElicitation includes url and kind in structured output", async () => { const engine = makeStubEngine({ executeWithPause: () => Effect.succeed( @@ -500,19 +673,24 @@ describe("MCP host server — client without elicitation (pause/resume)", () => ), }); - await withClient(engine, NO_CAPS, async (client) => { - const result = await client.callTool({ - name: "execute", - arguments: { code: "oauth" }, - }); - expect(textOf(result)).toContain("https://auth.example.com/callback"); - expect(textOf(result)).toContain("exec_99"); - - const structured = result.structuredContent as Record; - const interaction = structured?.interaction as Record; - expect(interaction?.kind).toBe("url"); - expect(interaction?.url).toBe("https://auth.example.com/callback"); - }); + await withClient( + engine, + NO_CAPS, + async (client) => { + const result = await client.callTool({ + name: "execute", + arguments: { code: "oauth" }, + }); + expect(textOf(result)).toContain("https://auth.example.com/callback"); + expect(textOf(result)).toContain("exec_99"); + + const structured = result.structuredContent as Record; + const interaction = structured?.interaction as Record; + expect(interaction?.kind).toBe("url"); + expect(interaction?.url).toBe("https://auth.example.com/callback"); + }, + { elicitationMode: { mode: "model" } }, + ); }); }); @@ -533,7 +711,7 @@ describe("MCP host server — elicitation error handling", () => { (r) => `fallback:${r.action}`, ); - await withClient(engine, ELICITATION_CAPS, async (client) => { + await withNativeClient(engine, ELICITATION_CAPS, async (client) => { client.setRequestHandler(ElicitRequestSchema, async () => { // oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: MCP client request handler rejects to exercise server fallback throw new Error("client cannot handle this"); @@ -568,30 +746,40 @@ describe("MCP host server — resume content parsing", () => { it("array JSON is rejected (not passed as content)", async () => { const { engine, getContent } = makeResumeEngine(); - await withClient(engine, NO_CAPS, async (client) => { - await client.callTool({ - name: "resume", - arguments: { executionId: "exec_1", action: "accept", content: "[1,2,3]" }, - }); - expect(getContent()).toBeUndefined(); - }); + await withClient( + engine, + NO_CAPS, + async (client) => { + await client.callTool({ + name: "resume", + arguments: { executionId: "exec_1", action: "accept", content: "[1,2,3]" }, + }); + expect(getContent()).toBeUndefined(); + }, + { elicitationMode: { mode: "model" } }, + ); }); it("invalid JSON is handled gracefully (not thrown)", async () => { const { engine, getContent } = makeResumeEngine(); - await withClient(engine, NO_CAPS, async (client) => { - const result = await client.callTool({ - name: "resume", - arguments: { - executionId: "exec_1", - action: "accept", - content: "not-valid-json", - }, - }); - expect(getContent()).toBeUndefined(); - expect(result.isError).toBeFalsy(); - }); + await withClient( + engine, + NO_CAPS, + async (client) => { + const result = await client.callTool({ + name: "resume", + arguments: { + executionId: "exec_1", + action: "accept", + content: "not-valid-json", + }, + }); + expect(getContent()).toBeUndefined(); + expect(result.isError).toBeFalsy(); + }, + { elicitationMode: { mode: "model" } }, + ); }); }); @@ -634,7 +822,7 @@ describe("MCP host server — multiple elicitations", () => { }), }); - await withClient(engine, ELICITATION_CAPS, async (client) => { + await withNativeClient(engine, ELICITATION_CAPS, async (client) => { let callCount = 0; client.setRequestHandler(ElicitRequestSchema, async () => { callCount++; diff --git a/packages/hosts/mcp/src/server.ts b/packages/hosts/mcp/src/server.ts index 5b6342e8d..7f69add75 100644 --- a/packages/hosts/mcp/src/server.ts +++ b/packages/hosts/mcp/src/server.ts @@ -7,7 +7,7 @@ import type { JsonSchemaValidator, } from "@modelcontextprotocol/sdk/validation/types.js"; import { Validator } from "@cfworker/json-schema"; -import { z } from "zod/v4"; +import * as z from "zod/v4"; import type { ElicitationResponse, @@ -22,6 +22,7 @@ import { formatPausedExecution, type ExecutionEngine, type ExecutionEngineConfig, + type ResumeResponse, } from "@executor-js/execution"; // --------------------------------------------------------------------------- @@ -70,6 +71,23 @@ type SharedMcpServerConfig = { * Enable verbose MCP capability / elicitation debug logging. */ readonly debug?: boolean; + /** + * Controls how elicitation is handled for this MCP connection. The secure + * default is browser approval: `execute` pauses, and `resume` returns a URL + * for the signed-in user instead of letting the model answer directly. + */ + readonly elicitationMode?: + | { + readonly mode: "browser"; + readonly approvalUrl: (executionId: string) => string; + } + | { + readonly mode: "model"; + } + | { + readonly mode: "native"; + }; + readonly browserApprovalStore?: BrowserApprovalStore; }; export type ExecutorMcpServerConfig = @@ -78,6 +96,10 @@ export type ExecutorMcpServerConfig & SharedMcpServerConfig & { readonly stateless: true }) | ({ readonly engine: ExecutionEngine; readonly stateless: true } & SharedMcpServerConfig); +export type BrowserApprovalStore = { + readonly takeResponse: (executionId: string) => Effect.Effect; +}; + // --------------------------------------------------------------------------- // Elicitation bridge // --------------------------------------------------------------------------- @@ -95,13 +117,9 @@ const readDebugDefault = (): boolean => { return value === "1" || value === "true"; }; -const supportsManagedElicitation = (server: McpServer): boolean => - getElicitationSupport(server).form; - const capabilitySnapshot = (server: McpServer) => ({ clientCapabilities: server.server.getClientCapabilities() ?? null, elicitationSupport: getElicitationSupport(server), - managedElicitation: supportsManagedElicitation(server), }); type ElicitInputParams = @@ -270,6 +288,33 @@ const newCorrelationId = (): string => .toString(16) .padStart(8, "0"); +const defaultResumeApprovalUrl = (executionId: string): string => + `/resume/${encodeURIComponent(executionId)}`; + +const formatResumeApprovalRequired = (input: { + readonly executionId: string; + readonly approvalUrl: string; +}): McpToolResult => ({ + content: [ + { + type: "text", + text: [ + "User approval required.", + "", + "Tell the user to open this URL while signed in and approve or decline the paused interaction:", + input.approvalUrl, + "", + "Do not choose accept, decline, cancel, or response content on the user's behalf.", + ].join("\n"), + }, + ], + structuredContent: { + status: "user_approval_required", + executionId: input.executionId, + approvalUrl: input.approvalUrl, + }, +}); + const toMcpFailureResult = (cause: Cause.Cause): McpToolResult => { const correlationId = newCorrelationId(); // oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: best-effort defect logging must tolerate non-serializable causes @@ -325,6 +370,12 @@ export const createExecutorMcpServer = ( console.error(`[executor:mcp] ${event}`, data); } }; + const elicitationMode = + config.elicitationMode ?? + ({ + mode: "browser", + approvalUrl: defaultResumeApprovalUrl, + } as const); const resolveParentSpan = (): Tracer.AnySpan | undefined => { const ps = config.parentSpan; @@ -355,12 +406,12 @@ export const createExecutorMcpServer = ( const executeCode = (code: string): Effect.Effect => Effect.gen(function* () { debugLog("execute.call", { - managedElicitation: supportsManagedElicitation(server), + elicitationMode: elicitationMode.mode, elicitationSupport: getElicitationSupport(server), clientCapabilities: server.server.getClientCapabilities() ?? null, codeLength: code.length, }); - if (supportsManagedElicitation(server)) { + if (elicitationMode.mode === "native") { const result = yield* engine.execute(code, { onElicitation: makeMcpElicitationHandler(server, debugLog), }); @@ -377,7 +428,9 @@ export const createExecutorMcpServer = ( }); return outcome.status === "completed" ? toMcpResult(formatExecuteResult(outcome.result)) - : toMcpPausedResult(formatPausedExecution(outcome.execution)); + : elicitationMode.mode === "browser" + ? yield* requireUserResumeApproval(outcome.execution.id) + : toMcpPausedResult(formatPausedExecution(outcome.execution)); }).pipe( Effect.withSpan("mcp.host.tool.execute", { attributes: { @@ -429,9 +482,60 @@ export const createExecutorMcpServer = ( }), ); + const requireUserResumeApproval = (executionId: string): Effect.Effect => + Effect.sync(() => { + const approvalUrl = + elicitationMode.mode === "browser" + ? elicitationMode.approvalUrl(executionId) + : defaultResumeApprovalUrl(executionId); + debugLog("resume.user_approval_required", { + executionId, + approvalUrl, + clientCapabilities: server.server.getClientCapabilities() ?? null, + }); + return formatResumeApprovalRequired({ executionId, approvalUrl }); + }).pipe( + Effect.withSpan("mcp.host.tool.resume.user_approval_required", { + attributes: { + "mcp.tool.name": "resume", + "mcp.execute.execution_id": executionId, + }, + }), + ); + + const takeBrowserApprovalResponse = ( + executionId: string, + ): Effect.Effect => { + return config.browserApprovalStore?.takeResponse(executionId) ?? Effect.succeed(null); + }; + + const resumeAfterBrowserApproval = (executionId: string): Effect.Effect => + Effect.gen(function* () { + const response = yield* takeBrowserApprovalResponse(executionId); + if (!response) return yield* requireUserResumeApproval(executionId); + + const outcome = yield* engine.resume(executionId, response); + if (!outcome) { + return { + content: [{ type: "text" as const, text: `No paused execution: ${executionId}` }], + isError: true, + } satisfies McpToolResult; + } + return outcome.status === "completed" + ? toMcpResult(formatExecuteResult(outcome.result)) + : yield* requireUserResumeApproval(outcome.execution.id); + }).pipe( + Effect.withSpan("mcp.host.tool.resume.browser_approval", { + attributes: { + "mcp.tool.name": "resume", + "mcp.execute.execution_id": executionId, + }, + }), + ); + // --- tools --- - const executeTool = yield* Effect.sync(() => + yield* Effect.sync(() => server.registerTool( "execute", { @@ -446,61 +550,70 @@ export const createExecutorMcpServer = ( }), ); - const resumeTool = yield* Effect.sync(() => - server.registerTool( + yield* Effect.sync(() => { + if (elicitationMode.mode === "native") { + return undefined; + } + + if (elicitationMode.mode === "model") { + return server.registerTool( + "resume", + { + description: [ + "Resume a paused execution using the executionId returned by execute.", + "This connection explicitly allows model-side resume via elicitation_mode=model.", + ].join("\n"), + inputSchema: { + executionId: z.string().describe("The execution ID from the paused result"), + action: z + .enum(["accept", "decline", "cancel"]) + .describe("How to respond to the interaction"), + content: z + .string() + .describe("Optional JSON-encoded response content for form elicitations") + .default("{}"), + }, + }, + ({ executionId, action, content: rawContent }) => + runToolEffect(resumeExecution(executionId, action, parseJsonContent(rawContent))), + ); + } + + return server.registerTool( "resume", { description: [ - "Resume a paused execution using the executionId returned by execute.", - "Never call this without user approval unless they explicitly state otherwise.", + "Request user approval to resume a paused execution.", + "Call this with the executionId returned by execute. If the user has not approved in the browser yet, tell them to open the returned approval URL. If they have approved, this returns the resumed execution result.", + "This connection does not allow the model to choose accept, decline, cancel, or content.", ].join("\n"), inputSchema: { executionId: z.string().describe("The execution ID from the paused result"), - action: z - .enum(["accept", "decline", "cancel"]) - .describe("How to respond to the interaction"), - content: z - .string() - .describe("Optional JSON-encoded response content for form elicitations") - .default("{}"), }, }, - ({ executionId, action, content: rawContent }) => - runToolEffect(resumeExecution(executionId, action, parseJsonContent(rawContent))), - ), - ).pipe( + ({ executionId }) => runToolEffect(resumeAfterBrowserApproval(executionId)), + ); + }).pipe( Effect.withSpan("mcp.host.register_tool", { attributes: { "mcp.tool.name": "resume" }, }), ); - // --- capability-based tool visibility --- - - const syncToolAvailability = () => { - executeTool.enable(); - if (supportsManagedElicitation(server)) { - resumeTool.disable(); - } else { - resumeTool.enable(); - } + yield* Effect.sync(() => { console.error( - "[executor] MCP capability snapshot", + "[executor] MCP session mode", JSON.stringify({ ...capabilitySnapshot(server), - resumeEnabled: !supportsManagedElicitation(server), + elicitationMode: elicitationMode.mode, + resumeEnabled: elicitationMode.mode !== "native", }), ); debugLog("tool.visibility", { clientCapabilities: server.server.getClientCapabilities() ?? null, elicitationSupport: getElicitationSupport(server), - managedElicitation: supportsManagedElicitation(server), - resumeEnabled: !supportsManagedElicitation(server), + elicitationMode: elicitationMode.mode, + resumeEnabled: elicitationMode.mode !== "native", }); - }; - - yield* Effect.sync(() => { - syncToolAvailability(); - server.server.oninitialized = syncToolAvailability; }).pipe(Effect.withSpan("mcp.host.sync_tool_availability")); return server; diff --git a/packages/react/src/api/atoms.tsx b/packages/react/src/api/atoms.tsx index cdac4b18d..352226055 100644 --- a/packages/react/src/api/atoms.tsx +++ b/packages/react/src/api/atoms.tsx @@ -112,6 +112,12 @@ export const policiesAtom = (scopeId: ScopeId) => reactivityKeys: [ReactivityKey.policies], }); +export const pausedExecutionAtom = (executionId: string) => + ExecutorApiClient.query("executions", "getPaused", { + params: { executionId }, + timeToLive: "5 seconds", + }); + // --------------------------------------------------------------------------- // Mutation atoms — reactivityKeys must be passed at call site (effect-atom // does not accept them at definition time). See `reactivity-keys.tsx` for the @@ -152,6 +158,8 @@ export const updatePolicy = ExecutorApiClient.mutation("policies", "update"); export const removePolicy = ExecutorApiClient.mutation("policies", "remove"); +export const resumeExecution = ExecutorApiClient.mutation("executions", "resume"); + // --------------------------------------------------------------------------- // Sources — optimistic surface. // --------------------------------------------------------------------------- diff --git a/packages/react/src/components/elicitation-approval.tsx b/packages/react/src/components/elicitation-approval.tsx new file mode 100644 index 000000000..8459f8782 --- /dev/null +++ b/packages/react/src/components/elicitation-approval.tsx @@ -0,0 +1,480 @@ +import { useEffect, useMemo, useState, type ReactNode } from "react"; +import { Match, Option } from "effect"; + +import { Checkbox } from "./checkbox"; +import { Input } from "./input"; +import { Label } from "./label"; +import { NativeSelect, NativeSelectOption } from "./native-select"; +import { Textarea } from "./textarea"; + +export type ElicitationAction = "accept" | "decline" | "cancel"; +export type ElicitationFieldValue = string | number | boolean | string[]; + +type ElicitationSchemaField = { + readonly name: string; + readonly schema: Record; + readonly required: boolean; +}; + +type ElicitationFormSchema = { + readonly fields: readonly ElicitationSchemaField[]; +}; + +type SelectOption = { + readonly value: string; + readonly label: string; +}; + +export type ElicitationApprovalState = { + readonly hasFields: boolean; + readonly content: () => Record | null; + readonly fields: ReactNode; +}; + +const isRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null && !Array.isArray(value); + +const isStringArray = (value: unknown): value is string[] => + Array.isArray(value) && value.every((item) => typeof item === "string"); + +const toOptionalString = (value: unknown): string | undefined => + typeof value === "string" && value.length > 0 ? value : undefined; + +const fieldLabel = (field: ElicitationSchemaField): string => + toOptionalString(field.schema.title) ?? field.name; + +const fieldDescription = (field: ElicitationSchemaField): string | undefined => + toOptionalString(field.schema.description); + +const enumOptions = (schema: Record): readonly SelectOption[] => { + const oneOf = schema.oneOf; + if (Array.isArray(oneOf)) { + return oneOf.flatMap((item): SelectOption[] => { + if (!isRecord(item) || typeof item.const !== "string") return []; + return [{ value: item.const, label: toOptionalString(item.title) ?? item.const }]; + }); + } + + const values = schema.enum; + if (!isStringArray(values)) return []; + const labels = isStringArray(schema.enumNames) ? schema.enumNames : values; + return values.map((value, index) => ({ value, label: labels[index] ?? value })); +}; + +const multiSelectOptions = (schema: Record): readonly SelectOption[] => { + const items = schema.items; + if (!isRecord(items)) return []; + + const anyOf = items.anyOf; + if (Array.isArray(anyOf)) { + return anyOf.flatMap((item): SelectOption[] => { + if (!isRecord(item) || typeof item.const !== "string") return []; + return [{ value: item.const, label: toOptionalString(item.title) ?? item.const }]; + }); + } + + const values = items.enum; + if (!isStringArray(values)) return []; + const labels = isStringArray(items.enumNames) ? items.enumNames : values; + return values.map((value, index) => ({ value, label: labels[index] ?? value })); +}; + +const parseElicitationFormSchema = (value: unknown): ElicitationFormSchema | null => { + if (!isRecord(value) || !isRecord(value.properties)) return null; + const required = isStringArray(value.required) ? value.required : []; + const fields = Object.entries(value.properties).flatMap( + ([name, schema]): ElicitationSchemaField[] => + isRecord(schema) ? [{ name, schema, required: required.includes(name) }] : [], + ); + return fields.length > 0 ? { fields } : null; +}; + +const initialFieldValue = (field: ElicitationSchemaField): ElicitationFieldValue | undefined => { + const value = field.schema.default; + if (value === undefined) return undefined; + const matched = Match.value(field.schema.type).pipe( + Match.when("boolean", () => value === true), + Match.when( + (type: unknown) => type === "number" || type === "integer", + () => { + const numberValue = typeof value === "number" ? value : Number(value); + return Number.isFinite(numberValue) ? numberValue : undefined; + }, + ), + Match.when("array", () => (isStringArray(value) ? value : undefined)), + Match.option, + ); + return Option.getOrElse(matched, () => (typeof value === "string" ? value : String(value))); +}; + +const initialFormValues = ( + formSchema: ElicitationFormSchema | null, +): Record => { + const values: Record = {}; + for (const field of formSchema?.fields ?? []) { + const value = initialFieldValue(field); + if (value !== undefined) values[field.name] = value; + } + return values; +}; + +const isEmptyFormValue = (value: ElicitationFieldValue | undefined): boolean => + value === undefined || value === "" || (Array.isArray(value) && value.length === 0); + +const numericConstraint = ( + schema: Record, + key: "minimum" | "maximum", +): number | undefined => (typeof schema[key] === "number" ? schema[key] : undefined); + +const lengthConstraint = ( + schema: Record, + key: "minLength" | "maxLength" | "minItems" | "maxItems", +): number | undefined => (typeof schema[key] === "number" ? schema[key] : undefined); + +const validateEmail = (value: string): boolean => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value); + +const validateUrl = (value: string): boolean => { + try { + void new URL(value); + return true; + } catch { + return false; + } +}; + +const validateFieldValue = ( + field: ElicitationSchemaField, + value: ElicitationFieldValue | undefined, +): { readonly value?: ElicitationFieldValue; readonly error?: string } => { + if (isEmptyFormValue(value)) { + return field.required ? { error: "This field is required." } : {}; + } + + if (field.schema.type === "boolean") { + return typeof value === "boolean" ? { value } : { error: "Choose true or false." }; + } + + if (field.schema.type === "number" || field.schema.type === "integer") { + const numberValue = typeof value === "number" ? value : Number(value); + const typeLabel = field.schema.type === "integer" ? "an integer" : "a number"; + if (!Number.isFinite(numberValue)) return { error: `Must be ${typeLabel}.` }; + if (field.schema.type === "integer" && !Number.isInteger(numberValue)) { + return { error: "Must be an integer." }; + } + const minimum = numericConstraint(field.schema, "minimum"); + const maximum = numericConstraint(field.schema, "maximum"); + if (minimum !== undefined && numberValue < minimum) return { error: `Must be >= ${minimum}.` }; + if (maximum !== undefined && numberValue > maximum) return { error: `Must be <= ${maximum}.` }; + return { value: numberValue }; + } + + if (field.schema.type === "array") { + const selected = Array.isArray(value) ? value : []; + const options = multiSelectOptions(field.schema); + const allowed = new Set(options.map((option) => option.value)); + if (!selected.every((item) => allowed.has(item))) return { error: "Choose a valid option." }; + const minItems = lengthConstraint(field.schema, "minItems"); + const maxItems = lengthConstraint(field.schema, "maxItems"); + if (minItems !== undefined && selected.length < minItems) { + return { error: `Choose at least ${minItems} option${minItems === 1 ? "" : "s"}.` }; + } + if (maxItems !== undefined && selected.length > maxItems) { + return { error: `Choose at most ${maxItems} option${maxItems === 1 ? "" : "s"}.` }; + } + return { value: selected }; + } + + const stringValue = String(value); + const options = enumOptions(field.schema); + if (options.length > 0 && !options.some((option) => option.value === stringValue)) { + return { error: "Choose a valid option." }; + } + const minLength = lengthConstraint(field.schema, "minLength"); + const maxLength = lengthConstraint(field.schema, "maxLength"); + if (minLength !== undefined && stringValue.length < minLength) { + return { error: `Must be at least ${minLength} character${minLength === 1 ? "" : "s"}.` }; + } + if (maxLength !== undefined && stringValue.length > maxLength) { + return { error: `Must be at most ${maxLength} character${maxLength === 1 ? "" : "s"}.` }; + } + if (field.schema.format === "email" && !validateEmail(stringValue)) { + return { error: "Must be a valid email address." }; + } + if (field.schema.format === "uri" && !validateUrl(stringValue)) { + return { error: "Must be a valid URL." }; + } + return { value: stringValue }; +}; + +const buildElicitationContent = ( + formSchema: ElicitationFormSchema | null, + formValues: Record, +): { readonly content: Record; readonly errors: Record } => { + if (!formSchema) return { content: {}, errors: {} }; + + const content: Record = {}; + const errors: Record = {}; + for (const field of formSchema.fields) { + const result = validateFieldValue(field, formValues[field.name]); + if (result.error) { + errors[field.name] = result.error; + continue; + } + if (result.value !== undefined) content[field.name] = result.value; + } + return { content, errors }; +}; + +export function useElicitationApproval(schema: unknown): ElicitationApprovalState { + const formSchema = useMemo(() => parseElicitationFormSchema(schema), [schema]); + const [formValues, setFormValues] = useState>(() => + initialFormValues(formSchema), + ); + const [fieldErrors, setFieldErrors] = useState>({}); + + useEffect(() => { + setFormValues(initialFormValues(formSchema)); + setFieldErrors({}); + }, [formSchema]); + + const setFieldValue = (name: string, value: ElicitationFieldValue | undefined) => { + setFormValues((prev) => { + const next = { ...prev }; + if (value === undefined) { + delete next[name]; + } else { + next[name] = value; + } + return next; + }); + setFieldErrors((prev) => { + if (!prev[name]) return prev; + const next = { ...prev }; + delete next[name]; + return next; + }); + }; + + return { + hasFields: Boolean(formSchema), + content: () => { + const result = buildElicitationContent(formSchema, formValues); + setFieldErrors(result.errors); + return Object.keys(result.errors).length > 0 ? null : result.content; + }, + fields: formSchema ? ( + + ) : null, + }; +} + +function ElicitationApprovalFields({ + formSchema, + formValues, + fieldErrors, + onChange, +}: { + formSchema: ElicitationFormSchema; + formValues: Record; + fieldErrors: Record; + onChange: (name: string, value: ElicitationFieldValue | undefined) => void; +}) { + return ( +
+
+
Additional details
+
+ These values will be sent with approval. +
+
+ {formSchema.fields.map((field) => ( + onChange(field.name, value)} + /> + ))} +
+ ); +} + +function ElicitationApprovalField({ + field, + value, + error, + onChange, +}: { + field: ElicitationSchemaField; + value: ElicitationFieldValue | undefined; + error: string | undefined; + onChange: (value: ElicitationFieldValue | undefined) => void; +}) { + const label = fieldLabel(field); + const description = fieldDescription(field); + const fieldId = `trusted-interaction-field-${field.name}`; + const describedBy = description ? `${fieldId}-description` : undefined; + const errorId = error ? `${fieldId}-error` : undefined; + const ariaDescribedBy = [describedBy, errorId].filter(Boolean).join(" ") || undefined; + + return ( +
+ + + {description && ( +
+ {description} +
+ )} + {error && ( +
+ {error} +
+ )} +
+ ); +} + +function ElicitationApprovalFieldControl({ + field, + fieldId, + value, + ariaDescribedBy, + invalid, + onChange, +}: { + field: ElicitationSchemaField; + fieldId: string; + value: ElicitationFieldValue | undefined; + ariaDescribedBy: string | undefined; + invalid: boolean; + onChange: (value: ElicitationFieldValue | undefined) => void; +}) { + const options = enumOptions(field.schema); + if (options.length > 0) { + return ( + onChange(event.target.value || undefined)} + aria-describedby={ariaDescribedBy} + aria-invalid={invalid} + className="w-full" + > + Select... + {options.map((option) => ( + + {option.label} + + ))} + + ); + } + + const multiOptions = multiSelectOptions(field.schema); + if (multiOptions.length > 0) { + const selected = Array.isArray(value) ? value : []; + return ( +
+ {multiOptions.map((option) => { + const checked = selected.includes(option.value); + return ( + + ); + })} +
+ ); + } + + if (field.schema.type === "boolean") { + return ( + + ); + } + + if (field.schema.type === "number" || field.schema.type === "integer") { + return ( + onChange(event.target.value)} + aria-describedby={ariaDescribedBy} + aria-invalid={invalid} + /> + ); + } + + const isLongText = typeof field.schema.maxLength === "number" && field.schema.maxLength > 160; + if (field.schema.type === "string" && isLongText && field.schema.format === undefined) { + return ( +