In continuation of the above email, I have also tried the code below, but the job still restarts from the beginning:

    environment.enableCheckpointing(30000L);
    environment.disableOperatorChaining();
    environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
    environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    environment.getCheckpointConfig().setPreferCheckpointForRecovery(true);
    environment.setRestartStrategy(RestartStrategies.failureRateRestart(
            5,                             // max failures per interval
            Time.of(30, TimeUnit.MINUTES), // time interval for measuring failure rate
            Time.of(60, TimeUnit.SECONDS)  // delay
    ));
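My understanding is that checkpoints are only used automatically while the same job is recovering; once the job is cancelled or resubmitted by hand, they are discarded unless they are explicitly retained and resumed from. Is something along these lines the right direction? This is only a rough sketch, assuming Flink 1.11.x, and the checkpoint path below is just a placeholder:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;

    // Rough sketch: write checkpoints to a durable location and keep them around
    // even after cancellation, so a resubmitted job can be resumed from the latest one.
    environment.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints")); // placeholder path
    environment.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

The resubmission would then pass the last retained checkpoint explicitly, e.g. flink run -s s3://my-bucket/flink/checkpoints/<job-id>/chk-<n> ..., so that the file monitoring and reader operators restore their split offsets instead of starting from scratch. Please correct me if this is not the right approach.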
Please have a look into this as well.

Regards,
Satya

On Tue, Oct 27, 2020 at 4:46 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi Chesnay & Team,

I'm already using "ContinuousFileMonitoringFunction", but I'm still not able to achieve the use case below. For example, once the job has started and has processed half of the data, it fails in between with the exception below. How do we avoid this exception? Could you please help us with this too?

    org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: [110]
    s3://messages/qa/test-data/DEMO/0077/data/2020-09-03/10/2020-09-03_10:01:51_53807.json mod@ 1599127313000 : 0 + 345
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
    Caused by: java.lang.OutOfMemoryError: Java heap space

If I start the job again, it causes data duplication. How do we fix this failure case for the Flink S3 reader source using checkpointing?

Regards,
Satya

On Thu, Oct 22, 2020 at 7:05 PM Chesnay Schepler <ches...@apache.org> wrote:

The existing ContinuousFileMonitoringFunction and ContinuousFileReaderOperator already take care of that. Unless you are re-implementing them from scratch, you shouldn't have to do anything.

On 10/22/2020 1:47 PM, Satyaa Dixit wrote:

Hi Chesnay,

Thanks for your support, it helped a lot. I need one more piece of help: how to do checkpointing as part of the S3 reader source, so that if a failure happens (due to an OutOfMemoryError or any other failure), the restarted job recovers from the last reader split offset and continues where the previous run left off, avoiding duplicate data.

Thanks,
Satya

On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler <ches...@apache.org> wrote:

Hmm... I don't see an easy way. You may have to replicate StreamExecutionEnvironment#createFileInput and create a custom ContinuousFileMonitoringFunction that ignores missing files in its run() method.

Alternatively, use some library to check the existence of the S3 directories before creating the sources.
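For example — an untested sketch, assuming the AWS SDK for Java v1 is on the classpath and that the configured paths have the form s3://bucket/prefix/ (or s3a://bucket/prefix/); the S3PathChecker helper below is made up purely for illustration — the path list could be filtered down to prefixes that actually contain objects before any source is built:

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.ListObjectsV2Request;

    import java.net.URI;
    import java.util.List;
    import java.util.stream.Collectors;

    // Hypothetical helper, named here for illustration only.
    public class S3PathChecker {

        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();

        /** Returns true if at least one object exists under the given s3://bucket/prefix path. */
        public static boolean prefixExists(String s3Path) {
            URI uri = URI.create(s3Path);
            String bucket = uri.getHost();
            String prefix = uri.getPath().replaceFirst("^/", "");
            ListObjectsV2Request request = new ListObjectsV2Request()
                    .withBucketName(bucket)
                    .withPrefix(prefix)
                    .withMaxKeys(1); // existence check only, one key is enough
            return S3.listObjectsV2(request).getKeyCount() > 0;
        }

        /** Drops the paths that would otherwise fail with FileNotFoundException at runtime. */
        public static List<String> existingPaths(List<String> s3PathList) {
            return s3PathList.stream()
                    .filter(S3PathChecker::prefixExists)
                    .collect(Collectors.toList());
        }
    }

A path filtered out this way never reaches environment.readFile(), so the job never has to deal with a missing directory at all.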
On 10/15/2020 11:49 AM, Satyaa Dixit wrote:

Hi Chesnay/Team,

Thanks, we got the fix for our earlier problem, but we are stuck with the issue below and request your support.

How can we catch a FileNotFoundException at runtime, if any directory is missing in S3, as part of the source code below, so that the job does not fail?

    s3PathList.stream()
        .map(directory -> S3Service.createInputStream(environment, directory, readerParallelism))
        .reduce(DataStream::union)
        .map(joinedStream -> joinedStream.addSink(kafkaProducer));

Regards,
Satya

On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi Chesnay/Team,

Thank you so much. I have tried the solution, but it is not working as expected; it shows compilation issues, and I have tried every variation I can think of. Please find the code snippet below:

    s3PathList.stream()
        .map(directory -> S3Service.customCreateInputStream(environment, directory, readerParallelism))
        .reduce(DataStream::union)
        .map(joinedStream -> stream.flatMap(new IntermidiateOperator())
        .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));

    public static class IntermidiateOperator implements FlatMapFunction<String, String> {
        private static final ObjectMapper objectMapper1 = new ObjectMapper();

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            Test m = objectMapper1.readValue(value, Test.class);
            System.out.println("Json string:: ------" + m);
            // logger.info("Json string:: ------" + m);
            out.collect(value);
        }
    }

Also, just to clarify one doubt: how should a FileNotFoundException be handled by the Flink reader at runtime if a directory is not available in S3? How do we avoid job failure in that case?

Regards,
Satya

On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit <satyaadi...@gmail.com> wrote:

Thanks, I'll check it out.

On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <ches...@apache.org> wrote:

1) There's no mechanism in the API to restrict the number of readers across several sources. I can't quite think of a way to achieve this; maybe Kostas has an idea.

2) You're mixing up the Java Streams and Flink's DataStream API.

Try this:

    s3PathList.stream()
        .map(...)
        .reduce(...)
        .map(joinedStream -> joinedStream.flatMap(new FlatMapFunction...))
        .map(joinedStream -> joinedStream.addSink...)
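Spelled out a bit more concretely — just a sketch, where S3Service.createInputStream, finalParameters, readerParallelism, kafkaProducer and kafkaTopicName are taken from your earlier snippets and assumed to exist — the split between the two APIs looks like this:

    import java.util.List;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.util.Collector;

    // The outer java.util.stream calls only wire the job graph together;
    // the data itself flows through the Flink operators built inside them.
    List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);

    s3PathList.stream()
            // one Flink file source per S3 directory
            .map(directory -> S3Service.createInputStream(environment, directory, readerParallelism))
            // union all sources into a single DataStream<String>
            .reduce(DataStream::union)
            // everything inside this map(...) is Flink DataStream API
            .map(joinedStream -> joinedStream
                    .flatMap(new FlatMapFunction<String, String>() {
                        @Override
                        public void flatMap(String value, Collector<String> out) {
                            // transformation logic goes here; forwarded unchanged for now
                            out.collect(value);
                        }
                    })
                    .addSink(kafkaProducer)
                    .name("Publish to " + kafkaTopicName));

The java.util.stream map/reduce builds the topology once at submission time; the flatMap and addSink calls are the Flink operators that actually process the records.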
On 10/12/2020 6:05 AM, Satyaa Dixit wrote:

Hi Team,

Could you please help me here? I'm sorry for asking on such short notice, but my work has stopped due to this.

Regards,
Satya

On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi Chesnay/Team,

Thank you so much for the reply. In continuation of the previous email, below is the block diagram where I am reading files from S3 and pushing them to Kafka. With the current setup I have 4 directories in total, and based on the readFile method from the Flink environment we are creating 4 readers in parallel to process the data from S3.

Below are my questions:

1. Can we restrict the number of readers that process the data in parallel? For example, if we have a thousand directories, I want to restrict the number of readers to 10, so that ten parallel threads each continue with sequential reading of 100 directories per thread to consume the data.

2. Between the two Flink operators, i.e. the S3 reader and the Kafka sink, I want to add one more operator that transforms the data read from the S3 bucket before pushing it into the Kafka sink. Below is my working code. I am finding it difficult to implement a map operator that transforms the union of data streams (built by applying the union method over each directory's reader) before pushing to Kafka.

    List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);

    s3PathList.stream()
        .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
        .reduce(DataStream::union)
        .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));

Something like this is what I'm trying to do in order to achieve the above use case, by applying flatMap (it could be map as well):

    s3PathList.stream()
        .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
        .reduce(DataStream::union)
        .flatMap(new FlatMapFunction<DataStream, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
                System.out.println("Json string:: ------" + m);
                // transformation logic
                out.collect(value);
            }
        })
        .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));

[image: image.png]

Request your support on the same.

Regards,
Satya

On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi @ches...@apache.org,

Thanks for your support, it was really helpful.

"Do you know the list of directories when you submit the job?" [Yes, we do.]

The implementation is in progress, and we will get back to you if we face any further challenges. We appreciate your support in this regard.

Regards,
Satya

On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

Thank you @Chesnay, let me try this change.
On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org> wrote:

You could also try using streams to make it a little more concise:

    directories.stream()
        .map(directory -> createInputStream(environment, directory))
        .reduce(DataStream::union)
        .map(joinedStream -> joinedStream.addSink(kafka));
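One caveat with this streams variant (a sketch of the same code, nothing new assumed): Stream.reduce returns an Optional, so with an empty directory list the sink is silently never attached. The sanity check mentioned in the loop version further down can be folded in directly:

    directories.stream()
        .map(directory -> createInputStream(environment, directory))
        .reduce(DataStream::union)
        // fail fast instead of silently submitting a job without sources
        .orElseThrow(() -> new IllegalStateException("no input directories configured"))
        .addSink(kafka);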
On 10/1/2020 9:48 AM, Chesnay Schepler wrote:

Do you know the list of directories when you submit the job?

If so, then you can iterate over them, create a source for each directory, union them, and apply the sink to the union.

    private static DataStream<String> createInputStream(StreamExecutionEnvironment environment, String directory) {
        TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
        format.setNestedFileEnumeration(true);
        return environment.readFile(format, directory, FileProcessingMode.PROCESS_ONCE, -1,
                FilePathFilter.createDefaultFilter());
    }

    public static void runJob() throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> directories = getDirectories();
        DataStream<String> joinedStreams = null;
        for (String directory : directories) {
            DataStream<String> inputStream = createInputStream(environment, directory);
            if (joinedStreams == null) {
                joinedStreams = inputStream;
            } else {
                joinedStreams = joinedStreams.union(inputStream); // union returns a new stream; reassign it
            }
        }
        // add a sanity check that there was at least 1 directory

        joinedStreams.addSink(kafka);
    }

On 10/1/2020 9:08 AM, Satyaa Dixit wrote:

Hi Guys,

I got stuck with this; please help me here.

Regards,
Satya

On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi Guys,

Sorry to bother you again, but could someone help me here? Any help in this regard will be much appreciated.

Regards,
Satya

On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi Guys,

I need one piece of help; any leads will be highly appreciated. I have written a Flink streaming job to read data from an S3 bucket and push it to Kafka. Below is the working source that deals with a single S3 path:

    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
    format.setNestedFileEnumeration(true);

    DataStream<String> inputStream = environment.readFile(format, "s3a://directory/2020-09-03/",
            FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());

    inputStream.addSink(kafka);

But my requirement is to get a list of paths and pass them one by one to this environment.readFile() method. How can we achieve this?

Thanks,
Satya

--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913