V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
sun2920989
V2EX  ›  PHP

一个可以在 PHP -FPM 环境来并发访问 HTTP 接口的工具类

  •  
  •   sun2920989 · 2021-04-26 20:13:20 +08:00 · 2451 次点击
    这是一个创建于 1349 天前的主题,其中的信息可能已经有所发展或是发生改变。

    https://github.com/CodeApePro/TcpMockHttp

    这并不是一个开源的项目,只是个人在使用的一段工具类代码.

    原理是通过 sockets 扩展创建 tcp 连接,在 tcp 连接上传输符合 http1.1 协议的数据,从而将发送数据与读取结果分开.

    发出来以供遇到类似场景时参考.

    45 条回复    2021-06-02 10:53:10 +08:00
    MarlonFan
        1
    MarlonFan  
       2021-04-26 21:36:11 +08:00
    Guzzle 的 asnyc request 好像就是做这个的
    sun2920989
        2
    sun2920989  
    OP
       2021-04-26 21:58:56 +08:00
    @MarlonFan 还是有点区别的, Guzzle 和 curl_multi 给我的感觉差不多.当然我只是做了简单测试,最后并没有在项目用引用 Guzzle.
    sxbxjhwm
        3
    sxbxjhwm  
       2021-05-08 15:12:37 +08:00
    我前阵子也封装了一个。。不过是基于 curl_multi 的
    https://github.com/jshensh/php-curl-class#%E5%B9%B6%E5%8F%91%E8%AF%B7%E6%B1%82
    sun2920989
        4
    sun2920989  
    OP
       2021-05-08 15:17:25 +08:00
    @sxbxjhwm #3 之前考虑过,主要是觉得不够灵活,而且无法在逻辑里面向上跳出.所以才换了个思路用了这个方式.
    sxbxjhwm
        5
    sxbxjhwm  
       2021-05-08 15:19:56 +08:00
    @sun2920989 curl_multi_exec 执行完后直接干别的就可以了阿,判断和获取主要在这段上 https://github.com/jshensh/php-curl-class/blob/master/src/Multi.php#L110-L152
    sun2920989
        6
    sun2920989  
    OP
       2021-05-08 15:24:06 +08:00
    @sxbxjhwm #5 我说的向上跳出的意思是,某个方法需要对 http 响应的结果进行处理,但是又不希望在这个方法里就阻塞获取结果.这时候就需要用回调或 yield 向上跳出,如果使用 curl_multi 的话,我没有什么思路来处理这个场景.
    sxbxjhwm
        7
    sxbxjhwm  
       2021-05-08 15:28:25 +08:00
    @sun2920989 curl_multi_exec 不是阻塞的,后续的获取可以分开写,你是想单独先抽某个结果出来用还是?
    sxbxjhwm
        8
    sxbxjhwm  
       2021-05-08 15:35:46 +08:00
    @sun2920989 curl_multi_exec 通过第二个引用传参 &$still_running 来告诉你栈中还有多少正在执行的 curl 句柄。阻塞的是后续获取的那部分 while 段源码,它通过判断 $still_running 来确认有没有全部执行完,同时在 while 循环体内也有通过 curl_multi_exec 对 $still_running 重新赋值的操作,所以在什么阻塞获取是完全可以由你自己决定的。
    sun2920989
        9
    sun2920989  
    OP
       2021-05-08 15:37:07 +08:00
    @sxbxjhwm #7 简单来说,比如原有的业务流程有个方法,内容是请求一次数据,对这份数据进行一些逻辑处理,然后返回.此时如果需要多次调用这个方法,当然是每个请求顺序执行的.如果此时遇到业务效率瓶颈,期望这个方法是可以并发的,那么此时只实现 http 请求时的并发是不够的,必须要将整个方法并发,如果使用 curl_multi 之类的方案的话,必须要将原有的方法彻底重写来直接支持批量获取数据批量处理数据批量返回,对于现有业务代码这些修改是很大的,我选择的方案是保持这个方法每次还是只处理一次请求的逻辑,通过回调跳出的方式来实现.具体的我在我项目的 demo.php 中的最后一个示例简单举了个例子.
    sxbxjhwm
        10
    sxbxjhwm  
       2021-05-08 15:43:20 +08:00
    @sun2920989 你是希望预处理数据返回阿,那 curl_multi_info_read 和 curl_multi_getcontent 完全可以满足你的需求的,具体可以看这一段 https://github.com/jshensh/php-curl-class/blob/master/src/Multi.php#L118-L129 。我在这里就是将原有的 curl 句柄进行处理后放在待返回的数组里最后统一扔出来的。如果需要更细致的并发我可能就是采用 pcntl_ 系列方法处理了
    sun2920989
        11
    sun2920989  
    OP
       2021-05-08 15:50:06 +08:00
    @sxbxjhwm #10 其实并不能满足我的需求的,单单在 exec 之前要组织全部的请求,不能再后续添加,就无法应用在我这边的场景下.就像我刚才说的,除非经过彻底的改写原来的方法,直接支持批量参数过来,否则是无法直接使用 curl_multi 的.
    sun2920989
        12
    sun2920989  
    OP
       2021-05-08 15:55:20 +08:00
    @sxbxjhwm #10 之所以选择使用 tcp 连接,就是因为对于一条连接中,产生的每一次动作都是真实的,建连,发送数据,获取数据,执行了就会有效果.而对于 curl_multi 而言,我可以简单的理解为在 exec 之前,其实一切都没有发生,都是从 exec 开始才执行,那么,在一个原有的业务调用链中,什么时候进行 exec 就是一个无法解决的问题.
    sxbxjhwm
        13
    sxbxjhwm  
       2021-05-08 16:09:40 +08:00
    @sun2920989 你没看我封装的源码。。我实现了请求错误时的重试机制,原理就是在 exec 后继续使用 curl_multi_add_handle 向栈里推句柄实现的
    sun2920989
        14
    sun2920989  
    OP
       2021-05-08 16:20:16 +08:00
    @sxbxjhwm #13 但是在 exec 之后再次 add 的,就不是并发的了,只能在下一次 exec 执行了.
    sun2920989
        15
    sun2920989  
    OP
       2021-05-08 16:23:00 +08:00
    @sxbxjhwm #13 假设一个方法原来是发送一次请求的,改为使用 curl_multi 之后,是否要在方法内 exec,如果 exec 那么多次调用这个方法就不是并发的,如果不 exec,那么在什么时候 exec 呢,期望调用这个方法的人在调用一次或多次这个方法后再次主动调用一个执行 exec 的方法吗?
    sun2920989
        16
    sun2920989  
    OP
       2021-05-08 16:30:43 +08:00
    @sxbxjhwm #13 按照我在 demo 里面写的例子扩展一下,可以写出类似下面的代码

    function testGetData(){

    $result = Pool::call('GET','http://api.ipify.org/?format=json');

    $decode_function = function($result){

    return json_decode($result,true);

    };

    return new Decode($result,$decode_function);
    }

    $result1 = testGetData();

    $result2 = testGetData();

    Helper::wait($result1);

    Helper::wait($result2);


    这里面 testGetData 是可以被随意调用多次的,无需提前准备好全部调用,而且这个方法本身是可并发的,并且可以对返回值进行处理.如果使用 curl_multi 或者按照你的项目的示例的话,我不知道如何来实现这个效果.
    sun2920989
        17
    sun2920989  
    OP
       2021-05-08 16:41:18 +08:00
    @sxbxjhwm #13 还有一个问题是连接数量的限制,对于我们的业务场景而言,并发连接数是有上限的,比如 api 端并发限制设置为 10,我需要调用 50 次获取 50 个不同的数据,直接发送过去是必然要很多 503 的,直接使用原始 tcp 连接的话可以做到在固定的 tcp 连接数量的情况下来传递更多的请求.这个也是 curl_multi 很难控制的.
    sxbxjhwm
        18
    sxbxjhwm  
       2021-05-08 17:39:16 +08:00
    @sun2920989 curl_multi_exec 本来就是 while 在一直调用的,它只是启动这个栈中所有的 curl 句柄而已 https://www.php.net/manual/zh/function.curl-multi-exec.php ,限制并发相关的源码在我之前贴的 GitHub 链接里其实都有,如果你实际测试一下就会发现它也能很好做到并发控制与重试的。
    sun2920989
        19
    sun2920989  
    OP
       2021-05-08 18:12:28 +08:00
    @sxbxjhwm #18 我之前试用过,可能了解的不够深入,但是确实无法实现我的要求,示例代码上面已经贴了,我无法使用 curl_multi 来做到这个效果,而这种效果对于我的项目来说是必要的.所以如果可以实现,您可以试着贴一段示例代码我借鉴一下.
    sxbxjhwm
        20
    sxbxjhwm  
       2021-05-08 18:28:26 +08:00 via Android
    @sun2920989 等周一吧,我发个 gist
    sun2920989
        21
    sun2920989  
    OP
       2021-05-08 18:31:08 +08:00
    @sxbxjhwm #20 好的.
    sun2920989
        22
    sun2920989  
    OP
       2021-05-08 18:45:46 +08:00
    @sxbxjhwm #20 另外,在 curl_multi_select 返回值为-1 时,建议主动增加延迟.在某些情况下可以防止 cpu 使用率异常过高的问题.
    sun2920989
        23
    sun2920989  
    OP
       2021-05-08 19:00:32 +08:00
    @sxbxjhwm #20 一个可能可行的方案是,在一个 curl_multi 实例中只添加一个 curl 句柄,发送请求时执行 curl_multi_init,curl_init,curl_multi_add_handle 和 curl_multi_exec,读取结果时执行 curl_multi_select 和 curl_multi_getcontent 及关闭连接.也可以基本实现类似于 tcp 连接时将发送和读取分开的效果.经过包装后也可以实现我示例代码的效果.可是这样对于每一次请求,都需要一个 curl 实例和一个 curl_multi 实例.我觉得这不是一个好的思路.除此之外,我没有其他的思路.
    sun2920989
        24
    sun2920989  
    OP
       2021-05-08 19:43:59 +08:00
    已实测 23 楼的想法实际性能很差.
    sun2920989
        25
    sun2920989  
    OP
       2021-05-08 20:18:43 +08:00
    更换了一个思路,使用全局唯一的 curl_multi_init,每次请求时创建 curl_init 并添加,然后执行一个循环的 curl_multi_exec,获取数据时执行 curl_multi_select 和 curl_multi_getcontent.整体性能和我使用 tcp 连接基本差不多.但是有两个问题,一是此时等待数据时针对某一个变量进行等待其实已经没有意义,需要等待所有结果返回时第一个 wait 才能返回,然后后面的 wait 就是立即返回.二是此时没有很好的方式来控制并发数,因为我无法在一个 curl_init 中不获取返回值的情况下再传输一次 http 请求数据,所以此时如果判断超过了并发限制,只能报错,无法自动处理.而使用 tcp 时由于 http1.1 的管道,我可以不读取第一次的响应结果直接再次发送第二个请求,然后按照顺序来获取每一个响应.
    sun2920989
        26
    sun2920989  
    OP
       2021-05-08 20:57:14 +08:00 via iPhone
    找到一个参数,CURLMOPT_PIPELINING,也许有一些作用,周一尝试一下。感谢您的回复,让我对 curl 有了更多的理解。
    sxbxjhwm
        27
    sxbxjhwm  
       2021-05-10 02:26:47 +08:00
    @sun2920989 https://gist.github.com/jshensh/b7f05cc2e1ebf600a1eedbf4a30bc346

    我这边比较偷懒是直接在原来 Multi 类的基础上修改的,而且因为我原先的设计返回数据的 key 需要与传入的 key 对应,用了 next 直接操作数组内部指针,生怕追加任务指针会被 reset,所以就没有给 class 再加 push 方法,理论上后续再加,再次执行 exec 都是没问题的。我这边输出的结果很明显的展示了在执行 curl_multi_exec 后进行任意操作都不会影响已发出的请求,传入 callback 也可以预处理每一个返回值,至于并发控制这块,因为需要达到精准控制数量所以只能写在 while 段里所以不能很好的避免阻塞:

    root@debian:~/php-curl-class# php test.php
    Start: 1620584188.6943
    curl_multi_exec: 1620584188.6959
    sleep: 1620584191.6963
    test1 received data at: 1620584193.697
    test3 received data at: 1620584193.6971
    test4 received data at: 1620584193.6979
    test2 received data at: 1620584193.6989
    test5 received data at: 1620584198.6994
    test6 received data at: 1620584198.6994
    test7 received data at: 1620584198.7006
    test8 received data at: 1620584198.7009
    array(8) {
    ["test1"]=>
    string(40) "strlen: 259 data: float(1620584193.6961)"
    ["test3"]=>
    string(40) "strlen: 259 data: float(1620584193.6966)"
    ["test4"]=>
    string(40) "strlen: 259 data: float(1620584193.6969)"
    ["test2"]=>
    string(40) "strlen: 259 data: float(1620584193.6965)"
    ["test5"]=>
    string(40) "strlen: 259 data: float(1620584198.6985)"
    ["test6"]=>
    string(39) "strlen: 258 data: float(1620584198.699)"
    ["test7"]=>
    string(40) "strlen: 259 data: float(1620584198.6994)"
    ["test8"]=>
    string(37) "strlen: 256 data: float(1620584198.7)"
    }
    Done: 1620584198.7015
    sun2920989
        28
    sun2920989  
    OP
       2021-05-10 09:00:59 +08:00 via iPhone
    @sxbxjhwm 好的 感谢。我参考一下。
    sxbxjhwm
        29
    sxbxjhwm  
       2021-05-10 09:04:45 +08:00
    @sun2920989 curl_multi_exec 输出之前都可以抽出来作为 push 方法的内容,sleep 那部分是假装你执行自己需要的功能,sleep 之后是真正的 exec
    sun2920989
        30
    sun2920989  
    OP
       2021-05-10 09:09:35 +08:00 via iPhone
    @sxbxjhwm 嗯看到了。问题还是那个,如果请求不是一起添加进去的,而是一个一个添加的话,程序将要么在超过并发限制时报错,要么在这时阻塞。没有其他的好办法。虽然 http1.1 的管道是一个建议最好不要使用或者少使用的特性,但是确实可以解决我遇到的问题。
    sxbxjhwm
        31
    sxbxjhwm  
       2021-05-10 09:53:53 +08:00
    @sun2920989 如果是我处理这样的需求,可能会选择直接用 cli 起个 daemon 进程,用 redis 负责队列,cli 自己从 redis 取任务 + 控制并发,这样通用性更强一些
    sun2920989
        32
    sun2920989  
    OP
       2021-05-10 10:09:56 +08:00
    @sxbxjhwm #31 我这边需要在 fpm 环境来实现这些效果.加速一些页面的打开效率.所以不方便使用后台脚本的方式.除非再做一次 fpm 进程与后台脚本之间的信息交互,总之还是非常感谢您的持续回复,极大的拓宽了我对 curl_multi 的了解.
    sxbxjhwm
        33
    sxbxjhwm  
       2021-05-10 10:21:43 +08:00
    @sun2920989 阿我原先还以为你做的是一个采集爬虫,没想到是 fpm 。。curl_multi_exec 这东西其实本身是异步的,只不过你需要在同步的 php 里调用只能在适当的地方写阻塞,其实不光是 curl,其他类似的队列要实现并发控制也得用阻塞的方法来控制,php 不像 node 一类的语言,可以通过触发事件来实现完全的异步,这个是最大的问题
    sun2920989
        34
    sun2920989  
    OP
       2021-05-10 10:30:50 +08:00
    @sxbxjhwm #33 是的,php-fpm 本身是同步的,总有一个地方要阻塞来获取网络请求的结果.
    sxbxjhwm
        35
    sxbxjhwm  
       2021-05-10 10:43:29 +08:00
    @sun2920989 之前做过一个 cli 的爬虫,当时是用 swoole 解决的。做法就是专门起一个负责 task 的进程,用 \Swoole\Server 监听一个 sock 文件,具体文档 https://wiki.swoole.com/#/consts?id=socket-%e7%b1%bb%e5%9e%8b 。做的时候有些偷懒没有用 redis,而是直接用 \Swoole\Table 做基础的共享数据存储,sock 负责队列控制。虽然用 swoole 实现在语法上很好看,用起来却比 curl_multi_exec 要麻烦得多,后续再也没有用过这个方案,在这边讨论这个方案稍微有点超纲,算是之前踩过的坑提醒一下,附一个当时写的部分源码:

    ```php
    protected function execute(Input $input, Output $output)
    {
    /**
    * $initDataTable 基础配置表,用于实例化爬虫类
    * |------------------------------------|
    * | key | value |
    * |------------------------------------|
    * | service | service |
    * | baseUrl | https:// |
    * | cookieJar | json_encode(cookieJar) |
    * |------------------------------------|
    */
    $initDataTable = new \Swoole\Table(4);
    $initDataTable->column('value', \Swoole\Table::TYPE_STRING, 512);
    $initDataTable->create();

    /**
    * $acquiredListTable 记录已由 getList 方法执行过的操作
    * |------------------------------|
    * | key | value |
    * |------------------------------|
    * | getDatacenters | 0 |
    * | getIpsGroupList | 0 |
    * | getIpsList | 0 |
    * | getSwitchList | 0 |
    * | getHardwareModelList | 0 |
    * | getPurchaseList | 0 |
    * | getServerList | 0 |
    * | getHardwareList | 0 |
    * |------------------------------|
    */
    $acquiredListTable = new \Swoole\Table(16);
    $acquiredListTable->column('count', \Swoole\Table::TYPE_INT, 1);
    $acquiredListTable->create();

    $workingAtomic = new \Swoole\Atomic();
    $successedAtomic = new \Swoole\Atomic();
    $failedAtomic = new \Swoole\Atomic();

    $serv = new \Swoole\Server(Env::get('runtime_path') . 'task.sock', 0, SWOOLE_PROCESS, SWOOLE_SOCK_UNIX_STREAM);
    $serv->table = ['initData' => $initDataTable, 'acquiredList' => $acquiredListTable];
    $serv->atomic = ['working' => $workingAtomic, 'successed' => $successedAtomic, 'failed' => $failedAtomic];

    $serv->set(array('task_worker_num' => 15));

    $serv->on('receive', function($serv, $fd, $from_id, $data) use ($output) {
    $this->receive($serv, $fd, $from_id, $data, $output);
    });

    $serv->on('task', function ($serv, $task_id, $from_id, $data) use ($output) {
    $this->task($serv, $task_id, $from_id, $data, $output);
    });

    $serv->on('finish', function ($serv, $task_id, $data) use ($output) {
    $this->finish($serv, $task_id, $data, $output);
    });

    $serv->start();
    }
    ```
    sun2920989
        36
    sun2920989  
    OP
       2021-05-10 11:00:17 +08:00
    @sxbxjhwm #35 好的.我了解一下.
    sun2920989
        37
    sun2920989  
    OP
       2021-05-10 11:15:09 +08:00
    @sxbxjhwm #35 找到三个参数,按照 libcurl 文档的描述,搭配使用可以实现通过 http1.1 管道来控制并发的效果.CURLMOPT_PIPELINING,CURLMOPT_MAX_HOST_CONNECTIONS,CURLMOPT_MAX_PIPELINE_LENGTH.但是这三个参数如果搭配使用对 curl 的版本有着严格的要求,需要大于等于 7.30 小于 7.62.我这边版本和线上一致不符合这个区间.所以没有测试出全部效果.不过设置 CURLMOPT_PIPELINING 为 1 来启用管道是可以实现的.
    sun2920989
        38
    sun2920989  
    OP
       2021-05-10 11:17:32 +08:00
    @sxbxjhwm #35 测试时还发现一个有意思的事情,在设置了启用管道时,我的倾向是期望在没有超过并发数之前尽可能使用新的连接,虽然建连也有一点损耗但是由于获取数据比较慢,所以也会比复用已有连接更快.但是 curl 库的默认方式似乎是优先尽可能复用已有的链接,直到一条连接上传输的请求超过阈值之后,才会去开一个新的连接.鉴于这个倾向的不同,所以即使我的 curl 版本符合这个区间,也不是我期望的效果.
    sxbxjhwm
        39
    sxbxjhwm  
       2021-05-10 11:48:37 +08:00
    @sun2920989 我其实有一点不理解,既然是同步的 fpm,为什么不等到获取数据后统一再处理呢,效率上没差的
    sun2920989
        40
    sun2920989  
    OP
       2021-05-10 13:16:52 +08:00 via iPhone
    @sxbxjhwm 主要是使我的工具类可以在项目内尽量通用,尽量复用现有逻辑。而不是将每个地方都做很多修改。
    sun2920989
        41
    sun2920989  
    OP
       2021-05-10 16:27:28 +08:00
    一个新的思路是,既然 curl_multi 的管道复用,会尽可能的使用已有连接,那么对于未超过限制的请求,每次创建一个 curl_multi_init 和 curl_init,并打开 curl_multi 的管道复用,对于超过限制的请求,就使用已存在的 curl_multi_init 再次发送一个新的 curl_init.至此,已经基本可以使用 curl_multi 实现我之前用 tcp 模拟时实现的全部功能.唯一的问题和之前测试结果一样,多个 curl_multi_init 实例同时存在同时执行时效率较差,比使用 tcp 模拟或使用全局唯一的 curl_multi_init 效率下降非常多.
    @sxbxjhwm #39 仅供参考.
    sxbxjhwm
        42
    sxbxjhwm  
       2021-05-10 16:49:01 +08:00
    @sun2920989 我的例子并没有用到多个 curl_multi_init 的实例,curl_multi_init 返回的是一个栈,你就把它当成一个队列,所有 curl_multi_exec 都在复用这个栈,如果你有需求大可以多创建几个队列,但这样显然没有意义。
    sun2920989
        43
    sun2920989  
    OP
       2021-05-10 16:51:18 +08:00
    @sxbxjhwm #42 只是为了做不阻塞情况下的并发数量限制.
    sxbxjhwm
        44
    sxbxjhwm  
       2021-06-02 10:29:09 +08:00
    sun2920989
        45
    sun2920989  
    OP
       2021-06-02 10:53:10 +08:00
    @sxbxjhwm #44 好的 谢谢.
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2809 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 11:36 · PVG 19:36 · LAX 03:36 · JFK 06:36
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.