Yes, your observations are correct! 

Currently, I see two possible solutions that you could implement as a user:

1. Use .window() with a dummy key followed by a .windowAll():

DataStream<T> input = …;
input
  .map( (in) -> new Tuple2<Integer, T>(<rand int>, in)) 
  .keyBy(0)
  .window(…)
  .aggregate(...)
  .windowAll(…)
  .aggregate(…)

The drawback is that the keyBy() introduces a network shuffle, so this approach 
may or may not improve performance, depending on the aggregation operation.
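
To illustrate why splitting the work this way is safe for an associative 
aggregation, here is a minimal plain-Java sketch. It does not use any Flink 
classes; TwoStageAverage, Acc, singleStage and twoStage are hypothetical names 
made up for this example. Stage 1 groups the elements of one window by a random 
dummy key and builds partial accumulators, and stage 2 merges the partials, 
roughly the role the merge function of an AggregateFunction would play:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Plain-Java illustration only; no Flink classes are used here.
class TwoStageAverage {

    // Accumulator for an average: partial sum and element count.
    static final class Acc {
        long sum;
        long count;

        void add(long value) {
            sum += value;
            count++;
        }

        // Merging is associative and commutative, so partials can be combined in any order.
        Acc merge(Acc other) {
            Acc merged = new Acc();
            merged.sum = sum + other.sum;
            merged.count = count + other.count;
            return merged;
        }

        double result() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    // Single-stage aggregation: every element passes through one accumulator.
    static Acc singleStage(List<Long> window) {
        Acc acc = new Acc();
        for (long v : window) {
            acc.add(v);
        }
        return acc;
    }

    // Two-stage aggregation: stage 1 groups by a random dummy key, stage 2 merges the partials.
    static Acc twoStage(List<Long> window, int numKeys, long seed) {
        Random random = new Random(seed);
        Map<Integer, Acc> partials = new HashMap<>();
        for (long v : window) {
            int dummyKey = random.nextInt(numKeys);
            partials.computeIfAbsent(dummyKey, k -> new Acc()).add(v);
        }
        Acc total = new Acc();
        for (Acc partial : partials.values()) {
            total = total.merge(partial);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Long> window = new ArrayList<>();
        for (long i = 1; i <= 100; i++) {
            window.add(i);
        }
        System.out.println(singleStage(window).result()); // 50.5
        System.out.println(twoStage(window, 4, 42L).result()); // 50.5
    }
}
```

Both stages produce the same sum and count regardless of how the random keys 
distribute the elements, which is what makes the dummy-key approach valid. In 
the actual job, the cost you pay for this is the shuffle mentioned above.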

To avoid that shuffle, you would have to use option 2.

2. Use a custom StreamOperator that does the pre-aggregation and emits results 
based on event time, followed by .windowAll():

DataStream<T> input = …;
input
  .transform(<name>, <type info>, new PreAggregationOperator()) 
  .windowAll(…)
  .aggregate(…)

Here, PreAggregationOperator would pre-aggregate incoming elements, checkpoint 
the pre-aggregated values, and emit the pre-aggregate when the watermark for the 
end of a window arrives. You have to use a custom operator because a user 
function cannot “listen” to the watermark and therefore would not be able to 
emit the aggregate at the right time.
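
The control flow of such an operator can be sketched in plain Java as follows. 
This is only an illustration under stated assumptions, not Flink's operator API: 
PreAggregationSketch, processElement and processWatermark are hypothetical names 
for this example, the aggregate is a simple sum, windows are tumbling, and the 
in-memory map stands in for state that a real operator would have to checkpoint. 
A window [start, end) is treated as complete once the watermark reaches end - 1.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; it mimics the control flow, not Flink's operator API.
class PreAggregationSketch {

    private final long windowSizeMs;

    // Partial sums keyed by window end timestamp, sorted so the oldest windows come first.
    // In a real operator this would be checkpointed state.
    private final TreeMap<Long, Long> partialSums = new TreeMap<>();

    PreAggregationSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Called per element: fold the value into the partial aggregate of its tumbling window.
    void processElement(long timestampMs, long value) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        partialSums.merge(windowEnd, value, Long::sum);
    }

    // Called per watermark: emit and drop every partial whose window is complete.
    // Each emitted entry is {windowEnd, partialSum}.
    List<long[]> processWatermark(long watermarkMs) {
        List<long[]> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = partialSums.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getKey() - 1 > watermarkMs) {
                break; // this window and all later ones are still open
            }
            emitted.add(new long[] {entry.getKey(), entry.getValue()});
            it.remove();
        }
        return emitted;
    }

    public static void main(String[] args) {
        PreAggregationSketch op = new PreAggregationSketch(60_000L);
        op.processElement(1_000L, 5L);
        op.processElement(59_999L, 7L);
        op.processElement(60_000L, 11L);
        for (long[] out : op.processWatermark(59_999L)) {
            System.out.println("window end " + out[0] + ", partial sum " + out[1]);
        }
    }
}
```

Each parallel instance would emit one partial result per window, so the 
downstream .windowAll() only has to merge as many values as there are parallel 
instances instead of processing every input element.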

I hope this helps.

Best,
Aljoscha

> On 11. Jul 2017, at 22:05, Debski <a.deb...@avsystem.com> wrote:
> 
> Let us assume that I want to perform some kind of aggregation in specified
> time windows (e.g. tumbling window of 1 minute) and my aggregation operation
> is associative. Wouldn't it be possible to represent windowAll in runtime as
> /parallelism + 1/ operator instances where /parallelism/ number of operators
> compute partial aggregates and then partial results are merged into one in
> the last instance of the operator by using the merge function that is present
> in AggregateFunction?
> 
> Basically I would like to compute single aggregated value for all events in
> given time window and aggregation operation itself can be parallelized. 
> 
> For example, I could have a stream mapped with a .map operation that has
> parallelism 4, then each map operator instance would pass 1/4 of events to
> adjacent instance of windowAll operator that would compute desired aggregate
> over subset of events. When the window is closed all partial states would be
> transferred to single windowAll merging operator.
> 
> Are there any plans to support such situations, or is it possible to somehow
> implement such an operator in the current version of Flink?
> 
> Also there is a note in windowAll java-doc about possible parallelism but I
> don't know how relevant it is to my case:
> 
> Note: This operation can be inherently non-parallel since all elements have
> to pass through the same operator instance. (Only for special cases, such as
> aligned time windows is it possible to perform this operation in parallel).
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Associative-operation-windowAll-possible-parallelism-tp14187.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
