V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
kerie
V2EX  ›  问与答

请教一个 Flink SQL 的问题,解决了星巴克感谢

  •  
  •   kerie · 2023-06-12 11:17:41 +08:00 · 2261 次点击
    这是一个创建于 540 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我是一个 Flink 小白,最近有一个监控需求,想使用 Flink SQL 实现,但很多概念还没搞清楚,遇到一个问题卡壳了,在论坛里寻 Flink 大佬指点一二,解决了送一杯星巴克作为感谢!

    Flink SQL 官网用客户(customer)和订单(order)举例,但都每分钟统计流表每个客户订单的数量。我的需求是每分钟统计维表全量每个客户订单的数量,也就是就算这一分钟某个客户没有下单,也需要统计一个 0 出来,用于做监控报警。

    为了不暴露业务需求,调整为客户和订单的场景,如果有不恰当的地方还请指出,我再补充,SQL 如下:

    CREATE TEMPORARY TABLE customers (
        id INT,
        name STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://....'
    );
    
    CREATE TEMPORARY TABLE orders (
        order_id     STRING,
        customer_id  INT,
        order_time   TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = '...'
    );
    
    CREATE TEMPORARY VIEW order_per_minute AS
    SELECT
        customer_id,
        count(*) as cnt,
        TUMBLE_END(order_time, INTERVAL '1' MINUTE) AS window_end
    FROM orders
    GROUP BY customer_id, TUMBLE(tstamp, INTERVAL '1' MINUTE);
    
    INSERT INTO destination
    SELECT
        COALESCE(window_end, CURRENT_TIMESTAMP),
        customer_id,
        COALESCE(cnt, 0),
    FROM
        customers LEFT JOIN order_per_minute
            ON customers.id = order_per_minute.customer_id;
    

    实际执行上面的代码有问题,比如说有 3 个客户 c1/c2/c3 ,但只有 2 个客户 c1/c2 每分钟都下单, 第一次执行结果是对的:

    10:01, c1, 19  
    10:01, c2, 32  
    10:01, c3, 0
    

    随后每分钟的数据,就会少掉 c3 的结果:

    10:02, c1, 18  
    10:02, c2, 22 // c3 没有输出
    10:03, c1, 18  
    10:03, c2, 22 // c3 没有输出
    

    我也不清楚 Flink SQL 能否这么用吗,还是得用 DataStream API 解决?请论坛的 Flink 大佬帮忙看一下,感谢!

    第 1 条附言  ·  2023-06-12 13:09:59 +08:00
    为了防止 customer 、order 场景误导,这里说一下实际场景:
    实际是要监控 IoT 设备的数据推送,IoT 设备全量表是预先初始化好的,保存在 MySQL 数据表; IoT 数据推送是一个 kafka 流消息,理论上每个 IoT 设备每秒都有数据。需求是监控哪些 IoT 设备数据是中断或缺失的。
    32 条回复    2023-06-13 05:19:39 +08:00
    sijue
        1
    sijue  
       2023-06-12 11:47:53 +08:00   ❤️ 1
    上面 left join 是 regural join ,没有数据可能是因为 customer 中 c3 数据在 10:02 和 10:03 没有记录,建议使用 lookup join
    sijue
        2
    sijue  
       2023-06-12 11:49:36 +08:00
    liprais
        3
    liprais  
       2023-06-12 11:51:21 +08:00   ❤️ 1
    你应该用用户表左关联
    kerie
        4
    kerie  
    OP
       2023-06-12 11:55:01 +08:00
    @sijue 对的,上面的 left join 是 regular join ,没有数据是因为 customer 表没有时间戳信息,它只是一个全量客户的表,没有带时间戳信息。
    lookup join 有上面一样的问题,不能输出没有 order 数据的 customer 统计。
    kerie
        5
    kerie  
    OP
       2023-06-12 11:57:55 +08:00
    @liprais 我用的 customers left join order_per_minute ,但不能统计出没有 order 的 customer
    liprais
        6
    liprais  
       2023-06-12 11:58:23 +08:00
    "If a table function call returns an empty result, the corresponding outer row is preserved, and the result padded with null values. Currently, a left outer join against a lateral table requires a TRUE literal in the ON clause."
    kerie
        7
    kerie  
    OP
       2023-06-12 12:08:03 +08:00
    @liprais 这里的`table function`指的是 udf (用户自定义函数)吗,不使用 udf 可以吗
    sijue
        8
    sijue  
       2023-06-12 12:25:57 +08:00
    1.有个治标的方式:维表初始化目前存量用户数据,订单数为 0 ,订单表一般是注册之后才能下单;
    2.有个疑问点:有订单,用户表和订单表都会存数据,是否存在订单数据比用户数据迟到的点
    t3zb2xzvjm4yvmn
        9
    t3zb2xzvjm4yvmn  
       2023-06-12 13:01:37 +08:00   ❤️ 1
    实现过类似的需求,首先 lookup join 肯定是不行了,事实流中没有出现的用户肯定关联不到的。

    比较彻底的解决方案是使用 datastream API ,process function 。还要状态编程,因为需要将维表的状态自己维护,比如把状态放到一个 tuple2 里,t0 是 customer id ,t1 给默认值 0 ,然后拿另外一个流的 element ,每来一条就给 t1+1 ,窗口触发时把所有的 tuple2 向下游发送。

    需要额外考虑的一点是,有可能某个时间窗口内 1 个下单的都没有(比如半夜),那么该窗口无法触发,没有任何输出,所以默认的滚动窗口\事件时间语义就不太行。还需要再实现一个窗口,事件时间、处理时间混合语义,保证即使没有事件仍然可以触发,输出所有用户下单数均为 0 的情况。

    Flink SQL 没有实现过,但是有一个简单粗暴的想法,定时把维表的数据全量发到 flink ,构造出所有包含用户的事实流,这样你只需要改造 customers 维表那里就够了。
    缺点是需要不断地读维表,对 MySQL 增加压力; source 端不断地向下游发维度信息,实际上不符合事件驱动和流式计算的原则。

    我猜测这个需求数据量不大,实时性要求也没那么高,使用 spark streaming 可能是更好的选择。
    kerie
        10
    kerie  
    OP
       2023-06-12 13:03:24 +08:00
    @sijue 1. 不行,需求就是监控哪些用户没有下订单(这里的场景我做了更改,实际是监控哪些 IoT 设备没有上传数据,我在主题里也 append 更新下)
    2. 用户表是提前全量初始化好的,不考虑更新问题,用户表没有时间信息;订单表,理论每个用户每分钟都有数据。
    David1119
        11
    David1119  
       2023-06-12 13:22:19 +08:00   ❤️ 1
    参考 9 楼的,process function 最方便,sql 做基础 etl 没问题,复杂一点的逻辑用 datastream 更灵活方便,存一下上一秒的 state ,做比较,可以判断类似连续 2 秒没数据然后报警推送,甚至连续 2s 没数据,但是 5s 内能上传上来就算正常这样的场景,随意发挥
    kerie
        12
    kerie  
    OP
       2023-06-12 13:24:19 +08:00
    @t3zb2xzvjm4yvmn 我感觉看到希望了,我的数据量不大,你知道如何粗暴的每分钟把维表的数据全量发到 flink ?
    用 DataStream API ,你说的额外考虑的场景暂时不用考虑,我们是 IoT 设备,理论不会出现 1 分钟全部设备没有数据的情况,后续优化再额外每分钟加一些 mock 数据,强制触发窗口。
    kerie
        13
    kerie  
    OP
       2023-06-12 13:28:49 +08:00
    @David1119 我再挣扎下,新手感觉 Flink SQL 更直观一些,但如果实在做不了就放弃 SQL 的方案,转 DataStream API process function 。
    fuyufjh
        14
    fuyufjh  
       2023-06-12 13:42:19 +08:00   ❤️ 1
    这是一个典型的 micro batch 需求吧,1 分钟执行一次。用时间条件做过滤能起到很好的过滤效果,执行很快的
    kerie
        15
    kerie  
    OP
       2023-06-12 13:51:57 +08:00
    @fuyufjh 大佬能讲一下用 Flink 具体怎么做吗
    t3zb2xzvjm4yvmn
        16
    t3zb2xzvjm4yvmn  
       2023-06-12 14:16:55 +08:00   ❤️ 1
    @kerie Flink SQL 好像没有现成的方法,可以自定义 source table ,你研究一下吧
    或者不在 Flink 里做,在外部写一个 Java/Python 程序用 JDBC 和 kafka API ,定时把数据推到 kafka ,用 Flink SQL 接 kafka 就比较方便了。
    kerie
        17
    kerie  
    OP
       2023-06-12 14:26:33 +08:00
    @t3zb2xzvjm4yvmn 我研究一下,感谢!
    leonhao
        18
    leonhao  
       2023-06-12 14:28:29 +08:00   ❤️ 1
    Flink SQL 无法实现,需要自己写 stream api 。如果可能出现这种情况,根本不需要 Flink ,把数据写到数据库在算就行
    kerie
        19
    kerie  
    OP
       2023-06-12 14:43:41 +08:00
    @leonhao 数据都写到数据库,还是得一个额外的程序,每分钟做一次计算吧
    leonhao
        20
    leonhao  
       2023-06-12 14:55:09 +08:00
    @kerie 用 timescaledb 之类的时间序列数据库,使用物化视图,每分钟聚合一次,都是自带的功能,很 Flink 简单多了
    kerie
        21
    kerie  
    OP
       2023-06-12 15:10:49 +08:00
    @leonhao 谢谢,时序数据库虽然有物化视图功能,但时序数据库本身不能做监控报警,还是需要外部程序做定时查询,将聚合数据推送给监控报警系统。
    weakbd
        22
    weakbd  
       2023-06-12 15:19:14 +08:00   ❤️ 1
    时间语义试着用 ProcessingTime,再结合滚动窗口去实现你的操作
    leonhao
        23
    leonhao  
       2023-06-12 15:27:26 +08:00
    @kerie grafana
    kerie
        24
    kerie  
    OP
       2023-06-12 15:31:26 +08:00
    @weakbd Flink 的时间属性和 join 语法理解有点困难,可以给个示例,我测试一把吗
    fuyufjh
        25
    fuyufjh  
       2023-06-12 15:33:54 +08:00   ❤️ 2
    @kerie 我是觉得不应该用 Flink 做。Streaming 擅长增量计算,这个问题本质是一个全量计算,“全量”指最近 1min 的全量数据。

    我觉得用数仓、定时 1min 查一次是最合适的。

    Flink 的理念是 event-driven ,反过来说,如果没有 event 则不该触发任何计算。举个简化的例子,如果这 1min 内没有任何订单,你应该也希望得到一个全部 count=0 的结果吧(而不是什么也不输出),那么这就与 event-driven 相违背了。
    BiggerLonger
        26
    BiggerLonger  
       2023-06-12 16:18:35 +08:00 via iPad   ❤️ 1
    要不时时 time window ?
    weakbd
        27
    weakbd  
       2023-06-12 16:50:19 +08:00   ❤️ 1
    #25 是的,仔细看了下 op 的需求,你不应该以你的维表为基准,以实现 count=0 的结果,在 flink sql 中我是没想到能实现的方式。不过 datastream api 更灵活是可以做到的,你可以尝试下试试使用 AggregateFunction 实现你的需求呢
    kerie
        28
    kerie  
    OP
       2023-06-12 17:27:38 +08:00
    @fuyufjh 感谢答复,你对 event-driven 的理解让我很有启发,单这个需求来看确实用数仓更合适一些,
    @weakbd 也感谢你
    kerie
        29
    kerie  
    OP
       2023-06-12 17:31:21 +08:00
    感谢各位 xdjm 的热心答复,我最终决定采纳 @t3zb2xzvjm4yvmn 的方案,大佬留下 vx ,我转你一杯咖啡的钱
    t3zb2xzvjm4yvmn
        30
    t3zb2xzvjm4yvmn  
       2023-06-12 18:08:37 +08:00
    @kerie QnVubnk1NDE=
    kerie
        31
    kerie  
    OP
       2023-06-12 18:21:16 +08:00
    alwaysdazz
        32
    alwaysdazz  
       2023-06-13 05:19:39 +08:00 via Android   ❤️ 1
    经典维表驱动流表的案例,之前遇到过该问题,用 Python udf 解决掉的。。。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1155 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 18:47 · PVG 02:47 · LAX 10:47 · JFK 13:47
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.