Hi Oliver,
I believe you also need to sort the events yourself. A bounded-out-of-orderness
watermark strategy only "postpones" the watermark by the expected maximum
out-of-orderness; it does not reorder the events. Check the Ververica example:
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java
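
The idea can be sketched without Flink: buffer incoming events in a priority
queue ordered by timestamp, and release them only once the watermark has passed
them. This is a hypothetical, framework-free illustration (the EventTimeSorter
class and its Event type are made up for this sketch), not Flink's own
implementation. In a real job the same idea is usually expressed per key with a
KeyedProcessFunction, keyed state and event-time timers, while the linked
Ververica example appears to use an ORDER BY on a time attribute in the Table
API/SQL.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of event-time sorting: events are held back until the
// watermark guarantees that no earlier event can still arrive.
final class EventTimeSorter {

    // Minimal stand-in for an event; only the timestamp matters for ordering.
    static final class Event {
        final String name;
        final long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return name + "@" + timestamp;
        }
    }

    // Buffer ordered by ascending event timestamp.
    private final PriorityQueue<Event> buffer =
        new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    void add(Event e) {
        buffer.add(e);
    }

    // Releases, in ascending timestamp order, every buffered event whose
    // timestamp is at or below the watermark; later events stay buffered.
    List<Event> onWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        var sorter = new EventTimeSorter();
        // Arrival order as in the sample output; timestamps are ms offsets from 20:10:00.000.
        sorter.add(new Event("c1", 1_554));
        sorter.add(new Event("c2", 2_810));
        sorter.add(new Event("c3", -185));
        sorter.add(new Event("c4", 815));
        System.out.println(sorter.onWatermark(1_000));          // [c3@-185, c4@815]
        System.out.println(sorter.onWatermark(Long.MAX_VALUE)); // [c1@1554, c2@2810]
    }
}
```

With a watermark of 1000 only c3 and c4 are released, already in timestamp
order; c1 and c2 wait for a later watermark. Long.MAX_VALUE stands in for the
final watermark Flink emits when a bounded source finishes.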

Alexey
________________________________
From: Oliver Moser <olz...@gmail.com>
Sent: Tuesday, November 2, 2021 12:52:59 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Question on BoundedOutOfOrderness

Hi!

I am investigating the use of Flink for a new project and started some simple 
demos.

Currently I am stuck at the point where I need to deal with events that arrive
out of order with respect to their event time. I've spent quite some time
researching on Stack Overflow, in the docs and in the Ververica training
(excellent resource, btw); however, I assume I still have some conceptual
misunderstandings :-)

I put together the following demo code and expected the console output to list
the events chronologically, based on their embedded event time. However, events
are always printed in the same order in which the OutOfOrderEventSource pushes
them into the data stream.

Sample console output:

—
3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
7> EventRecord{id=1, counter=5, eventId=1, timestamp=2021-11-02T20:10:05.819}
8> EventRecord{id=4, counter=6, eventId=0, timestamp=2021-11-02T20:10:04.819}
9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
10> EventRecord{id=0, counter=8, eventId=1, timestamp=2021-11-02T20:10:05.828}
11> EventRecord{id=3, counter=9, eventId=1, timestamp=2021-11-02T20:10:09.829}
—

My expectation would be to receive the events ordered:

—
5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
…
—


Given a BoundedOutOfOrderness watermarking strategy with a 20-second bound, I
expected that the first event pushed by the demo source

EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}

would set the initial watermark to "2021-11-02T20:09:41.554", so that events
older than this timestamp are not considered, while events younger than this
timestamp are considered and ordered accordingly. That would mean that

EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}

would still be considered on time.
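
To make the arithmetic concrete, here is a framework-free Java simulation of
what Flink's bounded-out-of-orderness generator does: track the largest
timestamp seen and report maxTimestamp - bound - 1 ms as the watermark (the real
generator emits it on a periodic timer, 200 ms by default, rather than per
event). The class BoundedWatermarkSim is a hypothetical illustration, not a
Flink API. It confirms that the 20:09:59.815 event is not late, but note that
nothing in this logic buffers or reorders events: the watermark only decides
what counts as late.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical simulation of a bounded-out-of-orderness watermark generator.
final class BoundedWatermarkSim {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedWatermarkSim(Duration bound) {
        this.boundMillis = bound.toMillis();
        // Chosen so that the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Only remembers the timestamp; the event itself would be forwarded unchanged.
    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    // Watermark = largest timestamp seen so far, minus the bound, minus 1 ms.
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    // An event is late if its timestamp is at or below the current watermark.
    boolean isLate(long timestampMillis) {
        return timestampMillis <= currentWatermark();
    }

    public static void main(String[] args) {
        var sim = new BoundedWatermarkSim(Duration.ofSeconds(20));
        long first = Instant.parse("2021-11-02T20:10:01.554Z").toEpochMilli();
        long third = Instant.parse("2021-11-02T20:09:59.815Z").toEpochMilli();

        sim.onEvent(first);
        // prints "watermark: 2021-11-02T20:09:41.553Z"
        System.out.println("watermark: " + Instant.ofEpochMilli(sim.currentWatermark()));
        // prints "20:09:59.815 late? false"
        System.out.println("20:09:59.815 late? " + sim.isLate(third));
    }
}
```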

I’m sure I am missing something conceptually.

Here is the code that I’m using:

---

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import static java.time.Instant.ofEpochMilli;
import static java.time.LocalDateTime.ofInstant;

public class SimpleOutOfOrderDemo {

   public static void main(String... args) throws Exception {
      var env = StreamExecutionEnvironment.getExecutionEnvironment();
      // Take event time from EventRecord.timestamp and let the watermark trail
      // the largest timestamp seen so far by 20 seconds.
      var watermarkStrategy =
         WatermarkStrategy.<EventRecord>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            .withTimestampAssigner((e, recordTimestamp) -> e.timestamp);

      var source = new OutOfOrderEventSource();
      env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy).print();
      env.execute("Simple Out of Order Demo");
   }

   public static class OutOfOrderEventSource implements SourceFunction<EventRecord> {

      static final int MAX_ELEMENTS = 10;

      // Pause between two emitted events, in milliseconds.
      static final long INTERVAL = 1000;

      AtomicInteger counter = new AtomicInteger();

      // One shared generator instead of a new Random for every value.
      final Random random = new Random();

      @Override
      public void run(SourceFunction.SourceContext<EventRecord> ctx) throws Exception {
         var c = counter.incrementAndGet();
         while (c < MAX_ELEMENTS) {
            var id = random.nextInt(5);
            var eventId = random.nextInt(2);
            // Back-date each event by 0-4 seconds so that timestamps arrive out of order.
            var delay = random.nextInt(5);
            var dateTime = LocalDateTime.now().minus(Duration.of(delay, ChronoUnit.SECONDS));
            var record = new EventRecord(id, eventId,
               dateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), c);
            ctx.collect(record);
            c = counter.incrementAndGet();
            Thread.sleep(INTERVAL);
         }
      }

      @Override
      public void cancel() {
         // Moves the counter to the limit so that run() exits after the current iteration.
         counter.set(MAX_ELEMENTS);
      }
   }

   public static class EventRecord {

      public int id;
      public int eventId;
      public long timestamp;
      public long counter;

      public EventRecord(int id, int eventId, long timestamp, long counter) {
         this.id = id;
         this.eventId = eventId;
         this.timestamp = timestamp;
         this.counter = counter;
      }

      @Override
      public String toString() {
         final StringBuilder sb = new StringBuilder("EventRecord{");
         sb.append("id=").append(id);
         sb.append(", counter=").append(counter);
         sb.append(", eventId=").append(eventId);
         sb.append(", timestamp=").append(ofInstant(ofEpochMilli(timestamp), ZoneOffset.UTC));
         sb.append('}');
         return sb.toString();
      }
   }
}

—
POM:

—
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.example.app</groupId>
   <artifactId>event-hubs-kafka-flink-consumer</artifactId>
   <packaging>jar</packaging>
   <version>1.0-SNAPSHOT</version>
   <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
      <flink.version>1.14.0</flink.version>
      <scala.binary.version>2.11</scala.binary.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
      </dependency>
      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-clients_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
      </dependency>
   </dependencies>
   <build>
      <defaultGoal>install</defaultGoal>
      <plugins>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
               <source>11</source>
               <target>11</target>
            </configuration>
         </plugin>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-resources-plugin</artifactId>
            <version>3.0.2</version>
            <configuration>
               <encoding>UTF-8</encoding>
            </configuration>
         </plugin>
      </plugins>
   </build>
</project>
—


Would be cool if someone could point me in the right direction. Really great 
project!

Thanks

Oliver

