Thanks, I'll check it out.

On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <ches...@apache.org> wrote:

1) There's no mechanism in the API to restrict the number of readers across several sources. I can't quite think of a way to achieve this; maybe Kostas has an idea.

2) You're mixing up the Java Streams and Flink's DataStream APIs.

Try this:

    s3PathList.stream()
        .map(...)
        .reduce(...)
        .map(joinedStream -> joinedStream.flatMap(new FlatMapFunction...))
        .map(joinedStream -> joinedStream.addSink(...))
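For concreteness, a compilable version of that shape could look like the sketch below. It reuses the names from the code further down the thread (s3PathList, S3Service.customInputStream, kafkaProducer, kafkaTopicName, readerParallelism); note that reduce on a Java Stream returns an Optional, which has to be unwrapped before the Flink operators are applied:

    import java.util.Optional;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.util.Collector;

    // Build one stream per directory, union them all, then transform and sink once.
    Optional<DataStream<String>> joined = s3PathList.stream()
            .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
            .reduce(DataStream::union);

    joined.ifPresent(stream -> stream
            // Flink's flatMap, applied once to the unioned stream
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    // transformation logic goes here
                    out.collect(value);
                }
            })
            .addSink(kafkaProducer).name("Publish to " + kafkaTopicName));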
On 10/12/2020 6:05 AM, Satyaa Dixit wrote:

Hi Team,

Could you please help me here? I'm sorry for asking on such short notice, but my work has stopped due to this.

Regards,
Satya

On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi Chesnay/Team,

Thank you so much for the reply. In continuation of the previous email, below is the block diagram of how I am reading files from S3 and pushing them to Kafka. With the current setup we have four directories in total, and based on the readFile method from the Flink environment we create four readers that process the data from S3 in parallel.

Below are my questions:

1. Can we restrict the number of readers that process the data in parallel? For example, if we have a thousand directories, I want to restrict the number of readers to 10, so that ten parallel threads each sequentially read 100 directories.

2. Between the two Flink operators (the S3 reader and the Kafka sink), I want to add one more operator to transform the data read from the S3 bucket before pushing it to the Kafka sink. Below is my working code. I am finding it difficult to implement a map operator that transforms the union of the per-directory data streams (built by applying union over each directory's reader) before pushing to Kafka.

    List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);

    s3PathList.stream()
        .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
        .reduce(DataStream::union)
        .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));

Something like this is what I'm trying to do in order to achieve the above use case, by applying a FlatMapFunction (it could be a map as well):

    s3PathList.stream()
        .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
        .reduce(DataStream::union)
        .flatMap(new FlatMapFunction<DataStream, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
                System.out.println("Json string:: ------" + m);
                // transformation logic
                out.collect(value);
            }
        })
        .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));

[image: image.png]

Request your support on the same.

Regards,
Satya
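On question 1, one possible workaround (not an API-level mechanism, which is what Chesnay says does not exist) is to read a common parent path with a single source and cap that source's parallelism. This is only a sketch, and it assumes the directories share a common parent and that nested enumeration is acceptable; the directory-monitoring task stays non-parallel, while the file readers are capped at the given parallelism:

    import org.apache.flink.api.common.io.FilePathFilter;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    // Hypothetical: one source over the common parent instead of one source per directory.
    TextInputFormat format = new TextInputFormat(new Path("s3a://directory/"));
    format.setNestedFileEnumeration(true);

    DataStream<String> inputStream = environment
            .readFile(format, "s3a://directory/", FileProcessingMode.PROCESS_ONCE, -1,
                    FilePathFilter.createDefaultFilter())
            // caps the parallel file readers at 10
            .setParallelism(10);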
On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi @ches...@apache.org,

Thanks for your support, it was really helpful.
"Do you know the list of directories when you submit the job?" [Yes, we do.]
The implementation is in progress, and I will get back to you if we face any further challenges. I appreciate your support in this regard.

Regards,
Satya

On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

Thank you, @Chesnay, let me try this change.

On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org> wrote:

You could also try using streams to make it a little more concise:

    directories.stream()
        .map(directory -> createInputStream(environment, directory))
        .reduce(DataStream::union)
        .map(joinedStream -> joinedStream.addSink(kafka));

On 10/1/2020 9:48 AM, Chesnay Schepler wrote:

Do you know the list of directories when you submit the job?

If so, then you can iterate over them, create a source for each directory, union them, and apply the sink to the union.

    private static DataStream<String> createInputStream(StreamExecutionEnvironment environment, String directory) {
        TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
        format.setNestedFileEnumeration(true);
        return environment.readFile(format, directory, FileProcessingMode.PROCESS_ONCE, -1,
                FilePathFilter.createDefaultFilter());
    }

    public static void runJob() throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> directories = getDirectories();
        DataStream<String> joinedStreams = null;
        for (String directory : directories) {
            DataStream<String> inputStream = createInputStream(environment, directory);
            if (joinedStreams == null) {
                joinedStreams = inputStream;
            } else {
                // union returns a new stream, so the result must be reassigned
                joinedStreams = joinedStreams.union(inputStream);
            }
        }
        // add a sanity check that there was at least 1 directory

        joinedStreams.addSink(kafka);
    }
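The kafka sink referenced in this code is never shown in the thread. A minimal sketch of what it might look like with the Kafka connector available at the time (the broker address is a placeholder, and kafkaTopicName is the name used elsewhere in the thread):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    // Hypothetical broker list; adjust to the actual environment.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    FlinkKafkaProducer<String> kafka = new FlinkKafkaProducer<>(
            kafkaTopicName,              // target topic
            new SimpleStringSchema(),    // serialize each record as a UTF-8 string
            props);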
On 10/1/2020 9:08 AM, Satyaa Dixit wrote:

Hi Guys,

Got stuck with it, please help me here.

Regards,
Satya

On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi Guys,

Sorry to bother you again, but could someone help me here? Any help in this regard will be much appreciated.

Regards,
Satya

On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi Guys,

I need one help; any leads will be highly appreciated. I have written a Flink streaming job to read data from an S3 bucket and push it to Kafka. Below is the working source that deals with a single S3 path:

    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
    format.setNestedFileEnumeration(true);

    DataStream<String> inputStream = environment.readFile(format, "s3a://directory/2020-09-03/",
            FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());

    inputStream.addSink(kafka);

But my requirement is to get a list of paths and pass them one by one to this environment.readFile() method. How can we achieve this?

Thanks,
Satya

--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913
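One piece the thread never shows is how the list of S3 paths is obtained. A hypothetical sketch of such a helper using the AWS SDK for Java v1 (the bucket, prefix, and method name are illustrative, mirroring the S3Service.getListOfS3Paths call quoted above; pagination of truncated results is omitted for brevity):

    import java.util.List;
    import java.util.stream.Collectors;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.ListObjectsV2Request;

    // List the top-level "directories" (common prefixes) under a bucket prefix
    // and return them as s3a:// paths that readFile() can consume.
    public static List<String> getListOfS3Paths(String bucket, String prefix) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        ListObjectsV2Request request = new ListObjectsV2Request()
                .withBucketName(bucket)
                .withPrefix(prefix)
                .withDelimiter("/"); // group keys by "directory"
        return s3.listObjectsV2(request).getCommonPrefixes().stream()
                .map(p -> "s3a://" + bucket + "/" + p)
                .collect(Collectors.toList());
    }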