Hi everybody,

I have finally reached streaming territory. For a student project I want to implement CluStream for Flink, which also seems like a nice opportunity to try out queryable state :)
But I am running into problems with the first steps. My input data is a CSV file of records, and for a start I just want to window it. I don't want to use an AllWindow because it is not parallelizable.

So my first question is: can I window by processing time, along these lines?

    connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))

I didn't find a way to do that, so I added an index column to the CSV and tried a countWindow instead:

    DataStreamSource<String> source = env.readTextFile(file.getPath());
    DataStream<Tuple2<Long, Vector>> connectionRecords =
            source.map(new MapToVector()).setParallelism(4);

    connectionRecords
            .keyBy(0)
            .countWindow(10)
            .apply(new WindowFunction<Tuple2<Long, Vector>, Tuple1<Integer>, Tuple, GlobalWindow>() {
                @Override
                public void apply(Tuple key, GlobalWindow window,
                                  Iterable<Tuple2<Long, Vector>> values,
                                  Collector<Tuple1<Integer>> out) throws Exception {
                    // Just count the elements in the window.
                    int count = 0;
                    for (Tuple2<Long, Vector> ignored : values) {
                        count++;
                    }
                    out.collect(new Tuple1<>(count));
                }
            })
            .writeAsCsv("text");

To check whether everything works, I just count the elements per window and write the counts to a CSV file. Flink creates the output files, but they are all empty. Can you tell me what I did wrong?

Best regards,
Felix
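
P.S. To make the first question a bit more concrete: what I was hoping for is something along the lines of the sketch below, where the key is just a coarse processing-time bucket (via a KeySelector) so the windows can still be computed in parallel. I have no idea whether keying by wall-clock time like this is supported or even sensible, it is only meant to show what I mean; the one-second bucket size is arbitrary.

    // Sketch only: key by the current wall-clock second, then window the keyed stream.
    connectionRecords
            .keyBy(new KeySelector<Tuple2<Long, Vector>, Long>() {
                @Override
                public Long getKey(Tuple2<Long, Vector> value) {
                    return System.currentTimeMillis() / 1000;  // processing-time bucket
                }
            })
            .timeWindow(Time.milliseconds(1000L));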
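
P.P.S. In case someone wants to run it, here is a stripped-down, self-contained version of the pipeline. MapToVector is simplified here (my real class parses more fields), I use double[] instead of my Vector type so that it compiles on its own, and the paths are made up.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.util.Collector;

    public class CountWindowTest {

        // Stand-in for my real MapToVector: parses "index,feature1,feature2,..."
        // into (index, feature vector), using double[] instead of my Vector class.
        public static class MapToVector implements MapFunction<String, Tuple2<Long, double[]>> {
            @Override
            public Tuple2<Long, double[]> map(String line) throws Exception {
                String[] parts = line.split(",");
                long index = Long.parseLong(parts[0]);
                double[] features = new double[parts.length - 1];
                for (int i = 1; i < parts.length; i++) {
                    features[i - 1] = Double.parseDouble(parts[i]);
                }
                return new Tuple2<>(index, features);
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Path is made up; my real input is the connection-records CSV.
            DataStream<Tuple2<Long, double[]>> records = env
                    .readTextFile("/tmp/connection-records.csv")
                    .map(new MapToVector())
                    .setParallelism(4);

            records
                    .keyBy(0)                // key = index column
                    .countWindow(10)         // fire every 10 elements per key
                    .apply(new WindowFunction<Tuple2<Long, double[]>, Tuple1<Integer>, Tuple, GlobalWindow>() {
                        @Override
                        public void apply(Tuple key, GlobalWindow window,
                                          Iterable<Tuple2<Long, double[]>> values,
                                          Collector<Tuple1<Integer>> out) throws Exception {
                            int count = 0;
                            for (Tuple2<Long, double[]> ignored : values) {
                                count++;
                            }
                            out.collect(new Tuple1<>(count));
                        }
                    })
                    .writeAsCsv("/tmp/window-counts");

            env.execute("count window test");
        }
    }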