Thanks Guowei and Alexey, the references you provided helped. I managed to put together simple examples using both the streaming Table API and the CEP library, and I’m now able to process events in event-time order. I believe my interpretation of how watermarks are set up was leading me to wrong assumptions: a watermark strategy only tracks event-time progress, it does not reorder the records.
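For anyone finding this thread later, here is a minimal plain-Java sketch (no Flink dependency) of the idea Alexey pointed at: records are buffered and only released in timestamp order once the watermark has passed them. All class and method names below are hypothetical and for illustration only; the watermark formula (highest timestamp seen, minus the bound, minus 1 ms) mirrors what I understand the bounded-out-of-orderness generator to emit, and dropping late records is an assumption of this sketch rather than something Flink does on its own. The timestamps are the ones from my sample output, written as milliseconds after 20:09:00.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java illustration only; every name here is hypothetical and not part of Flink. */
class EventTimeSortSketch {

    /** Minimal stand-in for the demo's EventRecord: arrival counter plus event time in millis. */
    static final class Event {
        final long counter;
        final long timestamp;

        Event(long counter, long timestamp) {
            this.counter = counter;
            this.timestamp = timestamp;
        }
    }

    private final long maxOutOfOrdernessMillis;
    // Buffer ordered by event time; the head is always the oldest buffered event.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
    private long maxTimestampSeen = Long.MIN_VALUE;

    EventTimeSortSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Watermark = highest timestamp seen so far, minus the bound, minus 1 ms. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMillis - 1;
    }

    /** Buffers the event and returns every buffered event the watermark has now passed, oldest first. */
    List<Event> onEvent(Event event) {
        List<Event> ready = new ArrayList<>();
        if (event.timestamp <= currentWatermark()) {
            return ready; // late event: this sketch simply drops it
        }
        buffer.add(event);
        maxTimestampSeen = Math.max(maxTimestampSeen, event.timestamp);
        long watermark = currentWatermark();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    /** Releases whatever is still buffered at end of input, oldest first. */
    List<Event> flush() {
        List<Event> rest = new ArrayList<>();
        while (!buffer.isEmpty()) {
            rest.add(buffer.poll());
        }
        return rest;
    }

    /** Feeds {counter, timestamp} pairs in arrival order and returns the counters in emission order. */
    static List<Long> sortedCounters(long boundMillis, long[][] arrivals) {
        EventTimeSortSketch sorter = new EventTimeSortSketch(boundMillis);
        List<Long> counters = new ArrayList<>();
        for (long[] a : arrivals) {
            for (Event out : sorter.onEvent(new Event(a[0], a[1]))) {
                counters.add(out.counter);
            }
        }
        for (Event out : sorter.flush()) {
            counters.add(out.counter);
        }
        return counters;
    }

    public static void main(String[] args) {
        long[][] arrivals = {
            {1, 61_554}, {2, 62_810}, {3, 59_815}, {4, 60_815}, {5, 65_819},
            {6, 64_819}, {7, 63_824}, {8, 65_828}, {9, 69_829}
        };
        // Prints [3, 4, 1, 2, 7, 6, 5, 8, 9]
        System.out.println(sortedCounters(5_000, arrivals));
    }
}
```

With a 5-second bound the counters come out in event-time order, which is the ordering I originally expected from the watermark strategy alone. With the 20-second bound from my demo, the first event (20:10:01.554) yields a watermark of 20:09:41.553, so nothing would be released until much later input or the final flush.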
Thanks again!

> On 03.11.2021, at 09:36, Guowei Ma <guowei....@gmail.com> wrote:
>
> Hi Oliver
>
> I think Alexey is right that you cannot assume that the records will be
> output in event time order.
> And there is a small addition. I see your output and there are actually
> multiple parallel subtasks (probably 11). You also can't expect the output
> of these subtasks to be ordered according to event time.
>
> Best,
> Guowei
>
>
> On Wed, Nov 3, 2021 at 6:46 AM Alexey Trenikhun <yen...@msn.com> wrote:
> Hi Oliver,
> I believe you also need to sort; the out-of-orderness watermark strategy only
> "postpones" the watermark by the given expected maximum out-of-orderness. Check
> the Ververica example -
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java
>
> Alexey
> From: Oliver Moser <olz...@gmail.com>
> Sent: Tuesday, November 2, 2021 12:52:59 PM
> To: user@flink.apache.org <user@flink.apache.org>
> Subject: Question on BoundedOutOfOrderness
>
> Hi!
>
> I am investigating the use of Flink for a new project and started some simple
> demos.
>
> Currently I am stuck at the point where I need to deal with events arriving
> out of order based on their event time. I’ve spent quite some time
> researching on SO, the docs, and the Ververica training (excellent resource
> btw); however, I assume I still have some conceptual misconceptions :-)
>
> I put together the following demo code, and I would expect that the console
> output would list the events chronologically based on their embedded event
> time. However, events are always printed in the same order as they are pushed
> into the data stream by the OutOfOrderEventSource.
>
> Sample console output:
>
> ---
> 3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
> 4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
> 5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
> 6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
> 7> EventRecord{id=1, counter=5, eventId=1, timestamp=2021-11-02T20:10:05.819}
> 8> EventRecord{id=4, counter=6, eventId=0, timestamp=2021-11-02T20:10:04.819}
> 9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
> 10> EventRecord{id=0, counter=8, eventId=1, timestamp=2021-11-02T20:10:05.828}
> 11> EventRecord{id=3, counter=9, eventId=1, timestamp=2021-11-02T20:10:09.829}
> ---
>
> My expectation would be to receive the events ordered:
>
> ---
> 5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
> 6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
> 3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
> 4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
> 9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
> …
> ---
>
>
> Given a BoundedOutOfOrderness watermarking strategy with a 20-second
> duration, my expectation would have been that for the first event that is
> pushed to the demo source
>
> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
>
> this would set the initial watermark to "2021-11-02T20:09:41.554", hence
> events that are older than this timestamp are not considered, but events
> younger than this timestamp are considered and ordering of events happens
> accordingly. That would mean that
>
> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
>
> would still be considered on time.
>
> I’m sure I am missing something conceptually.
>
> Here is the code that I’m using:
>
> ---
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> import java.time.Duration;
> import java.time.LocalDateTime;
> import java.time.ZoneOffset;
> import java.time.temporal.ChronoUnit;
> import java.util.Random;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import static java.time.Instant.ofEpochMilli;
> import static java.time.LocalDateTime.ofInstant;
>
> public class SimpleOutOfOrderDemo {
>
>     public static void main(String... args) throws Exception {
>         var env = StreamExecutionEnvironment.getExecutionEnvironment();
>         var watermarkStrategy =
>             WatermarkStrategy.<EventRecord>forGenerator(
>                     WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)))
>                 .withTimestampAssigner((e, rt) -> e.timestamp);
>
>         var source = new OutOfOrderEventSource();
>
>         env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy).print();
>         env.execute("Simple Out of Order Demo");
>     }
>
>     public static class OutOfOrderEventSource implements SourceFunction<EventRecord> {
>
>         static final int MAX_ELEMENTS = 10;
>
>         static final long INTERVAL = 1000;
>
>         AtomicInteger counter = new AtomicInteger();
>
>         @Override
>         public void run(SourceFunction.SourceContext<EventRecord> ctx) throws Exception {
>             var c = counter.incrementAndGet();
>             while (c < MAX_ELEMENTS) {
>                 var id = new Random().nextInt(5);
>                 var eventId = new Random().nextInt(2);
>                 var delay = new Random().nextInt(5);
>                 var dateTime = LocalDateTime.now().minus(Duration.of(delay, ChronoUnit.SECONDS));
>                 var record = new EventRecord(id, eventId,
>                     dateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), c);
>                 ctx.collect(record);
>                 c = counter.incrementAndGet();
>                 Thread.sleep(INTERVAL);
>             }
>         }
>
>         @Override
>         public void cancel() {
>             counter.set(MAX_ELEMENTS);
>         }
>     }
>
>     public static class EventRecord {
>
>         public int id;
>         public int eventId;
>         public long timestamp;
>         public long counter;
>
>         public EventRecord(int id, int eventId, long timestamp, long counter) {
>             this.id = id;
>             this.eventId = eventId;
>             this.timestamp = timestamp;
>             this.counter = counter;
>         }
>
>         @Override
>         public String toString() {
>             final StringBuffer sb = new StringBuffer("EventRecord{");
>             sb.append("id=").append(id);
>             sb.append(", counter=").append(counter);
>             sb.append(", eventId=").append(eventId);
>             sb.append(", timestamp=").append(ofInstant(ofEpochMilli(timestamp), ZoneOffset.UTC));
>             sb.append('}');
>             return sb.toString();
>         }
>     }
> }
>
> ---
> POM:
>
> ---
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>     <groupId>com.example.app</groupId>
>     <artifactId>event-hubs-kafka-flink-consumer</artifactId>
>     <packaging>jar</packaging>
>     <version>1.0-SNAPSHOT</version>
>     <properties>
>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>         <flink.version>1.14.0</flink.version>
>         <scala.binary.version>2.11</scala.binary.version>
>     </properties>
>     <dependencies>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-clients_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>     </dependencies>
>     <build>
>         <defaultGoal>install</defaultGoal>
>         <plugins>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-compiler-plugin</artifactId>
>                 <version>3.6.1</version>
>                 <configuration>
>                     <source>11</source>
>                     <target>11</target>
>                 </configuration>
>             </plugin>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-resources-plugin</artifactId>
>                 <version>3.0.2</version>
>                 <configuration>
>                     <encoding>UTF-8</encoding>
>                 </configuration>
>             </plugin>
>         </plugins>
>     </build>
> </project>
> ---
>
>
> Would be cool if someone could point me in the right direction. Really great
> project!
>
> Thanks
>
> Oliver