To fix the OutOfMemory error you will have to provide Flink with more memory, use more task executors or possibly reduce the parallelism.

Did the job fail before the first checkpoint completed?

What sink are you using?

On 10/27/2020 12:45 PM, Satyaa Dixit wrote:
Following up on my previous email: I have also tried the code below, but it still restarts the job from the beginning.

environment.enableCheckpointing(30000L);
environment.disableOperatorChaining();
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
environment.getCheckpointConfig().setPreferCheckpointForRecovery(true);

environment.setRestartStrategy(RestartStrategies.failureRateRestart(5, // max failures per interval
Time.of(30, TimeUnit.MINUTES), // time interval for measuring failure rate
Time.of(60, TimeUnit.SECONDS) // delay
));

Please have a look into this as well.

Regards,
Satya

On Tue, Oct 27, 2020 at 4:46 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

    Hi Chesnay & Team,

    I'm already using "ContinuousFileMonitoringFunction", but I still
    can't achieve the use case below. For example, once the job has
    started and processed half of the data, it fails with the exception
    below. How can we avoid this exception? Could you please help us
    with this too?

    *org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: [110] s3://messages/qa/test-data/DEMO/0077/data/2020-09-03/10/2020-09-03_10:01:51_53807.json mod@ 1599127313000 : 0 + 345
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
    Caused by: java.lang.OutOfMemoryError: Java heap space*

    If I restart the job, it causes data duplication. How can we
    handle this failure case with the Flink S3 reader source using
    checkpointing?

    Regards,

    Satya


    On Thu, Oct 22, 2020 at 7:05 PM Chesnay Schepler <ches...@apache.org> wrote:

        The existing ContinuousFileMonitoringFunction and
        ContinuousFileReaderOperator already take care of that.
        Unless you are re-implementing them from scratch, you
        shouldn't have to do anything.

        On 10/22/2020 1:47 PM, Satyaa Dixit wrote:
        > Hi Chesnay,
        > Thanks for your support. It helped a lot. I need help with one
        > more thing: how to do checkpointing as part of the S3 reader
        > source. If a failure happens, due to an OutOfMemoryError or
        > anything else, I want the restarted job to resume from the last
        > split offset the reader reached, continuing where the previous
        > run stopped, in order to avoid duplicate data.
        >
        > Thanks,
        > Satya
        >
        > On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler <ches...@apache.org> wrote:
        >
        >> Hmm... I don't see an easy way.
        >> You may have to replicate StreamExecutionEnvironment#createFileInput
        >> and create a custom ContinuousFileMonitoringFunction that ignores
        >> missing files in its run() method.
        >>
        >> Alternatively, use some library to check the existence of
        the S3
        >> directories before creating the sources.
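
The second suggestion (check for the directories before creating any sources) could be sketched roughly as follows. This is only an illustration: the class name, method name and bucket paths are hypothetical, and the existence check itself is passed in as a predicate because it depends on whichever S3 client library is in use.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ExistingDirectoryFilter {

    // Keeps only the directories for which the supplied check returns true.
    // In a real job the predicate would ask the object store whether the
    // prefix exists; that call is deliberately left out of this sketch.
    static List<String> keepExisting(List<String> directories, Predicate<String> exists) {
        return directories.stream()
                .filter(exists)
                .collect(Collectors.toList());
    }
}
```

The filtered list would then be the one handed to the code that creates one source per directory, so a missing directory never reaches the file reader.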
        >>
        >> On 10/15/2020 11:49 AM, Satyaa Dixit wrote:
        >>> Hi Chesnay/Team,
        >>>
        >>> Thanks, we got the fix for our problem, but we are stuck on the
        >>> issue below and request your support.
        >>>
        >>> How can we catch a FileNotFoundException at runtime if a directory
        >>> is missing in S3, so that the source code below does not fail the
        >>> job?
        >>>
        >>>
        >>> s3PathList.stream()
        >>>     .map(directory -> S3Service.createInputStream(environment, directory, readerParallelism))
        >>>     .reduce(DataStream::union)
        >>>     .map(joinedStream -> joinedStream.addSink(kafkaProducer));
        >>>
        >>>
        >>>
        >>>
        >>>
        >>> Regards,
        >>>
        >>> Satya
        >>>
        >>> On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
        >>>> Hi Chesnay/Team
        >>>>
        >>>> Thank you so much. I have tried the solution, but it is not
        >>>> working as expected: it shows compilation issues, and I have
        >>>> tried every way I could think of. Please find the code snippet
        >>>> below:
        >>>>
        >>>> s3PathList.stream()
        >>>>     .map(directory -> S3Service.customCreateInputStream(environment, directory, readerParallelism))
        >>>>     .reduce(DataStream::union)
        >>>>     .map(joinedStream -> stream.flatMap(new IntermidiateOperator())
        >>>>     .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
        >>>>
        >>>> public static class IntermidiateOperator implements FlatMapFunction<String, String> {
        >>>>     private static final ObjectMapper objectMapper1 = new ObjectMapper();
        >>>>
        >>>>     @Override
        >>>>     public void flatMap(String value, Collector<String> out) throws Exception {
        >>>>         Test m = objectMapper1.readValue(value, Test.class);
        >>>>         System.out.println("Json string:: ------" + m);
        >>>>         // logger.info("Json string:: ------" + m);
        >>>>         out.collect(value);
        >>>>     }
        >>>> }
        >>>>
        >>>> Also, to clarify one doubt: how can we handle a
        >>>> *FileNotFoundException* in the Flink reader at runtime when a
        >>>> directory is not available in S3? How can we avoid a job failure
        >>>> in that case?
        >>>>
        >>>> Regards,
        >>>> Satya
        >>>>
        >>>> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit <satyaadi...@gmail.com> wrote:
        >>>>
        >>>>> Thanks, I'll check it out.
        >>>>>
        >>>>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <ches...@apache.org> wrote:
        >>>>>
        >>>>>> 1) There's no mechanism in the API to restrict the number of
        >>>>>> readers across several sources. I can't quite think of a way to
        >>>>>> achieve this; maybe Kostas has an idea.
        >>>>>>
        >>>>>> 2) You're mixing up the Java Streams and Flink's DataStream API.
        >>>>>>
        >>>>>> Try this:
        >>>>>>
        >>>>>> s3PathList.stream()
        >>>>>> .map(...)
        >>>>>> .reduce(...)
        >>>>>> .map(joinedStream -> joinedStream.flatMap(new FlatMapFunction...))
        >>>>>> .map(joinedStream-> joinedStream.addSink...)
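
To illustrate the distinction without any Flink dependency, here is a minimal sketch in which plain lists stand in for DataStreams. Stream#reduce without an identity returns an Optional, so the following .map is Optional#map, and the lambda parameter is the already-joined value on which the DataStream-style calls belong. The class and method names are hypothetical; union and transform are only stand-ins for DataStream#union and a transformation such as flatMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class OptionalChainSketch {

    // Stand-in for DataStream#union: returns a new list holding both inputs.
    static List<String> union(List<String> a, List<String> b) {
        List<String> joined = new ArrayList<>(a);
        joined.addAll(b);
        return joined;
    }

    // Stand-in for a per-record transformation applied to the joined stream.
    static List<String> transform(List<String> joined) {
        return joined.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    static Optional<List<String>> build(List<List<String>> perDirectory) {
        return perDirectory.stream()
                // Java Streams: reduce(BinaryOperator) yields Optional<List<String>>
                .reduce(OptionalChainSketch::union)
                // Optional#map: 'joined' is the unioned value, not the Optional
                .map(joined -> transform(joined));
    }
}
```

With an empty input list the Optional is empty and the lambda never runs, which is also why no sink would be attached when there are no directories.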
        >>>>>>
        >>>>>> On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
        >>>>>>
        >>>>>> Hi Team,
        >>>>>>
        >>>>>> Could you please help me here. I’m sorry for asking on
        such short
        >> notice
        >>>>>> but my work has stopped due to this.
        >>>>>>
        >>>>>>
        >>>>>> Regards,
        >>>>>> Satya
        >>>>>>
        >>>>>> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <satyaadi...@gmail.com> wrote:
        >>>>>>
        >>>>>>> Hi Chesnay/Team,
        >>>>>>>
        >>>>>>> Thank you so much for the reply. Continuing from the previous
        >>>>>>> email, below is the block diagram where I am reading files from
        >>>>>>> S3 and pushing them to Kafka. With the current setup I have 4
        >>>>>>> directories in total; using the readFile method of the Flink
        >>>>>>> environment, we create 4 readers in parallel to process the
        >>>>>>> data from S3.
        >>>>>>>
        >>>>>>> Below are my questions:
        >>>>>>> 1. Can we restrict the number of readers that process the data
        >>>>>>> in parallel? For example, if we have a thousand directories, I
        >>>>>>> want to restrict the number of readers to 10, so that ten
        >>>>>>> parallel threads each read 100 directories sequentially.
        >>>>>>>
        >>>>>>> 2. Between the two Flink operators, i.e. the S3 reader and the
        >>>>>>> Kafka sink, I want to add one more operator that transforms the
        >>>>>>> data read from the S3 bucket before it is pushed to the Kafka
        >>>>>>> sink. Below is my working code. I am having difficulty adding a
        >>>>>>> map operator that transforms the union of the DataStreams (built
        >>>>>>> by applying the union method over each directory's reader)
        >>>>>>> before pushing to Kafka.
        >>>>>>> List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);
        >>>>>>> s3PathList.stream()
        >>>>>>>     .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
        >>>>>>>     .reduce(DataStream::union)
        >>>>>>>     .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
        >>>>>>>
        >>>>>>>
        >>>>>>> *This is roughly what I am trying to do to achieve the above use
        >>>>>>> case with a flatMap (it could be a map as well):*
        >>>>>>> s3PathList.stream()
        >>>>>>>     .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
        >>>>>>>     .reduce(DataStream::union)
        >>>>>>>     .flatMap(new FlatMapFunction<DataStream, String>() {
        >>>>>>>         @Override
        >>>>>>>         public void flatMap(String value, Collector<String> out) throws Exception {
        >>>>>>>             FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
        >>>>>>>             System.out.println("Json string:: ------" + m);
        >>>>>>>             // transformation logic
        >>>>>>>             out.collect(value);
        >>>>>>>         }
        >>>>>>>     })
        >>>>>>>     .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
        >>>>>>> [image: image.png]
        >>>>>>> Request your support on the same.
        >>>>>>> Regards,
        >>>>>>> Satya
        >>>>>>>
        >>>>>>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
        >>>>>>>
        >>>>>>>> Hi @ches...@apache.org,
        >>>>>>>>
        >>>>>>>> Thanks for your support, it was really helpful.
        >>>>>>>> Do you know the list of directories when you submit the job?
        >>>>>>>> [Yes, we do.]
        >>>>>>>> The implementation is in progress; we will get back to you if
        >>>>>>>> we face any further challenges.
        >>>>>>>> Appreciate your support in this regard.
        >>>>>>>>
        >>>>>>>> Regards,
        >>>>>>>> Satya
        >>>>>>>>
        >>>>>>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
        >>>>>>>>
        >>>>>>>>> Thank you @Chesnay let me try this change .
        >>>>>>>>>
        >>>>>>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org> wrote:
        >>>>>>>>>
        >>>>>>>>>> You could also try using streams to make it a
        little more concise:
        >>>>>>>>>>
        >>>>>>>>>> directories.stream()
        >>>>>>>>>> .map(directory -> createInputStream(environment,
        directory))
        >>>>>>>>>> .reduce(DataStream::union)
        >>>>>>>>>> .map(joinedStream -> joinedStream.addSink(kafka));
        >>>>>>>>>>
        >>>>>>>>>>
        >>>>>>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
        >>>>>>>>>>
        >>>>>>>>>> Do you know the list of directories when you submit
        the job?
        >>>>>>>>>>
        >>>>>>>>>> If so, then you can iterate over them, create a
        source for each
        >>>>>>>>>> directory, union them, and apply the sink to the union.
        >>>>>>>>>>
        >>>>>>>>>> private static DataStream<String> createInputStream(
        >>>>>>>>>>         StreamExecutionEnvironment environment, String directory) {
        >>>>>>>>>>     TextInputFormat format =
        >>>>>>>>>>         new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
        >>>>>>>>>>     format.setNestedFileEnumeration(true);
        >>>>>>>>>>     return environment.readFile(format, directory,
        >>>>>>>>>>         FileProcessingMode.PROCESS_ONCE, -1,
        >>>>>>>>>>         FilePathFilter.createDefaultFilter());
        >>>>>>>>>> }
        >>>>>>>>>>
        >>>>>>>>>> public static void runJob() throws Exception {
        >>>>>>>>>>     StreamExecutionEnvironment environment =
        >>>>>>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
        >>>>>>>>>>     List<String> directories = getDirectories();
        >>>>>>>>>>     DataStream<String> joinedStreams = null;
        >>>>>>>>>>     for (String directory : directories) {
        >>>>>>>>>>         DataStream<String> inputStream = createInputStream(environment, directory);
        >>>>>>>>>>         if (joinedStreams == null) {
        >>>>>>>>>>             joinedStreams = inputStream;
        >>>>>>>>>>         } else {
        >>>>>>>>>>             // union() returns a new stream, so keep the result
        >>>>>>>>>>             joinedStreams = joinedStreams.union(inputStream);
        >>>>>>>>>>         }
        >>>>>>>>>>     }
        >>>>>>>>>>     // add a sanity check that there was at least 1 directory
        >>>>>>>>>>
        >>>>>>>>>>     joinedStreams.addSink(kafka);
        >>>>>>>>>> }
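
One detail worth spelling out for a loop like this: DataStream#union returns a new stream and leaves the stream it is called on unchanged, so the result has to be assigned back on every iteration. The following stand-alone sketch shows the effect with a hypothetical immutable FakeStream class (not a Flink type) so that it runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for DataStream: immutable, union() returns a new object.
final class FakeStream {
    final List<String> sources;

    FakeStream(List<String> sources) {
        this.sources = Collections.unmodifiableList(new ArrayList<>(sources));
    }

    FakeStream union(FakeStream other) {
        List<String> all = new ArrayList<>(sources);
        all.addAll(other.sources);
        return new FakeStream(all); // the receiver itself is not modified
    }
}

class UnionLoopSketch {

    // Builds the union of one FakeStream per directory, keeping each result.
    static FakeStream unionAll(List<String> directories) {
        FakeStream joined = null;
        for (String directory : directories) {
            FakeStream input = new FakeStream(Collections.singletonList(directory));
            joined = (joined == null) ? input : joined.union(input);
        }
        return joined; // null when the list is empty, so callers should check
    }
}
```

If the result of union were discarded instead of assigned, the joined value would still contain only the first directory when the loop finished.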
        >>>>>>>>>>
        >>>>>>>>>>
        >>>>>>>>>>
        >>>>>>>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
        >>>>>>>>>>
        >>>>>>>>>> Hi Guys,
        >>>>>>>>>>
        >>>>>>>>>> Got stuck with it please help me here
        >>>>>>>>>> Regards,
        >>>>>>>>>> Satya
        >>>>>>>>>>
        >>>>>>>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com> wrote:
        >>>>>>>>>>
        >>>>>>>>>> Hi Guys,
        >>>>>>>>>>
        >>>>>>>>>> Sorry to bother you again, but could someone help me here? Any
        >>>>>>>>>> help in this regard will be much appreciated.
        >>>>>>>>>>
        >>>>>>>>>> Regards,
        >>>>>>>>>> Satya
        >>>>>>>>>>
        >>>>>>>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
        >>>>>>>>>>
        >>>>>>>>>> Hi Guys,
        >>>>>>>>>>
        >>>>>>>>>> I need some help; any leads will be highly appreciated. I have
        >>>>>>>>>> written a Flink streaming job to read data from an S3 bucket and
        >>>>>>>>>> push it to Kafka. Below is the working source that deals with a
        >>>>>>>>>> single S3 path:
        >>>>>>>>>>
        >>>>>>>>>> TextInputFormat format = new TextInputFormat(
        >>>>>>>>>>     new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
        >>>>>>>>>> format.setNestedFileEnumeration(true);
        >>>>>>>>>> DataStream<String> inputStream = environment.readFile(format,
        >>>>>>>>>>     "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
        >>>>>>>>>>     FilePathFilter.createDefaultFilter());
        >>>>>>>>>> inputStream.addSink(kafka);
        >>>>>>>>>>
        >>>>>>>>>> But my requirement is to get a list of paths and pass them one by
        >>>>>>>>>> one to this environment.readFile() method. How can we achieve
        >>>>>>>>>> this?
        >>>>>>>>>>
        >>>>>>>>>> Thanks,
        >>>>>>>>>> Satya
        >>>>>>>>>>
        >>>>>>>>>>
        >>>>>>>>>> --
        >>>>>>>>>> --------------------------
        >>>>>>>>>> Best Regards
        >>>>>>>>>> Satya Prakash
        >>>>>>>>>> (M)+91-9845111913

--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913

