Thanks for the correction! :-)

2017-11-13 13:05 GMT+01:00 Kien Truong <duckientru...@gmail.com>:

> Getting late elements from side-output is already available with Flink 1.3
> :)
>
> Regards,
>
> Kien
> On 11/13/2017 5:00 PM, Fabian Hueske wrote:
>
> Hi Andrea,
>
> you are right. Flink's window operators can drop messages which are too
> late, i.e., have a timestamp smaller than the last watermark.
> This is expected behavior and documented at several places [1] [2].
>
> There are a couple of options how to deal with late elements:
>
> 1. Use more conservative watermarks. This will add latency to your program
> 2. Configure an allowedLateness parameter for windows but have to be able
> to handle respective updates. [2]
> 3. Use side outputs on windows (will become available with Flink 1.4) [3]
>
> Cheers, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/event_time.html#late-elements
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/windows.html#allowed-lateness
> [3] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/stream/operators/windows.html#getting-late-data-as-a-side-
> output
>
> 2017-11-12 21:29 GMT+01:00 AndreaKinn <kinn6...@hotmail.it>:
>
>> Hi,
>> I'm running a Flink application where data are retrieved from a Kafka
>> broker
>> and forwarded to a Cassandra sink.
>> I've implemented the following watermark emitter:
>>
>> public class CustomTimestampExtractor implements
>> AssignerWithPeriodicWatermarks<Tuple8&lt;String, String, Date, String,
>> String, Double, Double, Double>>{
>>
>>     private final long maxOutOfOrderness = 800;
>>     private long currentMaxTimestamp;
>>
>>         @Override
>>         public long extractTimestamp(Tuple8<String, String, Date,
>> String, String,
>> Double, Double, Double> element, long previousElementTimestamp) {
>>                 long timestamp = element.f2.getTime();
>>                 currentMaxTimestamp = Math.max(timestamp,
>> currentMaxTimestamp);
>>                 return timestamp;
>>         }
>>
>>         @Override
>>         public Watermark getCurrentWatermark() {
>>                 return new Watermark(currentMaxTimestamp -
>> maxOutOfOrderness);
>>         }
>> }
>>
>> While I have implemented a record reordering in windows on event time
>> basis:
>>
>> ...
>> .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
>>                                 .apply(new 
>> WindowFunction<Harness.KafkaRecord,
>> Harness.KafkaRecord,
>> String, TimeWindow>() {
>>
>>                                         public void apply(String key,
>>                                                         TimeWindow window,
>>
>> Iterable<Harness.KafkaRecord> input,
>>
>> Collector<Harness.KafkaRecord> out)
>>
>> throws Exception {
>>
>>
>> ArrayList<Harness.KafkaRecord> list = new
>> ArrayList<Harness.KafkaRecord>();
>>
>>                                                 for (Harness.KafkaRecord
>> in: input)
>>                                                         list.add(in);
>>                                                 Collections.sort(list);
>>                                                 for(Harness.KafkaRecord
>> output: list)
>>
>> out.collect(output);
>>                                         }
>>                                 });
>>
>> Unfortunately when I check Cassandra's destination table size I note that
>> some messages are lost.
>>
>> Performing 3 tests I have ingested data at 50, 25 and 15 Hz. I expected to
>> see lower loss percentage with the lower ingestion frequency, instead it
>> is
>> the opposite!!
>>
>> P.S.: Kafka ingests 45.000 messages of 1Kb each one, following the loss
>> percentage:
>>
>> 50 Hz: 0.273%
>> 25 Hz: 0.284%
>> 15 Hz: 0.302%
>>
>> My suspect is that the data are lost because they arrive with a too high
>> lateness and they are dropped by Flink. Is it a possibility?
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>

Reply via email to