Avinash created FLINK-29926:
-------------------------------

             Summary: File source continuous monitoring mode ignoring files 
during savepoint upgrade mode
                 Key: FLINK-29926
                 URL: https://issues.apache.org/jira/browse/FLINK-29926
             Project: Flink
          Issue Type: Bug
            Reporter: Avinash


During a stateful application upgrade using the Flink Kubernetes operator, the 
StreamExecutionEnvironment.readFile() operator with 
FileProcessingMode.PROCESS_CONTINUOUSLY mode fails to detect any new 
changes made to the same file in the directory.
 
*Background* : Currently we have a fresh deployment of the application using the 
Kubernetes operator, with savepoint as the upgrade mode and checkpointing 
enabled. The env.readFile() operator with 
FileProcessingMode.PROCESS_CONTINUOUSLY mode starts continuously monitoring the 
directory (S3 prefix) for any changes and also checkpoints at the configured 
interval.
{noformat}
2022-11-07 10:47:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: 
s3://test-app/configs
...
...
2022-11-07 10:47:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy 
Source Thread - Source: Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 
1667817365000 and global mod time= 1667817365000
...
...
2022-11-07 10:51:40.896 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - ContinuousFileMonitoringFunction checkpointed 1667817365000.{noformat}
Now we try to upgrade the application using the Kubernetes operator. As a 
result, the application tries to take a savepoint using the suspend mechanism 
"Cancel with savepoint".
In doing so, the application calls the cancel method, which in turn sets 
globalModificationTime = Long.MAX_VALUE, and then the savepoint is taken.
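The ordering described above can be modelled in a few lines of plain Java. This is a simplified sketch, not the Flink source: the class MonitorStateSketch and its methods are hypothetical stand-ins, and only the field name globalModificationTime and the Long.MAX_VALUE assignment come from this report.

```java
/** Simplified stand-in for the monitoring source's state; not Flink code. */
final class MonitorStateSketch {
    // Largest file modification time processed so far (name taken from the report).
    private long globalModificationTime;
    private boolean running = true;

    MonitorStateSketch(long initialModTime) {
        this.globalModificationTime = initialModTime;
    }

    /** Models cancel(): stops monitoring and overwrites the tracked time. */
    void cancel() {
        globalModificationTime = Long.MAX_VALUE;
        running = false;
    }

    boolean isRunning() {
        return running;
    }

    /** Models a checkpoint or savepoint: returns the value that would be persisted. */
    long snapshot() {
        return globalModificationTime;
    }

    public static void main(String[] args) {
        MonitorStateSketch source = new MonitorStateSketch(1667817365000L);
        System.out.println("regular checkpoint: " + source.snapshot());
        // Cancel-with-savepoint: cancel runs first, then the state is captured.
        source.cancel();
        System.out.println("savepoint after cancel: " + source.snapshot());
    }
}
```

In this model, any snapshot taken after cancel() persists Long.MAX_VALUE instead of the last real modification time.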
{noformat}
2022-11-07 10:54:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy 
Source Thread - Source: Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 
1667817365000 and global mod time= 1667817365000
...
2022-11-07 10:55:12.899 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - ContinuousFileMonitoringFunction checkpointed 9223372036854775807
....
2022-11-07 10:55:13.090 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - Closed File Monitoring Source for path: s3://test-app/{noformat}
Due to this, the globalModificationTime changed from 1667817365000 to MAX_VALUE 
(9223372036854775807) and is stored in the savepoint state. Once the 
application restarts with the new changes, the env.readFile() operator restores 
the previous state, in which globalModificationTime = Long.MAX_VALUE, and 
starts ignoring any changes made to the file after the upgrade.
{noformat}
2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='INFO' thread='Source: 
Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - Restoring state for the ContinuousFileMonitoringFunction
....
2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - ContinuousFileMonitoringFunction retrieved a global mod time of 
9223372036854775807
....
2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: 
s3://test-app/configs
....
2022-11-07 11:00:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy 
Source Thread - Source: Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 
1667821399000 and global mod time= 9223372036854775807
...
...
2022-11-07 11:01:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy 
Source Thread - Source: Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 
1667821399000 and global mod time= 9223372036854775807
...
2022-11-07 11:02:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy 
Source Thread - Source: Custom File Source (1/1)#0'] 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
 - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 
1667821399000 and global mod time= 9223372036854775807{noformat}
Cause : The above seems to be due to the reassignment of 
globalModificationTime to MAX_VALUE during cancel:
https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L389
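
As a rough illustration of why the restored value suppresses every later change, the ignore decision visible in the logs can be modelled as a comparison between a file's modification time and the global modification time. The sketch below is plain Java under that assumption. IgnoreCheckSketch and its shouldIgnore method are illustrative, not copied from Flink; the "less than or equal" comparison is inferred from the log line where equal values are ignored.

```java
/** Illustrative model of the ignore decision seen in the logs; not Flink code. */
final class IgnoreCheckSketch {
    /** A file is skipped when it is not newer than the tracked global mod time. */
    static boolean shouldIgnore(long fileModTime, long globalModTime) {
        return fileModTime <= globalModTime;
    }

    public static void main(String[] args) {
        long checkpointedBeforeUpgrade = 1667817365000L; // value from the regular checkpoint
        long fileAfterUpgrade = 1667821399000L;          // mod time of the file changed after the upgrade

        // With the pre-upgrade value, the changed file would be processed.
        System.out.println(shouldIgnore(fileAfterUpgrade, checkpointedBeforeUpgrade)); // prints false

        // With Long.MAX_VALUE restored from the savepoint, it is ignored.
        System.out.println(shouldIgnore(fileAfterUpgrade, Long.MAX_VALUE)); // prints true
    }
}
```

Since no modification time can exceed Long.MAX_VALUE, every file is ignored under this check once that value has been restored.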
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
