Sebastian Puzoń created KAFKA-7531:
--------------------------------------

             Summary: NPE NullPointerException at TransactionCoordinator handleEndTransaction
                 Key: KAFKA-7531
                 URL: https://issues.apache.org/jira/browse/KAFKA-7531
             Project: Kafka
          Issue Type: Bug
          Components: controller
    Affects Versions: 2.0.0
            Reporter: Sebastian Puzoń


Setup: a Kafka cluster with 4 brokers, 1 topic (20 partitions), and 1 ZooKeeper node. The Streams application runs as 4 instances with 5 stream threads each, 20 stream threads in total.

I observe a NullPointerException (NPE) on the coordinator broker, which causes all of the application's stream threads to shut down. Here is the stack trace from the broker:
{code:java}
[2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance group elog_agg with old generation 49 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group elog_agg generation 50 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from leader for group elog_agg for generation 50 (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on partition __transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
[2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=true} (kafka.server.KafkaApis)
java.lang.NullPointerException
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
    at scala.util.Either$RightProjection.flatMap(Either.scala:702)
    at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
    at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
    at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
    at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
    at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
    at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
    at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
    at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
    at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.lang.Thread.run(Thread.java:745)
[2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_result=true} (kafka.server.KafkaApis)
java.lang.NullPointerException
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
    at scala.util.Either$RightProjection.flatMap(Either.scala:702)
    at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
    at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
    at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
    at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
    at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
    at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
    at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
    at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
    at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.lang.Thread.run(Thread.java:745)
[2018-10-22 21:52:27,531] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6804-StreamThread-4-consumer-ae1f00c2-7c2c-4f8e-bed4-20a955ecc122 in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
{code}

On the application side I see the following stack trace:
{code:java}
2018-10-22 21:52:15 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed to commit stream task 0_9 due to the following error:
org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
    at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:745)
2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN
2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Shutting down
2018-10-22 21:52:15 KafkaProducer [INFO] [Producer clientId=elog_agg-client-sswvlp6802-StreamThread-4-0_17-producer, transactionalId=elog_agg-0_17] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-10-22 21:52:16 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed while closing StreamTask 0_9 due to the following error:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
    at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679)
    at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:563)
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
    at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
{code}

As a result, all of the Streams application's threads are shut down.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)