Hi David, this seems to be a bug in our S3 plugin; the joda dependency should be bundled there.

Are you using s3 as a plugin by any chance? Which Flink version are you using?

If you are using s3 as a plugin, you could put joda into your plugin folder like this:

flink-dist
├── conf
├── lib
...
└── plugins
    └── s3
        ├── joda.jar
        └── flink-s3-fs-hadoop.jar

If flink-s3-fs-hadoop.jar is in lib instead, you could try adding joda there as well. Adding joda to your user code will unfortunately not work.
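If it helps, a quick way to confirm which jar the class is actually served from at runtime is a check along these lines (just a sketch; run it from your user code, e.g. in the sink's open(), and note that getCodeSource can be null for bootstrap classes):

    // Throws ClassNotFoundException if joda is invisible to this classloader;
    // otherwise prints the jar it was loaded from.
    val clazz = Class.forName("org.joda.time.format.DateTimeParserBucket")
    val location = Option(clazz.getProtectionDomain.getCodeSource).map(_.getLocation)
    println(s"DateTimeParserBucket loaded from: ${location.getOrElse("unknown")}")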
Best,
Arvid

On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <speeddra...@gmail.com> wrote:

> Hi Andrey, thanks for your reply.
>
> The class is in the jar created with *sbt assembly* that is submitted to
> Flink to start a job:
>
> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>     1649  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket$SavedField.class
>     1984  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket$SavedState.class
>     8651  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket.class
>
> Shouldn't this be enough?
>
> I think it finds the class while nothing goes wrong, but as soon as it
> hits some exceptions, it looks like it "forgets" it.
>
> Like I said before, this is kind of intermittent.
>
> Thanks,
> David
>
> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <azagre...@apache.org> wrote:
>
>> Hi David,
>>
>> This looks like a problem with the resolution of Maven dependencies or
>> something similar. The custom WindowParquetGenericRecordListFileSink
>> probably transitively depends on org/joda/time/format/DateTimeParserBucket,
>> and it is missing from the runtime classpath of Flink.
>>
>> Best,
>> Andrey
>>
>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <speeddra...@gmail.com> wrote:
>>
>>> I'm implementing an exponential backoff inside a custom sink that uses
>>> an AvroParquetWriter to write to S3. I've changed the number of attempts
>>> to 0 inside core-site.xml, and I'm capturing the timeout exception and
>>> doing a Thread.sleep for X seconds. This works as intended: when S3 is
>>> offline, the sink waits until it is back online.
>>>
>>> I also want to test that back pressure and checkpoints work as intended.
>>> For the first, I can see the back pressure in the Flink UI going up, the
>>> job recovering as expected, and no more data being read from Kafka.
>>>
>>> For the checkpoints, I've added a random exception (1 in 100) inside the
>>> sink's invoke function, to simulate that a problem has happened and the
>>> job needs to recover from the last good checkpoint. But something strange
>>> happens: sometimes I can see the job being canceled, created again, and
>>> running fine; other times, after X cycles of being created and canceled,
>>> it gives a *NoClassDefFoundError*, and it keeps giving that forever.
>>>
>>> Do you guys have any thoughts?
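(For illustration, the backoff loop described above might look like the following sketch; the exception type, delays, and names are assumptions, not the actual sink code. The full stack trace follows.)

    // Retry the write with a doubling delay, capped at maxDelayMs (sketch only).
    def writeWithBackoff(write: () => Unit, maxDelayMs: Long = 60000L): Unit = {
      var delayMs = 1000L
      var done = false
      while (!done) {
        try { write(); done = true }
        catch {
          case _: java.io.IOException => // stand-in for the S3 timeout exception
            Thread.sleep(delayMs)
            delayMs = math.min(delayMs * 2, maxDelayMs)
        }
      }
    }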
>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: TimerException{java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket}
>>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>     ... 7 more
>>> Caused by: java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket
>>>     at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>     at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>     at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>     at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>     at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>     at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>     at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>     at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>     at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>     at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>     at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>     at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>     at scala.util.Try$.apply(Try.scala:209)
>>>     at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>     at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>     at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>     at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>     ... 7 more
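(Likewise, the 1-in-100 failure injection described above can be as simple as the following line inside invoke(); illustrative only, not the actual sink code:)

    // Roughly one in every hundred invocations throws, forcing the job to
    // restart and restore from the last successful checkpoint.
    if (scala.util.Random.nextInt(100) == 0)
      throw new RuntimeException("injected failure to exercise checkpoint recovery")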