Skip to content

Commit 4ebe1f8

Browse files
akshaykapre-commit-ci[bot]
authored andcommitted
fix: isolation when running multiple notebooks in an app server (marimo-team#8611)
**Summary.** This PR introduces process-level isolation when serving multiple apps from the same server (`marimo run directory/`, `create_asgi_app()`); clients of any given app are still run in threads in the app's process for efficiency. This fixes a critical bug in which different apps shared the same Python globals, leading to undefined behavior such as collisions in `sys.modules. It does make multi-app servers consume slightly more RAM. Process isolation also applies to multi-app servers started with `--sandbox`. (This PR also fixes a bug in which `--sandbox` run servers silently dropped stdout/stderr.) **Dependency on pyzmq.** The proposed implementation also adds a dependency on `pyzmq` for multi-app servers, to pave the way for allowing sandboxed multi-app servers . It would be possible to design a different solution that used multiprocessing instead of `pyzmq`, at the cost of not supporting package sandboxes. It is perhaps worth discussing whether we are okay with making pyzmq a required dependency of marimo. **Feature flag.** App isolation is feature-flagged, currently opt-in. **Context.** When marimo was originally designed, `marimo run` only ever served a single notebook. A single process could safely serve multiple clients of the same notebook since they all share the same code. When multiple app serving was introduced, we continued serving all clients from a single process, even though the clients were potentially running different programs. When two notebooks both `import utils` but expect different implementations from different directories, whichever app loads first wins, and the second app silently gets the wrong module. Similar problems may exist for other Python globals. **This PR.** This PR fixes the problem by running each app in its own OS process. Multiple clients of the *same* app still share a process (as kernel threads), which allows for fast and cheap sessions. The isolation boundary is per-app, not per-client. ``` Before (shared process — sys.modules collisions): ┌──────────────── Main Process ─────────────────┐ │ │ │ Kernel(app1, client A) ← all kernels │ │ Kernel(app1, client B) share one │ │ Kernel(app2, client C) sys.modules │ │ Kernel(app2, client D) │ └───────────────────────────────────────────────┘ After (per-app process isolation): ┌──────────────── Main Process ─────────────────┐ │ (HTTP, WebSocket, routing) │ └──────────────────┬──────────────┬──────────────┘ │ ZMQ │ ZMQ ┌─────────────▼──┐ ┌──────▼──────────────┐ │ App Process │ │ App Process │ │ (app1.py) │ │ (app2.py) │ │ │ │ │ │ Kernel: cl. A │ │ Kernel: cl. C │ │ Kernel: cl. B │ │ Kernel: cl. D │ └─────────────────┘ └─────────────────────┘ isolated sys.modules isolated sys.modules ``` **IPC.** Each app process communicates with the main process over 4 shared ZeroMQ sockets (not per-client). Kernel commands and stream output are multiplexed over these shared channels using session ID tagging: ``` IPC channels (4 shared ZMQ sockets per app process): Main Process App Subprocess ──────────── ────────────── mgmt [PUSH] ─────────────────────▶ [PULL] mgmt loop response [PULL] ◀───────────────────── [PUSH] (create/stop kernel) cmd [PUSH] ──[sid, channel, msg]─▶ [PULL] dispatcher ──▶ kernel queues stream [PULL] ◀──[sid, msg]───────── [PUSH] collector ◀── kernel output ``` ``` Per session (main-process side): AppProcessQueueManager control_queue ──┐ ui_element_queue ──┤──▶ cmd socket (tagged with session_id) completion_queue ──┤ input_queue ──┘ stream_queue ◀──── stream receiver thread (regular Queue) ``` **When this path is activated.** - `create_asgi_app()`: always enables process isolation (the whole point is multi-app) - `marimo run app1.py app2.py` / directory serving: auto-enables when multiple files detected - `marimo run app.py` (single file): no change, uses existing thread-based kernels - `marimo edit`: no change, uses existing process-based kernels --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 9e7f318 commit 4ebe1f8

31 files changed

Lines changed: 2930 additions & 28 deletions

File tree

marimo/_cli/cli.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,6 +1189,14 @@ def run(
11891189
"pyzmq is required when running a gallery with --sandbox.",
11901190
"marimo[sandbox]",
11911191
)
1192+
elif is_multi:
1193+
from marimo._dependencies.dependencies import DependencyManager
1194+
1195+
if not DependencyManager.zmq.has():
1196+
raise MarimoCLIMissingDependencyError(
1197+
"pyzmq is required for running multiple notebooks.",
1198+
"pyzmq",
1199+
)
11921200

11931201
file_router = _create_run_file_router(validated_paths, watch=watch)
11941202

marimo/_config/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,7 @@ class ExperimentalConfig(TypedDict, total=False):
568568
markdown: bool # Used in playground (community cloud)
569569
wasm_layouts: bool # Used in playground (community cloud)
570570
rtc_v2: bool
571+
isolate_apps: bool
571572

572573
# Internal features
573574
cache: CacheConfig

marimo/_ipc/connection.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,5 +194,7 @@ def close(self) -> None:
194194
if self._receiver_thread.is_alive():
195195
self._receiver_thread.join(timeout=1)
196196

197-
# Close all associated sockets (and finally terminate)
198-
self.context.destroy()
197+
# Close all sockets and terminate the context.
198+
# linger=0 drops any unsent messages immediately, preventing
199+
# hangs during shutdown when the remote end is already gone.
200+
self.context.destroy(linger=0)

marimo/_server/asgi.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,12 @@ async def root():
462462
else:
463463
auth_token = AuthToken(token)
464464

465+
from marimo._dependencies.dependencies import DependencyManager
466+
467+
DependencyManager.zmq.require(
468+
"for running multiple notebooks with create_asgi_app()"
469+
)
470+
465471
# We call the entrypoint `root` instead of `filename` incase we want to
466472
# support directories or code in the future
467473
class Builder(ASGIAppBuilder):
@@ -521,6 +527,9 @@ def _create_app_for_file(base_url: str, file_path: str) -> ASGIApp:
521527
auth_token=auth_token,
522528
redirect_console_to_browser=redirect_console_to_browser,
523529
ttl_seconds=session_ttl,
530+
isolate_apps=config_reader.experimental.get(
531+
"isolate_apps", False
532+
),
524533
)
525534
enable_auth = not AuthToken.is_empty(auth_token)
526535
app = create_starlette_app(

marimo/_server/session_manager.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from marimo._server.session.listeners import RecentsTrackerListener
3333
from marimo._server.token_manager import TokenManager
3434
from marimo._server.tokens import AuthToken, SkewProtectionToken
35+
from marimo._session.app_host import AppHostContext, AppHostPool
3536
from marimo._session.consumer import SessionConsumer
3637
from marimo._session.events import SessionEventBus
3738
from marimo._session.extensions.types import SessionExtension
@@ -83,6 +84,7 @@ def __init__(
8384
ttl_seconds: Optional[int],
8485
watch: bool = False,
8586
sandbox_mode: SandboxMode | None = None,
87+
isolate_apps: bool = False,
8688
) -> None:
8789
# Core configuration
8890
self.file_router = file_router
@@ -97,6 +99,15 @@ def __init__(
9799
self._config_manager = config_manager
98100
self.sandbox_mode = sandbox_mode
99101

102+
# When running multiple apps, each app runs in an isolated host
103+
# process, to avoid collisions in sys.modules and other Python global
104+
# structures. These processes are managed by an AppHostPool.
105+
self._app_host_pool: AppHostPool | None = None
106+
if isolate_apps and mode == SessionMode.RUN:
107+
self._app_host_pool = AppHostPool(
108+
sandbox=sandbox_mode is SandboxMode.MULTI,
109+
)
110+
100111
self._repository = SessionRepository()
101112

102113
def _get_code() -> str:
@@ -222,6 +233,11 @@ def create_session(
222233
auto_instantiate=auto_instantiate,
223234
extensions=extensions,
224235
sandbox_mode=self.sandbox_mode,
236+
app_host_context=AppHostContext(
237+
pool=self._app_host_pool, session_id=session_id
238+
)
239+
if self._app_host_pool
240+
else None,
225241
)
226242

227243
# Add to repository
@@ -393,6 +409,8 @@ def shutdown(self) -> None:
393409
"""Shutdown the session manager and stop all file watchers."""
394410
LOGGER.debug("Shutting down")
395411
self.close_all_sessions()
412+
if self._app_host_pool is not None:
413+
self._app_host_pool.shutdown()
396414
self.lsp_server.stop()
397415
self._watcher_manager.stop_all()
398416

marimo/_server/start.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,11 @@ def start(
248248
}
249249
)
250250

251+
is_multi = file_router.get_unique_file_key() is None
252+
isolate_apps = is_multi and config_reader.experimental.get(
253+
"isolate_apps", False
254+
)
255+
251256
session_manager = SessionManager(
252257
file_router=file_router,
253258
mode=mode,
@@ -262,6 +267,7 @@ def start(
262267
redirect_console_to_browser=redirect_console_to_browser,
263268
watch=watch,
264269
sandbox_mode=sandbox_mode,
270+
isolate_apps=isolate_apps,
265271
)
266272

267273
log_level = "info" if development_mode else "error"
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Copyright 2026 Marimo. All rights reserved.
2+
"""App isolation for serving multiple notebooks.
3+
4+
Each notebook is hosted in an AppHost, which isolates the notebook from other
5+
running notebooks. Sessions for the same notebook are routed to a single
6+
AppHost.
7+
8+
AppHosts are created and managed by an AppHostPool.
9+
"""
10+
11+
from marimo._session.app_host.host import AppHost
12+
from marimo._session.app_host.pool import AppHostContext, AppHostPool
13+
14+
__all__ = [
15+
"AppHost",
16+
"AppHostContext",
17+
"AppHostPool",
18+
]
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Copyright 2026 Marimo. All rights reserved.
2+
"""Commands and responses."""
3+
4+
from __future__ import annotations
5+
6+
import enum
7+
import typing
8+
9+
import msgspec
10+
import msgspec.json
11+
12+
from marimo._ast.cell import CellConfig
13+
from marimo._config.config import MarimoConfig
14+
from marimo._runtime.commands import AppMetadata
15+
from marimo._types.ids import CellId_t
16+
17+
18+
# ---------------- Management commands ---------------------
19+
class CreateKernelCmd(msgspec.Struct, tag=True):
20+
"""Request the app host to create a new kernel thread."""
21+
22+
session_id: str
23+
configs: dict[CellId_t, CellConfig]
24+
app_metadata: AppMetadata
25+
user_config: MarimoConfig
26+
virtual_files_supported: bool
27+
redirect_console_to_browser: bool
28+
log_level: int
29+
30+
31+
class StopKernelCmd(msgspec.Struct, tag=True):
32+
"""Request the app host to stop a kernel thread."""
33+
34+
session_id: str
35+
36+
37+
class ShutdownAppHostCmd(msgspec.Struct, tag=True):
38+
"""Request the app host to shut down entirely."""
39+
40+
41+
# ---------------- Management responses ---------------------
42+
class AppHostReadyResponse(msgspec.Struct, tag=True):
43+
"""Signals that the app host has started and is ready."""
44+
45+
46+
class KernelCreatedResponse(msgspec.Struct, tag=True):
47+
"""Confirms a kernel was created (or reports failure)."""
48+
49+
session_id: str
50+
success: bool
51+
error: str | None = None
52+
53+
54+
# ---------------- Management encoders and decoders ---------------------
55+
MgmtCommand = typing.Union[CreateKernelCmd, StopKernelCmd, ShutdownAppHostCmd]
56+
MgmtResponse = typing.Union[AppHostReadyResponse, KernelCreatedResponse]
57+
58+
_cmd_decoder = msgspec.json.Decoder(MgmtCommand)
59+
_resp_decoder = msgspec.json.Decoder(MgmtResponse)
60+
61+
62+
def encode_mgmt_command(cmd: MgmtCommand) -> bytes:
63+
return msgspec.json.encode(cmd)
64+
65+
66+
def decode_mgmt_command(data: bytes) -> MgmtCommand:
67+
result: MgmtCommand = _cmd_decoder.decode(data)
68+
return result
69+
70+
71+
def encode_mgmt_response(resp: MgmtResponse) -> bytes:
72+
return msgspec.json.encode(resp)
73+
74+
75+
def decode_mgmt_response(data: bytes) -> MgmtResponse:
76+
result: MgmtResponse = _resp_decoder.decode(data)
77+
return result
78+
79+
80+
# ---------------- AppHost initialization ----------------
81+
class AppHostArgs(msgspec.Struct):
82+
"""Args sent to the AppHost process."""
83+
84+
# ZMQ PULL address for receiving management commands
85+
mgmt_addr: str
86+
# ZMQ PUSH address for sending management responses
87+
response_addr: str
88+
# ZMQ PULL address for receiving kernel commands
89+
cmd_addr: str
90+
# ZMQ PUSH address for sending kernel output
91+
stream_addr: str
92+
# Notebook file path, for debug logs
93+
file_path: str
94+
log_level: int
95+
96+
def encode_json(self) -> bytes:
97+
return msgspec.json.encode(self)
98+
99+
@classmethod
100+
def decode_json(cls, buf: bytes) -> AppHostArgs:
101+
return msgspec.json.decode(buf, type=cls)
102+
103+
104+
# ---------------- Kernel ----------------
105+
class Channel(enum.Enum):
106+
"""Command channels."""
107+
108+
CONTROL = b"control"
109+
UI_ELEMENT = b"ui_element"
110+
COMPLETION = b"completion"
111+
INPUT = b"input"
112+
113+
114+
# Sentinel sent on the stream channel when a kernel thread exits.
115+
class KernelExited:
116+
"""Signals that a kernel thread has exited."""
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Copyright 2026 Marimo. All rights reserved.
2+
"""ZMQ connection management for IPC between an app host and its process."""
3+
4+
from __future__ import annotations
5+
6+
import dataclasses
7+
from typing import TYPE_CHECKING
8+
9+
from marimo._config.settings import GLOBAL_SETTINGS
10+
from marimo._session.app_host.commands import AppHostArgs
11+
12+
if TYPE_CHECKING:
13+
import zmq
14+
15+
_BIND_ADDR = "tcp://127.0.0.1"
16+
17+
18+
@dataclasses.dataclass
19+
class AppHostConnection:
20+
"""Manages all ZeroMQ sockets for an AppHost."""
21+
22+
context: zmq.Context[zmq.Socket[bytes]]
23+
24+
# PUSH — single frame: encode_mgmt_command(MgmtCommand)
25+
mgmt: zmq.Socket[bytes]
26+
# PULL — single frame: decode_mgmt_response(bytes) -> MgmtResponse
27+
response: zmq.Socket[bytes]
28+
29+
# TODO(akshayka): Consider moving to something less fragile than pickle
30+
# PUSH — 3-frame multipart: [session_id, channel, pickle(payload)]
31+
cmd: zmq.Socket[bytes]
32+
# PULL — 2-frame multipart: [session_id, pickle(KernelMessage | KernelExited)]
33+
stream: zmq.Socket[bytes]
34+
35+
@classmethod
36+
def create(
37+
cls, file_path: str, log_level: int | None = None
38+
) -> tuple[AppHostConnection, AppHostArgs]:
39+
"""Bind all sockets, return connection and args for subprocess."""
40+
import zmq
41+
42+
if log_level is None:
43+
log_level = GLOBAL_SETTINGS.LOG_LEVEL
44+
45+
context = zmq.Context()
46+
try:
47+
mgmt = context.socket(zmq.PUSH)
48+
mgmt_port = mgmt.bind_to_random_port(_BIND_ADDR)
49+
50+
response = context.socket(zmq.PULL)
51+
response_port = response.bind_to_random_port(_BIND_ADDR)
52+
53+
cmd = context.socket(zmq.PUSH)
54+
cmd_port = cmd.bind_to_random_port(_BIND_ADDR)
55+
56+
stream = context.socket(zmq.PULL)
57+
stream_port = stream.bind_to_random_port(_BIND_ADDR)
58+
except Exception:
59+
context.destroy(linger=0)
60+
raise
61+
62+
conn = cls(
63+
context=context,
64+
mgmt=mgmt,
65+
response=response,
66+
cmd=cmd,
67+
stream=stream,
68+
)
69+
70+
args = AppHostArgs(
71+
mgmt_addr=f"{_BIND_ADDR}:{mgmt_port}",
72+
response_addr=f"{_BIND_ADDR}:{response_port}",
73+
cmd_addr=f"{_BIND_ADDR}:{cmd_port}",
74+
stream_addr=f"{_BIND_ADDR}:{stream_port}",
75+
file_path=file_path,
76+
log_level=log_level,
77+
)
78+
79+
return conn, args
80+
81+
def close(self) -> None:
82+
"""Close all sockets and destroy context."""
83+
self.mgmt.close(linger=0)
84+
self.response.close(linger=0)
85+
self.cmd.close(linger=0)
86+
self.stream.close(linger=0)
87+
self.context.destroy(linger=0)

0 commit comments

Comments
 (0)