Hi  Shesnay/Team,

Thank you so much for the reply.In the continuation of the previous email,
below is the block diagram where I am reading the file  from s3 and pushing
it to kafka.Now with the current setup, I have total 4 directory based on
the readfile method  from flink environment ,we are  creating 4 readers
parallely to process the data from s3 .

Below are my Questions:
1. Can we restrict the no. of readers to process the  data parallely.  e.g
let's say if  we have a thousand of directory , in that case i want to
restrict the no. of readers to 10 and ten parallel threads will continue
with 100 sequential reading of the directory per thread to consume the data
.

2.In between the two flink operators i.e s3 reader and kafka sink , i just
want to implement one more operator in order to transform the data which i
am reading from s3 bucket and then want to push into the kafka sink. Below
is my working code.Here i am finding  difficulties to implement  map
operator in order to transform the union of datastreams  by applying union
method over each directory's reader before pushing to kafka.

List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);

s3PathList.stream()
.map(directory -> S3Service.customInputStream(environment, directory,
readerParallelism))
.reduce(DataStream::union)
.map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to "
+ kafkaTopicName));


*Something like this I'm trying to do in order to achieve the above use
case by applying FlatMap, it could be map as well:*
s3PathList.stream()
.map(directory -> S3Service.customInputStream(environment, directory,
readerParallelism))
.reduce(DataStream::union).flatMap(new FlatMapFunction<DataStream,
String>() {
   @Override
   public void flatMap(String value, Collector<String> out) throws
Exception {
    FinalJsonMessage m=objectMapper.readValue(value,
FinalJsonMessage.class);
    System.out.println("Json string:: ------"+m);
     //transformation logic
     out.collect(value);
   }
 })
.map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to "
+ kafkaTopicName));
[image: image.png]
Request your support on the same.
Regards,
Satya

On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

> Hi @ches...@apache.org <ches...@apache.org> ,
>
> Thanks for your support, it was really helpful.
> Do you know the list of directories when you submit the job? [Yes we do
> have]
> The impletemation is progress and will get back to you if any further
> challenges we may face.
> Appreciate your support in this regard.
>
> Regards,
> Satya
>
> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
>
>> Thank you @Chesnay let me try this change .
>>
>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> You could also try using streams to make it a little more concise:
>>>
>>> directories.stream()
>>>    .map(directory -> createInputStream(environment, directory))
>>>    .reduce(DataStream::union)
>>>    .map(joinedStream -> joinedStream.addSink(kafka));
>>>
>>>
>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>>
>>> Do you know the list of directories when you submit the job?
>>>
>>> If so, then you can iterate over them, create a source for each
>>> directory, union them, and apply the sink to the union.
>>>
>>> private static
>>> DataStream<String>createInputStream(StreamExecutionEnvironment environment,
>>> String directory) {
>>>    TextInputFormat format =new TextInputFormat(new
>>> org.apache.flink.core.fs.Path(directory));
>>> format.setNestedFileEnumeration(true); return environment.readFile(format,
>>> directory, FileProcessingMode.PROCESS_ONCE, -1,
>>> FilePathFilter.createDefaultFilter()); }
>>>
>>> public static void runJob()throws Exception {
>>>    StreamExecutionEnvironment environment =
>>> StreamExecutionEnvironment.getExecutionEnvironment(); List<String>
>>> directories =getDirectories(); DataStream<String> joinedStreams =null; for
>>> (String directory : directories) {
>>>       DataStream<String> inputStream =createInputStream(environment,
>>> directory); if (joinedStreams ==null) {
>>>          joinedStreams = inputStream; }else {
>>>          joinedStreams.union(inputStream); }
>>>    }
>>>    // add a sanity check that there was at least 1 directory
>>>
>>>    joinedStreams.addSink(kafka); }
>>>
>>>
>>>
>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>>
>>> Hi Guys,
>>>
>>> Got stuck with it please help me here
>>> Regards,
>>> Satya
>>>
>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com>
>>> <satyaadi...@gmail.com> wrote:
>>>
>>> Hi Guys,
>>>
>>> Sorry to bother you again, but someone could help me here? Any help in
>>> this regard will be much appreciated.
>>>
>>> Regards,
>>> Satya
>>>
>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com>
>>> <satyaadi...@gmail.com>
>>> wrote:
>>>
>>> Hi Guys,
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> I need one help, any leads will be highly appreciated.I have written a
>>> flink streaming job to read the data from s3 bucket and push to kafka.
>>> Below is the working source that deal with single s3 path:
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> TextInputFormat format = new TextInputFormat(new
>>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>>
>>>
>>>
>>>
>>> format.setNestedFileEnumeration(true);
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> DataStream<String> inputStream = environment.readFile(format,
>>> "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>>>
>>>
>>>
>>>
>>> FilePathFilter.createDefaultFilter());
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> inputStream.addSink(kafka);
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> But my requirement is get the list of paths and pass them one by one to
>>> this environment.readFile() method.How we can achieve this.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Thanks,
>>>
>>>
>>>
>>>
>>> Satya
>>>
>>>
>>>
>>> --
>>> --------------------------
>>> Best Regards
>>> Satya Prakash
>>> (M)+91-9845111913
>>>
>>>
>>> --
>>>
>>> --------------------------
>>> Best Regards
>>> Satya Prakash
>>> (M)+91-9845111913
>>>
>>>
>>>
>>>
>>>
>>
>> --
>> --------------------------
>> Best Regards
>> Satya Prakash
>> (M)+91-9845111913
>>
>
>
> --
> --------------------------
> Best Regards
> Satya Prakash
> (M)+91-9845111913
>


-- 
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913

Reply via email to