Hi Bob,

This is the Flink user mailing list; please write to this list in
English.
If you want to write in Chinese, you can send your question to user...@flink.apache.org

元始(Bob Hu) <657390...@qq.com> 于2020年7月3日周五 下午3:29写道:

> 您好,我想请教一个问题:
> flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between a.rowtime
> and a.rowtime + INTERVAL '1' HOUR
> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime +
> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 +
> allowedLateness +
> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize,
> rightRelativeSize) +
> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
> by的时候这种右表数据为空的数据就丢掉了啊。
> flink版本 1.10.0。
>
> 下面是我的一段测试代码:
>
> import org.apache.commons.net.ntp.TimeStamp;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.IOUtils;
>
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.io.Serializable;
> import java.net.InetSocketAddress;
> import java.net.Socket;
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
>
> public class TimeBoundedJoin {
>
>     public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer 
> maxIdleTime, long finalMaxOutOfOrderness) {
>         AssignerWithPeriodicWatermarks<Row> timestampExtractor = new 
> AssignerWithPeriodicWatermarks<Row>() {
>             private long currentMaxTimestamp = 0;
>             private long lastMaxTimestamp = 0;
>             private long lastUpdateTime = 0;
>             boolean firstWatermark = true;
> //            Integer maxIdleTime = 30;
>
>             @Override
>             public Watermark getCurrentWatermark() {
>                 if(firstWatermark) {
>                     lastUpdateTime = System.currentTimeMillis();
>                     firstWatermark = false;
>                 }
>                 if(currentMaxTimestamp != lastMaxTimestamp) {
>                     lastMaxTimestamp = currentMaxTimestamp;
>                     lastUpdateTime = System.currentTimeMillis();
>                 }
>                 if(maxIdleTime != null && System.currentTimeMillis() - 
> lastUpdateTime > maxIdleTime * 1000) {
>                     return new Watermark(new Date().getTime() - 
> finalMaxOutOfOrderness * 1000);
>                 }
>                 return new Watermark(currentMaxTimestamp - 
> finalMaxOutOfOrderness * 1000);
>
>             }
>
>             @Override
>             public long extractTimestamp(Row row, long 
> previousElementTimestamp) {
>                 Object value = row.getField(1);
>                 long timestamp;
>                 try {
>                     timestamp = (long)value;
>                 } catch (Exception e) {
>                     timestamp = ((Timestamp)value).getTime();
>                 }
>                 if(timestamp > currentMaxTimestamp) {
>                     currentMaxTimestamp = timestamp;
>                 }
>                 return timestamp;
>             }
>         };
>         return timestampExtractor;
>     }
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(bsEnv, bsSettings);
>         bsEnv.setParallelism(1);
>         bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
> //        DataStream<Row> ds1 = bsEnv.addSource(sourceFunction(9000));
>         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>         List<Row> list = new ArrayList<>();
>         list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 
> 00:00:00").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 
> 00:20:00").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 
> 00:40:00").getTime()), 100));
>         list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 
> 01:00:01").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 
> 02:20:00").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 
> 02:30:00").getTime()), 100));
>         list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 
> 02:00:02").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 
> 02:20:00").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 
> 02:40:00").getTime()), 100));
>         list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 
> 03:00:03").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 
> 03:20:00").getTime()), 100));
>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 
> 03:40:00").getTime()), 100));
>         list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 
> 04:00:04").getTime()), 100));
>         DataStream<Row> ds1 = bsEnv.addSource(new SourceFunction<Row>() {
>             @Override
>             public void run(SourceContext<Row> ctx) throws Exception {
>                 for(Row row : list) {
>                     ctx.collect(row);
>                     Thread.sleep(1000);
>                 }
>
>             }
>
>             @Override
>             public void cancel() {
>
>             }
>         });
>         ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
>         ds1.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, 
> Types.SQL_TIMESTAMP, Types.INT)));
>         bsTableEnv.createTemporaryView("order_info", ds1, "order_id, 
> order_time, fee, rowtime.rowtime");
>
>         List<Row> list2 = new ArrayList<>();
>         list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 
> 01:00:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 
> 01:20:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 
> 01:30:00").getTime())));
>         list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 
> 02:00:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 
> 02:20:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 
> 02:40:00").getTime())));
> //        list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 
> 03:00:03").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 
> 03:20:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 
> 03:40:00").getTime())));
>         list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 
> 04:00:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 
> 04:20:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 
> 04:40:00").getTime())));
>         list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 
> 05:00:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 
> 05:20:00").getTime())));
>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 
> 05:40:00").getTime())));
>         DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>() {
>             @Override
>             public void run(SourceContext<Row> ctx) throws Exception {
>                 for(Row row : list2) {
>                     ctx.collect(row);
>                     Thread.sleep(1000);
>                 }
>
>             }
>
>             @Override
>             public void cancel() {
>
>             }
>         });
>         ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
>         ds2.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, 
> Types.SQL_TIMESTAMP)));
>         bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time, 
> rowtime.rowtime");
>
>         Table joinTable =  bsTableEnv.sqlQuery("SELECT a.*,b.order_id from 
> order_info a left join pay b on a.order_id=b.order_id and b.rowtime between 
> a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id <>'000' ");
>
>         bsTableEnv.toAppendStream(joinTable, Row.class).process(new 
> ProcessFunction<Row, Object>() {
>             @Override
>             public void processElement(Row value, Context ctx, 
> Collector<Object> out) throws Exception {
>                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd 
> HH:mm:ss.SSS");
>                 System.err.println("row:" + value + ",rowtime:" + 
> value.getField(3) + ",watermark:" + 
> sdf.format(ctx.timerService().currentWatermark()));
>             }
>         });
>
>         bsTableEnv.execute("job");
>     }
> }
>
>

-- 

Best,
Benchao Li

Reply via email to