Thanks, Feng. The code above worked well and served its purpose.
I just modified it to use *NonSplittingRecursiveAllDirEnumerator* instead of *NonSplittingRecursiveEnumerator*, as the regex filter was to be applied to the individual filenames, excluding the parent directory specified as the path.

    FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///tmp/test/"))
        .setFileEnumerator(() -> new NonSplittingRecursiveAllDirEnumerator(".+\\.csv"))
        .monitorContinuously(Duration.ofSeconds(30))
        .build();

Regards,
Amogh.

On Sun, Aug 18, 2024 at 2:18 PM Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi Amogh
>
> You can test the code below:
>
> ```java
> FileSource.forRecordStreamFormat(csvFormat, new Path("file:///tmp/test"))
>     .setFileEnumerator(() -> new NonSplittingRecursiveEnumerator(path -> path.getName().endsWith(".csv")))
>     .build();
> ```
>
> Best,
> Feng
>
> On Sat, Aug 17, 2024 at 4:41 AM amogh joshi <amoghjo...@gmail.com> wrote:
>
>> Hi Users,
>>
>> Any clues on a configurable regex path for the FileSource/Filesystem
>> connector for stream APIs are appreciated.
>>
>> Regards,
>> Amogh.
>>
>> On Thu, 15 Aug, 2024, 11:18 amogh joshi, <amoghjo...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am building a pretty straightforward processing pipeline as described
>>> below, using *DataStream* *APIs* and *FileSystem connector*.
>>>
>>> *filesystem-source -> transforms -> database-sink*
>>>
>>> Everything worked well while the filesystem (source) had just a single
>>> type (JSON) of files. Recently the filesystem got modified to store other
>>> types of files too (like parquet, text, etc.).
>>>
>>> It appeared to be just a matter of specifying some regex to the FileSource
>>> or SplitEnumerator and creating different sources for different types of
>>> files.
>>>
>>> However, the two public methods *FileSource::forRecordStreamFormat* and
>>> *FileSource::forBulkFileFormat* do not allow configuring regex-based
>>> file paths.
>>>
>>> What could be the recommended way to configure a custom regex for a
>>> FileSource?
>>>
>>> I am using version 1.19.1.
>>>
>>> Regards,
>>> Amogh.
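
P.S. To illustrate the enumerator swap described at the top of this mail, here is a minimal plain-Java sketch. It is not Flink's code. It assumes, as far as I understand and without having checked the 1.19.1 source line by line, that *NonSplittingRecursiveEnumerator* applies the file filter to directories as well as files, so a filter that only accepts `.csv` names rejects the root directory `/tmp/test` before any file is visited, while *NonSplittingRecursiveAllDirEnumerator* always descends into directories and applies the regex to files only. It also assumes the regex is matched against the whole path string. `Node`, `sampleTree`, `csvFilter`, `filterAll`, `filterFilesOnly` and the sample file names are hypothetical, made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

class EnumeratorFilterSketch {

    /** Hypothetical stand-in for a file or directory; not a Flink class. */
    static final class Node {
        final String path;
        final List<Node> children; // null means "plain file"

        Node(String path, List<Node> children) {
            this.path = path;
            this.children = children;
        }

        boolean isDir() {
            return children != null;
        }
    }

    static Node file(String path) {
        return new Node(path, null);
    }

    static Node dir(String path, Node... children) {
        return new Node(path, Arrays.asList(children));
    }

    /** Assumed sample layout under /tmp/test. */
    static Node sampleTree() {
        return dir("/tmp/test",
                file("/tmp/test/a.csv"),
                file("/tmp/test/b.json"),
                dir("/tmp/test/sub", file("/tmp/test/sub/c.csv")));
    }

    /** Same regex as in the mail, matched against the whole path string (an assumption). */
    static Predicate<String> csvFilter() {
        Pattern csv = Pattern.compile(".+\\.csv");
        return path -> csv.matcher(path).matches();
    }

    /** Variant 1: the filter is applied to every entry, directories included. */
    static List<String> filterAll(Node root, Predicate<String> filter) {
        List<String> out = new ArrayList<>();
        collectAll(root, filter, out);
        return out;
    }

    private static void collectAll(Node node, Predicate<String> filter, List<String> out) {
        if (!filter.test(node.path)) {
            return; // a rejected directory hides everything beneath it
        }
        if (node.isDir()) {
            for (Node child : node.children) {
                collectAll(child, filter, out);
            }
        } else {
            out.add(node.path);
        }
    }

    /** Variant 2: directories are always traversed; the filter is applied to files only. */
    static List<String> filterFilesOnly(Node root, Predicate<String> filter) {
        List<String> out = new ArrayList<>();
        collectFilesOnly(root, filter, out);
        return out;
    }

    private static void collectFilesOnly(Node node, Predicate<String> filter, List<String> out) {
        if (node.isDir()) {
            for (Node child : node.children) {
                collectFilesOnly(child, filter, out);
            }
        } else if (filter.test(node.path)) {
            out.add(node.path);
        }
    }

    public static void main(String[] args) {
        Node root = sampleTree();
        System.out.println(filterAll(root, csvFilter()));       // prints []
        System.out.println(filterFilesOnly(root, csvFilter())); // prints [/tmp/test/a.csv, /tmp/test/sub/c.csv]
    }
}
```

In the first variant the root directory itself fails the `.csv` test, so nothing is listed. In the second, the files are found at any depth. If the assumption about the two enumerators holds, that is the difference I ran into.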