> > Regarding the try catch block

Sorry, I meant the try catch in SensorMessageToSensorTimeSeriesFunction.

> Also, just to be clear, does disabling restart make it easier for you to debug?

Yes, the log will be quite small then. Currently, it's just repeating the same things a couple of times. Btw, if you also have a second taskmanager, that log would be even more interesting. So best to attach all logs (JM + TMs).

On Thu, Jan 28, 2021 at 4:24 PM Marco Villalobos <mvillalo...@kineteque.com> wrote:

> Regarding the try catch block, it rethrows the exception. Here is the code:
>
>     catch (RuntimeException e) {
>         logger.error("Error in timer.", e);
>         throw e;
>     }
>
> That would be okay, right?
>
> Also, just to be clear, does disabling restart make it easier for you to debug?
>
> On Thu, Jan 28, 2021 at 1:17 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Marco,
>>
>> In general, sending a compressed log to the ML is totally fine. You can further minimize the log by disabling restarts.
>> I looked into the logs that you provided.
>>
>>> 2021-01-26 04:37:43,280 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task forward fill -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2) (8c1f256176fb89f112c27883350a02bc).
>>> 2021-01-26 04:37:43,280 INFO org.apache.flink.runtime.taskmanager.Task [] - forward fill -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2) (8c1f256176fb89f112c27883350a02bc) switched from RUNNING to CANCELING.
>>> 2021-01-26 04:37:43,280 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code forward fill -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2) (8c1f256176fb89f112c27883350a02bc).
>>> 2021-01-26 04:37:43,282 ERROR xxxxxx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction [] - Error in timer.
>>> java.lang.RuntimeException: Buffer pool is destroyed.
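Marco's log-and-rethrow block, combined with Arvid's advice later in the thread to keep the try-catch around user code only and never around `collector.collect`, can be sketched as one self-contained example. All class, method, and helper names here are illustrative stand-ins, not code from the actual job, and the `Collector` interface is a simplification of Flink's:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class NarrowCatchExample {
    private static final Logger LOG = Logger.getLogger(NarrowCatchExample.class.getName());

    // Simplified stand-in for Flink's Collector interface (not the real API).
    interface Collector<T> { void collect(T value); }

    // Timer callback shaped after Marco's snippet: the try-catch wraps only
    // the user computation and rethrows after logging, so the task still
    // fails. The collect() call stays outside the try-catch, so a framework
    // failure (e.g. "Buffer pool is destroyed" during cancellation)
    // propagates with its original cause intact instead of being swallowed.
    static void onTimer(long timestamp, Collector<String> out) {
        String result;
        try {
            result = computeForwardFill(timestamp); // user code only
        } catch (RuntimeException e) {
            LOG.log(Level.SEVERE, "Error in timer.", e);
            throw e; // rethrow, as in Marco's snippet
        }
        out.collect(result); // system call, deliberately not wrapped
    }

    // Hypothetical user logic standing in for the real forward-fill code.
    static String computeForwardFill(long ts) {
        if (ts < 0) {
            throw new IllegalArgumentException("negative timestamp: " + ts);
        }
        return "filled@" + ts;
    }

    public static void main(String[] args) {
        java.util.List<String> sink = new java.util.ArrayList<>();
        onTimer(42L, sink::add);
        System.out.println(sink); // prints [filled@42]
    }
}
```

The point of the narrow scope is diagnostic: if `collect` fails because the task is already canceling, wrapping it would only re-log and possibly mask the runtime's real failure cause.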
>>
>> I can see that my suspicion is most likely correct: it first tries to cancel the task for some reason, and then a later timer shows you the respective error. I created a ticket to resolve the issue [1]. There may also be an issue of swallowed interruption exceptions, which we are looking into in parallel.
>>
>> However, there is a reason why the task is canceling in the first place, and we need to find it. I recommend not having a try-catch block around *collector.collect* in *ForwardFillKeyedProcessFunction*. Just have it around your user code, but not around system calls. Otherwise it may swallow the real cause.
>>
>> Are you executing the code in an IDE? You may be able to set some breakpoints to quickly figure out what's going wrong (I can help then).
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21181
>>
>> On Wed, Jan 27, 2021 at 8:54 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Marco,
>>>
>>> Could you share your full task manager and job manager logs? We double-checked and saw that the buffer pool is only released on cancellation or shutdown.
>>>
>>> So I'm assuming that there is another issue (e.g., Kafka cluster not reachable) and there is a race condition while shutting down. It seems like the buffer pool exception is shadowing the actual cause for yet unknown reasons (this is an issue on its own, but you should be able to see the actual issue in the task manager log).
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Tue, Jan 26, 2021 at 5:13 PM Marco Villalobos <mvillalo...@kineteque.com> wrote:
>>>
>>>> Actually, the log I sent in my previous message shows the only error that occurred before the buffer pool was destroyed. That intermittent warning:
>>>>
>>>> 2021-01-26 04:14:33,140 WARN org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
>>>> 2021-01-26 04:14:33,143 INFO org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch request (sessionId=936633685, epoch=1) to node 2: {}.
>>>> org.apache.kafka.common.errors.DisconnectException: null
>>>>
>>>> I know that probably doesn't help much. Sorry.
>>>>
>>>> On Mon, Jan 25, 2021 at 11:44 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> The network buffer pool is destroyed when the task manager is shut down. Could you check if you have an error before that in your log?
>>>>>
>>>>> It seems like the timer is triggered at a point where it shouldn't be. I'll check if there is a known issue that has been fixed in later versions.
>>>>> Do you have the option to upgrade to 1.11.3?
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:
>>>>>
>>>>>> Hi. What causes a buffer pool exception? How can I mitigate it? It is causing us plenty of problems right now.
>>>>>>
>>>>>> 2021-01-26 04:14:33,041 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 received completion notification for checkpoint with id=4.
>>>>>> 2021-01-26 04:14:33,140 WARN org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
>>>>>> 2021-01-26 04:14:33,143 INFO org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch request (sessionId=936633685, epoch=1) to node 2: {}.
>>>>>> org.apache.kafka.common.errors.DisconnectException: null
>>>>>> 2021-01-26 04:14:33,146 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 checkpointing for checkpoint with id=5 (max part counter=1).
>>>>>>
>>>>>> THEN FINALLY
>>>>>>
>>>>>> ERROR ai.beyond.luminai.sensor.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction [] - Error in timer.
>>>>>> java.lang.RuntimeException: Buffer pool is destroyed.
>>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at mypackage.MyOperator.collect(MyOperator.java:452) ~[develop-17e9fd0e.jar:?]
>>>>>>     at mypackage.MyOperator.onTimer(MyOperator.java:277) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1181) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [develop-17e9fd0e.jar:?]
>>>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
>>>>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>>>>>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:290) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) ~[develop-17e9fd0e.jar:?]
>>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
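As a footnote to the suggestion earlier in the thread about disabling restarts (so the root-cause failure appears only once in the logs): a minimal sketch of that configuration for Flink 1.11, to be verified against the documentation for your version:

```yaml
# flink-conf.yaml: fail the job on the first error instead of restarting,
# so only the root-cause failure shows up in the JM/TM logs
restart-strategy: none
```

The same effect can be achieved programmatically with `env.setRestartStrategy(RestartStrategies.noRestart())` on the `StreamExecutionEnvironment`.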