Hi Andrea,

you are right. Flink's window operators can drop messages that are too late, i.e., that have a timestamp smaller than the last watermark. This is expected behavior and is documented in several places [1] [2].
There are a couple of options for dealing with late elements:

1. Use more conservative watermarks. This will add latency to your program.
2. Configure an allowedLateness parameter for the windows, but then you have to be able to handle the resulting updates. [2]
3. Use side outputs on windows (will become available with Flink 1.4). [3]

A rough sketch of options 2 and 3 is appended at the very bottom of this mail, below the quoted message.

Cheers,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

2017-11-12 21:29 GMT+01:00 AndreaKinn <kinn6...@hotmail.it>:

> Hi,
> I'm running a Flink application where data are retrieved from a Kafka broker
> and forwarded to a Cassandra sink.
> I've implemented the following watermark emitter:
>
> public class CustomTimestampExtractor implements
>         AssignerWithPeriodicWatermarks<Tuple8<String, String, Date, String, String, Double, Double, Double>> {
>
>     private final long maxOutOfOrderness = 800;
>     private long currentMaxTimestamp;
>
>     @Override
>     public long extractTimestamp(Tuple8<String, String, Date, String, String, Double, Double, Double> element,
>             long previousElementTimestamp) {
>         long timestamp = element.f2.getTime();
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>     }
> }
>
> Meanwhile, I have implemented record reordering in event-time windows:
>
> ...
> .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
> .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {
>
>     public void apply(String key, TimeWindow window,
>             Iterable<Harness.KafkaRecord> input,
>             Collector<Harness.KafkaRecord> out) throws Exception {
>
>         ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();
>         for (Harness.KafkaRecord in : input)
>             list.add(in);
>         Collections.sort(list);
>         for (Harness.KafkaRecord output : list)
>             out.collect(output);
>     }
> });
>
> Unfortunately, when I check the size of the destination table in Cassandra, I
> notice that some messages are lost.
>
> In three tests I ingested data at 50, 25, and 15 Hz. I expected to see a lower
> loss percentage at the lower ingestion frequency, but it is the opposite!
>
> P.S.: Kafka ingests 45,000 messages of 1 KB each; the loss percentages are:
>
> 50 Hz: 0.273%
> 25 Hz: 0.284%
> 15 Hz: 0.302%
>
> My suspicion is that the data are lost because they arrive with too high a
> lateness and are dropped by Flink. Is that a possibility?
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
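(Sketch referenced above.) This is a rough, untested example of how options 2 and 3 could be wired into your job. I'm assuming keyedStream is the KeyedStream you already have after keyBy, SortingWindowFunction stands for the anonymous WindowFunction from your snippet, and the five seconds of allowed lateness is just a placeholder you would tune to your data:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Tag for records that arrive after their window (plus allowed lateness) has already fired.
// The anonymous subclass is needed so Flink can capture the element type.
final OutputTag<Harness.KafkaRecord> lateTag =
        new OutputTag<Harness.KafkaRecord>("late-records") {};

SingleOutputStreamOperator<Harness.KafkaRecord> sorted = keyedStream
        .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
        .allowedLateness(Time.seconds(5))       // option 2: keep window state around a bit longer;
                                                // late records re-trigger the window
        .sideOutputLateData(lateTag)            // option 3: records that are still too late
                                                // go to the side output (Flink 1.4+)
        .apply(new SortingWindowFunction());    // your existing sort-and-emit window function

// Records that would otherwise be silently dropped; write them to a separate sink
// or just log them to measure how late they actually are.
DataStream<Harness.KafkaRecord> lateRecords = sorted.getSideOutput(lateTag);

Note that with allowedLateness a window can fire more than once, so the Cassandra writes should be idempotent (e.g., upserts keyed by the record's primary key), otherwise the extra firings show up as duplicates. For option 1 you would simply increase maxOutOfOrderness in your CustomTimestampExtractor, at the cost of higher end-to-end latency.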