Thanks Guowei and Alexey, the references you provided helped. I 
managed to put together simple examples using both the streaming Table API 
and the CEP library, and I’m now able to process events in event time 
order. I believe my interpretation of how watermarks are set up led me to 
wrong assumptions.
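
For reference, here is a minimal plain-Java sketch of the idea Alexey 
describes below: a bounded out-of-orderness watermark only trails the highest 
timestamp seen so far, and a separate buffer has to do the actual sorting. It 
uses no Flink classes. The names Event, SortBuffer and sortByEventTime are 
hypothetical and exist only for this illustration. The watermark formula (max 
timestamp minus the bound minus 1 ms) is meant to mirror Flink's bounded 
out-of-orderness generator, but updating it on every event and dropping late 
events are simplifying assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative only: plain Java, no Flink classes. All names are hypothetical.
class EventTimeSortSketch {

    /** Stand-in for the demo's EventRecord, reduced to what the sketch needs. */
    static final class Event {
        final long counter;   // arrival order
        final long timestamp; // event time in epoch millis

        Event(long counter, long timestamp) {
            this.counter = counter;
            this.timestamp = timestamp;
        }
    }

    /** Holds events back until the watermark has passed them, then releases them sorted. */
    static final class SortBuffer {
        private final long maxOutOfOrdernessMillis;
        private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
        private long maxTimestamp = Long.MIN_VALUE;
        private long watermark = Long.MIN_VALUE;

        SortBuffer(long maxOutOfOrdernessMillis) {
            this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        }

        /** Accepts one event and returns every buffered event the watermark has now passed. */
        List<Event> onEvent(Event e) {
            List<Event> ready = new ArrayList<>();
            if (e.timestamp <= watermark) {
                return ready; // late event: simply dropped in this sketch
            }
            buffer.add(e);
            // The watermark trails the highest timestamp seen so far by the bound.
            maxTimestamp = Math.max(maxTimestamp, e.timestamp);
            watermark = maxTimestamp - maxOutOfOrdernessMillis - 1;
            while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
                ready.add(buffer.poll());
            }
            return ready;
        }

        /** End of input: release whatever is still buffered, in timestamp order. */
        List<Event> flush() {
            List<Event> rest = new ArrayList<>();
            while (!buffer.isEmpty()) {
                rest.add(buffer.poll());
            }
            return rest;
        }
    }

    /** Runs all arrivals through a SortBuffer and returns them in emission order. */
    static List<Event> sortByEventTime(List<Event> arrivals, long maxOutOfOrdernessMillis) {
        SortBuffer sorter = new SortBuffer(maxOutOfOrdernessMillis);
        List<Event> out = new ArrayList<>();
        for (Event e : arrivals) {
            out.addAll(sorter.onEvent(e));
        }
        out.addAll(sorter.flush());
        return out;
    }

    public static void main(String[] args) {
        // Made-up timestamps, arriving out of order.
        List<Event> arrivals = List.of(
            new Event(1, 1000), new Event(2, 3000), new Event(3, 500),
            new Event(4, 2000), new Event(5, 9000));
        for (Event e : sortByEventTime(arrivals, 5000)) {
            System.out.println("counter=" + e.counter + ", timestamp=" + e.timestamp);
        }
    }
}
```

Without such a buffer (or an operator that does the equivalent), records 
pass straight through in arrival order, whatever the watermark is. Events 
with equal timestamps come out in no particular order here.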

Thanks again!

> On 03.11.2021, at 09:36, Guowei Ma <guowei....@gmail.com> wrote:
> 
> Hi Oliver
> 
> I think Alexey is right that you cannot assume that the records will be 
> output in event time order.
> One small addition: I see from your output that there are actually 
> multiple parallel subtasks (probably 11). You also can't expect the output 
> of these subtasks to be ordered according to event time.
> 
> Best,
> Guowei
> 
> 
> On Wed, Nov 3, 2021 at 6:46 AM Alexey Trenikhun <yen...@msn.com> wrote:
> Hi Oliver,
> I believe you also need to sort the events. The bounded out-of-orderness 
> watermark strategy only “postpones” the watermark by the given expected 
> maximum out-of-orderness. Check the Ververica example: 
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java
> 
> Alexey
> From: Oliver Moser <olz...@gmail.com>
> Sent: Tuesday, November 2, 2021 12:52:59 PM
> To: user@flink.apache.org
> Subject: Question on BoundedOutOfOrderness
>  
> Hi!
> 
> I am investigating the use of Flink for a new project and started some simple 
> demos. 
> 
> Currently I am stuck at the point where I need to deal with events arriving 
> out of order based on their event time. I’ve spent quite some time 
> researching on SO, in the docs and in the Ververica training (excellent 
> resource btw), but I assume I still have some conceptual misunderstandings :-)
> 
> I put together the following demo code, and I would expect that the console 
> output would list the events chronologically based on their embedded event 
> time. However, events are always printed in the same order as they are pushed 
> into the data stream by the OutOfOrderEventSource.
> 
> Sample console output:
> 
> —
> 3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
> 4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
> 5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
> 6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
> 7> EventRecord{id=1, counter=5, eventId=1, timestamp=2021-11-02T20:10:05.819}
> 8> EventRecord{id=4, counter=6, eventId=0, timestamp=2021-11-02T20:10:04.819}
> 9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
> 10> EventRecord{id=0, counter=8, eventId=1, timestamp=2021-11-02T20:10:05.828}
> 11> EventRecord{id=3, counter=9, eventId=1, timestamp=2021-11-02T20:10:09.829}
> —
> 
> My expectation would be to receive the events ordered: 
> 
> —
> 5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
> 6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
> 3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
> 4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
> 9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
> …
> —
> 
> 
> Given a BoundedOutOfOrderness watermarking strategy with a 20 seconds 
> duration, my expectation would have been that for the first event that is 
> pushed to the demo source 
> 
> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
> 
> this would set the initial watermark to "2021-11-02T20:09:41.554", hence 
> events that are older than this timestamp are not considered, but events 
> younger than this timestamp are considered and ordering of events happens 
> accordingly. That would mean that 
> 
> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
> 
> would still be considered on time.
> 
> I’m sure I am missing something conceptually.
> 
> Here is the code that I’m using:
> 
> ---
> 
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> 
> import java.time.Duration;
> import java.time.LocalDateTime;
> import java.time.ZoneOffset;
> import java.time.temporal.ChronoUnit;
> import java.util.Random;
> import java.util.concurrent.atomic.AtomicInteger;
> 
> import static java.time.Instant.ofEpochMilli;
> import static java.time.LocalDateTime.ofInstant;
> 
> public class SimpleOutOfOrderDemo {
> 
>    public static void main(String... args) throws Exception {
>       var env = StreamExecutionEnvironment.getExecutionEnvironment();
>       var watermarkStrategy =
>          WatermarkStrategy.<EventRecord>forGenerator(
>             WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)))
>             .withTimestampAssigner((e, rt) -> e.timestamp);
> 
>       var source = new OutOfOrderEventSource();
>       env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy).print();
>       env.execute("Simple Out of Order Demo");
>    }
> 
>    public static class OutOfOrderEventSource implements SourceFunction<EventRecord> {
> 
>       static final int MAX_ELEMENTS = 10;
> 
>       static final long INTERVAL = 1000;
> 
>       AtomicInteger counter = new AtomicInteger();
> 
>       @Override
>       public void run(SourceFunction.SourceContext<EventRecord> ctx) throws Exception {
>          var c = counter.incrementAndGet();
>          while (c < MAX_ELEMENTS) {
>             var id = new Random().nextInt(5);
>             var eventId = new Random().nextInt(2);
>             var delay = new Random().nextInt(5);
>             var dateTime = LocalDateTime.now().minus(Duration.of(delay, ChronoUnit.SECONDS));
>             var record = new EventRecord(id, eventId,
>                dateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), c);
>             ctx.collect(record);
>             c = counter.incrementAndGet();
>             Thread.sleep(INTERVAL);
>          }
>       }
> 
>       @Override
>       public void cancel() {
>          counter.set(MAX_ELEMENTS);
>       }
>    }
> 
>    public static class EventRecord {
> 
>       public int id;
>       public int eventId;
>       public long timestamp;
>       public long counter;
> 
>       public EventRecord(int id, int eventId, long timestamp, long counter) {
>          this.id = id;
>          this.eventId = eventId;
>          this.timestamp = timestamp;
>          this.counter = counter;
>       }
> 
>       @Override
>       public String toString() {
>          final StringBuffer sb = new StringBuffer("EventRecord{");
>          sb.append("id=").append(id);
>          sb.append(", counter=").append(counter);
>          sb.append(", eventId=").append(eventId);
>          sb.append(", timestamp=").append(ofInstant(ofEpochMilli(timestamp), ZoneOffset.UTC));
>          sb.append('}');
>          return sb.toString();
>       }
>    }
> }
> 
> —
> POM:
> 
> —
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>    <modelVersion>4.0.0</modelVersion>
>    <groupId>com.example.app</groupId>
>    <artifactId>event-hubs-kafka-flink-consumer</artifactId>
>    <packaging>jar</packaging>
>    <version>1.0-SNAPSHOT</version>
>    <properties>
>       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>       <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>       <flink.version>1.14.0</flink.version>
>       <scala.binary.version>2.11</scala.binary.version>
>    </properties>
>    <dependencies>
>       <dependency>
>          <groupId>org.apache.flink</groupId>
>          <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>          <version>${flink.version}</version>
>       </dependency>
>       <dependency>
>          <groupId>org.apache.flink</groupId>
>          <artifactId>flink-clients_${scala.binary.version}</artifactId>
>          <version>${flink.version}</version>
>       </dependency>
>    </dependencies>
>    <build>
>       <defaultGoal>install</defaultGoal>
>       <plugins>
>          <plugin>
>             <groupId>org.apache.maven.plugins</groupId>
>             <artifactId>maven-compiler-plugin</artifactId>
>             <version>3.6.1</version>
>             <configuration>
>                <source>11</source>
>                <target>11</target>
>             </configuration>
>          </plugin>
>          <plugin>
>             <groupId>org.apache.maven.plugins</groupId>
>             <artifactId>maven-resources-plugin</artifactId>
>             <version>3.0.2</version>
>             <configuration>
>                <encoding>UTF-8</encoding>
>             </configuration>
>          </plugin>
>       </plugins>
>    </build>
> </project>
> —
> 
> 
> Would be cool if someone could point me in the right direction. Really great 
> project!
> 
> Thanks
> 
> Oliver
> 
> 
