[ https://issues.apache.org/jira/browse/FLINK-29926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-29926:
-----------------------------------
Labels: 1.15 Flink ReadFile stale-critical (was: 1.15 Flink ReadFile)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Critical but is unassigned and neither itself nor its Sub-Tasks have been updated for 14 days. I have gone ahead and marked it "stale-critical". If this ticket is critical, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized.

> File source continuous monitoring mode ignoring files during savepoint upgrade mode
> -----------------------------------------------------------------------------------
>
> Key: FLINK-29926
> URL: https://issues.apache.org/jira/browse/FLINK-29926
> Project: Flink
> Issue Type: Bug
> Reporter: Avinash
> Priority: Critical
> Labels: 1.15, Flink, ReadFile, stale-critical
>
> During a stateful application upgrade using the Flink Kubernetes operator, the StreamExecutionEnvironment.readFile() operator in FileProcessingMode.PROCESS_CONTINUOUSLY mode fails to detect any new changes made to the same file in the directory.
>
> *Background*: We currently have a fresh deployment of the application, managed by the Kubernetes operator with savepoint as the upgrade mode and checkpointing enabled.
> The env.readFile() operator in FileProcessingMode.PROCESS_CONTINUOUSLY mode starts continuously monitoring the directory (S3 prefix) for changes and also checkpoints its state at the configured interval.
> {noformat}
> 2022-11-07 10:47:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: s3://test-app/configs
> ...
> ...
> 2022-11-07 10:47:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667817365000 and global mod time= 1667817365000
> ...
> ...
> 2022-11-07 10:51:40.896 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction checkpointed 1667817365000.{noformat}
> Next, we upgrade the application using the Kubernetes operator, so the application takes a savepoint via the following suspend mechanism: cancel with savepoint.
> This calls the source's cancel() method, which in turn sets globalModificationTime = Long.MAX_VALUE, and only then is the savepoint taken.
> {noformat}
> 2022-11-07 10:54:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667817365000 and global mod time= 1667817365000
> ...
> 2022-11-07 10:55:12.899 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction checkpointed 9223372036854775807
> ....
> 2022-11-07 10:55:13.090 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Closed File Monitoring Source for path: s3://test-app/{noformat}
> As a result, globalModificationTime changes from 1667817365000 to Long.MAX_VALUE (9223372036854775807), and this value is stored in the savepoint state.
> When the application restarts with the new changes, the env.readFile() operator restores this state, in which globalModificationTime = Long.MAX_VALUE, and ignores every change made to the file after the upgrade:
> {noformat}
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='INFO' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Restoring state for the ContinuousFileMonitoringFunction
> ....
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction retrieved a global mod time of 9223372036854775807
> ....
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: s3://test-app/configs
> ....
> 2022-11-07 11:00:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807
> ...
> ...
> 2022-11-07 11:01:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807
> ...
> 2022-11-07 11:02:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807{noformat}
> Cause: The issue appears to be caused by cancel() reassigning globalModificationTime to Long.MAX_VALUE:
> [https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L389]
> The monitor ignores any file whose modification time is less than or equal to the global modification time, as the "Ignoring ... with mod time= ... and global mod time= ..." log lines show. No file can be newer than Long.MAX_VALUE, so once that value is restored from the savepoint, every later change is ignored.
>

--
This message was sent by Atlassian Jira (v8.20.10#820010)
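The mechanism in the Cause analysis can be reproduced without running Flink. The Java sketch below is a hypothetical, simplified model: the class `GlobalModTimeSketch` and its methods are illustrative names, not Flink API. It makes three assumptions:

* Files are skipped when `modificationTime <= globalModificationTime`. This rule is inferred from the "Ignoring" log lines.
* The global modification time starts at `Long.MIN_VALUE`.
* `cancel()` overwrites the value with `Long.MAX_VALUE` before the state is snapshotted, as the report describes.

The two modification times are taken from the logs in the issue.

```java
/**
 * Hypothetical, simplified model of the state handling described in FLINK-29926.
 * It is NOT Flink's ContinuousFileMonitoringFunction; it only mirrors the behavior
 * reported in the issue: files are skipped when their modification time is
 * <= the global modification time, cancel() overwrites that value with
 * Long.MAX_VALUE, and the value is written to and read back from state.
 */
public class GlobalModTimeSketch {

    // Assumed initial value: nothing has been seen yet, so every file is "new".
    private long globalModificationTime = Long.MIN_VALUE;

    /** Skip rule inferred from the "Ignoring ... mod time= ... global mod time= ..." logs. */
    public boolean shouldIgnore(long modificationTime) {
        return modificationTime <= globalModificationTime;
    }

    /** Returns true if the file would be forwarded; advances the global mod time. */
    public boolean offer(long modificationTime) {
        if (shouldIgnore(modificationTime)) {
            return false;
        }
        globalModificationTime = Math.max(globalModificationTime, modificationTime);
        return true;
    }

    /** Mirrors the reported cancel() behavior that runs before the savepoint is taken. */
    public void cancel() {
        globalModificationTime = Long.MAX_VALUE;
    }

    /** Value that would be written into checkpoint or savepoint state. */
    public long snapshotState() {
        return globalModificationTime;
    }

    /** Value read back from state when the upgraded job starts. */
    public void restoreState(long restoredModificationTime) {
        globalModificationTime = restoredModificationTime;
    }

    public static void main(String[] args) {
        long beforeUpgrade = 1667817365000L; // mod time in the 10:47 logs
        long afterUpgrade = 1667821399000L;  // mod time in the 11:00 logs

        GlobalModTimeSketch oldJob = new GlobalModTimeSketch();
        oldJob.offer(beforeUpgrade);
        long checkpointed = oldJob.snapshotState(); // regular checkpoint: 1667817365000

        oldJob.cancel();                            // cancel with savepoint
        long savepointed = oldJob.snapshotState();  // savepoint: Long.MAX_VALUE

        GlobalModTimeSketch fromCheckpoint = new GlobalModTimeSketch();
        fromCheckpoint.restoreState(checkpointed);
        GlobalModTimeSketch fromSavepoint = new GlobalModTimeSketch();
        fromSavepoint.restoreState(savepointed);

        System.out.println("restored from checkpoint, change picked up: " + fromCheckpoint.offer(afterUpgrade));
        System.out.println("restored from savepoint, change picked up: " + fromSavepoint.offer(afterUpgrade));
    }
}
```

Running `main` prints `true` for the restore from the regular checkpoint and `false` for the restore from the savepoint. This matches the reported behavior: only the state written after `cancel()` carries `Long.MAX_VALUE`, and no later modification time can pass a `<=` comparison against it.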