Hi Team, could you please help me here? I'm sorry for asking on such short notice, but my work has stopped because of this.
Regards,
Satya

On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <satyaadi...@gmail.com> wrote:

> Hi Chesnay/Team,
>
> Thank you so much for the reply. Continuing from the previous email, below
> is the block diagram of the job where I read files from S3 and push them
> to Kafka. With the current setup I have a total of 4 directories, and
> through the readFile method of the Flink environment we create 4 readers
> in parallel to process the data from S3.
>
> Below are my questions:
>
> 1. Can we restrict the number of readers that process the data in
> parallel? For example, if we have a thousand directories, I want to
> restrict the number of readers to 10, so that each of the ten parallel
> readers reads its 100 directories sequentially to consume the data.
>
> 2. Between the two Flink operators, i.e. the S3 reader and the Kafka sink,
> I want to add one more operator that transforms the data read from the S3
> bucket before it is pushed to the Kafka sink. Below is my working code. I
> am having difficulty adding a map operator that transforms the union of
> the data streams (built by applying union over each directory's reader)
> before pushing to Kafka.
>
> List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);
>
> s3PathList.stream()
>     .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
>     .reduce(DataStream::union)
>     .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
>
> *Something like this is what I'm trying to do in order to achieve the
> above use case, using flatMap (it could be map as well):*
>
> s3PathList.stream()
>     .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
>     .reduce(DataStream::union)
>     .flatMap(new FlatMapFunction<DataStream, String>() {
>         @Override
>         public void flatMap(String value, Collector<String> out) throws Exception {
>             FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
>             System.out.println("Json string:: ------" + m);
>             // transformation logic
>             out.collect(value);
>         }
>     })
>     .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
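>
> To make the intent concrete, here is a rough, untested sketch of the shape
> I am aiming for; it is only my assumption of how this could look. Since
> reduce(DataStream::union) returns a java.util.Optional, I unwrap it first
> and then apply the Flink flatMap to the String elements of the unioned
> stream. S3Service, FinalJsonMessage, objectMapper, kafkaProducer and
> kafkaTopicName are the same names as in my snippets above, and
> setParallelism(10) is only my guess at capping the parallel subtasks of
> the transformation (it does not reduce the number of per-directory
> sources):
>
> Optional<DataStream<String>> joinedStream = s3PathList.stream()
>     .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
>     .reduce(DataStream::union);
>
> joinedStream.ifPresent(stream -> stream
>     // flatMap is applied to the String elements flowing through the
>     // unioned stream, not to the DataStream object itself
>     .flatMap(new FlatMapFunction<String, String>() {
>         @Override
>         public void flatMap(String value, Collector<String> out) throws Exception {
>             FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
>             // transformation logic on m goes here
>             out.collect(value);
>         }
>     })
>     // caps how many subtasks run the transformation in parallel
>     .setParallelism(10)
>     .addSink(kafkaProducer).name("Publish to " + kafkaTopicName));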
>
> [image: image.png]
>
> Request your support on the same.
>
> Regards,
> Satya
>
> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
>
>> Hi @ches...@apache.org <ches...@apache.org>,
>>
>> Thanks for your support, it was really helpful.
>> Do you know the list of directories when you submit the job? [Yes, we do.]
>> The implementation is in progress, and I will get back to you if we face
>> any further challenges.
>> Appreciate your support in this regard.
>>
>> Regards,
>> Satya
>>
>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
>>
>>> Thank you @Chesnay, let me try this change.
>>>
>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>>> You could also try using streams to make it a little more concise:
>>>>
>>>> directories.stream()
>>>>     .map(directory -> createInputStream(environment, directory))
>>>>     .reduce(DataStream::union)
>>>>     .map(joinedStream -> joinedStream.addSink(kafka));
>>>>
>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>>>
>>>> Do you know the list of directories when you submit the job?
>>>>
>>>> If so, then you can iterate over them, create a source for each
>>>> directory, union them, and apply the sink to the union.
>>>>
>>>> private static DataStream<String> createInputStream(StreamExecutionEnvironment environment, String directory) {
>>>>     TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
>>>>     format.setNestedFileEnumeration(true);
>>>>     return environment.readFile(format, directory, FileProcessingMode.PROCESS_ONCE, -1,
>>>>         FilePathFilter.createDefaultFilter());
>>>> }
>>>>
>>>> public static void runJob() throws Exception {
>>>>     StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     List<String> directories = getDirectories();
>>>>     DataStream<String> joinedStreams = null;
>>>>     for (String directory : directories) {
>>>>         DataStream<String> inputStream = createInputStream(environment, directory);
>>>>         if (joinedStreams == null) {
>>>>             joinedStreams = inputStream;
>>>>         } else {
>>>>             // union returns a new stream, so the result must be re-assigned
>>>>             joinedStreams = joinedStreams.union(inputStream);
>>>>         }
>>>>     }
>>>>     // add a sanity check that there was at least 1 directory
>>>>
>>>>     joinedStreams.addSink(kafka);
>>>> }
>>>>
>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>>>
>>>> Hi Guys,
>>>>
>>>> Got stuck with it, please help me here.
>>>>
>>>> Regards,
>>>> Satya
>>>>
>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com> wrote:
>>>>
>>>> Hi Guys,
>>>>
>>>> Sorry to bother you again, but could someone help me here? Any help in
>>>> this regard will be much appreciated.
>>>>
>>>> Regards,
>>>> Satya
>>>>
>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
>>>>
>>>> Hi Guys,
>>>>
>>>> I need one help; any leads will be highly appreciated. I have written a
>>>> Flink streaming job to read data from an S3 bucket and push it to Kafka.
>>>> Below is the working source that deals with a single S3 path:
>>>>
>>>> TextInputFormat format = new TextInputFormat(new
>>>>     org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>>> format.setNestedFileEnumeration(true);
>>>>
>>>> DataStream<String> inputStream = environment.readFile(format,
>>>>     "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>>>>     FilePathFilter.createDefaultFilter());
>>>>
>>>> inputStream.addSink(kafka);
>>>>
>>>> But my requirement is to get a list of paths and pass them one by one to
>>>> this environment.readFile() method. How can we achieve this?
>>>>
>>>> Thanks,
>>>> Satya

--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913