Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6333#discussion_r202589096
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
 ---
    @@ -80,13 +82,57 @@ public E peek() {
     
        @Override
        public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull 
Consumer<E> consumer) {
    +           if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
    +                   bulkPollRelaxedOrder(canConsume, consumer);
    +           } else {
    +                   bulkPollStrictOrder(canConsume, consumer);
    +           }
    +   }
    +
    +   private void bulkPollRelaxedOrder(@Nonnull Predicate<E> canConsume, 
@Nonnull Consumer<E> consumer) {
    +           if (orderedCache.isEmpty()) {
    +                   bulkPollStore(canConsume, consumer);
    +           } else {
    +                   while (!orderedCache.isEmpty() && 
canConsume.test(orderedCache.peekFirst())) {
    +                           final E next = orderedCache.removeFirst();
    +                           orderedStore.remove(next);
    +                           consumer.accept(next);
    +                   }
    +
    +                   if (orderedCache.isEmpty()) {
    +                           bulkPollStore(canConsume, consumer);
    +                   }
    +           }
    +   }
    +
    +   private void bulkPollStrictOrder(@Nonnull Predicate<E> canConsume, 
@Nonnull Consumer<E> consumer) {
                E element;
                while ((element = peek()) != null && canConsume.test(element)) {
                        poll();
                        consumer.accept(element);
                }
        }
     
    +   private void bulkPollStore(@Nonnull Predicate<E> canConsume, @Nonnull 
Consumer<E> consumer) {
    +           try (CloseableIterator<E> iterator = 
orderedStore.orderedIterator()) {
    +                   while (iterator.hasNext()) {
    +                           final E next = iterator.next();
    +                           if (canConsume.test(next)) {
    +                                   orderedStore.remove(next);
    +                                   consumer.accept(next);
    +                           } else {
    +                                   orderedCache.add(next);
    +                                   while (iterator.hasNext() && 
!orderedCache.isFull()) {
    +                                           
orderedCache.add(iterator.next());
    +                                   }
    +                                   break;
    +                           }
    +                   }
    +           } catch (Exception e) {
    +                   throw new FlinkRuntimeException("Exception while bulk 
polling store.", e);
    --- End diff --
    
    Because it makes it more explicit that there are things which can go wrong. 
With checked exceptions you still have the chance to let the program fail. But 
without them, the caller needs to know that there are unchecked exception in 
order to do any recovery operation.
    
    Moreover, I'm not sure whether we should manifest on this level how 
recovery is done or not done. For example, maybe the caller can fetch the 
latest checkpoint data again and replay all in-between elements in order to 
recompute the state. This is something which the priority queue should not need 
to bother about.


---

Reply via email to