Skip to content

Commit 4c95551

Browse files
committed
feat(node): worker_threads
1 parent 3964450 commit 4c95551

2 files changed

Lines changed: 208 additions & 0 deletions

File tree

node/worker_threads.ts

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/// <reference lib="webworker"/>
2+
3+
import { notImplemented } from "./_utils.ts";
4+
import { resolve, toFileUrl } from "../path/mod.ts";
5+
import { EventEmitter } from "./events.ts";
6+
7+
let environmentData = new Map();
8+
let threads = 0;
9+
10+
function attachProxy<T extends Record<PropertyKey, any>>(target: T, child: object) {
11+
function selectTarget(property: PropertyKey) {
12+
return Reflect.has(target, property) ? target : child;
13+
}
14+
15+
const getOrSet = (type: "get" | "set") => (_: T, property: string | symbol, receiver: any): any => {
16+
const target = selectTarget(property);
17+
const ret = Reflect[type](target, property, receiver);
18+
if (typeof ret === "function") return ret.bind(target);
19+
return ret;
20+
};
21+
22+
return new Proxy<T>(target, {
23+
get: getOrSet("get"),
24+
set: getOrSet("set"),
25+
deleteProperty(_, property) {
26+
return Reflect.deleteProperty(selectTarget(property), property);
27+
},
28+
getOwnPropertyDescriptor(_, property) {
29+
return Reflect.getOwnPropertyDescriptor(selectTarget(property), property);
30+
},
31+
has: (target, property) => Reflect.has(target, property) || Reflect.has(child, property),
32+
ownKeys: (target) => Reflect.ownKeys(target).concat(Reflect.ownKeys(child)),
33+
});
34+
}
35+
36+
interface _WorkerOptions {
37+
// only for typings
38+
argv?: any[];
39+
env?: object;
40+
eval?: boolean;
41+
execArgv?: string[];
42+
stdin?: boolean;
43+
stdout?: boolean;
44+
stderr?: boolean;
45+
trackUnmanagedFds?: boolean;
46+
resourceLimits?: {
47+
maxYoungGenerationSizeMb?: number;
48+
maxOldGenerationSizeMb?: number;
49+
codeRangeSizeMb?: number;
50+
stackSizeMb?: number;
51+
};
52+
53+
transferList?: Transferable[];
54+
workerData?: any;
55+
}
56+
interface _Worker extends Worker, EventEmitter {}
57+
class _Worker extends Worker {
58+
readonly threadId: number;
59+
readonly resourceLimits: Required<NonNullable<_WorkerOptions["resourceLimits"]>> = {
60+
maxYoungGenerationSizeMb: -1,
61+
maxOldGenerationSizeMb: -1,
62+
codeRangeSizeMb: -1,
63+
stackSizeMb: 4,
64+
};
65+
private readonly emitter = new EventEmitter();
66+
67+
constructor(specifier: URL | string, options?: _WorkerOptions) {
68+
super(typeof specifier === "string" ? toFileUrl(resolve(specifier)) : specifier, {
69+
...(options || {}),
70+
type: "module",
71+
// unstable
72+
deno: { namespace: true },
73+
});
74+
this.addEventListener("error", (event) => this.emitter.emit("error", event.error || event.message));
75+
this.addEventListener("messageerror", (event) => this.emitter.emit("messageerror", event.data));
76+
this.addEventListener("message", (event) => this.emitter.emit("message", event.data));
77+
this.postMessage({
78+
environmentData,
79+
threadId: (this.threadId = ++threads),
80+
workerData: options?.workerData,
81+
}, options?.transferList || []);
82+
this.emitter.emit("online");
83+
return attachProxy(this, this.emitter);
84+
}
85+
86+
terminate() {
87+
super.terminate();
88+
this.emitter.emit("exit", 0);
89+
}
90+
91+
readonly getHeapSnapshot = notImplemented;
92+
// fake performance
93+
readonly performance = globalThis.performance;
94+
}
95+
96+
export const isMainThread = typeof WorkerGlobalScope === "undefined" || self instanceof WorkerGlobalScope === false;
97+
// fake resourceLimits
98+
export const resourceLimits = isMainThread ? {} : {
99+
maxYoungGenerationSizeMb: 48,
100+
maxOldGenerationSizeMb: 2048,
101+
codeRangeSizeMb: 0,
102+
stackSizeMb: 4,
103+
};
104+
105+
let threadId = 0;
106+
let workerData = null;
107+
type ParentPort = WorkerGlobalScope & typeof globalThis & EventEmitter;
108+
let parentPort: ParentPort = null as any;
109+
110+
if (!isMainThread) {
111+
({ threadId, workerData, environmentData } = await new Promise((resolve) => {
112+
self.addEventListener("message", (event: MessageEvent) => resolve(event.data), { once: true });
113+
}));
114+
parentPort = attachProxy(self as ParentPort, new EventEmitter());
115+
parentPort.addEventListener("offline", () => parentPort.emit("close"));
116+
parentPort.addEventListener("message", (event: MessageEvent) => parentPort.emit("message", event.data));
117+
parentPort.addEventListener("messageerror", (event: MessageEvent) => parentPort.emit("messageerror", event.data));
118+
}
119+
120+
export function getEnvironmentData(key: any) {
121+
return environmentData.get(key);
122+
}
123+
124+
export function setEnvironmentData(key: any, value: any) {
125+
if (value === undefined) {
126+
environmentData.delete(key);
127+
} else {
128+
environmentData.set(key, value);
129+
}
130+
}
131+
132+
export const MessagePort = globalThis.MessagePort;
133+
export const MessageChannel = globalThis.MessageChannel;
134+
export const BroadcastChannel = globalThis.BroadcastChannel;
135+
export const SHARE_ENV = Symbol.for("nodejs.worker_threads.SHARE_ENV");
136+
export {
137+
_Worker as Worker,
138+
parentPort,
139+
threadId,
140+
workerData,
141+
notImplemented as markAsUntransferable,
142+
notImplemented as moveMessagePortToContext,
143+
notImplemented as receiveMessageOnPort,
144+
}
145+
146+
export default {
147+
markAsUntransferable: notImplemented,
148+
moveMessagePortToContext: notImplemented,
149+
receiveMessageOnPort: notImplemented,
150+
MessagePort,
151+
MessageChannel,
152+
BroadcastChannel,
153+
Worker: _Worker,
154+
getEnvironmentData,
155+
setEnvironmentData,
156+
SHARE_ENV,
157+
threadId,
158+
workerData,
159+
resourceLimits,
160+
parentPort,
161+
isMainThread,
162+
}

node/worker_threads_test.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { assertEquals, assertObjectMatch } from "../testing/asserts.ts";
2+
import * as workerThreads from "./worker_threads.ts";
3+
4+
Deno.test({
5+
name: "[worker_threads] isMainThread",
6+
fn() {
7+
assertEquals(workerThreads.isMainThread, true);
8+
}
9+
});
10+
11+
Deno.test({
12+
name: "[worker_threads] threadId",
13+
fn() {
14+
assertEquals(workerThreads.threadId, 0);
15+
}
16+
});
17+
18+
Deno.test({
19+
name: "[worker_threads] resourceLimits",
20+
fn() {
21+
assertObjectMatch(workerThreads.resourceLimits, {});
22+
}
23+
});
24+
25+
Deno.test({
26+
name: "[worker_threads] parentPort",
27+
fn() {
28+
assertEquals(workerThreads.parentPort, null);
29+
}
30+
});
31+
32+
Deno.test({
33+
name: "[worker_threads] workerData",
34+
fn() {
35+
assertEquals(workerThreads.workerData, null);
36+
}
37+
});
38+
39+
Deno.test({
40+
name: "[worker_threads] setEnvironmentData / getEnvironmentData",
41+
fn() {
42+
workerThreads.setEnvironmentData("test", "test");
43+
assertEquals(workerThreads.getEnvironmentData("test"), "test");
44+
}
45+
});
46+

0 commit comments

Comments
 (0)