Hi Chesnay/Team,

Thank you so much for the reply. In continuation of the previous email, below is the block diagram of the job in which I am reading files from S3 and pushing them to Kafka. With the current setup I have 4 directories in total, and based on the readFile method from the Flink environment we create 4 readers that process the data from S3 in parallel.
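In case the attached diagram does not come through inline, the current shape of the job in code is roughly the following (S3Service.customInputStream is our own wrapper around environment.readFile; kafkaProducer and the other names are from the code further below):

List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters); // currently 4 directories
DataStream<String> joined = null;
for (String directory : s3PathList) {
    // one readFile-based source per directory
    DataStream<String> stream = S3Service.customInputStream(environment, directory, readerParallelism);
    joined = (joined == null) ? stream : joined.union(stream); // 4 directories -> 4 parallel readers
}
joined.addSink(kafkaProducer).name("Publish to " + kafkaTopicName);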
Below are my questions:

1. Can we restrict the number of readers that process the data in parallel? For example, if we have a thousand directories, I want to restrict the number of readers to 10, so that each of the ten parallel threads handles roughly 100 directories sequentially.

2. Between the two Flink operators, i.e. the S3 reader and the Kafka sink, I want to add one more operator that transforms the data read from the S3 bucket before it is pushed to the Kafka sink.

Below is my working code. I am having difficulty adding a map operator that transforms the union of the per-directory streams (built by applying union over each directory's reader) before pushing to Kafka:

List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);
s3PathList.stream()
    .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
    .reduce(DataStream::union)
    .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));

Something like this is what I am trying in order to achieve the above use case, using a FlatMap (it could be a map as well). Note that the transform has to be applied to the unioned DataStream inside the Optional returned by reduce, and the FlatMapFunction works on the element type String, not on DataStream:

s3PathList.stream()
    .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
    .reduce(DataStream::union)
    .map(joinedStream -> joinedStream
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
                System.out.println("Json string:: ------" + m);
                // transformation logic goes here
                out.collect(value);
            }
        })
        .addSink(kafkaProducer)
        .name("Publish to " + kafkaTopicName));

[image: image.png]
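On question 1, the direction I am currently considering is to stop creating one source per directory and instead let a single readFile source with nested enumeration cover all of them, capping its parallelism. This is only a sketch and assumes all the directories share a common parent prefix (the path and the value 10 are illustrative); the file monitor runs at parallelism 1 and hands the discovered files out as splits, so each of the 10 reader subtasks works through its assigned files sequentially:

TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path("s3a://my-bucket/"));
format.setNestedFileEnumeration(true); // descend into all per-day sub-directories

DataStream<String> input = environment
        .readFile(format, "s3a://my-bucket/",
                FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter())
        .setParallelism(10); // at most 10 concurrent readers, regardless of the number of directories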
Request your support on the same.

Regards,
Satya

On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

> Hi @ches...@apache.org <ches...@apache.org>,
>
> Thanks for your support, it was really helpful.
> Do you know the list of directories when you submit the job? [Yes, we do.]
> The implementation is in progress, and we will get back to you if we face any further challenges.
> Appreciate your support in this regard.
>
> Regards,
> Satya
>
> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
>
>> Thank you @Chesnay, let me try this change.
>>
>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org> wrote:
>>
>>> You could also try using streams to make it a little more concise:
>>>
>>> directories.stream()
>>>     .map(directory -> createInputStream(environment, directory))
>>>     .reduce(DataStream::union)
>>>     .map(joinedStream -> joinedStream.addSink(kafka));
>>>
>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>>
>>> Do you know the list of directories when you submit the job?
>>>
>>> If so, then you can iterate over them, create a source for each
>>> directory, union them, and apply the sink to the union.
>>>
>>> private static DataStream<String> createInputStream(StreamExecutionEnvironment environment, String directory) {
>>>     TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
>>>     format.setNestedFileEnumeration(true);
>>>     return environment.readFile(format, directory,
>>>             FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());
>>> }
>>>
>>> public static void runJob() throws Exception {
>>>     StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     List<String> directories = getDirectories();
>>>     DataStream<String> joinedStreams = null;
>>>     for (String directory : directories) {
>>>         DataStream<String> inputStream = createInputStream(environment, directory);
>>>         if (joinedStreams == null) {
>>>             joinedStreams = inputStream;
>>>         } else {
>>>             // union returns a new stream; the result must be re-assigned
>>>             joinedStreams = joinedStreams.union(inputStream);
>>>         }
>>>     }
>>>     // add a sanity check that there was at least 1 directory
>>>
>>>     joinedStreams.addSink(kafka);
>>> }
>>>
>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>>
>>> Hi Guys,
>>>
>>> Got stuck with it, please help me here.
>>>
>>> Regards,
>>> Satya
>>>
>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com> wrote:
>>>
>>> Hi Guys,
>>>
>>> Sorry to bother you again, but could someone help me here? Any help in
>>> this regard will be much appreciated.
>>>
>>> Regards,
>>> Satya
>>>
>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
>>>
>>> Hi Guys,
>>>
>>> I need one help; any leads will be highly appreciated. I have written a
>>> Flink streaming job to read data from an S3 bucket and push it to Kafka.
>>> Below is the working source that deals with a single S3 path:
>>>
>>> TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>> format.setNestedFileEnumeration(true);
>>>
>>> DataStream<String> inputStream = environment.readFile(format, "s3a://directory/2020-09-03/",
>>>     FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());
>>>
>>> inputStream.addSink(kafka);
>>>
>>> But my requirement is to get a list of paths and pass them one by one to
>>> this environment.readFile() method. How can we achieve this?
>>>
>>> Thanks,
>>> Satya
>>>
>>> --
>>> --------------------------
>>> Best Regards
>>> Satya Prakash
>>> (M)+91-9845111913
>>>
>>
>> --
>> --------------------------
>> Best Regards
>> Satya Prakash
>> (M)+91-9845111913
>>
>
> --
> --------------------------
> Best Regards
> Satya Prakash
> (M)+91-9845111913
>

-- 
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913
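P.S. For completeness, here is the end-to-end shape I am now planning, combining the per-directory sources and union from Chesnay's suggestion above with the transform step from question 2. This is a rough, untested sketch: the directory list, bootstrap server, topic name, and the FinalJsonMessage placeholder POJO are illustrative stand-ins for my actual configuration, and I have moved the ObjectMapper into open() of a RichFlatMapFunction so it is created once per task instead of travelling with the serialized job graph:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

public class S3ToKafkaJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // stand-in for S3Service.getListOfS3Paths(finalParameters)
        List<String> directories = Arrays.asList(
                "s3a://my-bucket/2020-09-03/", "s3a://my-bucket/2020-09-04/");

        // one source per directory, unioned into a single stream
        DataStream<String> joined = null;
        for (String directory : directories) {
            DataStream<String> input = createInputStream(environment, directory);
            joined = (joined == null) ? input : joined.union(input);
        }
        if (joined == null) {
            throw new IllegalStateException("no input directories configured");
        }

        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "localhost:9092"); // illustrative
        FlinkKafkaProducer<String> kafkaProducer =
                new FlinkKafkaProducer<>("my-topic", new SimpleStringSchema(), producerConfig);

        joined.flatMap(new TransformFunction()).name("Transform")
              .addSink(kafkaProducer).name("Publish to my-topic");

        environment.execute("s3-to-kafka");
    }

    private static DataStream<String> createInputStream(StreamExecutionEnvironment environment, String directory) {
        TextInputFormat format = new TextInputFormat(new Path(directory));
        format.setNestedFileEnumeration(true);
        return environment.readFile(format, directory,
                FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());
    }

    /** Parses each line, applies the transformation, and re-emits it as JSON. */
    public static class TransformFunction extends RichFlatMapFunction<String, String> {
        private transient ObjectMapper objectMapper;

        @Override
        public void open(Configuration parameters) {
            objectMapper = new ObjectMapper(); // created per task, not serialized with the job
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            FinalJsonMessage message = objectMapper.readValue(value, FinalJsonMessage.class);
            // transformation logic goes here
            out.collect(objectMapper.writeValueAsString(message));
        }
    }

    /** Placeholder for the real message POJO from my code. */
    public static class FinalJsonMessage {
        public String payload;
    }
}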