[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487458#comment-16487458 ]
ASF GitHub Bot commented on FLINK-9423: --------------------------------------- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r190289953 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. 
This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * <p>Possible future improvements: + * <ul> + * <li>We could also implement shrinking for the heap and the deduplication maps.</li> + * <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc..</li> + * </ul> + * + * @param <K> type of the key of the internal timers managed by this priority queue. + * @param <N> type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> { + + /** + * A safe maximum size for arrays in the JVM. + */ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** + * Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. + */ + private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** + * This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. + */ + private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup; + + /** + * The array that represents the heap-organized priority queue. + */ + private TimerHeapInternalTimer<K, N>[] queue; + + /** + * The current size of the priority queue. + */ + private int size; + + /** + * The key-group range of timers that are managed by this queue. + */ + private final KeyGroupRange keyGroupRange; + + /** + * The total number of key-groups of the job. 
+ */ + private final int totalNumberOfKeyGroups; + + + /** + * Creates an empty {@link InternalTimerHeap} with the requested initial capacity. + * + * @param minimumCapacity the minimum and initial capacity of this priority queue. + */ + @SuppressWarnings("unchecked") + InternalTimerHeap( + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; + this.keyGroupRange = keyGroupRange; + + final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups(); + final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange; + this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange]; + for (int i = 0; i < keyGroupsInLocalRange; ++i) { + deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize); + } + + this.queue = new TimerHeapInternalTimer[1 + minimumCapacity]; + } + + /** + * @see Set#add(Object) + */ + @Override + public boolean add(@Nonnull TimerHeapInternalTimer<K, N> timer) { + + if (getDedupMapForKeyGroup(timer).putIfAbsent(timer, timer) == null) { + final int newSize = ++this.size; + checkCapacity(newSize); + moveElementToIdx(timer, newSize); + siftUp(newSize); + return true; + } else { + return false; + } + } + + /** + * This behaves like {@link #add(TimerHeapInternalTimer)}. + */ + @Override + public boolean offer(@Nonnull TimerHeapInternalTimer<K, N> k) { + return add(k); + } + + @Nullable + @Override + public TimerHeapInternalTimer<K, N> poll() { + return size() > 0 ? removeElementAtIndex(1) : null; + } + + @Nonnull + @Override + public TimerHeapInternalTimer<K, N> remove() { + TimerHeapInternalTimer<K, N> pollResult = poll(); + if (pollResult != null) { + return pollResult; + } else { + throw new NoSuchElementException("InternalTimerPriorityQueue is empty."); + } + } + + @Nullable + @Override + public TimerHeapInternalTimer<K, N> peek() { + return size() > 0 ? 
queue[1] : null; + } + + @Nonnull + @Override + public TimerHeapInternalTimer<K, N> element() { + TimerHeapInternalTimer<K, N> peekResult = peek(); + if (peekResult != null) { + return peekResult; + } else { + throw new NoSuchElementException("InternalTimerPriorityQueue is empty."); + } + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public boolean contains(@Nullable Object o) { + return (o instanceof TimerHeapInternalTimer) + && getDedupMapForKeyGroup((TimerHeapInternalTimer<?, ?>) o).containsKey(o); + } + + @Override + public boolean remove(@Nullable Object o) { + if (o instanceof TimerHeapInternalTimer) { + return removeInternal((TimerHeapInternalTimer<?, ?>) o); + } + return false; + } + + @Override + public boolean addAll(@Nullable Collection<? extends TimerHeapInternalTimer<K, N>> timers) { + + if (timers == null) { + return true; + } + + if (timers.size() > queue.length) { --- End diff -- Good question. It is unclear what the right choice is here, because it might also be that all elements in timers are duplicates that are already contained. Not sure what the optimal choice is here. > Implement efficient deletes for heap based timer service > -------------------------------------------------------- > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming > Affects Versions: 1.6.0 > Reporter: Stefan Richter > Assignee: Stefan Richter > Priority: Major > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes; the complexity is currently O(n), where n > is the number of registered timers. > > We can keep track of the timers' positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)