[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15246167#comment-15246167 ]

ASF GitHub Bot commented on FLINK-3477:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1517#discussion_r60104097

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java ---
    @@ -54,6 +54,32 @@
     @Internal
     public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleInputOperator<T, T, FT> {

    +    /**
    +     * An enumeration of hints, optionally usable to tell the system exactly how to execute the combiner phase
    +     * of a reduce.
    +     * (Note: The final reduce phase (after combining) is currently always executed by a sort-based strategy.)
    +     */
    +    public enum CombineHint {
    --- End diff --

    Should we consider an additional flag {{CombineHint.NONE}} for cases where the expected number of duplicate keys is relatively small (i.e. FLINK-3279)?


> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>
>                 Key: FLINK-3477
>                 URL: https://issues.apache.org/jira/browse/FLINK-3477
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a
> single value. A Reduce function is incrementally applied to compute a final
> aggregated value. This allows the preaggregated value to be held in a hash
> table and updated with each function call.
> The hash-based strategy requires a special implementation of an in-memory
> hash table. The hash table should support in-place updates of elements (if
> the new value has the same binary length as the old value) but also
> appending updates with invalidation of the old value (if the binary length
> of the new value differs). The hash table needs to be able to evict and emit
> all elements if it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
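
For readers following the thread, here is a minimal sketch of the hash-based combine idea from the issue description: keep one preaggregated value per key in a hash table, update it on every incoming record, and evict and emit everything when the table is full. This is not Flink's implementation. The class name HashCombineSketch, the maxEntries limit (a stand-in for the managed-memory budget) and the use of java.util.HashMap are all assumptions made for illustration. The real table would work on serialized records, which is where the in-place versus appending update distinction comes from.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Illustrative sketch only; all names here are hypothetical, not Flink API. */
final class HashCombineSketch<K, T> {
    private final Function<T, K> keySelector;
    // Stands in for ReduceFunction<T>.reduce(T v1, T v2).
    private final BinaryOperator<T> reducer;
    // Hypothetical stand-in for "the table ran out of memory".
    private final int maxEntries;
    private final Map<K, T> table = new HashMap<>();
    private final List<T> emitted = new ArrayList<>();

    HashCombineSketch(Function<T, K> keySelector, BinaryOperator<T> reducer, int maxEntries) {
        if (maxEntries < 1) {
            throw new IllegalArgumentException("maxEntries must be at least 1");
        }
        this.keySelector = keySelector;
        this.reducer = reducer;
        this.maxEntries = maxEntries;
    }

    /** Folds one record into the preaggregated value stored for its key. */
    void add(T record) {
        K key = keySelector.apply(record);
        // A new key that does not fit forces eviction of all current entries.
        if (!table.containsKey(key) && table.size() >= maxEntries) {
            flush();
        }
        // Inserts the record for a new key, otherwise stores reducer(old, record).
        table.merge(key, record, reducer);
    }

    /** Emits every partial aggregate and empties the table. */
    void flush() {
        emitted.addAll(table.values());
        table.clear();
    }

    /** Emits whatever is left and returns everything emitted so far. */
    List<T> close() {
        flush();
        return emitted;
    }
}
```

Because an eviction can emit several partial aggregates for the same key, the output of such a combiner still needs a final reduce phase, which matches the Javadoc note in the diff that the final reduce is currently always sort-based.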