You could also try using streams to make it a little more concise:
directories.stream()
        .map(directory -> createInputStream(environment, directory))
        .reduce(DataStream::union)
        .map(joinedStream -> joinedStream.addSink(kafka));
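In case the reduce step looks opaque: it folds the per-directory streams pairwise with union, and yields an empty Optional when the directory list is empty. Here is a minimal plain-Java sketch of that folding pattern, using Lists as stand-ins for DataStreams (the class and helper names are illustrative, not Flink API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnionReduceDemo {

    // Stand-in for createInputStream: each directory yields some records.
    static List<String> createInputStream(String directory) {
        return Arrays.asList(directory + "/a", directory + "/b");
    }

    // Stand-in for DataStream.union: concatenates two record lists.
    static List<String> union(List<String> left, List<String> right) {
        return Stream.concat(left.stream(), right.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> directories = Arrays.asList("s3a://bucket/d1", "s3a://bucket/d2");

        Optional<List<String>> joined = directories.stream()
                .map(UnionReduceDemo::createInputStream)
                .reduce(UnionReduceDemo::union);

        // reduce returns Optional.empty() for an empty directory list,
        // which is why a sanity check (or orElseThrow) is useful before addSink.
        System.out.println(joined.orElseThrow(IllegalStateException::new).size());
    }
}
```

Running it prints 4 (two records from each of the two directories); with an empty directory list it would throw instead of silently doing nothing.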
On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
Do you know the list of directories when you submit the job?
If so, then you can iterate over them, create a source for each
directory, union them, and apply the sink to the union.
private static DataStream<String> createInputStream(
        StreamExecutionEnvironment environment, String directory) {
    TextInputFormat format = new TextInputFormat(
            new org.apache.flink.core.fs.Path(directory));
    format.setNestedFileEnumeration(true);
    return environment.readFile(format, directory,
            FileProcessingMode.PROCESS_ONCE, -1,
            FilePathFilter.createDefaultFilter());
}
public static void runJob() throws Exception {
    StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();
    List<String> directories = getDirectories();
    DataStream<String> joinedStreams = null;
    for (String directory : directories) {
        DataStream<String> inputStream = createInputStream(environment, directory);
        if (joinedStreams == null) {
            joinedStreams = inputStream;
        } else {
            // union() returns a new stream, so the result must be re-assigned
            joinedStreams = joinedStreams.union(inputStream);
        }
    }
    // add a sanity check that there was at least 1 directory
    joinedStreams.addSink(kafka);
}
On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
Hi Guys,
I'm still stuck on this; could someone please help me here?
Regards,
Satya
On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com>
wrote:
Hi Guys,
Sorry to bother you again, but could someone help me here? Any help in
this regard will be much appreciated.
Regards,
Satya
On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com>
wrote:
Hi Guys,
I need some help; any leads will be highly appreciated. I have written a
Flink streaming job to read data from an S3 bucket and push it to Kafka.
Below is the working source that deals with a single S3 path:
TextInputFormat format = new TextInputFormat(
        new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
format.setNestedFileEnumeration(true);
DataStream<String> inputStream = environment.readFile(format,
        "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
        FilePathFilter.createDefaultFilter());
inputStream.addSink(kafka);
But my requirement is to get a list of paths and pass them one by one to
this environment.readFile() method. How can we achieve this?
Thanks,
Satya
--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913