godfrey he created FLINK-12161:
----------------------------------

             Summary: Supports partial-final optimization for stream group aggregate
                 Key: FLINK-12161
                 URL: https://issues.apache.org/jira/browse/FLINK-12161
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / Planner
            Reporter: godfrey he
            Assignee: godfrey he
To resolve data skew for distinct aggregates on streams, we introduce a rule named {{SplitAggregateRule}}, which rewrites an aggregate query with distinct aggregations into two expanded aggregations. The first (partial) aggregation computes results within sub-partitions (buckets), and the second (final) aggregation combines those partial results.

If two-stage aggregation is also enabled, many plans share a common pattern that looks like:

{code}
...
StreamExecGlobalGroupAggregate (final global agg)
+- StreamExecExchange
   +- StreamExecLocalGroupAggregate (final local agg)
      +- StreamExecGlobalGroupAggregate (partial global agg)
         +- ....
{code}

There is no exchange between the final local aggregate and the partial global aggregate, so they are executed in the same JobVertex and can share state. We introduce a rule named {{IncrementalAggregateRule}} to perform this optimization.
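The split performed by {{SplitAggregateRule}} can be illustrated with a small in-memory simulation. This is a minimal sketch, not Flink's implementation or API. It assumes a query of the form `COUNT(DISTINCT c) ... GROUP BY a` and a bucket assignment of `hash(c) mod BUCKET_NUM`. The names `Row`, `BUCKET_NUM`, `direct` and `split` are hypothetical and exist only for illustration. The partial stage groups by `(a, bucket)` and collects distinct `c` values. The final stage sums the per-bucket distinct counts for each `a`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class SplitDistinctSketch {

    // Hypothetical input row: grouping key `a` and the column `c` that is counted distinctly.
    static final class Row {
        final String a;
        final String c;
        Row(String a, String c) { this.a = a; this.c = c; }
    }

    // Assumed bucket count, chosen arbitrarily for illustration.
    static final int BUCKET_NUM = 4;

    // Reference: COUNT(DISTINCT c) ... GROUP BY a computed in a single aggregation.
    static Map<String, Long> direct(List<Row> rows) {
        Map<String, Set<String>> distinct = new HashMap<>();
        for (Row r : rows) {
            distinct.computeIfAbsent(r.a, k -> new HashSet<>()).add(r.c);
        }
        Map<String, Long> result = new TreeMap<>();
        distinct.forEach((a, values) -> result.put(a, (long) values.size()));
        return result;
    }

    // Split form: partial aggregation keyed by (a, bucket), then a final SUM keyed by a.
    static Map<String, Long> split(List<Row> rows) {
        // Partial stage: distinct c values per (a, bucket) sub-partition.
        Map<String, Map<Integer, Set<String>>> partial = new HashMap<>();
        for (Row r : rows) {
            int bucket = Math.floorMod(r.c.hashCode(), BUCKET_NUM);
            partial.computeIfAbsent(r.a, k -> new HashMap<>())
                   .computeIfAbsent(bucket, b -> new HashSet<>())
                   .add(r.c);
        }
        // Final stage: sum the per-bucket distinct counts for each grouping key.
        Map<String, Long> result = new TreeMap<>();
        partial.forEach((a, buckets) -> {
            long total = 0;
            for (Set<String> values : buckets.values()) {
                total += values.size();
            }
            result.put(a, total);
        });
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("k1", "x"), new Row("k1", "y"), new Row("k1", "x"),
                new Row("k1", "z"), new Row("k2", "x"), new Row("k2", "x"));
        System.out.println(direct(rows)); // {k1=3, k2=1}
        System.out.println(split(rows));  // {k1=3, k2=1}
    }
}
```

Both lines print `{k1=3, k2=1}`. The bucket is a deterministic function of `c`, so each distinct value lands in exactly one bucket. For that reason, summing the per-bucket distinct counts gives the same answer as the single-stage distinct count. The benefit is that a hot key `a` is spread across up to `BUCKET_NUM` partial aggregations instead of one.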