[ 
https://issues.apache.org/jira/browse/FLINK-29926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-29926:
-----------------------------------
      Labels: 1.15 Flink ReadFile auto-deprioritized-critical  (was: 1.15 Flink 
ReadFile stale-critical)
    Priority: Major  (was: Critical)

This issue was labeled "stale-critical" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Critical, 
please raise the priority and ask a committer to assign you the issue or revive 
the public discussion.


> File source continuous monitoring mode ignoring files during savepoint 
> upgrade mode
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-29926
>                 URL: https://issues.apache.org/jira/browse/FLINK-29926
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Avinash
>            Priority: Major
>              Labels: 1.15, Flink, ReadFile, auto-deprioritized-critical
>
> During a stateful application upgrade using the Flink Kubernetes operator, the 
> StreamExecutionEnvironment.readFile() operator with 
> FileProcessingMode.PROCESS_CONTINUOUSLY fails to detect any new 
> changes made to the same file in the directory.
>  
> *Background*: We have a fresh deployment of the application using the 
> Kubernetes operator, with savepoint as the upgrade mode and checkpointing 
> enabled.
> The env.readFile() operator with FileProcessingMode.PROCESS_CONTINUOUSLY 
> starts continuously monitoring the directory (S3 prefix) for changes and 
> also checkpoints at the configured interval.
> {noformat}
> 2022-11-07 10:47:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
> Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: 
> s3://test-app/configs
> ...
> ...
> 2022-11-07 10:47:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy 
> Source Thread - Source: Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 
> 1667817365000 and global mod time= 1667817365000
> ...
> ...
> 2022-11-07 10:51:40.896 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
> Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - ContinuousFileMonitoringFunction checkpointed 1667817365000.{noformat}
> We then upgrade the application using the Kubernetes operator. As part of 
> the upgrade, the application takes a savepoint using the suspend 
> mechanism "cancel with savepoint".
> In doing so, the application calls the cancel method, which in turn sets 
> globalModificationTime = Long.MAX_VALUE, and then the savepoint is taken.
> {noformat}
> 2022-11-07 10:54:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy 
> Source Thread - Source: Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 
> 1667817365000 and global mod time= 1667817365000
> ...
> 2022-11-07 10:55:12.899 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
> Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - ContinuousFileMonitoringFunction checkpointed 9223372036854775807
> ....
> 2022-11-07 10:55:13.090 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
> Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - Closed File Monitoring Source for path: s3://test-app/{noformat}
> As a result, the globalModificationTime changes from 1667817365000 to 
> MAX_VALUE (9223372036854775807) and is stored in the savepoint state.
> Once the application restarts with the new changes, the env.readFile() 
> operator restores the previous state, in which globalModificationTime = 
> Long.MAX_VALUE, and starts ignoring any changes made to the file after the upgrade.
> {noformat}
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='INFO' thread='Source: 
> Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - Restoring state for the ContinuousFileMonitoringFunction
> ....
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
> Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - ContinuousFileMonitoringFunction retrieved a global mod time of 
> 9223372036854775807
> ....
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: 
> Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: 
> s3://test-app/configs
> ....
> 2022-11-07 11:00:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy 
> Source Thread - Source: Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 
> 1667821399000 and global mod time= 9223372036854775807
> ...
> ...
> 2022-11-07 11:01:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy 
> Source Thread - Source: Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 
> 1667821399000 and global mod time= 9223372036854775807
> ...
> 2022-11-07 11:02:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy 
> Source Thread - Source: Custom File Source (1/1)#0'] 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
>  - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 
> 1667821399000 and global mod time= 9223372036854775807{noformat}
> Cause: The issue appears to be due to the reassignment of 
> globalModificationTime to MAX_VALUE during cancel:
> [https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L389]
>  
>  
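
The sequence described in the quoted report can be modeled in isolation. The sketch below is a simplified stand-in and not Flink's ContinuousFileMonitoringFunction: the class name MonitorStateSketch and its methods are hypothetical. It assumes, based on the "Ignoring ... with mod time" log lines above, that a file is ignored when its modification time is not newer than the stored global modification time, and, based on the report, that cancel sets that time to Long.MAX_VALUE before the savepoint is written.

```java
// Hypothetical, simplified model of the state handling described in the report.
// It imitates only the timestamp logic, not the real Flink source function.
class MonitorStateSketch {
    private long globalModificationTime;

    // The constructor argument plays the role of the value restored from state.
    MonitorStateSketch(long restoredModTime) {
        this.globalModificationTime = restoredModTime;
    }

    // Assumption: a file is skipped unless it is newer than the stored time.
    boolean shouldIgnore(long fileModTime) {
        return fileModTime <= globalModificationTime;
    }

    // Mirrors the behavior the report attributes to cancel().
    void cancel() {
        globalModificationTime = Long.MAX_VALUE;
    }

    // The value that would be written to a checkpoint or savepoint.
    long snapshot() {
        return globalModificationTime;
    }

    public static void main(String[] args) {
        long lastSeen = 1667817365000L;     // mod time checkpointed before the upgrade
        long updatedFile = 1667821399000L;  // mod time of the file after the upgrade

        MonitorStateSketch running = new MonitorStateSketch(lastSeen);
        running.cancel();                   // cancel happens before the savepoint
        long saved = running.snapshot();

        MonitorStateSketch restored = new MonitorStateSketch(saved);
        System.out.println("saved = " + saved);
        System.out.println("ignored after restore = " + restored.shouldIgnore(updatedFile));
    }
}
```

With these assumptions, the restored instance ignores every file, because no modification time can exceed Long.MAX_VALUE. This matches the "global mod time= 9223372036854775807" lines logged after the restart.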



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
