On Oct 26, 2017, at 3:17 AM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi,
in a MapReduce context, combiners are used to reduce the amount of
data 1) to shuffle and fully sort (to group the data by key) and 2)
to reduce the impact of skewed data.
The question is: why do you need a combiner in your use case?
- To reduce the data to shuffle: You should not use a window operator
to preaggregate because keyBy implies a shuffle. Instead, you could
implement a ProcessFunction with operator state. In this solution you
need to implement the windowing logic yourself, i.e., group the data
into windows based on their timestamps, ensure you don't run out of
memory (operator state is kept on the heap), etc. So this solution
needs quite a bit of manual tuning.
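The manual windowing logic described in this first point can be sketched in
plain Java, independent of the Flink API. The class below is an illustrative
assumption, not Flink code: it buffers partial sums per (key, window) the way
operator state would, assigning records to tumbling windows by truncating
their timestamps.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of manual pre-aggregation: keep a partial sum per (key, window)
// and look partials up by window start. In a real Flink job this state
// would live in operator state and be flushed once a window has passed.
public class PreAggregator {
    private final long windowSizeMs;
    // "key@windowStart" -> partial sum
    private final Map<String, Long> partials = new HashMap<>();

    public PreAggregator(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Assign a record to a tumbling window by truncating its timestamp.
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public void add(String key, long timestampMs, long value) {
        String bucket = key + "@" + windowStart(timestampMs);
        partials.merge(bucket, value, Long::sum);
    }

    public Long partialFor(String key, long timestampMs) {
        return partials.get(key + "@" + windowStart(timestampMs));
    }
}
```

A real implementation would additionally evict windows whose end has passed,
which is exactly the memory concern mentioned above.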
- To reduce the impact of skewed data: You can use a window
aggregation if you don't mind the shuffle. However, you should add an
additional artificial key attribute to spread the computation for the
same original key across more grouping keys. Note that Flink assigns
grouping keys to workers by hash partitioning. This works well for
many distinct keys, but might cause issues in case of low key
cardinality. Also note that the state size grows and the effectiveness
decreases as the cardinality of the artificial key increases.
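The artificial-key trick from this second point can be illustrated without
Flink. The salting scheme below (round-robin suffix, `#` separator) is a
hypothetical choice for illustration: each salted key is hash-partitioned
independently, so a single hot key no longer lands on one worker, and the
per-salt partials are merged in a second aggregation keyed by the original
key.

```java
// Spread a skewed key over `fanout` grouping keys by appending a salt.
// The per-salt partial results must be combined again in a second
// aggregation step that groups by the original (unsalted) key.
public class KeySalter {
    private final int fanout;
    private int next = 0;

    public KeySalter(int fanout) {
        this.fanout = fanout;
    }

    // Round-robin salt; a random salt in [0, fanout) works equally well.
    public String salt(String key) {
        int s = next;
        next = (next + 1) % fanout;
        return key + "#" + s;
    }

    // Recover the original key for the second aggregation phase.
    public static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }
}
```

With `fanout` salts the hot key's load is split across up to `fanout`
workers, at the cost of `fanout` times as many accumulators for that key.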
Hope this helps,
Fabian
2017-10-26 3:32 GMT+02:00 Kurt Young <ykt...@gmail.com>:
Do you mean you want to keep the original window while also doing
some combine operations inside the window at the same time?
What kind of data do you expect the following operator to receive?
Best,
Kurt
On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonx...@gmail.com> wrote:
Thanks Kurt, I'm trying out WindowedStream aggregate right now.
Just wondering, is there any way for me to preserve the
window after aggregation? More specifically, originally I
have something like:
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = dataStream
    .keyBy(0) // id
    .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
and then for the reducer I can do:
windowStream.apply(...)
and expect the window information to be preserved.
If I were to use aggregate on the window stream, I would end
up with something like:
DataStream<Tuple2<String, Long>> windowStream = dataStream
    .keyBy(0) // id
    .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
    .aggregate(new AggregateFunction<Tuple2<String, Long>, Accumulator,
            Tuple2<String, Long>>() {
        @Override
        public Accumulator createAccumulator() {
            return null;
        }

        @Override
        public void add(Tuple2<String, Long> stringLong, Accumulator o) {
        }

        @Override
        public Tuple2<String, Long> getResult(Accumulator o) {
            return null;
        }

        @Override
        public Accumulator merge(Accumulator o, Accumulator acc1) {
            return null;
        }
    });
Because it looks like aggregate only transforms a
WindowedStream into a DataStream. But for a global aggregation
phase (a reducer), should I extract the window again?
Thanks! I apologize if that sounds like a very basic
question.
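The global phase hinges on the merge() method of the AggregateFunction
skeleton above: accumulators that can be merged allow per-partition partial
results to be combined again downstream without re-reading the input. A
plain-Java sketch of that contract, using a hypothetical count accumulator
(not a Flink class), looks like this:

```java
// Hypothetical accumulator mirroring the AggregateFunction contract:
// create, add, merge, getResult. Partials produced locally in a first
// phase can be merged into one global result in a second phase.
public class CountAcc {
    long count;

    static CountAcc create() { return new CountAcc(); }

    void add(long n) { count += n; }

    // Merging two partials yields the same result as aggregating all
    // records in one place -- the property a combiner relies on.
    static CountAcc merge(CountAcc a, CountAcc b) {
        CountAcc out = new CountAcc();
        out.count = a.count + b.count;
        return out;
    }

    long getResult() { return count; }
}
```

Because merge() is associative, any number of per-key or per-window partials
can be folded together in the reducer, in any order.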
Le
On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt...@gmail.com> wrote:
I think you can use WindowedStream.aggregate
Best,
Kurt
On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonx...@gmail.com> wrote:
Thanks Kurt. Maybe I wasn't clear before: I was
wondering if Flink has an implementation of a combiner in
the DataStream API (to use after keyBy and windowing).
Thanks again!
Le
On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <ykt...@gmail.com> wrote:
Hi,
The document you are looking at is pretty old;
you can check the newest version here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
Regarding your question, you can use combineGroup.
Best,
Kurt
On Mon, Oct 23, 2017 at 5:22 AM, Le Xu <sharonx...@gmail.com> wrote:
Hello!
I'm new to Flink and I'm wondering if there
is an explicit local combiner for each mapper
that I can use to perform a local reduce on
each mapper. I looked at
https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
but couldn't find anything that matches.
Thanks!
Le