Hi Kien,

Is there a similar API for DataStream as well?

Thanks!
Le

On Oct 26, 2017, at 7:58 AM, Kien Truong <duckientru...@gmail.com> wrote:

Hi,

For the batch API, you can use a GroupReduceFunction, which gives you the same benefit as a MapReduce combiner:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions

Regards,
Kien
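A minimal sketch of the combinable GroupReduceFunction pattern that Kien's link describes, for the DataSet API in Flink 1.3. The word-count types and the class name are illustrative, not from the thread; implementing GroupCombineFunction alongside GroupReduceFunction is what lets Flink run the combine step locally on each mapper before the shuffle:

    import org.apache.flink.api.common.functions.GroupCombineFunction;
    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Illustrative word-count reducer that doubles as its own combiner:
    // Flink calls combine(...) locally on each worker before the shuffle,
    // and reduce(...) on the fully grouped data afterwards.
    public class WordCountReducer
            implements GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>,
                       GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        @Override
        public void reduce(Iterable<Tuple2<String, Long>> values,
                           Collector<Tuple2<String, Long>> out) {
            combine(values, out); // summing is the same for partial and final passes
        }

        @Override
        public void combine(Iterable<Tuple2<String, Long>> values,
                            Collector<Tuple2<String, Long>> out) {
            String word = null;
            long sum = 0;
            for (Tuple2<String, Long> value : values) {
                word = value.f0;
                sum += value.f1;
            }
            out.collect(Tuple2.of(word, sum)); // partial or final count
        }
    }

It would be applied as words.groupBy(0).reduceGroup(new WordCountReducer()). Because the combine step's output type matches its input type, the same summing logic serves both passes.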
On 10/26/2017 7:37 PM, Le Xu wrote:

Thanks guys! That makes more sense now.

So does it mean that once I start using a window operator, all operations on my WindowedStream would be global (across all partitions)? In that case, WindowedStream.aggregate (or sum) would apply to all data after shuffling, instead of to each partition.

If I understand this correctly, when I want to perform some sort of counting within each partition for different words (such as word count), I should really avoid using keyBy, and instead keep a counting map for each word, while also keeping track of the current timestamp, inside each mapper.

Le

On Oct 26, 2017, at 3:17 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

In a MapReduce context, combiners are used to reduce the amount of data 1) to shuffle and fully sort (to group the data by key) and 2) to reduce the impact of skewed data.

The question is why you need a combiner in your use case:

- To reduce the data to shuffle: you should not use a window operator to pre-aggregate, because keyBy implies a shuffle. Instead, you could implement a ProcessFunction with operator state (see the sketch after this message). In this solution you need to implement the windowing logic yourself, i.e., group data into windows based on their timestamps, make sure you don't run out of memory (operator state is kept on the heap), etc. So this solution needs quite a bit of manual tuning.

- To reduce the impact of skewed data: you can use a window aggregation if you don't mind the shuffle. However, you should add an artificial key attribute to spread the computation for the same original key across more grouping keys. Note that Flink assigns grouping keys to workers by hash partitioning. This works well for many distinct keys, but might cause issues in case of low key cardinality. Also note that the state size grows, and the effectiveness shrinks, as the cardinality of the artificial key increases.

Hope this helps,
Fabian
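A minimal sketch of Fabian's first suggestion: a hand-rolled pre-aggregator placed before the keyBy. Two simplifications to note: it flushes on a size bound instead of implementing event-time windows, and its HashMap is plain heap state that is not checkpointed (a fault-tolerant version would implement CheckpointedFunction, as Fabian's caveats imply). The class name and the threshold are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Buffers partial counts per word inside each parallel instance and
    // emits them once the buffer reaches a size bound, so the downstream
    // keyBy/window operator receives far fewer records.
    public class PreAggregator
            implements FlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private static final int MAX_BUFFERED_KEYS = 10_000; // illustrative bound

        private final Map<String, Long> partialCounts = new HashMap<>();

        @Override
        public void flatMap(Tuple2<String, Long> value,
                            Collector<Tuple2<String, Long>> out) {
            partialCounts.merge(value.f0, value.f1, Long::sum);
            if (partialCounts.size() >= MAX_BUFFERED_KEYS) {
                for (Map.Entry<String, Long> e : partialCounts.entrySet()) {
                    out.collect(Tuple2.of(e.getKey(), e.getValue()));
                }
                partialCounts.clear(); // state is heap-only, not checkpointed
            }
        }
    }

Downstream this might be used as dataStream.flatMap(new PreAggregator()).keyBy(0).timeWindow(...).sum(1), so the window only has to sum pre-combined partial counts. Fabian's second suggestion would instead map each record to a salted key such as (word, random value in 0..9) before the keyBy, aggregate per salted key, and then re-aggregate per original word.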
2017-10-26 3:32 GMT+02:00 Kurt Young <ykt...@gmail.com>:

Do you mean you want to keep the original window while also doing some combine operations inside the window at the same time? What kind of data do you expect the following operator to receive?

Best,
Kurt

On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonx...@gmail.com> wrote:

Thanks Kurt! I'm trying out WindowedStream.aggregate right now. Just wondering, is there any way for me to preserve the window after the aggregation? More specifically, originally I have something like:

    WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = dataStream
            .keyBy(0) // id
            .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS));

and then for the reducer I can do:

    windowStream.apply(...)

and expect the window information to be preserved.

If I were to use aggregate on the window stream, I would end up with something like:

    DataStream<Tuple2<String, Long>> windowStream = dataStream
            .keyBy(0) // id
            .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
            .aggregate(new AggregateFunction<Tuple2<String, Long>, Accumulator, Tuple2<String, Long>>() {
                @Override
                public Accumulator createAccumulator() {
                    return null;
                }

                @Override
                public void add(Tuple2<String, Long> stringLong, Accumulator acc) {
                }

                @Override
                public Tuple2<String, Long> getResult(Accumulator acc) {
                    return null;
                }

                @Override
                public Accumulator merge(Accumulator acc, Accumulator acc1) {
                    return null;
                }
            });

because it looks like aggregate only transforms a WindowedStream into a DataStream. But for a global aggregation phase (a reducer), should I extract the window again?

Thanks! I apologize if that sounds like a very basic question.

Le

On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt...@gmail.com> wrote:

I think you can use WindowedStream.aggregate.

Best,
Kurt

On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonx...@gmail.com> wrote:

Thanks Kurt. Maybe I wasn't clear before: I was wondering if Flink has an implementation of a combiner for DataStream (to use after keyBy and windowing).

Thanks again!

Le

On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <ykt...@gmail.com> wrote:

Hi,

The document you are looking at is pretty old; you can check the newest version here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html

Regarding your question, you can use combineGroup.

Best,
Kurt

On Mon, Oct 23, 2017 at 5:22 AM, Le Xu <sharonx...@gmail.com> wrote:

Hello!

I'm new to Flink and I'm wondering if there is an explicit local combiner for each mapper that I can use to perform a local reduce on each mapper? I looked at
https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
but couldn't find anything that matches.

Thanks!

Le
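A minimal sketch of the combineGroup call Kurt points to above, again for the DataSet API. Unlike reduceGroup, a grouped combineGroup performs only a greedy in-memory partial reduce on each worker, so it may emit several partial records per key and needs a final global aggregation afterwards. The input DataSet words (of type Tuple2<String, Long>) is an assumed placeholder:

    import org.apache.flink.api.common.functions.GroupCombineFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Local, partial word counts; no full shuffle-and-sort yet.
    DataSet<Tuple2<String, Long>> partialCounts = words
            .groupBy(0)
            .combineGroup(new GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                @Override
                public void combine(Iterable<Tuple2<String, Long>> values,
                                    Collector<Tuple2<String, Long>> out) {
                    String word = null;
                    long sum = 0;
                    for (Tuple2<String, Long> v : values) {
                        word = v.f0;
                        sum += v.f1;
                    }
                    out.collect(Tuple2.of(word, sum)); // partial count per worker
                }
            });

    // Final global aggregation of the partial counts.
    DataSet<Tuple2<String, Long>> counts = partialCounts
            .groupBy(0)
            .sum(1);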