Hi, 
I'm running a Flink application where data are retrieved from a Kafka broker
and forwarded to a Cassandra sink.
I've implemented the following watermark emitter:

import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomTimestampExtractor implements
        AssignerWithPeriodicWatermarks<Tuple8<String, String, Date, String, String, Double, Double, Double>> {

    // Bound on how far records may arrive out of order (milliseconds)
    private final long maxOutOfOrderness = 800;
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple8<String, String, Date, String, String, Double, Double, Double> element,
                                 long previousElementTimestamp) {
        // Event time comes from the record's Date field (f2)
        long timestamp = element.f2.getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark trails the highest timestamp seen so far by maxOutOfOrderness
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
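
For what it's worth, Flink ships a built-in assigner for exactly this bounded-out-of-orderness pattern; a rough equivalent of the class above might look like the following (the class name BoundedTimestampExtractor is just an example, while the 800 ms bound and the f2 Date field are taken from the code above):

import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Hypothetical class name; the behaviour mirrors CustomTimestampExtractor above
public class BoundedTimestampExtractor extends
        BoundedOutOfOrdernessTimestampExtractor<Tuple8<String, String, Date, String, String, Double, Double, Double>> {

    public BoundedTimestampExtractor() {
        super(Time.milliseconds(800)); // same 800 ms out-of-orderness bound
    }

    @Override
    public long extractTimestamp(Tuple8<String, String, Date, String, String, Double, Double, Double> element) {
        return element.f2.getTime(); // event time taken from the Date field, as above
    }
}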

I have also implemented record reordering inside event-time tumbling windows:

... 
.window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
.apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {

        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Harness.KafkaRecord> input,
                          Collector<Harness.KafkaRecord> out) throws Exception {

                // Buffer the window's records, sort them, and re-emit them in order
                ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();
                for (Harness.KafkaRecord in : input) {
                        list.add(in);
                }
                Collections.sort(list);
                for (Harness.KafkaRecord output : list) {
                        out.collect(output);
                }
        }
});
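
Note that Collections.sort(list) relies on Harness.KafkaRecord implementing Comparable. A sketch of what that comparison might look like, assuming the record carries its event-time Date in a field called timestamp (the field name is an assumption):

import java.util.Date;

// Sketch only: assumes Harness.KafkaRecord exposes its event-time Date as "timestamp"
public class KafkaRecord implements Comparable<KafkaRecord> {

    public Date timestamp; // event time of the record (assumed field)

    @Override
    public int compareTo(KafkaRecord other) {
        // Order records by event time so the window emits them chronologically
        return this.timestamp.compareTo(other.timestamp);
    }
}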

Unfortunately, when I check the size of the destination table in Cassandra, I notice
that some messages are lost.

I ran three tests, ingesting data at 50, 25 and 15 Hz. I expected to see a lower
loss percentage at the lower ingestion rates, but it turns out to be the opposite!

P.S.: Kafka ingests 45,000 messages of 1 KB each; the loss percentages are:

50 Hz: 0.273%
25 Hz: 0.284%
15 Hz: 0.302%

My suspicion is that the data are lost because they arrive too late and are dropped
by Flink. Is that a possibility?
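
One way to check this (just a sketch, not tested against the actual job; keyedStream, SortingWindowFunction and the 5-second allowed lateness are placeholders) would be to give the window an allowed lateness and send late records to a side output, so they can be counted instead of being silently dropped:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Side-output tag for records that arrive after the watermark has passed
// window end + allowed lateness (these would otherwise be dropped silently)
final OutputTag<Harness.KafkaRecord> lateTag =
        new OutputTag<Harness.KafkaRecord>("late-records") {};

// keyedStream is assumed to be the same KeyedStream the job already builds before .window(...)
SingleOutputStreamOperator<Harness.KafkaRecord> ordered = keyedStream
        .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
        .allowedLateness(Time.seconds(5))        // arbitrary value, keeps window state around a bit longer
        .sideOutputLateData(lateTag)             // records later than that go to the side output
        .apply(new SortingWindowFunction());     // hypothetical name for the WindowFunction shown above

// Count/log what would have been dropped; if this stays empty, lateness is not the cause
DataStream<Harness.KafkaRecord> lateRecords = ordered.getSideOutput(lateTag);
lateRecords.print();

If the side output stays empty while the table is still short, the loss is probably happening somewhere other than the event-time windowing (for example at the sink).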



