Hi Arvid,

I use a docker image. Here is the Dockerfile:

FROM flink:1.9.1-scala_2.12
RUN mkdir /opt/flink/plugins/flink-s3-fs-hadoop
RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar /opt/flink/plugins/flink-s3-fs-hadoop/

Please let me know if you need more information.

On Wed, Feb 12, 2020 at 9:15 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi David,
>
> can you double-check the folder structure of your plugin? It should reside
> in its own subfolder. Here is an example.
>
> flink-dist
> ├── conf
> ├── lib
> ...
> └── plugins
>     └── s3
>         └── flink-s3-fs-hadoop.jar
>
> I will investigate your error deeply in the next few days but I'd like to
> have a final confirmation about the folder structure.
>
>
> On Wed, Feb 12, 2020 at 8:56 PM David Magalhães <speeddra...@gmail.com>
> wrote:
>
>> Hi Robert, I couldn't find any previous mention before the
>> NoClassDefFoundError.
>> Here is the full log [1] if you want to look for something more specific.
>>
>> [1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0
>>
>> On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> According to this answer [1] the first exception "mentioning"
>>> org/joda/time/format/DateTimeParserBucket should be a different one.
>>> Can you go through the logs to make sure it is really a
>>> ClassNotFoundException, and not an ExceptionInInitializerError or something
>>> else?
>>>
>>> [1] https://stackoverflow.com/a/5756989/568695
>>>
>>> On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <speeddra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> I'm using flink-s3-fs-hadoop-1.9.1.jar in plugins folder. Like I said
>>>> previously, this works normally until an exception is thrown inside the
>>>> sink. It will try to recover again, but sometimes doesn't recover giving
>>>> this error.
>>>>
>>>> To write to S3 I use *AvroParquetWriter* with the following code:
>>>>
>>>> val writer = AvroParquetWriter
>>>>   .builder[GenericRecord](new Path(finalFilePath))
>>>>
>>>> *Path* is from *org.apache.hadoop.fs*, the other option is to use
>>>> *org.apache.flink.formats.parquet.StreamOutputFile*
>>>> which will use flink S3 plugin, right? Not sure how I can convert
>>>> from Path to StreamOutputFile. I know that when I've used StreamingFileSink,
>>>> I used StreamOutputFile.
>>>>
>>>> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> upon closer reviewing your stacktrace, it seems like you are trying to
>>>>> access S3 without our S3 plugin. That's in general not recommended at all.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> this seems to be a bug in our s3 plugin. The joda dependency should
>>>>>> be bundled there.
>>>>>>
>>>>>> Are you using s3 as a plugin by any chance? Which flink version are
>>>>>> you using?
>>>>>>
>>>>>> If you are using s3 as a plugin, you could put joda in your plugin
>>>>>> folder like this
>>>>>>
>>>>>> flink-dist
>>>>>> ├── conf
>>>>>> ├── lib
>>>>>> ...
>>>>>> └── plugins
>>>>>>     └── s3
>>>>>>         ├── joda.jar
>>>>>>         └── flink-s3-fs-hadoop.jar
>>>>>>
>>>>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into
>>>>>> that.
>>>>>>
>>>>>> Adding joda to your user code will unfortunately not work.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
>>>>>>
>>>>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <
>>>>>> speeddra...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Andrey, thanks for your reply.
>>>>>>>
>>>>>>> The class is on the jar created with `*sbt assembly*` that is
>>>>>>> submitted to Flink to start a Job.
>>>>>>>
>>>>>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>>>>>>> 1649 05-27-2016 10:24 org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>>>>> 1984 05-27-2016 10:24 org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>>>>> 8651 05-27-2016 10:24 org/joda/time/format/DateTimeParserBucket.class
>>>>>>>
>>>>>>> Shouldn't this be enough?
>>>>>>>
>>>>>>> I think it uses it when nothing happens, but as soon as it has some
>>>>>>> exceptions, it looks like it "forgets" it.
>>>>>>>
>>>>>>> Like I said before, this is kind of intermittent.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> David
>>>>>>>
>>>>>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <azagre...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> This looks like a problem with resolution of maven dependencies or
>>>>>>>> something.
>>>>>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>>>>>> transitively depends on org/joda/time/format/DateTimeParserBucket
>>>>>>>> and it is missing on the runtime classpath of Flink.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Andrey
>>>>>>>>
>>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>>> speeddra...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>>>> uses an AvroParquetWriter to write to S3. I've changed the number of
>>>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as
>>>>>>>>> intended, and when S3 is offline, it waits until it is online.
>>>>>>>>>
>>>>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>>>>> working as intended, and for the first one, I can see the back
>>>>>>>>> pressure in Flink UI going up, and recover as expected and not
>>>>>>>>> reading more data from Kafka.
>>>>>>>>>
>>>>>>>>> For the checkpoints, I've added a random exception inside the sink's
>>>>>>>>> invoke function (1 in 100, to simulate that a problem has happened
>>>>>>>>> and the job needs to recover from the last good checkpoint), but
>>>>>>>>> something strange happens. Sometimes I can see the job being canceled
>>>>>>>>> and created again, and running fine; other times, after being created
>>>>>>>>> and canceled X number of times, it gives a *NoClassDefFoundError*,
>>>>>>>>> and it will keep giving that forever.
>>>>>>>>>
>>>>>>>>> Do you guys have any thoughts?
>>>>>>>>>
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket}
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>>> ... 7 more
>>>>>>>>> Caused by: java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket
>>>>>>>>> at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>>> at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>>> at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>>> at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>>> at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>>> at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>>> at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>>> at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>>> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>>> at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>>> at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>>> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>>> at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>>> at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>>> at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>>> at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>>> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>>> at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>>> at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>>> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>>> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>>> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>>> ... 7 more
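
For reference, the retry behaviour described in the original message (catch the timeout, sleep, try again) could be sketched in Scala roughly as follows. This is only a minimal sketch under assumptions: `BackoffSketch`, `retryWithBackoff` and all of its parameters are hypothetical names that do not appear in the thread, and the real sink may be structured quite differently. One detail that may be relevant here: `scala.util.Try` only catches non-fatal exceptions, so a `NoClassDefFoundError` (a `LinkageError`) is not retried by this helper and propagates straight to the caller.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not code from the thread.
object BackoffSketch {

  // Runs `op`; on a non-fatal failure waits `delayMs`, doubles the delay
  // (capped at `maxDelayMs`) and tries again, up to `maxAttempts` calls in total.
  // `sleep` is injectable so the sketch can be exercised without real waiting.
  @tailrec
  def retryWithBackoff[A](
      maxAttempts: Int,
      delayMs: Long,
      maxDelayMs: Long,
      sleep: Long => Unit = ms => Thread.sleep(ms)
  )(op: => A): A =
    Try(op) match {
      case Success(value) => value
      case Failure(_) if maxAttempts > 1 =>
        sleep(delayMs)
        retryWithBackoff(maxAttempts - 1, math.min(delayMs * 2, maxDelayMs), maxDelayMs, sleep)(op)
      case Failure(error) => throw error // attempts exhausted: rethrow to the caller
    }

  def main(args: Array[String]): Unit = {
    var calls = 0
    var delays = List.empty[Long]
    val recordDelay: Long => Unit = ms => { delays = delays :+ ms }

    // Simulated write that times out three times before succeeding.
    val result = retryWithBackoff(maxAttempts = 5, delayMs = 100L, maxDelayMs = 1000L, sleep = recordDelay) {
      calls += 1
      if (calls < 4) throw new java.io.IOException("simulated S3 timeout")
      else "written"
    }
    println(s"$result after $calls calls, delays: $delays")
  }
}
```

Running `main` prints `written after 4 calls, delays: List(100, 200, 400)`. In a real sink the default `sleep` would block the task thread, which is consistent with the back pressure described in the original message.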