I'm now trying an interval join in SQL. My current problem is that the time conversion does not feel very friendly: the event-time field in my stream is of type String, with values like 2022-06-10 13:08:55, but converting it with the TO_TIMESTAMP function keeps failing.
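For what it's worth, TO_TIMESTAMP's default pattern is 'yyyy-MM-dd HH:mm:ss', which matches a value like 2022-06-10 13:08:55 exactly, so no explicit format string should be needed. A minimal sketch of converting the String field and declaring it as the event-time attribute when the DataStream is turned into a Table (the field name order_at is taken from the code further down; the 5-second watermark bound is an assumption):

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;

// Derive a TIMESTAMP(3) column from the String field and declare it as the
// event-time attribute; TO_TIMESTAMP parses 'yyyy-MM-dd HH:mm:ss' by default.
Table headerTable = streamTableEnvironment.fromDataStream(
        headerFilterStream,
        Schema.newBuilder()
                .columnByExpression("rowtime", "TO_TIMESTAMP(order_at)")
                .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
                .build());

If the conversion still fails, it is worth checking for rows that deviate from that pattern: in Flink SQL, TO_TIMESTAMP generally yields NULL for non-conforming input.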
On 2022-06-10 15:04:31, "Xuyang" <[email protected]> wrote:
>Hi. The DataStream interval join API corresponds to the interval join in
>SQL, but the SQL you wrote is a regular join. A regular join and an
>interval join differ both in business semantics and in implementation, so
>comparing the DataStream interval join directly against a regular SQL join
>is problematic. That is why I suggested earlier that you also use an
>interval join in SQL, so that the two sides are actually comparable.
>
>Also, the table.exec.state.ttl parameter you set in SQL only means that
>state is cleared after 20s. The time window you are comparing against is
>-10s to 20s, which does not look like the same thing.
>
>--
>
>    Best!
>    Xuyang
>
>On 2022-06-10 14:33:37, "lxk" <[email protected]> wrote:
>>What I don't understand is this: the time window of my interval join is
>>longer than the state TTL I set in SQL. The lower and upper bounds of the
>>window are -10s and 20s, so why is data lost?
>>
>>In SQL I set table.exec.state.ttl to 20s, so in principle both streams
>>should likewise keep 20s of data in state for the join. I'm not sure
>>whether my understanding is wrong; I'd appreciate an explanation.
>>
>>On 2022-06-10 14:15:29, "Xuyang" <[email protected]> wrote:
>>>Hi. Your SQL is not an interval join; it is a regular join. The usage of
>>>interval joins is documented in [1]. You can try whether a SQL interval
>>>join loses data (remember to set the state TTL), to determine whether
>>>the problem lies in the data or in the DataStream API.
>>>
>>>[1]
>>>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins
>>>
>>>--
>>>
>>>    Best!
>>>    Xuyang
>>>
>>>On 2022-06-10 11:26:33, "lxk" <[email protected]> wrote:
>>>>I used the following code:
>>>>
>>>>String s = streamTableEnvironment.explainSql("select header.customer_id" +
>>>>    ",item.goods_id" +
>>>>    ",header.id" +
>>>>    ",header.order_status" +
>>>>    ",header.shop_id" +
>>>>    ",header.parent_order_id" +
>>>>    ",header.order_at" +
>>>>    ",header.pay_at" +
>>>>    ",header.channel_id" +
>>>>    ",header.root_order_id" +
>>>>    ",item.id" +
>>>>    ",item.row_num" +
>>>>    ",item.p_sp_sub_amt" +
>>>>    ",item.display_qty" +
>>>>    ",item.qty" +
>>>>    ",item.bom_type" +
>>>>    " from header JOIN item on header.id = item.order_id");
>>>>
>>>>System.out.println("explain:" + s);
>>>>
>>>>The plan is:
>>>>
>>>>explain:== Abstract Syntax Tree ==
>>>>LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
>>>>+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
>>>>   :- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]])
>>>>   +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]])
>>>>
>>>>== Optimized Physical Plan ==
>>>>Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
>>>>+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>>>>   :- Exchange(distribution=[hash[id]])
>>>>   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id])
>>>>   :     +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
>>>>   +- Exchange(distribution=[hash[order_id]])
>>>>      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty])
>>>>         +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])
>>>>
>>>>== Optimized Execution Plan ==
>>>>Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
>>>>+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>>>>   :- Exchange(distribution=[hash[id]])
>>>>   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id])
>>>>   :     +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
>>>>   +- Exchange(distribution=[hash[order_id]])
>>>>      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty])
>>>>         +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])
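The plan above confirms Xuyang's point: the optimizer produced a regular Join(joinType=[InnerJoin]) over two NoUniqueKey inputs, not an interval join. A sketch of the SQL interval join he suggests, assuming both views were registered with an event-time attribute named rowtime (hypothetical; see the schema sketch at the top of the thread), with bounds mirroring the DataStream job's between(Time.seconds(-10), Time.seconds(20)):

// Sketch only: header.rowtime and item.rowtime must be declared event-time
// attributes for the planner to recognize this as an interval join.
Table result = streamTableEnvironment.sqlQuery(
        "SELECT header.customer_id, item.goods_id, header.id" +
        " FROM header JOIN item ON header.id = item.order_id" +
        " AND item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND" +
        " AND header.rowtime + INTERVAL '20' SECOND");

With such a predicate the plan contains an interval join operator instead of the regular Join above, and its state is purged as the watermark advances rather than by table.exec.state.ttl.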
>>>>On 2022-06-10 11:02:56, "Shengkai Fang" <[email protected]> wrote:
>>>>>Hi, could you provide the concrete plan so everyone can take a look?
>>>>>
>>>>>You can print the relevant information directly with
>>>>>tEnv.executeSql("Explain JSON_EXECUTION_PLAN <YOUR_QUERY>").print().
>>>>>
>>>>>Best,
>>>>>Shengkai
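If the EXPLAIN statement form is not accepted by the SQL parser of this Flink version, the programmatic explainSql overload with ExplainDetail should produce the same information (a sketch; the select list is abbreviated here for readability):

import org.apache.flink.table.api.ExplainDetail;

// Prints the optimized plans plus the JSON execution plan for the query.
String plan = streamTableEnvironment.explainSql(
        "select header.customer_id, item.goods_id" +
        " from header JOIN item on header.id = item.order_id",
        ExplainDetail.JSON_EXECUTION_PLAN);
System.out.println(plan);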
>>>>>lxk <[email protected]> wrote on Fri, Jun 10, 2022 at 10:29:
>>>>>
>>>>>> Flink version: 1.14.4
>>>>>> I am currently using the Flink interval join to correlate two streams.
>>>>>> While testing I noticed a problem: after the interval join, data is
>>>>>> lost, whereas joining directly through the SQL API loses nothing.
>>>>>> Watermarks are generated directly from the timestamps that come with
>>>>>> the Kafka records.
>>>>>>
>>>>>> The code is below --- interval join:
>>>>>>
>>>>>> SingleOutputStreamOperator<HeaderFull> headerFullStream =
>>>>>>     headerFilterStream.keyBy(data -> data.getId())
>>>>>>         .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
>>>>>>         .between(Time.seconds(-10), Time.seconds(20))
>>>>>>         .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
>>>>>>             @Override
>>>>>>             public void processElement(OrderHeader left, OrderItem right,
>>>>>>                     Context context, Collector<HeaderFull> collector) throws Exception {
>>>>>>                 HeaderFull headerFull = new HeaderFull();
>>>>>>                 BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
>>>>>>                 beanUtilsBean.copyProperties(headerFull, left);
>>>>>>                 beanUtilsBean.copyProperties(headerFull, right);
>>>>>>                 String event_date = left.getOrder_at().substring(0, 10);
>>>>>>                 headerFull.setEvent_date(event_date);
>>>>>>                 headerFull.setItem_id(right.getId());
>>>>>>                 collector.collect(headerFull);
>>>>>>             }
>>>>>>         });
>>>>>>
>>>>>> Join with SQL:
>>>>>>
>>>>>> Configuration conf = new Configuration();
>>>>>> conf.setString("table.exec.mini-batch.enabled", "true");
>>>>>> conf.setString("table.exec.mini-batch.allow-latency", "15 s");
>>>>>> conf.setString("table.exec.mini-batch.size", "100");
>>>>>> conf.setString("table.exec.state.ttl", "20 s");
>>>>>> env.configure(conf);
>>>>>> Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream);
>>>>>> Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);
>>>>>>
>>>>>> streamTableEnvironment.createTemporaryView("header", headerTable);
>>>>>> streamTableEnvironment.createTemporaryView("item", itemTable);
>>>>>>
>>>>>> Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
>>>>>>     ",item.goods_id" +
>>>>>>     ",header.id" +
>>>>>>     ",header.order_status" +
>>>>>>     ",header.shop_id" +
>>>>>>     ",header.parent_order_id" +
>>>>>>     ",header.order_at" +
>>>>>>     ",header.pay_at" +
>>>>>>     ",header.channel_id" +
>>>>>>     ",header.root_order_id" +
>>>>>>     ",item.id" +
>>>>>>     ",item.row_num" +
>>>>>>     ",item.p_sp_sub_amt" +
>>>>>>     ",item.display_qty" +
>>>>>>     ",item.qty" +
>>>>>>     ",item.bom_type" +
>>>>>>     " from header JOIN item on header.id = item.order_id");
>>>>>>
>>>>>> DataStream<Row> rowDataStream = streamTableEnvironment.toChangelogStream(result);
>>>>>>
>>>>>> I don't quite understand why the interval join loses so much data. My
>>>>>> understanding was that a SQL join is implemented underneath with
>>>>>> something similar to an interval join, so why do the two end up with
>>>>>> such different join results?
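A closing note on one plausible cause of the discrepancy: an event-time interval join drops elements that arrive behind the current watermark, and the job above derives its watermarks from the Kafka record timestamps while the business event time lives in the order_at String. A sketch of assigning timestamps and watermarks from the event field itself (the getter name getOrder_at comes from the thread; the 5-second out-of-orderness bound is an assumption):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

// Derive timestamps from the order_at String instead of the Kafka record
// timestamp; java.sql.Timestamp.valueOf parses 'yyyy-MM-dd HH:mm:ss'
// (e.g. 2022-06-10 13:08:55) in the JVM's default time zone.
SingleOutputStreamOperator<OrderHeader> headerWithEventTime =
        headerFilterStream.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<OrderHeader>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, kafkaTimestamp) ->
                                java.sql.Timestamp.valueOf(event.getOrder_at()).getTime()));

The same strategy would apply to filterItemStream with its own event-time field; if the Kafka timestamps run ahead of order_at, records can look late to the interval join and be silently dropped.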
