Getting late elements via a side output is already available with Flink 1.3 :)

Regards,

Kien

On 11/13/2017 5:00 PM, Fabian Hueske wrote:
Hi Andrea,

you are right. Flink's window operators can drop messages which are too late, i.e., have a timestamp smaller than the last watermark.
This is expected behavior and documented at several places [1] [2].

There are a couple of options how to deal with late elements:

1. Use more conservative watermarks. This adds latency to your program.
2. Configure an allowedLateness parameter for windows, but then your program must be able to handle the resulting window updates. [2]
3. Use side outputs on windows (will become available with Flink 1.4) [3]
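To make the dropping concrete, here is a self-contained sketch (plain Java, not Flink's internal API; the class and method names are illustrative) of the rule a tumbling event-time window applies: an element is dropped when the window it belongs to has already been cleaned up, i.e., the window's maximum timestamp (window end minus one millisecond) plus allowedLateness is at or before the current watermark.

```java
// Sketch of the drop decision for tumbling event-time windows.
// Assumes non-negative timestamps; names are hypothetical, not Flink API.
final class LatenessCheck {

    // End of the tumbling window that contains the given timestamp.
    static long windowEnd(long timestamp, long windowSize) {
        return (timestamp / windowSize) * windowSize + windowSize;
    }

    // Dropped when the window's max timestamp (end - 1) plus the
    // allowed lateness is at or before the current watermark.
    static boolean isDropped(long timestamp, long windowSize,
                             long allowedLateness, long watermark) {
        return windowEnd(timestamp, windowSize) - 1 + allowedLateness <= watermark;
    }
}
```

With allowedLateness = 0 this reduces to: any element whose window has already fired is silently discarded, which is why options 1–3 above exist.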

Cheers, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

2017-11-12 21:29 GMT+01:00 AndreaKinn <kinn6...@hotmail.it <mailto:kinn6...@hotmail.it>>:

    Hi,
    I'm running a Flink application where data are retrieved from a Kafka
    broker and forwarded to a Cassandra sink.
    I've implemented the following timestamp extractor and watermark assigner:

    public class CustomTimestampExtractor implements
            AssignerWithPeriodicWatermarks<Tuple8<String, String, Date,
                    String, String, Double, Double, Double>> {

        private final long maxOutOfOrderness = 800;
        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Tuple8<String, String, Date, String,
                String, Double, Double, Double> element,
                long previousElementTimestamp) {
            long timestamp = element.f2.getTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
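As a sanity check, the effect of the 800 ms bound can be simulated without Flink. The sketch below (plain Java; a simplification, since Flink actually emits watermarks periodically rather than once per element) classifies an element as late when its timestamp is at or below the watermark derived from the maximum timestamp seen so far:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone simulation of the assigner above: the watermark trails the
// highest timestamp seen so far by maxOutOfOrderness milliseconds, and any
// element at or below the current watermark counts as late.
final class WatermarkSimulation {

    static List<Long> lateElements(long[] timestamps, long maxOutOfOrderness) {
        List<Long> late = new ArrayList<>();
        long currentMax = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long watermark = currentMax - maxOutOfOrderness; // before this element
            if (ts <= watermark) {
                late.add(ts); // would be dropped by an event-time window
            }
            currentMax = Math.max(currentMax, ts);
        }
        return late;
    }
}
```

For example, with timestamps 1000, 2000, 900 and an 800 ms bound, the element at 900 is behind the watermark (2000 - 800 = 1200) and is late; raising the bound to 1200 ms keeps it.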

    I have also implemented record reordering in event-time windows:

    ...
    .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
    .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord,
            String, TimeWindow>() {

        public void apply(String key, TimeWindow window,
                Iterable<Harness.KafkaRecord> input,
                Collector<Harness.KafkaRecord> out) throws Exception {

            ArrayList<Harness.KafkaRecord> list =
                    new ArrayList<Harness.KafkaRecord>();
            for (Harness.KafkaRecord in : input)
                list.add(in);
            Collections.sort(list);
            for (Harness.KafkaRecord output : list)
                out.collect(output);
        }
    });
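Note that Collections.sort(list) only compiles if Harness.KafkaRecord implements Comparable. A minimal sketch of such a record type, ordered by event timestamp (the fields here are hypothetical stand-ins, since the real Harness.KafkaRecord is not shown):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Harness.KafkaRecord: comparable by event
// timestamp, so Collections.sort() puts a window's contents into
// event-time order.
final class KafkaRecord implements Comparable<KafkaRecord> {
    final long timestamp;
    final String payload;

    KafkaRecord(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }

    @Override
    public int compareTo(KafkaRecord other) {
        return Long.compare(this.timestamp, other.timestamp);
    }
}
```

If the real record type cannot implement Comparable, passing a Comparator to Collections.sort achieves the same ordering.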

    Unfortunately, when I check the size of Cassandra's destination table I
    notice that some messages are lost.

    I performed 3 tests, ingesting data at 50, 25 and 15 Hz. I expected to
    see a lower loss percentage at the lower ingestion frequency, but it is
    the opposite!

    P.S.: Kafka ingests 45,000 messages of 1 KB each; the loss percentages
    are:

    50 Hz: 0.273%
    25 Hz: 0.284%
    15 Hz: 0.302%

    My suspicion is that the data are lost because they arrive with too
    high a lateness and are dropped by Flink. Is that a possibility?




    --
    Sent from:
    http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

