GitHub user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202556109 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java --- @@ -80,13 +82,57 @@ public E peek() { @Override public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { + if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) { + bulkPollRelaxedOrder(canConsume, consumer); + } else { + bulkPollStrictOrder(canConsume, consumer); + } + } + + private void bulkPollRelaxedOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } else { + while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) { + final E next = orderedCache.removeFirst(); + orderedStore.remove(next); + consumer.accept(next); + } + + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } + } + } + + private void bulkPollStrictOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { E element; while ((element = peek()) != null && canConsume.test(element)) { poll(); consumer.accept(element); } } + private void bulkPollStore(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { + try (CloseableIterator<E> iterator = orderedStore.orderedIterator()) { + while (iterator.hasNext()) { + final E next = iterator.next(); + if (canConsume.test(next)) { + orderedStore.remove(next); + consumer.accept(next); + } else { + orderedCache.add(next); + while (iterator.hasNext() && !orderedCache.isFull()) { + orderedCache.add(iterator.next()); + } + break; + } + } + } catch (Exception e) { + throw new FlinkRuntimeException("Exception while bulk polling store.", e); --- End diff -- Why would you prefer it? I think there is no better way for the caller to handle problems in this call than failing the job (RocksDB problems).
---