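/*
 * NOTE: the original (Chinese) description was lost to a character-encoding error; the text
 * below is reconstructed from the fragments that survived.
 *
 * Problem: in Flink SQL, when the result of a time-bounded (interval) LEFT JOIN is fed into a
 * window GROUP BY, the null-padded rows produced by the LEFT JOIN appear to be dropped.
 *
 * Example query:
 *   select * from a, b where a.id = b.id
 *     and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR
 *
 * For this query leftRelativeSize = 1 hour and rightRelativeSize = 0, so a left row is cleaned
 * up (and, if unmatched, emitted padded with nulls) at
 *   cleanUpTime = rowTime + leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2
 *                 + allowedLateness + 1
 * i.e. about 1.5 hours after its rowtime. The join's watermark, however, is held back only by
 *   Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness
 * i.e. 1 hour. The null-padded row's rowtime is therefore about 0.5 hours behind the emitted
 * watermark, so a downstream window GROUP BY treats it as late data and discards it.
 *
 * Flink version: 1.10.0
 *
 * Code to reproduce:
 */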
import org.apache.commons.net.ntp.TimeStamp;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.IOUtils;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class TimeBoundedJoin {
public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer
maxIdleTime, long finalMaxOutOfOrderness) {
AssignerWithPeriodicWatermarks<Row> timestampExtractor = new
AssignerWithPeriodicWatermarks<Row>() {
private long currentMaxTimestamp = 0;
private long lastMaxTimestamp = 0;
private long lastUpdateTime = 0;
boolean firstWatermark = true;
// Integer maxIdleTime = 30;
@Override
public Watermark getCurrentWatermark() {
if(firstWatermark) {
lastUpdateTime = System.currentTimeMillis();
firstWatermark = false;
}
if(currentMaxTimestamp != lastMaxTimestamp) {
lastMaxTimestamp = currentMaxTimestamp;
lastUpdateTime = System.currentTimeMillis();
}
if(maxIdleTime != null && System.currentTimeMillis() -
lastUpdateTime > maxIdleTime * 1000) {
return new Watermark(new Date().getTime() -
finalMaxOutOfOrderness * 1000);
}
return new Watermark(currentMaxTimestamp -
finalMaxOutOfOrderness * 1000);
}
@Override
public long extractTimestamp(Row row, long
previousElementTimestamp) {
Object value = row.getField(1);
long timestamp;
try {
timestamp = (long)value;
} catch (Exception e) {
timestamp = ((Timestamp)value).getTime();
}
if(timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
};
return timestampExtractor;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
bsEnv.setParallelism(1);
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// DataStream<Row> ds1 = bsEnv.addSource(sourceFunction(9000));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
List<Row> list = new ArrayList<>();
list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
00:00:00").getTime()), 100));
list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
00:20:00").getTime()), 100));
list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
00:40:00").getTime()), 100));
list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
01:00:01").getTime()), 100));
list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
02:20:00").getTime()), 100));
list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
02:30:00").getTime()), 100));
list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
02:00:02").getTime()), 100));
list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
02:20:00").getTime()), 100));
list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
02:40:00").getTime()), 100));
list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
03:00:03").getTime()), 100));
list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
03:20:00").getTime()), 100));
list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13
03:40:00").getTime()), 100));
list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
04:00:04").getTime()), 100));
DataStream<Row> ds1 = bsEnv.addSource(new SourceFunction<Row>() {
@Override
public void run(SourceContext<Row> ctx) throws Exception {
for(Row row : list) {
ctx.collect(row);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
});
ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
ds1.getTransformation().setOutputType((new RowTypeInfo(Types.STRING,
Types.SQL_TIMESTAMP, Types.INT)));
bsTableEnv.createTemporaryView("order_info", ds1, "order_id,
order_time, fee, rowtime.rowtime");
List<Row> list2 = new ArrayList<>();
list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13
01:00:00").getTime())));
list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
01:20:00").getTime())));
list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
01:30:00").getTime())));
list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13
02:00:00").getTime())));
list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
02:20:00").getTime())));
list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
02:40:00").getTime())));
// list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13
03:00:03").getTime())));
list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
03:20:00").getTime())));
list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
03:40:00").getTime())));
list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13
04:00:00").getTime())));
list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
04:20:00").getTime())));
list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
04:40:00").getTime())));
list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13
05:00:00").getTime())));
list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
05:20:00").getTime())));
list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13
05:40:00").getTime())));
DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>() {
@Override
public void run(SourceContext<Row> ctx) throws Exception {
for(Row row : list2) {
ctx.collect(row);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
});
ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
ds2.getTransformation().setOutputType((new RowTypeInfo(Types.STRING,
Types.SQL_TIMESTAMP)));
bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time,
rowtime.rowtime");
Table joinTable = bsTableEnv.sqlQuery("SELECT a.*,b.order_id from
order_info a left join pay b on a.order_id=b.order_id and b.rowtime between
a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id <>'000' ");
bsTableEnv.toAppendStream(joinTable, Row.class).process(new
ProcessFunction<Row, Object>() {
@Override
public void processElement(Row value, Context ctx,
Collector<Object> out) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
System.err.println("row:" + value + ",rowtime:" +
value.getField(3) + ",watermark:" +
sdf.format(ctx.timerService().currentWatermark()));
}
});
bsTableEnv.execute("job");
}
}