Hi team,
I'm also interested in finding out whether there is Java code available to determine how far a Flink job has progressed through the files in a directory. Additionally, I'm curious where the details of the processed files are stored within Flink.
Thanks and regards,
Arjun S

On Mon, 30 Oct 2023 at 10:48, arjun s <arjunjoice...@gmail.com> wrote:

> Hi team,
>
> I appreciate the information provided. I'm asking whether there is a method to automatically relocate processed files out of a directory once a Flink job has finished processing them. I'm particularly keen to understand how this use case is handled in production environments. Additionally, I'm curious whether there is a way to track which files in the directory have already been processed by the Flink job.
> Looking forward to your response.
>
> Thank you. Regards,
> Arjun
>
> On Sat, 28 Oct 2023 at 23:42, Alexander Fedulov <alexander.fedu...@gmail.com> wrote:
>
>> > Or was it the querying of the checkpoints you were advising against?
>>
>> Yes, I meant the approach, not the file removal itself, mainly because how exactly FileSource stores its state is an implementation detail, with no external guarantees for its consistency even between minor versions.
>> On top of that, the original author of the State Processor API has moved to another project, so it has not been actively worked on recently. I am not sure it is even possible to access the FileSource state with it, since FLIP-27 sources do not use the OperatorState abstraction directly [1].
>>
>> [1] https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L510
>>
>> Best,
>> Alexander
>>
>> On Sat, 28 Oct 2023 at 16:13, Andrew Otto <o...@wikimedia.org> wrote:
>>
>>> > This is not a robust solution, I would advise against it.
>>> Oh no? I am curious as to why not. It seems not dissimilar to how Kafka topic retention works: messages are removed after some time period (hopefully after they have been processed), so why would it be bad to remove files that have already been processed?
>>>
>>> Or was it the querying of the checkpoints you were advising against?
>>>
>>> To be sure, I was referring to moving the previously processed files away, not the checkpoints themselves.
>>>
>>> On Fri, Oct 27, 2023 at 12:45 PM Alexander Fedulov <alexander.fedu...@gmail.com> wrote:
>>>
>>>> > I wonder if you could use this fact to query the committed checkpoints and move them away after the job is done.
>>>>
>>>> This is not a robust solution; I would advise against it.
>>>>
>>>> Best,
>>>> Alexander
>>>>
>>>> On Fri, 27 Oct 2023 at 16:41, Andrew Otto <o...@wikimedia.org> wrote:
>>>>
>>>>> For moving the files:
>>>>> > It will keep the files as is and remember the name of the file read in checkpointed state to ensure it doesn't read the same file twice.
>>>>>
>>>>> I wonder if you could use this fact to query the committed checkpoints and move them away after the job is done. I think it should even be safe to do this outside of the Flink job periodically (cron, whatever), because on restart it won't reprocess the files that have been committed in the checkpoints.
>>>>>
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/libs/state_processor_api/#reading-state
>>>>>
>>>>> On Fri, Oct 27, 2023 at 1:13 AM arjun s <arjunjoice...@gmail.com> wrote:
>>>>>
>>>>>> Hi team,
>>>>>> Thanks for your quick response.
>>>>>> I have a question about file processing in the event of a job restart. When the job is restarted, we have trouble tracking which files have been processed and which remain pending. Is there a way to seamlessly resume processing files from where they were left off, particularly when we have to submit and restart the job manually after a server or application restart? It becomes an issue when the job reprocesses all the files in the directory from the beginning after a restart, and I'm looking for a solution to address this.
>>>>>>
>>>>>> Thanks and regards,
>>>>>> Arjun
>>>>>>
>>>>>> On Fri, 27 Oct 2023 at 07:29, Chirag Dewan <chirag.dewa...@yahoo.in> wrote:
>>>>>>
>>>>>>> Hi Arjun,
>>>>>>>
>>>>>>> Flink's FileSource doesn't move or delete files as of now. It will keep the files as is and remember the name of the file read in checkpointed state to ensure it doesn't read the same file twice.
>>>>>>>
>>>>>>> Flink's source API works in such a way that a single enumerator operates on the JobManager. The enumerator is responsible for listing the files and splitting them into smaller units. These units can be a complete file (for row formats) or splits within a file (for bulk formats). The reading itself is done by SplitReaders in the TaskManagers. This way, only the reading is done concurrently, and the source is able to track file completions.
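>>>>>>>
>>>>>>> As an illustration, here is a minimal sketch of a continuously monitoring FileSource with checkpointing enabled (the directory path and both intervals are placeholders, not recommendations). On a restore from a checkpoint, files already recorded in that state are not read again:
>>>>>>>
>>>>>>> import java.time.Duration;
>>>>>>>
>>>>>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>>>>>>> import org.apache.flink.connector.file.src.FileSource;
>>>>>>> import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
>>>>>>> import org.apache.flink.core.fs.Path;
>>>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>
>>>>>>> public class MonitoringFileSourceSketch {
>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>         // Checkpointing is what persists the set of already-read files.
>>>>>>>         env.enableCheckpointing(60_000);
>>>>>>>
>>>>>>>         FileSource<String> source = FileSource
>>>>>>>                 .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/input"))
>>>>>>>                 // Keep watching the directory for new files instead of reading once and finishing.
>>>>>>>                 .monitorContinuously(Duration.ofSeconds(10))
>>>>>>>                 .build();
>>>>>>>
>>>>>>>         DataStream<String> lines =
>>>>>>>                 env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
>>>>>>>         lines.print();
>>>>>>>
>>>>>>>         env.execute("monitoring-file-source");
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> If you resubmit the job manually, you can point it at the last retained checkpoint or a savepoint with `flink run -s <path-to-checkpoint-or-savepoint> ...` so that the restored state is used instead of the whole directory being rescanned. (For checkpoints to survive a cancellation, they need to be retained, e.g. via externalized checkpoint retention.)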
>>>>>>>
>>>>>>> You can read more about Flink's sources here:
>>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/sources/
>>>>>>> and about the FileSystem connector here:
>>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/
>>>>>>>
>>>>>>> On Thursday, 26 October, 2023 at 06:53:23 pm IST, arjun s <arjunjoice...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hello team,
>>>>>>> I'm currently configuring a Flink job that reads files from a specified directory and sends the data to a Kafka sink. I have already designed a Flink job that successfully reads the file contents in a streaming manner and sends them to Kafka. However, my requirement is a bit more intricate: the job should not only read the files and push their data to Kafka, but also relocate each processed file to a different directory once all of its contents have been processed, and then seamlessly move on to the next file in the source directory. I also have some concerns about how the job will behave if it encounters a restart. Could you please advise whether this is achievable and, if so, provide guidance or code to implement it?
>>>>>>>
>>>>>>> I'm also quite interested in how the job will handle situations where the source has a parallelism greater than 2 or 3, and how it can accurately monitor the completion of reading all contents in each file.
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>> Arjun
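>>>>>>>
>>>>>>> P.S. For reference, a rough sketch of the sink side of my current job. The broker address and topic name are placeholders, and `lines` is the DataStream<String> produced by the file source:
>>>>>>>
>>>>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>>>>> import org.apache.flink.connector.base.DeliveryGuarantee;
>>>>>>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
>>>>>>> import org.apache.flink.connector.kafka.sink.KafkaSink;
>>>>>>>
>>>>>>> KafkaSink<String> sink = KafkaSink.<String>builder()
>>>>>>>         .setBootstrapServers("localhost:9092")
>>>>>>>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>>>>>>>                 .setTopic("file-contents")
>>>>>>>                 .setValueSerializationSchema(new SimpleStringSchema())
>>>>>>>                 .build())
>>>>>>>         // At-least-once is the default; exactly-once would additionally need Kafka transactions.
>>>>>>>         .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>>>>>>         .build();
>>>>>>>
>>>>>>> lines.sinkTo(sink);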