我是一个 Flink 小白,最近有一个监控需求,想使用 Flink SQL 实现,但很多概念还没搞清楚,遇到一个问题卡壳了,在论坛里寻 Flink 大佬指点一二,解决了送一杯星巴克作为感谢!
Flink SQL 官网用客户(customer)和订单(order)举例,但都每分钟统计流表每个客户订单的数量。我的需求是每分钟统计维表全量每个客户订单的数量,也就是就算这一分钟某个客户没有下单,也需要统计一个 0 出来,用于做监控报警。
为了不暴露业务需求,调整为客户和订单的场景,如果有不恰当的地方还请指出,我再补充,SQL 如下:
CREATE TEMPORARY TABLE customers (
id INT,
name STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://....'
);
CREATE TEMPORARY TABLE orders (
order_id STRING,
customer_id INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '...'
);
CREATE TEMPORARY VIEW order_per_minute AS
SELECT
customer_id,
count(*) as cnt,
TUMBLE_END(order_time, INTERVAL '1' MINUTE) AS window_end
FROM orders
GROUP BY customer_id, TUMBLE(tstamp, INTERVAL '1' MINUTE);
INSERT INTO destination
SELECT
COALESCE(window_end, CURRENT_TIMESTAMP),
customer_id,
COALESCE(cnt, 0),
FROM
customers LEFT JOIN order_per_minute
ON customers.id = order_per_minute.customer_id;
实际执行上面的代码有问题,比如说有 3 个客户 c1/c2/c3 ,但只有 2 个客户 c1/c2 每分钟都下单, 第一次执行结果是对的:
10:01, c1, 19
10:01, c2, 32
10:01, c3, 0
随后每分钟的数据,就会少掉 c3 的结果:
10:02, c1, 18
10:02, c2, 22 // c3 没有输出
10:03, c1, 18
10:03, c2, 22 // c3 没有输出
我也不清楚 Flink SQL 能否这么用吗,还是得用 DataStream API 解决?请论坛的 Flink 大佬帮忙看一下,感谢!
1
sijue 2023-06-12 11:47:53 +08:00 1
上面 left join 是 regural join ,没有数据可能是因为 customer 中 c3 数据在 10:02 和 10:03 没有记录,建议使用 lookup join
|
2
sijue 2023-06-12 11:49:36 +08:00
补充:维表 join 常用 look up join ,参考: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/
|
3
liprais 2023-06-12 11:51:21 +08:00 1
你应该用用户表左关联
|
4
kerie OP @sijue 对的,上面的 left join 是 regular join ,没有数据是因为 customer 表没有时间戳信息,它只是一个全量客户的表,没有带时间戳信息。
lookup join 有上面一样的问题,不能输出没有 order 数据的 customer 统计。 |
5
kerie OP @liprais 我用的 customers left join order_per_minute ,但不能统计出没有 order 的 customer
|
6
liprais 2023-06-12 11:58:23 +08:00
"If a table function call returns an empty result, the corresponding outer row is preserved, and the result padded with null values. Currently, a left outer join against a lateral table requires a TRUE literal in the ON clause."
|
8
sijue 2023-06-12 12:25:57 +08:00
1.有个治标的方式:维表初始化目前存量用户数据,订单数为 0 ,订单表一般是注册之后才能下单;
2.有个疑问点:有订单,用户表和订单表都会存数据,是否存在订单数据比用户数据迟到的点 |
9
t3zb2xzvjm4yvmn 2023-06-12 13:01:37 +08:00 1
实现过类似的需求,首先 lookup join 肯定是不行了,事实流中没有出现的用户肯定关联不到的。
比较彻底的解决方案是使用 datastream API ,process function 。还要状态编程,因为需要将维表的状态自己维护,比如把状态放到一个 tuple2 里,t0 是 customer id ,t1 给默认值 0 ,然后拿另外一个流的 element ,每来一条就给 t1+1 ,窗口触发时把所有的 tuple2 向下游发送。 需要额外考虑的一点是,有可能某个时间窗口内 1 个下单的都没有(比如半夜),那么该窗口无法触发,没有任何输出,所以默认的滚动窗口\事件时间语义就不太行。还需要再实现一个窗口,事件时间、处理时间混合语义,保证即使没有事件仍然可以触发,输出所有用户下单数均为 0 的情况。 Flink SQL 没有实现过,但是有一个简单粗暴的想法,定时把维表的数据全量发到 flink ,构造出所有包含用户的事实流,这样你只需要改造 customers 维表那里就够了。 缺点是需要不断地读维表,对 MySQL 增加压力; source 端不断地向下游发维度信息,实际上不符合事件驱动和流式计算的原则。 我猜测这个需求数据量不大,实时性要求也没那么高,使用 spark streaming 可能是更好的选择。 |
10
kerie OP @sijue 1. 不行,需求就是监控哪些用户没有下订单(这里的场景我做了更改,实际是监控哪些 IoT 设备没有上传数据,我在主题里也 append 更新下)
2. 用户表是提前全量初始化好的,不考虑更新问题,用户表没有时间信息;订单表,理论每个用户每分钟都有数据。 |
11
David1119 2023-06-12 13:22:19 +08:00 1
参考 9 楼的,process function 最方便,sql 做基础 etl 没问题,复杂一点的逻辑用 datastream 更灵活方便,存一下上一秒的 state ,做比较,可以判断类似连续 2 秒没数据然后报警推送,甚至连续 2s 没数据,但是 5s 内能上传上来就算正常这样的场景,随意发挥
|
12
kerie OP @t3zb2xzvjm4yvmn 我感觉看到希望了,我的数据量不大,你知道如何粗暴的每分钟把维表的数据全量发到 flink ?
用 DataStream API ,你说的额外考虑的场景暂时不用考虑,我们是 IoT 设备,理论不会出现 1 分钟全部设备没有数据的情况,后续优化再额外每分钟加一些 mock 数据,强制触发窗口。 |
13
kerie OP @David1119 我再挣扎下,新手感觉 Flink SQL 更直观一些,但如果实在做不了就放弃 SQL 的方案,转 DataStream API process function 。
|
14
fuyufjh 2023-06-12 13:42:19 +08:00 1
这是一个典型的 micro batch 需求吧,1 分钟执行一次。用时间条件做过滤能起到很好的过滤效果,执行很快的
|
16
t3zb2xzvjm4yvmn 2023-06-12 14:16:55 +08:00 1
@kerie Flink SQL 好像没有现成的方法,可以自定义 source table ,你研究一下吧
或者不在 Flink 里做,在外部写一个 Java/Python 程序用 JDBC 和 kafka API ,定时把数据推到 kafka ,用 Flink SQL 接 kafka 就比较方便了。 |
17
kerie OP @t3zb2xzvjm4yvmn 我研究一下,感谢!
|
18
leonhao 2023-06-12 14:28:29 +08:00 1
Flink SQL 无法实现,需要自己写 stream api 。如果可能出现这种情况,根本不需要 Flink ,把数据写到数据库在算就行
|
20
leonhao 2023-06-12 14:55:09 +08:00
@kerie 用 timescaledb 之类的时间序列数据库,使用物化视图,每分钟聚合一次,都是自带的功能,很 Flink 简单多了
|
21
kerie OP @leonhao 谢谢,时序数据库虽然有物化视图功能,但时序数据库本身不能做监控报警,还是需要外部程序做定时查询,将聚合数据推送给监控报警系统。
|
22
weakbd 2023-06-12 15:19:14 +08:00 1
时间语义试着用 ProcessingTime,再结合滚动窗口去实现你的操作
|
25
fuyufjh 2023-06-12 15:33:54 +08:00 2
@kerie 我是觉得不应该用 Flink 做。Streaming 擅长增量计算,这个问题本质是一个全量计算,“全量”指最近 1min 的全量数据。
我觉得用数仓、定时 1min 查一次是最合适的。 Flink 的理念是 event-driven ,反过来说,如果没有 event 则不该触发任何计算。举个简化的例子,如果这 1min 内没有任何订单,你应该也希望得到一个全部 count=0 的结果吧(而不是什么也不输出),那么这就与 event-driven 相违背了。 |
26
BiggerLonger 2023-06-12 16:18:35 +08:00 via iPad 1
要不时时 time window ?
|
27
weakbd 2023-06-12 16:50:19 +08:00 1
#25 是的,仔细看了下 op 的需求,你不应该以你的维表为基准,以实现 count=0 的结果,在 flink sql 中我是没想到能实现的方式。不过 datastream api 更灵活是可以做到的,你可以尝试下试试使用 AggregateFunction 实现你的需求呢
|
28
kerie OP |
29
kerie OP 感谢各位 xdjm 的热心答复,我最终决定采纳 @t3zb2xzvjm4yvmn 的方案,大佬留下 vx ,我转你一杯咖啡的钱
|
30
t3zb2xzvjm4yvmn 2023-06-12 18:08:37 +08:00
@kerie QnVubnk1NDE=
|
31
kerie OP @t3zb2xzvjm4yvmn 已转
|
32
alwaysdazz 2023-06-13 05:19:39 +08:00 via Android 1
经典维表驱动流表的案例,之前遇到过该问题,用 Python udf 解决掉的。。。
|