While the readFile method would monitor changes to existing files, it would completely re-ingest each changed file after every change. This behavior wasn't very user friendly.
David

On Fri, Jan 5, 2024 at 2:22 AM Martijn Visser <martijnvis...@apache.org> wrote:
> Hi Prasanna,
>
> I think this is as expected. There is no support for monitoring
> changes to existing files.
>
> Best regards,
>
> Martijn
>
> On Fri, Jan 5, 2024 at 10:22 AM Prasanna kumar
> <prasannakumarram...@gmail.com> wrote:
> >
> > Hi Flink Community,
> >
> > I hope this email finds you well. I am currently migrating my Flink
> > application from version 1.12.7 to 1.17.2 and have run into a
> > behavior difference in the FileSource when reading data from an S3
> > bucket.
> >
> > In the previous version (1.12.7), I used the readFile method with
> > TextInputFormat to continuously monitor the S3 bucket, at a fixed
> > interval, for updates to existing files and for newly added files.
> > The code snippet for this was as follows:
> >
> > streamExecutionEnvironment
> >     .readFile(new TextInputFormat(new Path("s3://my-s3-path")),
> >               "s3://my-s3-path",
> >               FileProcessingMode.PROCESS_CONTINUOUSLY,
> >               10000)
> >     .setParallelism(1);
> >
> > Now, after migrating to Flink 1.17.2, I have switched to FileSource
> > for continuous monitoring. The code snippet for this is as follows:
> >
> > FileSource<String> fileSource = FileSource
> >     .forRecordStreamFormat(new TextLineInputFormat(),
> >                            new Path("s3://my-s3-path"))
> >     .monitorContinuously(Duration.ofMillis(10000))
> >     .build();
> >
> > streamExecutionEnvironment
> >     .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
> >     .uid("filesource")
> >     .setParallelism(1);
> >
> > While this setup successfully detects new files added to the S3
> > bucket, it seems to miss changes made to existing files. I am unsure
> > whether this is expected behavior in Flink 1.17.2 or whether I am
> > overlooking a configuration detail.
> >
> > Any guidance or suggestions on resolving this issue would be greatly
> > appreciated.
> >
> > Thanks,
> > Prasanna
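To make the difference discussed in this thread concrete, here is a minimal, stdlib-only Java sketch that simulates two directory-polling policies. It does not use Flink, and all names in it (`MonitoringPolicies`, `ModTimePolicy`, `SeenPathPolicy`, `poll`) are hypothetical. `ModTimePolicy` assumes that readFile with `PROCESS_CONTINUOUSLY` selects every file whose modification time is newer than the largest one seen so far and then reads that file from the start, which is why a changed file gets re-ingested in full. `SeenPathPolicy` assumes that a continuously monitoring `FileSource` remembers which paths it has already handed out and skips them, so a modification to an existing file is never picked up. Both are simplified models of the behavior described above, not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical simulation of two polling policies; no Flink classes are used. */
public class MonitoringPolicies {

    /** Assumed readFile-like policy: emit every file newer than the max modification time seen. */
    static final class ModTimePolicy {
        private long maxSeenModTime = Long.MIN_VALUE;

        /** listing maps a path to its modification time; returns paths to (re)read in full. */
        List<String> poll(Map<String, Long> listing) {
            List<String> toRead = new ArrayList<>();
            long newMax = maxSeenModTime;
            for (Map.Entry<String, Long> e : listing.entrySet()) {
                if (e.getValue() > maxSeenModTime) {
                    toRead.add(e.getKey());
                    newMax = Math.max(newMax, e.getValue());
                }
            }
            maxSeenModTime = newMax;
            Collections.sort(toRead); // deterministic order for display
            return toRead;
        }
    }

    /** Assumed FileSource-like policy: emit each path at most once, ignoring modification times. */
    static final class SeenPathPolicy {
        private final Set<String> processed = new HashSet<>();

        List<String> poll(Map<String, Long> listing) {
            List<String> toRead = new ArrayList<>();
            for (String path : listing.keySet()) {
                if (processed.add(path)) { // add() returns false if the path was already seen
                    toRead.add(path);
                }
            }
            Collections.sort(toRead);
            return toRead;
        }
    }

    public static void main(String[] args) {
        ModTimePolicy readFileLike = new ModTimePolicy();
        SeenPathPolicy fileSourceLike = new SeenPathPolicy();

        Map<String, Long> listing = new HashMap<>();
        listing.put("a.txt", 100L);
        System.out.println(readFileLike.poll(listing) + " " + fileSourceLike.poll(listing));

        // a.txt is rewritten (newer modification time) and b.txt appears.
        listing.put("a.txt", 200L);
        listing.put("b.txt", 150L);
        System.out.println(readFileLike.poll(listing) + " " + fileSourceLike.poll(listing));
    }
}
```

Running `main`, both policies pick up `a.txt` on the first poll. On the second poll, after `a.txt` is rewritten and `b.txt` appears, the modification-time policy returns both files (so `a.txt` would be read again from the beginning), while the seen-path policy returns only `b.txt`.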