I'm implementing an exponential backoff inside a custom sink that uses an AvroParquetWriter to write to S3. I've changed the number of attempts to 0 in core-site.xml, and I'm catching the timeout exception and calling Thread.sleep for X seconds. This works as intended: when S3 is offline, the sink waits until it is back online.
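A fixed `Thread.sleep` retries at a constant interval. An exponential backoff instead doubles the wait after each failed attempt, up to a cap. The following minimal, framework-free Scala sketch shows that idea. `Backoff`, `delayFor`, `retryForever`, and the `isRetryable` predicate are hypothetical names that are not part of Flink, Hadoop, or Parquet. Which exception counts as "the timeout" is an assumption left to the caller.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object Backoff {

  /** Delay before retry number `attempt` (0-based): baseMillis * 2^attempt, capped at maxMillis. */
  def delayFor(attempt: Int, baseMillis: Long, maxMillis: Long): Long = {
    // Cap the exponent so the shift cannot overflow for any realistic base delay.
    val shift = math.min(attempt, 30)
    math.min(baseMillis << shift, maxMillis)
  }

  /** Runs `op` until it succeeds, sleeping with exponential backoff after each
    * retryable failure. Non-retryable failures are rethrown immediately.
    * Retries without limit, mirroring "wait until S3 is back online". */
  def retryForever[T](baseMillis: Long, maxMillis: Long)(isRetryable: Throwable => Boolean)(op: => T): T = {
    @tailrec
    def loop(attempt: Int): T = Try(op) match {
      case Success(value) => value
      case Failure(e) if isRetryable(e) =>
        // Blocking here is what stalls the task and produces back pressure upstream.
        Thread.sleep(delayFor(attempt, baseMillis, maxMillis))
        loop(attempt + 1)
      case Failure(e) => throw e
    }
    loop(0)
  }
}
```

In the sink, the writer construction (the `ParquetWriter.Builder.build` call seen in the trace) could be wrapped in `retryForever`, with a predicate that matches whatever timeout exception the S3A client actually throws. Note that `scala.util.Try` only catches non-fatal throwables. `InterruptedException` (for example, when the task thread is interrupted) and `LinkageError` subclasses such as `NoClassDefFoundError` propagate instead of being retried.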
I also want to test that back pressure and checkpoints work as intended. For back pressure, I can see it rise in the Flink UI and then recover as expected, and the job stops reading more data from Kafka in the meantime. For checkpoints, I've added a random exception inside the sink's invoke function (1 in 100 calls) to simulate a failure that requires recovering from the last good checkpoint, but something strange happens. Sometimes the job is canceled and recreated, and then runs fine. Other times, after being canceled and recreated X times, it throws a *NoClassDefFoundError*, and it keeps throwing that forever. Does anyone have any thoughts?

    org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: TimerException{java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
        ... 7 more
    Caused by: java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket
        at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
        at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
        at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
        at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
        at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
        at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
        at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
        at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
        at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
        at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
        at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
        at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
        at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
        at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
        at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at scala.util.Try$.apply(Try.scala:209)
        at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
        at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
        at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
        ... 7 more
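The 1-in-100 failure injected into the sink's `invoke` can be isolated in a small helper, which makes both the probability and the random seed explicit and reproducible. The sketch below is hypothetical. `FailureInjector`, `maybeFail`, and `countFailures` are illustrative names, and the post does not show how its own random exception is generated.

```scala
import scala.util.Random

/** Hypothetical test helper: throws on roughly one call in `oneIn`, simulating a
  * transient failure that forces the job to restore from the last completed checkpoint. */
final class FailureInjector(oneIn: Int, rng: Random) {
  require(oneIn > 0, "oneIn must be positive")

  /** Throws a RuntimeException with probability 1/oneIn; otherwise returns normally. */
  def maybeFail(): Unit =
    if (rng.nextInt(oneIn) == 0)
      throw new RuntimeException(s"Injected failure (1 in $oneIn) to test checkpoint recovery")
}

object FailureInjector {

  /** Counts how many of `calls` invocations of maybeFail() threw. */
  def countFailures(injector: FailureInjector, calls: Int): Int =
    (1 to calls).count { _ =>
      try { injector.maybeFail(); false }
      catch { case _: RuntimeException => true }
    }
}
```

Inside a `RichSinkFunction`, such a helper would typically be created in `open` and `maybeFail()` called at the top of `invoke`. Seeding the `Random` makes a failing run easier to reproduce.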