12. nodejs中使用worker_threads来创建新的线程
简介
之前的文章中提到了,nodejs中有两种线程,一种是event loop用来相应用户的请求和处理各种callback。另一种就是worker pool用来处理各种耗时操作。
nodejs的官网提到了一个能够使用nodejs本地woker pool的lib叫做webworker-threads。
可惜的是webworker-threads的最后一次更新还是在2年前,而在最新的nodejs 12中,根本无法使用。
而webworker-threads的作者则推荐了一个新的lib叫做web-worker。
web-worker是构建于nodejs的worker_threads之上的,本文将会详细讲解worker_threads和web-worker的使用。
worker_threads
worker_threads模块的源代码源自lib/worker_threads.js,它指的是工作线程,可以开启一个新的线程来并行执行javascript程序。
worker_threads主要用来处理CPU密集型操作,而不是IO操作,因为nodejs本身的异步IO已经非常强大了。
worker_threads中主要有5个属性,3个class和3个主要的方法。接下来我们将会一一讲解。
isMainThread
isMainThread用来判断代码是否在主线程中运行,我们看一个使用的例子:
const { Worker, isMainThread } = require('worker_threads');
if (isMainThread) {
console.log('在主线程中');
new Worker(__filename);
} else {
console.log('在工作线程中');
console.log(isMainThread); // 打印 'false'。
}
上面的例子中,我们从worker_threads模块中引入了Worker和isMainThread,Worker就是工作线程的主类,我们将会在后面详细讲解,这里我们使用Worker创建了一个工作线程。
MessageChannel
MessageChannel代表的是一个异步双向通信channel。MessageChannel中没有方法,主要通过MessageChannel来连接两端的MessagePort。
class MessageChannel {
readonly port1: MessagePort;
readonly port2: MessagePort;
}
当我们使用new MessageChannel()的时候,会自动创建两个MessagePort。
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener
通过MessageChannel,我们可以进行MessagePort间的通信。
parentPort和MessagePort
parentPort是一个MessagePort类型,parentPort主要用于worker线程和主线程进行消息交互。
通过parentPort.postMessage()发送的消息在主线程中将可以通过worker.on('message')接收。
主线程中通过worker.postMessage()发送的消息将可以在工作线程中通过parentPort.on('message')接收。
我们看一下MessagePort的定义:
class MessagePort extends EventEmitter {
close(): void;
postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
ref(): void;
unref(): void;
start(): void;
addListener(event: "close", listener: () => void): this;
addListener(event: "message", listener: (value: any) => void): this;
addListener(event: string | symbol, listener: (...args: any[]) => void): this;
emit(event: "close"): boolean;
emit(event: "message", value: any): boolean;
emit(event: string | symbol, ...args: any[]): boolean;
on(event: "close", listener: () => void): this;
on(event: "message", listener: (value: any) => void): this;
on(event: string | symbol, listener: (...args: any[]) => void): this;
once(event: "close", listener: () => void): this;
once(event: "message", listener: (value: any) => void): this;
once(event: string | symbol, listener: (...args: any[]) => void): this;
prependListener(event: "close", listener: () => void): this;
prependListener(event: "message", listener: (value: any) => void): this;
prependListener(event: string | symbol, listener: (...args: any[]) => void): this;
prependOnceListener(event: "close", listener: () => void): this;
prependOnceListener(event: "message", listener: (value: any) => void): this;
prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;
removeListener(event: "close", listener: () => void): this;
removeListener(event: "message", listener: (value: any) => void): this;
removeListener(event: string | symbol, listener: (...args: any[]) => void): this;
off(event: "close", listener: () => void): this;
off(event: "message", listener: (value: any) => void): this;
off(event: string | symbol, listener: (...args: any[]) => void): this;
}
MessagePort继承自EventEmitter,它表示的是异步双向通信channel的一端。这个channel就叫做MessageChannel,MessagePort通过MessageChannel来进行通信。
我们可以通过MessagePort来传输结构体数据,内存区域或者其他的MessagePorts。
从源代码中,我们可以看到MessagePort中有两个事件,close和message。
close事件将会在channel的中任何一端断开连接的时候触发,而message事件将会在port.postMessage时候触发,下面我们看一个例子:
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
// Prints:
// foobar
// closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));
port1.postMessage('foobar');
port1.close();
port.on('message')实际上为message事件添加了一个listener,port还提供了addListener方法来手动添加listener。
port.on('message')会自动触发port.start()方法,表示启动一个port。
当port有listener存在的时候,这表示port存在一个ref,当存在ref的时候,程序是不会结束的。我们可以通过调用port.unref方法来取消这个ref。
接下来我们看一下怎么通过port来传输消息:
port.postMessage(value[, transferList])
postMessage可以接受两个参数,第一个参数是value,这是一个JavaScript对象。第二个参数是transferList。
先看一个传递一个参数的情况:
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log(message));
const circularData = {};
circularData.foo = circularData;
// Prints: { foo: [Circular] }
port2.postMessage(circularData);
通常来说postMessage发送的对象都是value的拷贝,但是如果你指定了transferList,那么在transferList中的对象将会被transfer到channel的接受端,并且不再存在于发送端,就好像把对象传送出去一样。
transferList是一个list,list中的对象可以是ArrayBuffer, MessagePort 和 FileHandle。
如果value中包含SharedArrayBuffer对象,那么该对象不能被包含在transferList中。
看一个包含两个参数的例子:
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log(message));
const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// post uint8Array的拷贝:
port2.postMessage(uint8Array);
port2.postMessage(uint8Array, [ uint8Array.buffer ]);
//port2.postMessage(uint8Array);
上面的例子将输出:
Uint8Array(4) [ 1, 2, 3, 4 ]
Uint8Array(4) [ 1, 2, 3, 4 ]
第一个postMessage是拷贝,第二个postMessage是transfer Uint8Array底层的buffer。
如果我们再次调用port2.postMessage(uint8Array),我们会得到下面的错误:
DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.
buffer是TypedArray的底层存储结构,如果buffer被transfer,那么之前的TypedArray将会变得不可用。
markAsUntransferable
要想避免这个问题,我们可以调用markAsUntransferable将buffer标记为不可transferable. 我们看一个markAsUntransferable的例子:
const { MessageChannel, markAsUntransferable } = require('worker_threads');
const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);
markAsUntransferable(pooledBuffer);
const { port1 } = new MessageChannel();
port1.postMessage(typedArray1, [ typedArray1.buffer ]);
console.log(typedArray1);
console.log(typedArray2);