Following up on my previous email: I have also tried the code below, but the
job still restarts from the beginning.

environment.enableCheckpointing(30000L);
environment.disableOperatorChaining();
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
environment.getCheckpointConfig().setPreferCheckpointForRecovery(true);

environment.setRestartStrategy(RestartStrategies.failureRateRestart(
    5, // max failures per interval
    Time.of(30, TimeUnit.MINUTES), // time interval for measuring failure rate
    Time.of(60, TimeUnit.SECONDS) // delay
));

Please have a look into this as well.

Regards,
Satya


On Tue, Oct 27, 2020 at 4:46 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

> Hi Chesnay & Team,
>
> I'm already using "ContinuousFileMonitoringFunction", but I'm still not
> able to achieve the use case below. For example, the job starts, processes
> half of the data, and then fails with the exception below. How can we avoid
> this exception? Could you please help us with this too?
>
>
>
>
>
> *org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
> exception when processing split: [110]
> s3://messages/qa/test-data/DEMO/0077/data/2020-09-03/10/2020-09-03_10:01:51_53807.json
> mod@ 1599127313000 : 0 + 345
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
> Caused by: java.lang.OutOfMemoryError: Java heap space*
>
> If I restart the job, it produces duplicate data. How can we handle this
> failure case with the Flink S3 reader source using checkpointing?
>
> Regards,
>
> Satya
>
> On Thu, Oct 22, 2020 at 7:05 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> The existing ContinuousFileMonitoringFunction and
>> ContinuousFileReaderOperator already take care of that.
>> Unless you are re-implementing them from scratch, you shouldn't have
>> to do anything.
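
To illustrate what "take care of that" can mean in practice, here is a minimal sketch in plain Java (no Flink dependency). It assumes a reader whose only state is the index of the next record; the names `CheckpointedReaderSketch`, `Reader`, `ReaderState` and `runWithFailure` are hypothetical and are not Flink APIs. It only models the general checkpoint-and-restore idea: progress is snapshotted at a checkpoint, and after a failure the reader resumes from that snapshot rather than from the beginning.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of checkpoint/restore for a reader; not Flink code.
class CheckpointedReaderSketch {

    /** Snapshot of reader progress: index of the next record to read. */
    static final class ReaderState {
        final int nextIndex;
        ReaderState(int nextIndex) { this.nextIndex = nextIndex; }
    }

    static final class Reader {
        private final List<String> records;
        private int nextIndex;

        /** Starts from the beginning when no restored state is given. */
        Reader(List<String> records, ReaderState restored) {
            this.records = records;
            this.nextIndex = restored == null ? 0 : restored.nextIndex;
        }

        boolean hasNext() { return nextIndex < records.size(); }
        String next() { return records.get(nextIndex++); }
        ReaderState snapshot() { return new ReaderState(nextIndex); }
    }

    /**
     * Reads up to three records, takes a checkpoint, reads one more record,
     * then simulates a failure and finishes with a reader restored from the
     * checkpoint. Returns everything that was emitted downstream.
     */
    static List<String> runWithFailure(List<String> records) {
        List<String> emitted = new ArrayList<>();
        Reader reader = new Reader(records, null);
        for (int i = 0; i < 3 && reader.hasNext(); i++) {
            emitted.add(reader.next());
        }
        ReaderState checkpoint = reader.snapshot();
        if (reader.hasNext()) {
            emitted.add(reader.next()); // emitted after the checkpoint, before the failure
        }
        // Simulated failure: the old reader is discarded, a new one is restored.
        Reader restored = new Reader(records, checkpoint);
        while (restored.hasNext()) {
            emitted.add(restored.next());
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runWithFailure(Arrays.asList("a", "b", "c", "d", "e")));
    }
}
```

Running `main` prints `[a, b, c, d, d, e]`: the record read after the checkpoint is emitted again after the restore. In this model, resuming from a checkpoint avoids re-reading everything, but removing the remaining duplicates would also need a sink that is transactional or idempotent.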
>>
>> On 10/22/2020 1:47 PM, Satyaa Dixit wrote:
>> > Hi Chesnay,
>> > Thanks for your support. It helped a lot. I need one more piece of help:
>> > how do we do checkpointing as part of the S3 reader source, so that if a
>> > failure happens (an OutOfMemoryError or any other failure), the
>> > restarted job resumes from the last split offset the reader reached and
>> > continues where the previous run stopped, avoiding duplicate data?
>> >
>> > Thanks,
>> > Satya
>> >
>> > On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler <ches...@apache.org>
>> wrote:
>> >
>> >> Hmm... I don't see an easy way.
>> >> You may have to replicate StreamExecutionEnvironment#createFileInput
>> >> and create a custom ContinuousFileMonitoringFunction that ignores
>> >> missing files in its run() method.
>> >>
>> >> Alternatively, use some library to check the existence of the S3
>> >> directories before creating the sources.
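
The second option can be sketched as a pre-filter over the directory list before any source is created. The sketch below is plain Java and takes the existence check as a `Predicate<String>`; `ExistingDirectoryFilter` and `keepExisting` are hypothetical names. The demo in `main` uses the local file system as a stand-in. For S3, the predicate would have to be backed by whatever S3 client or file-system abstraction the job already uses, which is an assumption left open here.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper: drops directories that fail an existence check.
class ExistingDirectoryFilter {

    /** Returns only the directories for which the given check succeeds. */
    static List<String> keepExisting(List<String> directories, Predicate<String> exists) {
        return directories.stream()
                .filter(exists)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Local temp directory as a stand-in for an existing S3 directory.
        Path present = Files.createTempDirectory("present");
        String missing = present.resolve("missing").toString();

        List<String> kept = keepExisting(
                Arrays.asList(present.toString(), missing),
                p -> Files.isDirectory(Paths.get(p)));

        System.out.println(kept); // only the existing directory remains
        Files.delete(present);
    }
}
```

The filtered list can then be passed to the code that builds one source per directory. A directory that disappears after this check but before the job reads it would still cause a failure, so this only narrows the window.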
>> >>
>> >> On 10/15/2020 11:49 AM, Satyaa Dixit wrote:
>> >>> Hi Chesnay/Team,
>> >>>
>> >>> Thanks, we got the fix for our problem but are stuck on the issue
>> >>> below and request your support.
>> >>>
>> >>>
>> >>> How can we catch a FileNotFoundException at runtime, if any directory
>> >>> is missing in S3, in the source code below, to avoid a job failure?
>> >>>
>> >>>
>> >>> s3PathList.stream()
>> >>>     .map(directory -> S3Service.createInputStream(environment, directory, readerParallelism))
>> >>>     .reduce(DataStream::union)
>> >>>     .map(joinedStream -> joinedStream.addSink(kafkaProducer));
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> Regards,
>> >>>
>> >>> Satya
>> >>>
>> >>> On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <satyaadi...@gmail.com>
>> >> wrote:
>> >>>> Hi Chesnay/Team
>> >>>>
>> >>>> Thank you so much. I have tried the solution, but it is not working
>> >>>> as expected: it shows compilation issues, and I have tried every way I
>> >>>> could think of. Please find the code snippet below:
>> >>>>
>> >>>> s3PathList.stream()
>> >>>>     .map(directory -> S3Service.customCreateInputStream(environment, directory, readerParallelism))
>> >>>>     .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new IntermidiateOperator()).map(joinedStream ->
>> >>>>         joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
>> >>>>
>> >>>> public static class IntermidiateOperator implements FlatMapFunction<String, String> {
>> >>>>     private static final ObjectMapper objectMapper1 = new ObjectMapper();
>> >>>>
>> >>>>     @Override
>> >>>>     public void flatMap(String value, Collector<String> out) throws Exception {
>> >>>>         Test m = objectMapper1.readValue(value, Test.class);
>> >>>>         System.out.println("Json string:: ------" + m);
>> >>>>         // logger.info("Json string:: ------"+m);
>> >>>>         out.collect(value);
>> >>>>     }
>> >>>> }
>> >>>>
>> >>>> Also, just to clarify one doubt: how do we handle a
>> >>>> *FileNotFoundException* in the Flink reader at runtime when a
>> >>>> directory is not available in S3? How can we avoid a job failure in
>> >>>> that case?
>> >>>>
>> >>>> Regards,
>> >>>> Satya
>> >>>>
>> >>>> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit <satyaadi...@gmail.com> wrote:
>> >>>>
>> >>>>> Thanks, I'll check it out.
>> >>>>>
>> >>>>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <
>> ches...@apache.org>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> 1) There's no mechanism in the API to restrict the number of
>> >>>>>> readers across several sources. I can't quite think of a way to
>> >>>>>> achieve this; maybe Kostas has an idea.
>> >>>>>>
>> >>>>>> 2) You're mixing up the Java Streams and Flink's DataStream API.
>> >>>>>>
>> >>>>>> Try this:
>> >>>>>>
>> >>>>>> s3PathList.stream()
>> >>>>>> .map(...)
>> >>>>>> .reduce(...)
>> >>>>>> .map(joinedStream -> joinedStream.flatMap(new FlatMapFunction...))
>> >>>>>> .map(joinedStream->  joinedStream.addSink...)
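
The reason a chain of this shape works is that `Stream.reduce` with a single accumulator argument returns an `Optional`, so any `.map` call after it is `Optional.map`, and the reduced value is only available inside that lambda. A minimal plain-Java sketch, using strings as a stand-in for DataStreams (`ReduceThenMapSketch` and `joinAndDecorate` are hypothetical names):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical demo of Stream.map vs. Optional.map around reduce().
class ReduceThenMapSketch {

    static Optional<String> joinAndDecorate(List<String> parts) {
        return parts.stream()
                .map(String::trim)                   // Stream.map: runs once per element
                .reduce((a, b) -> a + "," + b)       // returns Optional<String>
                .map(joined -> "[" + joined + "]");  // Optional.map: runs once, only if a value is present
    }

    public static void main(String[] args) {
        System.out.println(joinAndDecorate(Arrays.asList(" a", "b ")));
        System.out.println(joinAndDecorate(Collections.<String>emptyList()));
    }
}
```

With DataStreams the same shape applies: Flink operators such as `flatMap` or `addSink` go on the `joinedStream` inside the `Optional.map` lambda, not directly after `reduce(...)`. An empty path list yields an empty `Optional`, in which case no sink is attached at all.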
>> >>>>>>
>> >>>>>> On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
>> >>>>>>
>> >>>>>> Hi Team,
>> >>>>>>
>> >>>>>> Could you please help me here? I'm sorry for asking on such short
>> >>>>>> notice, but my work is blocked by this.
>> >>>>>>
>> >>>>>>
>> >>>>>> Regards,
>> >>>>>> Satya
>> >>>>>>
>> >>>>>> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <satyaadi...@gmail.com> wrote:
>> >>>>>>
>> >>>>>>> Hi Chesnay/Team,
>> >>>>>>>
>> >>>>>>> Thank you so much for the reply. Continuing from the previous
>> >>>>>>> email, below is the block diagram of how I read files from S3 and
>> >>>>>>> push them to Kafka. With the current setup I have 4 directories
>> >>>>>>> in total and, using the readFile method of the Flink environment,
>> >>>>>>> we create 4 readers in parallel to process the data from S3.
>> >>>>>>>
>> >>>>>>> Below are my questions:
>> >>>>>>> 1. Can we restrict the number of readers that process the data in
>> >>>>>>> parallel? For example, if we have a thousand directories, I want
>> >>>>>>> to restrict the number of readers to 10, so that ten parallel
>> >>>>>>> threads each read 100 directories sequentially to consume the
>> >>>>>>> data.
>> >>>>>>>
>> >>>>>>> 2. Between the two Flink operators, i.e. the S3 reader and the
>> >>>>>>> Kafka sink, I want to add one more operator to transform the data
>> >>>>>>> read from the S3 bucket before pushing it into the Kafka sink.
>> >>>>>>> Below is my working code. I am having difficulty adding a map
>> >>>>>>> operator that transforms the union of the per-directory
>> >>>>>>> DataStreams before pushing to Kafka.
>> >>>>>>> List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);
>> >>>>>>> s3PathList.stream()
>> >>>>>>>     .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
>> >>>>>>>     .reduce(DataStream::union)
>> >>>>>>>     .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> *Something like this is what I'm trying to do to achieve the above
>> >>>>>>> use case by applying FlatMap (it could be map as well):*
>> >>>>>>> s3PathList.stream()
>> >>>>>>>     .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
>> >>>>>>>     .reduce(DataStream::union)
>> >>>>>>>     .flatMap(new FlatMapFunction<DataStream, String>() {
>> >>>>>>>         @Override
>> >>>>>>>         public void flatMap(String value, Collector<String> out) throws Exception {
>> >>>>>>>             FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
>> >>>>>>>             System.out.println("Json string:: ------" + m);
>> >>>>>>>             // transformation logic
>> >>>>>>>             out.collect(value);
>> >>>>>>>         }
>> >>>>>>>     })
>> >>>>>>>     .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
>> >>>>>>> [image: image.png]
>> >>>>>>> Request your support on the same.
>> >>>>>>> Regards,
>> >>>>>>> Satya
>> >>>>>>>
>> >>>>>>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <
>> satyaadi...@gmail.com>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hi @ches...@apache.org <ches...@apache.org> ,
>> >>>>>>>>
>> >>>>>>>> Thanks for your support, it was really helpful.
>> >>>>>>>> Do you know the list of directories when you submit the job?
>> >>>>>>>> [Yes, we do.]
>> >>>>>>>> The implementation is in progress; I will get back to you if we
>> >>>>>>>> face any further challenges.
>> >>>>>>>> Appreciate your support in this regard.
>> >>>>>>>>
>> >>>>>>>> Regards,
>> >>>>>>>> Satya
>> >>>>>>>>
>> >>>>>>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <
>> satyaadi...@gmail.com>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Thank you @Chesnay, let me try this change.
>> >>>>>>>>>
>> >>>>>>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <
>> >> ches...@apache.org>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> You could also try using streams to make it a little more concise:
>> >>>>>>>>>>
>> >>>>>>>>>> directories.stream()
>> >>>>>>>>>>      .map(directory -> createInputStream(environment, directory))
>> >>>>>>>>>>      .reduce(DataStream::union)
>> >>>>>>>>>>      .map(joinedStream -> joinedStream.addSink(kafka));
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Do you know the list of directories when you submit the job?
>> >>>>>>>>>>
>> >>>>>>>>>> If so, then you can iterate over them, create a source for each
>> >>>>>>>>>> directory, union them, and apply the sink to the union.
>> >>>>>>>>>>
>> >>>>>>>>>> private static DataStream<String> createInputStream(
>> >>>>>>>>>>         StreamExecutionEnvironment environment, String directory) {
>> >>>>>>>>>>     TextInputFormat format =
>> >>>>>>>>>>         new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
>> >>>>>>>>>>     format.setNestedFileEnumeration(true);
>> >>>>>>>>>>     return environment.readFile(format, directory,
>> >>>>>>>>>>         FileProcessingMode.PROCESS_ONCE, -1,
>> >>>>>>>>>>         FilePathFilter.createDefaultFilter());
>> >>>>>>>>>> }
>> >>>>>>>>>>
>> >>>>>>>>>> public static void runJob() throws Exception {
>> >>>>>>>>>>     StreamExecutionEnvironment environment =
>> >>>>>>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>> >>>>>>>>>>     List<String> directories = getDirectories();
>> >>>>>>>>>>     DataStream<String> joinedStreams = null;
>> >>>>>>>>>>     for (String directory : directories) {
>> >>>>>>>>>>         DataStream<String> inputStream = createInputStream(environment, directory);
>> >>>>>>>>>>         if (joinedStreams == null) {
>> >>>>>>>>>>             joinedStreams = inputStream;
>> >>>>>>>>>>         } else {
>> >>>>>>>>>>             // union() returns a new stream, so keep the result
>> >>>>>>>>>>             joinedStreams = joinedStreams.union(inputStream);
>> >>>>>>>>>>         }
>> >>>>>>>>>>     }
>> >>>>>>>>>>     // add a sanity check that there was at least 1 directory
>> >>>>>>>>>>
>> >>>>>>>>>>     joinedStreams.addSink(kafka);
>> >>>>>>>>>> }
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Guys,
>> >>>>>>>>>>
>> >>>>>>>>>> I'm stuck on this; please help me here.
>> >>>>>>>>>> Regards,
>> >>>>>>>>>> Satya
>> >>>>>>>>>>
>> >>>>>>>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit
>> >>>>>>>>>> <satyaadi...@gmail.com> <satyaadi...@gmail.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Guys,
>> >>>>>>>>>>
>> >>>>>>>>>> Sorry to bother you again, but could someone help me here? Any
>> >>>>>>>>>> help in this regard will be much appreciated.
>> >>>>>>>>>>
>> >>>>>>>>>> Regards,
>> >>>>>>>>>> Satya
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <
>> >> satyaadi...@gmail.com>
>> >>>>>>>>>> <satyaadi...@gmail.com>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Guys,
>> >>>>>>>>>>
>> >>>>>>>>>> I need some help; any leads will be highly appreciated. I have
>> >>>>>>>>>> written a Flink streaming job to read data from an S3 bucket and
>> >>>>>>>>>> push it to Kafka. Below is the working source that deals with a
>> >>>>>>>>>> single S3 path:
>> >>>>>>>>>>
>> >>>>>>>>>> TextInputFormat format = new TextInputFormat(
>> >>>>>>>>>>     new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>> >>>>>>>>>> format.setNestedFileEnumeration(true);
>> >>>>>>>>>>
>> >>>>>>>>>> DataStream<String> inputStream = environment.readFile(format,
>> >>>>>>>>>>     "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>> >>>>>>>>>>     FilePathFilter.createDefaultFilter());
>> >>>>>>>>>>
>> >>>>>>>>>> inputStream.addSink(kafka);
>> >>>>>>>>>>
>> >>>>>>>>>> But my requirement is to get the list of paths and pass them one
>> >>>>>>>>>> by one to this environment.readFile() method. How can we achieve
>> >>>>>>>>>> this?
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks,
>> >>>>>>>>>> Satya
>> >>>>>>>>>>
>> >>>>>>>>>> --
>> >>>>>>>>>> --------------------------
>> >>>>>>>>>> Best Regards
>> >>>>>>>>>> Satya Prakash
>> >>>>>>>>>> (M)+91-9845111913


-- 
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913
