[ https://issues.apache.org/jira/browse/FLINK-29926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-29926:
-----------------------------------
Labels: 1.15 Flink ReadFile auto-deprioritized-critical (was: 1.15 Flink ReadFile stale-critical)
Priority: Major (was: Critical)

This issue was labeled "stale-critical" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Critical, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> File source continuous monitoring mode ignoring files during savepoint upgrade mode
> -----------------------------------------------------------------------------------
>
> Key: FLINK-29926
> URL: https://issues.apache.org/jira/browse/FLINK-29926
> Project: Flink
> Issue Type: Bug
> Reporter: Avinash
> Priority: Major
> Labels: 1.15, Flink, ReadFile, auto-deprioritized-critical
>
> During a stateful application upgrade using the Flink Kubernetes operator, the
> StreamExecutionEnvironment.readFile() operator in
> FileProcessingMode.PROCESS_CONTINUOUSLY mode fails to detect any new changes
> made to the same file in the directory.
>
> *Background*: We have a fresh deployment of the application, managed by the
> Kubernetes operator with savepoint as the upgrade mode and checkpointing
> enabled.
> The env.readFile() operator in FileProcessingMode.PROCESS_CONTINUOUSLY mode
> starts continuously monitoring the directory (S3 prefix) for changes and also
> checkpoints at the configured interval.
> {noformat}
> 2022-11-07 10:47:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path:
> s3://test-app/configs
> ...
> ...
> 2022-11-07 10:47:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667817365000 and global mod time= 1667817365000
> ...
> ...
> 2022-11-07 10:51:40.896 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - ContinuousFileMonitoringFunction checkpointed 1667817365000.{noformat}
> Next, we upgrade the application using the Kubernetes operator, which takes a
> savepoint using the following suspend mechanism: cancel with savepoint.
> In doing so, the source's cancel() method is called, which in turn sets
> globalModificationTime = Long.MAX_VALUE, and only then is the savepoint taken.
> {noformat}
> 2022-11-07 10:54:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667817365000 and global mod time= 1667817365000
> ...
> 2022-11-07 10:55:12.899 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - ContinuousFileMonitoringFunction checkpointed 9223372036854775807
> ....
> 2022-11-07 10:55:13.090 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Closed File Monitoring Source for path: s3://test-app/{noformat}
> As a result, globalModificationTime changes from 1667817365000 to
> Long.MAX_VALUE (9223372036854775807), and this value is stored in the
> savepoint state.
> When the application restarts with the new changes, the env.readFile()
> operator restores the previous state, in which globalModificationTime =
> Long.MAX_VALUE, and ignores every change made to the file after the upgrade:
> {noformat}
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='INFO' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Restoring state for the ContinuousFileMonitoringFunction
> ....
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - ContinuousFileMonitoringFunction retrieved a global mod time of
> 9223372036854775807
> ....
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path:
> s3://test-app/configs
> ....
> 2022-11-07 11:00:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667821399000 and global mod time= 9223372036854775807
> ...
> ...
> 2022-11-07 11:01:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667821399000 and global mod time= 9223372036854775807
> ...
> 2022-11-07 11:02:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667821399000 and global mod time= 9223372036854775807{noformat}
> Cause: The issue appears to be caused by the reassignment of
> globalModificationTime to Long.MAX_VALUE during cancel:
> [https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L389]
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
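To make the sequence in the report concrete, here is a minimal, self-contained Java sketch of the state bookkeeping. It is a simplified model, not Flink's ContinuousFileMonitoringFunction: the class MonitoringStateModel, its methods, the starting value Long.MIN_VALUE and the sentinelOnCancel toggle are all hypothetical. It assumes, consistent with the "Ignoring ..." log lines (a file is skipped even when its mod time equals the global mod time), that a file is ignored when its modification time is less than or equal to the global modification time, and, as the report describes, that cancel() runs before the savepoint is snapshotted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the globalModificationTime bookkeeping described in
// FLINK-29926. This is NOT Flink's ContinuousFileMonitoringFunction; all names are illustrative.
class MonitoringStateModel {

    // Global modification time; this is the value written to checkpoint/savepoint state.
    private long globalModificationTime;

    // true: cancel() overwrites the state with Long.MAX_VALUE, as the report describes.
    // false: hypothetical variant whose cancel() leaves the state untouched.
    private final boolean sentinelOnCancel;

    MonitoringStateModel(long restoredModTime, boolean sentinelOnCancel) {
        this.globalModificationTime = restoredModTime;
        this.sentinelOnCancel = sentinelOnCancel;
    }

    // Assumed filter, consistent with the logs: a file whose modification time is
    // not newer than the global modification time is skipped.
    boolean shouldIgnore(long fileModTime) {
        return fileModTime <= globalModificationTime;
    }

    // One monitoring pass: returns the modification times that would be forwarded
    // downstream, then advances the global modification time to the newest of them.
    List<Long> scan(List<Long> fileModTimes) {
        List<Long> forwarded = new ArrayList<>();
        for (long modTime : fileModTimes) {
            if (!shouldIgnore(modTime)) {
                forwarded.add(modTime);
            }
        }
        for (long modTime : forwarded) {
            globalModificationTime = Math.max(globalModificationTime, modTime);
        }
        return forwarded;
    }

    void cancel() {
        if (sentinelOnCancel) {
            globalModificationTime = Long.MAX_VALUE; // the reassignment the report points to
        }
    }

    long snapshotState() {
        return globalModificationTime;
    }

    // Replays the upgrade sequence from the report and returns what the restored source forwards.
    static List<Long> simulateUpgrade(boolean sentinelOnCancel) {
        long beforeUpgrade = 1667817365000L; // file mod time before the upgrade (from the logs)
        long afterUpgrade = 1667821399000L;  // file mod time after the post-upgrade change
        MonitoringStateModel running = new MonitoringStateModel(Long.MIN_VALUE, sentinelOnCancel);
        running.scan(List.of(beforeUpgrade));      // file is read; state = beforeUpgrade
        running.cancel();                          // cancel with savepoint: cancel() runs first...
        long savepoint = running.snapshotState();  // ...then the savepoint is taken
        MonitoringStateModel restored = new MonitoringStateModel(savepoint, sentinelOnCancel);
        return restored.scan(List.of(afterUpgrade));
    }

    public static void main(String[] args) {
        System.out.println("sentinel on cancel: " + simulateUpgrade(true));
        System.out.println("state untouched:    " + simulateUpgrade(false));
    }
}
```

Running main prints an empty list for the sentinel variant (the post-upgrade change with mod time 1667821399000 is ignored, matching the logs) and [1667821399000] for the variant whose cancel() leaves the state alone. The second variant only isolates the cause named above; it is not the actual or a proposed Flink fix.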