[ https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840543#comment-16840543 ]
Viktor Somogyi-Vass commented on KAFKA-8195: -------------------------------------------- [~grussell] I think I got the full picture on this error. So basically this is what captures the problem I observe: {noformat} [2019-05-15 17:22:32,737] INFO [TransactionCoordinator id=1] Initialized transactionalId txd- with producerId 4001 and producer epoch 2 on partition __transaction_state-21 (kafka.coordinator.transaction.TransactionCoordinator) [2019-05-15 17:22:32,738] TRACE [Transaction State Manager 1]: Appending new metadata TxnTransitMetadata(producerId=4001, producerEpoch=2, txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1557933752736) for transaction id txd- with coordinator epoch 11 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager) [2019-05-15 17:22:32,738] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-3, correlationId=6) -- {transactional_id=txd-,transaction_timeout_ms=60000},response:{throttle_time_ms=0,error_code=0,producer_id=4001,producer_epoch=2} from connection 192.168.1.12:9091-192.168.1.12:50261-3;totalTime:2.592,requestQueueTime:0.105,localTime:2.272,remoteTime:0.0,throttleTime:0.152,responseQueueTime:0.068,sendTime:0.24,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:32,741] DEBUG Completed request:RequestHeader(apiKey=METADATA, apiVersion=8, clientId=producer-3, correlationId=7) -- {topics=[{name=so55510898d}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false},response:{throttle_time_ms=0,brokers=[{node_id=2,host=192.168.1.12,port=9092,rack=null},{node_id=3,host=192.168.1.12,port=9093,rack=null},{node_id=1,host=192.168.1.12,port=9091,rack=null}],cluster_id=NDxiWJjlSwCLOUcmi8ZKpQ,controller_id=1,topics=[{error_code=0,name=so55510898d,is_internal=false,partitions=[{error_code=0,partition_index=0,leader_id=1,leader_epoch=0,replica_nodes=[1],isr_nodes=[1],offline_replicas=[]}],topic_authorized_operations=0}],cluster_authorized_operations=0} from connection 192.168.1.12:9091-192.168.1.12:50261-3;totalTime:1.241,requestQueueTime:0.088,localTime:0.9,remoteTime:0.0,throttleTime:0.414,responseQueueTime:0.066,sendTime:0.215,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:32,743] DEBUG TransactionalId txd- prepare transition from Empty to TxnTransitMetadata(producerId=4001, producerEpoch=2, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(so55510898d-0), txnStartTimestamp=1557933752743, txnLastUpdateTimestamp=1557933752743) (kafka.coordinator.transaction.TransactionMetadata) [2019-05-15 17:22:32,745] DEBUG TransactionalId txd- complete transition from Empty to TxnTransitMetadata(producerId=4001, producerEpoch=2, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(so55510898d-0), txnStartTimestamp=1557933752743, txnLastUpdateTimestamp=1557933752743) (kafka.coordinator.transaction.TransactionMetadata) [2019-05-15 17:22:32,745] DEBUG [Transaction State Manager 1]: Updating txd-'s transaction state to TxnTransitMetadata(producerId=4001, producerEpoch=2, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(so55510898d-0), txnStartTimestamp=1557933752743, txnLastUpdateTimestamp=1557933752743) with coordinator epoch 11 for txd- succeeded (kafka.coordinator.transaction.TransactionStateManager) [2019-05-15 17:22:32,745] TRACE [Transaction State 
Manager 1]: Appending new metadata TxnTransitMetadata(producerId=4001, producerEpoch=2, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(so55510898d-0), txnStartTimestamp=1557933752743, txnLastUpdateTimestamp=1557933752743) for transaction id txd- with coordinator epoch 11 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager) [2019-05-15 17:22:32,746] DEBUG Completed request:RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=1, clientId=producer-3, correlationId=8) -- {transactional_id=txd-,producer_id=4001,producer_epoch=2,topics=[{topic=so55510898d,partitions=[0]}]},response:{throttle_time_ms=0,errors=[{topic=so55510898d,partition_errors=[{partition=0,error_code=0}]}]} from connection 192.168.1.12:9091-192.168.1.12:50261-3;totalTime:3.505,requestQueueTime:0.084,localTime:3.122,remoteTime:0.0,throttleTime:0.431,responseQueueTime:0.215,sendTime:0.346,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:32,754] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=producer-3, correlationId=9) -- {acks=-1,timeout=30000,partitionSizes=[so55510898d-0=85]},response:{responses=[{topic=so55510898d,partition_responses=[{partition=0,error_code=10,base_offset=-1,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0} from connection 192.168.1.12:9091-192.168.1.12:50261-3;totalTime:4.895,requestQueueTime:0.252,localTime:4.42,remoteTime:0.0,throttleTime:0.49,responseQueueTime:0.103,sendTime:0.151,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:32,757] DEBUG TransactionalId txd- prepare transition from Ongoing to TxnTransitMetadata(producerId=4001, producerEpoch=2, txnTimeoutMs=60000, txnState=PrepareAbort, topicPartitions=Set(so55510898d-0), txnStartTimestamp=1557933752743, txnLastUpdateTimestamp=1557933752756) (kafka.coordinator.transaction.TransactionMetadata) [2019-05-15 17:22:32,758] DEBUG TransactionalId txd- complete transition from Ongoing to TxnTransitMetadata(producerId=4001, producerEpoch=2, txnTimeoutMs=60000, txnState=PrepareAbort, topicPartitions=Set(so55510898d-0), txnStartTimestamp=1557933752743, txnLastUpdateTimestamp=1557933752756) (kafka.coordinator.transaction.TransactionMetadata) [2019-05-15 17:22:32,758] DEBUG [Transaction State Manager 1]: Updating txd-'s transaction state to TxnTransitMetadata(producerId=4001, producerEpoch=2, txnTimeoutMs=60000, txnState=PrepareAbort, topicPartitions=Set(so55510898d-0), txnStartTimestamp=1557933752743, txnLastUpdateTimestamp=1557933752756) with coordinator epoch 11 for txd- succeeded (kafka.coordinator.transaction.TransactionStateManager) [2019-05-15 17:22:32,758] DEBUG TransactionalId txd- prepare transition from PrepareAbort to TxnTransitMetadata(producerId=4001, producerEpoch=2, txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(), txnStartTimestamp=1557933752743, txnLastUpdateTimestamp=1557933752758) (kafka.coordinator.transaction.TransactionMetadata) [2019-05-15 17:22:32,758] TRACE [Transaction Marker Channel Manager 1]: Added marker TxnMarkerEntry{producerId=4001, producerEpoch=2, coordinatorEpoch=11, result=ABORT, partitions=[so55510898d-0]} for transactional id txd- to destination broker 1 (kafka.coordinator.transaction.TransactionMarkerChannelManager) [2019-05-15 17:22:32,758] TRACE [Transaction State Manager 1]: Appending new metadata TxnTransitMetadata(producerId=4001, producerEpoch=2, txnTimeoutMs=60000, txnState=PrepareAbort, 
topicPartitions=Set(so55510898d-0), txnStartTimestamp=1557933752743, txnLastUpdateTimestamp=1557933752756) for transaction id txd- with coordinator epoch 11 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager) [2019-05-15 17:22:32,759] DEBUG Completed request:RequestHeader(apiKey=END_TXN, apiVersion=1, clientId=producer-3, correlationId=10) -- {transactional_id=txd-,producer_id=4001,producer_epoch=2,transaction_result=false},response:{throttle_time_ms=0,error_code=0} from connection 192.168.1.12:9091-192.168.1.12:50261-3;totalTime:2.159,requestQueueTime:0.078,localTime:2.069,remoteTime:0.0,throttleTime:0.09,responseQueueTime:0.21,sendTime:0.188,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:32,760] DEBUG Completed request:RequestHeader(apiKey=WRITE_TXN_MARKERS, apiVersion=0, clientId=broker-1-txn-marker-sender, correlationId=2) -- {transaction_markers=[{producer_id=4001,producer_epoch=2,transaction_result=false,topics=[{topic=so55510898d,partitions=[0]}],coordinator_epoch=11}]},response:{transaction_markers=[{producer_id=4001,topics=[{topic=so55510898d,partitions=[{partition=0,error_code=10}]}]}]} from connection 192.168.1.12:9091-192.168.1.12:50257-2;totalTime:1.122,requestQueueTime:0.071,localTime:0.878,remoteTime:0.0,throttleTime:0.0,responseQueueTime:0.058,sendTime:0.145,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:32,760] DEBUG [Transaction Marker Request Completion Handler 1]: Received WriteTxnMarker response ClientResponse(receivedTimeMs=1557933752760, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=WRITE_TXN_MARKERS, apiVersion=0, clientId=broker-1-txn-marker-sender, correlationId=2), responseBody=org.apache.kafka.common.requests.WriteTxnMarkersResponse@5fb025c2) from node 1 with correlation id 2 (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler) [2019-05-15 17:22:32,761] ERROR [TransactionCoordinator id=1] Uncaught error in request completion: (org.apache.kafka.clients.NetworkClient) java.lang.IllegalStateException: Received fatal error org.apache.kafka.common.errors.RecordTooLargeException while sending txn marker for txd- at kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler.$anonfun$onComplete$14(TransactionMarkerRequestCompletionHandler.scala:143) at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791) at kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler.$anonfun$onComplete$12(TransactionMarkerRequestCompletionHandler.scala:133) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172) at kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler.$anonfun$onComplete$8(TransactionMarkerRequestCompletionHandler.scala:133) at 
kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler.$anonfun$onComplete$8$adapted(TransactionMarkerRequestCompletionHandler.scala:92) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler.onComplete(TransactionMarkerRequestCompletionHandler.scala:92) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) at kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:66) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89) [2019-05-15 17:22:32,879] DEBUG Completed request:RequestHeader(apiKey=API_VERSIONS, apiVersion=2, clientId=producer-4, correlationId=0) -- {},response:{error_code=0,api_versions=[{api_key=0,min_version=0,max_version=7},{api_key=1,min_version=0,max_version=10},{api_key=2,min_version=0,max_version=5},{api_key=3,min_version=0,max_version=8},{api_key=4,min_version=0,max_version=2},{api_key=5,min_version=0,max_version=1},{api_key=6,min_version=0,max_version=5},{api_key=7,min_version=0,max_version=2},{api_key=8,min_version=0,max_version=6},{api_key=9,min_version=0,max_version=5},{api_key=10,min_version=0,max_version=2},{api_key=11,min_version=0,max_version=5},{api_key=12,min_version=0,max_version=2},{api_key=13,min_version=0,max_version=2},{api_key=14,min_version=0,max_version=2},{api_key=15,min_version=0,max_version=3},{api_key=16,min_version=0,max_version=2},{api_key=17,min_version=0,max_version=1},{api_key=18,min_version=0,max_version=2},{api_key=19,min_version=0,max_version=3},{api_key=20,min_version=0,max_version=3},{api_key=21,min_version=0,max_version=1},{api_key=22,min_version=0,max_version=1},{api_key=23,min_version=0,max_version=2},{api_key=24,min_version=0,max_version=1},{api_key=25,min_version=0,max_version=1},{api_key=26,min_version=0,max_version=1},{api_key=27,min_version=0,max_version=0},{api_key=28,min_version=0,max_version=2},{api_key=29,min_version=0,max_version=1},{api_key=30,min_version=0,max_version=1},{api_key=31,min_version=0,max_version=1},{api_key=32,min_version=0,max_version=2},{api_key=33,min_version=0,max_version=1},{api_key=34,min_version=0,max_version=1},{api_key=35,min_version=0,max_version=1},{api_key=36,min_version=0,max_version=1},{api_key=37,min_version=0,max_version=1},{api_key=38,min_version=0,max_version=1},{api_key=39,min_version=0,max_version=1},{api_key=40,min_version=0,max_version=1},{api_key=41,min_version=0,max_version=1},{api_key=42,min_version=0,max_version=1},{api_key=43,min_version=0,max_version=0},{api_key=44,min_version=0,max_version=0}],throttle_time_ms=0} from connection 127.0.0.1:9091-127.0.0.1:50262-3;totalTime:1.679,requestQueueTime:0.084,localTime:0.682,remoteTime:0.0,throttleTime:0.494,responseQueueTime:0.293,sendTime:0.78,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:32,887] DEBUG Completed request:RequestHeader(apiKey=METADATA, apiVersion=8, clientId=producer-4, correlationId=2) -- 
{topics=[],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false},response:{throttle_time_ms=0,brokers=[{node_id=2,host=192.168.1.12,port=9092,rack=null},{node_id=3,host=192.168.1.12,port=9093,rack=null},{node_id=1,host=192.168.1.12,port=9091,rack=null}],cluster_id=NDxiWJjlSwCLOUcmi8ZKpQ,controller_id=1,topics=[],cluster_authorized_operations=0} from connection 127.0.0.1:9091-127.0.0.1:50262-3;totalTime:1.297,requestQueueTime:0.319,localTime:0.64,remoteTime:0.0,throttleTime:0.257,responseQueueTime:0.123,sendTime:0.268,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:32,892] DEBUG Completed request:RequestHeader(apiKey=API_VERSIONS, apiVersion=2, clientId=producer-4, correlationId=5) -- {},response:{error_code=0,api_versions=[{api_key=0,min_version=0,max_version=7},{api_key=1,min_version=0,max_version=10},{api_key=2,min_version=0,max_version=5},{api_key=3,min_version=0,max_version=8},{api_key=4,min_version=0,max_version=2},{api_key=5,min_version=0,max_version=1},{api_key=6,min_version=0,max_version=5},{api_key=7,min_version=0,max_version=2},{api_key=8,min_version=0,max_version=6},{api_key=9,min_version=0,max_version=5},{api_key=10,min_version=0,max_version=2},{api_key=11,min_version=0,max_version=5},{api_key=12,min_version=0,max_version=2},{api_key=13,min_version=0,max_version=2},{api_key=14,min_version=0,max_version=2},{api_key=15,min_version=0,max_version=3},{api_key=16,min_version=0,max_version=2},{api_key=17,min_version=0,max_version=1},{api_key=18,min_version=0,max_version=2},{api_key=19,min_version=0,max_version=3},{api_key=20,min_version=0,max_version=3},{api_key=21,min_version=0,max_version=1},{api_key=22,min_version=0,max_version=1},{api_key=23,min_version=0,max_version=2},{api_key=24,min_version=0,max_version=1},{api_key=25,min_version=0,max_version=1},{api_key=26,min_version=0,max_version=1},{api_key=27,min_version=0,max_version=0},{api_key=28,min_version=0,max_version=2},{api_key=29,min_version=0,max_version=1},{api_key=30,min_version=0,max_version=1},{api_key=31,min_version=0,max_version=1},{api_key=32,min_version=0,max_version=2},{api_key=33,min_version=0,max_version=1},{api_key=34,min_version=0,max_version=1},{api_key=35,min_version=0,max_version=1},{api_key=36,min_version=0,max_version=1},{api_key=37,min_version=0,max_version=1},{api_key=38,min_version=0,max_version=1},{api_key=39,min_version=0,max_version=1},{api_key=40,min_version=0,max_version=1},{api_key=41,min_version=0,max_version=1},{api_key=42,min_version=0,max_version=1},{api_key=43,min_version=0,max_version=0},{api_key=44,min_version=0,max_version=0}],throttle_time_ms=0} from connection 192.168.1.12:9091-192.168.1.12:50265-4;totalTime:0.727,requestQueueTime:0.076,localTime:0.425,remoteTime:0.0,throttleTime:0.332,responseQueueTime:0.106,sendTime:0.192,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:32,997] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-4, correlationId=6) -- {transactional_id=txd-,transaction_timeout_ms=60000},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1} from connection 192.168.1.12:9091-192.168.1.12:50265-4;totalTime:1.204,requestQueueTime:0.201,localTime:0.748,remoteTime:0.0,throttleTime:0.487,responseQueueTime:0.075,sendTime:0.216,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) 
[2019-05-15 17:22:33,101] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-4, correlationId=7) -- {transactional_id=txd-,transaction_timeout_ms=60000},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1} from connection 192.168.1.12:9091-192.168.1.12:50265-4;totalTime:0.708,requestQueueTime:0.098,localTime:0.382,remoteTime:0.0,throttleTime:0.16,responseQueueTime:0.073,sendTime:0.189,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:36,019] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-4, correlationId=8) -- {transactional_id=txd-,transaction_timeout_ms=60000},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1} from connection 192.168.1.12:9091-192.168.1.12:50265-4;totalTime:0.549,requestQueueTime:0.081,localTime:0.265,remoteTime:0.0,throttleTime:0.103,responseQueueTime:0.078,sendTime:0.149,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:36,125] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-4, correlationId=9) -- {transactional_id=txd-,transaction_timeout_ms=60000},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1} from connection 192.168.1.12:9091-192.168.1.12:50265-4;totalTime:0.543,requestQueueTime:0.077,localTime:0.281,remoteTime:0.0,throttleTime:0.109,responseQueueTime:0.091,sendTime:0.142,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2019-05-15 17:22:36,231] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-4, correlationId=10) -- {transactional_id=txd-,transaction_timeout_ms=60000},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1} from connection 192.168.1.12:9091-192.168.1.12:50265-4;totalTime:1.977,requestQueueTime:0.134,localTime:0.58,remoteTime:0.0,throttleTime:0.215,responseQueueTime:0.162,sendTime:1.188,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger)
{noformat}
For which the steps are:
# The producer initializes the transaction.
# It tries to send messages but is unable to do so, as the topic has a very small limit on message size.
# The broker then tries to abort the transaction, as it detects that this is an unrecoverable error.
# It is unable to do so, because the transaction end marker cannot be appended to the topic either: the marker itself exceeds the topic's size limit.
# The transaction is therefore never able to finish.
# The producer ends up in a retry loop on INIT_PRODUCER_ID (error_code=51, CONCURRENT_TRANSACTIONS), as it only knows about the "record too large" problem for the message, but not that the broker is unable to finish the transaction.

I'll have to think about how this can be solved the right way, but I'd argue that this issue wouldn't happen in production that often, as such small max message sizes for topics are not recommended. Therefore I'd lower the priority of this JIRA, but please let me know if you have a good reason not to do so.
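For reference, a config-side way out of this stuck state is to raise the topic's {{max.message.bytes}} so that the pending abort marker (a tiny control batch) fits again; once the marker can be appended, the coordinator should be able to complete the abort (possibly only after the transaction coordinator moves/reloads or the transaction times out) and the {{transactional.id}} becomes usable. Below is a minimal {{AdminClient}} sketch under the assumptions of this reproduction: topic {{so55510898d}}, limit restored to the broker default of 1000012 bytes. Note that {{incrementalAlterConfigs}} needs 2.3+ brokers; on older clusters the (deprecated) {{alterConfigs}} call does the same.
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class RaiseMaxMessageBytes {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // The topic from the reproduction above; its max.message.bytes=10
            // rejected both the transactional record and the abort marker.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "so55510898d");
            // Restore the limit to the broker default; any value large enough
            // for a control batch would let the coordinator write the marker.
            AlterConfigOp raiseLimit = new AlterConfigOp(
                    new ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "1000012"),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singletonList(raiseLimit)))
                    .all().get();
        }
    }
}
{code}
The same sanity check applies at topic creation time: a topic-level {{max.message.bytes}} of 10, as used in the reproducer below, can never fit even a transaction marker, so no transaction touching that partition can ever complete.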
> Unstable Producer After Send Failure in Transaction > --------------------------------------------------- > > Key: KAFKA-8195 > URL: https://issues.apache.org/jira/browse/KAFKA-8195 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 2.0.1, 2.2.0, 2.3.0 > Reporter: Gary Russell > Assignee: Viktor Somogyi-Vass > Priority: Blocker > > This journey started with [this Stack Overflow question | > https://stackoverflow.com/questions/55510898]. > I easily reproduced his issue (see my comments on his question). > My first step was to take Spring out of the picture and replicate the issue > with the native {{Producer}} APIs. The following code shows the result; I > have attached logs and stack traces. > There are four methods in the test; the first performs 2 transactions, > successfully, on the same {{Producer}} instance. > The second aborts 2 transactions, successfully, on the same {{Producer}} > instance - application-level failures after performing a send. > There are two flavors of the problem: > The third attempts to send 2 messages that are too large for the topic, on > the same {{Producer}}; the first aborts as expected; the second send hangs in > {{abortTransaction}} after getting a {{TimeoutException}} when attempting to > {{get}} the send metadata. See log {{hang.same.producer.log}} - it also > includes the stack trace showing the hang. > The fourth test is similar to the third but it closes the producer after the > first failure; this time, we time out in {{initTransactions()}}. > Subsequent executions of this test get the timeout on the first attempt - > that {{transactional.id}} seems to be unusable. Removing the logs was one way > I found to get past the problem. > Test code > {code:java} > public ApplicationRunner runner(AdminClient client, > DefaultKafkaProducerFactory<String, String> pf) { > return args -> { > try { > Map<String, Object> configs = new > HashMap<>(pf.getConfigurationProperties()); > int committed = testGoodTx(client, configs); > System.out.println("Successes (same producer) > committed: " + committed); > int rolledBack = testAppFailureTx(client, > configs); > System.out.println("App failures (same > producer) rolled back: " + rolledBack); > > // first flavor - hung thread in > abortTransaction() > // rolledBack = > testSendFailureTxSameProducer(client, configs); > // System.out.println("Send failures (same > producer) rolled back: " + rolledBack); > > // second flavor - timeout in initTransactions() > rolledBack = > testSendFailureTxNewProducer(client, configs); > System.out.println("Send failures (new > producer) rolled back: " + rolledBack); > } > catch (Exception e) { > e.printStackTrace(); > } > }; > } > private int testGoodTx(AdminClient client, Map<String, Object> configs) > throws ExecutionException { > int commits = 0; > NewTopic topic = TopicBuilder.name("so55510898a") > .partitions(1) > .replicas(1) > .build(); > createTopic(client, topic); > configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txa-"); > Producer<String, String> producer = new > KafkaProducer<>(configs); > try { > producer.initTransactions(); > for (int i = 0; i < 2; i++) { > producer.beginTransaction(); > RecordMetadata recordMetadata = producer.send( > new > ProducerRecord<>("so55510898a", "foooooooooooooooooooooo")).get(10, > TimeUnit.SECONDS); > System.out.println(recordMetadata); > producer.commitTransaction(); > commits++; > } > } > catch (ProducerFencedException | OutOfOrderSequenceException | > AuthorizationException e) { > // We can't recover from these
exceptions, so our only > option is to close the producer and exit. > } > catch (Exception e) { > System.out.println("aborting"); > producer.abortTransaction(); > } > finally { > producer.close(); > } > return commits; > } > private int testAppFailureTx(AdminClient client, Map<String, Object> > configs) > throws ExecutionException { > int aborts = 0; > NewTopic topic = TopicBuilder.name("so55510898b") > .partitions(1) > .replicas(1) > .build(); > createTopic(client, topic); > configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txb-"); > Producer<String, String> producer = new > KafkaProducer<>(configs); > producer.initTransactions(); > for (int i = 0; i < 2; i++) { > try { > producer.beginTransaction(); > RecordMetadata recordMetadata = producer.send( > new > ProducerRecord<>("so55510898b", "baaaaaaaaaaaaaaaar")).get(10, > TimeUnit.SECONDS); > System.out.println(recordMetadata); > throw new RuntimeException("App failed after > send"); > } > catch (ProducerFencedException | > OutOfOrderSequenceException | AuthorizationException e) { > // We can't recover from these exceptions, so > our only option is to close the producer and exit. > } > catch (Exception e) { > System.out.println("aborting"); > producer.abortTransaction(); > aborts++; > } > } > producer.close(); > return aborts; > } > private int testSendFailureTxSameProducer(AdminClient client, > Map<String, Object> configs) > throws ExecutionException { > int aborts = 0; > NewTopic topic = TopicBuilder.name("so55510898c") > .partitions(1) > .replicas(1) > .config(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, > "10") > .build(); > configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txc-"); > createTopic(client, topic); > Producer<String, String> producer = new > KafkaProducer<>(configs); > producer.initTransactions(); > for (int i = 0; i < 2; i++) { > try { > producer.beginTransaction(); > RecordMetadata recordMetadata = producer.send( > new > ProducerRecord<>("so55510898c", "baaaaaaaaaaaaaaaaz")).get(10, > TimeUnit.SECONDS); > System.out.println(recordMetadata); > throw new RuntimeException("App failed after > send"); > } > catch (ProducerFencedException | > OutOfOrderSequenceException | AuthorizationException e) { > // We can't recover from these exceptions, so > our only option is to close the producer and exit. > } > catch (Exception e) { > System.out.println("aborting"); > e.printStackTrace(); > producer.abortTransaction(); > aborts++; > } > } > producer.close(); > return aborts; > } > private int testSendFailureTxNewProducer(AdminClient client, > Map<String, Object> configs) > throws ExecutionException { > int aborts = 0; > NewTopic topic = TopicBuilder.name("so55510898d") > .partitions(1) > .replicas(1) > .config(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, > "10") > .build(); > createTopic(client, topic); > configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txd-"); > for (int i = 0; i < 2; i++) { > Producer<String, String> producer = new > KafkaProducer<>(configs); > try { > try { > producer.initTransactions(); > } > catch (Exception e) { > e.printStackTrace(); > return aborts; > } > producer.beginTransaction(); > RecordMetadata recordMetadata = producer.send( > new > ProducerRecord<>("so55510898d", "quuuuuuuuuuuuuuux")).get(10, > TimeUnit.SECONDS); > System.out.println(recordMetadata); > throw new RuntimeException("App failed after > send"); > } > catch (ProducerFencedException | > OutOfOrderSequenceException | AuthorizationException e) { > // We can't recover from these exceptions, so > our only option is to close the producer and exit. 
> } > catch (Exception e) { > System.out.println("aborting"); > e.printStackTrace(); > producer.abortTransaction(); > aborts++; > } > finally { > producer.close(); > } > } > return aborts; > } > private void createTopic(AdminClient client, NewTopic topic) throws > ExecutionException { > try { > > client.createTopics(Collections.singletonList(topic)).all().get(); > return; > } > catch (InterruptedException e) { > Thread.currentThread().interrupt(); > return; > } > catch (ExecutionException e) { > if (!(e.getCause() instanceof TopicExistsException)) { > throw e; > } > } > } > {code} > hang.same.producer.log > {noformat} > 2019-04-05 12:24:42.235 INFO 15404 --- [ main] > com.example.So55510898Application : Starting So55510898Application on > Gollum2.local with PID 15404 > (/Users/grussell/Development/stsws/so55510898/target/classes started by > grussell in /Users/grussell/Development/stsws/so55510898) > 2019-04-05 12:24:42.237 INFO 15404 --- [ main] > com.example.So55510898Application : No active profile set, falling > back to default profiles: default > 2019-04-05 12:24:42.546 INFO 15404 --- [ main] > trationDelegate$BeanPostProcessorChecker : Bean > 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type > [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$b2eeb124] > is not eligible for getting processed by all BeanPostProcessors (for > example: not eligible for auto-proxying) > 2019-04-05 12:24:42.639 INFO 15404 --- [ main] > o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values: > bootstrap.servers = [localhost:9092] > client.dns.lookup = default > client.id = > connections.max.idle.ms = 300000 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 120000 > retries = 5 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > 2019-04-05 12:24:42.671 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:24:42.672 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:24:42.853 INFO 15404 --- [ main] > com.example.So55510898Application : Started So55510898Application in > 0.8 seconds (JVM running for 1.233) > 2019-04-05 12:24:43.058 INFO 15404 --- [ main] > 
o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txa- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:24:43.063 INFO 15404 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, > transactionalId=txa-] Instantiated a transactional producer. > 2019-04-05 12:24:43.074 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:24:43.075 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:24:43.075 INFO 15404 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, > transactionalId=txa-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:24:43.176 INFO 15404 --- [ad | producer-1] > org.apache.kafka.clients.Metadata : Cluster ID: 0q-Ge1ROSyKNehyv1sVJCQ > 2019-04-05 12:24:43.927 INFO 15404 --- [ad | producer-1] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, > transactionalId=txa-] ProducerId set to 0 with epoch 0 > so55510898a-0@0 > so55510898a-0@2 > 2019-04-05 12:24:44.034 INFO 15404 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, > transactionalId=txa-] Closing the Kafka producer with timeoutMillis = > 9223372036854775807 ms. 
> Successes (same producer) committed: 2 > 2019-04-05 12:24:44.062 INFO 15404 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txb- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:24:44.062 INFO 15404 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, > transactionalId=txb-] Instantiated a transactional producer. > 2019-04-05 12:24:44.066 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:24:44.066 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:24:44.066 INFO 15404 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, > transactionalId=txb-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:24:44.171 INFO 15404 --- [ad | producer-2] > org.apache.kafka.clients.Metadata : Cluster ID: 0q-Ge1ROSyKNehyv1sVJCQ > 2019-04-05 12:24:44.339 INFO 15404 --- [ad | producer-2] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, > transactionalId=txb-] ProducerId set to 1 with epoch 0 > so55510898b-0@0 > aborting > so55510898b-0@2 > aborting > 2019-04-05 12:24:44.384 INFO 15404 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, > transactionalId=txb-] Closing the Kafka producer with timeoutMillis = > 9223372036854775807 ms. 
> App failures (same producer) rolled back: 2 > 2019-04-05 12:24:44.416 INFO 15404 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txc- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:24:44.417 INFO 15404 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-3, > transactionalId=txc-] Instantiated a transactional producer. > 2019-04-05 12:24:44.419 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:24:44.420 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:24:44.420 INFO 15404 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-3, > transactionalId=txc-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:24:44.522 INFO 15404 --- [ad | producer-3] > org.apache.kafka.clients.Metadata : Cluster ID: 0q-Ge1ROSyKNehyv1sVJCQ > 2019-04-05 12:24:44.634 INFO 15404 --- [ad | producer-3] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-3, > transactionalId=txc-] ProducerId set to 2 with epoch 0 > aborting > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.RecordTooLargeException: The request included > a message larger than the max message size the server will accept. 
> at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:81) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30) > at > com.example.So55510898Application.testSendFailureTxSameProducer(So55510898Application.java:176) > at > com.example.So55510898Application.lambda$0(So55510898Application.java:84) > at > org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) > at > org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:324) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) > at com.example.So55510898Application.main(So55510898Application.java:36) > Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The > request included a message larger than the max message size the server will > accept. > aborting > java.util.concurrent.TimeoutException: Timeout after waiting for 10000 ms. > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:78) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30) > at > com.example.So55510898Application.testSendFailureTxSameProducer(So55510898Application.java:176) > at > com.example.So55510898Application.lambda$0(So55510898Application.java:84) > at > org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) > at > org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:324) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) > at com.example.So55510898Application.main(So55510898Application.java:36) > Thread stuck in abort after RecordTooLargeException: > "main" #1 prio=5 os_prio=31 tid=0x00007fc36b001800 nid=0x2603 waiting on > condition [0x0000700000b36000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x000000076b5b3208> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50) > at > org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:718) > at > com.example.So55510898Application.testSendFailureTxSameProducer(So55510898Application.java:186) > at > com.example.So55510898Application.lambda$0(So55510898Application.java:84) > at com.example.So55510898Application$$Lambda$204/1026483832.run(Unknown > Source) > at >
org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) > at > org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:324) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) > at com.example.So55510898Application.main(So55510898Application.java:36) > {noformat} > inittrans.timeout.log > {noformat} > 2019-04-05 12:42:30.007 INFO 21503 --- [ main] > com.example.So55510898Application : Starting So55510898Application on > Gollum2.local with PID 21503 > (/Users/grussell/Development/stsws/so55510898/target/classes started by > grussell in /Users/grussell/Development/stsws/so55510898) > 2019-04-05 12:42:30.009 INFO 21503 --- [ main] > com.example.So55510898Application : No active profile set, falling > back to default profiles: default > 2019-04-05 12:42:30.323 INFO 21503 --- [ main] > trationDelegate$BeanPostProcessorChecker : Bean > 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type > [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$e567a345] > is not eligible for getting processed by all BeanPostProcessors (for > example: not eligible for auto-proxying) > 2019-04-05 12:42:30.414 INFO 21503 --- [ main] > o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values: > bootstrap.servers = [localhost:9092] > client.dns.lookup = default > client.id = > connections.max.idle.ms = 300000 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 120000 > retries = 5 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > 2019-04-05 12:42:30.443 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:42:30.444 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:42:30.591 INFO 21503 --- [ main] > com.example.So55510898Application : Started So55510898Application in > 0.769 seconds (JVM running for 1.163) > 2019-04-05 12:42:30.806 INFO 21503 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > 
batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txa- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:42:30.811 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, > transactionalId=txa-] Instantiated a transactional producer. > 2019-04-05 12:42:30.825 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:42:30.825 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:42:30.826 INFO 21503 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, > transactionalId=txa-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:42:30.927 INFO 21503 --- [ad | producer-1] > org.apache.kafka.clients.Metadata : Cluster ID: u1qliw8hRg-kG3RA3GnspQ > 2019-04-05 12:42:31.792 INFO 21503 --- [ad | producer-1] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, > transactionalId=txa-] ProducerId set to 0 with epoch 0 > so55510898a-0@0 > so55510898a-0@2 > 2019-04-05 12:42:31.901 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, > transactionalId=txa-] Closing the Kafka producer with timeoutMillis = > 9223372036854775807 ms. 
> Successes (same producer) committed: 2 > 2019-04-05 12:42:31.938 INFO 21503 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txb- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:42:31.938 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, > transactionalId=txb-] Instantiated a transactional producer. > 2019-04-05 12:42:31.943 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:42:31.943 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:42:31.943 INFO 21503 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, > transactionalId=txb-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:42:32.046 INFO 21503 --- [ad | producer-2] > org.apache.kafka.clients.Metadata : Cluster ID: u1qliw8hRg-kG3RA3GnspQ > 2019-04-05 12:42:32.157 INFO 21503 --- [ad | producer-2] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, > transactionalId=txb-] ProducerId set to 1 with epoch 0 > so55510898b-0@0 > aborting > so55510898b-0@2 > aborting > 2019-04-05 12:42:32.200 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, > transactionalId=txb-] Closing the Kafka producer with timeoutMillis = > 9223372036854775807 ms. 
> App failures (same producer) rolled back: 2 > 2019-04-05 12:42:32.231 INFO 21503 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txd- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:42:32.231 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-3, > transactionalId=txd-] Instantiated a transactional producer. > 2019-04-05 12:42:32.234 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:42:32.234 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:42:32.234 INFO 21503 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-3, > transactionalId=txd-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:42:32.339 INFO 21503 --- [ad | producer-3] > org.apache.kafka.clients.Metadata : Cluster ID: u1qliw8hRg-kG3RA3GnspQ > 2019-04-05 12:42:32.449 INFO 21503 --- [ad | producer-3] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-3, > transactionalId=txd-] ProducerId set to 2 with epoch 0 > aborting > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.RecordTooLargeException: The request included > a message larger than the max message size the server will accept. 
> 	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
> 	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:81)
> 	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> 	at com.example.So55510898Application.testSendFailureTxNewProducer(So55510898Application.java:222)
> 	at com.example.So55510898Application.lambda$0(So55510898Application.java:87)
> 	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804)
> 	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794)
> 	at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
> 	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
> 	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
> 	at com.example.So55510898Application.main(So55510898Application.java:36)
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
> 2019-04-05 12:42:32.463 INFO 21503 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-3, transactionalId=txd-] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2019-04-05 12:42:32.466 INFO 21503 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
> 	[... same values as the producer-2 config above, except transactional.id = txd- ...]
> 2019-04-05 12:42:32.466 INFO 21503 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-4, transactionalId=txd-] Instantiated a transactional producer.
> 2019-04-05 12:42:32.470 INFO 21503 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0
> 2019-04-05 12:42:32.470 INFO 21503 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349
> 2019-04-05 12:42:32.470 INFO 21503 --- [ main] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-4, transactionalId=txd-] ProducerId set to -1 with epoch -1
> 2019-04-05 12:42:32.574 INFO 21503 --- [ad | producer-4] org.apache.kafka.clients.Metadata : Cluster ID: u1qliw8hRg-kG3RA3GnspQ
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 5000ms.
> 2019-04-05 12:42:37.472 INFO 21503 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-4, transactionalId=txd-] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> Send failures (new producer) rolled back: 1
> {noformat}
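The last two blocks are the interesting sequence: producer-3 fails the send with RecordTooLargeException, the application closes it and creates producer-4 with the same transactional.id (txd-), and the new producer's initTransactions() then times out after max.block.ms (5000 ms). Below is a minimal sketch of that sequence; the class name, topic, and payload size are assumptions for illustration, not the reporter's So55510898Application code:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendFailureNewProducerSketch {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txd-"); // same id for both producers
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);       // bounds initTransactions()
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        Producer<String, String> first = new KafkaProducer<>(props);
        first.initTransactions(); // producer-3: "ProducerId set to 2 with epoch 0"
        try {
            first.beginTransaction();
            // A payload above the broker's default message.max.bytes (~1 MB) but
            // below the client's max.request.size (1048576), so the rejection
            // comes from the broker and surfaces from Future.get() as an
            // ExecutionException(RecordTooLargeException), as in the log.
            String huge = new String(new char[1_020_000]).replace('\0', 'x');
            first.send(new ProducerRecord<>("so55510898d", "k", huge)).get();
            first.commitTransaction();
        } catch (InterruptedException | ExecutionException fatal) {
            System.out.println("aborting");
            fatal.printStackTrace();
        } finally {
            first.close();
        }

        // Recovery attempt with a fresh producer and the SAME transactional.id.
        // Per the log above, this initTransactions() never completes and fails
        // with "Timeout expired while initializing transactional state in 5000ms"
        // instead of re-initializing normally.
        Producer<String, String> second = new KafkaProducer<>(props);
        try {
            second.initTransactions(); // TimeoutException in the reported scenario
        } finally {
            second.close();
        }
    }
}
{code}

Note that the second initTransactions() is exactly the producer-4 InitProducerId request in the log: it gets as far as fetching cluster metadata and then blocks until max.block.ms expires.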