Hi,
Please let me know if you have any ideas, as this is critical for my project.

Thanks & Regards,
Samir Vasani



On Fri, Jul 23, 2021 at 1:26 PM Samir Vasani <samirvas...@gmail.com> wrote:

> Hi,
>
> Could you elaborate on the UDF approach? I did not understand it.
>
> Thanks & Regards,
> Samir Vasani
>
>
>
> On Fri, Jul 23, 2021 at 1:22 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> In this case it won't work, as JobListener#onJobExecuted will only be
>> called when the job finishes, successfully or unsuccessfully.
>>
>> For a forever-running job I would suggest adding a UDF (user-defined
>> function) right after the source and adding a special "EOF" record to
>> each CSV file. The UDF monitors the data flowing through it, and when it
>> sees the EOF record it moves the file.
>>
>> Samir Vasani <samirvas...@gmail.com> wrote on Fri, Jul 23, 2021 at 3:44 PM:
>>
>>> Hi Caizhi Weng,
>>>
>>> Thanks for your input.
>>> Let me explain the requirement in a little more detail.
>>> The Flink pipeline will run forever (until some issue occurs and we
>>> need to restart it), so it will continuously monitor whether a new file
>>> has arrived in the *input* folder.
>>> Will your suggestion work in this case?
>>>
>>>
>>> Thanks & Regards,
>>> Samir Vasani
>>>
>>>
>>>
>>> On Fri, Jul 23, 2021 at 1:07 PM Caizhi Weng <tsreape...@gmail.com>
>>> wrote:
>>>
>>>> Hi!
>>>>
>>>> JobListener#onJobExecuted might help, if your job is not a
>>>> forever-running streaming job. See
>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>>>>
>>>> Samir Vasani <samirvas...@gmail.com> wrote on Fri, Jul 23, 2021 at 3:22 PM:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am a newbie to Flink and am facing some challenges solving the use
>>>>> case below.
>>>>>
>>>>> Use Case description:
>>>>>
>>>>> Every day I will receive a CSV file with a timestamp in its name in some
>>>>> folder, say *input*. The file name format would be
>>>>> *file_name_dd-mm-yy-hh-mm-ss.csv*.
>>>>>
>>>>> My Flink pipeline will read this CSV file row by row and write each
>>>>> row to my Kafka topic.
>>>>>
>>>>> Once the pipeline has read the entire file, the file needs to be
>>>>> moved to another folder, say *historic*, so that I can keep the *input*
>>>>> folder empty for the next file.
>>>>>
>>>>> I googled a lot but did not find anything, so could you guide me on
>>>>> how to achieve this?
>>>>>
>>>>> Let me know if anything else is required.
>>>>>
>>>>>
>>>>> Samir Vasani
>>>>>
>>>>
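
The EOF-marker idea suggested earlier in this thread could be sketched
roughly as below. This is only a plain-Java outline of the per-record logic,
not Flink code, and not necessarily what the suggestion had in mind: the
class name EofFileMover, the literal marker "EOF", and the assumption that
each record arrives together with the path of the file it came from are all
hypothetical. In a real job this logic would sit inside a user-defined
function placed right after the source, and it assumes the records of one
file pass through a single instance in order (for example, parallelism 1),
so that the marker really is the last record seen.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

/** Hypothetical helper: forwards normal records, moves the file on the EOF marker. */
class EofFileMover {
    // Assumed marker value; the thread only says a special "EOF" record.
    static final String EOF_MARKER = "EOF";

    private final Path historicDir;

    EofFileMover(Path historicDir) {
        this.historicDir = historicDir;
    }

    /** True if the record is the end-of-file marker (surrounding whitespace ignored). */
    static boolean isEof(String record) {
        return EOF_MARKER.equals(record.trim());
    }

    /** Where a finished input file should end up: same file name, historic folder. */
    Path targetFor(Path sourceFile) {
        return historicDir.resolve(sourceFile.getFileName());
    }

    /**
     * Returns the record to forward downstream (e.g. to Kafka), or an empty
     * Optional when the record was the EOF marker and the file has been moved.
     */
    Optional<String> process(Path sourceFile, String record) {
        if (!isEof(record)) {
            return Optional.of(record);
        }
        try {
            Files.createDirectories(historicDir);
            Files.move(sourceFile, targetFor(sourceFile),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Optional.empty();
    }
}
```

For example, assuming input/f.csv exists,
new EofFileMover(Paths.get("historic")).process(Paths.get("input", "f.csv"), "EOF")
would move it to historic/f.csv and return an empty Optional, while any
other record is returned unchanged for the downstream sink.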
