Hi Oliver

I think Alexey is right that you cannot assume the records will be output
in event-time order.
One small addition: judging from your output, there are actually multiple
parallel subtasks (probably 11; the "N>" prefix that print() adds is the
1-based index of the sink subtask that emitted the line). You also can't
expect the output of these parallel subtasks to be ordered by event time.
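
To make the parallelism point concrete, here is a plain-Java model (no
Flink dependency) of a parallelism-1 source feeding a print sink with
several parallel subtasks. It assumes round-robin distribution, which is
what Flink applies by default between operators whose parallelism differs.
The subtask count (11) and the starting subtask are hypothetical
parameters chosen to mirror your output; Flink picks its own starting
subtask.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinModel {

    // Round-robin (rebalance) model: the i-th record in arrival order goes to
    // subtask (start + i) % parallelism. 'start' is a hypothetical parameter;
    // Flink chooses its own starting subtask.
    static List<List<Integer>> distribute(List<Integer> counters, int parallelism, int start) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < counters.size(); i++) {
            subtasks.get((start + i) % parallelism).add(counters.get(i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // counter values in the order the source emitted them
        List<Integer> counters = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
        // hypothetical: 11 sink subtasks, first record goes to index 2
        List<List<Integer>> subtasks = distribute(counters, 11, 2);
        // printed subtask by subtask here; real subtasks print concurrently
        for (int s = 0; s < subtasks.size(); s++) {
            for (int c : subtasks.get(s)) {
                // print() prefixes each line with the 1-based subtask index
                System.out.println((s + 1) + "> counter=" + c);
            }
        }
    }
}
```

Running it prints "3> counter=1" through "11> counter=9", the same
prefixes as in your output. In the model the lines come out subtask by
subtask; in the real job each subtask writes from its own thread, so
nothing orders the lines across subtasks.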

Best,
Guowei


On Wed, Nov 3, 2021 at 6:46 AM Alexey Trenikhun <yen...@msn.com> wrote:

> Hi Oliver,
> I believe you also need to sort. The out-of-orderness watermark strategy
> only “postpones” the watermark by the given expected maximum
> out-of-orderness. Check the Ververica example:
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java
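
A minimal plain-Java model of that sorting idea, assuming a
buffer-and-flush approach: records are held in a priority queue ordered by
timestamp, and whenever the watermark advances, every buffered record at
or below it is released in order. This is not a transcription of the
linked example (which, judging by its path, uses the Table API), and the
class and method names are hypothetical. The watermark formula (maximum
timestamp seen minus the bound minus 1 ms) mirrors Flink's
bounded-out-of-orderness generator, but Flink emits watermarks
periodically (every 200 ms by default) rather than after each record, and
a real sort step would need parallelism 1 (or sort only within a key).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkSortModel {

    private final long boundMillis;
    // buffered record timestamps, smallest first
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    // largest timestamp seen so far; only read after the first record arrives
    private long maxTimestamp = Long.MIN_VALUE;

    WatermarkSortModel(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    // Buffers one record, advances the watermark, and returns the records it releases, in order.
    // A record arriving at or below the current watermark is released immediately (possibly out
    // of order); a real implementation would drop it or route it to a side output instead.
    List<Long> onRecord(long timestamp) {
        buffer.add(timestamp);
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        long watermark = maxTimestamp - boundMillis - 1;
        return releaseUpTo(watermark);
    }

    // End of a bounded input: the watermark jumps to Long.MAX_VALUE, flushing the buffer.
    List<Long> onEndOfInput() {
        return releaseUpTo(Long.MAX_VALUE);
    }

    private List<Long> releaseUpTo(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            released.add(buffer.poll());
        }
        return released;
    }

    public static void main(String[] args) {
        WatermarkSortModel sorter = new WatermarkSortModel(2_000); // hypothetical 2 s bound
        List<Long> output = new ArrayList<>();
        for (long ts : new long[] {3_000, 1_000, 5_000, 4_000, 9_000}) {
            output.addAll(sorter.onRecord(ts));
        }
        output.addAll(sorter.onEndOfInput());
        System.out.println(output); // [1000, 3000, 4000, 5000, 9000]
    }
}
```

As written, main prints [1000, 3000, 4000, 5000, 9000]. With the
20-second bound from the demo and only about ten seconds of data, the
watermark never passes any buffered record, so everything would be held
until the end of input.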
>
> Alexey
> ------------------------------
> *From:* Oliver Moser <olz...@gmail.com>
> *Sent:* Tuesday, November 2, 2021 12:52:59 PM
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Question on BoundedOutOfOrderness
>
> Hi!
>
> I am investigating the use of Flink for a new project and started some
> simple demos.
>
> Currently I am stuck at the point where I need to deal with events
> arriving out of order based on their event time. I’ve spent quite some time
> researching SO, the docs, and the Ververica training (excellent resource
> btw); however, I assume I still have some conceptual misunderstandings :-)
>
> I put together the following demo code, and I would expect that the
> console output would list the events chronologically based on their
> embedded event time. However, events are always printed in the same order
> as they are pushed into the data stream by the OutOfOrderEventSource.
>
> Sample console output:
>
> —
> 3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
> 4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
> 5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
> 6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
> 7> EventRecord{id=1, counter=5, eventId=1, timestamp=2021-11-02T20:10:05.819}
> 8> EventRecord{id=4, counter=6, eventId=0, timestamp=2021-11-02T20:10:04.819}
> 9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
> 10> EventRecord{id=0, counter=8, eventId=1, timestamp=2021-11-02T20:10:05.828}
> 11> EventRecord{id=3, counter=9, eventId=1, timestamp=2021-11-02T20:10:09.829}
> —
>
> My expectation would be to receive the events ordered:
>
> —
> 5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
> 6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
> 3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
> 4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
> 9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
> …
> —
>
>
> Given a BoundedOutOfOrderness watermarking strategy with a 20-second
> duration, my expectation would have been that the first event pushed to
> the demo source
>
> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
>
> would set the initial watermark to "2021-11-02T20:09:41.554". Hence,
> events older than this timestamp would not be considered, but events
> younger than this timestamp would be, and the events would be ordered
> accordingly. That would mean that
>
> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
>
> would still be considered on time.
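
The watermark arithmetic can be checked with java.time. One detail:
Flink's built-in bounded-out-of-orderness generator subtracts one extra
millisecond (maximum timestamp seen minus the bound minus 1 ms), so the
first watermark would be 20:09:41.553 rather than 20:09:41.554. Either
way, the record at 20:09:59.815 is indeed on time. A watermark is only a
threshold for event-time operators such as windows and timers;
assignTimestampsAndWatermarks neither reorders nor drops records, it
forwards them in arrival order. The class and method names below are
hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class WatermarkArithmetic {

    // Watermark after seeing maxSeen: maxSeen - bound - 1 ms, as in Flink's
    // bounded-out-of-orderness generator.
    static LocalDateTime watermark(LocalDateTime maxSeen, Duration bound) {
        return maxSeen.minus(bound).minus(Duration.ofMillis(1));
    }

    // A record is on time only if its timestamp is strictly after the current watermark.
    static boolean isOnTime(LocalDateTime timestamp, LocalDateTime watermark) {
        return timestamp.isAfter(watermark);
    }

    public static void main(String[] args) {
        LocalDateTime first = LocalDateTime.parse("2021-11-02T20:10:01.554");
        LocalDateTime wm = watermark(first, Duration.ofSeconds(20));
        System.out.println("watermark: " + wm); // watermark: 2021-11-02T20:09:41.553

        LocalDateTime third = LocalDateTime.parse("2021-11-02T20:09:59.815");
        System.out.println("counter=3 on time: " + isOnTime(third, wm)); // counter=3 on time: true
    }
}
```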
>
> I’m sure I am missing something conceptually.
>
> Here is the code that I’m using:
>
> ---
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> import java.time.Duration;
> import java.time.LocalDateTime;
> import java.time.ZoneOffset;
> import java.time.temporal.ChronoUnit;
> import java.util.Random;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import static java.time.Instant.ofEpochMilli;
> import static java.time.LocalDateTime.ofInstant;
>
> public class SimpleOutOfOrderDemo {
>
>    public static void main(String... args) throws Exception {
>       var env = StreamExecutionEnvironment.getExecutionEnvironment();
>       // tolerate up to 20 s of out-of-orderness; event time comes from EventRecord.timestamp
>       var watermarkStrategy =
>          WatermarkStrategy.<EventRecord>forGenerator(
>                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)))
>             .withTimestampAssigner((e, rt) -> e.timestamp);
>
>       var source = new OutOfOrderEventSource();
>       env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy).print();
>       env.execute("Simple Out of Order Demo");
>    }
>
>    public static class OutOfOrderEventSource implements SourceFunction<EventRecord> {
>
>       static final int MAX_ELEMENTS = 10;
>
>       static final long INTERVAL = 1000;
>
>       AtomicInteger counter = new AtomicInteger();
>
>       @Override
>       public void run(SourceFunction.SourceContext<EventRecord> ctx) throws Exception {
>          var c = counter.incrementAndGet();
>          while (c < MAX_ELEMENTS) {
>             var id = new Random().nextInt(5);
>             var eventId = new Random().nextInt(2);
>             var delay = new Random().nextInt(5);
>             // event time lags wall-clock time by 0-4 s, so records arrive out of order
>             var dateTime = LocalDateTime.now().minus(Duration.of(delay, ChronoUnit.SECONDS));
>             var record = new EventRecord(id, eventId,
>                   dateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), c);
>             ctx.collect(record);
>             c = counter.incrementAndGet();
>             Thread.sleep(INTERVAL);
>          }
>       }
>
>       @Override
>       public void cancel() {
>          counter.set(MAX_ELEMENTS);
>       }
>    }
>
>    public static class EventRecord {
>
>       public int id;
>       public int eventId;
>       public long timestamp;
>       public long counter;
>
>       public EventRecord(int id, int eventId, long timestamp, long counter) {
>          this.id = id;
>          this.eventId = eventId;
>          this.timestamp = timestamp;
>          this.counter = counter;
>       }
>
>       @Override
>       public String toString() {
>          final StringBuilder sb = new StringBuilder("EventRecord{");
>          sb.append("id=").append(id);
>          sb.append(", counter=").append(counter);
>          sb.append(", eventId=").append(eventId);
>          sb.append(", timestamp=").append(ofInstant(ofEpochMilli(timestamp), ZoneOffset.UTC));
>          sb.append('}');
>          return sb.toString();
>       }
>    }
> }
>
> —
> POM:
>
> —
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>    <modelVersion>4.0.0</modelVersion>
>    <groupId>com.example.app</groupId>
>    <artifactId>event-hubs-kafka-flink-consumer</artifactId>
>    <packaging>jar</packaging>
>    <version>1.0-SNAPSHOT</version>
>    <properties>
>       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>       <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>       <flink.version>1.14.0</flink.version>
>       <scala.binary.version>2.11</scala.binary.version>
>    </properties>
>    <dependencies>
>       <dependency>
>          <groupId>org.apache.flink</groupId>
>          <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>          <version>${flink.version}</version>
>       </dependency>
>       <dependency>
>          <groupId>org.apache.flink</groupId>
>          <artifactId>flink-clients_${scala.binary.version}</artifactId>
>          <version>${flink.version}</version>
>       </dependency>
>    </dependencies>
>    <build>
>       <defaultGoal>install</defaultGoal>
>       <plugins>
>          <plugin>
>             <groupId>org.apache.maven.plugins</groupId>
>             <artifactId>maven-compiler-plugin</artifactId>
>             <version>3.6.1</version>
>             <configuration>
>                <source>11</source>
>                <target>11</target>
>             </configuration>
>          </plugin>
>          <plugin>
>             <groupId>org.apache.maven.plugins</groupId>
>             <artifactId>maven-resources-plugin</artifactId>
>             <version>3.0.2</version>
>             <configuration>
>                <encoding>UTF-8</encoding>
>             </configuration>
>          </plugin>
>       </plugins>
>    </build>
> </project>
> —
>
>
> Would be cool if someone could point me in the right direction. Really
> great project!
>
> Thanks
>
> Oliver
>
>
>
