What I don't understand is this: the time window I open for the interval join is longer than the state retention time I set in SQL (the lower and upper bounds of the window are -10s and +20s), so why is data still being lost?
In SQL I set the table.exec.state.ttl parameter
to 20s, so in theory both streams should also keep 20s of data in state for the join. I'm not sure whether my understanding is wrong; I hope someone can clarify.
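To double-check that I am reading the bounds correctly, this is my understanding of the DataStream interval join semantics (headerTs and itemTs below are just illustrative event timestamps in milliseconds, not values from the real job):

// .between(Time.seconds(-10), Time.seconds(20)) uses inclusive bounds, so a
// header record with event timestamp headerTs should be paired with every item
// record whose timestamp itemTs satisfies:
boolean joinable = itemTs >= headerTs - 10_000L && itemTs <= headerTs + 20_000L;
// e.g. a header at 12:00:00 matches items from 11:59:50 up to 12:00:20.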
On 2022-06-10 14:15:29, "Xuyang" <[email protected]> wrote:
>Hi, this SQL of yours is not an interval join, it is a regular join.
>The documentation on how to use the interval join is in [1]. You can try a SQL interval
>join and see whether it loses data (remember to set the state TTL), to determine whether the problem lies in the data or in the DataStream API.
>
>
>
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins
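>
>For reference, a rough sketch of what the SQL interval join from [1] could look like for this query. It is only an illustration: it assumes both tables expose event-time attributes (called header.rowtime and item.rowtime here), which would have to be defined when converting the DataStreams to tables, and the select list is shortened.
>
>// The BETWEEN condition mirrors between(Time.seconds(-10), Time.seconds(20)).
>Table intervalResult = streamTableEnvironment.sqlQuery(
>        "SELECT header.customer_id, item.goods_id, header.id" +
>        " FROM header, item" +
>        " WHERE header.id = item.order_id" +
>        " AND item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND" +
>        " AND header.rowtime + INTERVAL '20' SECOND");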
>
>--
>
> Best!
> Xuyang
>
>
>
>
>
>On 2022-06-10 11:26:33, "lxk" <[email protected]> wrote:
>>I used the following code:
>>String s = streamTableEnvironment.explainSql("select header.customer_id" +
>>",item.goods_id" +
>>",header.id" +
>>",header.order_status" +
>>",header.shop_id" +
>>",header.parent_order_id" +
>>",header.order_at" +
>>",header.pay_at" +
>>",header.channel_id" +
>>",header.root_order_id" +
>>",item.id" +
>>",item.row_num" +
>>",item.p_sp_sub_amt" +
>>",item.display_qty" +
>>",item.qty" +
>>",item.bom_type" +
>>" from header JOIN item on header.id = item.order_id");
>>
>>System.out.println("explain:" + s);
>>
>>
>>
>>
>>The plan output is:
>>explain:== Abstract Syntax Tree ==
>>LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1],
>>shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6],
>>channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14],
>>p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
>>+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
>> :- LogicalTableScan(table=[[default_catalog, default_database,
>> Unregistered_DataStream_Source_5]])
>> +- LogicalTableScan(table=[[default_catalog, default_database,
>> Unregistered_DataStream_Source_8]])
>>
>>
>>== Optimized Physical Plan ==
>>Calc(select=[customer_id, goods_id, id, order_status, shop_id,
>>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num,
>>p_sp_sub_amt, display_qty, qty, bom_type])
>>+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id,
>>order_status, customer_id, shop_id, parent_order_id, order_at, pay_at,
>>channel_id, root_order_id, id0, order_id, row_num, goods_id, qty,
>>p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey],
>>rightInputSpec=[NoUniqueKey])
>> :- Exchange(distribution=[hash[id]])
>> : +- Calc(select=[id, order_status, customer_id, shop_id,
>> parent_order_id, order_at, pay_at, channel_id, root_order_id])
>> : +- TableSourceScan(table=[[default_catalog, default_database,
>> Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id,
>> shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id,
>> last_updated_at, business_flag, mysql_op_type])
>> +- Exchange(distribution=[hash[order_id]])
>> +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt,
>> bom_type, display_qty])
>> +- TableSourceScan(table=[[default_catalog, default_database,
>> Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id,
>> s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at,
>> display_qty, is_first_flag])
>>
>>
>>== Optimized Execution Plan ==
>>Calc(select=[customer_id, goods_id, id, order_status, shop_id,
>>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num,
>>p_sp_sub_amt, display_qty, qty, bom_type])
>>+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id,
>>order_status, customer_id, shop_id, parent_order_id, order_at, pay_at,
>>channel_id, root_order_id, id0, order_id, row_num, goods_id, qty,
>>p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey],
>>rightInputSpec=[NoUniqueKey])
>> :- Exchange(distribution=[hash[id]])
>> : +- Calc(select=[id, order_status, customer_id, shop_id,
>> parent_order_id, order_at, pay_at, channel_id, root_order_id])
>> : +- TableSourceScan(table=[[default_catalog, default_database,
>> Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id,
>> shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id,
>> last_updated_at, business_flag, mysql_op_type])
>> +- Exchange(distribution=[hash[order_id]])
>> +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt,
>> bom_type, display_qty])
>> +- TableSourceScan(table=[[default_catalog, default_database,
>> Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id,
>> s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at,
>> display_qty, is_first_flag])
>>
>>On 2022-06-10 11:02:56, "Shengkai Fang" <[email protected]> wrote:
>>>Hi, could you provide the concrete plan for everyone to look at?
>>>
>>>You can directly use tEnv.executeSql("Explain JSON_EXECUTION_PLAN
>>><YOUR_QUERY>").print() to print the relevant information.
>>>
>>>Best,
>>>Shengkai
>>>
>>>lxk <[email protected]> wrote on Fri, Jun 10, 2022 at 10:29:
>>>
>>>> Flink version: 1.14.4
>>>> I am currently using the Flink interval join to correlate data. During testing I found a problem: after the interval
>>>> join, data is lost, but when I join directly with the SQL API, the data is complete and nothing is lost.
>>>> The watermark is generated directly from the timestamps carried by the Kafka records.
>>>>
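>>>> Roughly, the watermark setup looks like this (simplified sketch: the broker,
>>>> topic, deserializer and the out-of-orderness bound below are placeholders,
>>>> not the exact values of the job):
>>>>
>>>> KafkaSource<OrderHeader> headerSource = KafkaSource.<OrderHeader>builder()
>>>>         .setBootstrapServers("broker:9092")
>>>>         .setTopics("order_header")
>>>>         .setValueOnlyDeserializer(new OrderHeaderDeserializationSchema()) // placeholder schema
>>>>         .build();
>>>> // No custom timestamp assigner: the event timestamp defaults to the Kafka
>>>> // record timestamp, and the watermark is generated from it.
>>>> DataStream<OrderHeader> headerStream = env.fromSource(
>>>>         headerSource,
>>>>         WatermarkStrategy.<OrderHeader>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
>>>>         "header-source");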
>>>>
>>>> Here is the code --- interval join
>>>>
>>>> SingleOutputStreamOperator<HeaderFull> headerFullStream =
>>>> headerFilterStream.keyBy(data -> data.getId())
>>>> .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
>>>> .between(Time.seconds(-10), Time.seconds(20))
>>>> .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
>>>> @Override
>>>> public void processElement(OrderHeader left, OrderItem right, Context
>>>> context, Collector<HeaderFull> collector) throws Exception {
>>>> HeaderFull headerFull = new HeaderFull();
>>>> BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
>>>> beanUtilsBean.copyProperties(headerFull, left);
>>>> beanUtilsBean.copyProperties(headerFull, right);
>>>> String event_date = left.getOrder_at().substring(0, 10);
>>>> headerFull.setEvent_date(event_date);
>>>> headerFull.setItem_id(right.getId());
>>>> collector.collect(headerFull);
>>>> }
>>>> });
>>>> Join using SQL:
>>>> Configuration conf = new Configuration();
>>>> conf.setString("table.exec.mini-batch.enabled","true");
>>>> conf.setString("table.exec.mini-batch.allow-latency","15 s");
>>>> conf.setString("table.exec.mini-batch.size","100");
>>>> conf.setString("table.exec.state.ttl","20 s");
>>>> env.configure(conf);
>>>> Table headerTable =
>>>> streamTableEnvironment.fromDataStream(headerFilterStream);
>>>> Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);
>>>>
>>>>
>>>> streamTableEnvironment.createTemporaryView("header",headerTable);
>>>> streamTableEnvironment.createTemporaryView("item",itemTable);
>>>>
>>>> Table result = streamTableEnvironment.sqlQuery("select header.customer_id"
>>>> +
>>>> ",item.goods_id" +
>>>> ",header.id" +
>>>> ",header.order_status" +
>>>> ",header.shop_id" +
>>>> ",header.parent_order_id" +
>>>> ",header.order_at" +
>>>> ",header.pay_at" +
>>>> ",header.channel_id" +
>>>> ",header.root_order_id" +
>>>> ",item.id" +
>>>> ",item.row_num" +
>>>> ",item.p_sp_sub_amt" +
>>>> ",item.display_qty" +
>>>> ",item.qty" +
>>>> ",item.bom_type" +
>>>> " from header JOIN item on header.id = item.order_id");
>>>>
>>>>
>>>> DataStream<Row> rowDataStream =
>>>> streamTableEnvironment.toChangelogStream(result);
>>>> I don't quite understand why the interval join loses so much data. My understanding was that the SQL join would also use something like an interval
>>>> join under the hood, so why do the final join results of the two differ so much?