Hi Andrea,

You are right. Flink's window operators can drop messages that arrive too
late, i.e., that have a timestamp smaller than the last watermark.
This is expected behavior and is documented in several places [1] [2].

There are a few options for dealing with late elements:

1. Use more conservative watermarks. This will add latency to your program.
2. Configure an allowedLateness parameter for the windows. Your program then
has to be able to handle the resulting updates. [2]
3. Use side outputs on windows (will become available with Flink 1.4). [3]
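
To illustrate the three options, here is a minimal sketch in plain Java. It is
a simplified model and not Flink's implementation: the class LateElementSketch,
the method run and the sample timestamps are made up for this example, and the
watermark is assumed to advance after every element, whereas a periodic
assigner only emits it at a configured interval. An element counts as late once
the watermark has reached the last timestamp of its window plus the allowed
lateness.

```java
import java.util.ArrayList;
import java.util.List;

class LateElementSketch {

    /** Timestamps that were added to a window, and timestamps that were late. */
    static final class Result {
        final List<Long> accepted = new ArrayList<>();
        final List<Long> late = new ArrayList<>();
    }

    /**
     * Simplified model of a tumbling event-time window (hypothetical helper).
     * Assumption: the watermark is updated after every element.
     */
    static Result run(long[] timestamps, long windowSize,
                      long maxOutOfOrderness, long allowedLateness) {
        Result result = new Result();
        long currentMaxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // Last timestamp that still belongs to the element's window.
            long windowStart = ts - Math.floorMod(ts, windowSize);
            long windowMaxTimestamp = windowStart + windowSize - 1;
            if (windowMaxTimestamp + allowedLateness <= watermark) {
                // Dropped by default; a side output would receive these instead.
                result.late.add(ts);
            } else {
                result.accepted.add(ts);
            }
            // Same watermark rule as in the quoted CustomTimestampExtractor.
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
            watermark = currentMaxTimestamp - maxOutOfOrderness;
        }
        return result;
    }

    public static void main(String[] args) {
        long[] events = {100, 900, 2000, 950, 2100}; // 950 arrives out of order
        System.out.println("default:            late = " + run(events, 1000, 800, 0).late);
        System.out.println("1. wider watermark: late = " + run(events, 1000, 1500, 0).late);
        System.out.println("2. allowedLateness: late = " + run(events, 1000, 800, 500).late);
    }
}
```

In a Flink job, options 2 and 3 correspond to calling allowedLateness(...) and,
from Flink 1.4 on, sideOutputLateData(...) on the windowed stream; see [2] and
[3] for the details.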

Cheers, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

2017-11-12 21:29 GMT+01:00 AndreaKinn <kinn6...@hotmail.it>:

> Hi,
> I'm running a Flink application where data are retrieved from a Kafka
> broker and forwarded to a Cassandra sink.
> I've implemented the following watermark emitter:
>
> public class CustomTimestampExtractor implements
>         AssignerWithPeriodicWatermarks<
>                 Tuple8<String, String, Date, String, String, Double, Double, Double>> {
>
>     private final long maxOutOfOrderness = 800;
>     private long currentMaxTimestamp;
>
>     @Override
>     public long extractTimestamp(
>             Tuple8<String, String, Date, String, String, Double, Double, Double> element,
>             long previousElementTimestamp) {
>         long timestamp = element.f2.getTime();
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>     }
> }
>
> I have also implemented record reordering in windows on an event-time
> basis:
>
> ...
> .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
> .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {
>
>     public void apply(String key,
>                       TimeWindow window,
>                       Iterable<Harness.KafkaRecord> input,
>                       Collector<Harness.KafkaRecord> out) throws Exception {
>
>         ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();
>
>         for (Harness.KafkaRecord in : input)
>             list.add(in);
>         Collections.sort(list);
>         for (Harness.KafkaRecord output : list)
>             out.collect(output);
>     }
> });
>
> Unfortunately when I check Cassandra's destination table size I note that
> some messages are lost.
>
> Performing 3 tests I have ingested data at 50, 25 and 15 Hz. I expected to
> see lower loss percentage with the lower ingestion frequency, instead it is
> the opposite!!
>
> P.S.: Kafka ingests 45,000 messages of 1 KB each. The loss percentages
> are as follows:
>
> 50 Hz: 0.273%
> 25 Hz: 0.284%
> 15 Hz: 0.302%
>
> My suspicion is that the data are lost because they arrive too late and
> are dropped by Flink. Is that possible?
>
>
>
>
