Hi Team,

Could you please help me here. I’m sorry for asking on such short notice
but my work has stopped due to this.


Regards,
Satya

On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <satyaadi...@gmail.com> wrote:

> Hi  Shesnay/Team,
>
> Thank you so much for the reply.In the continuation of the previous email,
> below is the block diagram where I am reading the file  from s3 and pushing
> it to kafka.Now with the current setup, I have total 4 directory based on
> the readfile method  from flink environment ,we are  creating 4 readers
> parallely to process the data from s3 .
>
> Below are my Questions:
> 1. Can we restrict the no. of readers to process the  data parallely.  e.g
> let's say if  we have a thousand of directory , in that case i want to
> restrict the no. of readers to 10 and ten parallel threads will continue
> with 100 sequential reading of the directory per thread to consume the data
> .
>
> 2.In between the two flink operators i.e s3 reader and kafka sink , i just
> want to implement one more operator in order to transform the data which i
> am reading from s3 bucket and then want to push into the kafka sink. Below
> is my working code.Here i am finding  difficulties to implement  map
> operator in order to transform the union of datastreams  by applying union
> method over each directory's reader before pushing to kafka.
>
> List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);
>
> s3PathList.stream()
> .map(directory -> S3Service.customInputStream(environment, directory,
> readerParallelism))
> .reduce(DataStream::union)
> .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to
> " + kafkaTopicName));
>
>
> *Something like this I'm trying to do in order to achieve the above use
> case by applying FlatMap, it could be map as well:*
> s3PathList.stream()
> .map(directory -> S3Service.customInputStream(environment, directory,
> readerParallelism))
> .reduce(DataStream::union).flatMap(new FlatMapFunction<DataStream,
> String>() {
>    @Override
>    public void flatMap(String value, Collector<String> out) throws
> Exception {
>     FinalJsonMessage m=objectMapper.readValue(value,
> FinalJsonMessage.class);
>     System.out.println("Json string:: ------"+m);
>      //transformation logic
>      out.collect(value);
>    }
>  })
> .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to
> " + kafkaTopicName));
> [image: image.png]
> Request your support on the same.
> Regards,
> Satya
>
> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <satyaadi...@gmail.com>
> wrote:
>
>> Hi @ches...@apache.org <ches...@apache.org> ,
>>
>> Thanks for your support, it was really helpful.
>> Do you know the list of directories when you submit the job? [Yes we do
>> have]
>> The impletemation is progress and will get back to you if any further
>> challenges we may face.
>> Appreciate your support in this regard.
>>
>> Regards,
>> Satya
>>
>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <satyaadi...@gmail.com>
>> wrote:
>>
>>> Thank you @Chesnay let me try this change .
>>>
>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>>> You could also try using streams to make it a little more concise:
>>>>
>>>> directories.stream()
>>>>    .map(directory -> createInputStream(environment, directory))
>>>>    .reduce(DataStream::union)
>>>>    .map(joinedStream -> joinedStream.addSink(kafka));
>>>>
>>>>
>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>>>
>>>> Do you know the list of directories when you submit the job?
>>>>
>>>> If so, then you can iterate over them, create a source for each
>>>> directory, union them, and apply the sink to the union.
>>>>
>>>> private static
>>>> DataStream<String>createInputStream(StreamExecutionEnvironment environment,
>>>> String directory) {
>>>>    TextInputFormat format =new TextInputFormat(new
>>>> org.apache.flink.core.fs.Path(directory));
>>>> format.setNestedFileEnumeration(true); return environment.readFile(format,
>>>> directory, FileProcessingMode.PROCESS_ONCE, -1,
>>>> FilePathFilter.createDefaultFilter()); }
>>>>
>>>> public static void runJob()throws Exception {
>>>>    StreamExecutionEnvironment environment =
>>>> StreamExecutionEnvironment.getExecutionEnvironment(); List<String>
>>>> directories =getDirectories(); DataStream<String> joinedStreams =null; for
>>>> (String directory : directories) {
>>>>       DataStream<String> inputStream =createInputStream(environment,
>>>> directory); if (joinedStreams ==null) {
>>>>          joinedStreams = inputStream; }else {
>>>>          joinedStreams.union(inputStream); }
>>>>    }
>>>>    // add a sanity check that there was at least 1 directory
>>>>
>>>>    joinedStreams.addSink(kafka); }
>>>>
>>>>
>>>>
>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>>>
>>>> Hi Guys,
>>>>
>>>> Got stuck with it please help me here
>>>> Regards,
>>>> Satya
>>>>
>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com>
>>>> <satyaadi...@gmail.com> wrote:
>>>>
>>>> Hi Guys,
>>>>
>>>> Sorry to bother you again, but someone could help me here? Any help in
>>>> this regard will be much appreciated.
>>>>
>>>> Regards,
>>>> Satya
>>>>
>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com>
>>>> <satyaadi...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Guys,
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> I need one help, any leads will be highly appreciated.I have written a
>>>> flink streaming job to read the data from s3 bucket and push to kafka.
>>>> Below is the working source that deal with single s3 path:
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> TextInputFormat format = new TextInputFormat(new
>>>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>>>
>>>>
>>>>
>>>>
>>>> format.setNestedFileEnumeration(true);
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> DataStream<String> inputStream = environment.readFile(format,
>>>> "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>>>>
>>>>
>>>>
>>>>
>>>> FilePathFilter.createDefaultFilter());
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> inputStream.addSink(kafka);
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> But my requirement is get the list of paths and pass them one by one to
>>>> this environment.readFile() method.How we can achieve this.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>>
>>>>
>>>>
>>>> Satya
>>>>
>>>>
>>>>
>>>> --
>>>> --------------------------
>>>> Best Regards
>>>> Satya Prakash
>>>> (M)+91-9845111913
>>>>
>>>>
>>>> --
>>>>
>>>> --------------------------
>>>> Best Regards
>>>> Satya Prakash
>>>> (M)+91-9845111913
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>> --
>>> --------------------------
>>> Best Regards
>>> Satya Prakash
>>> (M)+91-9845111913
>>>
>>
>>
>> --
>> --------------------------
>> Best Regards
>> Satya Prakash
>> (M)+91-9845111913
>>
>
>
> --
> --------------------------
> Best Regards
> Satya Prakash
> (M)+91-9845111913
>
-- 
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913

Reply via email to