Introduction

As mentioned in the previous article, Node.js has two kinds of threads. One is the event loop, which responds to user requests and runs callbacks. The other is the worker pool, which handles time-consuming operations. The Node.js website mentioned a library called webworker-threads that gives JavaScript code access to Node.js's native worker pool. Unfortunately, webworker-threads was last updated two years ago and no longer works at all with Node.js 12, the latest release at the time of writing. Its author recommends a newer library called web-worker, which is built on Node.js's worker_threads module. This article explains how to use worker_threads and web-worker in detail.

worker_threads

The worker_threads module (source: lib/worker_threads.js) lets a program start new threads that execute JavaScript in parallel. Worker threads are intended for CPU-intensive work rather than I/O, because Node.js's built-in asynchronous I/O is already very efficient. worker_threads has 5 main properties, 3 classes and 3 main functions, which we will go through one by one.

isMainThread

isMainThread tells you whether the code is running in the main thread. Here is an example:

    const { Worker, isMainThread } = require('worker_threads');

    if (isMainThread) {
      console.log('in the main thread');
      // Start a worker that runs this same file.
      new Worker(__filename);
    } else {
      console.log('in the worker thread');
      console.log(isMainThread); // prints 'false'.
    }

In this example, we import Worker and isMainThread from the worker_threads module. Worker is the main class for worker threads, which we explain in detail later. Here we use Worker to start a worker thread that runs the same file, so the else branch executes inside the worker.

MessageChannel

MessageChannel represents an asynchronous, two-way communication channel. It has no methods of its own; its job is to connect the two MessagePorts at either end.
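To make the "two-way" part concrete, here is a minimal sketch in which each port both sends and receives. It reads the messages with receiveMessageOnPort (covered later in this article, available since Node.js 12.3) so the exchange can be checked synchronously; the helper name exchange is hypothetical.

```javascript
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');

// Hypothetical helper: send one message in each direction over a fresh
// channel and return what each end received.
function exchange(fromPort1, fromPort2) {
  const { port1, port2 } = new MessageChannel();
  port1.postMessage(fromPort1); // travels port1 -> port2
  port2.postMessage(fromPort2); // travels port2 -> port1

  // receiveMessageOnPort() dequeues one message synchronously and wraps it
  // as { message }, or returns undefined when the queue is empty.
  const atPort2 = receiveMessageOnPort(port2).message;
  const atPort1 = receiveMessageOnPort(port1).message;

  // Closing one end closes the whole channel.
  port1.close();
  return { atPort1, atPort2 };
}

console.log(exchange('ping', 'pong')); // { atPort1: 'pong', atPort2: 'ping' }
```

Each value arrives as a copy at the opposite port, which is what distinguishes a channel from a shared object.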
MessageChannel is declared as follows:

    class MessageChannel {
      readonly port1: MessagePort;
      readonly port2: MessagePort;
    }

Calling new MessageChannel() automatically creates both MessagePorts:

    const { MessageChannel } = require('worker_threads');

    const { port1, port2 } = new MessageChannel();
    port1.on('message', (message) => console.log('received', message));
    port2.postMessage({ foo: 'bar' });
    // Prints: received { foo: 'bar' } from the `port1.on('message')` listener

Through a MessageChannel, the two MessagePorts can communicate with each other.

parentPort and MessagePort

In a worker thread, parentPort is a MessagePort used for messages between that worker and the main thread (in the main thread, parentPort is null). Messages sent via parentPort.postMessage() in the worker arrive in the main thread via worker.on('message'), and messages sent in the main thread via worker.postMessage() arrive in the worker via parentPort.on('message').

Let's look at the definition of MessagePort:

    class MessagePort extends EventEmitter {
      close(): void;
      postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
      ref(): void;
      unref(): void;
      start(): void;

      addListener(event: "close", listener: () => void): this;
      addListener(event: "message", listener: (value: any) => void): this;
      addListener(event: string | symbol, listener: (...args: any[]) => void): this;

      emit(event: "close"): boolean;
      emit(event: "message", value: any): boolean;
      emit(event: string | symbol, ...args: any[]): boolean;

      on(event: "close", listener: () => void): this;
      on(event: "message", listener: (value: any) => void): this;
      on(event: string | symbol, listener: (...args: any[]) => void): this;

      once(event: "close", listener: () => void): this;
      once(event: "message", listener: (value: any) => void): this;
      once(event: string | symbol, listener: (...args: any[]) => void): this;

      prependListener(event: "close", listener: () => void): this;
      prependListener(event: "message", listener: (value: any) => void): this;
      prependListener(event: string | symbol, listener: (...args: any[]) => void): this;

      prependOnceListener(event: "close", listener: () => void): this;
      prependOnceListener(event: "message", listener: (value: any) => void): this;
      prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;

      removeListener(event: "close", listener: () => void): this;
      removeListener(event: "message", listener: (value: any) => void): this;
      removeListener(event: string | symbol, listener: (...args: any[]) => void): this;

      off(event: "close", listener: () => void): this;
      off(event: "message", listener: (value: any) => void): this;
      off(event: string | symbol, listener: (...args: any[]) => void): this;
    }

MessagePort inherits from EventEmitter and represents one end of an asynchronous two-way communication channel; the channel itself is the MessageChannel. A MessagePort can carry structured data, memory regions (such as ArrayBuffers) or other MessagePorts.

The definition shows that MessagePort has two events, close and message. The close event fires when either end of the channel is closed, and the message event fires when a message posted with postMessage() on the other end arrives. For example:

    const { MessageChannel } = require('worker_threads');
    const { port1, port2 } = new MessageChannel();

    // Prints:
    //   foobar
    //   closed!
    port2.on('message', (message) => console.log(message));
    port2.on('close', () => console.log('closed!'));

    port1.postMessage('foobar');
    port1.close();

port.on('message') adds a listener for the message event; port.addListener() does the same thing. Attaching a message listener with port.on('message') also calls port.start() automatically, which starts the flow of messages. While a port has a message listener it is ref'ed, and a ref'ed port keeps the program from exiting. Call port.unref() to remove this reference.
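The parentPort flow described above (worker.postMessage() to parentPort.on('message'), then parentPort.postMessage() back to worker.on('message')) can be seen in a single round trip. This is a minimal sketch, assuming the worker code is passed as a string with the eval: true option so no second file is needed; squareInWorker is a hypothetical helper name. It also shows the ref behaviour just described: the worker's parentPort listener keeps the worker alive, so the helper terminates it once the reply has arrived.

```javascript
const { Worker } = require('worker_threads');

// Worker body (hypothetical): reply with the square of every number received.
const squareSource = `
  const { parentPort } = require('worker_threads');
  parentPort.on('message', (n) => parentPort.postMessage(n * n));
`;

// Hypothetical helper: main -> worker via worker.postMessage() / parentPort.on('message'),
// worker -> main via parentPort.postMessage() / worker.on('message').
function squareInWorker(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(squareSource, { eval: true });
    worker.once('message', (result) => {
      resolve(result);
      // The parentPort listener keeps the worker's port ref'ed,
      // so stop the thread explicitly after the reply.
      worker.terminate();
    });
    worker.once('error', reject);
    worker.postMessage(n); // queued until the worker starts listening
  });
}

squareInWorker(12).then((result) => console.log(result)); // 144
```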
Next, let's look at how to send messages through a port:

    port.postMessage(value[, transferList])

postMessage accepts two parameters. The first, value, is the JavaScript value to send; the second, transferList, is optional. Here is an example that passes only the first parameter:

    const { MessageChannel } = require('worker_threads');
    const { port1, port2 } = new MessageChannel();

    port1.on('message', (message) => console.log(message));

    const circularData = {};
    circularData.foo = circularData;
    // Prints: { foo: [Circular] }
    // (newer Node.js versions print <ref *1> { foo: [Circular *1] })
    port2.postMessage(circularData);

Normally the receiver gets a copy of value; as the example shows, even circular references are cloned correctly. If you pass a transferList, however, the objects in it are transferred to the receiving end and are no longer usable on the sending side, as if they had been handed over. transferList may contain ArrayBuffer, MessagePort and FileHandle objects. If value contains a SharedArrayBuffer, that object is accessible from both threads and must not be included in transferList. Here is an example with both parameters:

    const { MessageChannel } = require('worker_threads');
    const { port1, port2 } = new MessageChannel();

    port1.on('message', (message) => console.log(message));

    const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
    // Post a copy of uint8Array:
    port2.postMessage(uint8Array);
    // Transfer the underlying ArrayBuffer instead of copying it:
    port2.postMessage(uint8Array, [ uint8Array.buffer ]);

    // port2.postMessage(uint8Array);

Both messages arrive, so the example prints the array twice:

    Uint8Array(4) [ 1, 2, 3, 4 ]
    Uint8Array(4) [ 1, 2, 3, 4 ]
The first postMessage sends a copy, while the second transfers the Uint8Array's underlying buffer. If we now call port2.postMessage(uint8Array) once more (the commented-out line), we get the following error:

    DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.

The buffer is the underlying storage of a TypedArray. Once the buffer has been transferred, every TypedArray that uses it becomes unusable on the sending side.

markAsUntransferable

To avoid this problem, we can call markAsUntransferable to mark a buffer as untransferable. This matters for a buffer that several views share, such as a pooled buffer:

    const { MessageChannel, markAsUntransferable } = require('worker_threads');

    const pooledBuffer = new ArrayBuffer(8);
    const typedArray1 = new Uint8Array(pooledBuffer);
    const typedArray2 = new Float64Array(pooledBuffer);

    markAsUntransferable(pooledBuffer);

    const { port1 } = new MessageChannel();
    try {
      // Older Node.js versions silently copy typedArray1 here instead of
      // transferring it; newer versions throw a DataCloneError because an
      // untransferable buffer is listed in transferList.
      port1.postMessage(typedArray1, [ typedArray1.buffer ]);
    } catch (error) {
      console.log(error.name); // 'DataCloneError' on newer versions
    }

    // Both views still own their memory: pooledBuffer was not transferred.
    console.log(typedArray1);
    console.log(typedArray2);

SHARE_ENV

SHARE_ENV is a special value that can be passed as the env option of the Worker constructor. With it, the main thread and the worker thread read and write the same set of environment variables:

    const { Worker, SHARE_ENV } = require('worker_threads');

    new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
      .on('exit', () => {
        console.log(process.env.SET_IN_WORKER); // Prints 'foo'.
      });

workerData

Besides postMessage(), the main thread can also pass data to a worker through the workerData option of the Worker constructor:

    const { Worker, isMainThread, workerData } = require('worker_threads');

    if (isMainThread) {
      const worker = new Worker(__filename, { workerData: 'Hello, world!' });
    } else {
      console.log(workerData); // Prints 'Hello, world!'.
    }

Worker Class

Let's first look at the definition of Worker:

    class Worker extends EventEmitter {
      readonly stdin: Writable | null;
      readonly stdout: Readable;
      readonly stderr: Readable;
      readonly threadId: number;
      readonly resourceLimits?: ResourceLimits;

      constructor(filename: string | URL, options?: WorkerOptions);

      postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
      ref(): void;
      unref(): void;
      terminate(): Promise<number>;
      getHeapSnapshot(): Promise<Readable>;

      addListener(event: "error", listener: (err: Error) => void): this;
      addListener(event: "exit", listener: (exitCode: number) => void): this;
      addListener(event: "message", listener: (value: any) => void): this;
      addListener(event: "online", listener: () => void): this;
      addListener(event: string | symbol, listener: (...args: any[]) => void): this;
      ...
    }

Worker inherits from EventEmitter and has 4 important events: error, exit, message and online. A Worker represents an independent JavaScript execution thread and is constructed from a file name or URL. Each Worker has a pair of built-in MessagePorts that are linked to each other when the Worker is created, and the Worker uses them to communicate with its parent thread. Messages sent via parentPort.postMessage() in the worker arrive in the main thread via worker.on('message'), and messages sent in the main thread via worker.postMessage() arrive in the worker via parentPort.on('message'). You can also create a MessageChannel explicitly and send one of its MessagePorts to another thread as a message.
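A minimal sketch of the Worker events, assuming an inline worker body passed with eval: true and input passed through workerData; traceWorker is a hypothetical helper that records events in the order they fire. Per the Node.js documentation, 'exit' is the last event any Worker emits, so the sketch resolves there.

```javascript
const { Worker } = require('worker_threads');

// Worker body (hypothetical): add the numbers from workerData, report, finish.
const addSource = `
  const { parentPort, workerData } = require('worker_threads');
  parentPort.postMessage(workerData.a + workerData.b);
`;

// Hypothetical helper: collect the events one Worker emits.
function traceWorker(data) {
  return new Promise((resolve, reject) => {
    const events = [];
    const worker = new Worker(addSource, { eval: true, workerData: data });
    worker.on('online', () => events.push('online'));   // thread started running JS
    worker.on('message', (value) => events.push(`message:${value}`));
    worker.on('error', reject);                          // uncaught exception in the worker
    worker.on('exit', (code) => {                        // final event for any Worker
      events.push(`exit:${code}`);
      resolve(events);
    });
  });
}

traceWorker({ a: 1, b: 2 }).then((events) => console.log(events));
```

The worker ends on its own with exit code 0 here, because nothing keeps its event loop alive after the single postMessage().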
The following example passes a port from an explicitly created MessageChannel to a worker:

    const assert = require('assert');
    const {
      Worker, MessageChannel, MessagePort, isMainThread, parentPort
    } = require('worker_threads');

    if (isMainThread) {
      const worker = new Worker(__filename);
      const subChannel = new MessageChannel();
      // Send port1 to the worker; listing it in transferList hands it over.
      worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
      subChannel.port2.on('message', (value) => {
        console.log('Received:', value);
      });
    } else {
      parentPort.once('message', (value) => {
        assert(value.hereIsYourPort instanceof MessagePort);
        value.hereIsYourPort.postMessage('The worker thread is sending this message');
        value.hereIsYourPort.close();
      });
    }

Here we use the built-in worker/parentPort channel to deliver a MessagePort that belongs to an explicit MessageChannel; further messages then travel through that MessagePort. Note that a MessagePort inside a message must also be listed in transferList.

receiveMessageOnPort

Besides listening with port.on('message'), we can pull messages manually with receiveMessageOnPort:

    const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
    const { port1, port2 } = new MessageChannel();
    port1.postMessage({ hello: 'world' });

    console.log(receiveMessageOnPort(port2));
    // Prints: { message: { hello: 'world' } }
    console.log(receiveMessageOnPort(port2));
    // Prints: undefined

moveMessagePortToContext

First, let's understand the concept of a context in Node.js. The vm module can create a context: an isolated environment with its own global object. Note that vm is not a security mechanism and should not be used to run untrusted code. Here is an example of a context:

    const vm = require('vm');

    const x = 1;

    const context = { x: 2 };
    vm.createContext(context); // Contextify the object.

    const code = 'x += 40; var y = 17;';
    // `x` and `y` are global variables in the context.
    // Initially, x has the value 2 because that is the value of context.x.
    vm.runInContext(code, context);

    console.log(context.x); // 42
    console.log(context.y); // 17

    console.log(x); // 1; y is not defined.
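Being isolated also means each context has its own set of built-in objects such as Array. A short sketch using only the vm module (variable names are illustrative) shows that a value created inside a context is not an instance of the outer context's constructor:

```javascript
const vm = require('vm');

// A contextified object gets its own globals, including its own Array.
const context = vm.createContext({});
const innerArray = vm.runInContext('[1, 2, 3]', context);

console.log(innerArray instanceof Array);                // false: built by the context's Array
console.log(Array.isArray(innerArray));                  // true: isArray works across contexts
console.log(vm.runInContext('Array', context) === Array); // false: two different constructors
```

This is why cross-context checks are safer with helpers such as Array.isArray than with instanceof.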
The worker_threads module can move a MessagePort into another context:

    moveMessagePortToContext(port, contextifiedSandbox)

This function takes two parameters: the MessagePort to move, and a context object created by vm.createContext(). It returns a new MessagePort that belongs to the target context, and the original port object becomes unusable. The returned port does not inherit from EventEmitter, so only its onmessage property can be used to receive messages.

worker_threads thread pool

So far we have used a single worker thread, but one thread is often not enough, and a program needs a thread pool to manage several worker objects. The async_hooks module provides the AsyncResource class, which is designed to be extended by custom asynchronous resources (such as the tasks of a pool) so that they take part in async context tracking. Let's see how to use AsyncResource to build a worker thread pool.

Suppose the task adds two numbers, and the worker script is task_processor.js:

    const { parentPort } = require('worker_threads');

    parentPort.on('message', (task) => {
      parentPort.postMessage(task.a + task.b);
    });

Here is the implementation of the worker pool (worker_pool.js):

    const { AsyncResource } = require('async_hooks');
    const { EventEmitter } = require('events');
    const path = require('path');
    const { Worker } = require('worker_threads');

    const kTaskInfo = Symbol('kTaskInfo');
    const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

    class WorkerPoolTaskInfo extends AsyncResource {
      constructor(callback) {
        super('WorkerPoolTaskInfo');
        this.callback = callback;
      }

      done(err, result) {
        this.runInAsyncScope(this.callback, null, err, result);
        this.emitDestroy(); // `TaskInfo`s are used only once.
      }
    }

    class WorkerPool extends EventEmitter {
      constructor(numThreads) {
        super();
        this.numThreads = numThreads;
        this.workers = [];
        this.freeWorkers = [];

        for (let i = 0; i < numThreads; i++)
          this.addNewWorker();
      }

      addNewWorker() {
        const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
        worker.on('message', (result) => {
          // In case of success: Call the callback that was passed to `runTask`,
          // remove the `TaskInfo` associated with the Worker, and mark it as free
          // again.
          worker[kTaskInfo].done(null, result);
          worker[kTaskInfo] = null;
          this.freeWorkers.push(worker);
          this.emit(kWorkerFreedEvent);
        });
        worker.on('error', (err) => {
          // In case of an uncaught exception: Call the callback that was passed to
          // `runTask` with the error.
          if (worker[kTaskInfo])
            worker[kTaskInfo].done(err, null);
          else
            this.emit('error', err);
          // Remove the worker from the list and start a new Worker to replace the
          // current one.
          this.workers.splice(this.workers.indexOf(worker), 1);
          this.addNewWorker();
        });
        this.workers.push(worker);
        this.freeWorkers.push(worker);
        this.emit(kWorkerFreedEvent);
      }

      runTask(task, callback) {
        if (this.freeWorkers.length === 0) {
          // No free threads, wait until a worker thread becomes free.
          this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
          return;
        }

        const worker = this.freeWorkers.pop();
        worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
        worker.postMessage(task);
      }

      close() {
        for (const worker of this.workers) worker.terminate();
      }
    }

    module.exports = WorkerPool;

For each task, the pool wraps the callback in a WorkerPoolTaskInfo (an AsyncResource) and stores it on the worker under the symbol key kTaskInfo, that is, as worker[kTaskInfo] rather than a plain property. When the worker replies or fails, done() invokes the callback through runInAsyncScope, in the async context in which runTask() was called.
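Why does WorkerPoolTaskInfo extend AsyncResource? The worker's 'message' event fires in a different async context from the one in which runTask() was called, so a plain callback would run without the caller's context. The sketch below assumes Node.js 12.17 or later for AsyncLocalStorage, which the pool itself does not use; the requestId value is hypothetical. It compares a callback wrapped in the same kind of AsyncResource subclass with a plain callback, invoking both later, outside the original context, the way the pool's 'message' handler does.

```javascript
const { AsyncLocalStorage, AsyncResource } = require('async_hooks');

const als = new AsyncLocalStorage();

// Same shape as WorkerPoolTaskInfo in the pool above.
class TaskInfo extends AsyncResource {
  constructor(callback) {
    super('TaskInfo');
    this.callback = callback;
  }
  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy();
  }
}

const seen = {};
let wrapped;
let plain;

// Simulate a caller of runTask() running inside a request context.
als.run({ requestId: 42 }, () => {
  wrapped = new TaskInfo(() => { seen.wrapped = als.getStore(); });
  plain = () => { seen.plain = als.getStore(); };
});

// Later, outside that context (like a worker 'message' handler):
wrapped.done(null, 'result'); // runInAsyncScope restores the captured context
plain();                      // runs with no store

console.log(seen); // { wrapped: { requestId: 42 }, plain: undefined }
```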
Next, we can use the pool:

    const WorkerPool = require('./worker_pool.js');
    const os = require('os');

    // One worker per CPU core.
    const pool = new WorkerPool(os.cpus().length);

    let finished = 0;
    for (let i = 0; i < 10; i++) {
      pool.runTask({ a: 42, b: 100 }, (err, result) => {
        console.log(i, err, result);
        // Shut the pool down once all 10 tasks have completed.
        if (++finished === 10)
          pool.close();
      });
    }
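If the rest of a program uses Promises or async/await, the same pool idea can also be written without callbacks. The following is a simplified sketch, not the article's WorkerPool: the class name PromiseWorkerPool is hypothetical, the task_processor.js body is inlined with eval: true, and queued tasks wait in an explicit FIFO array instead of one kWorkerFreedEvent listener per task. Unlike WorkerPool, it does not replace a worker that crashes.

```javascript
const { Worker } = require('worker_threads');

// Worker body: the same job as task_processor.js (add two numbers),
// inlined with eval: true so this sketch needs no second file.
const taskSource = `
  const { parentPort } = require('worker_threads');
  parentPort.on('message', (task) => parentPort.postMessage(task.a + task.b));
`;

// Hypothetical Promise-based pool: one task per worker at a time, FIFO queue.
class PromiseWorkerPool {
  constructor(numThreads) {
    this.workers = [];
    this.idle = [];   // workers not currently running a task
    this.queue = [];  // pending { task, resolve, reject } entries
    for (let i = 0; i < numThreads; i++) {
      const worker = new Worker(taskSource, { eval: true });
      this.workers.push(worker);
      this.idle.push(worker);
    }
  }

  // Returns a Promise that settles with the worker's reply to `task`.
  run(task) {
    return new Promise((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this.dispatch();
    });
  }

  // Hand queued tasks to idle workers until either list is empty.
  dispatch() {
    while (this.idle.length > 0 && this.queue.length > 0) {
      const worker = this.idle.pop();
      const { task, resolve, reject } = this.queue.shift();

      const cleanup = () => {
        worker.off('message', onMessage);
        worker.off('error', onError);
      };
      const onMessage = (result) => {
        cleanup();
        this.idle.push(worker); // free again, so pick up the next queued task
        resolve(result);
        this.dispatch();
      };
      const onError = (err) => {
        cleanup(); // the worker has died and is not returned to `idle`
        reject(err);
      };

      worker.on('message', onMessage);
      worker.on('error', onError);
      worker.postMessage(task);
    }
  }

  close() {
    return Promise.all(this.workers.map((worker) => worker.terminate()));
  }
}

// Usage: 5 tasks on 2 threads.
const pool = new PromiseWorkerPool(2);
Promise.all([1, 2, 3, 4, 5].map((i) => pool.run({ a: i, b: 100 })))
  .then((results) => console.log(results)) // [ 101, 102, 103, 104, 105 ]
  .finally(() => pool.close());
```

Promise.all returns the results in input order, whichever worker finishes first, so callers do not need the index bookkeeping of the callback version.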