Hi, the SQL you posted is not an interval join; it is a plain join. The documentation for interval joins is at [1]. You could try a SQL interval join and see whether it also loses data (take care to set the state TTL), which will tell you whether the problem lies in the data or in the DataStream API.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins
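To illustrate, an interval join written in SQL along the lines of [1] could look roughly like the sketch below. It mirrors the between(Time.seconds(-10), Time.seconds(20)) bounds of the DataStream code quoted below, and it assumes both views expose an event-time attribute (called rowtime here), which the quoted fromDataStream calls do not yet declare:

// Sketch only: "rowtime" is an assumed event-time attribute that must be
// declared when the header/item views are registered.
Table intervalResult = streamTableEnvironment.sqlQuery(
        "SELECT h.id, i.goods_id" +
        " FROM header h JOIN item i" +
        " ON h.id = i.order_id" +
        " AND i.rowtime BETWEEN h.rowtime - INTERVAL '10' SECOND" +
        " AND h.rowtime + INTERVAL '20' SECOND");

If this version also loses records, the watermarks are the first thing to check; if it does not, the difference likely lies in the DataStream pipeline.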
--

Best!
Xuyang

At 2022-06-10 11:26:33, "lxk" <[email protected]> wrote:
>I used the following code:
>String s = streamTableEnvironment.explainSql("select header.customer_id" +
>",item.goods_id" +
>",header.id" +
>",header.order_status" +
>",header.shop_id" +
>",header.parent_order_id" +
>",header.order_at" +
>",header.pay_at" +
>",header.channel_id" +
>",header.root_order_id" +
>",item.id" +
>",item.row_num" +
>",item.p_sp_sub_amt" +
>",item.display_qty" +
>",item.qty" +
>",item.bom_type" +
>" from header JOIN item on header.id = item.order_id");
>
>System.out.println("explain:" + s);
>
>The plan is:
>explain:== Abstract Syntax Tree ==
>LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
>+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
>   :- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]])
>   +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]])
>
>== Optimized Physical Plan ==
>Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
>+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>   :- Exchange(distribution=[hash[id]])
>   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id])
>   :     +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
>   +- Exchange(distribution=[hash[order_id]])
>      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty])
>         +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])
>
>== Optimized Execution Plan ==
>Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
>+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>   :- Exchange(distribution=[hash[id]])
>   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id])
>   :     +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
>   +- Exchange(distribution=[hash[order_id]])
>      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty])
>         +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])
>
>At 2022-06-10 11:02:56, "Shengkai Fang" <[email protected]> wrote:
>>Hi, could you provide the concrete plan for everyone to take a look at?
>>
>>You can print the relevant information directly with
>>tEnv.executeSql("Explain JSON_EXECUTION_PLAN <YOUR_QUERY>").print().
>>
>>Best,
>>Shengkai
>>
>>lxk <[email protected]> wrote on Fri, Jun 10, 2022, 10:29:
>>
>>> Flink version: 1.14.4
>>> I am using Flink's interval join to correlate data, and during testing I noticed a problem: with the interval join, records go missing, whereas a plain join through the SQL API returns the data in full, with nothing lost.
>>> The watermarks are generated directly from the timestamps carried by the Kafka records.
>>>
>>> The code is below --- interval join
>>>
>>> SingleOutputStreamOperator<HeaderFull> headerFullStream =
>>> headerFilterStream.keyBy(data -> data.getId())
>>> .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
>>> .between(Time.seconds(-10), Time.seconds(20))
>>> .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
>>> @Override
>>> public void processElement(OrderHeader left, OrderItem right, Context context, Collector<HeaderFull> collector) throws Exception {
>>> HeaderFull headerFull = new HeaderFull();
>>> BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
>>> beanUtilsBean.copyProperties(headerFull, left);
>>> beanUtilsBean.copyProperties(headerFull, right);
>>> String event_date = left.getOrder_at().substring(0, 10);
>>> headerFull.setEvent_date(event_date);
>>> headerFull.setItem_id(right.getId());
>>> collector.collect(headerFull);
>>> }
>>> });
>>>
>>> The join using SQL:
>>> Configuration conf = new Configuration();
>>> conf.setString("table.exec.mini-batch.enabled","true");
>>> conf.setString("table.exec.mini-batch.allow-latency","15 s");
>>> conf.setString("table.exec.mini-batch.size","100");
>>> conf.setString("table.exec.state.ttl","20 s");
>>> env.configure(conf);
>>> Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream);
>>> Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);
>>>
>>> streamTableEnvironment.createTemporaryView("header",headerTable);
>>> streamTableEnvironment.createTemporaryView("item",itemTable);
>>>
>>> Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
>>> ",item.goods_id" +
>>> ",header.id" +
>>> ",header.order_status" +
>>> ",header.shop_id" +
>>> ",header.parent_order_id" +
>>> ",header.order_at" +
>>> ",header.pay_at" +
>>> ",header.channel_id" +
>>> ",header.root_order_id" +
>>> ",item.id" +
>>> ",item.row_num" +
>>> ",item.p_sp_sub_amt" +
>>> ",item.display_qty" +
>>> ",item.qty" +
>>> ",item.bom_type" +
>>> " from header JOIN item on header.id = item.order_id");
>>>
>>> DataStream<Row> rowDataStream = streamTableEnvironment.toChangelogStream(result);
>>>
>>> I don't quite understand why the interval join loses so much data. As I understand it, a SQL join should also use something like an interval join underneath, so why do the joined results of the two differ so much?
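One more note on the watermark setup mentioned above: the DataStream interval join drops records that arrive after the watermark has passed their join window, while the plain SQL join keeps rows regardless of event time (bounded only by the 20 s state TTL), so a watermark that advances too aggressively would explain losses in the former but not the latter. A minimal sketch of deriving watermarks from the Kafka record timestamps, as the quoted mail describes (the 5-second out-of-orderness bound, the headerKafkaSource variable, and the source name are assumptions, not taken from this thread):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// Without an explicit TimestampAssigner, the Kafka source forwards the
// timestamp stored in each Kafka record, so watermarks advance based on
// those timestamps minus the out-of-orderness bound.
WatermarkStrategy<OrderHeader> headerWatermarks =
        WatermarkStrategy.<OrderHeader>forBoundedOutOfOrderness(Duration.ofSeconds(5)); // bound is assumed

DataStream<OrderHeader> headerStream =
        env.fromSource(headerKafkaSource, headerWatermarks, "header-source"); // hypothetical source object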
