I am trying to understand the Flink design pattern for consuming files from S3 continuously as they appear. I have written the minimal program below to do that, and it works as expected: newly uploaded S3 files are detected within the configured 5-second monitoring poll period, and as a simple task it just prints the file contents to stdout.
Where I am having difficulty is understanding the design pattern for maintaining state so that, upon restart, the Flink app will NOT reprocess files it has already processed. I can see that Flink is retaining state in my configured state location, file:///var/tmp/flink/checkpoints/, and when I inspect state files at paths like /var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5/_metadata, the S3 paths of the files Flink has already processed show up in the _metadata file. So I know the state of each file is being captured. Now I am trying to understand a few concepts:

1. How much state will Flink retain? If files are kept in the bucket for a long time (say, with a lifecycle delete policy of 30 days), a lot of files could pile up. It seems Flink would have to retain the complete list of already-processed files to avoid reprocessing them, and that could be quite a lot of state.

2. I understand from the docs that you can restart Flink using state from either a savepoint or a checkpoint. I tried restarting my test application standalone from my dev environment with the following command, but on startup it still reprocesses the files recorded in the _metadata captured from the previous run. Is the `--fromSavepoint` option the correct way to specify the savepoint file to be read at startup?

```
/usr/bin/env \
  ASP_USE_LOCAL_ENV=1 \
  ASP_VERBOSE_LOGGING=true \
  ASP_CHECKPOINT_INTERVAL_MILLISECONDS=5000 \
  /usr/lib/jvm/java-11-openjdk-amd64/bin/java \
  @./startup.argfile \
  com.myapp.weatherstreamprocessor.WeatherStreamProcessor \
  --fromSavepoint /var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5/_metadata
```

I am using the Flink operator to deploy my Flink application to EKS, and I already have one production Flink application consuming from and writing to Kinesis, so I have some initial Flink experience. I realize that, when deployed in my EKS cluster, checkpointing is meant for recovery of the task managers by the job manager should the task managers need to be restarted, and that for managed restarts (like code updates) I should use an explicitly created savepoint. Here, though, I am just trying to prove the behavior in my test environment. Could someone kindly direct me to the right approach to restart in my test environment, read the checkpoint, and NOT have Flink reprocess the files already seen by the previous running instance? (I have included a sketch of what I have in mind after the program listing below.) Thanks!
```java
// ==========================================================================================
// My Test Application
// ==========================================================================================
package com.myapp.weatherstreamprocessor;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WeatherStreamProcessor {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true);
        // 1 GiB each for the task manager and job manager
        // (was `new MemorySize(1024 ^ 3)`, but `^` is XOR in Java, not exponentiation)
        conf.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.ofMebiBytes(1024));
        conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.ofMebiBytes(1024));
        conf.set(TaskManagerOptions.CPU_CORES, 4.0);

        final StreamExecutionEnvironment env;
        Config appCfg = new Config(); // application config (ASP_* environment variables)
        if (appCfg.getUseLocalEnv()) {
            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        } else {
            env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }
        env.setParallelism(appCfg.getParallelism());

        if (appCfg.getCheckpointInterval() > 0) {
            env.enableCheckpointing(appCfg.getCheckpointInterval());
        }
        CheckpointConfig config = env.getCheckpointConfig();
        config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config.setCheckpointStorage("file:///var/tmp/flink/checkpoints/");

        SetupJob(env, appCfg);
        env.execute("Weather Stream Processor");
    }

    static void SetupJob(StreamExecutionEnvironment env, Config appCfg) {
        // Continuously monitor the bucket for new files every 5 seconds
        final FileSource<String> source =
                FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://stats-staging/"))
                        .monitorContinuously(Duration.ofSeconds(5L))
                        .build();
        final DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
        stream.print();
    }
}
```
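In case it clarifies question 2: outside of the Flink CLI (`flink run -s <path>`), my understanding is that a savepoint or retained-checkpoint path can also be supplied through the `execution.savepoint.path` configuration option. Below is a minimal sketch of what I have in mind for the local environment. I have not confirmed that the local environment actually honors this setting, and the `WeatherStreamProcessorRestoreSketch` class name is just for illustration; the chk-5 path is from my previous run.

```java
package com.myapp.weatherstreamprocessor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WeatherStreamProcessorRestoreSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Point the job at the retained checkpoint from the previous run.
        // I am unsure whether the chk-5 directory or the _metadata file inside it
        // is the expected target here.
        conf.setString("execution.savepoint.path",
                "file:///var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5");

        // Assumption: the local environment picks up the restore path from the
        // configuration it is created with; I have not verified this.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointStorage("file:///var/tmp/flink/checkpoints/");

        // Same FileSource job as in the main program above.
        WeatherStreamProcessor.SetupJob(env, new Config());

        env.execute("Weather Stream Processor (restored)");
    }
}
```

The intent is simply for the restored FileSource to skip the files already recorded in the checkpoint rather than re-reading them.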