Hi Tianwang,一旦,

我感觉这个场景其实可以在Flink SQL中做一个优化,我建了一个issue[1],欢迎讨论~

[1] https://issues.apache.org/jira/browse/FLINK-18996

赵一旦 <[email protected]> 于2020年8月17日周一 上午11:52写道:

> 大概看了下。这个问题我业务中涉及到过。我是DataStream API做的。
> 不过我是在任务设计阶段就考虑了所有case,然后提前考虑了这些问题的。
> watermark是可以重设的。其次我还更改了interval join的算子实现,默认1.10只支持inner join。不支持left/right
> join。
> 并且inner join后采用最大的timestamp。这个比较复杂,实际如果做left join,业务上可能更希望使用left的时间,right
> join则使用right的时间。out join则只能使用留下的那个的时间,inner join情况需要看业务。
>
>
> 你这个问题主要就是watermark重设就可以了。
>
>
>
> Tianwang Li <[email protected]> 于2020年8月16日周日 上午10:45写道:
>
> > 展开讨论一些特点从场景。
> > 1、inner join场景。有什么办法取两条流的的rowtime 的max吗?
> > 使用SQL语句的场合,怎么实现?
> > 例如:
> > SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as
> > rowtime, ...
> >
> > 如果支持了,那么这种场景我们还是可以在下游进行窗口计算和CEP之类的计算。
> >
> > Tianwang Li <[email protected]> 于2020年8月16日周日 上午10:40写道:
> >
> > > 展开讨论一些特点场景。
> > >
> > > Benchao Li <[email protected]> 于2020年7月6日周一 下午11:08写道:
> > >
> > >> 我们最开始发现这个现象的时候也有些惊讶,不过后来想了一下感觉也是合理的。
> > >>
> > >> 因为双流Join的时间范围有可能会比较大,比如 A流 在 B流的[-10min, +10min],那这样的话,
> > >> A流来一条数据,可能会join到几分钟之前的数据,而此时的watermark其实已经大于了那条数据的事件时间。
> > >>
> > >> 我个人感觉,这应该就是在更实时的产生Join结果和导致数据时间晚于watermark之间,需要有一个balance。
> > >> 现在默认实现是选择了更加实时的产生结果。当然还有另外一种实现思路,就是保证watermark不会超过数据时间,
> > >> 那样的话,Join结果的产生就会delay,或者需要修改watermark逻辑,让watermark一定要小于当前能join到的数据
> > >> 的时间最早的那个。
> > >>
> > >> 元始(Bob Hu) <[email protected]> 于2020年7月5日周日 下午8:48写道:
> > >>
> > >> > 谢谢您的解答。感觉flink这个机制有点奇怪呢
> > >> >
> > >> >
> > >> > ------------------ 原始邮件 ------------------
> > >> > *发件人:* "Benchao Li"<[email protected]>;
> > >> > *发送时间:* 2020年7月5日(星期天) 中午11:58
> > >> > *收件人:* "元始(Bob Hu)"<[email protected]>;
> > >> > *抄送:* "user-zh"<[email protected]>;
> > >> > *主题:* Re: flink interval join后按窗口聚组问题
> > >> >
> > >> > 回到你的问题,我觉得你的观察是正确的。Time interval join产生的结果的确是会有这种情况。
> > >> > 所以如果用事件时间的time interval
> join,后面再接一个事件时间的window(或者其他的使用事件时间的算子,比如CEP等)
> > >> > 就会有些问题,很多数据被作为late数据直接丢掉了。
> > >> >
> > >> > 元始(Bob Hu) <[email protected]> 于2020年7月3日周五 下午3:29写道:
> > >> >
> > >> >> 您好,我想请教一个问题:
> > >> >> flink双流表 interval join后再做window group是不是有问题呢,有些left
> join关联不上的数据会被丢掉。
> > >> >> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between
> > >> a.rowtime
> > >> >> and a.rowtime + INTERVAL '1' HOUR
> > >> >> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime =
> rowTime
> > +
> > >> >> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
> > >> >> allowedLateness +
> > >> >>
> > >>
> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
> > >> >> rightRelativeSize) +
> > >> >>
> > >>
> >
> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
> > >> >> by的时候这种右表数据为空的数据就丢掉了啊。
> > >> >> flink版本 1.10.0。
> > >> >>
> > >> >> 下面是我的一段测试代码:
> > >> >>
> > >> >> import org.apache.commons.net.ntp.TimeStamp;
> > >> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
> > >> >> import org.apache.flink.api.common.typeinfo.Types;
> > >> >> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> > >> >> import org.apache.flink.streaming.api.TimeCharacteristic;
> > >> >> import org.apache.flink.streaming.api.datastream.DataStream;
> > >> >> import
> > >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > >> >> import
> > >>
> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> > >> >> import org.apache.flink.streaming.api.functions.ProcessFunction;
> > >> >> import
> > org.apache.flink.streaming.api.functions.source.SourceFunction;
> > >> >> import org.apache.flink.streaming.api.watermark.Watermark;
> > >> >> import org.apache.flink.table.api.EnvironmentSettings;
> > >> >> import org.apache.flink.table.api.Table;
> > >> >> import org.apache.flink.table.api.java.StreamTableEnvironment;
> > >> >> import org.apache.flink.table.functions.ScalarFunction;
> > >> >> import org.apache.flink.types.Row;
> > >> >> import org.apache.flink.util.Collector;
> > >> >> import org.apache.flink.util.IOUtils;
> > >> >>
> > >> >> import java.io.BufferedReader;
> > >> >> import java.io.InputStreamReader;
> > >> >> import java.io.Serializable;
> > >> >> import java.net.InetSocketAddress;
> > >> >> import java.net.Socket;
> > >> >> import java.sql.Timestamp;
> > >> >> import java.text.SimpleDateFormat;
> > >> >> import java.util.ArrayList;
> > >> >> import java.util.Date;
> > >> >> import java.util.List;
> > >> >>
> > >> >> public class TimeBoundedJoin {
> > >> >>
> > >> >>     public static AssignerWithPeriodicWatermarks<Row>
> > >> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
> > >> >>         AssignerWithPeriodicWatermarks<Row> timestampExtractor =
> new
> > >> AssignerWithPeriodicWatermarks<Row>() {
> > >> >>             private long currentMaxTimestamp = 0;
> > >> >>             private long lastMaxTimestamp = 0;
> > >> >>             private long lastUpdateTime = 0;
> > >> >>             boolean firstWatermark = true;
> > >> >> //            Integer maxIdleTime = 30;
> > >> >>
> > >> >>             @Override
> > >> >>             public Watermark getCurrentWatermark() {
> > >> >>                 if(firstWatermark) {
> > >> >>                     lastUpdateTime = System.currentTimeMillis();
> > >> >>                     firstWatermark = false;
> > >> >>                 }
> > >> >>                 if(currentMaxTimestamp != lastMaxTimestamp) {
> > >> >>                     lastMaxTimestamp = currentMaxTimestamp;
> > >> >>                     lastUpdateTime = System.currentTimeMillis();
> > >> >>                 }
> > >> >>                 if(maxIdleTime != null &&
> System.currentTimeMillis()
> > -
> > >> lastUpdateTime > maxIdleTime * 1000) {
> > >> >>                     return new Watermark(new Date().getTime() -
> > >> finalMaxOutOfOrderness * 1000);
> > >> >>                 }
> > >> >>                 return new Watermark(currentMaxTimestamp -
> > >> finalMaxOutOfOrderness * 1000);
> > >> >>
> > >> >>             }
> > >> >>
> > >> >>             @Override
> > >> >>             public long extractTimestamp(Row row, long
> > >> previousElementTimestamp) {
> > >> >>                 Object value = row.getField(1);
> > >> >>                 long timestamp;
> > >> >>                 try {
> > >> >>                     timestamp = (long)value;
> > >> >>                 } catch (Exception e) {
> > >> >>                     timestamp = ((Timestamp)value).getTime();
> > >> >>                 }
> > >> >>                 if(timestamp > currentMaxTimestamp) {
> > >> >>                     currentMaxTimestamp = timestamp;
> > >> >>                 }
> > >> >>                 return timestamp;
> > >> >>             }
> > >> >>         };
> > >> >>         return timestampExtractor;
> > >> >>     }
> > >> >>
> > >> >>     public static void main(String[] args) throws Exception {
> > >> >>         StreamExecutionEnvironment bsEnv =
> > >> StreamExecutionEnvironment.getExecutionEnvironment();
> > >> >>         EnvironmentSettings bsSettings =
> > >>
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > >> >>         StreamTableEnvironment bsTableEnv =
> > >> StreamTableEnvironment.create(bsEnv, bsSettings);
> > >> >>         bsEnv.setParallelism(1);
> > >> >>
> > >>  bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >> >>
> > >> >>
> > >> >> //        DataStream<Row> ds1 =
> > bsEnv.addSource(sourceFunction(9000));
> > >> >>         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
> > >> HH:mm:ss");
> > >> >>         List<Row> list = new ArrayList<>();
> > >> >>         list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
> > >> 00:00:00").getTime()), 100));
> > >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 00:20:00").getTime()), 100));
> > >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 00:40:00").getTime()), 100));
> > >> >>         list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
> > >> 01:00:01").getTime()), 100));
> > >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 02:20:00").getTime()), 100));
> > >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 02:30:00").getTime()), 100));
> > >> >>         list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
> > >> 02:00:02").getTime()), 100));
> > >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 02:20:00").getTime()), 100));
> > >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 02:40:00").getTime()), 100));
> > >> >>         list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
> > >> 03:00:03").getTime()), 100));
> > >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 03:20:00").getTime()), 100));
> > >> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
> > >> 03:40:00").getTime()), 100));
> > >> >>         list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
> > >> 04:00:04").getTime()), 100));
> > >> >>         DataStream<Row> ds1 = bsEnv.addSource(new
> > >> SourceFunction<Row>() {
> > >> >>             @Override
> > >> >>             public void run(SourceContext<Row> ctx) throws
> Exception
> > {
> > >> >>                 for(Row row : list) {
> > >> >>                     ctx.collect(row);
> > >> >>                     Thread.sleep(1000);
> > >> >>                 }
> > >> >>
> > >> >>             }
> > >> >>
> > >> >>             @Override
> > >> >>             public void cancel() {
> > >> >>
> > >> >>             }
> > >> >>         });
> > >> >>         ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null,
> > 0));
> > >> >>         ds1.getTransformation().setOutputType((new
> > >> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
> > >> >>         bsTableEnv.createTemporaryView("order_info", ds1,
> "order_id,
> > >> order_time, fee, rowtime.rowtime");
> > >> >>
> > >> >>         List<Row> list2 = new ArrayList<>();
> > >> >>         list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
> > >> 01:00:00").getTime())));
> > >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 01:20:00").getTime())));
> > >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 01:30:00").getTime())));
> > >> >>         list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
> > >> 02:00:00").getTime())));
> > >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 02:20:00").getTime())));
> > >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 02:40:00").getTime())));
> > >> >> //        list2.add(Row.of("003",new
> Timestamp(sdf.parse("2020-05-13
> > >> 03:00:03").getTime())));
> > >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 03:20:00").getTime())));
> > >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 03:40:00").getTime())));
> > >> >>         list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
> > >> 04:00:00").getTime())));
> > >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 04:20:00").getTime())));
> > >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 04:40:00").getTime())));
> > >> >>         list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
> > >> 05:00:00").getTime())));
> > >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 05:20:00").getTime())));
> > >> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
> > >> 05:40:00").getTime())));
> > >> >>         DataStream<Row> ds2 = bsEnv.addSource(new
> > >> SourceFunction<Row>() {
> > >> >>             @Override
> > >> >>             public void run(SourceContext<Row> ctx) throws
> Exception
> > {
> > >> >>                 for(Row row : list2) {
> > >> >>                     ctx.collect(row);
> > >> >>                     Thread.sleep(1000);
> > >> >>                 }
> > >> >>
> > >> >>             }
> > >> >>
> > >> >>             @Override
> > >> >>             public void cancel() {
> > >> >>
> > >> >>             }
> > >> >>         });
> > >> >>         ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null,
> > 0));
> > >> >>         ds2.getTransformation().setOutputType((new
> > >> RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
> > >> >>         bsTableEnv.createTemporaryView("pay", ds2, "order_id,
> > >> pay_time, rowtime.rowtime");
> > >> >>
> > >> >>         Table joinTable =  bsTableEnv.sqlQuery("SELECT
> a.*,b.order_id
> > >> from order_info a left join pay b on a.order_id=b.order_id and
> b.rowtime
> > >> between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id
> > >> <>'000' ");
> > >> >>
> > >> >>         bsTableEnv.toAppendStream(joinTable, Row.class).process(new
> > >> ProcessFunction<Row, Object>() {
> > >> >>             @Override
> > >> >>             public void processElement(Row value, Context ctx,
> > >> Collector<Object> out) throws Exception {
> > >> >>                 SimpleDateFormat sdf = new
> > >> SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
> > >> >>                 System.err.println("row:" + value + ",rowtime:" +
> > >> value.getField(3) + ",watermark:" +
> > >> sdf.format(ctx.timerService().currentWatermark()));
> > >> >>             }
> > >> >>         });
> > >> >>
> > >> >>         bsTableEnv.execute("job");
> > >> >>     }
> > >> >> }
> > >> >>
> > >> >>
> > >> >
> > >> > --
> > >> >
> > >> > Best,
> > >> > Benchao Li
> > >> >
> > >>
> > >>
> > >> --
> > >>
> > >> Best,
> > >> Benchao Li
> > >>
> > >
> > >
> > > --
> > > **************************************
> > >  tivanli
> > > **************************************
> > >
> >
> >
> > --
> > **************************************
> >  tivanli
> > **************************************
> >
>


-- 

Best,
Benchao Li

回复