KeySelector was exactly what I needed. Thank you very much.
I modified my code as follows, and now it works:

DataStream<Harness.KafkaRecord> LCxAccStream = env
        .addSource(new FlinkKafkaConsumer010<>("LCacc", new CustomDeserializer(), properties)).setParallelism(4)
        .assignTimestampsAndWatermarks(new CustomTimestampExtractor()).setParallelism(4)
        .map(new MapFunction<Tuple8<String, String, Date, String, String, Double, Double, Double>, Harness.KafkaRecord>() {

            @Override
            public Harness.KafkaRecord map(Tuple8<String, String, Date, String, String, Double, Double, Double> value) throws Exception {
                return new Harness.KafkaRecord(value.f0, value.f1, value.f2, value.f3, value.f4, value.f5);
            }
        }).setParallelism(4)
        .keyBy(new KeySelector<Harness.KafkaRecord, String>() {
            public String getKey(Harness.KafkaRecord record) { return record.key; }
        })
        .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
        .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {

            public void apply(String key,
                              TimeWindow window,
                              Iterable<Harness.KafkaRecord> input,
                              Collector<Harness.KafkaRecord> out)
                    throws Exception {

                ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();

                for (Harness.KafkaRecord in : input)
                    list.add(in);
                Collections.sort(list);
                for (Harness.KafkaRecord output : list)
                    out.collect(output);
            }
        });
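
Note for anyone reusing this: Collections.sort(list) in the window function only compiles if Harness.KafkaRecord implements Comparable. That class is not shown in this post, so the following is only a minimal sketch in plain Java (no Flink dependency) of what the sort step relies on. It assumes records are ordered by event timestamp. The class WindowSortSketch, the reduced three-field KafkaRecord, and the field names timestamp and value are hypothetical stand-ins, not the real Harness.KafkaRecord.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;

public class WindowSortSketch {

    // Hypothetical stand-in for Harness.KafkaRecord, reduced to three fields.
    // Only "key" appears in the original post; "timestamp" and "value" are assumed.
    static class KafkaRecord implements Comparable<KafkaRecord> {
        final String key;
        final Date timestamp;
        final double value;

        KafkaRecord(String key, Date timestamp, double value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }

        // Assumed ordering: ascending by event timestamp.
        @Override
        public int compareTo(KafkaRecord other) {
            return this.timestamp.compareTo(other.timestamp);
        }
    }

    // Same steps as the body of apply(...) above: copy the window contents
    // into a list, sort it, and hand the records back in sorted order.
    static List<KafkaRecord> sortWindow(Iterable<KafkaRecord> input) {
        List<KafkaRecord> list = new ArrayList<>();
        for (KafkaRecord in : input) {
            list.add(in);
        }
        Collections.sort(list);
        return list;
    }

    public static void main(String[] args) {
        // Three records of one key arriving out of order within a window.
        List<KafkaRecord> window = Arrays.asList(
                new KafkaRecord("sensor-1", new Date(30L), 0.3),
                new KafkaRecord("sensor-1", new Date(10L), 0.1),
                new KafkaRecord("sensor-1", new Date(20L), 0.2));

        for (KafkaRecord r : sortWindow(window)) {
            System.out.println(r.key + " @ " + r.timestamp.getTime() + " = " + r.value);
        }
    }
}
```

Because keyBy runs before the window, each call to apply only sees the records of one key, so the sort orders records per key and per window, not globally across the stream.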



