[ 
https://issues.apache.org/jira/browse/FLINK-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16563688#comment-16563688
 ] 

ASF GitHub Bot commented on FLINK-9981:
---------------------------------------

azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation
URL: https://github.com/apache/flink/pull/6438#discussion_r206506427
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/MinMaxPriorityQueueOrderedSetCache.java
 ##########
 @@ -41,95 +38,86 @@
  *
  * @param <E> type of the contained elements.
  */
-public class TreeOrderedSetCache<E> implements CachingInternalPriorityQueueSet.OrderedSetCache<E> {
+public class MinMaxPriorityQueueOrderedSetCache<E extends HeapPriorityQueueElement>
+       implements CachingInternalPriorityQueueSet.OrderedSetCache<E> {
 
        /** The tree is used to store cached elements. */
        @Nonnull
-       private final ObjectAVLTreeSet<E> avlTree;
+       private final HeapMinMaxPriorityQueueSet<E> minMaxHeapSet;
 
        /** The element comparator. */
        @Nonnull
-       private final Comparator<E> elementComparator;
+       private final PriorityComparator<E> priorityComparator;
 
        /** The maximum capacity of the cache. */
        @Nonnegative
        private final int capacity;
 
        /**
-        * Creates a new {@link TreeOrderedSetCache} with the given capacity and element comparator. Capacity must be > 0.
-        * @param elementComparator comparator for the cached elements.
+        * Creates a new {@link MinMaxPriorityQueueOrderedSetCache} with the given capacity and element comparator. Capacity must be > 0.
+        * @param priorityComparator comparator for the cached elements.
         * @param capacity the capacity of the cache. Must be > 0.
         */
-       public TreeOrderedSetCache(@Nonnull Comparator<E> elementComparator, @Nonnegative int capacity) {
+       public MinMaxPriorityQueueOrderedSetCache(@Nonnull PriorityComparator<E> priorityComparator, @Nonnegative int capacity) {
                Preconditions.checkArgument(capacity > 0, "Cache capacity must be greater than 0.");
-               this.avlTree = new ObjectAVLTreeSet<>(elementComparator);
-               this.elementComparator = elementComparator;
+               this.minMaxHeapSet = new HeapMinMaxPriorityQueueSet<>(priorityComparator, capacity);
+               this.priorityComparator = priorityComparator;
                this.capacity = capacity;
        }
 
        @Override
        public void add(@Nonnull E element) {
                assert !isFull();
-               avlTree.add(element);
+               minMaxHeapSet.add(element);
        }
 
        @Override
        public void remove(@Nonnull E element) {
-               avlTree.remove(element);
+               minMaxHeapSet.remove(element);
        }
 
        @Override
        public boolean isFull() {
-               return avlTree.size() == capacity;
+               return minMaxHeapSet.size() == capacity;
        }
 
        @Override
        public boolean isEmpty() {
-               return avlTree.isEmpty();
+               return minMaxHeapSet.isEmpty();
        }
 
        @Override
        public boolean isInLowerBound(@Nonnull E toCheck) {
-               return avlTree.isEmpty() || elementComparator.compare(peekLast(), toCheck) > 0;
+               return minMaxHeapSet.isEmpty() || priorityComparator.comparePriority(peekLast(), toCheck) >= 0;
        }
 
        @Nullable
        @Override
        public E removeFirst() {
-               if (avlTree.isEmpty()) {
-                       return null;
-               }
-               final E first = avlTree.first();
-               avlTree.remove(first);
-               return first;
+               return minMaxHeapSet.pollFirst();
        }
 
        @Nullable
        @Override
        public E removeLast() {
-               if (avlTree.isEmpty()) {
-                       return null;
-               }
-               final E last = avlTree.last();
-               avlTree.remove(last);
-               return last;
+               return minMaxHeapSet.pollLast();
        }
 
        @Nullable
        @Override
        public E peekFirst() {
-               return !avlTree.isEmpty() ? avlTree.first() : null;
+               return minMaxHeapSet.peekFirst();
        }
 
        @Nullable
        @Override
        public E peekLast() {
-               return !avlTree.isEmpty() ? avlTree.last() : null;
+               return minMaxHeapSet.peekLast();
        }
 
        @Nonnull
        @Override
        public CloseableIterator<E> orderedIterator() {
-               return CloseableIterator.adapterForIterator(avlTree.iterator());
+               return minMaxHeapSet.iterator();
 
 Review comment:
   I think the underlying iterator is no longer ordered, and we do not rely on the ordering of this interface method anywhere. In that case, the method can be renamed.
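
   To illustrate the point about heap iterators, here is a minimal sketch using `java.util.PriorityQueue` as a stand-in. It is not Flink's `HeapMinMaxPriorityQueueSet`, and the class and method names below are hypothetical. The JDK documents that `PriorityQueue.iterator()` makes no guarantee about traversal order, so only repeated polling is certain to yield elements by priority. The same reasoning would presumably apply to any array-backed heap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch: a heap's iterator vs. draining the heap by priority. */
public class HeapIterationSketch {

    /** Collects elements in whatever order the heap's iterator yields them. */
    static List<Integer> iterationOrder(PriorityQueue<Integer> heap) {
        List<Integer> result = new ArrayList<>();
        for (Integer element : heap) {
            result.add(element);
        }
        return result;
    }

    /** Collects elements in priority order by polling a copy of the heap. */
    static List<Integer> drainInOrder(PriorityQueue<Integer> heap) {
        // Copy first so the caller's heap is left untouched.
        PriorityQueue<Integer> copy = new PriorityQueue<>(heap);
        List<Integer> result = new ArrayList<>();
        while (!copy.isEmpty()) {
            result.add(copy.poll());
        }
        return result;
    }

    public static void main(String[] args) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(Arrays.asList(5, 1, 4, 2, 3));
        // The iterator reflects the internal array layout, not priority order.
        System.out.println("iterator: " + iterationOrder(heap));
        // Polling always returns the current minimum, so this list is sorted.
        System.out.println("drained:  " + drainInOrder(heap));
    }
}
```

   A caller that needs ordered traversal would have to drain or sort the elements, which is why a name like `orderedIterator()` could be misleading for a heap-backed cache.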


> Tune performance of RocksDB implementation
> ------------------------------------------
>
>                 Key: FLINK-9981
>                 URL: https://issues.apache.org/jira/browse/FLINK-9981
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>
> General performance tuning/polishing for the RocksDB implementation. We can 
> figure out how caching/seeking can be improved.


