V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
fourstring
V2EX  ›  Node.js

在 ZeroMQ 的 Response/Request 模式中,服务端回调函数逻辑内调用 fs.readFile 时,为何会阻塞执行?(内有代码)

  •  
  •   fourstring · 2019-07-27 07:47:56 +08:00 · 4900 次点击
    这是一个创建于 1929 天前的主题,其中的信息可能已经有所发展或是发生改变。

    Node 版本 10.16.0 ZeroMQ 版本 5.1.0 使用 ZeroMQ 的代码如下:

    服务端 /zmq-rep.js

    "use strict";
    const fs = require("fs");
    const zmq = require("zeromq");
    const moment = require("moment");
    const responder = zmq.socket("rep");
    
    responder.on("message", data => {
      let reqData = JSON.parse(data);
      let filename = reqData.path;
      console.log(`Request received:${filename}`);
      fs.readFile(filename, (err, data) => {
        if (err) {
          throw Error(err);
        }
        console.log(`Sending response for ${filename} at ${moment().format()}`);
        responder.send(
          JSON.stringify({
            content: data.toString(),
            timestamp: moment().format("x"),
            pid: process.pid
          })
        );
      });
    });
    
    responder.bind("tcp://127.0.0.1:50221", err => {
      if (err) {
        throw Error(err);
      }
      console.log("Now listening for requests...");
    });
    
    process.on("SIGINT", () => {
      console.log("Now closing server...");
      responder.close();
    });
    

    客户端 /zmq-req.js

    "use strict";
    const zmq = require("zeromq");
    const moment = require("moment");
    const filename = process.argv[2];
    const request = zmq.socket("req");
    request.on("message", data => {
      let response = JSON.parse(data);
      console.log(
        `Fetch file content:${response.content} at ${
          response.timestamp
        } processed by ${response.pid}`
      );
    });
    request.connect("tcp://127.0.0.1:50221");
    
    for (let i = 1; i <= 5; i++) {
      console.log(
        `Sending request to get ${filename} for ${i} time(s) at ${moment().format()}`
      );
      request.send(
        JSON.stringify({
          path: filename
        })
      );
    }
    

    使用如下方式调用:

    # Terminal 1
    $ node zmq-rep.js
    # Terminal 2
    $ node zmq-req.js target.txt
    

    服务端输出如下:

    Now listening for requests...
    Request received:target.txt
    Sending response for target.txt at 2019-07-27T07:36:47+08:00
    Request received:target.txt
    Sending response for target.txt at 2019-07-27T07:36:47+08:00
    Request received:target.txt
    Sending response for target.txt at 2019-07-27T07:36:47+08:00
    Request received:target.txt
    Sending response for target.txt at 2019-07-27T07:36:47+08:00
    Request received:target.txt
    Sending response for target.txt at 2019-07-27T07:36:47+08:00
    

    客户端输出如下

    Sending request to get target.txt for 1 time(s) at 2019-07-27T07:36:47+08:00
    Sending request to get target.txt for 2 time(s) at 2019-07-27T07:36:47+08:00
    Sending request to get target.txt for 3 time(s) at 2019-07-27T07:36:47+08:00  
    Sending request to get target.txt for 4 time(s) at 2019-07-27T07:36:47+08:00
    Sending request to get target.txt for 5 time(s) at 2019-07-27T07:36:47+08:00  
    Fetch file content:50221 at 1564184207708 processed by 12984
    Fetch file content:50221 at 1564184207734 processed by 12984
    Fetch file content:50221 at 1564184207747 processed by 12984
    Fetch file content:50221 at 1564184207766 processed by 12984
    Fetch file content:50221 at 1564184207794 processed by 12984
    

    可以看到,客户端代码发送请求是非阻塞的,但服务端代码是阻塞的。《 Node.js 8 the right way 》对这个现象的解释是:"Node.js event loop was left spinning while the fs.readFile for each request was being processed."但查阅文档,fs.readFile 本身是非阻塞的,如果这一解释成立的话,fs.readFile 不就是阻塞函数了吗?

    我另外写了一份只使用 net 模块进行通讯的代码,现象与使用 ZeroMQ 进行通信不同,在这种情况下,服务端的回调并没有阻塞。

    服务端 /tcp-server.js

    "use strict";
    const fs = require("fs");
    const net = require("net");
    const moment = require("moment");
    
    const server = net
      .createServer(connection => {
        console.log("Request received");
        connection.on("data", data => {
          let reqData = JSON.parse(data);
          console.log(`Reading ${reqData.path} at ${moment().format()}`);
          fs.readFile(reqData.path, (err, data) => {
            connection.write(
              JSON.stringify({
                content: data.toString(),
                timestamp: moment().format()
              })
            );
          });
        });
      })
      .listen({ port: 50221, hostname: "127.0.0.1" })
      .on("listening", () => {
        console.log("Now listening...");
      });
    

    客户端 /tcp-client.js

    "use strict";
    const net = require("net");
    const filename = process.argv[2];
    const connectionPool = [];
    
    for (let i = 1; i <= 5; i++) {
      connectionPool.push(
        net
          .createConnection(50221, "127.0.0.1", () => {
            console.log(
              `Connetcion ${i} established and start fetching file ${filename} on server...`
            );
          })
          .on("data", data => {
            console.log(`Receive response:${data}`);
          })
      );
    }
    connectionPool.forEach(connection => {
      connection.write(
        JSON.stringify({
          path: filename
        })
      );
    });
    
    process.on("SIGINT", () => {
      connectionPool.forEach(connection => {
        connection.end();
      });
    });
    

    调用如下:

    # Terminal 1
    $ node tcp-server.js
    # Terminal 2
    $ node tcp-client.js target.txt
    

    客户端输出如下:

    Connetcion 1 established and start fetching file target.txt on server...
    Connetcion 2 established and start fetching file target.txt on server...
    Connetcion 3 established and start fetching file target.txt on server...      
    Connetcion 4 established and start fetching file target.txt on server...      
    Connetcion 5 established and start fetching file target.txt on server...      
    Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"}
    Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"}
    Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"}  
    Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"}  
    Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"}
    

    服务端输出如下:

    Now listening...
    Request received
    Request received
    Request received
    Request received
    Request received
    Reading target.txt at 2019-07-27T07:44:53+08:00
    Reading target.txt at 2019-07-27T07:44:53+08:00
    Reading target.txt at 2019-07-27T07:44:53+08:00
    Reading target.txt at 2019-07-27T07:44:53+08:00
    Reading target.txt at 2019-07-27T07:44:53+08:00
    

    于是可以看到,不论在服务端还是客户端的 io 都是非阻塞的。

    这种不同是否与 ZeroMQ 的实现有关呢?感谢大佬们的回复!

    5 条回复    2019-07-27 09:07:10 +08:00
    v2byy
        1
    v2byy  
       2019-07-27 08:14:43 +08:00 via iPhone   ❤️ 1
    zeromq 的 req/rep 模式,server 只能收到 req,然后 rep,再收 req,再 rep。如果要异步,建议用 dealer/router 模式
    fourstring
        2
    fourstring  
    OP
       2019-07-27 08:22:37 +08:00
    @v2byy #1 非常感谢,这和 ZeroMQ response 模式服务端回调的执行方式有关吗?按我的理解在 node 中执行回调不应该都是异步非阻塞的吗?或者说 ZeroMQ 在一个 request 的回调完成之前不会读下一个 request ?
    v2byy
        3
    v2byy  
       2019-07-27 08:59:31 +08:00   ❤️ 1
    @fourstring zeromq 有好几种 message model,你说的 request/reply 模式是同步的,就是 server 必须进行一次 reply 之后才能再次接收 client 请求。看下 zeromq 的文档吧。

    http://zguide.zeromq.org/page:all#The-Request-Reply-Mechanisms
    Alexhohom
        4
    Alexhohom  
       2019-07-27 09:03:53 +08:00
    最近也在看 zmq,req-rep 会阻塞。想不阻塞可以考虑 Push-Pull 模式吧,可能要开两组端口。
    rawidn
        5
    rawidn  
       2019-07-27 09:07:10 +08:00 via Android
    可以考虑使用 grpc 之类的呢
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   4367 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 05:31 · PVG 13:31 · LAX 21:31 · JFK 00:31
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.