Hi!

For the UDF solution, you can add a "file name" column to your csv file
like this:
id,value,filename
1,100,
2,200,
3,300,test.csv

Only the last record of the csv file has the filename filled in, so a
non-empty filename marks the end of the file.
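
If the csv files are produced without this column, something has to add it
before Flink picks them up. A minimal sketch of that step in plain Java,
assuming a simple csv (header on the first line, no quoted newlines, no
trailing blank line). The class and method names are made up for
illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class EofMarker {
    /**
     * Appends a "filename" column to csv lines (header first).
     * Only the last line gets the file name; every other data line
     * gets an empty field.
     */
    public static List<String> addFilenameColumn(List<String> lines, String filename) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            String line = lines.get(i);
            if (i == 0) {
                out.add(line + ",filename");      // header line
            } else if (i == lines.size() - 1) {
                out.add(line + "," + filename);   // last record marks end of file
            } else {
                out.add(line + ",");              // empty filename field
            }
        }
        return out;
    }
}
```

Calling it with the lines "id,value", "1,100", "2,200", "3,300" and the name
"test.csv" gives the four lines shown above.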

Then write a UDF like this:

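import org.apache.flink.table.functions.ScalarFunction;

public class MyUDF extends ScalarFunction {
  // Called once per row; filename is non-empty only on the last record of a file.
  public String eval(String filename) {
    if (filename != null && filename.length() > 0) {
      // do the file renaming here
    }
    return filename;
  }
}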
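
The "do the file renaming here" part could look roughly like the helper
below. This is only a sketch using java.nio, not something Flink provides.
The class name, method name and the two folder arguments are made up, and it
assumes the UDF runs on a machine that can see both folders and that the
source is done with the file by the time the last record arrives:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FileArchiver {
    /** Moves inputDir/filename to historicDir/filename and returns the new path. */
    public static Path moveToHistoric(Path inputDir, Path historicDir, String filename)
            throws IOException {
        // Create the target folder if it does not exist yet.
        Files.createDirectories(historicDir);
        Path source = inputDir.resolve(filename);
        Path target = historicDir.resolve(filename);
        // Overwrite an older file with the same name, if any.
        return Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Inside eval you would call something like
FileArchiver.moveToHistoric(Paths.get("input"), Paths.get("historic"), filename)
and handle the IOException there, since eval in the example above does not
declare it.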

Suppose your original SQL is like
SELECT id, value FROM csvSource;

Then you can change your SQL to be
SELECT id, value, MyUDF(filename) FROM csvSource;

This will, of course, add in a "useless" column and you'll have to deal
with it in your result file.

For more on UDFs, see
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/

Samir Vasani <samirvas...@gmail.com> wrote on Sat, Jul 24, 2021 at 9:24 PM:

> Hi,
> Let me know if you have any idea as this is very critical for my project.
>
> Thanks & Regards,
> Samir Vasani
>
>
>
> On Fri, Jul 23, 2021 at 1:26 PM Samir Vasani <samirvas...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Can you elaborate more on UDF as I did not understand it.
>>
>> Thanks & Regards,
>> Samir Vasani
>>
>>
>>
>> On Fri, Jul 23, 2021 at 1:22 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> In this case it won't work, as JobListener#onJobExecuted will only be
>>> called when the job finishes, successfully or unsuccessfully.
>>>
>>> For a forever-running job I would suggest adding a UDF right after the
>>> source and adding a special "EOF" record to each of the csv files. This UDF
>>> monitors the data flowing through it, and when it gets the EOF record it
>>> moves the file.
>>>
>>> Samir Vasani <samirvas...@gmail.com> wrote on Fri, Jul 23, 2021 at 3:44 PM:
>>>
>>>> Hi Caizhi Weng,
>>>>
>>>> Thanks for your input.
>>>> Let me explain the requirement in a little more detail.
>>>> The Flink pipeline will be running forever (until some issue happens and we
>>>> need to restart), so it will continuously monitor whether a new file has
>>>> arrived in the *input* folder.
>>>> In this case, will your suggestion work?
>>>>
>>>>
>>>> Thanks & Regards,
>>>> Samir Vasani
>>>>
>>>>
>>>>
>>>> On Fri, Jul 23, 2021 at 1:07 PM Caizhi Weng <tsreape...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> JobListener#onJobExecuted might help, if your job is not a
>>>>> forever-running streaming job. See
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>>>>>
>>>>> Samir Vasani <samirvas...@gmail.com> wrote on Fri, Jul 23, 2021 at 3:22 PM:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am a newbie to Flink and am facing some challenges solving the use
>>>>>> case below.
>>>>>>
>>>>>> Use Case description:
>>>>>>
>>>>>> I will receive a csv file with a timestamp every day in
>>>>>> some folder, say *input*. The file name format would be
>>>>>> *file_name_dd-mm-yy-hh-mm-ss.csv*.
>>>>>>
>>>>>> My Flink pipeline will read this csv file row by row
>>>>>> and write each row to my Kafka topic.
>>>>>>
>>>>>> Once the pipeline has read the entire file, the file needs to be
>>>>>> moved to another folder, say *historic*, so that I can keep the *input*
>>>>>> folder empty for the next file.
>>>>>>
>>>>>> I googled a lot but did not find anything, so can you guide me on how to
>>>>>> achieve this?
>>>>>>
>>>>>> Let me know if anything else is required.
>>>>>>
>>>>>>
>>>>>> Samir Vasani
>>>>>>
>>>>>
