Hi all, I am building an example with DataStream using Flink that has a fake source generator of LogLine(Date d, String line). I want to work with watermarks on it, so I created a class that implements AssignerWithPeriodicWatermarks. If I don't use the ".timeWindow(Time.seconds(2))" operator on the data stream, I can group by second and concatenate the lines. When I use ".timeWindow(Time.seconds(2))", nothing is shown in the output. I guess I misunderstood something when I was reading about event time. Could anyone help me, please? My source code is as follows.
Thanks for the ideas. Kind Regards, Felipe

package flink.example.streaming;

import flink.util.LogLine;
import flink.util.LogSourceFunction;
import flink.util.UtilDate;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;
import java.util.Date;

/**
 * Event-time example: reads generated {@link LogLine} records, assigns timestamps and
 * periodic watermarks, keys the stream by {@link LogLine#getSec()} and concatenates the
 * lines that share a key.
 */
public class EventTimeStreamExampleJava {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<LogLine> dataStream = env
                .addSource(new LogSourceFunction())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
                .keyBy(lineLog -> lineLog.getSec())
                // Left disabled as in the original post. An event-time window only fires
                // once the watermark passes the window end, which requires timestamps in
                // epoch milliseconds (see BoundedOutOfOrdernessGenerator#extractTimestamp).
                // .timeWindow(Time.seconds(2))
                .reduce((log1, log2) -> new LogLine(log1.getTime(), log1.getLine() + " | " + log2.getLine()));

        dataStream.print();
        env.execute("Window LogRead");
    }

    /**
     * Periodic watermark assigner that tolerates records arriving out of order by up to
     * {@code maxOutOfOrderness} milliseconds.
     */
    public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<LogLine> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds, in milliseconds

        /** Highest event timestamp (epoch milliseconds) seen so far. */
        private long currentMaxTimestamp;

        /**
         * Returns the event timestamp of {@code element} in milliseconds since the epoch.
         *
         * <p>The previous implementation returned {@code Date.getSeconds()}, i.e. only the
         * seconds-of-minute field. The watermark ({@code max - 3500}) then stayed negative,
         * so no event-time window could ever close and nothing was emitted.
         *
         * @param element the record to extract the timestamp from
         * @param previousElementTimestamp timestamp previously attached to the record (unused)
         * @return the record's event time in epoch milliseconds
         */
        @Override
        public long extractTimestamp(LogLine element, long previousElementTimestamp) {
            long timestamp = element.getTime().getTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        /**
         * @return a watermark equal to the highest timestamp seen so far minus the
         *         out-of-orderness bound
         */
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}

package flink.util;

import
org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Source that keeps emitting {@link LogLine} records built from
 * {@code UtilDate.getRandomSec()} and {@code UtilDate.getRandomString()} until cancelled.
 */
public class LogSourceFunction implements SourceFunction<LogLine> {

    /** Cleared by {@link #cancel()} to stop the emit loop in {@link #run}. */
    private volatile boolean isRunning = true;

    /**
     * Emits records in a tight loop for as long as the source has not been cancelled.
     *
     * @param ctx context used to emit records
     * @throws Exception declared by the {@code SourceFunction} contract
     */
    @Override
    public void run(SourceContext<LogLine> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString()));
        }
    }

    /** Requests the emit loop to stop. */
    @Override
    public void cancel() {
        isRunning = false;
    }
}

package flink.util;

import java.util.Date;
import java.util.Objects;

/**
 * A log record: a timestamp, the seconds value used as grouping key, and the text line.
 */
public class LogLine {

    private Date time;
    private int sec;
    private String line;

    /** Creates an empty record; populate it through the setters. */
    public LogLine() {
    }

    /**
     * Creates a record from a timestamp; {@code sec} is taken from {@code time.getSeconds()}.
     *
     * @param time the record's timestamp
     * @param line the log text
     */
    public LogLine(Date time, String line) {
        this.sec = time.getSeconds();
        this.time = time;
        this.line = line;
    }

    /**
     * Creates a record from a seconds value; the timestamp comes from
     * {@code UtilDate.getRandomDate(sec)}.
     *
     * @param sec the seconds value
     * @param line the log text
     */
    public LogLine(int sec, String line) {
        this.sec = sec;
        this.time = UtilDate.getRandomDate(sec);
        this.line = line;
    }

    public int getSec() {
        return sec;
    }

    public void setSec(int sec) {
        this.sec = sec;
    }

    public Date getTime() {
        return time;
    }

    public String getLine() {
        return line;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        LogLine logLine = (LogLine) o;
        // sec is a primitive int: compare directly instead of boxing through Objects.equals.
        return sec == logLine.sec
                && Objects.equals(time, logLine.time)
                && Objects.equals(line, logLine.line);
    }

    @Override
    public int hashCode() {
        return Objects.hash(time, sec, line);
    }

    @Override
    public String toString() {
        // The closing quote after the line value was missing in the original output.
        return "LogLine{" + "time=" + time + ", sec=" + sec + ", line='" + line + '\'' + '}';
    }
}

-- *---- Felipe Oliveira Gutierrez* *-- skype: felipe.o.gutierrez* *--* *https://felipeogutierrez.blogspot.com <https://felipeogutierrez.blogspot.com>*