Hi Felix,

I'm not sure whether grouping/keyBy by processing time makes any sense
semantically, since the key can be anything depending on the execution order.
Therefore, there is no built-in mechanism to group by processing time. But
you can always write a mapper which assigns the current processing time to
the stream record and use this field for grouping.
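
To make the mapper idea a bit more concrete, here is a minimal sketch in
plain Java without any Flink dependency. The names ProcessingTimeStamper,
Stamped and groupByBucket are made up for illustration and are not part of
Flink. The clock is passed in so the behaviour can be checked
deterministically; in a real job the body of map() would sit inside a Flink
MapFunction using System.currentTimeMillis(), and the grouping would be done
by keyBy on the stamped field.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical helper (not a Flink class): stamps records with a processing-time bucket. */
final class ProcessingTimeStamper {

    /** A record together with the start of the time bucket in which it was processed. */
    static final class Stamped<T> {
        final long bucket;
        final T value;

        Stamped(long bucket, T value) {
            this.bucket = bucket;
            this.value = value;
        }
    }

    private final LongSupplier clock;   // assumption: System::currentTimeMillis in production
    private final long bucketMillis;    // width of one bucket, e.g. 1000 ms

    ProcessingTimeStamper(LongSupplier clock, long bucketMillis) {
        this.clock = clock;
        this.bucketMillis = bucketMillis;
    }

    /** Assigns the current processing time, rounded down to the bucket start, to the record. */
    <T> Stamped<T> map(T value) {
        long now = clock.getAsLong();
        return new Stamped<>(now - (now % bucketMillis), value);
    }

    /** Groups stamped records by their bucket, which is what keying on that field would do. */
    static <T> Map<Long, List<T>> groupByBucket(List<Stamped<T>> stamped) {
        Map<Long, List<T>> groups = new LinkedHashMap<>();
        for (Stamped<T> s : stamped) {
            groups.computeIfAbsent(s.bucket, k -> new ArrayList<>()).add(s.value);
        }
        return groups;
    }
}
```

Note that the bucket a record lands in depends only on when it happens to
be processed, so two runs over the same input can produce different groups.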

Concerning your second problem, could you check the path of the file? At
the moment Flink fails silently if the path is not valid. It might be that
you have a simple typo in the path. I've opened an issue to fix this [1].
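
Until that is fixed, a quick way to rule out a path problem is to validate
the path yourself before passing it to readTextFile. The following is only
a sketch and assumes the input is a single file on the local file system
(it would not apply to HDFS paths or directories); InputPathCheck and
requireReadableFile are hypothetical names.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper (not part of Flink): fails loudly if a local input file is missing. */
final class InputPathCheck {

    /** Returns the absolute path if it points to a readable regular file, otherwise throws. */
    static Path requireReadableFile(String path) throws IOException {
        Path p = Paths.get(path).toAbsolutePath();
        if (!Files.isRegularFile(p) || !Files.isReadable(p)) {
            throw new FileNotFoundException("Input file not found or not readable: " + p);
        }
        return p;
    }
}
```

Calling InputPathCheck.requireReadableFile(file.getPath()) before building
the job should then surface a typo as an exception instead of empty output.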

[1] https://issues.apache.org/jira/browse/FLINK-5027

Cheers,
Till





On Sun, Nov 6, 2016 at 12:36 PM, Felix Neutatz <neut...@googlemail.com>
wrote:

> Hi everybody,
>
> I finally reached streaming territory. For a student project I want to
> implement CluStream for Flink. I guess this is especially interesting to
> try queryable state :)
>
> But I have problems at the first steps. My input data is a csv file of
> records. For the start I just want to window this csv. I don't want to use
> AllWindows because it's not parallelizable.
>
> So my first question is: Can I window by processing time, like this:
>
> connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
>
> I didn't find a way, so I added an index column to the csv and tried to use
> a countWindow:
>
> DataStreamSource<String> source = env.readTextFile(file.getPath());
>
> DataStream<Tuple2<Long,Vector>> connectionRecords =
>    source.map(new MapToVector()).setParallelism(4);
>
> connectionRecords.keyBy(0).countWindow(10).apply (
>    new WindowFunction<Tuple2<Long,Vector>, Tuple1<Integer>, Tuple, GlobalWindow>() {
>       public void apply (Tuple tuple,
>                      GlobalWindow window,
>                      Iterable<Tuple2<Long, Vector>> values,
>                      Collector<Tuple1<Integer>> out) throws Exception {
>          int sum = 0;
>          Iterator iterator = values.iterator();
>          while (iterator.hasNext () ) {
>             Tuple2<Long,Vector> t = (Tuple2<Long,Vector>)iterator.next();
>             sum += 1;
>          }
>          out.collect (new Tuple1<Integer>(new Integer(sum)));
>       }
> }).writeAsCsv("text");
>
> To check whether everything works, I just count the elements per window and
> write them into a csv file.
>
> Flink generates the files but all of them are empty. Can you tell me what I
> did wrong?
>
> Best regards,
>
> Felix
>
>
