Yes, your observations are correct! Currently, I see two possible solutions that you could implement as a user:
1. Use .window() with a dummy key, followed by .windowAll():

      DataStream<T> input = …;
      input
          .map( (in) -> new Tuple2<Integer, T>(<rand int>, in))
          .keyBy(0)
          .window(…)
          .aggregate(...)
          .windowAll(…)
          .aggregate(…)

   The problem here is that you incur a shuffle, so whether this improves performance depends on the aggregation operation. To avoid that shuffle you would have to use option 2.

2. Use a custom StreamOperator that does the pre-aggregation and emits results based on event time, followed by .windowAll():

      DataStream<T> input = …;
      input
          .transform(<name>, <type info>, new PreAggregationOperator())
          .windowAll(…)
          .aggregate(…)

   Here, PreAggregationOperator would pre-aggregate and checkpoint the pre-aggregated values, and emit the pre-aggregate when the watermark for the end of a window arrives. You have to use a custom operator because a user function cannot “listen” to the watermark, so it would not be able to emit the aggregate at the right time.

I hope this helps.

Best,
Aljoscha

> On 11. Jul 2017, at 22:05, Debski <a.deb...@avsystem.com> wrote:
>
> Let us assume that I want to perform some kind of aggregation in specified
> time windows (e.g. a tumbling window of 1 minute) and my aggregation operation
> is associative. Wouldn't it be possible to represent windowAll at runtime as
> /parallelism + 1/ operator instances, where /parallelism/ operator instances
> compute partial aggregates and the partial results are then merged into one in
> the last instance of the operator, using the merge function that is present in
> AggregateFunction?
>
> Basically, I would like to compute a single aggregated value for all events in
> a given time window, and the aggregation operation itself can be parallelized.
>
> For example, I could have a mapped stream with a .map operation that has
> parallelism 4. Each map operator instance would then pass 1/4 of the events to
> an adjacent instance of the windowAll operator, which would compute the desired
> aggregate over its subset of events. When the window is closed, all partial
> states would be transferred to a single merging windowAll operator.
>
> Are there any plans to support such situations, or is it possible to somehow
> implement such an operator in the current version of Flink?
>
> Also, there is a note in the windowAll Javadoc about possible parallelism, but I
> don't know how relevant it is to my case:
>
> Note: This operation can be inherently non-parallel since all elements have
> to pass through the same operator instance. (Only for special cases, such as
> aligned time windows is it possible to perform this operation in parallel).
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Associative-operation-windowAll-possible-parallelism-tp14187.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
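The following plain-Java simulation sketches option 2 from the reply above, combined with the /parallelism + 1/ layout from the question. It needs no Flink dependency. It is a sketch under stated assumptions, not Flink's implementation:

- `Agg` is a hypothetical stand-in. It only mirrors the createAccumulator/add/merge/getResult shape of Flink's AggregateFunction.
- `PreAggregator` is a hypothetical class playing the role of PreAggregationOperator. It does not model checkpointing.
- Elements are distributed round-robin.
- A window counts as complete once the watermark reaches its last timestamp (end - 1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PreAggregationSketch {

    // Hypothetical stand-in mirroring the shape of Flink's AggregateFunction;
    // it is NOT the Flink interface, so the sketch runs without Flink.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        ACC merge(ACC a, ACC b);
        OUT getResult(ACC acc);
    }

    // Partial averages cannot be merged directly, but a (sum, count) accumulator can.
    static final class AverageAgg implements Agg<Long, long[], Double> {
        public long[] createAccumulator() { return new long[] {0L, 0L}; }
        public long[] add(Long value, long[] acc) { acc[0] += value; acc[1]++; return acc; }
        public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
        public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
    }

    // Hypothetical per-instance pre-aggregator (the role of PreAggregationOperator):
    // one partial accumulator per tumbling window, released only when the watermark
    // has passed the window. Checkpointing is not modeled.
    static final class PreAggregator<IN, ACC> {
        private final Agg<IN, ACC, ?> agg;
        private final long windowSize;
        private final TreeMap<Long, ACC> open = new TreeMap<>(); // window start -> partial

        PreAggregator(Agg<IN, ACC, ?> agg, long windowSize) {
            this.agg = agg;
            this.windowSize = windowSize;
        }

        void processElement(IN value, long timestamp) {
            long start = timestamp - (timestamp % windowSize); // assumes timestamp >= 0
            ACC acc = open.computeIfAbsent(start, k -> agg.createAccumulator());
            open.put(start, agg.add(value, acc));
        }

        // Emits (and forgets) every window whose last timestamp (start + size - 1)
        // is covered by the watermark, in window order.
        Map<Long, ACC> processWatermark(long watermark) {
            Map<Long, ACC> ready = new TreeMap<>();
            while (!open.isEmpty() && open.firstKey() + windowSize - 1 <= watermark) {
                Map.Entry<Long, ACC> e = open.pollFirstEntry();
                ready.put(e.getKey(), e.getValue());
            }
            return ready;
        }
    }

    // Simulates `parallelism` pre-aggregating instances feeding one merging instance.
    // events[i] = {timestamp, value}; elements are assigned round-robin.
    public static Map<Long, Double> run(long[][] events, int parallelism, long windowSize, long watermark) {
        AverageAgg agg = new AverageAgg();
        List<PreAggregator<Long, long[]>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new PreAggregator<>(agg, windowSize));
        }
        for (int i = 0; i < events.length; i++) {
            instances.get(i % parallelism).processElement(events[i][1], events[i][0]);
        }
        // The single "+1" instance merges partials that belong to the same window.
        Map<Long, long[]> merged = new TreeMap<>();
        for (PreAggregator<Long, long[]> instance : instances) {
            for (Map.Entry<Long, long[]> e : instance.processWatermark(watermark).entrySet()) {
                merged.merge(e.getKey(), e.getValue(), agg::merge);
            }
        }
        Map<Long, Double> results = new TreeMap<>();
        merged.forEach((start, acc) -> results.put(start, agg.getResult(acc)));
        return results;
    }

    public static void main(String[] args) {
        long minute = 60_000L; // 1-minute tumbling windows, timestamps in ms
        long[][] events = {
            {1_000, 10}, {2_000, 20}, {30_000, 30}, {59_000, 40}, // window [0, 60000)
            {61_000, 5}, {62_000, 15},                             // window [60000, 120000)
            {125_000, 100}                                         // window [120000, 180000): still open
        };
        System.out.println(run(events, 4, minute, 120_000L));
    }
}
```

Running main prints {0=25.0, 60000=10.0}. The two windows closed by the watermark 120000 are emitted, and the window starting at 120000 stays open. Because the accumulator is (sum, count) rather than a running average, the merged result does not depend on how many pre-aggregating instances there are.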