[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493599#comment-16493599
 ] 

ASF GitHub Bot commented on FLINK-9423:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6062#discussion_r191413234
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
    @@ -301,114 +259,39 @@ public void advanceWatermark(long time) throws 
Exception {
                                keySerializer.snapshotConfiguration(),
                                namespaceSerializer,
                                namespaceSerializer.snapshotConfiguration(),
    -                           getEventTimeTimerSetForKeyGroup(keyGroupIdx),
    -                           
getProcessingTimeTimerSetForKeyGroup(keyGroupIdx));
    +                           
eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx),
    +                           
processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx));
        }
     
        /**
         * Restore the timers (both processing and event time ones) for a given 
{@code keyGroupIdx}.
         *
    -    * @param restoredTimersSnapshot the restored snapshot containing the 
key-group's timers,
    +    * @param restoredSnapshot the restored snapshot containing the 
key-group's timers,
         *                       and the serializers that were used to write 
them
         * @param keyGroupIdx the id of the key-group to be put in the snapshot.
         */
        @SuppressWarnings("unchecked")
    -   public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> 
restoredTimersSnapshot, int keyGroupIdx) throws IOException {
    -           this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) 
restoredTimersSnapshot;
    +   public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> 
restoredSnapshot, int keyGroupIdx) {
    +           this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) 
restoredSnapshot;
     
    -           if ((this.keyDeserializer != null && 
!this.keyDeserializer.equals(restoredTimersSnapshot.getKeySerializer())) ||
    -                   (this.namespaceDeserializer != null && 
!this.namespaceDeserializer.equals(restoredTimersSnapshot.getNamespaceSerializer())))
 {
    +           if ((this.keyDeserializer != null && 
!this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
    --- End diff --
    
    This check could be factored out into a method with a meaningful and easy 
to understand name, e.g. `checkSerializerCompatibility`.


> Implement efficient deletes for heap based timer service
> --------------------------------------------------------
>
>                 Key: FLINK-9423
>                 URL: https://issues.apache.org/jira/browse/FLINK-9423
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.5.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes, the complexity is currently O\(n\), where n 
> is the number of registered timers.
>  
> We can keep track of timer's positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to