在Node里耍多线程和多进程,会不会闪到腰?!
Node提供worker_threads模块,用于手动创建多线程。之前有介绍用于异步操作的libuv库,它主要作用于Node单线程事件循环机制中的异步操作,主线程碰到异步任务时,会把它扔给libuv,libuv完成后扔到任务队列里。libuv的高效,源于它基于线程池,这有些类似于C#的TPL了,后面章节再细说。
worker_threads并不依赖于libuv,它直接依赖于底层操作系统的线程实现,在每个Worker中,有独立的V8引擎和上下文。这至少意味着:(1)创建线程的代价是比较高的;(2)线程之间相互独立,需要依靠消息机制通讯,通讯开销会比较大。所以,没事别乱开线程。
//main.js //以下文件包含了主线程和Worker线程的代码,通过isMainThread来判断 //实际开发中,一般将Worker线程的代码放到单独的JS文件中 const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); if (isMainThread) { // 以下代码在主线程中======================= // 创建一个新的Worker线程 // __filename指向当前文件路径,表示Worker也在当前文件中 //workerData,用于向Worker线程传递数据 const worker = new Worker(__filename, { workerData: { start: 1, end: 100 } }); //主线程监听Worker线程,message、error和exit是事件名称 //监听从Worker线程发来的消息 //result为Worker线程中postMessage出来的值 worker.on('message', result => { console.log(`Result from worker: ${result}`); }); //监听Worker线程中的错误 worker.on('error', error => { console.error(`Worker error: ${error}`); }); //监听Worker线程的结束状态 worker.on('exit', code => { if (code !== 0) { console.error(`Worker stopped with exit code ${code}`); } else { console.log('Worker finished successfully'); } }); } else { // 以下代码在Worker线程中============================ // 访问workerData const { start, end } = workerData; // 简单的计算任务:计算[start, end]范围内的和 let sum = 0; for (let i = start; i <= end; i++) { sum += i; } // 发送结果给主线程 parentPort.postMessage(sum); } //main.js,主线程================================================ const { Worker, MessageChannel } = require('worker_threads'); const path = require('path'); // 创建一个新的MessageChannel,解构出两个通讯端口 const { port1, port2 } = new MessageChannel(); // 创建第一个Worker const worker1 = new Worker(path.resolve(__dirname, 'worker1.js')); // 创建第二个Worker const worker2 = new Worker(path.resolve(__dirname, 'worker2.js')); // 将port1发送给worker1,port2发送给worker2 worker1.postMessage({ port: port1 }, [port1]); worker2.postMessage({ port: port2 }, [port2]); //worker1.js,Worker1线程========================================= const { parentPort } = require('worker_threads'); //第一个message来自主线程,可获取通讯端口 parentPort.on('message', (message) => { const port = message.port; //第二个message来自其它Worker线程 port.on('message', (msg) => { console.log(`Worker1 received: ${msg}`); // 发送响应消息 port.postMessage('Hello from Worker1'); }); }); //worker2.js,Worker2线程========================================= const { parentPort } = require('worker_threads'); parentPort.on('message', (message) => { const port = message.port; port.postMessage('Hello from Worker2'); port.on('message', (msg) => { console.log(`Worker2 received: ${msg}`); }); }); //使用SharedArrayBuffer 和Atomics,可以绕过消息通讯,极大提升多线程通讯性能 /main.js,主线程=============================================== const { Worker } = require('worker_threads'); const path = require('path'); //创建一个共享的ArrayBuffer,并初始化 const sharedBuffer = new SharedArrayBuffer(4); // 4字节的共享内存 const sharedArray = new Int32Array(sharedBuffer); sharedArray[0] = 0; //创建两个Worker线程 const worker1 = new Worker(path.resolve(__dirname, 'worker.js'), { workerData: { sharedBuffer } }); const worker2 = new Worker(path.resolve(__dirname, 'worker.js'), { workerData: { sharedBuffer } }); //监听Worker的消息 worker1.on('message', (msg) => console.log('From Worker1:', msg)); worker2.on('message', (msg) => console.log('From Worker2:', msg)); //等待Workers完成任务 worker1.on('exit', () => console.log('Worker1 exited')); worker2.on('exit', () => console.log('Worker2 exited')); //worker.js,Worker线程========================================= const { parentPort, workerData } = require('worker_threads'); const { SharedArrayBuffer, Atomics } = require('atomics'); // 获取共享的ArrayBuffer const sharedArray = new Int32Array(workerData.sharedBuffer); // 使用Atomics,安全的进行原子操作 Atomics.add(sharedArray, 0, 1); // 增加共享数组的第一个元素 // 获取当前值 const currentValue = Atomics.load(sharedArray, 0); // 向主线程发送消息 parentPort.postMessage(`Current value: ${currentValue}`); Node不仅可以创建议多线程,还允许创建子进程。每个子进程,都是一个Node实例,有独立的V8引擎、Node运行时和内存资源。相比多线程,自然是更加消耗资源的(10-30M)。
Node提供了child_process和cluster两个模块用于创建和管理子进程。cluster建立在child_process之上,内置了负载均衡和自动重启机制,可以更加高效的利用CPU的多核性能,是专为Node服务器应用设计的。
child_process模块提供了spawn、exec、execFile、fork等方法用于创建子进程:
**spawn**:用于启动一个新的进程,可以与其进行数据流的交互。**exec**:用于运行一个命令,并将输出(stdout 和 stderr)作为回调函数的参数返回。**execFile**:与exec类似,但更适合直接执行文件而不是命令字符串。**fork**:专门用于创建新的Node.js进程,并且它有专门的通信通道,适用于父进程和子进程之间传递消息。(1)spawn方法
它是最基本的创建子进程的方法,适用于需要与子进程进行长时间交互的场景。
//在子进程中,执行系统操作命令=================================== const { spawn } = require('child_process'); // 示例:执行 ls 命令,[]中为参数 //spawn方法返回ChildProcess对象,具有 stdout、stderr、stdin属性 //分别对应子进程的标准输出、标准错误和标准输入流。 const ls = spawn('ls', ['-lh', '/usr']); // 监听子进程的标准输出 ls.stdout.on('data', (data) => { console.log(`stdout: ${data}`); }); // 监听子进程的标准错误输出 ls.stderr.on('data', (data) => { console.error(`stderr: ${data}`); }); // 监听子进程的退出事件 ls.on('close', (code) => { console.log(`子进程退出,退出码 ${code}`); }); //在子进程中,执行JS脚本========================================= const { spawn } = require('child_process'); // 示例:执行 JavaScript 文件 const child = spawn('node', ['script.js']); child.stdout.on('data', (data) => { console.log(`子进程 stdout:\n${data}`); }); child.stderr.on('data', (data) => { console.error(`子进程 stderr:\n${data}`); }); child.on('close', (code) => { console.log(`子进程退出,退出码 ${code}`); }); (2)exec方法
相比 spawn() 方法,它将整个命令(包括参数)作为一个字符串传递给底层的 shell执行,适合于简单的命令和短期执行的任务。
//在子进程中,执行系统操作命令=================================== const { exec } = require('child_process'); // 示例:执行 ls 命令 //直接在回调中监听 exec('ls -lh /usr', (error, stdout, stderr) => { if (error) { console.error(`执行错误:${error.message}`); return; } if (stderr) { console.error(`stderr: ${stderr}`); return; } console.log(`stdout: ${stdout}`); }); //在子进程中,执行JS脚本======================================== const { exec } = require('child_process'); // 示例:执行 JavaScript 文件 exec('node script.js', (error, stdout, stderr) => { if (error) { console.error(`执行错误:${error.message}`); return; } if (stderr) { console.error(`子进程 stderr:\n${stderr}`); return; } console.log(`子进程 stdout:\n${stdout}`); }); (3)execFile方法
execFile() 方法与 exec() 类似,但需要显式指定可执行文件的路径和参数列表,不会调用系统的 shell
//在子进程中,执行系统操作命令=================================== const { execFile } = require('child_process'); // 示例:执行 node 命令 execFile('node', ['--version'], (error, stdout, stderr) => { if (error) { console.error(`执行错误:${error.message}`); return; } console.log(`stdout: ${stdout}`); }); //在子进程中,执行JS脚本======================================== const { execFile } = require('child_process'); // 示例:执行 JavaScript 文件 execFile('node', ['script.js'], (error, stdout, stderr) => { if (error) { console.error(`执行错误:${error.message}`); return; } console.log(`子进程 stdout:\n${stdout}`); }); (4)fork方法
fork()是Node.js特有的,用于创建一个新的Node.js进程,并且会在父进程和子进程之间创建一个通信通道。它非常适合在多进程架构中进行进程间通信(IPC)
//main.js======================================================= const { fork } = require('child_process'); const child = fork('./child.js'); child.on('message', (msg) => { console.log(`Message from child: ${msg}`); }); child.send('Hello from parent'); // child.js===================================================== process.on('message', (msg) => { console.log(`Message from parent: ${msg}`); process.send('Hello from child'); }); cluster模块,基于child_process的fork方法,在此基础上,增加了负载均衡和自动重启等高级功能:
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; //CPU核数 if (cluster.isMaster) { //以下代码在主进程中执行 console.log(`主进程 ${process.pid} 正在运行`); // 衍生工作进程 for (let i = 0; i < numCPUs; i++) { cluster.fork(); //创建多个子进程 } // 监听工作进程的退出事件 cluster.on('exit', (worker, code, signal) => { console.log(`工作进程 ${worker.process.pid} 已退出`); }); } else { //以下代码在子进程中执行 // 每个工作进程都可以共享一个 TCP 连接 // 这里是一个 HTTP 服务器示例 http.createServer((req, res) => { res.writeHead(200); res.end('Hello World\n'); }).listen(8000); console.log(`工作进程 ${process.pid} 已启动`); } *这是一个系列文章,将全面介绍多线程、协程和单线程事件循环机制,建议收藏、点赞哦!
*你在并发编程过程中碰到了哪些难题?欢迎评论区交流~~~
我是functionMC > function MyClass(){…}
C#/TS/鸿蒙/AI等技术问题,以及如何写Bug、防脱发、送外卖等高深问题,都可以私信提问哦!
