Node 集群源码初探

更新日期: 2019-06-23阅读: 2.7k标签: 源码

引言

当我们谈起 nodejs 时,由于 JavaScript 只能在单线程上运行, 导致 一个 Node 进程只能运行在一个CPU上, 无法发挥现代 CPU 多核的特性。 这对于一个 服务端语言来说, 是比较掣肘其发展的。 好在 Node 在 v0.10 后, 可以使用 Cluster 模块搭建 多进程服务, 并在 v0.12 重写了该模块, 大幅提高其性能, 下面我们将走进 Node-Cluster ,看看 Node 是如何实现集群的。


多进程单线程 与 单进程多线程 模型

首先,这涉及一个 程序设计 的两种模型 本文提及的 Nodejs 便是使用 多进程单线程 实现的, 而 Java 等语言 则使用 单进程多线程 模型 故本文着重 多进程单线程 模型, 至于 单进程多线程 模型,则会在比较中稍微涉及。


多进程单线程

正如上文说到, JavaScript 只能在单线程中运行, 故 Node 在 JS层面只能 在某进程的一个线程中运行。 若要实现集群, 则必须创建多个进程, 以实现多个应用实例同时运行。

Cluster 模块, 提供了 master-worker 模式 启动多个应用模式。

接下来我们就走入这个模块, 看看其内部具体做了哪些事情。


Cluster

Cluster 是什么?

  1. 在服务器上启动多个进程
  2. 每个进程里都跑的同一份代码
  3. 每个进程竟然还能监听同一个端口 (下文分析实现原理)

其中:

  1. 负责启动其他进程的叫做 Master 进程,他好比是个『包工头』,不做具体的工作,只负责启动其他进程。
  2. 其他被启动的叫 Worker 进程,顾名思义就是干活的『工人』。它们接收请求,对外提供服务。
  3. Worker 进程的数量一般根据服务器的 CPU 核数来定,这样就可以完美利用多核资源。
const cluster = require('cluster');  
const http = require('http');  
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {  
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}


worker 进程的创建

使用 Cluster 模块的 fork 方法来创建出子进程

cluster.fork();

先从 worker 进程的初始化开始看, master 进程在 fork 其 worker进程时 会在其环境变量(workerEnv)中 附加上一个 唯一ID(NODE UNIQUE ID) 该ID 是一个从 0 开始的递增数。

var ids = 0;

//.....

cluster.fork = function(env) {  
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });

// ....

function createWorkerProcess(id, env) {  
  const workerEnv = util._extend({}, process.env);
  const execArgv = cluster.settings.execArgv.slice();
  const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;

  util._extend(workerEnv, env);
  workerEnv.NODE_UNIQUE_ID = '' + id;

  // .....

  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

然后 Node 在实例初始化时,使用该 ID 判断使用 clild.js Or master.js

const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';  
module.exports = require(`internal/cluster/${childOrMaster}`);

把目光再集中到 这个 fork() 函数中来,

exports.fork = function(modulePath /*, args, options*/) {

  // Get options and args arguments.
  var execArgv;
  var options = {};
  var args = [];
  var pos = 1;
  if (pos < arguments.length && Array.isArray(arguments[pos])) {
    args = arguments[pos++];
  }

  if (pos < arguments.length && arguments[pos] != null) {
    if (typeof arguments[pos] !== 'object') {
      throw new TypeError('Incorrect value of args option');
    }

    options = util._extend({}, arguments[pos++]);
  }

  // Prepare arguments for fork:
  execArgv = options.execArgv || process.execArgv;

  if (execArgv === process.execArgv && process._eval != null) {
    const index = execArgv.lastIndexOf(process._eval);
    if (index > 0) {
      // Remove the -e switch to avoid fork bombing ourselves.
      execArgv = execArgv.slice();
      execArgv.splice(index - 1, 2);
    }
  }

  args = execArgv.concat([modulePath], args);

  if (typeof options.stdio === 'string') {
    options.stdio = stdioStringToArray(options.stdio);
  } else if (!Array.isArray(options.stdio)) {
    // Use a separate fd=3 for the IPC channel. Inherit stdin, stdout,
    // and stderr from the parent if silent isn't set.
    options.stdio = options.silent ? stdioStringToArray('pipe') :
      stdioStringToArray('inherit');
  } else if (options.stdio.indexOf('ipc') === -1) {
    throw new TypeError('Forked processes must have an IPC channel');
  }

  options.execPath = options.execPath || process.execPath;
  options.shell = false;

  return spawn(options.execPath, args, options);
};

在函数中 做了一些 参数准备,而重点在于 对 这个 options.stdio 的处理。

options.stdio 用于配置子进程与父进程之间建立的管道, 其值应该为一个数组, 但是为了方便, 值可以是以下的字符串之一:

'pipe' - 等同于 ['pipe', 'pipe', 'pipe'] (默认)

'ignore' - 等同于 ['ignore', 'ignore', 'ignore']

'inherit' - 等同于 [process.stdin, process.stdout, process.stderr] 或 [0,1,2]

其每个值 分别对应 [process.stdin, process.stdout, process.stderr] 标准输入、标准输出、标准错误 输出到父进程的方式。

而使用 Fork 方式 衍生出的子进程,又必须加上 一个 IPC通道 用于父子间 传递消息或文件描述符, 故 stdioStringToArray 函数代码如下

function stdioStringToArray(option) {  
  switch (option) {
    case 'ignore':
    case 'pipe':
    case 'inherit':
      return [option, option, option, 'ipc'];
    default:
      throw new TypeError('Incorrect value of stdio option: ' + option);
  }
}


多进程之间 的 进程间通信

下面我们就重点来看,这个 IPC通道 是如何实现的,以及其工作原理

进程间通信(Inter-process communication, IPC)其实是个很简单的概念,只要你将这个进程的数据传到 另外一个进程就是 IPC 了, 要实现 这个数据传递的方式有非常的多, 如以下

在 Node 中 IPC 实现分两种, 在 Windows 上通过 命名管道, 在 UNIX 上则使用 UNIX domain sockets (UDS), 详情参见 官方文档

目前 Linux 还是主流的服务端操作系统, 主要分析下在 Linux 下 UDS 的使用方式

UDS 是在 Socket 的基础上发展而来的, Socket 一般我们指的都是 IP Socket , 通过网络协议进行通信, 但是在同一台设备上的通信,是否可以绕开网络层的限制呢。 这里我们就可以通过 UDS 来实现, 所以把 它称之为 LocalSocket ,看起来更贴切一点。

在 Linux 中, 一切都可以当做是 文件。 UDS 也不例外。

在 Node 的 child_process.js 模块中, 有如下代码

stdio = stdio.reduce(function(acc, stdio, i) {  
  // ......
else if (stdio === 'ipc') {  
      if (sync || ipc !== undefined) {
        // Cleanup previously created pipes
        cleanup();
        if (!sync)
          throw new errors.Error('ERR_IPC_ONE_PIPE');
        else
          throw new errors.Error('ERR_IPC_SYNC_FORK');
      }

      ipc = new Pipe(PipeConstants.IPC);
      ipcFd = i;

      acc.push({
        type: 'pipe',
        handle: ipc,
        ipc: true
      });
    }

当 stdio 的 类型为 ipc 时,会创建一个 ipc 管道, 其fd 为 'ipc' 在stdio数组 中的索引

ipc = new Pipe(PipeConstants.IPC);  
ipcFd = i;

此时目光应该被这个 Pipe 所吸引了吧, 那么它又是什么呢, 话不多少直接上 libuv 中对应 Pipe 的实现。

void PipeWrap::New(const FunctionCallbackInfo<Value>& args) {  
  // This constructor should not be exposed to public javascript.
  // Therefore we assert that we are not trying to call this as a
  // normal function.
  CHECK(args.IsConstructCall());
  CHECK(args[0]->IsInt32());
  Environment* env = Environment::GetCurrent(args);

  int type_value = args[0].As<Int32>()->Value();
  PipeWrap::SocketType type = static_cast<PipeWrap::SocketType>(type_value);

  bool ipc;
  ProviderType provider;
  switch (type) {
    case SOCKET:
      provider = PROVIDER_PIPEWRAP;
      ipc = false;
      break;
    case SERVER:
      provider = PROVIDER_PIPESERVERWRAP;
      ipc = false;
      break;
    case IPC:
      provider = PROVIDER_PIPEWRAP;
      ipc = true;
      break;
    default:
      UNREACHABLE();
  }

  new PipeWrap(env, args.This(), provider, ipc);
}


PipeWrap::PipeWrap(Environment* env,  
                   Local<Object> object,
                   ProviderType provider,
                   bool ipc)
    : ConnectionWrap(env, object, provider) {
  int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
  CHECK_EQ(r, 0);  // How do we proxy this error up to javascript?
                   // Suggestion: uv_pipe_init() returns void.
  UpdateWriteQueueSize();
}
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {  
  uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
  handle->shutdown_req = NULL;
  handle->connect_req = NULL;
  handle->pipe_fname = NULL;
  handle->ipc = ipc;
  return 0;
}
void uv__stream_init(uv_loop_t* loop,  
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        /* In the rare case that "/dev/null" isn't mounted open "/"
         * instead.
         */
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE_) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

可以看到对应 ipc 管道, 底层是通过文件流的方式实现的。 那么这个管道 就一样拥有 和 stream 一样的模式,即 open—write/read—close

父进程在实际创建子进程前,会创建IPC通道并监听它,然后才真正创建出子进程,并通过环境变量(NODE CHANNEL FD)告诉子进程这个IPC通信的文件描述符(fd)。子进程在启动的过程中,根据文件描述符去连接这个已存在的IPC通道,从而完成父子进程之间的连接。

当父进程 send 数据到子进程时, 便通过这个 fd 向 这个特殊的文件开始写入数据,此时调用底层stream 的 write 方法,而子进程由于在启动的过程中便已经连接上 该通道, 在应用层通过 message 事件(底层应是 stream 的 read方法),来接收数据。

由于这个IPC通道是 双工通信的, 故子进程也可以实现向父进程通信。


问题解析

多进程之间是如何实现 端口共享的?

那么 node 又是如何实现多进程 监听同一个端口呢,这里的关键在于 句柄传递 。

如下代码:

//主进程代码

var child = require('child_process').fork('child.js');  
// Open up the server object and send the handle
var server = require('net').createServer();  
server.on('connection', function (socket) {  
    socket.end('handled by parent\n');
});
server.listen(1337, function () {  
    child.send('server', server);
});

//子进程代码
process.on('message', function (m, server) {  
    if (m === 'server') {
        server.on('connection', function (socket) {
            socket.end('handled by child\n');
        });
    }
});

在示例中,父进程直接将创建出来的 Tcp对象 传递到子进程中,但是我们知道两个进程之间又是无法直接共享内存的, 那么这又是怎么实现的呢?

来看这个 send方法

child.send(message, [sendHandle])

目前,子进程对象send()方法可以发送的句柄类型包括如下几种:

1.net.socket,tcp套接字

2.net.Server,tcp服务器,任意建立在tcp服务上的应用层服务都可以享受到它带来的好处。

3.net.Native,c++层面的tcp套接字或IPC管道。

4.dgram.socket,UDP套接字

5.dgram.Native,C++层面的UDP套接字

send()方法在将消息发送到IPC管道前,将消息组装成两个对象,一个参数是handle,另一个是message, 而发送到子进程的 实际上是这个句柄的 文件描述符, message对象也会序列化为字符串。

而子进程由于之前连接了IPC通道, 可以读取到父进程发送的消息。获取到这个消息后, 通过JSON.parse 还原为对象, 并分析 其中的cmd值, 若 message.cmd = NODE_HANDLE, 则表示父进程传递的是一个句柄,此时便会通过 获取到的 fd 和传递的句柄类型,还原出这个Tcp对象。

还原出这个server 对象后, 便是去监听这个端口了。

setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))

而在 libuv 中在 setsockopt 时设置了 SO_REUSEADDR 选项(关于这个选项的含义可以自行百度),简而言之,设置了这个选项后, 在Linux层面允许我们使用不同的进程 就相同的网卡和端口进行监听, 对于独立启动的进程,并不知道彼此的fd, 所以当一个进程监听成功后, 后面的进程便会失败,但是 通过 send 方式还原出来的 server对象,他们的fd是相同的,此时该子进程就可以通过这个fd 去监听这个端口。

但是 fd 在同一时刻只能被一个进程所占用, 换言之就是网络请求向服务器端发送时,只有一个幸运的进程能够抢到连接,也就是说只有他能为这个请求进行服务。这些进程也都是抢占式的。


总结

随着这些模块逐渐完善, Nodejs 在服务端的使用场景也越来越丰富,如果你仅仅是因为JS 这个后缀而注意到它的话, 那么我希望你能暂停脚步,好好了解一下这门年轻的语言,相信它会给你带来惊喜。


参考

https://github.com/nodejs/node

https://nodejs.org/dist/latest-v10.x/docs/api/net.html#net

https://www.jianshu.com/p/335a9e101c3f

http://docs.libuv.org/en/v1.x/stream.html

原文: https://blog.souche.com/untitled-25/


链接: https://www.fly63.com/article/detial/3840

Js中的 forEach 源码

在日常 Coding 中,码农们肯定少不了对数组的操作,其中很常用的一个操作就是对数组进行遍历,查看数组中的元素,然后一顿操作猛如虎。今天暂且简单地说说在 JavaScript 中 forEach。

微信小程序代码源码案例大全

克隆项目代码到本地(git应该都要会哈,现在源码几乎都会放github上,会git才方便,不会的可以自学一下哦,不会的也没关系,gitHub上也提供直接下载的链接);打开微信开发者工具;

Vue源码之实例方法

在 Vue 内部,有一段这样的代码:上面5个函数的作用是在Vue的原型上面挂载方法。initMixin 函数;可以看到在 initMixin 方法中,实现了一系列的初始化操作,包括生命周期流程以及响应式系统流程的启动

vue源码解析:nextTick

nextTick的使用:vue中dom的更像并不是实时的,当数据改变后,vue会把渲染watcher添加到异步队列,异步执行,同步代码执行完成后再统一修改dom,我们看下面的代码。

React源码解析之ReactDOM.render()

React更新的方式有三种:(1)ReactDOM.render() || hydrate(ReactDOMServer渲染)(2)setState(3)forceUpdate;接下来,我们就来看下ReactDOM.render()源码

React源码解析之ExpirationTime

在React中,为防止某个update因为优先级的原因一直被打断而未能执行。React会设置一个ExpirationTime,当时间到了ExpirationTime的时候,如果某个update还未执行的话,React将会强制执行该update,这就是ExpirationTime的作用。

扒开V8引擎的源码,我找到了你们想要的前端算法

算法对于前端工程师来说总有一层神秘色彩,这篇文章通过解读V8源码,带你探索 Array.prototype.sort 函数下的算法实现。来,先把你用过的和听说过的排序算法都列出来:

jQuery源码之extend的实现

extend是jQuery中一个比较核心的代码,如果有查看jQuery的源码的话,就会发现jQuery在多处调用了extend方法。作用:对任意对象进行扩;’扩展某个实例对象

vuex源码:state及strict属性

state也就是vuex里的值,也即是整个vuex的状态,而strict和state的设置有关,如果设置strict为true,那么不能直接修改state里的值,只能通过mutation来设置

webpack 源码从零开始:apable模型

最近在看webpack的源码,发现有个比较头疼的点是:代码看起来非常跳跃,往往看不到几行就插入一段新内容,为了理解又不得不先学习相关的前置知识。层层嵌套之后,发现最基础的还是tapable模型,因此先对这部分的内容做一个介绍。

点击更多...

内容以共享、参考、研究为目的,不存在任何商业目的。其版权属原作者所有,如有侵权或违规,请与小编联系!情况属实本人将予以删除!