On Oct 26, 2017, at 3:17 AM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi,
in a MapReduce context, combiners are used 1) to reduce the amount of
data to shuffle and fully sort (to group the data by key) and 2) to
reduce the impact of skewed data.
The question is: why do you need a combiner in your use case?
- To reduce the data to shuffle: You should not use a window
operator to pre-aggregate, because keyBy implies a shuffle. Instead,
you could implement a ProcessFunction with operator state. In this
solution you need to implement the windowing logic yourself, i.e.,
group data into windows based on their timestamps, ensure you don't
run out of memory (operator state is kept on the heap), etc. So this
solution needs quite a bit of manual tuning.
- To reduce the impact of skewed data: You can use a window
aggregation if you don't mind the shuffle. However, you should add
an additional artificial key attribute to spread the computation for
the same original key across more grouping keys. Note that Flink
assigns grouping keys to workers by hash partitioning. This works
well for many distinct keys, but might cause issues in case of low
key cardinality. Also note that the state size grows, and the
effectiveness decreases, with an increasing cardinality of the
artificial key.
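One common way to build such an artificial key is "salting": derive a small salt from some record attribute so that one hot key is spread over N grouping keys, then strip the salt in a second aggregation step that combines the N partial results. A hypothetical plain-Java sketch (the fan-out factor, salt source, and `#` separator are all assumptions, not Flink API):

```java
public class KeySalting {
    // Spread one logical key over `fanOut` grouping keys by appending a salt.
    static String saltedKey(String key, long recordId, int fanOut) {
        int salt = (int) Math.floorMod(recordId, fanOut);
        return key + "#" + salt;
    }

    // Second stage: recover the original key so the partials can be combined.
    static String originalKey(String salted) {
        return salted.substring(0, salted.lastIndexOf('#'));
    }

    public static void main(String[] args) {
        String s = saltedKey("hotKey", 42, 4);
        System.out.println(s);               // hotKey#2
        System.out.println(originalKey(s));  // hotKey
    }
}
```

With fan-out N, a key that previously landed on one worker is handled by up to N workers, at the cost of N accumulators of state and a second (much smaller) aggregation keyed by the original key.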
Hope this helps,
Fabian
2017-10-26 3:32 GMT+02:00 Kurt Young <ykt...@gmail.com>:
Do you mean you want to keep the original window while also doing
some combine operations inside the window at the same time?
What kind of data do you expect the following operator to receive?
Best,
Kurt
On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonx...@gmail.com> wrote:
Thanks Kurt. I'm trying out WindowedStream aggregate right now.
Just wondering, is there any way for me to preserve the window
after aggregation? More specifically, originally I have something
like:
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = dataStream
    .keyBy(0) // id
    .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS));
and then for the reducer I can do:

    windowStream.apply(...)

and expect the window information to be preserved.
If I were to use aggregate on the window stream, I would end up
with something like:

DataStream<Tuple2<String, Long>> windowStream = dataStream
    .keyBy(0) // id
    .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
    .aggregate(new AggregateFunction<Tuple2<String, Long>, Accumulator, Tuple2<String, Long>>() {

        @Override
        public Accumulator createAccumulator() {
            return null;
        }

        @Override
        public void add(Tuple2<String, Long> stringLong, Accumulator o) {
        }

        @Override
        public Tuple2<String, Long> getResult(Accumulator o) {
            return null;
        }

        @Override
        public Accumulator merge(Accumulator o, Accumulator acc1) {
            return null;
        }
    });
Because it looks like aggregate only turns a WindowedStream into a
DataStream. But for a global aggregation phase (a reducer), should
I extract the window again?
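For reference, WindowedStream.aggregate also has an overload that takes a WindowFunction alongside the AggregateFunction; the WindowFunction receives the TimeWindow and the pre-aggregated value, so the window metadata can be emitted with the result. Independent of that API, the four callbacks in the stub above have a well-defined contract, which a plain-Java average accumulator (illustrative only, not tied to Flink's types) can make concrete:

```java
public class AverageAccumulator {
    long sum;
    long count;

    // createAccumulator(): a fresh, empty accumulator.
    static AverageAccumulator create() {
        return new AverageAccumulator();
    }

    // add(): fold one incoming value into the accumulator.
    void add(long value) {
        sum += value;
        count++;
    }

    // merge(): combine two partial accumulators into one.
    static AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        AverageAccumulator out = new AverageAccumulator();
        out.sum = a.sum + b.sum;
        out.count = a.count + b.count;
        return out;
    }

    // getResult(): derive the final output from the accumulator.
    double getResult() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        AverageAccumulator a = create();
        a.add(2);
        a.add(4);
        AverageAccumulator b = create();
        b.add(6);
        System.out.println(merge(a, b).getResult()); // 4.0
    }
}
```

The key property is that merge must be associative, so partial aggregates computed independently can be combined in any order.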
Thanks! I apologize if that sounds like a very basic question.
Le
On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt...@gmail.com> wrote:
I think you can use WindowedStream.aggregate.
Best,
Kurt
On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonx...@gmail.com> wrote:
Thanks Kurt. Maybe I wasn't clear before: I was wondering if Flink
has an implementation of a combiner in the DataStream API (to use
after keyBy and windowing).
Thanks again!
Le
On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <ykt...@gmail.com> wrote:
Hi,
The document you are looking at is pretty old; you can check the
newest version here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
Regarding your question, you can use combineGroup.
Best,
Kurt
On Mon, Oct 23, 2017 at 5:22 AM, Le Xu <sharonx...@gmail.com> wrote:
Hello!
I'm new to Flink and I'm wondering if there is an explicit local
combiner for each mapper, so that I can perform a local reduce on
each mapper? I looked at
https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
but couldn't find anything that matches.
Thanks!
Le