Hi Chesnay/Team,

Thanks, we got the fix for our problem but got stuck with the below issue,
request your support.


How to catch FileNotFoundException during runtime,if any directory is
missing in s3 as part of the below source code to avoid job failure.


s3PathList.stream().map(directory ->
S3Service.createInputStream(environment, directory, readerParallelism))
.reduce(DataStream::union).map(joinedStream ->
joinedStream.addSink(kafkaProducer));





Regards,

Satya

On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

> Hi Chesnay/Team
>
> Thank you so much.I have tried with the solution but it is not working as
> expected showing compilation issues and tried all the ways .Please find
> below code snippet :
>
> s3PathList.stream()
> .map(directory -> S3Service.customCreateInputStream(environment,
> directory, readerParallelism))
> .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new
> IntermidiateOperator()).map(joinedStream ->
> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
>
> public static class IntermidiateOperator implements
> FlatMapFunction<String, String> {
> private static final ObjectMapper objectMapper1 = new ObjectMapper();
>
> @Override
> public void flatMap(String value, Collector<String> out) throws Exception {
> Test m = objectMapper1.readValue(value, Test.class);
> System.out.println("Json string:: ------" + m);
> // logger.info("Json string:: ------"+m);
> out.collect(value);
> }
> }
>
> Also just to clarify one doubt , How to handle *FileNotFoundException* as
> part of flink reader during runtime if in case directory is not available
> in s3. How to avoid job failure in that use case.
>
> Regards,
> Satya
>
> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit <satyaadi...@gmail.com>
> wrote:
>
>> Thanks, I'll check it out.
>>
>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> 1) There's no mechanism in the API to restrict the number of  number of
>>> readers across several sources. I can't quite think of a way to achieve
>>> this; maybe Kostas has an idea.
>>>
>>> 2) You're mixing  up the Java Streams and Finks DataStream API.
>>>
>>> Try this:
>>>
>>> s3PathList.stream()
>>> .map(...)
>>> .reduce(...)
>>> .map(joinedStream -> stream.map(new FlatMapFunction...))
>>> .map(joinedStream->  joinedStream.addSink...)
>>>
>>> On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
>>>
>>> Hi Team,
>>>
>>> Could you please help me here. I’m sorry for asking on such short notice
>>> but my work has stopped due to this.
>>>
>>>
>>> Regards,
>>> Satya
>>>
>>> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <satyaadi...@gmail.com>
>>> wrote:
>>>
>>>> Hi  Shesnay/Team,
>>>>
>>>> Thank you so much for the reply.In the continuation of the previous
>>>> email, below is the block diagram where I am reading the file  from s3 and
>>>> pushing it to kafka.Now with the current setup, I have total 4 directory
>>>> based on the readfile method  from flink environment ,we are  creating 4
>>>> readers parallely to process the data from s3 .
>>>>
>>>> Below are my Questions:
>>>> 1. Can we restrict the no. of readers to process the  data parallely.
>>>> e.g let's say if  we have a thousand of directory , in that case i want to
>>>> restrict the no. of readers to 10 and ten parallel threads will continue
>>>> with 100 sequential reading of the directory per thread to consume the data
>>>> .
>>>>
>>>> 2.In between the two flink operators i.e s3 reader and kafka sink , i
>>>> just want to implement one more operator in order to transform the data
>>>> which i am reading from s3 bucket and then want to push into the kafka
>>>> sink. Below is my working code.Here i am finding  difficulties to
>>>> implement  map operator in order to transform the union of datastreams  by
>>>> applying union method over each directory's reader before pushing to kafka.
>>>>
>>>> List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);
>>>>
>>>> s3PathList.stream()
>>>> .map(directory -> S3Service.customInputStream(environment, directory,
>>>> readerParallelism))
>>>> .reduce(DataStream::union)
>>>> .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish
>>>> to " + kafkaTopicName));
>>>>
>>>>
>>>> *Something like this I'm trying to do in order to achieve the above use
>>>> case by applying FlatMap, it could be map as well:*
>>>> s3PathList.stream()
>>>> .map(directory -> S3Service.customInputStream(environment, directory,
>>>> readerParallelism))
>>>> .reduce(DataStream::union).flatMap(new FlatMapFunction<DataStream,
>>>> String>() {
>>>>    @Override
>>>>    public void flatMap(String value, Collector<String> out) throws
>>>> Exception {
>>>>     FinalJsonMessage m=objectMapper.readValue(value,
>>>> FinalJsonMessage.class);
>>>>     System.out.println("Json string:: ------"+m);
>>>>      //transformation logic
>>>>      out.collect(value);
>>>>    }
>>>>  })
>>>> .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish
>>>> to " + kafkaTopicName));
>>>> [image: image.png]
>>>> Request your support on the same.
>>>> Regards,
>>>> Satya
>>>>
>>>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <satyaadi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi @ches...@apache.org <ches...@apache.org> ,
>>>>>
>>>>> Thanks for your support, it was really helpful.
>>>>> Do you know the list of directories when you submit the job? [Yes we
>>>>> do have]
>>>>> The impletemation is progress and will get back to you if any further
>>>>> challenges we may face.
>>>>> Appreciate your support in this regard.
>>>>>
>>>>> Regards,
>>>>> Satya
>>>>>
>>>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <satyaadi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thank you @Chesnay let me try this change .
>>>>>>
>>>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> You could also try using streams to make it a little more concise:
>>>>>>>
>>>>>>> directories.stream()
>>>>>>>    .map(directory -> createInputStream(environment, directory))
>>>>>>>    .reduce(DataStream::union)
>>>>>>>    .map(joinedStream -> joinedStream.addSink(kafka));
>>>>>>>
>>>>>>>
>>>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>>>>>>
>>>>>>> Do you know the list of directories when you submit the job?
>>>>>>>
>>>>>>> If so, then you can iterate over them, create a source for each
>>>>>>> directory, union them, and apply the sink to the union.
>>>>>>>
>>>>>>> private static
>>>>>>> DataStream<String>createInputStream(StreamExecutionEnvironment 
>>>>>>> environment,
>>>>>>> String directory) {
>>>>>>>    TextInputFormat format =new TextInputFormat(new
>>>>>>> org.apache.flink.core.fs.Path(directory));
>>>>>>> format.setNestedFileEnumeration(true); return 
>>>>>>> environment.readFile(format,
>>>>>>> directory, FileProcessingMode.PROCESS_ONCE, -1,
>>>>>>> FilePathFilter.createDefaultFilter()); }
>>>>>>>
>>>>>>> public static void runJob()throws Exception {
>>>>>>>    StreamExecutionEnvironment environment =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); List<String>
>>>>>>> directories =getDirectories(); DataStream<String> joinedStreams =null; 
>>>>>>> for
>>>>>>> (String directory : directories) {
>>>>>>>       DataStream<String> inputStream =createInputStream(environment,
>>>>>>> directory); if (joinedStreams ==null) {
>>>>>>>          joinedStreams = inputStream; }else {
>>>>>>>          joinedStreams.union(inputStream); }
>>>>>>>    }
>>>>>>>    // add a sanity check that there was at least 1 directory
>>>>>>>
>>>>>>>    joinedStreams.addSink(kafka); }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>> Got stuck with it please help me here
>>>>>>> Regards,
>>>>>>> Satya
>>>>>>>
>>>>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit
>>>>>>> <satyaadi...@gmail.com> <satyaadi...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>> Sorry to bother you again, but someone could help me here? Any help
>>>>>>> in
>>>>>>> this regard will be much appreciated.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Satya
>>>>>>>
>>>>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com>
>>>>>>> <satyaadi...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I need one help, any leads will be highly appreciated.I have written
>>>>>>> a
>>>>>>> flink streaming job to read the data from s3 bucket and push to
>>>>>>> kafka.
>>>>>>> Below is the working source that deal with single s3 path:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> TextInputFormat format = new TextInputFormat(new
>>>>>>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> format.setNestedFileEnumeration(true);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> DataStream<String> inputStream = environment.readFile(format,
>>>>>>> "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> FilePathFilter.createDefaultFilter());
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> inputStream.addSink(kafka);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> But my requirement is get the list of paths and pass them one by one
>>>>>>> to
>>>>>>> this environment.readFile() method.How we can achieve this.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Satya
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> --------------------------
>>>>>>> Best Regards
>>>>>>> Satya Prakash
>>>>>>> (M)+91-9845111913
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> --------------------------
>>>>>>> Best Regards
>>>>>>> Satya Prakash
>>>>>>> (M)+91-9845111913
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> --------------------------
>>>>>> Best Regards
>>>>>> Satya Prakash
>>>>>> (M)+91-9845111913
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> --------------------------
>>>>> Best Regards
>>>>> Satya Prakash
>>>>> (M)+91-9845111913
>>>>>
>>>>
>>>>
>>>> --
>>>> --------------------------
>>>> Best Regards
>>>> Satya Prakash
>>>> (M)+91-9845111913
>>>>
>>> --
>>> --------------------------
>>> Best Regards
>>> Satya Prakash
>>> (M)+91-9845111913
>>>
>>>
>>>
>>
>> --
>> --------------------------
>> Best Regards
>> Satya Prakash
>> (M)+91-9845111913
>>
>
>
> --
> --------------------------
> Best Regards
> Satya Prakash
> (M)+91-9845111913
>


-- 
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913

Reply via email to