[ https://issues.apache.org/jira/browse/FLINK-7561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148762#comment-16148762 ]
ASF GitHub Bot commented on FLINK-7561:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4626#discussion_r136293381

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/PreAggregationOperator.java ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This operator performs a preliminary aggregation of the input values on a non-keyed stream. This means that the
+ * output is not fully aggregated, but only partially. It should be placed before the keyBy of the final aggregation.
+ * This makes it useful in a couple of scenarios:
+ * <ol>
+ * <li>Performing a keyBy operation with a low number of distinct values in the key.
+ * In such a case
+ * {@link PreAggregationOperator} can reduce both CPU and network usage by pre-aggregating most of the
+ * values before shuffling them over the network.</li>
+ * <li>Increasing the parallelism above the number of distinct key values for the task preceding the keyBy operation.</li>
+ * <li>Handling data skew in some of the key values. Normally, if there is data skew, a lot of work could be
+ * dumped onto a single CPU core in the cluster. With pre-aggregation some of that work can be performed in a more
+ * distributed fashion by the {@link PreAggregationOperator}.</li>
+ * <li>The output partitioning of the data source is correlated with the keyBy partitioning. For example, when the data
+ * source is partitioned by day and the keyBy function shuffles the data based on day and hour.</li>
+ * </ol>
+ *
+ * <p>Because this operator performs only pre-aggregation, it doesn't output the result of the {@link AggregateFunction},
+ * but rather outputs a tuple containing the Key, Window, and Accumulator, where the Accumulator is a partially
+ * aggregated result of the {@link AggregateFunction}.
+ *
+ * <p>Keep in mind that {@link PreAggregationOperator} can have significantly higher memory consumption compared to
+ * normal aggregation. If the input data is either not partitioned or the input partitioning is not correlated with
+ * the {@code keySelector}, each instance of {@link PreAggregationOperator} can end up having its own accumulator entry
+ * for each key. In other words, in that case the memory consumption is expected to be {@code parallelism} times larger
+ * compared to what {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} would have.
+ *
+ * <p>It is expected that this operator is followed by a keyBy operation based on {@code tuple.f0}, and after that
+ * by a {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} which performs
+ * {@link AggregateFunction#merge(ACC, ACC)}.
+ *
+ * <p>Because {@link PreAggregationOperator} currently does not use the {@link org.apache.flink.streaming.api.TimerService},
+ * only two element-triggering policies are supported:
+ * <ol>
+ * <li>Flush everything on any watermark.</li>
+ * <li>Iterate over each element in the state on each watermark and emit it if the watermark's timestamp exceeds
+ * {@link Window#maxTimestamp()}.</li>
+ * </ol>
+ * The first option has the drawback that it will often unnecessarily emit elements that could potentially be further
+ * aggregated. The second one is quite CPU intensive if watermarks are emitted often relative to the number of
+ * pre-aggregated key values.
+ *
+ * <p>Other limitations and notes:
+ * <ol>
+ * <li>{@link MergingWindowAssigner} is not supported.</li>
+ * <li>{@link PreAggregationOperator} emits all of its data on each received watermark.</li>
+ * <li>On restoring from a checkpoint, keys can be randomly shuffled between {@link PreAggregationOperator}
+ * instances.</li>
+ * </ol>
+ */
+@PublicEvolving
+public class PreAggregationOperator<K, IN, ACC, W extends Window>
+		extends AbstractStreamOperator<Tuple3<K, W, ACC>>
+		implements OneInputStreamOperator<IN, Tuple3<K, W, ACC>>, Serializable {
+
+	protected final AggregateFunction<IN, ACC, ?> aggregateFunction;
+	protected final KeySelector<IN, K> keySelector;
+	protected final WindowAssigner<? super IN, W> windowAssigner;
+	protected final TypeInformation<K> keyTypeInformation;
+	protected final TypeInformation<ACC> accumulatorTypeInformation;
+	protected final boolean flushAllOnWatermark;
+	protected final Map<Tuple2<K, W>, ACC> aggregates = new HashMap<>();
+
+	protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
+	protected transient ListState<Tuple3<K, W, ACC>> aggregatesState;
+
+	/**
+	 * Creates {@link PreAggregationOperator}.
+	 *
+	 * @param aggregateFunction function used for aggregation. Note, {@link AggregateFunction#getResult(Object)} will
+	 *	not be used.
+	 * @param keySelector selector used to extract the key from input elements.
+	 * @param keyTypeInformation type information of the key.
+	 * @param accumulatorTypeInformation type information of the accumulator.
+	 * @param windowAssigner assigner that determines the windows of the input elements.
+	 * @param flushAllOnWatermark flag to control whether all elements should be emitted on any watermark. Check more
+	 *	information in {@link PreAggregationOperator}.
+	 */
+	public PreAggregationOperator(
+			AggregateFunction<IN, ACC, ?> aggregateFunction,
+			KeySelector<IN, K> keySelector,
+			TypeInformation<K> keyTypeInformation,
+			TypeInformation<ACC> accumulatorTypeInformation,
+			WindowAssigner<? super IN, W> windowAssigner,
+			boolean flushAllOnWatermark) {
+		this.aggregateFunction = checkNotNull(aggregateFunction, "aggregateFunction is null");
+		this.keySelector = checkNotNull(keySelector, "keySelector is null");
+		this.windowAssigner = checkNotNull(windowAssigner, "windowAssigner is null");
+		this.keyTypeInformation = checkNotNull(keyTypeInformation, "keyTypeInformation is null");
+		this.accumulatorTypeInformation = checkNotNull(accumulatorTypeInformation, "accumulatorTypeInformation is null");
+		this.flushAllOnWatermark = flushAllOnWatermark;
+
+		checkArgument(!(windowAssigner instanceof MergingWindowAssigner),
+			"MergingWindowAssigner is not supported by the PreAggregationOperator");
+	}
+
+	@Override
+	public void open() throws Exception {
+		windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
+			@Override
+			public long getCurrentProcessingTime() {
+				return System.currentTimeMillis();
--- End diff --

This should use the `ProcessingTimeService` that can be retrieved via `getProcessingTimeService()`. Then you can also test the operator with processing time, because the test harness allows advancing processing time manually.
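As background for the review, the pre-aggregate-then-merge pattern this operator implements can be sketched in plain Java. This is a conceptual illustration only, not Flink code: the class and method names below are made up, a simple per-key count stands in for the `AggregateFunction` accumulator, and windows and watermarks are omitted.

```java
import java.util.HashMap;
import java.util.Map;

/** Conceptual sketch (illustrative names, no Flink APIs) of pre-aggregation before a shuffle. */
class PreAggSketch {

	/**
	 * Runs on one upstream task before the keyBy: builds partial per-key accumulators locally,
	 * playing the role of AggregateFunction#add inside PreAggregationOperator.
	 */
	public static Map<String, Long> preAggregate(Iterable<String> keys) {
		Map<String, Long> partial = new HashMap<>();
		for (String key : keys) {
			partial.merge(key, 1L, Long::sum); // accumulate locally instead of emitting each element
		}
		return partial; // would be emitted downstream as (key, accumulator) pairs on a watermark
	}

	/**
	 * Runs after the shuffle, once all partials for a key land on the same task,
	 * playing the role of AggregateFunction#merge in the downstream WindowOperator.
	 */
	public static Map<String, Long> mergePartials(Iterable<Map<String, Long>> partials) {
		Map<String, Long> merged = new HashMap<>();
		for (Map<String, Long> partial : partials) {
			partial.forEach((key, value) -> merged.merge(key, value, Long::sum));
		}
		return merged;
	}
}
```

With skewed input, most records for a hot key collapse into a single `(key, accumulator)` pair per upstream task before the network shuffle, which is where the CPU and network savings described in the Javadoc come from.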
> Add support for pre-aggregation in DataStream API
> -------------------------------------------------
>
>                 Key: FLINK-7561
>                 URL: https://issues.apache.org/jira/browse/FLINK-7561
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)