Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6062#discussion_r191413234
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -301,114 +259,39 @@ public void advanceWatermark(long time) throws Exception {
                                keySerializer.snapshotConfiguration(),
                                namespaceSerializer,
                                namespaceSerializer.snapshotConfiguration(),
    -                           getEventTimeTimerSetForKeyGroup(keyGroupIdx),
    -                           getProcessingTimeTimerSetForKeyGroup(keyGroupIdx));
    +                           eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx),
    +                           processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx));
        }
     
        /**
         * Restore the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
         *
    -    * @param restoredTimersSnapshot the restored snapshot containing the key-group's timers,
    +    * @param restoredSnapshot the restored snapshot containing the key-group's timers,
         *                       and the serializers that were used to write them
         * @param keyGroupIdx the id of the key-group to be put in the snapshot.
         */
        @SuppressWarnings("unchecked")
    -   public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredTimersSnapshot, int keyGroupIdx) throws IOException {
    -           this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredTimersSnapshot;
    +   public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
    +           this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;
     
    -           if ((this.keyDeserializer != null && !this.keyDeserializer.equals(restoredTimersSnapshot.getKeySerializer())) ||
    -                   (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredTimersSnapshot.getNamespaceSerializer()))) {
    +           if ((this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
    --- End diff --
    
    This check could be factored out into a method with a meaningful, easy-to-understand name, e.g. `checkSerializerCompatibility`.
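
A minimal sketch of what such an extraction might look like. It is not the actual Flink code: `SerializerCheckSketch` is a hypothetical stand-in for the timer service, the serializers are modeled as plain `Object`s so the example is self-contained, and the exception type and message are assumptions, since the body of the original `if` is not visible in the diff.

```java
/** Hypothetical stand-in for the timer service; not the real Flink class. */
public class SerializerCheckSketch {

    // Stand-ins for the keyDeserializer / namespaceDeserializer fields in the diff.
    // Plain Objects are used here instead of Flink's serializer types (assumption).
    private final Object keyDeserializer;
    private final Object namespaceDeserializer;

    public SerializerCheckSketch(Object keyDeserializer, Object namespaceDeserializer) {
        this.keyDeserializer = keyDeserializer;
        this.namespaceDeserializer = namespaceDeserializer;
    }

    /**
     * Extracted form of the inline condition: fails if a deserializer that is
     * already set differs from the one found in the restored snapshot.
     * The exception type and message are illustrative assumptions.
     */
    public void checkSerializerCompatibility(
            Object restoredKeySerializer, Object restoredNamespaceSerializer) {
        if (isIncompatible(keyDeserializer, restoredKeySerializer)
                || isIncompatible(namespaceDeserializer, restoredNamespaceSerializer)) {
            throw new IllegalArgumentException(
                "Restored serializers differ from the ones already set on this service.");
        }
    }

    // Same logic as each half of the original condition:
    // an unset (null) deserializer is always treated as compatible.
    private static boolean isIncompatible(Object current, Object restored) {
        return current != null && !current.equals(restored);
    }
}
```

With a helper like this, `restoreTimersForKeyGroup` would reduce the two-line condition to a single call such as `checkSerializerCompatibility(restoredSnapshot.getKeySerializer(), restoredSnapshot.getNamespaceSerializer())`.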


---
