Thanks for the correction! :-)

2017-11-13 13:05 GMT+01:00 Kien Truong <duckientru...@gmail.com>:
> Getting late elements from a side output is already available with Flink
> 1.3 :)
>
> Regards,
> Kien
>
> On 11/13/2017 5:00 PM, Fabian Hueske wrote:
>
> Hi Andrea,
>
> you are right. Flink's window operators can drop messages that are too
> late, i.e., that have a timestamp smaller than the last watermark.
> This is expected behavior and is documented in several places [1] [2].
>
> There are a couple of options for dealing with late elements:
>
> 1. Use more conservative watermarks. This will add latency to your program.
> 2. Configure an allowedLateness parameter for windows, but then you have
> to be able to handle the resulting updates. [2]
> 3. Use side outputs on windows (will become available with Flink 1.4). [3]
>
> Cheers,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> 2017-11-12 21:29 GMT+01:00 AndreaKinn <kinn6...@hotmail.it>:
>
>> Hi,
>> I'm running a Flink application in which data are read from a Kafka
>> broker and forwarded to a Cassandra sink.
>> I've implemented the following watermark emitter:
>>
>> import java.util.Date;
>>
>> import org.apache.flink.api.java.tuple.Tuple8;
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>>
>> public class CustomTimestampExtractor implements
>>         AssignerWithPeriodicWatermarks<Tuple8<String, String, Date,
>>                 String, String, Double, Double, Double>> {
>>
>>     // Bound on the expected out-of-orderness, in milliseconds.
>>     private final long maxOutOfOrderness = 800;
>>     private long currentMaxTimestamp;
>>
>>     @Override
>>     public long extractTimestamp(Tuple8<String, String, Date, String,
>>             String, Double, Double, Double> element,
>>             long previousElementTimestamp) {
>>         // The event timestamp is taken from the Date field of the tuple.
>>         long timestamp = element.f2.getTime();
>>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>>         return timestamp;
>>     }
>>
>>     @Override
>>     public Watermark getCurrentWatermark() {
>>         // The watermark trails the highest timestamp seen so far by 800 ms.
>>         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>>     }
>> }
>>
>> I have also implemented record reordering within windows, based on event
>> time:
>>
>> ...
>> .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
>> .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord,
>>         String, TimeWindow>() {
>>
>>     @Override
>>     public void apply(String key, TimeWindow window,
>>             Iterable<Harness.KafkaRecord> input,
>>             Collector<Harness.KafkaRecord> out) throws Exception {
>>
>>         // Buffer the window's content, sort it, and emit it in order.
>>         ArrayList<Harness.KafkaRecord> list = new ArrayList<>();
>>         for (Harness.KafkaRecord in : input)
>>             list.add(in);
>>         Collections.sort(list);
>>         for (Harness.KafkaRecord output : list)
>>             out.collect(output);
>>     }
>> });
>>
>> Unfortunately, when I check the size of the destination table in
>> Cassandra, I notice that some messages are lost.
>>
>> In three tests I ingested data at 50, 25, and 15 Hz. I expected to see a
>> lower loss percentage at the lower ingestion frequencies, but it is the
>> opposite!
>>
>> P.S.: Kafka ingests 45,000 messages of 1 KB each; the loss percentages
>> are:
>>
>> 50 Hz: 0.273%
>> 25 Hz: 0.284%
>> 15 Hz: 0.302%
>>
>> My suspicion is that the data are lost because they arrive too late and
>> are dropped by Flink. Is that a possibility?
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
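
For completeness, here is a minimal sketch of options 2 and 3 from Fabian's
list above combined, i.e., keeping late records instead of silently dropping
them. This is not code from the thread: it assumes Flink 1.3+ (where
sideOutputLateData is available, per Kien's correction), `stream` stands in
for Andrea's Kafka-sourced DataStream of Harness.KafkaRecord, the getId()
key selector is hypothetical, and the tag name and lateness value are
illustrative only.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Tag identifying the side output that receives late records.
// (Anonymous subclass so the element type survives type erasure.)
final OutputTag<Harness.KafkaRecord> lateTag =
        new OutputTag<Harness.KafkaRecord>("late-records") {};

SingleOutputStreamOperator<Harness.KafkaRecord> sorted = stream
        // Hypothetical key selector; the actual keyBy is not shown in the thread.
        .keyBy(r -> r.getId())
        .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
        // Keep window state around so that records arriving up to 800 ms
        // after the watermark passes the window end still update the result.
        .allowedLateness(Time.milliseconds(800))
        // Records later than watermark + allowedLateness are routed to the
        // side output instead of being dropped.
        .sideOutputLateData(lateTag)
        .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord,
                String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window,
                    Iterable<Harness.KafkaRecord> input,
                    Collector<Harness.KafkaRecord> out) {
                // The sorting logic from the thread would go here;
                // this stub simply forwards the window's content.
                for (Harness.KafkaRecord r : input)
                    out.collect(r);
            }
        });

// Records that were previously dropped now arrive here and can be counted,
// which directly tests Andrea's hypothesis about the missing rows.
DataStream<Harness.KafkaRecord> lateRecords = sorted.getSideOutput(lateTag);

Note that each late firing under allowedLateness re-emits an updated window
result, so the sink sees more than one write per window. Since Cassandra
inserts are upserts by primary key, the later writes overwrite the earlier
row rather than duplicating it, provided the primary key identifies the
record.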