[ https://issues.apache.org/jira/browse/FLINK-29492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17627690#comment-17627690 ]
Fabian Paul commented on FLINK-29492:
-------------------------------------

[~ruanhang1993] do you think this issue is blocking the 1.15.3 release?

> Kafka exactly-once sink causes OutOfMemoryError
> -----------------------------------------------
>
>                 Key: FLINK-29492
>                 URL: https://issues.apache.org/jira/browse/FLINK-29492
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.2
>            Reporter: Robert Metzger
>            Assignee: Hang Ruan
>            Priority: Critical
>
> My Kafka exactly-once sinks are periodically failing with a
> {{OutOfMemoryError: Java heap space}}.
> This looks very similar to FLINK-28250. But I am running 1.15.2, which
> contains a fix for FLINK-28250.
> Exception:
> {code:java}
> java.io.IOException: Could not perform checkpoint 2281 for operator http_events[3]: Writer (1/1)#1.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2281 for operator http_events[3]: Writer (1/1)#1. Failure reason: Checkpoint was declined.
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
>     ... 22 more
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
>     at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.getOrCreateTransactionalProducer(KafkaWriter.java:327)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.getTransactionalProducer(KafkaWriter.java:315)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.snapshotState(KafkaWriter.java:227)
>     at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.snapshotState(StatefulSinkWriterStateHandler.java:124)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.snapshotState(SinkWriterOperator.java:152)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
>     ... 33 more
> Caused by: org.apache.kafka.common.KafkaException: java.lang.OutOfMemoryError: Java heap space
>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
>     at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
>     at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
>     at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
>     at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
>     ... 43 more
> Caused by: java.lang.OutOfMemoryError: Java heap space
>     at java.base/java.io.BufferedInputStream.fill(Unknown Source)
>     at java.base/java.io.BufferedInputStream.read1(Unknown Source)
>     at java.base/java.io.BufferedInputStream.read(Unknown Source)
>     at java.base/java.io.DataInputStream.read(Unknown Source)
>     at java.base/java.io.InputStream.readNBytes(Unknown Source)
>     at java.base/sun.security.util.IOUtils.readExactlyNBytes(Unknown Source)
>     at java.base/sun.security.provider.JavaKeyStore.engineLoad(Unknown Source)
>     at java.base/sun.security.util.KeyStoreDelegator.engineLoad(Unknown Source)
>     at java.base/java.security.KeyStore.load(Unknown Source)
>     at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:374)
>     at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.<init>(DefaultSslEngineFactory.java:349)
>     at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createTruststore(DefaultSslEngineFactory.java:322)
>     at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:168)
>     at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:138)
>     at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:180)
>     at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
>     at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
>     at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
>     at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
>     at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.getOrCreateTransactionalProducer(KafkaWriter.java:327)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.getTransactionalProducer(KafkaWriter.java:315)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.snapshotState(KafkaWriter.java:227)
>     at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.snapshotState(StatefulSinkWriterStateHandler.java:124)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.snapshotState(SinkWriterOperator.java:152)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> {code}
> What I'm observing is that affected TaskManagers have a high number of
> {{kafka-producer-network-thread}} (350+ after some time). It seems that the
> Kafka exactly-once sink is still leaking memory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
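The report's key symptom is a growing number of {{kafka-producer-network-thread}} threads (350+) on affected TaskManagers. One way to track that number is to count live threads by name prefix. The class below is a minimal, hypothetical sketch (`ProducerThreadCounter` and its methods are illustrative names, not part of Flink or the Kafka client). It assumes the Kafka client names each producer's sender thread `kafka-producer-network-thread | <client.id>`, and it only sees threads of the JVM it runs in, so it would have to execute inside the TaskManager process. From outside the process, a thread dump (for example `jstack <pid>`) filtered on the same prefix gives the same count.

```java
import java.util.List;
import java.util.stream.Collectors;

public final class ProducerThreadCounter {

    // Prefix of the Kafka producer sender (I/O) thread name, as seen in the report.
    // Assumption: the full name looks like "<prefix> | <client.id>".
    static final String PRODUCER_THREAD_PREFIX = "kafka-producer-network-thread";

    private ProducerThreadCounter() {}

    /** Returns the sorted names of all live threads in this JVM that start with the prefix. */
    static List<String> threadNamesWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> name.startsWith(prefix))
                .sorted()
                .collect(Collectors.toList());
    }

    /** Counts live threads in this JVM whose name starts with the prefix. */
    static int countThreadsWithPrefix(String prefix) {
        return threadNamesWithPrefix(prefix).size();
    }

    public static void main(String[] args) {
        List<String> names = threadNamesWithPrefix(PRODUCER_THREAD_PREFIX);
        System.out.println(PRODUCER_THREAD_PREFIX + " threads alive: " + names.size());
        // Print a handful of names to see which client ids the threads belong to.
        names.stream().limit(10).forEach(n -> System.out.println("  " + n));
    }
}
```

Sampling this count periodically (for example once per checkpoint interval) is a cheap way to tell whether producer threads keep accumulating; a count that stays flat while the heap still grows would suggest the leak lies elsewhere.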