Hi David,

can you double-check the folder structure of your plugin? It should reside
in its own subfolder. Here is an example.

flink-dist
├── conf
├── lib
...
└── plugins
    └── s3
        └── flink-s3-fs-hadoop.jar

I will investigate your error deeply in the next few days but I'd like to
have a final confirmation about the folder structure.


On Wed, Feb 12, 2020 at 8:56 PM David Magalhães <speeddra...@gmail.com>
wrote:

> Hi Robert, I couldn't found any previous mention before the
> NoClassDefFoundError.
> Here is the full log [1] if you want to look for something more specific.
>
> [1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0
>
> On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> According to this answer [1] the first exception "mentioning"
>> org/joda/time/format/DateTimeParserBucket should be a different one. Can
>> you go through the logs to make sure it is really a ClassNotFoundException,
>> and not a ExceptionInInitializerError or something else?
>>
>> [1]https://stackoverflow.com/a/5756989/568695
>>
>> On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <speeddra...@gmail.com>
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> I'm using flink-s3-fs-hadoop-1.9.1.jar in plugins folder. Like I said
>>> previously, this works normally until an exception is throw inside the
>>> sink. It will try to recover again, but sometimes doesn't recover giving
>>> this error.
>>>
>>> To write to S3 I use *AvroParquetWriter* with the following code:
>>>
>>> val writer = AvroParquetWriter
>>>          .builder[GenericRecord](new Path(finalFilePath))
>>>
>>> *Path* is from *org.apache.hadoop.fs*, the other option is to use* 
>>> org.apache.flink.formats.parquet.StreamOutputFile
>>> *which will use flink S3 plugin, right ? Not sure how can I convert
>>> from Path to StreamOuputFile. I know that when I've used StreamingFileSink,
>>> I used StreamOuputFile.
>>>
>>> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <ar...@ververica.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> upon closer reviewing your stacktrace, it seems like you are trying to
>>>> access S3 without our S3 plugin. That's in general not recommended at all.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> this seems to be a bug in our s3 plugin. The joda dependency should be
>>>>> bundled there.
>>>>>
>>>>> Are you using s3 as a plugin by any chance? Which flink version are
>>>>> you using?
>>>>>
>>>>> If you are using s3 as a plugin, you could put joda in your plugin
>>>>> folder like this
>>>>>
>>>>> flink-dist
>>>>> ├── conf
>>>>> ├── lib
>>>>> ...
>>>>> └── plugins
>>>>>     └── s3
>>>>>         ├── joda.jar
>>>>>         └── flink-s3-fs-hadoop.jar
>>>>>
>>>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into
>>>>> that.
>>>>>
>>>>> Adding joda to your user code will unfortunately not work.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <speeddra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Andrey, thanks for your reply.
>>>>>>
>>>>>> The class is on the jar created with `*sbt assembly*` that is
>>>>>> submitted to Flink to start a Job.
>>>>>>
>>>>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep
>>>>>> DateTimeParserBucket
>>>>>>      1649  05-27-2016 10:24
>>>>>> org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>>>>      1984  05-27-2016 10:24
>>>>>> org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>>>>      8651  05-27-2016 10:24
>>>>>> org/joda/time/format/DateTimeParserBucket.class
>>>>>>
>>>>>> Shouldn't this be enough ?
>>>>>>
>>>>>> I think it uses is when nothing happens, but as soon it have some
>>>>>> exceptions, looks like it "forgets" it.
>>>>>>
>>>>>> Like I said before, this is kind of intermittent.
>>>>>>
>>>>>> Thanks,
>>>>>> David
>>>>>>
>>>>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <azagre...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> This looks like a problem with resolution of maven dependencies or
>>>>>>> something.
>>>>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>>>>> transitively depends on org/joda/time/format/DateTimeParserBucket
>>>>>>> and it is missing on the runtime classpath of Flink.
>>>>>>>
>>>>>>> Best,
>>>>>>> Andrey
>>>>>>>
>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>> speeddra...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>>> uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as 
>>>>>>>> intended,
>>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>>
>>>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>>>> working as intended, and for the first one, I can see the back 
>>>>>>>> pressure in
>>>>>>>> Flink UI going up, and recover as expected and not reading more data 
>>>>>>>> from
>>>>>>>> Kafka.
>>>>>>>>
>>>>>>>> For the checkpoints, and I've added inside the sink invoke function
>>>>>>>> a randomly exception (1 in 100, to simulate that a problem has happen, 
>>>>>>>> and
>>>>>>>> need to recover from the last good checkpoint), but something strange
>>>>>>>> happens. I can see the job is being canceled and created again, and 
>>>>>>>> running
>>>>>>>> fine, other times after a X number of times of being created and 
>>>>>>>> canceled,
>>>>>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>>>>>> forever.
>>>>>>>>
>>>>>>>> Do you guys have any thoughts?
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>> Caught exception while processing timer.
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>> at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>> ... 7 more
>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>> at
>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>> at
>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>> ... 7 more
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>> speeddra...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>>> uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as 
>>>>>>>> intended,
>>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>>
>>>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>>>> working as intended, and for the first one, I can see the back 
>>>>>>>> pressure in
>>>>>>>> Flink UI going up, and recover as expected and not reading more data 
>>>>>>>> from
>>>>>>>> Kafka.
>>>>>>>>
>>>>>>>> For the checkpoints, and I've added inside the sink invoke function
>>>>>>>> a randomly exception (1 in 100, to simulate that a problem has happen, 
>>>>>>>> and
>>>>>>>> need to recover from the last good checkpoint), but something strange
>>>>>>>> happens. I can see the job is being canceled and created again, and 
>>>>>>>> running
>>>>>>>> fine, other times after a X number of times of being created and 
>>>>>>>> canceled,
>>>>>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>>>>>> forever.
>>>>>>>>
>>>>>>>> Do you guys have any thoughts?
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>> Caught exception while processing timer.
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>> at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>> ... 7 more
>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>> at
>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>> at
>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>> ... 7 more
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>

Reply via email to