Thanks for the help! I’ll try out the ProcessFunction then.

Le
> On Oct 26, 2017, at 8:03 AM, Kien Truong <duckientru...@gmail.com> wrote:
>
> Hi,
>
> For the Streaming API, use a ProcessFunction, as Fabian suggested.
> You can do pretty much anything with a ProcessFunction :)
>
> Best regards,
> Kien
>
> On 10/26/2017 8:01 PM, Le Xu wrote:
>> Hi Kien:
>>
>> Is there a similar API for DataStream as well?
>>
>> Thanks!
>>
>> Le
>>
>>> On Oct 26, 2017, at 7:58 AM, Kien Truong <duckientru...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> For the batch API, you can use GroupReduceFunction, which gives you the same benefit as a MapReduce combiner:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
>>>
>>> Regards,
>>> Kien
>>>
>>> On 10/26/2017 7:37 PM, Le Xu wrote:
>>>> Thanks guys! That makes more sense now.
>>>>
>>>> So does it mean that once I start using a window operator, all operations on my WindowedStream are global (across all partitions)? In that case, WindowedStream.aggregate (or sum) would apply to all data after shuffling, instead of to each partition.
>>>>
>>>> If I understand this correctly, when I want to count within each partition for different words (as in word count), I should avoid keyBy and instead keep a count map for each word, together with the current timestamp, inside each mapper.
>>>>
>>>> Le
>>>>
>>>>> On Oct 26, 2017, at 3:17 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> In a MapReduce context, combiners are used to reduce the amount of data 1) to shuffle and fully sort (to group the data by key) and 2) to reduce the impact of skewed data.
>>>>>
>>>>> The question is why you need a combiner in your use case.
>>>>>
>>>>> - To reduce the data to shuffle: you should not use a window operator to pre-aggregate, because keyBy implies a shuffle. Instead, you could implement a ProcessFunction with operator state. In this solution you need to implement the windowing logic yourself, i.e., group data into windows based on their timestamps, ensure you don't run out of memory (operator state is kept on the heap), etc. So this solution needs quite a bit of manual tuning.
>>>>>
>>>>> - To reduce the impact of skewed data: you can use a window aggregation if you don't mind the shuffle. However, you should add an artificial key attribute to spread the computation for the same original key across more grouping keys. Note that Flink assigns grouping keys to workers by hash partitioning. This works well for many distinct keys, but can cause issues when the key cardinality is low. Also note that the state size grows, and the effectiveness shrinks, as the cardinality of the artificial key increases.
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
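
A minimal sketch of the pre-aggregating ProcessFunction Fabian describes, assuming a Tuple2<String, Long> stream of (word, count) pairs. The buffer, its flush threshold, and all names are illustrative; a production version would keep the buffer in operator state and bound its memory use, as Fabian notes:

    DataStream<Tuple2<String, Long>> preAggregated = dataStream
            .process(new ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {

                // Plain HashMap: local to the subtask and not checkpointed in this sketch.
                private transient Map<String, Long> buffer;

                @Override
                public void open(Configuration parameters) {
                    buffer = new HashMap<>();
                }

                @Override
                public void processElement(Tuple2<String, Long> value,
                                           Context ctx,
                                           Collector<Tuple2<String, Long>> out) {
                    buffer.merge(value.f0, value.f1, Long::sum);
                    // Flush when the buffer grows too large (illustrative threshold).
                    if (buffer.size() >= 10_000) {
                        for (Map.Entry<String, Long> e : buffer.entrySet()) {
                            out.collect(new Tuple2<>(e.getKey(), e.getValue()));
                        }
                        buffer.clear();
                    }
                }
            });

The partial counts can then be keyBy'd and windowed as usual, with far fewer records crossing the network.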
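
And a sketch of the artificial-key idea for skew, fanning each key out over a fixed number of subkeys and then combining the partial sums per original key. The fan-out factor of 8 is an illustrative assumption, and aligning the two windows (e.g., using the same size) needs care:

    // Stage 1: pre-aggregate per (word, salt) to spread hot keys across workers.
    DataStream<Tuple3<String, Integer, Long>> partial = dataStream
            .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>>() {
                @Override
                public Tuple3<String, Integer, Long> map(Tuple2<String, Long> v) {
                    int salt = ThreadLocalRandom.current().nextInt(8);
                    return new Tuple3<>(v.f0, salt, v.f1);
                }
            })
            .keyBy(0, 1) // original key + salt
            .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
            .sum(2);

    // Stage 2: combine the partial sums per original key.
    DataStream<Tuple2<String, Long>> totals = partial
            .map(new MapFunction<Tuple3<String, Integer, Long>, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(Tuple3<String, Integer, Long> v) {
                    return new Tuple2<>(v.f0, v.f2);
                }
            })
            .keyBy(0)
            .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
            .sum(1);

As Fabian notes, a larger fan-out spreads the load better but leaves less to combine per subkey and grows the state, so the factor needs tuning.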
>>>>> 2017-10-26 3:32 GMT+02:00 Kurt Young <ykt...@gmail.com>:
>>>>>
>>>>> Do you mean you want to keep the original window while also doing some combine operations inside the window at the same time?
>>>>> What kind of data do you expect the following operator to receive?
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>> On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonx...@gmail.com> wrote:
>>>>>
>>>>> Thanks Kurt, I'm trying out WindowedStream.aggregate right now. Just wondering, is there any way for me to preserve the window after aggregation? More specifically, originally I have something like:
>>>>>
>>>>> WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = dataStream
>>>>>         .keyBy(0) // id
>>>>>         .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS));
>>>>>
>>>>> and then for the reducer I can do:
>>>>>
>>>>> windowStream.apply(...)
>>>>>
>>>>> and expect the window information to be preserved.
>>>>>
>>>>> If I were to use aggregate on the window stream, I would end up with something like:
>>>>>
>>>>> DataStream<Tuple2<String, Long>> windowStream = dataStream
>>>>>         .keyBy(0) // id
>>>>>         .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
>>>>>         .aggregate(new AggregateFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>>() {
>>>>>             @Override
>>>>>             public Tuple2<String, Long> createAccumulator() {
>>>>>                 return new Tuple2<>("", 0L);
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public void add(Tuple2<String, Long> value, Tuple2<String, Long> acc) {
>>>>>                 acc.f0 = value.f0;
>>>>>                 acc.f1 += value.f1;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public Tuple2<String, Long> getResult(Tuple2<String, Long> acc) {
>>>>>                 return acc;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
>>>>>                 return new Tuple2<>(a.f0, a.f1 + b.f1);
>>>>>             }
>>>>>         });
>>>>>
>>>>> Because it looks like aggregate only transforms the WindowedStream into a DataStream. But for a global aggregation phase (a reducer), should I extract the window again?
>>>>>
>>>>> Thanks! I apologize if this sounds like a very basic question.
>>>>>
>>>>> Le
>>>>>
>>>>> On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt...@gmail.com> wrote:
>>>>>
>>>>> I think you can use WindowedStream.aggregate.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>> On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonx...@gmail.com> wrote:
>>>>>
>>>>> Thanks Kurt. Maybe I wasn't clear before: I was wondering whether Flink has an implementation of a combiner for DataStream (to use after keyBy and windowing).
>>>>>
>>>>> Thanks again!
>>>>>
>>>>> Le
>>>>>
>>>>> On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <ykt...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> The document you are looking at is pretty old; you can check the newest version here:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
>>>>>
>>>>> Regarding your question, you can use combineGroup.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>> On Mon, Oct 23, 2017 at 5:22 AM, Le Xu <sharonx...@gmail.com> wrote:
>>>>>
>>>>> Hello!
>>>>>
>>>>> I'm new to Flink and I'm wondering if there is an explicit local combiner for each mapper, so that I can perform a local reduce on every mapper? I looked at
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
>>>>> but couldn't find anything that matches.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Le
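
On Le's question about keeping the window after an incremental aggregation: one option (a sketch, assuming the same (word, count) tuples as above) is the two-argument WindowedStream.apply overload, which pre-aggregates with a ReduceFunction and still passes the TimeWindow to a WindowFunction, so the window bounds can be attached to the output:

    // Emits (word, windowEnd, sum): the reduce runs incrementally, and the
    // WindowFunction sees only the single reduced value plus the window itself.
    DataStream<Tuple3<String, Long, Long>> counts = dataStream
            .keyBy(0) // id
            .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
            .apply(
                    new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                                                           Tuple2<String, Long> b) {
                            return new Tuple2<>(a.f0, a.f1 + b.f1);
                        }
                    },
                    new WindowFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple key,
                                          TimeWindow window,
                                          Iterable<Tuple2<String, Long>> reduced,
                                          Collector<Tuple3<String, Long, Long>> out) {
                            // The iterable holds exactly one element: the reduced value.
                            Tuple2<String, Long> sum = reduced.iterator().next();
                            out.collect(new Tuple3<>(sum.f0, window.getEnd(), sum.f1));
                        }
                    });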
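
And for the batch side of the thread, a sketch of the combinable GroupReduceFunction Kien links to: implementing GroupCombineFunction alongside GroupReduceFunction lets Flink run the combine step locally before the shuffle, much like a MapReduce combiner (class and field names are illustrative):

    public static class SumCounts
            implements GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>,
                       GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        @Override
        public void reduce(Iterable<Tuple2<String, Long>> values,
                           Collector<Tuple2<String, Long>> out) {
            combine(values, out); // the final reduce uses the same summing logic
        }

        @Override
        public void combine(Iterable<Tuple2<String, Long>> values,
                            Collector<Tuple2<String, Long>> out) {
            String word = null;
            long sum = 0;
            for (Tuple2<String, Long> v : values) {
                word = v.f0;
                sum += v.f1;
            }
            out.collect(new Tuple2<>(word, sum));
        }
    }

    // Usage: counts.groupBy(0).reduceGroup(new SumCounts());

Kurt's combineGroup suggestion runs only the local combine step, emitting partial aggregates per key without a full shuffle and sort.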