Hi Prasanna,

I think this is expected behavior. FileSource has no support for
monitoring changes to existing files; it only picks up new files.
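
To illustrate why an in-place change goes unnoticed, here is a toy model in plain Java. It is not Flink code: the class PathDedupSketch and its discover method are made up for this sketch. It assumes that continuous monitoring remembers the paths it has already handed out and skips any path it has seen before, which is my understanding of how the FileSource enumerator behaves.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of path-based file discovery. Hypothetical, not part of Flink. */
public class PathDedupSketch {
    // Paths that have already been handed out for reading.
    private final Set<String> alreadyProcessed = new HashSet<>();

    /** Returns only the paths in the listing that have not been seen before. */
    public List<String> discover(List<String> listing) {
        List<String> fresh = new ArrayList<>();
        for (String path : listing) {
            // Set.add returns false when the path is already known,
            // so a file rewritten under the same path is skipped.
            if (alreadyProcessed.add(path)) {
                fresh.add(path);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        PathDedupSketch monitor = new PathDedupSketch();
        // First scan: a.txt is new.
        System.out.println(monitor.discover(List.of("s3://my-s3-path/a.txt")));
        // Second scan: a.txt was modified in place, but the path is unchanged.
        System.out.println(monitor.discover(List.of("s3://my-s3-path/a.txt")));
        // Third scan: the update was written as a new object instead.
        System.out.println(monitor.discover(
                List.of("s3://my-s3-path/a.txt", "s3://my-s3-path/a-v2.txt")));
    }
}
```

If that assumption holds, one possible workaround is to have the producer write every update as a new object under the monitored path rather than overwriting an existing one.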

Best regards,

Martijn

On Fri, Jan 5, 2024 at 10:22 AM Prasanna kumar
<prasannakumarram...@gmail.com> wrote:
>
> Hi Flink Community,
>
>
> I hope this email finds you well. I am currently in the process of migrating 
> my Flink application from version 1.12.7 to 1.17.2 and have encountered a 
> behavior issue with the FileSource while reading data from an S3 bucket.
>
>  In the previous version (1.12.7), I was utilizing the readFile method with 
> the TextInputFormat to continuously monitor the S3 bucket for any updates or 
> new files added at a specified time interval. The code snippet for this was 
> as follows:
>
>
>
> streamExecutionEnvironment
>     .readFile(new TextInputFormat(new Path("s3://my-s3-path")),
>               "s3://my-s3-path",
>               FileProcessingMode.PROCESS_CONTINUOUSLY,
>               10000)
>     .setParallelism(1);
>
>
>
> Now, after migrating to Flink 1.17.2, I have switched to using the FileSource 
> for continuous monitoring. The code snippet for this is as follows:
>
> FileSource<String> fileSource = FileSource
>     .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-s3-path"))
>     .monitorContinuously(Duration.ofMillis(10000))
>     .build();
>
> streamExecutionEnvironment
>     .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
>     .uid("filesource")
>     .setParallelism(1);
>
> This setup successfully detects new files added to the S3 bucket, but it 
> seems to miss changes made to existing files. I am unsure whether this is 
> expected behavior in Flink 1.17.2 or whether there is a configuration detail 
> I am overlooking.
>
>  Any guidance or suggestions on resolving this issue would be greatly 
> appreciated.
>
> Thanks,
> Prasanna
