Hi, I did not understand why you are using a table (SQL) when we are working on a program?
On Mon, Jul 26, 2021, 7:20 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> For the UDF solution, you can add a "filename" column to your csv file
> like this:
>
> id,value,filename
> 1,100,
> 2,200,
> 3,300,test.csv
>
> Only the last record of the csv file has its filename filled in, so that
> it marks the end of the file.
>
> Then write a UDF like this:
>
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class MyUDF extends ScalarFunction {
>     public String eval(String filename) {
>         if (filename != null && filename.length() > 0) {
>             // do the file renaming here
>         }
>         return filename;
>     }
> }
>
> Suppose your original SQL is like
>
> SELECT id, value FROM csvSource;
>
> Then you can change your SQL to
>
> SELECT id, value, MyUDF(filename) FROM csvSource;
>
> This will, of course, add a "useless" column, and you'll have to deal
> with it in your result file.
>
> For more on UDFs, see
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
>
> Samir Vasani <samirvas...@gmail.com> wrote on Sat, Jul 24, 2021 at 9:24 PM:
>
>> Hi,
>>
>> Let me know if you have any idea, as this is very critical for my project.
>>
>> Thanks & Regards,
>> Samir Vasani
>>
>> On Fri, Jul 23, 2021 at 1:26 PM Samir Vasani <samirvas...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Can you elaborate more on the UDF? I did not understand it.
>>>
>>> Thanks & Regards,
>>> Samir Vasani
>>>
>>> On Fri, Jul 23, 2021 at 1:22 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> In this case it won't work, as JobListener#onJobExecuted will only be
>>>> called when the job finishes, successfully or unsuccessfully.
>>>>
>>>> For a forever-running job I would suggest adding a UDF right after the
>>>> source and adding a special "EOF" record to each csv file. This UDF
>>>> monitors the data flowing through it, and when it sees the EOF record
>>>> it moves the file.
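A minimal sketch of this EOF-record idea as a DataStream map function, since the pipeline here is a program rather than SQL. The CsvRow type and the /data/input and /data/historic paths are assumptions for illustration; the EOF convention (filename set only on a file's last record) follows the csv layout suggested at the top of this thread, and a single reader per file is assumed so the EOF record really arrives last:

import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.api.common.functions.MapFunction;

public class EofDetectingMapper
        implements MapFunction<EofDetectingMapper.CsvRow, EofDetectingMapper.CsvRow> {

    // Hypothetical record type; "filename" is non-empty only on a file's last record.
    public static class CsvRow {
        public int id;
        public int value;
        public String filename;
    }

    @Override
    public CsvRow map(CsvRow row) throws Exception {
        if (row.filename != null && !row.filename.isEmpty()) {
            // EOF record: the whole file has passed through, so archive it.
            Files.move(Paths.get("/data/input/" + row.filename),
                    Paths.get("/data/historic/" + row.filename));
        }
        return row;
    }
}

Attached right after the source, e.g. stream.map(new EofDetectingMapper()), every record passes through unchanged and the move happens exactly once per file.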
>>>> Samir Vasani <samirvas...@gmail.com> wrote on Fri, Jul 23, 2021 at 3:44 PM:
>>>>
>>>>> Hi Caizhi Weng,
>>>>>
>>>>> Thanks for your input.
>>>>> Let me explain the requirement in a little more detail.
>>>>> The Flink pipeline will be running forever (until some issue happens
>>>>> and we need to restart it), so it will continuously monitor whether a
>>>>> new file has arrived in the *input* folder.
>>>>> Will your suggestion work in this case?
>>>>>
>>>>> Thanks & Regards,
>>>>> Samir Vasani
>>>>>
>>>>> On Fri, Jul 23, 2021 at 1:07 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> JobListener#onJobExecuted might help, if your job is not a
>>>>>> forever-running streaming job. See
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>>>>>>
>>>>>> Samir Vasani <samirvas...@gmail.com> wrote on Fri, Jul 23, 2021 at 3:22 PM:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am a newbie to Flink and am facing some challenges with the use
>>>>>>> case below.
>>>>>>>
>>>>>>> Use case description:
>>>>>>>
>>>>>>> Every day I will receive a csv file with a timestamp in its name in
>>>>>>> some folder, say *input*. The file name format is
>>>>>>> *file_name_dd-mm-yy-hh-mm-ss.csv*.
>>>>>>>
>>>>>>> My Flink pipeline will read this csv file row by row, and each row
>>>>>>> will be written to my Kafka topic.
>>>>>>>
>>>>>>> Once the pipeline has read the entire file, the file needs to be
>>>>>>> moved to another folder, say *historic*, so that the *input* folder
>>>>>>> stays empty for the next file.
>>>>>>>
>>>>>>> I googled a lot but did not find anything, so can you guide me on
>>>>>>> how to achieve this?
>>>>>>>
>>>>>>> Let me know if anything else is required.
>>>>>>>
>>>>>>> Samir Vasani
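For the bounded-job variant, a minimal sketch of the JobListener#onJobExecuted suggestion from earlier in the thread; as noted there, it only helps when the job actually finishes. The class name and file paths are assumptions for illustration:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MoveFileAfterJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // Nothing to do when the job is submitted.
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                if (throwable == null) {
                    try {
                        // The job finished successfully: archive the processed file.
                        Files.move(Paths.get("/data/input/file.csv"),
                                Paths.get("/data/historic/file.csv"));
                    } catch (IOException e) {
                        throw new RuntimeException("Could not archive the input file", e);
                    }
                }
            }
        });

        // ... build the csv-to-Kafka pipeline on env here, then:
        env.execute("csv-to-kafka");
    }
}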