[ https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678214#comment-16678214 ]
Sebastian Puzoń commented on KAFKA-7531:
----------------------------------------

A few more cents about the application that had the problem. It is a Streams application with session windowing (40 min timeout). I took some time to investigate, but I was running into the exact same problem as described here:
[https://lists.apache.org/thread.html/2b44e74eaec7172b107bcff96861cf8b4837f55a44714f69d033cc2e@%3Cusers.kafka.apache.org%3E]

My data sources have a large number of unique sessions. A TreeMap is not efficient for storing such data per session, hence I could observe increasing processing time and contention around TreeMap.get(). I am not sure if there is a ticket for this; if not, I may create one with details.

At the moment I have a different application that uses 5 min time windows. Even if I break the session timeout settings between the brokers and the app, I am not able to replicate the NPE in the broker. I will try to go back to the previous application implementation, which used session windows, and try to replicate the NPE.

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-7531
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7531
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 2.0.0
>            Reporter: Sebastian Puzoń
>            Priority: Critical
>             Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams Application 4 instances, each has 5 Streams threads, total 20 stream threads.
> I observe NPE NullPointerException at coordinator broker which causes all application stream threads shutdown, here's stack from broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance group elog_agg with old generation 49 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group elog_agg generation 50 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from leader for group elog_agg for generation 50 (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on partition __transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=true} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>     at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>     at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>     at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>     at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>     at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>     at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>     at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>     at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>     at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>     at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>     at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>     at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>     at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>     at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>     at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>     at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>     at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>     at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_result=true} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>     at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>     at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>     at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>     at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>     at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>     at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>     at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>     at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>     at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>     at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>     at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>     at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>     at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>     at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>     at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>     at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>     at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>     at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:27,531] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6804-StreamThread-4-consumer-ae1f00c2-7c2c-4f8e-bed4-20a955ecc122 in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator){code}
>
>
> On the application side I can see such stack trace:
>
>
> {code:java}
> 2018-10-22 21:52:15 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed to commit stream task 0_9 due to the following error:
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
>     at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>     at java.lang.Thread.run(Thread.java:745)
> 2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN
> 2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Shutting down
> 2018-10-22 21:52:15 KafkaProducer [INFO] [Producer clientId=elog_agg-client-sswvlp6802-StreamThread-4-0_17-producer, transactionalId=elog_agg-0_17] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-10-22 21:52:16 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed while closing StreamTask 0_9 due to the following error:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
>     at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679)
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:563)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Caused by: org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
>     at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
> {code}
> This way all streams application threads are being shutdown.
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)