Hi Till, the mapper solution makes sense :)
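For reference, this is roughly what I have in mind for such a mapper. Just a minimal sketch, assuming a generic element type, assuming System.currentTimeMillis() is good enough as the processing time, and with a made-up class name AssignProcessingTime:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Sketch only: tag every record with the current processing time so the
    // stream can afterwards be keyed on field 0 of the resulting Tuple2.
    public class AssignProcessingTime<T> implements MapFunction<T, Tuple2<Long, T>> {
        @Override
        public Tuple2<Long, T> map(T value) throws Exception {
            return new Tuple2<>(System.currentTimeMillis(), value);
        }
    }

It could then be used like stream.map(new AssignProcessingTime<Vector>()).keyBy(0).timeWindow(Time.milliseconds(1000L)).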
Unfortunately, in my case it was not a typo in the path. I checked and saw
that the records are read. You can find the whole program here:
https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/java/org/apache/flink/clustream/StreamingJobIndex.java

I'd appreciate any ideas.

Best regards,
Felix

2016-11-07 16:15 GMT+01:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Felix,
>
> I'm not sure whether grouping/keyBy by processing time makes semantic
> sense, since the result can be anything depending on the execution order.
> Therefore, there is no built-in mechanism to group by processing time. But
> you can always write a mapper which assigns the current processing time to
> each stream record and then use this field for grouping.
>
> Concerning your second problem, could you check the path of the file? At
> the moment Flink fails silently if the path is not valid, so it might be
> that you have a simple typo in the path. I've opened an issue to fix this
> [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-5027
>
> Cheers,
> Till
>
> On Sun, Nov 6, 2016 at 12:36 PM, Felix Neutatz <neut...@googlemail.com>
> wrote:
>
>> Hi everybody,
>>
>> I finally reached streaming territory. For a student project I want to
>> implement CluStream for Flink. I guess this is especially interesting
>> for trying out queryable state :)
>>
>> But I have problems with the first steps. My input data is a CSV file of
>> records. To start, I just want to window this CSV. I don't want to use
>> AllWindows because it isn't parallelizable.
>>
>> So my first question is: can I window by processing time, like this?
>>
>> connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
>>
>> I didn't find a way, so I added an index column to the CSV and tried to
>> use a countWindow:
>>
>> DataStreamSource<String> source = env.readTextFile(file.getPath());
>>
>> DataStream<Tuple2<Long, Vector>> connectionRecords = source.map(new
>>     MapToVector()).setParallelism(4);
>>
>> connectionRecords.keyBy(0).countWindow(10).apply(
>>     new WindowFunction<Tuple2<Long, Vector>, Tuple1<Integer>, Tuple, GlobalWindow>() {
>>         public void apply(Tuple tuple,
>>                           GlobalWindow window,
>>                           Iterable<Tuple2<Long, Vector>> values,
>>                           Collector<Tuple1<Integer>> out) throws Exception {
>>             // count the elements in the window
>>             int sum = 0;
>>             Iterator<Tuple2<Long, Vector>> iterator = values.iterator();
>>             while (iterator.hasNext()) {
>>                 iterator.next();
>>                 sum += 1;
>>             }
>>             out.collect(new Tuple1<Integer>(sum));
>>         }
>>     }).writeAsCsv("text");
>>
>> To check whether everything works, I just count the elements per window
>> and write the counts into a CSV file.
>>
>> Flink generates the files, but all of them are empty. Can you tell me
>> what I did wrong?
>>
>> Best regards,
>>
>> Felix