Hi Oliver,

I believe you also need to sort the events explicitly. A bounded-out-of-orderness watermark strategy only "postpones" the watermark by the given expected maximum out-of-orderness; it does not reorder the events. Check the Ververica example: https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java
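To illustrate the buffering idea, here is a minimal plain-Java sketch. It has no Flink dependency, so it is a simulation of the concept, not Flink API. Events are held in a priority queue ordered by timestamp and released only once the watermark has passed them. The sketch assumes the watermark is the highest timestamp seen so far minus the bound minus 1 ms, and recomputes it on every event, whereas Flink emits it periodically. The names `SortBufferSketch`, `onEvent`, `endOfInput` and `run` are hypothetical. In a real job this logic would typically live in a `KeyedProcessFunction` that keeps the buffer in keyed state and registers event-time timers.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical simulation of "buffer until the watermark passes, then emit in event-time order".
public class SortBufferSketch {

    // Stand-in for the question's EventRecord, reduced to the two fields used here.
    static final class Event {
        final long counter;
        final long timestamp;

        Event(long counter, long timestamp) {
            this.counter = counter;
            this.timestamp = timestamp;
        }
    }

    private final long outOfOrdernessMillis;
    // Min-heap ordered by event time: the earliest buffered event is always at the head.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final List<Event> emitted = new ArrayList<>();
    private long maxTimestamp;

    SortBufferSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflowing.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Assumed watermark: highest timestamp seen, minus the bound, minus 1 ms.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    void onEvent(Event e) {
        if (e.timestamp <= currentWatermark()) {
            return; // late: a real job would drop it or route it to a side output
        }
        buffer.add(e);
        maxTimestamp = Math.max(maxTimestamp, e.timestamp);
        release(currentWatermark());
    }

    // Emit, earliest first, every buffered event the watermark has passed.
    private void release(long watermark) {
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            emitted.add(buffer.poll());
        }
    }

    // End of a finite input: a final maximal watermark flushes whatever is still buffered.
    void endOfInput() {
        release(Long.MAX_VALUE);
    }

    // Parses the sample timestamps as UTC, matching how the demo prints them.
    static long ts(String isoLocal) {
        return LocalDateTime.parse(isoLocal).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Feeds the sample output from the question in arrival order; returns counters in emission order.
    static List<Long> run(long outOfOrdernessMillis) {
        String[] arrivals = {
            "2021-11-02T20:10:01.554", "2021-11-02T20:10:02.810", "2021-11-02T20:09:59.815",
            "2021-11-02T20:10:00.815", "2021-11-02T20:10:05.819", "2021-11-02T20:10:04.819",
            "2021-11-02T20:10:03.824", "2021-11-02T20:10:05.828", "2021-11-02T20:10:09.829"
        };
        SortBufferSketch sorter = new SortBufferSketch(outOfOrdernessMillis);
        for (int i = 0; i < arrivals.length; i++) {
            sorter.onEvent(new Event(i + 1, ts(arrivals[i])));
        }
        sorter.endOfInput();
        List<Long> order = new ArrayList<>();
        for (Event e : sorter.emitted) {
            order.add(e.counter);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run(20_000)); // [3, 4, 1, 2, 7, 6, 5, 8, 9]
    }
}
```

With the 20-second bound from the question, nothing is released while events are arriving, because the watermark never catches up with the buffered events; the final flush then yields the counters in event-time order. With `run(2_000)`, event 3 has already fallen behind the watermark when it arrives and is dropped, while the others come out in order as the watermark advances.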
Alexey

________________________________
From: Oliver Moser <olz...@gmail.com>
Sent: Tuesday, November 2, 2021 12:52:59 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Question on BoundedOutOfOrderness

Hi!

I am investigating the use of Flink for a new project and have started with some simple demos. Currently I am stuck at the point where I need to deal with events that arrive out of order with respect to their event time. I've spent quite some time researching on SO, in the docs and in the Ververica training (an excellent resource, btw), but I assume I still have some conceptual misunderstanding :-)

I put together the demo code below, and I would expect the console output to list the events chronologically by their embedded event time. However, the events are always printed in the same order in which the OutOfOrderEventSource pushes them into the data stream.

Sample console output:
---
3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
7> EventRecord{id=1, counter=5, eventId=1, timestamp=2021-11-02T20:10:05.819}
8> EventRecord{id=4, counter=6, eventId=0, timestamp=2021-11-02T20:10:04.819}
9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
10> EventRecord{id=0, counter=8, eventId=1, timestamp=2021-11-02T20:10:05.828}
11> EventRecord{id=3, counter=9, eventId=1, timestamp=2021-11-02T20:10:09.829}
---

My expectation would be to receive the events in this order:
---
5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
…
---

Given a BoundedOutOfOrderness watermark strategy with a duration of 20 seconds, I expected the first event pushed by the demo source, EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}, to set the initial watermark to 2021-11-02T20:09:41.554. Events older than this timestamp would then not be considered, while events younger than it would be, and they would be ordered accordingly. That would mean that EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815} is still on time.

I'm sure I am missing something conceptually. Here is the code I'm using:
---
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import static java.time.Instant.ofEpochMilli;
import static java.time.LocalDateTime.ofInstant;

public class SimpleOutOfOrderDemo {

    public static void main(String... args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Watermark = highest timestamp seen minus 20 s (minus 1 ms); event time comes from EventRecord.timestamp.
        var watermarkStrategy = WatermarkStrategy
                .<EventRecord>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((e, rt) -> e.timestamp);

        var source = new OutOfOrderEventSource();
        env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy).print();
        env.execute("Simple Out of Order Demo");
    }

    public static class OutOfOrderEventSource implements SourceFunction<EventRecord> {
        static final int MAX_ELEMENTS = 10;
        static final long INTERVAL = 1000;

        AtomicInteger counter = new AtomicInteger();

        @Override
        public void run(SourceFunction.SourceContext<EventRecord> ctx) throws Exception {
            var random = new Random();
            var c = counter.incrementAndGet();
            while (c < MAX_ELEMENTS) {
                var id = random.nextInt(5);
                var eventId = random.nextInt(2);
                // Backdate each event by 0 to 4 seconds so that events arrive out of order.
                var delay = random.nextInt(5);
                var dateTime = LocalDateTime.now().minus(Duration.of(delay, ChronoUnit.SECONDS));
                var record = new EventRecord(id, eventId, dateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), c);
                ctx.collect(record);
                c = counter.incrementAndGet();
                Thread.sleep(INTERVAL);
            }
        }

        @Override
        public void cancel() {
            counter.set(MAX_ELEMENTS);
        }
    }

    public static class EventRecord {
        public int id;
        public int eventId;
        public long timestamp;
        public long counter;

        // No-arg constructor so Flink can treat EventRecord as a POJO instead of falling back to Kryo.
        public EventRecord() {
        }

        public EventRecord(int id, int eventId, long timestamp, long counter) {
            this.id = id;
            this.eventId = eventId;
            this.timestamp = timestamp;
            this.counter = counter;
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("EventRecord{");
            sb.append("id=").append(id);
            sb.append(", counter=").append(counter);
            sb.append(", eventId=").append(eventId);
            sb.append(", timestamp=").append(ofInstant(ofEpochMilli(timestamp), ZoneOffset.UTC));
            sb.append('}');
            return sb.toString();
        }
    }
}
---

POM:
---
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.app</groupId>
    <artifactId>event-hubs-kafka-flink-consumer</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <flink.version>1.14.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <defaultGoal>install</defaultGoal>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.0.2</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
---

It would be great if someone could point me in the right direction. Really great project!

Thanks,
Oliver
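A quick check of the watermark arithmetic in the question, as a plain-Java sketch. It assumes the formula used by Flink's built-in bounded-out-of-orderness generator (highest timestamp seen, minus the bound, minus 1 ms) and treats an event as late when its timestamp is at or below the watermark. The class and helper names (`WatermarkCheck`, `watermark`, `isLate`, `at`) are hypothetical. Note that the watermark strategy itself never drops records; lateness only matters to downstream event-time operators such as windows or timers, and `print()` ignores watermarks.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class WatermarkCheck {

    // Hypothetical helper mirroring the assumed formula: max seen - bound - 1 ms.
    static Instant watermark(Instant maxSeen, Duration bound) {
        return maxSeen.minus(bound).minusMillis(1);
    }

    // An event is late once its timestamp is at or below the current watermark.
    static boolean isLate(Instant eventTime, Instant watermark) {
        return !eventTime.isAfter(watermark);
    }

    // The demo prints timestamps as UTC, so parse the sample values as UTC too.
    static Instant at(String isoLocal) {
        return LocalDateTime.parse(isoLocal).toInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        Instant wm = watermark(at("2021-11-02T20:10:01.554"), Duration.ofSeconds(20));
        System.out.println(wm);                                          // 2021-11-02T20:09:41.553Z
        System.out.println(isLate(at("2021-11-02T20:09:59.815"), wm));  // false
    }
}
```

Under this assumption the watermark after the first event is 20:09:41.553 rather than .554, and the event at 20:09:59.815 is indeed on time; its position in the printed output is unaffected because nothing in the pipeline buffers by event time.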