Hi!

我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal
join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join

carlc <[email protected]> 于2021年8月4日周三 下午3:57写道:

> 感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。
>
> create view v_bl_user_count as (
>     select user_id, count(1)
>     from mysql_user_blacklist
>     group by user_id
> );
>
> select t1.`user_id`
>      , t1.`event_type`
>      , t1.`current_ts`
> from kafka_user_event t1
> left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on
> t1.`user_id` = t2.`user_id`
> where t1.`event_type` = ‘LOGIN’
>
> 异常信息:
> org.apache.flink.table.api.TableException: Processing-time temporal join
> is not supported yet.
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>         at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>
>
>
>
> > 在 2021年8月4日,14:18,Caizhi Weng <[email protected]> 写道:
> >
> > Hi!
> >
> > 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
> >
> > 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
> > table join 了。
> >
> > carlc <[email protected]> 于2021年8月4日周三 上午10:41写道:
> >
> >> 请教下如何在维表上做聚合操作?  如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
> >>
> >> -- 模拟需求(有点牵强...):
> >> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表
> mysql_user_blacklist
> >> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
> >>
> >> -- 1. 创建user_blacklist表
> >> CREATE TABLE `user_blacklist` (
> >> `user_id` bigint(20) NOT NULL,
> >> `create_time` datetime NOT NULL,
> >> PRIMARY KEY (`user_id`,`create_time`)
> >> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> >> INSERT INTO user_blacklist (`user_id`, `create_time`)
> >> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> >> (2,'2021-01-04 00:00:00');
> >>
> >> -- 2. 模拟kafka数据:
> >> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
> >> 00:00:00"}
> >> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
> >> 00:00:00"}
> >>
> >> -- 操作步骤:
> >> 当发送第1条kafka数据得到如下输出:
> >> | OP| user_id| event_type | current_ts| bl_count |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> >> 当再次发送第1条kafka数据得到如下输出:
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
> >>
> >> — SQL 如下:
> >>
> >> create table kafka_user_event
> >> (
> >> `user_id` BIGINT,
> >> `event_type` STRING,
> >> `current_ts` timestamp(3),
> >> `proc_time` AS PROCTIME()
> >> ) WITH (
> >> 'connector' = 'kafka',
> >> ...
> >> );
> >>
> >> create table mysql_user_blacklist
> >> (
> >> user_id BIGINT,
> >> create_time timestamp(3),
> >> primary key (user_id,create_time) not enforced
> >> ) WITH (
> >> 'connector' = 'jdbc',
> >> …
> >> );
> >>
> >> create view v2_user_event as (
> >> select t1.`user_id`
> >> , t1.`event_type`
> >> , t1.`current_ts`
> >> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
> >> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> >> from kafka_user_event t1
> >> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS
> t2
> >> on t1.`user_id` = t2.`user_id`
> >> where t1.`event_type` = 'LOGIN'
> >> );
> >>
> >> select * from v2_user_event;
> >>
> >>
>
>

回复