Hi Guys,
I need some help; any leads will be highly appreciated. I have written a
Flink streaming job that reads data from an S3 bucket and pushes it to Kafka.
Below is the working source, which deals with a single S3 path:
TextInputFormat format = new TextInputFormat(
        new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
// also pick up files in nested sub-directories under the path
format.setNestedFileEnumeration(true);
// PROCESS_ONCE reads the current contents once; the interval (-1) only matters in continuous mode
DataStream<String> inputStream = environment.readFile(format,
        "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
        FilePathFilter.createDefaultFilter());
inputStream.addSink(kafka);

However, my requirement is to get a list of paths and pass them one by one
to this environment.readFile() method. How can we achieve this?
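The post does not say where the list of paths comes from. As a minimal sketch, assuming (hypothetically) that the paths are the daily directories following the "s3a://directory/2020-09-03/" layout shown above, the list could be built like this. The class name DailyS3Paths and the method dailyPaths are illustrative, not part of Flink.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

public class DailyS3Paths {

    // Hypothetical helper: returns one path per day, e.g. "s3a://directory/2020-09-03/",
    // ASSUMING the bucket uses one directory per date in yyyy-MM-dd form.
    static List<String> dailyPaths(String base, LocalDate start, LocalDate endInclusive) {
        if (endInclusive.isBefore(start)) {
            throw new IllegalArgumentException("end date is before start date");
        }
        List<String> paths = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(endInclusive); d = d.plusDays(1)) {
            // LocalDate.toString() is ISO-8601 (yyyy-MM-dd), matching "2020-09-03"
            paths.add(base + d + "/");
        }
        return paths;
    }

    public static void main(String[] args) {
        List<String> paths = dailyPaths("s3a://directory/",
                LocalDate.of(2020, 9, 1), LocalDate.of(2020, 9, 3));
        // prints s3a://directory/2020-09-01/ through s3a://directory/2020-09-03/
        paths.forEach(System.out::println);
    }
}
```

In the job itself, one option is to loop over this list, create a separate TextInputFormat for each entry, call environment.readFile(...) with that path, and merge the resulting DataStream<String> objects with DataStream.union(...) before the single addSink(kafka) call. This keeps one Kafka sink regardless of how many paths are read.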

Thanks,
Satya
