Hi everybody,

I found a new problem. The algorithm I want to implement needs a global
ReducingState. What do I mean with that:
I want to calculate a local aggregation for each task and then combine all
these local aggregates to one global aggregate and push this global
aggregate to all nodes and continue processing the data stream. If you
don't understand my description, I also made some drawings of what I mean:
https://docs.google.com/presentation/d/13ei6pzhwNKqNShhdNWXqJaYCG1z0Hsrxfy5sRnqun5M/edit?usp=sharing

I found out that the ReducingState described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
in the CountWindowAverage example only has degree of parallelization = 1
and when I use more keys, I get a higher degree of parallelization, but no
global synchronization.

I am really new to streaming, so maybe I follow some bad assumptions. You
can also point me to some reading :)

Thank you for your help.

Best regards,
Felix

2016-11-08 10:17 GMT+01:00 Felix Neutatz <neut...@googlemail.com>:

> Hi Yassine,
>
> thanks that explains it :)
>
> Best regards,
> Felix
>
> On Nov 7, 2016 21:28, "Yassine MARZOUGUI" <y.marzou...@mindlytix.com>
> wrote:
>
>> Hi Flelix,
>>
>> As I see in kddcup.newtestdata_small_unlabeled_index
>> <https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/resources/data/kddcup.newtestdata_small_unlabeled_index>,
>> the first field of connectionRecords (splits[0]), is unique for each
>> record, therefore when apply keyBy(0), it will logically partition your
>> stream by that field and each partition will contain only one element. So
>> the countWindow(2) actually never fires because it never reaches 2
>> elements. That's why your files stay empty.
>>
>> Could you please go into more detail about what the expected output is? Then
>> we might be able to figure out the proper way to achieve it.
>>
>> Best,
>> Yassine
>>
>> 2016-11-07 19:18 GMT+01:00 Felix Neutatz <neut...@googlemail.com>:
>>
>>> Hi Till,
>>>
>>> the mapper solution makes sense :)
>>>
>>> Unfortunately, in my case it was not a typo in the path. I checked and
>>> saw that the records are read.
>>>
>>> You can find the whole program here: https://github.com/Felix
>>> Neutatz/CluStream/blob/master/flink-java-project/src/main/ja
>>> va/org/apache/flink/clustream/StreamingJobIndex.java
>>>
>>> I am happy for any ideas.
>>>
>>> Best regards,
>>> Felix
>>>
>>> 2016-11-07 16:15 GMT+01:00 Till Rohrmann <trohrm...@apache.org>:
>>>
>>>> Hi Felix,
>>>>
>>>> I'm not sure whether grouping/keyBy by processing time makes
>>>> semantically any sense. This can be anything depending on the execution
>>>> order. Therefore, there is not build in mechanism to group by processing
>>>> time. But you can always write a mapper which assigns the current
>>>> processing time to the stream record and use this field for grouping.
>>>>
>>>> Concerning your second problem, could you check the path of the file?
>>>> At the moment Flink fails silently if the path is not valid. It might be
>>>> that you have a simple typo in the path. I've opened an issue to fix this
>>>> issue [1].
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-5027
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Nov 6, 2016 at 12:36 PM, Felix Neutatz <neut...@googlemail.com>
>>>> wrote:
>>>>
>>>>> Hi everybody,
>>>>>
>>>>> I finally reached streaming territory. For a student project I want to
>>>>> implement CluStream for Flink. I guess this is especially interesting to
>>>>> try queryable state :)
>>>>>
>>>>> But I have problems at the first steps. My input data is a csv file of
>>>>> records. For the start I just want to window this csv. I don't want to 
>>>>> use AllWindows
>>>>> because it's not parallelizable.
>>>>>
>>>>> So my first question is: Can I window by processing time, like this:
>>>>>
>>>>> connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
>>>>>
>>>>> I didn't find a way, so I added in the csv an index column and tried to 
>>>>> use a countWindow:
>>>>>
>>>>> DataStreamSource<String> source = env.readTextFile(file.getPath());
>>>>>
>>>>> DataStream<Tuple2<Long,Vector>> connectionRecords = source.map(new 
>>>>> MapToVector()).setParallelism(4);
>>>>>
>>>>> connectionRecords.keyBy(0).countWindow(10).apply (
>>>>>    new WindowFunction<Tuple2<Long,Vector>, Tuple1<Integer>, Tuple, 
>>>>> GlobalWindow>() {
>>>>>       public void apply (Tuple tuple,
>>>>>                      GlobalWindow window,
>>>>>                      Iterable<Tuple2<Long, Vector>> values,
>>>>>                      Collector<Tuple1<Integer>> out) throws Exception {
>>>>>          int sum = 0;
>>>>>          Iterator iterator = values.iterator();
>>>>>          while (iterator.hasNext () ) {
>>>>>             Tuple2<Long,Vector> t = (Tuple2<Long,Vector>)iterator.next();
>>>>>             sum += 1;
>>>>>          }
>>>>>          out.collect (new Tuple1<Integer>(new Integer(sum)));
>>>>>       }
>>>>> }).writeAsCsv("text");
>>>>>
>>>>> To check whether everything works I just count the elements per window 
>>>>> and write them into a csv file.
>>>>>
>>>>> Flink generates the files but all are empty. Can you tell me, what I did 
>>>>> wrong?
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Felix
>>>>>
>>>>>
>>>>
>>>
>>

Reply via email to