Thanks for the help!  I’ll try out the ProcessFunction then.

Le

> On Oct 26, 2017, at 8:03 AM, Kien Truong <duckientru...@gmail.com> wrote:
> 
> Hi,
> For Streaming API, use a ProcessFunction as Fabian's suggestion. 
> You can pretty much do anything with a ProcessFunction :)
> 
> Best regards,
> 
> Kien
> 
> 
> On 10/26/2017 8:01 PM, Le Xu wrote:
>> Hi Kien:
>> 
>> Is there a similar API for DataStream as well?
>> 
>> Thanks!
>> 
>> Le
>> 
>> 
>>> On Oct 26, 2017, at 7:58 AM, Kien Truong <duckientru...@gmail.com 
>>> <mailto:duckientru...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> For batch API, you can use GroupReduceFunction, which give you the same 
>>> benefit as a MapReduce combiner.
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions>Regards,
>>> Kien
>>> 
>>> 
>>> On 10/26/2017 7:37 PM, Le Xu wrote:
>>>> Thanks guys! That makes more sense now. 
>>>> 
>>>> So does it mean once I start use a window operator, all operations on my 
>>>> WindowedStream                     would be global (across all 
>>>> partitions)? In that case, WindowedStream.aggregate (or sum) would apply 
>>>> to all data after shuffling instead of each partition. 
>>>> 
>>>> If I understand this correctly, once I want to perform some sort of 
>>>> counting within each partition for different words (such as word count), I 
>>>> should really avoid using keyBy but keep some sort of counting map for 
>>>> each word while also keep track of the current time stamp, inside each 
>>>> mapper.
>>>> 
>>>> Le
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> On Oct 26, 2017, at 3:17 AM, Fabian Hueske <fhue...@gmail.com 
>>>>> <mailto:fhue...@gmail.com>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> in a MapReduce context, combiners are used to reduce the amount of data 
>>>>> 1) to shuffle and fully sort (to group the data by key) and 2) to reduce 
>>>>> the impact of skewed data.
>>>>> 
>>>>> The question is, why do you need a combiner in your use case.
>>>>> - To reduce the data to shuffle: You should not use a window operator to 
>>>>> preaggregate because keyBy implies a shuffle. Instead you could implement 
>>>>> a ProcessFunction with operator state. In this solution you need to 
>>>>> implement the windowing logic yourself, i.e., group data in window based 
>>>>> on their timestamp. Ensure you don't run out of memory (operator state is 
>>>>> kept on the heap), etc. So this solution needs quite a bit of manual 
>>>>> tuning.
>>>>> - To reduce the impact of skewed data: You can use a window aggregation 
>>>>> if you don't mind the shuffle. However, you should add an additional 
>>>>> artificial key attribute to spread out the computation of the same 
>>>>> original key to more grouping key. Note that Flink assigns grouping keys 
>>>>> by hash partitioning to workers. This works well for many distinct keys, 
>>>>> but might cause issues in case of low key cardinality. Also note that the 
>>>>> state size grows and effectiveness reduces with an increasing cardinality 
>>>>> of the artificial key.
>>>>> 
>>>>> Hope this helps,
>>>>> Fabian
>>>>> 
>>>>> 2017-10-26 3:32 GMT+02:00 Kurt Young <ykt...@gmail.com 
>>>>> <mailto:ykt...@gmail.com>>:
>>>>> Do you mean you want to keep the origin window as well as doing some 
>>>>> combine operations inside window in the same time?
>>>>> What kind of data do you expect the following operator will receive?
>>>>> 
>>>>> Best,
>>>>> Kurt
>>>>> 
>>>>> On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonx...@gmail.com 
>>>>> <mailto:sharonx...@gmail.com>> wrote:
>>>>> Thank Kurt I'm trying out WindowedStream aggregate right now. Just 
>>>>> wondering, is there any way for me to preserve the window after 
>>>>> aggregation. More specifically, originally i have something like:
>>>>> 
>>>>> WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = 
>>>>> dataStream
>>>>>                 .keyBy(0) //id 
>>>>>                 .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
>>>>> 
>>>>> and then for the reducer I can do:
>>>>>  
>>>>> windowStream.apply(...) 
>>>>> 
>>>>> and expect the window information is preserved.
>>>>> 
>>>>> If I were to do use aggregate on window stream, I would end up with 
>>>>> something like:
>>>>> 
>>>>> DataStream<Tuple2<String, Long>> windowStream = dataStream
>>>>>                 .keyBy(0) //id 
>>>>>                 .timeWindow(Time.of(windowSize, 
>>>>> TimeUnit.MILLISECONDS)).aggregate
>>>>>                           (new AggregateFunction<Tuple2<String, Long>, 
>>>>> Accumulator, Tuple2<String, Long>>() {
>>>>>                     @Override
>>>>>                     public Accumulator createAccumulator() {
>>>>>                         return null;
>>>>>                     }
>>>>> 
>>>>>                     @Override
>>>>>                     public void add(Tuple2<String, Long> stringLong, 
>>>>> Accumulator o)                                       {
>>>>> 
>>>>>                     }
>>>>> 
>>>>>                     @Override
>>>>>                     public Tuple2<String, Long> getResult(Accumulator o) {
>>>>>                         return null;
>>>>>                     }
>>>>> 
>>>>>                     @Override
>>>>>                     public Accumulator merge(Accumulator o, Accumulator 
>>>>> acc1) {
>>>>>                         return null;
>>>>>                     }
>>>>>                 });
>>>>> 
>>>>> Because it looks like aggregate would only transfer WindowedStream to a 
>>>>> DataStream. But for a global aggregation phase (a reducer), should I 
>>>>> extract the window again?
>>>>> 
>>>>> 
>>>>> Thanks! I apologize if that sounds like a very intuitive questions.
>>>>> 
>>>>> 
>>>>> Le
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt...@gmail.com 
>>>>> <mailto:ykt...@gmail.com>> wrote:
>>>>> I think you can use WindowedStream.aggreate
>>>>> 
>>>>> Best,
>>>>> Kurt
>>>>> 
>>>>> On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonx...@gmail.com 
>>>>> <mailto:sharonx...@gmail.com>> wrote:
>>>>> Thanks Kurt. Maybe I wasn't clear before, I was wondering if Flink has 
>>>>> implementation of combiner in DataStream (to use after keyBy and 
>>>>> windowing).
>>>>> 
>>>>> Thanks again!
>>>>> 
>>>>> Le
>>>>> 
>>>>> On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <ykt...@gmail.com 
>>>>> <mailto:ykt...@gmail.com>> wrote:
>>>>> Hi,
>>>>> 
>>>>> The document you are looking at is pretty old, you can check the newest 
>>>>> version here: 
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
>>>>>  
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html>
>>>>> 
>>>>> Regarding to your question, you can use combineGroup 
>>>>> 
>>>>> Best,
>>>>> Kurt
>>>>> 
>>>>> On Mon, Oct 23, 2017 at 5:22 AM, Le Xu <sharonx...@gmail.com 
>>>>> <mailto:sharonx...@gmail.com>> wrote:
>>>>> Hello!
>>>>> 
>>>>> I'm new to Flink and I'm wondering if there is a explicit local combiner 
>>>>> to each mapper so I can use to perform a local reduce on each mapper? I 
>>>>> looked up on 
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
>>>>>  
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html>
>>>>>  but couldn't find anything that matches.
>>>>> 
>>>>> 
>>>>> Thanks!
>>>>> 
>>>>> Le
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>> 

Reply via email to