背景
在nodejs主进程中,开启一个额外的子进程A,进程A负责和线程池通信,完成cpu密集型的任务。通过nodejs主进程创建出来的多个nodejs工作进程可以把任务提交到进程A,然后拿到处理结果。
尽管我们可以在主进程中保存工作进程的实例,但是想通过主进程,完成工作进程和进程A的通信还是非常麻烦,步骤如下
1 首先主进程要监听每个工作进程发过来的任务
2 然后把任务传给进程A
3 进程A处理完后,通知主进程,主进程再把结果发给对任务对应的子进程 其中,工作进程需要保存任务对应的上下文(比如说回调),因为工作进程可能同时给主进程发送了多个任务,当主进程通知工作进程某个任务完成的时候,工作进程需要通过任务找到对应的上下文,然后进行下一步处理,比如执行回调。
在主进程中开启一个服务,实现没有继承关系的子进程间通信,选取的进程间通信方式是unix域,没有选tcp是因为同主机的进程间通信,使用tcp过于重和低效(需要经过协议栈的封包和解包)。子进程可以通过该服务和主进程通信,然后主进程转发请求给处理cpu型任务的子进程。结构如下
在主进程而不是进程A中开启unix域服务是因为以后新增处理其他任务的子进程时,可以复用该unix域服务,起到api网关的作用。但是多了一层,会多了一些通信的成本。更直接的可以使用以下结构
客户端
const net = require('net');
const { EventEmitter } = require('events');
class Work extends EventEmitter {}
class UnixDomainClient extends EventEmitter {
constructor(options) {
super();
this.options = options;
}
send(data) {
const work = new Work();
const socket = net.connect(this.options.path);
socket.end(JSON.stringify(data));
socket.on('error', (e) => {
work.emit('error', e);
});
let res = null;
socket.on('data', (chunk) => {
res = res ? Buffer.concat([res, chunk]) : chunk;
});
socket.on('end', () => {
work.emit('message', res && res.toString());
});
return work;
}
}
const work = new UnixDomainClient({path: '/tmp/test.sock'}).send('hello');
work.on('message', function(res) {
console.log(res);
})
服务器
const fs = require('fs');
const net = require('net');
const constants = {
UNIX_PATH: '/tmp/test.sock',
}
if (fs.existsSync(constants.UNIX_PATH)) {
fs.unlinkSync(constants.UNIX_PATH);
}
const server = net.createServer({ allowHalfOpen: true }, (client) => {
let data = null;
client.on('data', (chunk) => {
data = data ? Buffer.concat([data, chunk]) : chunk;
});
client.on('end', () => {
console.log(`recive msg: ${data.toString()}`)
client.end('world');
});
});
server.listen(constants.UNIX_PATH, () => {
console.log(`bind uinx path ${constants.UNIX_PATH}`);
});
server.on('error', (error) => {
console.log(`unix domain server error ${error.toString()}`);
});
process.on('exit', () => {
if (fs.existsSync(constants.UNIX_PATH)) {
fs.unlinkSync(constants.UNIX_PATH);
}
});