Hi!

For the UDF solution, you can add a "file name" column to your csv file like this:

id,value,filename
1,100,
2,200,
3,300,test.csv

Only the last record of the csv file has the filename filled in, so that it marks the end of the file. Then write a UDF like this:

import org.apache.flink.table.functions.ScalarFunction;

public class MyUDF extends ScalarFunction {
    public String eval(String filename) {
        // Only the last record of each file carries a non-empty filename.
        if (filename != null && filename.length() > 0) {
            // do the file renaming here
        }
        return filename;
    }
}

Suppose your original SQL is:

SELECT id, value FROM csvSource;

Then you can change it to:

SELECT id, value, MyUDF(filename) FROM csvSource;

This will, of course, add a "useless" column, and you'll have to deal with it in your result file. For more on UDFs, see
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
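If it helps, here is a minimal sketch of what the renaming and the registration could look like. The folder paths, the class name MoveFileUDF, and the already-created tableEnv are assumptions for illustration, not something Flink prescribes:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

import org.apache.flink.table.functions.ScalarFunction;

public class MoveFileUDF extends ScalarFunction {

    // Assumed locations; adjust to your actual input/historic folders.
    private static final String INPUT_DIR = "/data/input";
    private static final String HISTORIC_DIR = "/data/historic";

    public String eval(String filename) {
        // Only the last record of each file carries a non-empty filename,
        // so the move happens exactly once per file.
        if (filename != null && filename.length() > 0) {
            try {
                Path source = Paths.get(INPUT_DIR, filename);
                Path target = Paths.get(HISTORIC_DIR, filename);
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (Exception e) {
                // In a real job, decide whether to log and continue or fail fast.
                throw new RuntimeException("Could not move file " + filename, e);
            }
        }
        return filename;
    }
}

// Registering and using the function (assuming a StreamTableEnvironment
// named tableEnv has already been created; `value` is escaped below
// because VALUE is a reserved keyword in Flink SQL):
tableEnv.createTemporarySystemFunction("MyUDF", MoveFileUDF.class);
tableEnv.executeSql("SELECT id, `value`, MyUDF(filename) FROM csvSource");

One caveat: the UDF runs on the TaskManagers, so the input folder must be reachable from wherever the job actually executes.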
On Sat, Jul 24, 2021 at 9:24 PM, Samir Vasani <samirvas...@gmail.com> wrote:

> Hi,
> Let me know if you have any idea, as this is very critical for my project.
>
> Thanks & Regards,
> Samir Vasani
>
> On Fri, Jul 23, 2021 at 1:26 PM Samir Vasani <samirvas...@gmail.com> wrote:
>
>> Hi,
>>
>> Can you elaborate more on the UDF, as I did not understand it?
>>
>> Thanks & Regards,
>> Samir Vasani
>>
>> On Fri, Jul 23, 2021 at 1:22 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> In this case it won't work, as JobListener#onJobExecuted will only be
>>> called when the job finishes, successfully or unsuccessfully.
>>>
>>> For a forever-running job I would suggest adding a UDF right after the
>>> source and adding a special "EOF" record to each of the csv files. This
>>> UDF monitors the data flowing through it, and if it sees the EOF record
>>> it moves the file.
>>>
>>> On Fri, Jul 23, 2021 at 3:44 PM, Samir Vasani <samirvas...@gmail.com> wrote:
>>>
>>>> Hi Caizhi Weng,
>>>>
>>>> Thanks for your input.
>>>> Let me explain the requirement in a little more detail.
>>>> The Flink pipeline will run forever (until some issue happens and we
>>>> need to restart it), so it will continuously monitor whether a new file
>>>> has arrived in the *input* folder.
>>>> Will your suggestion work in this case?
>>>>
>>>> Thanks & Regards,
>>>> Samir Vasani
>>>>
>>>> On Fri, Jul 23, 2021 at 1:07 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> JobListener#onJobExecuted might help, if your job is not a
>>>>> forever-running streaming job. See
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>>>>>
>>>>> On Fri, Jul 23, 2021 at 3:22 PM, Samir Vasani <samirvas...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am new to Flink and facing some challenges with the use case below.
>>>>>>
>>>>>> Use case description:
>>>>>>
>>>>>> Every day I will receive a csv file with a timestamp in its name in
>>>>>> some folder, say *input*. The file name format is
>>>>>> *file_name_dd-mm-yy-hh-mm-ss.csv*.
>>>>>>
>>>>>> My Flink pipeline will read this csv file row by row and write each
>>>>>> row to my Kafka topic.
>>>>>>
>>>>>> Once the pipeline has read the entire file, the file needs to be moved
>>>>>> to another folder, say *historic*, so that the *input* folder stays
>>>>>> empty for the next file.
>>>>>>
>>>>>> I googled a lot but did not find anything, so can you guide me on how
>>>>>> to achieve this?
>>>>>>
>>>>>> Let me know if anything else is required.
>>>>>>
>>>>>> Samir Vasani