Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65667687 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java --- @@ -42,34 +44,38 @@ * Combine operator for Reduce functions, standalone (not chained). * Sorts and groups and reduces data, but never spills the sort. May produce multiple * partially aggregated groups. - * + * * @param <T> The data type consumed and produced by the combiner. */ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> { - + private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class); /** Fix length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - - + + private TaskContext<ReduceFunction<T>, T> taskContext; private TypeSerializer<T> serializer; private TypeComparator<T> comparator; - + private ReduceFunction<T> reducer; - + private Collector<T> output; - + + private DriverStrategy strategy; + private InMemorySorter<T> sorter; - + private QuickSort sortAlgo = new QuickSort(); + private ReduceHashTable<T> table; + private List<MemorySegment> memory; - private boolean running; + private volatile boolean canceled; --- End diff -- Sorry, I forgot the rename in the chained driver: 6abd3f3cf49568cc0fecd85d7e7d8a0d7f9ec39f And I forgot to invert the meaning with the rename in ReduceCombineDriver: 984ba12f44a7ee9b16790c3e172b53969448e1c2
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---