Getting late elements as a side output is already available with Flink 1.3 :)
Regards,
Kien
On 11/13/2017 5:00 PM, Fabian Hueske wrote:
Hi Andrea,
you are right. Flink's window operators can drop messages which are
too late, i.e., have a timestamp smaller than the last watermark.
This is expected behavior and documented at several places [1] [2].
There are a couple of options for how to deal with late elements:
1. Use more conservative watermarks. This will add latency to your program.
2. Configure an allowedLateness parameter for windows, but then you have to be able to handle the respective updates. [2]
3. Use side outputs on windows (will become available with Flink 1.4); see the sketch just below this list. [3]
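For reference, options 2 and 3 can be combined on the same window. A minimal sketch, assuming keyedStream is your keyed stream of Harness.KafkaRecord and sortingWindowFunction is the WindowFunction from your mail; the tag name and the 5-second lateness are only illustrative:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so that the element type is captured.
final OutputTag<Harness.KafkaRecord> lateTag =
        new OutputTag<Harness.KafkaRecord>("late-records") {};

SingleOutputStreamOperator<Harness.KafkaRecord> result = keyedStream
        .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
        .allowedLateness(Time.seconds(5))   // option 2: keep window state around longer
        .sideOutputLateData(lateTag)        // option 3: capture would-be-dropped records
        .apply(sortingWindowFunction);

// Records that are too late even for allowedLateness end up here
// instead of being silently dropped.
DataStream<Harness.KafkaRecord> lateRecords = result.getSideOutput(lateTag);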
Cheers, Fabian
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
2017-11-12 21:29 GMT+01:00 AndreaKinn <kinn6...@hotmail.it>:
Hi,
I'm running a Flink application where data are retrieved from a Kafka broker
and forwarded to a Cassandra sink.
I've implemented the following watermark emitter:
import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomTimestampExtractor implements
        AssignerWithPeriodicWatermarks<Tuple8<String, String, Date, String, String, Double, Double, Double>> {

    private final long maxOutOfOrderness = 800; // milliseconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(
            Tuple8<String, String, Date, String, String, Double, Double, Double> element,
            long previousElementTimestamp) {
        // Event time is taken from the Date field (f2) of the record.
        long timestamp = element.f2.getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark trails the highest timestamp seen so far by 800 ms.
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
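Flink also ships a built-in assigner for exactly this pattern, BoundedOutOfOrdernessTimestampExtractor. A minimal equivalent sketch (the class name here is chosen just for illustration):

import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class BoundedTimestampExtractor extends
        BoundedOutOfOrdernessTimestampExtractor<Tuple8<String, String, Date, String, String, Double, Double, Double>> {

    public BoundedTimestampExtractor() {
        super(Time.milliseconds(800)); // same 800 ms bound as above
    }

    @Override
    public long extractTimestamp(
            Tuple8<String, String, Date, String, String, Double, Double, Double> element) {
        return element.f2.getTime();
    }
}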
I have also implemented record reordering in windows on an event-time basis:
...
.window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
.apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {
    @Override
    public void apply(String key, TimeWindow window,
            Iterable<Harness.KafkaRecord> input,
            Collector<Harness.KafkaRecord> out) throws Exception {
        // Buffer the window's contents, sort them, and re-emit in order.
        ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();
        for (Harness.KafkaRecord in : input) {
            list.add(in);
        }
        Collections.sort(list);
        for (Harness.KafkaRecord output : list) {
            out.collect(output);
        }
    }
});
Unfortunately, when I check the size of the destination table in Cassandra,
I notice that some messages are lost.
I ran three tests, ingesting data at 50, 25 and 15 Hz. I expected to see a
lower loss percentage at the lower ingestion frequencies, but instead it is
the opposite!!
P.S.: Kafka ingests 45,000 messages of 1 KB each; the loss percentages are:
50 Hz: 0.273%
25 Hz: 0.284%
15 Hz: 0.302%
My suspicion is that the data are lost because they arrive with too high a
lateness and are dropped by Flink. Is that a possibility?
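One way to check this suspicion is to count the records that are already behind the watermark before they reach the window operator. A minimal diagnostic sketch, assuming timestampedStream is the stream after timestamp/watermark assignment (the ProcessFunction and the tag name are illustrative, not part of the original job):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Tag for records whose timestamp is already behind the current watermark.
final OutputTag<Harness.KafkaRecord> behindTag =
        new OutputTag<Harness.KafkaRecord>("behind-watermark") {};

SingleOutputStreamOperator<Harness.KafkaRecord> checked = timestampedStream
        .process(new ProcessFunction<Harness.KafkaRecord, Harness.KafkaRecord>() {
            @Override
            public void processElement(Harness.KafkaRecord record, Context ctx,
                    Collector<Harness.KafkaRecord> out) {
                // A record whose timestamp is at or behind the watermark would
                // be treated as late by a downstream event-time window.
                if (ctx.timestamp() != null
                        && ctx.timestamp() <= ctx.timerService().currentWatermark()) {
                    ctx.output(behindTag, record);
                } else {
                    out.collect(record);
                }
            }
        });

// Sink or log this stream to see how many records would be dropped.
DataStream<Harness.KafkaRecord> behindWatermark = checked.getSideOutput(behindTag);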