V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
uoddsa
V2EX  ›  PHP

redis 队列推送消息的疑问🤔️

  •  
  •   uoddsa · 2019-05-09 15:58:16 +08:00 · 6909 次点击
    这是一个创建于 2054 天前的主题,其中的信息可能已经有所发展或是发生改变。

    有个需求,需要对系统的用户进行全局推送,包含定时推送。 因为怕重复推送了所以打算是待推送的任务发在 redis list 里面。定时任务去队列取未推送的任务。可是每次取多少,怎么取。 还是我要用发布订阅。

    第 1 条附言  ·  2019-05-09 17:32:51 +08:00
    问题 ,怎么取队列里面的消息 while(true) ?还是有其他好的方法。
    29 条回复    2019-05-10 10:52:47 +08:00
    coffeSlider
        1
    coffeSlider  
       2019-05-09 16:10:31 +08:00
    取多少,怎么取不看你自己的业务吗?如果 job 是单线程,取数据就直接用 rpoplpush。
    rpdict
        2
    rpdict  
       2019-05-09 16:44:33 +08:00
    用的 redis 的 zset,按时间排序的有序集合,取的时候就取 0~当前时间戳对应的值就行了
    yxjn
        3
    yxjn  
       2019-05-09 16:52:25 +08:00
    redis 队列的缺点是需要自己写很多的异常补偿机制。毕竟 redis 本身不是作为一个完整的队列功能而存在的。
    julyclyde
        4
    julyclyde  
       2019-05-09 16:56:04 +08:00
    @rpdict zset 很容易慢的
    iyaozhen
        5
    iyaozhen  
       2019-05-09 16:56:10 +08:00 via Android
    redis 的 pub/sub 严格说不是个队列,是广播,无法并发消费

    用 list 就行,左进右出,一条条处理呗,可以多进程。还可以再建个重发队列,失败的丢进去。
    reus
        6
    reus  
       2019-05-09 17:04:40 +08:00
    如果 redis 崩了呢?你怎么知道那些发过哪些没发过?
    rpdict
        7
    rpdict  
       2019-05-09 17:12:45 +08:00
    @julyclyde 有序集合的好处就是读取的时候读第一个,如果时间没到就不用接着读了,节约的时间在这里
    julyclyde
        8
    julyclyde  
       2019-05-09 17:25:39 +08:00
    @rpdict 我是指 zset 的排序速度慢。尤其是没按顺序插入的时候
    uoddsa
        9
    uoddsa  
    OP
       2019-05-09 17:32:33 +08:00
    @iyaozhen 问题就在这里 怎么取队列里面的消息 while(true) ?还是有其他好的方法。
    yxjn
        10
    yxjn  
       2019-05-09 17:38:25 +08:00
    @uoddsa blpop 看看能不能满足你的需求
    lestat
        11
    lestat  
       2019-05-09 17:42:52 +08:00
    @rpdict laravel 里面的延时队列好像就是这么设计的
    rpdict
        12
    rpdict  
       2019-05-09 17:54:58 +08:00
    @uoddsa 是一个取舍,就好像数据库加了索引插入数据就会慢一些,但是读取会快很多,看需求是要更快的插入速度还是更准确的发送时间吧,有序集合的好处就是不用遍历所有,无序的好处就是插入快
    rpdict
        13
    rpdict  
       2019-05-09 17:57:26 +08:00
    @lestat 我是参考了有赞的延时队列,感觉大家做法都差不多?感谢回复,我去看看 laravel 怎么实现的
    Evilk
        14
    Evilk  
       2019-05-09 18:29:37 +08:00
    @yxjn 同意,redis 还是适合缓存,需要队列的话,还是选择专业的吧,比如 rabbitMQ
    strive
        15
    strive  
       2019-05-09 18:39:13 +08:00
    可以用个存储过程把要推送的用户和消息放到任务表里面,再把任务表里面数据放到 redis 的 list 里面拿出来处理就可以了
    brickyang
        16
    brickyang  
       2019-05-09 18:44:47 +08:00 via iPhone
    lovedebug
        17
    lovedebug  
       2019-05-09 18:58:27 +08:00 via Android
    这种需求用 azure service bus 更好吧。redis 是有丢消息风险的。用 kafka 都好很多。
    ericliu001
        18
    ericliu001  
       2019-05-09 19:08:25 +08:00
    lpoprpush 看下这个命令
    runnerlee
        19
    runnerlee  
       2019-05-09 19:35:27 +08:00   ❤️ 2
    laravel 的做法是同时维护三个队列: 主队列 (list), 备份队列 (reserved, zset) , 延时队列 (delayed, zset).

    消息从 list 里 lpop 出来之后会根据超时时间再次存放到备份队列里去, 这个操作用 lua 实现:

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L54
    ```
    -- Pop the first job off of the queue...
    local job = redis.call('lpop', KEYS[1])
    local reserved = false
    if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', KEYS[2], ARGV[1], reserved)
    redis.call('lpop', KEYS[3])
    end
    return {job, reserved}
    ```

    而在从主队列 pop 之前, 会根据当前时间从备份队列和延时队列两个 zset 中取出消息 rpush 到主队列中.

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/RedisQueue.php#L167

    同样也是使用 lua 进行操作
    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L105
    ```
    -- Get all of the jobs with an expired "score"...
    local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

    -- If we have values in the array, we will remove them from the first queue
    -- and add them onto the destination queue in chunks of 100, which moves
    -- all of the appropriate jobs onto the destination queue very safely.
    if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
    redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
    -- Push a notification for every job that was migrated...
    for j = i, math.min(i+99, #val) do
    redis.call('rpush', KEYS[3], 1)
    end
    end
    end

    return val
    ```

    同时为了避免重复消费, 在消息消费成功后, 会手动从备份队列删除备份消息.

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Jobs/RedisJob.php#L84

    在每次 pop 出消息并进行消费之前, 会注册一个 timeoutHandler, 通过计时器来实现中断超时任务

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Worker.php#L111

    所以, 当消费过程中发生异常退出或是超时中断后, 会根据重试时间, 从备份队列里面取出备份消息重新消费.
    iyaozhen
        20
    iyaozhen  
       2019-05-09 19:40:58 +08:00 via Android
    @uoddsa 死循环就行了,做好异常处理。后台运行呗
    runnerlee
        21
    runnerlee  
       2019-05-09 19:54:43 +08:00   ❤️ 1
    忘了补充一个细节,

    在用 lua 调用 lpop 之后, 会将消息 json decode 出来然后自增 attempts 字段, 再放到备份队列.

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L62

    这样是为了实现最大重试次数, 当失败到配置的最大次数之后, 会把消息保存到 mysql 后从 redis 里丢弃掉.
    fishioon
        22
    fishioon  
       2019-05-09 20:12:32 +08:00
    可以考虑下 redis 5.0 中的 stream 结构
    ihipop
        23
    ihipop  
       2019-05-09 23:56:26 +08:00 via Android
    NSQ
    scnace
        24
    scnace  
       2019-05-10 00:25:47 +08:00 via Android
    di.....disque ?
    zk123
        25
    zk123  
       2019-05-10 07:59:15 +08:00 via iPhone
    发布订阅模式的数据可靠性不保证,它的数据不保存到快照或者 aof 中,发布即焚,也没有 ack 机制,不适用这样消息队列场景。

    List 队列模式比较适合,不过自己要补偿做许多 ack 以及失败机制。建议还是考虑 MQ 或者其他 push。
    zchlwj
        26
    zchlwj  
       2019-05-10 08:54:43 +08:00
    redis 消息队列不存盘的哦,这种场景还是考虑 mq 把
    polebug
        27
    polebug  
       2019-05-10 09:03:58 +08:00 via Android
    我上次写业务 也是用的 zset 慢点无所谓 反正并发推送
    fuxinya
        28
    fuxinya  
       2019-05-10 10:13:41 +08:00 via Android
    如果是 spring 项目可以去看看 redisson
    ducklyl
        29
    ducklyl  
       2019-05-10 10:52:47 +08:00
    别用 redis 做队列,用 mq 或 rq
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2932 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 13:31 · PVG 21:31 · LAX 05:31 · JFK 08:31
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.