[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313791#comment-15313791 ]
ASF GitHub Bot commented on FLINK-3477:
---------------------------------------

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1517#discussion_r65667687

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java ---
    @@ -42,34 +44,38 @@
      * Combine operator for Reduce functions, standalone (not chained).
      * Sorts and groups and reduces data, but never spills the sort. May produce multiple
      * partially aggregated groups.
    - *
    + *
      * @param <T> The data type consumed and produced by the combiner.
      */
     public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
    -
    +
         private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class);
         /** Fix length records with a length below this threshold will be in-place sorted, if possible. */
         private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    -
    -
    +
    +
         private TaskContext<ReduceFunction<T>, T> taskContext;
         private TypeSerializer<T> serializer;
         private TypeComparator<T> comparator;
    -
    +
         private ReduceFunction<T> reducer;
    -
    +
         private Collector<T> output;
    -
    +
    +    private DriverStrategy strategy;
    +
         private InMemorySorter<T> sorter;
    -
    +
         private QuickSort sortAlgo = new QuickSort();
    +    private ReduceHashTable<T> table;
    +
         private List<MemorySegment> memory;
    -    private boolean running;
    +    private volatile boolean canceled;
    --- End diff --

    Sorry, I forgot the rename in the chained driver: 6abd3f3cf49568cc0fecd85d7e7d8a0d7f9ec39f
    And I forgot to invert the meaning with the rename in ReduceCombineDriver: 984ba12f44a7ee9b16790c3e172b53969448e1c2


> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>
>                 Key: FLINK-3477
>                 URL: https://issues.apache.org/jira/browse/FLINK-3477
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a
> single value. A Reduce function is incrementally applied to compute a final
> aggregated value. This makes it possible to hold the pre-aggregated value in a
> hash table and update it with each function call.
> The hash-based strategy requires a special implementation of an in-memory hash
> table. The hash table should support in-place updates of elements (if the new
> value has the same binary length as the old value) but also appending updates
> with invalidation of the old value (if the binary length of the new value
> differs). The hash table needs to be able to evict and emit all elements if
> it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
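The issue asks for a hash table with three behaviours: an in-place update when the reduced value keeps its binary length, an append with invalidation of the old record when the length changes, and evict-and-emit of all entries when memory runs out. The following is a minimal Java sketch of those behaviours. It is not Flink's ReduceHashTable. Several simplifications are assumptions:

- Keys and values are Strings, and values are serialized as UTF-8.
- A `BinaryOperator<String>` stands in for `ReduceFunction.reduce(v1, v2)`.
- A fixed `byte[]` stands in for managed memory segments, and only value bytes count against the budget.
- A Java `LinkedHashMap` serves as the index.

The class name `HashCombineSketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of a hash-based combiner (not Flink's ReduceHashTable).
 * Serialized values live in a fixed byte[] record area standing in for managed memory;
 * the index maps each key to the {offset, length} of its current record.
 */
public class HashCombineSketch {
    private final byte[] area;
    private int appendPos = 0;                                   // next free byte in the area
    private final Map<String, int[]> index = new LinkedHashMap<>();
    private final BinaryOperator<String> reducer;                // stand-in for reduce(v1, v2)
    private final Consumer<String> output;                       // receives "key=partialAggregate"
    int inPlaceUpdates = 0, appendUpdates = 0, evictions = 0;

    public HashCombineSketch(int capacityBytes, BinaryOperator<String> reducer, Consumer<String> output) {
        this.area = new byte[capacityBytes];
        this.reducer = reducer;
        this.output = output;
    }

    public void combine(String key, String value) {
        int[] loc = index.get(key);
        String next = (loc == null) ? value : reducer.apply(read(loc), value);
        byte[] bytes = next.getBytes(StandardCharsets.UTF_8);

        if (loc != null && bytes.length == loc[1]) {
            // Same binary length: overwrite the old record in place.
            System.arraycopy(bytes, 0, area, loc[0], bytes.length);
            inPlaceUpdates++;
            return;
        }
        if (appendPos + bytes.length > area.length) {
            // Out of memory: emit all partial aggregates and reset. The key's old
            // aggregate was just emitted, so restart it from the incoming value only.
            evictAll();
            loc = null;
            bytes = value.getBytes(StandardCharsets.UTF_8);
            if (bytes.length > area.length) {
                throw new IllegalStateException("single record exceeds the memory budget");
            }
        }
        if (loc != null) {
            // Different length: append a new record; repointing the index
            // invalidates the old bytes, which stay behind as garbage.
            appendUpdates++;
        }
        System.arraycopy(bytes, 0, area, appendPos, bytes.length);
        index.put(key, new int[] {appendPos, bytes.length});
        appendPos += bytes.length;
    }

    /** Emits every current partial aggregate and empties the table. */
    public void evictAll() {
        for (Map.Entry<String, int[]> e : index.entrySet()) {
            output.accept(e.getKey() + "=" + read(e.getValue()));
        }
        index.clear();
        appendPos = 0;
        evictions++;
    }

    private String read(int[] loc) {
        return new String(area, loc[0], loc[1], StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        // Reduce function: integer sum over decimal strings; 8-byte record area.
        HashCombineSketch c = new HashCombineSketch(8,
                (a, b) -> String.valueOf(Integer.parseInt(a) + Integer.parseInt(b)), out::add);
        c.combine("a", "1");   // insert
        c.combine("a", "2");   // "3" has the same length as "1": in-place update
        c.combine("b", "9");   // insert
        c.combine("b", "5");   // "14" is longer than "9": append + invalidate
        c.combine("c", "100"); // insert (7 of 8 bytes now used)
        c.combine("a", "7");   // "10" needs 2 more bytes: evict all, restart "a" from "7"
        c.evictAll();          // end of input: flush what is left
        System.out.println(out); // [a=3, b=14, c=100, a=7]
    }
}
```

Key "a" is emitted twice (3, then 7). This is acceptable for a combiner because the downstream reduce merges the partial aggregates into the same total (10). The sketch never compacts the garbage left by appending updates. It simply evicts once the area is full; a production table would likely reclaim that space instead.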
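The diff replaces `private boolean running` with `private volatile boolean canceled`. The follow-up comment notes that the rename also required inverting the flag's meaning. The sketch below shows what that inversion implies and why `volatile` matters when another thread cancels the task. It is a hypothetical driver skeleton (`CancelFlagSketch`, `prepare`, `run`, `cancel` are illustrative names), not the actual ReduceCombineDriver code.

```java
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical driver skeleton (not Flink's ReduceCombineDriver) showing a
 * `running` flag renamed to `canceled`: every read and write must be negated.
 */
public class CancelFlagSketch {
    // Written by the thread that cancels the task, read by the thread running the loop;
    // volatile guarantees the reading thread eventually sees the write.
    private volatile boolean canceled;   // previously: private boolean running;

    public void prepare() {
        canceled = false;                // with `running`, this would be: running = true
    }

    public long run(Iterator<Integer> input) {
        long processed = 0;
        // With `running`, the check would read: while (running && input.hasNext())
        while (!canceled && input.hasNext()) {
            input.next();                // process one record (discarded in this sketch)
            processed++;
        }
        return processed;
    }

    public void cancel() {
        canceled = true;                 // with `running`, this would be: running = false
    }

    public static void main(String[] args) throws InterruptedException {
        CancelFlagSketch driver = new CancelFlagSketch();
        driver.prepare();
        // An endless input: only cancel() from another thread can stop the loop.
        Iterator<Integer> endless = new Iterator<Integer>() {
            private int i = 0;
            @Override public boolean hasNext() { return true; }
            @Override public Integer next() { return i++; }
        };
        long[] result = new long[1];
        Thread worker = new Thread(() -> result[0] = driver.run(endless));
        worker.start();
        Thread.sleep(50);
        driver.cancel();
        worker.join();                   // join() makes result[0] visible to this thread
        System.out.println("stopped after " + result[0] + " records");
        System.out.println(driver.run(List.of(1, 2, 3).iterator())); // still canceled: 0
    }
}
```

Forgetting to invert one of these sites produces exactly the bug the comment describes. For example, a check left as `while (canceled && ...)` would stop the driver immediately instead of running it.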