[ https://issues.apache.org/jira/browse/FLINK-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16563688#comment-16563688 ]
ASF GitHub Bot commented on FLINK-9981: --------------------------------------- azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation URL: https://github.com/apache/flink/pull/6438#discussion_r206506427 ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/MinMaxPriorityQueueOrderedSetCache.java ########## @@ -41,95 +38,86 @@ * * @param <E> type of the contained elements. */ -public class TreeOrderedSetCache<E> implements CachingInternalPriorityQueueSet.OrderedSetCache<E> { +public class MinMaxPriorityQueueOrderedSetCache<E extends HeapPriorityQueueElement> + implements CachingInternalPriorityQueueSet.OrderedSetCache<E> { /** The tree is used to store cached elements. */ @Nonnull - private final ObjectAVLTreeSet<E> avlTree; + private final HeapMinMaxPriorityQueueSet<E> minMaxHeapSet; /** The element comparator. */ @Nonnull - private final Comparator<E> elementComparator; + private final PriorityComparator<E> priorityComparator; /** The maximum capacity of the cache. */ @Nonnegative private final int capacity; /** - * Creates a new {@link TreeOrderedSetCache} with the given capacity and element comparator. Capacity must be > 0. - * @param elementComparator comparator for the cached elements. + * Creates a new {@link MinMaxPriorityQueueOrderedSetCache} with the given capacity and element comparator. Capacity must be > 0. + * @param priorityComparator comparator for the cached elements. * @param capacity the capacity of the cache. Must be > 0. 
*/ - public TreeOrderedSetCache(@Nonnull Comparator<E> elementComparator, @Nonnegative int capacity) { + public MinMaxPriorityQueueOrderedSetCache(@Nonnull PriorityComparator<E> priorityComparator, @Nonnegative int capacity) { Preconditions.checkArgument(capacity > 0, "Cache capacity must be greater than 0."); - this.avlTree = new ObjectAVLTreeSet<>(elementComparator); - this.elementComparator = elementComparator; + this.minMaxHeapSet = new HeapMinMaxPriorityQueueSet<>(priorityComparator, capacity); + this.priorityComparator = priorityComparator; this.capacity = capacity; } @Override public void add(@Nonnull E element) { assert !isFull(); - avlTree.add(element); + minMaxHeapSet.add(element); } @Override public void remove(@Nonnull E element) { - avlTree.remove(element); + minMaxHeapSet.remove(element); } @Override public boolean isFull() { - return avlTree.size() == capacity; + return minMaxHeapSet.size() == capacity; } @Override public boolean isEmpty() { - return avlTree.isEmpty(); + return minMaxHeapSet.isEmpty(); } @Override public boolean isInLowerBound(@Nonnull E toCheck) { - return avlTree.isEmpty() || elementComparator.compare(peekLast(), toCheck) > 0; + return minMaxHeapSet.isEmpty() || priorityComparator.comparePriority(peekLast(), toCheck) >= 0; } @Nullable @Override public E removeFirst() { - if (avlTree.isEmpty()) { - return null; - } - final E first = avlTree.first(); - avlTree.remove(first); - return first; + return minMaxHeapSet.pollFirst(); } @Nullable @Override public E removeLast() { - if (avlTree.isEmpty()) { - return null; - } - final E last = avlTree.last(); - avlTree.remove(last); - return last; + return minMaxHeapSet.pollLast(); } @Nullable @Override public E peekFirst() { - return !avlTree.isEmpty() ? avlTree.first() : null; + return minMaxHeapSet.peekFirst(); } @Nullable @Override public E peekLast() { - return !avlTree.isEmpty() ? 
avlTree.last() : null; + return minMaxHeapSet.peekLast(); } @Nonnull @Override public CloseableIterator<E> orderedIterator() { - return CloseableIterator.adapterForIterator(avlTree.iterator()); + return minMaxHeapSet.iterator(); Review comment: I think the underlying iterator is no longer ordered, and we do not rely on the ordering of this interface method anywhere. In that case, the method can be renamed. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Tune performance of RocksDB implementation > ------------------------------------------ > > Key: FLINK-9981 > URL: https://issues.apache.org/jira/browse/FLINK-9981 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Affects Versions: 1.6.0 > Reporter: Stefan Richter > Assignee: Stefan Richter > Priority: Major > Labels: pull-request-available > > General performance tuning/polishing for the RocksDB implementation. We can > figure out how caching/seeking can be improved. -- This message was sent by Atlassian JIRA (v7.6.3#76005)