Thanks, I'll check it out.

On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <ches...@apache.org> wrote:

> 1) There's no mechanism in the API to restrict the number of readers
> across several sources. I can't quite think of a way to achieve this;
> maybe Kostas has an idea.
>
> 2) You're mixing up the Java Streams API and Flink's DataStream API.
>
> Try this:
>
> s3PathList.stream()
> .map(...)
> .reduce(...)
> .map(joinedStream -> joinedStream.flatMap(new FlatMapFunction...))
> .map(joinedStream -> joinedStream.addSink...)
>
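To make point 2 concrete, here is a minimal, self-contained sketch of where the Java Streams API ends and the stream object begins. It does not use Flink at all: `FakeStream`, its `union` and `transform` methods, and the `dir + "/a"` records are hypothetical stand-ins invented for illustration, so treat it as a picture of the types involved rather than as the actual job. The point it shows is that `reduce(...)` on a `java.util.stream.Stream` returns a `java.util.Optional`, so any `map` called after it is `Optional.map`, and the per-record operator has to be applied to the joined stream inside that lambda.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

final class JavaStreamVsDataStreamSketch {

    /** Hypothetical stand-in for Flink's DataStream<String>; NOT the real API. */
    static final class FakeStream {
        final List<String> records;

        FakeStream(List<String> records) {
            this.records = records;
        }

        // Same shape as a union: returns a NEW combined stream.
        FakeStream union(FakeStream other) {
            List<String> merged = new ArrayList<>(records);
            merged.addAll(other.records);
            return new FakeStream(merged);
        }

        // Stands in for a per-record operator such as map/flatMap.
        FakeStream transform(Function<String, String> fn) {
            return new FakeStream(
                records.stream().map(fn).collect(Collectors.toList()));
        }
    }

    static List<String> run(List<String> directories) {
        // Everything up to reduce() is java.util.stream.Stream.
        Optional<FakeStream> joined = directories.stream()
            .map(dir -> new FakeStream(Arrays.asList(dir + "/a", dir + "/b")))
            .reduce(FakeStream::union);

        // From here on we are calling Optional.map, not a stream operator.
        // The operator is applied to the joined stream inside the lambda.
        return joined
            .map(stream -> stream.transform(String::toUpperCase))
            .map(stream -> stream.records)
            .orElseThrow(() -> new IllegalStateException("no directories given"));
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("d1", "d2")));
    }
}
```

The `orElseThrow` at the end also covers the empty-list case, which the `Optional` returned by `reduce` otherwise hides silently.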
> On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
>
> Hi Team,
>
> Could you please help me here? I’m sorry for asking on such short notice,
> but my work has stopped due to this.
>
>
> Regards,
> Satya
>
> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <satyaadi...@gmail.com> wrote:
>
>> Hi Chesnay/Team,
>>
>> Thank you so much for the reply. Continuing from the previous email,
>> below is the block diagram showing how I read files from S3 and push
>> them to Kafka. With the current setup I have 4 directories in total, and
>> because we call the readFile method of the Flink environment once per
>> directory, we create 4 readers that process the data from S3 in parallel.
>>
>> Below are my questions:
>> 1. Can we restrict the number of readers that process the data in
>> parallel? For example, if we have a thousand directories, I want to
>> restrict the number of readers to 10, so that each of the ten parallel
>> threads reads 100 directories sequentially.
>>
>> 2. Between the two Flink operators, i.e. the S3 reader and the Kafka
>> sink, I want to add one more operator that transforms the data read from
>> the S3 bucket before it is pushed to the Kafka sink. Below is my working
>> code. I am having difficulty adding a map operator that transforms the
>> union of the DataStreams (built by applying union over each directory's
>> reader) before pushing to Kafka.
>>
>> List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);
>>
>> s3PathList.stream()
>>    .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
>>    .reduce(DataStream::union)
>>    .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
>>
>>
>> *I am trying to do something like this to achieve the above use case by
>> applying flatMap (it could be map as well):*
>> s3PathList.stream()
>> .map(directory -> S3Service.customInputStream(environment, directory,
>> readerParallelism))
>> .reduce(DataStream::union).flatMap(new FlatMapFunction<DataStream,
>> String>() {
>>    @Override
>>    public void flatMap(String value, Collector<String> out) throws
>> Exception {
>>     FinalJsonMessage m=objectMapper.readValue(value,
>> FinalJsonMessage.class);
>>     System.out.println("Json string:: ------"+m);
>>      //transformation logic
>>      out.collect(value);
>>    }
>>  })
>> .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to
>> " + kafkaTopicName));
>> [image: image.png]
>> Request your support on the same.
>> Regards,
>> Satya
>>
>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <satyaadi...@gmail.com>
>> wrote:
>>
>>> Hi @ches...@apache.org <ches...@apache.org> ,
>>>
>>> Thanks for your support, it was really helpful.
>>> Do you know the list of directories when you submit the job? [Yes, we
>>> do.]
>>> The implementation is in progress; we will get back to you if we face
>>> any further challenges.
>>> Appreciate your support in this regard.
>>>
>>> Regards,
>>> Satya
>>>
>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <satyaadi...@gmail.com>
>>> wrote:
>>>
>>>> Thank you @Chesnay, let me try this change.
>>>>
>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org>
>>>> wrote:
>>>>
>>>>> You could also try using streams to make it a little more concise:
>>>>>
>>>>> directories.stream()
>>>>>    .map(directory -> createInputStream(environment, directory))
>>>>>    .reduce(DataStream::union)
>>>>>    .map(joinedStream -> joinedStream.addSink(kafka));
>>>>>
>>>>>
>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>>>>
>>>>> Do you know the list of directories when you submit the job?
>>>>>
>>>>> If so, then you can iterate over them, create a source for each
>>>>> directory, union them, and apply the sink to the union.
>>>>>
>>>>> private static DataStream<String> createInputStream(
>>>>>       StreamExecutionEnvironment environment, String directory) {
>>>>>    TextInputFormat format =
>>>>>       new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
>>>>>    format.setNestedFileEnumeration(true);
>>>>>    return environment.readFile(format, directory,
>>>>>       FileProcessingMode.PROCESS_ONCE, -1,
>>>>>       FilePathFilter.createDefaultFilter());
>>>>> }
>>>>>
>>>>> public static void runJob() throws Exception {
>>>>>    StreamExecutionEnvironment environment =
>>>>>       StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>    List<String> directories = getDirectories();
>>>>>    DataStream<String> joinedStreams = null;
>>>>>    for (String directory : directories) {
>>>>>       DataStream<String> inputStream = createInputStream(environment, directory);
>>>>>       if (joinedStreams == null) {
>>>>>          joinedStreams = inputStream;
>>>>>       } else {
>>>>>          // union() returns a new stream, so the result must be reassigned
>>>>>          joinedStreams = joinedStreams.union(inputStream);
>>>>>       }
>>>>>    }
>>>>>    // add a sanity check that there was at least 1 directory
>>>>>
>>>>>    joinedStreams.addSink(kafka);
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>>>>
>>>>> Hi Guys,
>>>>>
>>>>> I'm stuck on this; please help me here.
>>>>> Regards,
>>>>> Satya
>>>>>
>>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi Guys,
>>>>>
>>>>> Sorry to bother you again, but could someone help me here? Any help in
>>>>> this regard will be much appreciated.
>>>>>
>>>>> Regards,
>>>>> Satya
>>>>>
>>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi Guys,
>>>>>
>>>>> I need some help; any leads will be highly appreciated. I have written a
>>>>> Flink streaming job to read data from an S3 bucket and push it to Kafka.
>>>>> Below is the working source that deals with a single S3 path:
>>>>>
>>>>> TextInputFormat format = new TextInputFormat(new
>>>>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>>>> format.setNestedFileEnumeration(true);
>>>>> DataStream<String> inputStream = environment.readFile(format,
>>>>> "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>>>>> FilePathFilter.createDefaultFilter());
>>>>> inputStream.addSink(kafka);
>>>>>
>>>>> But my requirement is to get the list of paths and pass them one by one
>>>>> to this environment.readFile() method. How can we achieve this?
>>>>>
>>>>> Thanks,
>>>>> Satya
>>>>>
>>>>> --
>>>>> --------------------------
>>>>> Best Regards
>>>>> Satya Prakash
>>>>> (M)+91-9845111913
>>>>>

-- 
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913
