[ 
https://issues.apache.org/jira/browse/FLINK-4636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533001#comment-15533001
 ] 

ASF GitHub Bot commented on FLINK-4636:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2568#discussion_r81155827
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java ---
    @@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) throws Exception {
     
                int numberPriorityQueueEntries = ois.readInt();
     
    -           priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
    -
    -           for (int i = 0; i <numberPriorityQueueEntries; i++) {
    -                   StreamElement streamElement = streamRecordSerializer.deserialize(new DataInputViewStreamWrapper(ois));
    -                   priorityQueue.offer(streamElement.<IN>asRecord());
    +           if(numberPriorityQueueEntries > 0) {
    +                   priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
    --- End diff --
    
    Initialize with size `Math.max(INITIAL_PRIORITY_QUEUE_CAPACITY, numberPriorityQueueEntries)`?
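
A minimal sketch of what this suggestion could look like, separate from the Flink code itself. The class name, the helper `newQueue`, and the value `11` for `INITIAL_PRIORITY_QUEUE_CAPACITY` are assumptions made for illustration; only the constant's name and the `Math.max` expression come from the comment above.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

class QueueCapacitySketch {
    // Assumed value for illustration; the review comment names the constant but not its value.
    static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;

    // Hypothetical helper: picks a capacity that is never below the default,
    // so a restored entry count of 0 cannot reach the PriorityQueue constructor.
    static <T> PriorityQueue<T> newQueue(int numberPriorityQueueEntries, Comparator<T> comparator) {
        int capacity = Math.max(INITIAL_PRIORITY_QUEUE_CAPACITY, numberPriorityQueueEntries);
        return new PriorityQueue<T>(capacity, comparator);
    }
}
```

With this shape the queue is always created, even when no entries were stored, so no separate branch for the empty case is needed when constructing it.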


> AbstractCEPPatternOperator fails to restore state
> -------------------------------------------------
>
>                 Key: FLINK-4636
>                 URL: https://issues.apache.org/jira/browse/FLINK-4636
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.2.0, 1.1.2
>            Reporter: Fabian Hueske
>             Fix For: 1.2.0, 1.1.3
>
>
> The {{restoreState()}} method of the {{AbstractCEPPatternOperator}} restores a 
> Java {{PriorityQueue}}. To do so, it first reads the number of elements to 
> insert and then creates a {{PriorityQueue}} object. However, Java's 
> {{PriorityQueue}} cannot be instantiated with an initial capacity of {{0}}, 
> and the code does not check for this case.
> In case of an empty queue, the {{PriorityQueue}} should be instantiated with 
> an initial capacity of {{1}}.
> See 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-CEPPatternOperator-when-taskmanager-is-killed-tp9024.html
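
The behaviour described in the issue can be reproduced with plain Java, independent of Flink. The following is a small sketch; the class and method names are hypothetical, and `restoreQueue` only illustrates the workaround proposed above (a capacity of at least 1), not the actual fix in the pull request.

```java
import java.util.PriorityQueue;

class ZeroCapacityDemo {
    // Returns true if the PriorityQueue constructor rejects the given initial capacity.
    // java.util.PriorityQueue throws IllegalArgumentException for capacities below 1.
    static boolean rejectsCapacity(int capacity) {
        try {
            new PriorityQueue<Integer>(capacity);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    // Hypothetical restore helper: clamps the stored entry count to a minimum capacity of 1.
    static PriorityQueue<Integer> restoreQueue(int numberOfEntries) {
        return new PriorityQueue<Integer>(Math.max(1, numberOfEntries));
    }
}
```

Here `rejectsCapacity(0)` reports the failure that occurs when an empty queue was checkpointed, while `restoreQueue(0)` yields an empty but usable queue.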



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
