> I wonder if you could use this fact to query the committed checkpoints and move them away after the job is done.

This is not a robust solution; I would advise against it.

Best,
Alexander

On Fri, 27 Oct 2023 at 16:41, Andrew Otto <o...@wikimedia.org> wrote:

For moving the files:

> It will keep the files as is and remember the name of the file read in checkpointed state to ensure it doesn't read the same file twice.

I wonder if you could use this fact to query the committed checkpoints and move them away after the job is done. I think it should even be safe to do this outside of the Flink job periodically (cron, or similar), because on restart it won't reprocess the files that have been committed in the checkpoints.

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/libs/state_processor_api/#reading-state

On Fri, Oct 27, 2023 at 1:13 AM arjun s <arjunjoice...@gmail.com> wrote:

Hi team,

Thanks for your quick response. I have a question about file processing in the event of a job restart. When the job is restarted, we have trouble tracking which files have been processed and which remain pending. Is there a way to seamlessly resume processing files from where they were left off, particularly when we need to submit and restart the job manually after a server or application restart? The issue is that after a restart the job processes all the files in the directory from the beginning, and I'm looking for a way to avoid this.

Thanks and regards,
Arjun

On Fri, 27 Oct 2023 at 07:29, Chirag Dewan <chirag.dewa...@yahoo.in> wrote:

Hi Arjun,

Flink's FileSource doesn't move or delete files as of now. It keeps the files as they are and remembers the names of the files it has read in checkpointed state, to ensure it doesn't read the same file twice.

Flink's source API works such that a single enumerator operates on the JobManager.
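(Editor's note: the bookkeeping Chirag describes, remembering already-read file names in checkpointed state so that a restart does not reprocess them, can be illustrated with this minimal, library-free Python sketch. It is not Flink code; the class name, the JSON "checkpoint" file, and the `*.txt` pattern are all hypothetical stand-ins.)

```python
import json
from pathlib import Path


class FileSourceSketch:
    """Toy model of a file source that checkpoints the set of
    already-processed file names, as Flink's FileSource does."""

    def __init__(self, src_dir: Path, checkpoint: Path):
        self.src_dir = src_dir
        self.checkpoint = checkpoint
        # On (re)start, restore the processed-file set from the "checkpoint".
        if checkpoint.exists():
            self.processed = set(json.loads(checkpoint.read_text()))
        else:
            self.processed = set()

    def poll(self) -> list:
        """Return contents of files not seen before; remember their names."""
        records = []
        for f in sorted(self.src_dir.glob("*.txt")):
            if f.name in self.processed:
                continue  # read before the restart: skip it
            records.append(f.read_text())
            self.processed.add(f.name)
        # "Checkpoint": persist the processed-file set.
        self.checkpoint.write_text(json.dumps(sorted(self.processed)))
        return records
```

After a simulated restart, a new instance pointed at the same checkpoint file skips everything already read, which is exactly the behavior Arjun asks about below.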
The enumerator is responsible for listing the files and splitting them into smaller units. These units can be a complete file (for row formats) or splits within a file (for bulk formats). The actual reading is done by SplitReaders in the TaskManagers. This way, only the reading is done concurrently, and the source is able to track file completions.

You can read more about Flink sources here:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/sources/
and about the FileSystem connector here:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/

On Thursday, 26 October 2023 at 06:53:23 pm IST, arjun s <arjunjoice...@gmail.com> wrote:

Hello team,

I'm currently configuring a Flink job that reads files from a specified directory and sends the data to a Kafka sink. I have already built a Flink job that reads the file contents in a streaming manner and sends them to Kafka. However, my requirement is a bit more involved: the job should not only read these files and push the data to Kafka, but also move each processed file to a different directory once all of its contents have been processed, and then continue with the next file in the source directory. I also have some concerns about how the job will behave if it is restarted. Could you please advise whether this is achievable and, if so, provide guidance or code to implement it?
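(Editor's note: as Chirag notes above, Flink's FileSource cannot do this out of the box. Outside of Flink, the read-then-move pattern Arjun describes looks roughly like the following minimal, library-free sketch; the `send` callback stands in for a Kafka producer, and the directory names and `*.txt` pattern are placeholders.)

```python
import shutil
from pathlib import Path


def process_and_archive(src_dir: Path, done_dir: Path, send) -> int:
    """Read each file in src_dir, hand every line to `send`,
    then move the fully processed file into done_dir."""
    done_dir.mkdir(parents=True, exist_ok=True)
    moved = 0
    for f in sorted(src_dir.glob("*.txt")):
        for line in f.read_text().splitlines():
            send(line)                          # e.g. produce to Kafka
        shutil.move(str(f), done_dir / f.name)  # archive only after full read
        moved += 1
    return moved
```

Because the move happens only after the whole file has been read, a crash mid-file leaves that file in the source directory to be re-read on the next run, i.e. at-least-once rather than exactly-once semantics.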
I'm also interested in how the job will behave when the source has a parallelism greater than 2 or 3, and how it can accurately track when all contents of each file have been read.

Thanks and regards,
Arjun
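(Editor's note: Andrew's suggestion above, periodically moving files that the checkpoints already record as committed from outside the job, could be sketched as below. How the committed-file list is extracted from checkpoint state is deliberately left abstract: `committed` is simply assumed to be that list, however obtained. Keep Alexander's caveat in mind that this approach is not robust.)

```python
import shutil
from pathlib import Path
from typing import Iterable, List


def move_committed(src_dir: Path, archive: Path,
                   committed: Iterable[str]) -> List[str]:
    """Move files whose names appear in the committed list out of src_dir.
    Safe to re-run from cron: files already moved are skipped."""
    archive.mkdir(parents=True, exist_ok=True)
    moved = []
    for name in committed:
        f = src_dir / name
        if f.exists():  # may already have been moved by a previous run
            shutil.move(str(f), archive / name)
            moved.append(name)
    return moved
```

Run periodically (cron, or similar), this only touches files the job has already committed, so on restart the job neither misses them nor reprocesses them, which is the property Andrew's idea relies on.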