Hi,

Let me know if you have any ideas, as this is very critical for my project.
Thanks & Regards,
Samir Vasani

On Fri, Jul 23, 2021 at 1:26 PM Samir Vasani <samirvas...@gmail.com> wrote:

> Hi,
>
> Can you elaborate more on the UDF, as I did not understand it?
>
> Thanks & Regards,
> Samir Vasani
>
> On Fri, Jul 23, 2021 at 1:22 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> In this case it won't work, as JobListener#onJobExecuted will only be
>> called when the job finishes, successfully or unsuccessfully.
>>
>> For a forever-running job I would suggest adding a UDF right after the
>> source and adding a special "EOF" record to each of the csv files. This
>> UDF monitors the data flowing through it, and when it sees the EOF
>> record it moves the file.
>>
>> On Fri, Jul 23, 2021 at 3:44 PM Samir Vasani <samirvas...@gmail.com> wrote:
>>
>>> Hi Caizhi Weng,
>>>
>>> Thanks for your input.
>>> Let me explain the requirement in a little more detail.
>>> The Flink pipeline will be running forever (until some issue happens
>>> and we need to restart it), so it will continuously monitor whether a
>>> new file arrives in the *input* folder.
>>> Will your suggestion work in this case?
>>>
>>> Thanks & Regards,
>>> Samir Vasani
>>>
>>> On Fri, Jul 23, 2021 at 1:07 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> JobListener#onJobExecuted might help, if your job is not a
>>>> forever-running streaming job. See
>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>>>>
>>>> On Fri, Jul 23, 2021 at 3:22 PM Samir Vasani <samirvas...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am a newbie to Flink and am facing some challenges solving the use
>>>>> case below.
>>>>>
>>>>> Use case description:
>>>>>
>>>>> Every day I will receive a csv file with a timestamp in its name in a
>>>>> folder, say *input*. The file name format is
>>>>> *file_name_dd-mm-yy-hh-mm-ss.csv*.
>>>>>
>>>>> My Flink pipeline will read this csv file row by row, and each row
>>>>> will be written to my Kafka topic.
>>>>>
>>>>> Once the pipeline has read the entire file, the file needs to be
>>>>> moved to another folder, say *historic*, so that I can keep the
>>>>> *input* folder empty for the next file.
>>>>>
>>>>> I googled a lot but did not find anything, so can you guide me on how
>>>>> to achieve this?
>>>>>
>>>>> Let me know if anything else is required.
>>>>>
>>>>> Samir Vasani
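For what it's worth, a minimal sketch of the EOF-marker idea Caizhi describes, as plain Java. This is NOT Flink code: the class and method names are illustrative, and in a real job this logic would live inside an org.apache.flink.api.common.functions.MapFunction (or FlatMapFunction) placed right after the source, with each record carrying its source file name (how the file name reaches the UDF is an assumption, e.g. via a custom source). It only shows the per-record decision: forward ordinary rows downstream (to Kafka), and on the "EOF" marker move the file from *input* to *historic* instead of emitting it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of the per-record logic suggested in the thread: a UDF right
// after the source watches for a special "EOF" record; when it appears,
// the finished file is moved from the input folder to the historic one.
// All names here are illustrative, not from a real Flink API.
public class EofMovingMapper {
    private final Path inputDir;
    private final Path historicDir;

    public EofMovingMapper(Path inputDir, Path historicDir) {
        this.inputDir = inputDir;
        this.historicDir = historicDir;
    }

    /**
     * Returns true if the record is an ordinary row (to be forwarded to
     * the Kafka sink), false if it was the EOF marker, which is consumed
     * here: the source file is moved to the historic folder.
     */
    public boolean process(String fileName, String record) throws IOException {
        if ("EOF".equals(record)) {
            Files.createDirectories(historicDir);
            Files.move(inputDir.resolve(fileName),
                       historicDir.resolve(fileName),
                       StandardCopyOption.REPLACE_EXISTING);
            return false; // swallow the marker, do not emit it downstream
        }
        return true; // forward ordinary rows unchanged
    }
}
```

One caveat worth noting: with parallelism > 1 the EOF record must reach the same subtask that tracked the file (or the operator must run with parallelism 1), otherwise the move can happen before all rows have passed through.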