Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202589096 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java --- @@ -80,13 +82,57 @@ public E peek() { @Override public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { + if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) { + bulkPollRelaxedOrder(canConsume, consumer); + } else { + bulkPollStrictOrder(canConsume, consumer); + } + } + + private void bulkPollRelaxedOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } else { + while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) { + final E next = orderedCache.removeFirst(); + orderedStore.remove(next); + consumer.accept(next); + } + + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } + } + } + + private void bulkPollStrictOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { E element; while ((element = peek()) != null && canConsume.test(element)) { poll(); consumer.accept(element); } } + private void bulkPollStore(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { + try (CloseableIterator<E> iterator = orderedStore.orderedIterator()) { + while (iterator.hasNext()) { + final E next = iterator.next(); + if (canConsume.test(next)) { + orderedStore.remove(next); + consumer.accept(next); + } else { + orderedCache.add(next); + while (iterator.hasNext() && !orderedCache.isFull()) { + orderedCache.add(iterator.next()); + } + break; + } + } + } catch (Exception e) { + throw new FlinkRuntimeException("Exception while bulk polling store.", e); --- End diff -- Because it makes it more explicit that there are things which can go wrong. With checked exceptions you still have the chance to let the program fail. 
But without them, the caller needs to know that unchecked exceptions can be thrown in order to perform any recovery. Moreover, I'm not sure whether we should decide at this level how recovery is or isn't done. For example, the caller might fetch the latest checkpoint data again and replay all elements in between in order to recompute the state. This is something the priority queue should not need to bother about.
---