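I still have serious doubts about this problem, so let me describe my scenario again.

We are using Flink to join two streams: one stream is the order header (main) table and the other is the order item (detail) table. We examined the offline data and found that order item rows are generally created within a few seconds after the order header row, so the gap is on the order of seconds.

We ran the following rounds of tests and compared each one against the row count of another table that is written in real time. (That table is the baseline reference. It is simply landed with no processing at all; both sides read the same data sources and the comparison criteria are the same.)

1. DataStream API, watermarks generated from Kafka's built-in record timestamps, interval join. After comparing the results: rows are missing.
2. Stream converted to table, SQL inner join, no watermark defined. After comparing the results: the data is normal.
3. Stream converted to table, SQL interval join, watermarks extracted from the event time in the data. After comparing the results: rows are missing.

Judging from the results, I don't understand why the inner join in SQL keeps the data accurate while the interval join does not. Is there a good method or line of investigation that would help me understand the cause of this problem?

About the second approach, my question is: since no watermark is set in SQL, is the expiry of table state computed in processing time? And for a join like this, with a table state expiration time configured, can I think of the inner join as effectively a window join?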
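One hypothesis that fits the three results above is late data. To my understanding, the DataStream interval join discards a record whose timestamp is already behind the operator's current watermark, and the SQL interval join cleans its state by watermark, while a regular join has no notion of event time and only expires state through table.exec.state.ttl, which is measured in processing time. The following is a toy model in plain Java, not Flink code: the class LateRecordSketch, the Event type, the sample data, and the simplification that the watermark is recomputed after every record (as the minimum over both inputs of the largest timestamp seen, minus a fixed delay) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LateRecordSketch {
    /** A record from either stream: source flag, join key, event time in ms. */
    static final class Event {
        final boolean isHeader;
        final String key;
        final long ts;

        Event(boolean isHeader, String key, long ts) {
            this.isHeader = isHeader;
            this.key = key;
            this.ts = ts;
        }
    }

    /** Regular join: any header/item pair with the same key matches, whatever the timestamps. */
    static int regularJoin(List<Event> arrivals) {
        List<Event> seen = new ArrayList<>();
        int matches = 0;
        for (Event e : arrivals) {
            for (Event other : seen) {
                if (other.isHeader != e.isHeader && other.key.equals(e.key)) {
                    matches++;
                }
            }
            seen.add(e);
        }
        return matches;
    }

    /**
     * Toy event-time interval join: item.ts must lie in [header.ts + lower, header.ts + upper],
     * and a record that arrives behind the watermark is dropped (assumed behaviour).
     * State cleanup is omitted because it does not change the match count in this model.
     */
    static int intervalJoin(List<Event> arrivals, long lower, long upper, long delay) {
        List<Event> seen = new ArrayList<>();
        long maxHeaderTs = Long.MIN_VALUE;
        long maxItemTs = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        int matches = 0;
        for (Event e : arrivals) {
            if (watermark != Long.MIN_VALUE && e.ts < watermark) {
                continue; // late record: silently dropped
            }
            for (Event other : seen) {
                if (other.isHeader == e.isHeader || !other.key.equals(e.key)) {
                    continue;
                }
                long headerTs = e.isHeader ? e.ts : other.ts;
                long itemTs = e.isHeader ? other.ts : e.ts;
                if (itemTs >= headerTs + lower && itemTs <= headerTs + upper) {
                    matches++;
                }
            }
            seen.add(e);
            if (e.isHeader) {
                maxHeaderTs = Math.max(maxHeaderTs, e.ts);
            } else {
                maxItemTs = Math.max(maxItemTs, e.ts);
            }
            // The combined watermark only exists once both inputs have produced data.
            if (maxHeaderTs != Long.MIN_VALUE && maxItemTs != Long.MIN_VALUE) {
                watermark = Math.min(maxHeaderTs, maxItemTs) - delay;
            }
        }
        return matches;
    }

    /** Hypothetical arrival order: the last item for order A shows up after the watermark has moved on. */
    static List<Event> sample() {
        return Arrays.asList(
                new Event(true, "A", 1_000L),
                new Event(false, "A", 2_000L),
                new Event(true, "B", 60_000L),
                new Event(false, "B", 61_000L),
                new Event(false, "A", 3_000L));
    }

    public static void main(String[] args) {
        // prints "regular=3 interval=2"
        System.out.println("regular=" + regularJoin(sample())
                + " interval=" + intervalJoin(sample(), -10_000L, 20_000L, 2_000L));
    }
}
```

In this sketch the last item for order A is only 2 s after its header in event time, well inside the -10 s / +20 s window, yet the interval join misses it because the watermark has already advanced to 58 s. If the real job behaves similarly, the gap that matters is not the seconds between header and item creation but how far either Kafka stream can lag behind the other at read time.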
[email protected]

From: lxk
Sent: 2022-06-10 18:18
To: user-zh
Subject: Re:Re:Re:Re:Re:Re:Re: Question about data loss with Flink interval join

I have now switched to a SQL interval join. The code and execution plan are below, and no other configuration has changed. The row count is still too low, whereas an inner join has no such problem.

Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream,
        Schema.newBuilder()
                .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
                .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
                .build());
Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream,
        Schema.newBuilder()
                .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
                .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
                .build());

streamTableEnvironment.createTemporaryView("header",headerTable);
streamTableEnvironment.createTemporaryView("item",itemTable);

Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
        ",item.goods_id" +
        ",header.id" +
        ",header.order_status" +
        ",header.shop_id" +
        ",header.parent_order_id" +
        ",header.order_at" +
        ",header.pay_at" +
        ",header.channel_id" +
        ",header.root_order_id" +
        ",item.id" +
        ",item.row_num" +
        ",item.p_sp_sub_amt" +
        ",item.display_qty" +
        ",item.qty" +
        ",item.bom_type" +
        " from header JOIN item on header.id = item.order_id and item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND AND header.rowtime + INTERVAL '20' SECOND");

String intervalJoin = streamTableEnvironment.explainSql("select header.customer_id" +
        ",item.goods_id" +
        ",header.id" +
        ",header.order_status" +
        ",header.shop_id" +
        ",header.parent_order_id" +
        ",header.order_at" +
        ",header.pay_at" +
        ",header.channel_id" +
        ",header.root_order_id" +
        ",item.id" +
        ",item.row_num" +
        ",item.p_sp_sub_amt" +
        ",item.display_qty" +
        ",item.qty" +
        ",item.bom_type" +
        " from header JOIN item on header.id = item.order_id and item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND AND header.rowtime + INTERVAL '20' SECOND");

System.out.println(intervalJoin);
DataStream<Row> rowDataStream = streamTableEnvironment.toChangelogStream(result);

Execution plan:

== Abstract Syntax Tree ==
LogicalProject(customer_id=[$2], goods_id=[$16], id=[$0], order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$13], row_num=[$15], p_sp_sub_amt=[$20], display_qty=[$23], qty=[$18], bom_type=[$21])
+- LogicalJoin(condition=[AND(=($0, $14), >=($25, -($12, 10000:INTERVAL SECOND)), <=($25, +($12, 20000:INTERVAL SECOND)))], joinType=[inner])
   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($12, 2000:INTERVAL SECOND)])
   :  +- LogicalProject(id=[$0], order_status=[$1], customer_id=[$2], shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], channel_id=[$7], root_order_id=[$8], last_updated_at=[$9], business_flag=[$10], mysql_op_type=[$11], rowtime=[CAST(SUBSTRING($9, 0, 19)):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
   :     +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($12, 2000:INTERVAL SECOND)])
      +- LogicalProject(id=[$0], order_id=[$1], row_num=[$2], goods_id=[$3], s_sku_code=[$4], qty=[$5], p_paid_sub_amt=[$6], p_sp_sub_amt=[$7], bom_type=[$8], last_updated_at=[$9], display_qty=[$10], is_first_flag=[$11], rowtime=[CAST(SUBSTRING($9, 0, 19)):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
         +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]])

== Optimized Physical Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-20000, leftUpperBound=10000, leftTimeIndex=9, rightTimeIndex=8], where=[AND(=(id, order_id), >=(rowtime0, -(rowtime, 10000:INTERVAL SECOND)), <=(rowtime0, +(rowtime, 20000:INTERVAL SECOND)))], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, rowtime, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, rowtime0])
   :- Exchange(distribution=[hash[id]])
   :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 2000:INTERVAL SECOND)])
   :     +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
   :        +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 2000:INTERVAL SECOND)])
         +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
            +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])

== Optimized Execution Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-20000, leftUpperBound=10000, leftTimeIndex=9, rightTimeIndex=8], where=[((id = order_id) AND (rowtime0 >= (rowtime - 10000:INTERVAL SECOND)) AND (rowtime0 <= (rowtime + 20000:INTERVAL SECOND)))], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, rowtime, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, rowtime0])
   :- Exchange(distribution=[hash[id]])
   :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 2000:INTERVAL SECOND)])
   :     +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
   :        +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 2000:INTERVAL SECOND)])
         +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
            +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])

On 2022-06-10 17:16:31, "lxk" <[email protected]> wrote:
>With a SQL interval join, my current problem is that the time conversion is not very friendly. The event-time field in my stream is a string such as 2022-06-10 13:08:55, but converting it with the TO_TIMESTAMP function keeps throwing an error.
>
>On 2022-06-10 15:04:31, "Xuyang" <[email protected]> wrote:
>>Hi, the DataStream interval join API corresponds to the interval join in SQL. The SQL you have written, however, is a regular join. Regular joins and interval joins differ in both business semantics and implementation, so comparing the result of the DataStream API interval join directly with a regular SQL join is not really valid. That is why I suggested earlier that you also use an interval join in SQL, so that the two sides are comparable.
>>
>>Also, the table.exec.state.ttl parameter set in SQL only means that expired data is cleared from state after 20 s, whereas the time window you are comparing against is -10 s and 20 s, which does not seem to be the same thing either.
>>
>>--
>>
>>    Best!
>>    Xuyang
>>
>>On 2022-06-10 14:33:37, "lxk" <[email protected]> wrote:
>>>
>>>What I don't understand is this: the time window of my interval join is even longer than the state retention I set in SQL. The lower and upper bounds of the window are -10 s and 20 s respectively, so why is data lost?
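For reference, the -10 s / +20 s bounds and the windowBounds printed in the plan (leftLowerBound=-20000, leftUpperBound=10000) appear to describe the same window seen from opposite sides. The sketch below checks that equivalence with plain arithmetic. It assumes that leftLowerBound and leftUpperBound bound the left (header) time relative to the right (item) time; that reading of the plan fields is an assumption, and the class and method names are hypothetical.

```java
class IntervalBoundsCheck {
    // Bounds as written in the query and in between(): item relative to header.
    static final long LOWER_MS = -10_000L;
    static final long UPPER_MS = 20_000L;

    /** item.ts in [header.ts - 10 s, header.ts + 20 s], as in the SQL BETWEEN condition. */
    static boolean itemWithinHeaderWindow(long headerTs, long itemTs) {
        return itemTs >= headerTs + LOWER_MS && itemTs <= headerTs + UPPER_MS;
    }

    /** header.ts in [item.ts - 20 s, item.ts + 10 s], the assumed meaning of the plan's bounds. */
    static boolean headerWithinItemWindow(long headerTs, long itemTs) {
        return headerTs >= itemTs - UPPER_MS && headerTs <= itemTs - LOWER_MS;
    }

    /** Compares both formulations for item offsets from -30 s to +30 s in 0.5 s steps. */
    static boolean equivalentOnGrid() {
        long headerTs = 100_000L;
        for (long offset = -30_000L; offset <= 30_000L; offset += 500L) {
            long itemTs = headerTs + offset;
            if (itemWithinHeaderWindow(headerTs, itemTs) != headerWithinItemWindow(headerTs, itemTs)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(equivalentOnGrid()); // prints "true"
    }
}
```

If that reading is right, the planner did not change the window, so the differing numbers in the plan would not by themselves explain the missing rows.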
>>>
>>>In SQL I set table.exec.state.ttl to 20 s, so in principle both streams should also keep 20 s of data in state for the join. I am not sure whether my understanding is wrong and would appreciate an answer.
>>>
>>>On 2022-06-10 14:15:29, "Xuyang" <[email protected]> wrote:
>>>>Hi, this SQL of yours is not an interval join; it is a regular join.
>>>>For how to use interval joins, see the documentation [1]. You could check whether a SQL interval join also loses data (remember to set the state TTL), which would tell you whether the problem lies in the data or in the DataStream API.
>>>>
>>>>[1]
>>>>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins
>>>>
>>>>--
>>>>
>>>>    Best!
>>>>    Xuyang
>>>>
>>>>On 2022-06-10 11:26:33, "lxk" <[email protected]> wrote:
>>>>>I used the following code:
>>>>>String s = streamTableEnvironment.explainSql("select header.customer_id" +
>>>>>",item.goods_id" +
>>>>>",header.id" +
>>>>>",header.order_status" +
>>>>>",header.shop_id" +
>>>>>",header.parent_order_id" +
>>>>>",header.order_at" +
>>>>>",header.pay_at" +
>>>>>",header.channel_id" +
>>>>>",header.root_order_id" +
>>>>>",item.id" +
>>>>>",item.row_num" +
>>>>>",item.p_sp_sub_amt" +
>>>>>",item.display_qty" +
>>>>>",item.qty" +
>>>>>",item.bom_type" +
>>>>>" from header JOIN item on header.id = item.order_id");
>>>>>
>>>>>System.out.println("explain:" + s);
>>>>>
>>>>>The plan is:
>>>>>explain:== Abstract Syntax Tree ==
>>>>>LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
>>>>>+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
>>>>>   :- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]])
>>>>>   +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]])
>>>>>
>>>>>== Optimized Physical Plan ==
>>>>>Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
>>>>>+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>>>>>   :- Exchange(distribution=[hash[id]])
>>>>>   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id])
>>>>>   :     +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
>>>>>   +- Exchange(distribution=[hash[order_id]])
>>>>>      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty])
>>>>>         +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])
>>>>>
>>>>>== Optimized Execution Plan ==
>>>>>Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
>>>>>+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>>>>>   :- Exchange(distribution=[hash[id]])
>>>>>   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id])
>>>>>   :     +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
>>>>>   +- Exchange(distribution=[hash[order_id]])
>>>>>      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty])
>>>>>         +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])
>>>>>
>>>>>On 2022-06-10 11:02:56, "Shengkai Fang" <[email protected]> wrote:
>>>>>>Hello, could you provide the concrete plan for everyone to look at?
>>>>>>
>>>>>>You can print the relevant information directly with tEnv.executeSql("Explain JSON_EXECUTION_PLAN <YOUR_QUERY>").print().
>>>>>>
>>>>>>Best,
>>>>>>Shengkai
>>>>>>
>>>>>>lxk <[email protected]> wrote on Fri, Jun 10, 2022 at 10:29:
>>>>>>
>>>>>>> Flink version: 1.14.4
>>>>>>> I am currently using the Flink interval join to correlate data. During testing I found that data is lost after the interval join, whereas with the SQL API and a direct join the data is normal, with nothing lost.
>>>>>>> Watermarks are generated directly from Kafka's built-in timestamps.
>>>>>>>
>>>>>>> Code below --- interval join
>>>>>>>
>>>>>>> SingleOutputStreamOperator<HeaderFull> headerFullStream =
>>>>>>>         headerFilterStream.keyBy(data -> data.getId())
>>>>>>>         .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
>>>>>>>         .between(Time.seconds(-10), Time.seconds(20))
>>>>>>>         .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
>>>>>>>             @Override
>>>>>>>             public void processElement(OrderHeader left, OrderItem right, Context context, Collector<HeaderFull> collector) throws Exception {
>>>>>>>                 HeaderFull headerFull = new HeaderFull();
>>>>>>>                 BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
>>>>>>>                 beanUtilsBean.copyProperties(headerFull, left);
>>>>>>>                 beanUtilsBean.copyProperties(headerFull, right);
>>>>>>>                 String event_date = left.getOrder_at().substring(0, 10);
>>>>>>>                 headerFull.setEvent_date(event_date);
>>>>>>>                 headerFull.setItem_id(right.getId());
>>>>>>>                 collector.collect(headerFull);
>>>>>>>             }
>>>>>>>         });
>>>>>>>
>>>>>>> Join using SQL
>>>>>>>
>>>>>>> Configuration conf = new Configuration();
>>>>>>> conf.setString("table.exec.mini-batch.enabled","true");
>>>>>>> conf.setString("table.exec.mini-batch.allow-latency","15 s");
>>>>>>> conf.setString("table.exec.mini-batch.size","100");
>>>>>>> conf.setString("table.exec.state.ttl","20 s");
>>>>>>> env.configure(conf);
>>>>>>> Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream);
>>>>>>> Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);
>>>>>>>
>>>>>>> streamTableEnvironment.createTemporaryView("header",headerTable);
>>>>>>> streamTableEnvironment.createTemporaryView("item",itemTable);
>>>>>>>
>>>>>>> Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
>>>>>>>         ",item.goods_id" +
>>>>>>>         ",header.id" +
>>>>>>>         ",header.order_status" +
>>>>>>>         ",header.shop_id" +
>>>>>>>         ",header.parent_order_id" +
>>>>>>>         ",header.order_at" +
>>>>>>>         ",header.pay_at" +
>>>>>>>         ",header.channel_id" +
>>>>>>>         ",header.root_order_id" +
>>>>>>>         ",item.id" +
>>>>>>>         ",item.row_num" +
>>>>>>>         ",item.p_sp_sub_amt" +
>>>>>>>         ",item.display_qty" +
>>>>>>>         ",item.qty" +
>>>>>>>         ",item.bom_type" +
>>>>>>>         " from header JOIN item on header.id = item.order_id");
>>>>>>>
>>>>>>> DataStream<Row> rowDataStream = streamTableEnvironment.toChangelogStream(result);
>>>>>>>
>>>>>>> I don't quite understand why the interval join loses so much data. As I understand it, a SQL join should use something similar to an interval join under the hood, so why do the final joined results of the two differ so much?
>>>>>>>
