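Hi Andrea,

AFAIK, the `keyBy` function you used (the one that takes a field name) wraps all the keys you selected into a `Tuple`, which is why the key type of your `WindowedStream` is `Tuple` rather than `String`. You can read your key from the tuple's first field (`f0`, e.g. via `key.getField(0)`); its value will be the `String` you expect. If you want the `KeyedStream` to have `String` as its key type, pass a `KeySelector` to `keyBy` instead. [1]

Hope this helps.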
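To illustrate why the key type follows the `KeySelector`, here is a plain-Java sketch that does not use Flink at all. `KeySelector`, `Record`, and `keyBy` below are simplified stand-ins invented for this illustration (the real Flink `KeySelector.getKey` also declares `throws Exception`, and the real `keyBy` returns a `KeyedStream`, not a `Map`). Only the generic shape matters: the key type `K` of the result is whatever the selector returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyTypeSketch {

    // Simplified stand-in for the shape of Flink's KeySelector<IN, KEY>; not the real class.
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value);
    }

    // Hypothetical record with a String key, loosely modelled on Harness.KafkaRecord.
    static final class Record {
        final String key;
        final long timestamp;

        Record(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for keyBy(KeySelector): the key type K is taken from the selector,
    // so a selector returning String yields String keys, with no Tuple wrapper.
    static <T, K> Map<K, List<T>> keyBy(List<T> input, KeySelector<T, K> selector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T element : input) {
            groups.computeIfAbsent(selector.getKey(element), k -> new ArrayList<>())
                  .add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Record> input = Arrays.asList(
                new Record("sensorA", 3L),
                new Record("sensorB", 1L),
                new Record("sensorA", 2L));

        // The declared key type is String because the selector returns String.
        Map<String, List<Record>> byKey = keyBy(input, new KeySelector<Record, String>() {
            @Override
            public String getKey(Record value) {
                return value.key;
            }
        });

        for (Map.Entry<String, List<Record>> entry : byKey.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " record(s)");
        }
    }
}
```

In the Flink job, the analogous change would be to pass a `KeySelector<Harness.KafkaRecord, String>` to `keyBy` instead of the field name `"key"`, assuming `KafkaRecord` exposes its key as a `String`. The `WindowFunction<..., String, TimeWindow>` would then match the stream's key type.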
Best Regards,
Tony Wei

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-org.apache.flink.api.java.functions.KeySelector-

2017-10-15 7:00 GMT+08:00 AndreaKinn <kinn6...@hotmail.it>:

> Hi all,
> I'm trying to implement a time ordering inside a stream using window
> function. Then my purposes is to order the element inside a tumbling
> window.
>
> This is my code (written following the doc):
>
> DataStream<Harness.KafkaRecord> LCxAccStream = env
>     .addSource(new FlinkKafkaConsumer010<>("LCacc",
>             new CustomDeserializer(), properties)).setParallelism(4)
>     .assignTimestampsAndWatermarks(
>             new CustomTimestampExtractor()).setParallelism(4)
>     .map(new MapFunction<Tuple8<String, String, Date, String, String,
>             Double, Double, Double>, Harness.KafkaRecord>() {
>
>         @Override
>         public Harness.KafkaRecord map(Tuple8<String, String, Date, String,
>                 String, Double, Double, Double> value) throws Exception {
>             return new Harness.KafkaRecord(value.f0, value.f1, value.f2,
>                     value.f3, value.f4, value.f5);
>         }
>     }).setParallelism(4)
>     .keyBy("key")
>     .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
>     .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord,
>             String, TimeWindow>() {
>
>         public void apply(String key,
>                 TimeWindow window,
>                 Iterable<Harness.KafkaRecord> input,
>                 Collector<Harness.KafkaRecord> out) throws Exception {
>
>             ArrayList<Harness.KafkaRecord> list =
>                     new ArrayList<Harness.KafkaRecord>();
>
>             for (Harness.KafkaRecord in: input)
>                 list.add(in);
>             Collections.sort(list);
>             for (Harness.KafkaRecord output: list)
>                 out.collect(output);
>         }
>     });
>
> Clearly I have defined a comparator for Harness.KafkaRecord object.
> Unfortunately the method .apply(...) shows the following error:
>
> /The method apply(WindowFunction<Harness.KafkaRecord,R,Tuple,TimeWindow>) in
> the type WindowedStream<Harness.KafkaRecord,Tuple,TimeWindow> is not
> applicable for the arguments (new
> WindowFunction<Harness.KafkaRecord,Harness.KafkaRecord,String,TimeWindow>(){})/
>
> Honestly I don't understand why I can't use String instead of Tuple. Btw my
> key type is a String and moreover I can't understand what could means the
> type Tuple in this case.
>
> Furthermore I noted that in the example here: WindowFunction - The Generic
> Case
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#keyed-vs-non-keyed-windows>
> it use a String type as key of the KeyedStream.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/