Hi Till,

the mapper solution makes sense :)
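
To make sure I understood the idea, here is a minimal plain-Java sketch of the key such a mapper could attach to each record. It is not Flink API: the class and the helpers bucketOf and tag are hypothetical names, and I assume the key is the current wall-clock time divided into fixed 1000 ms buckets, matching the window size from my first mail. In the real job the same computation would sit inside the map function, and the resulting field would be used for keyBy.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.LongSupplier;

// Sketch only, not Flink API: shows the key a "processing-time mapper"
// could attach to each record. All names here are hypothetical.
class ProcessingTimeKeySketch {

    // Maps a timestamp (in ms) to the index of its fixed-size time bucket.
    static long bucketOf(long timestampMillis, long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        return Math.floorDiv(timestampMillis, windowMillis);
    }

    // Stand-in for the body of a map function: pairs the record with the
    // bucket of the time at which it is processed. The clock is passed in
    // so that the logic can be checked with a fixed time.
    static <T> Map.Entry<Long, T> tag(T record, LongSupplier clock, long windowMillis) {
        return new SimpleEntry<>(bucketOf(clock.getAsLong(), windowMillis), record);
    }

    public static void main(String[] args) {
        // Tag one record with the bucket of the current wall-clock time.
        Map.Entry<Long, String> tagged =
            tag("record-1", System::currentTimeMillis, 1000L);
        System.out.println(tagged.getKey() + " -> " + tagged.getValue());
    }
}
```

One consequence of this sketch: all records tagged within the same second get the same key, so the key depends only on when a record happens to be processed.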

Unfortunately, in my case it was not a typo in the path. I checked and saw
that the records are read.

You can find the whole program here:
https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/java/org/apache/flink/clustream/StreamingJobIndex.java

I would be grateful for any ideas.

Best regards,
Felix

2016-11-07 16:15 GMT+01:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Felix,
>
> I'm not sure whether grouping/keyBy by processing time makes any sense
> semantically. The key could be anything, depending on the execution order.
> Therefore, there is no built-in mechanism to group by processing time. But
> you can always write a mapper which assigns the current processing time to
> the stream record and use this field for grouping.
>
> Concerning your second problem, could you check the path of the file? At
> the moment Flink fails silently if the path is not valid. It might be that
> you have a simple typo in the path. I've opened an issue to fix this
> [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-5027
>
> Cheers,
> Till
>
> On Sun, Nov 6, 2016 at 12:36 PM, Felix Neutatz <neut...@googlemail.com>
> wrote:
>
>> Hi everybody,
>>
>> I finally reached streaming territory. For a student project I want to
>> implement CluStream for Flink. I guess this is especially interesting to
>> try queryable state :)
>>
>> But I am having problems with the first steps. My input data is a CSV file
>> of records. To start, I just want to window this CSV. I don't want to use
>> AllWindows because it's not parallelizable.
>>
>> So my first question is: Can I window by processing time, like this:
>>
>> connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
>>
>> I didn't find a way, so I added an index column to the CSV and tried to
>> use a countWindow:
>>
>> DataStreamSource<String> source = env.readTextFile(file.getPath());
>>
>> DataStream<Tuple2<Long,Vector>> connectionRecords =
>>    source.map(new MapToVector()).setParallelism(4);
>>
>> connectionRecords.keyBy(0).countWindow(10).apply(
>>    new WindowFunction<Tuple2<Long,Vector>, Tuple1<Integer>, Tuple, GlobalWindow>() {
>>       public void apply(Tuple tuple,
>>                         GlobalWindow window,
>>                         Iterable<Tuple2<Long, Vector>> values,
>>                         Collector<Tuple1<Integer>> out) throws Exception {
>>          // Count the elements in this window.
>>          int sum = 0;
>>          for (Tuple2<Long, Vector> t : values) {
>>             sum += 1;
>>          }
>>          out.collect(new Tuple1<Integer>(sum));
>>       }
>> }).writeAsCsv("text");
>>
>> To check whether everything works, I just count the elements per window
>> and write them into a CSV file.
>>
>> Flink generates the files, but all of them are empty. Can you tell me what
>> I did wrong?
>>
>> Best regards,
>>
>> Felix
>>
>>
>
