[ https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Viktor Somogyi-Vass reassigned KAFKA-8195: ------------------------------------------ Assignee: (was: Viktor Somogyi-Vass) > Unstable Producer After Send Failure in Transaction > --------------------------------------------------- > > Key: KAFKA-8195 > URL: https://issues.apache.org/jira/browse/KAFKA-8195 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 2.0.1, 2.2.0, 2.3.0 > Reporter: Gary Russell > Priority: Major > > This journey started with [this Stack Overflow question | > https://stackoverflow.com/questions/55510898]. > I easily reproduced his issue (see my comments on his question). > My first step was to take Spring out of the picture and replicate the issue > with the native {{Producer}} apis. The following code shows the result; I > have attached logs and stack traces. > There are four methods in the test; the first performs 2 transactions, > successfully, on the same {{Producer}} instance. > The second aborts 2 transactions, successfully, on the same {{Producer}} > instance - application level failures after performing a send. > There are two flavors of the problem: > The third attempts to send 2 messages, on the same {{Producer}} that are too > large for the topic; the first aborts as expected; the second send hangs in > {{abortTransaction}} after getting a {{TimeoutException}} when attempting to > {{get}} the send metadata. See log {{hang.same.producer.log}} - it also > includes the stack trace showing the hang. > The fourth test is similar to the third but it closes the producer after the > first failure; this time, we timeout in {{initTransactions()}}. > Subsequent executions of this test get the timeout on the first attempt - > that {{transactional.id}} seems to be unusable. Removing the logs was one way > I found to get past the problem. 
> Test code > {code:java} > public ApplicationRunner runner(AdminClient client, > DefaultKafkaProducerFactory<String, String> pf) { > return args -> { > try { > Map<String, Object> configs = new > HashMap<>(pf.getConfigurationProperties()); > int committed = testGoodTx(client, configs); > System.out.println("Successes (same producer) > committed: " + committed); > int rolledBack = testAppFailureTx(client, > configs); > System.out.println("App failures (same > producer) rolled back: " + rolledBack); > > // first flavor - hung thread in > abortTransaction() > // rolledBack = > testSendFailureTxSameProducer(client, configs); > // System.out.println("Send failures (same > producer) rolled back: " + rolledBack); > > // second flavor - timeout in initTransactions() > rolledBack = > testSendFailureTxNewProducer(client, configs); > System.out.println("Send failures (new > producer) rolled back: " + rolledBack); > } > catch (Exception e) { > e.printStackTrace(); > } > }; > } > private int testGoodTx(AdminClient client, Map<String, Object> configs) > throws ExecutionException { > int commits = 0; > NewTopic topic = TopicBuilder.name("so55510898a") > .partitions(1) > .replicas(1) > .build(); > createTopic(client, topic); > configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txa-"); > Producer<String, String> producer = new > KafkaProducer<>(configs); > try { > producer.initTransactions(); > for (int i = 0; i < 2; i++) { > producer.beginTransaction(); > RecordMetadata recordMetadata = producer.send( > new > ProducerRecord<>("so55510898a", "foooooooooooooooooooooo")).get(10, > TimeUnit.SECONDS); > System.out.println(recordMetadata); > producer.commitTransaction(); > commits++; > } > } > catch (ProducerFencedException | OutOfOrderSequenceException | > AuthorizationException e) { > // We can't recover from these exceptions, so our only > option is to close the producer and exit. 
> } > catch (Exception e) { > System.out.println("aborting"); > producer.abortTransaction(); > } > finally { > producer.close(); > } > return commits; > } > private int testAppFailureTx(AdminClient client, Map<String, Object> > configs) > throws ExecutionException { > int aborts = 0; > NewTopic topic = TopicBuilder.name("so55510898b") > .partitions(1) > .replicas(1) > .build(); > createTopic(client, topic); > configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txb-"); > Producer<String, String> producer = new > KafkaProducer<>(configs); > producer.initTransactions(); > for (int i = 0; i < 2; i++) { > try { > producer.beginTransaction(); > RecordMetadata recordMetadata = producer.send( > new > ProducerRecord<>("so55510898b", "baaaaaaaaaaaaaaaar")).get(10, > TimeUnit.SECONDS); > System.out.println(recordMetadata); > throw new RuntimeException("App failed after > send"); > } > catch (ProducerFencedException | > OutOfOrderSequenceException | AuthorizationException e) { > // We can't recover from these exceptions, so > our only option is to close the producer and exit. 
> } > catch (Exception e) { > System.out.println("aborting"); > producer.abortTransaction(); > aborts++; > } > } > producer.close(); > return aborts; > } > private int testSendFailureTxSameProducer(AdminClient client, > Map<String, Object> configs) > throws ExecutionException { > int aborts = 0; > NewTopic topic = TopicBuilder.name("so55510898c") > .partitions(1) > .replicas(1) > .config(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, > "10") > .build(); > configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txc-"); > createTopic(client, topic); > Producer<String, String> producer = new > KafkaProducer<>(configs); > producer.initTransactions(); > for (int i = 0; i < 2; i++) { > try { > producer.beginTransaction(); > RecordMetadata recordMetadata = producer.send( > new > ProducerRecord<>("so55510898c", "baaaaaaaaaaaaaaaaz")).get(10, > TimeUnit.SECONDS); > System.out.println(recordMetadata); > throw new RuntimeException("App failed after > send"); > } > catch (ProducerFencedException | > OutOfOrderSequenceException | AuthorizationException e) { > // We can't recover from these exceptions, so > our only option is to close the producer and exit. 
> } > catch (Exception e) { > System.out.println("aborting"); > e.printStackTrace(); > producer.abortTransaction(); > aborts++; > } > } > producer.close(); > return aborts; > } > private int testSendFailureTxNewProducer(AdminClient client, > Map<String, Object> configs) > throws ExecutionException { > int aborts = 0; > NewTopic topic = TopicBuilder.name("so55510898d") > .partitions(1) > .replicas(1) > .config(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, > "10") > .build(); > createTopic(client, topic); > configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txd-"); > for (int i = 0; i < 2; i++) { > Producer<String, String> producer = new > KafkaProducer<>(configs); > try { > try { > producer.initTransactions(); > } > catch (Exception e) { > e.printStackTrace(); > return aborts; > } > producer.beginTransaction(); > RecordMetadata recordMetadata = producer.send( > new > ProducerRecord<>("so55510898d", "quuuuuuuuuuuuuuux")).get(10, > TimeUnit.SECONDS); > System.out.println(recordMetadata); > throw new RuntimeException("App failed after > send"); > } > catch (ProducerFencedException | > OutOfOrderSequenceException | AuthorizationException e) { > // We can't recover from these exceptions, so > our only option is to close the producer and exit. 
> } > catch (Exception e) { > System.out.println("aborting"); > e.printStackTrace(); > producer.abortTransaction(); > aborts++; > } > finally { > producer.close(); > } > } > return aborts; > } > private void createTopic(AdminClient client, NewTopic topic) throws > ExecutionException { > try { > > client.createTopics(Collections.singletonList(topic)).all().get(); > return; > } > catch (InterruptedException e) { > Thread.currentThread().interrupt(); > return; > } > catch (ExecutionException e) { > if (!(e.getCause() instanceof TopicExistsException)) { > throw e; > } > } > } > {code} > hang.same.producer.log > {noformat} > 2019-04-05 12:24:42.235 INFO 15404 --- [ main] > com.example.So55510898Application : Starting So55510898Application on > Gollum2.local with PID 15404 > (/Users/grussell/Development/stsws/so55510898/target/classes started by > grussell in /Users/grussell/Development/stsws/so55510898) > 2019-04-05 12:24:42.237 INFO 15404 --- [ main] > com.example.So55510898Application : No active profile set, falling > back to default profiles: default > 2019-04-05 12:24:42.546 INFO 15404 --- [ main] > trationDelegate$BeanPostProcessorChecker : Bean > 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type > [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$b2eeb124] > is not eligible for getting processed by all BeanPostProcessors (for > example: not eligible for auto-proxying) > 2019-04-05 12:24:42.639 INFO 15404 --- [ main] > o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values: > bootstrap.servers = [localhost:9092] > client.dns.lookup = default > client.id = > connections.max.idle.ms = 300000 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 120000 > retries = 5 > retry.backoff.ms = 
100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > 2019-04-05 12:24:42.671 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:24:42.672 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:24:42.853 INFO 15404 --- [ main] > com.example.So55510898Application : Started So55510898Application in > 0.8 seconds (JVM running for 1.233) > 2019-04-05 12:24:43.058 INFO 15404 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 
linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txa- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:24:43.063 INFO 15404 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, > transactionalId=txa-] Instantiated a transactional producer. 
> 2019-04-05 12:24:43.074 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:24:43.075 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:24:43.075 INFO 15404 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, > transactionalId=txa-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:24:43.176 INFO 15404 --- [ad | producer-1] > org.apache.kafka.clients.Metadata : Cluster ID: 0q-Ge1ROSyKNehyv1sVJCQ > 2019-04-05 12:24:43.927 INFO 15404 --- [ad | producer-1] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, > transactionalId=txa-] ProducerId set to 0 with epoch 0 > so55510898a-0@0 > so55510898a-0@2 > 2019-04-05 12:24:44.034 INFO 15404 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, > transactionalId=txa-] Closing the Kafka producer with timeoutMillis = > 9223372036854775807 ms. > Successes (same producer) committed: 2 > 2019-04-05 12:24:44.062 INFO 15404 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > 
retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txb- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:24:44.062 INFO 15404 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, > transactionalId=txb-] Instantiated a transactional producer. 
> 2019-04-05 12:24:44.066 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:24:44.066 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:24:44.066 INFO 15404 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, > transactionalId=txb-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:24:44.171 INFO 15404 --- [ad | producer-2] > org.apache.kafka.clients.Metadata : Cluster ID: 0q-Ge1ROSyKNehyv1sVJCQ > 2019-04-05 12:24:44.339 INFO 15404 --- [ad | producer-2] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, > transactionalId=txb-] ProducerId set to 1 with epoch 0 > so55510898b-0@0 > aborting > so55510898b-0@2 > aborting > 2019-04-05 12:24:44.384 INFO 15404 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, > transactionalId=txb-] Closing the Kafka producer with timeoutMillis = > 9223372036854775807 ms. 
> App failures (same producer) rolled back: 2 > 2019-04-05 12:24:44.416 INFO 15404 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > 
ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txc- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:24:44.417 INFO 15404 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-3, > transactionalId=txc-] Instantiated a transactional producer. > 2019-04-05 12:24:44.419 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:24:44.420 INFO 15404 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:24:44.420 INFO 15404 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-3, > transactionalId=txc-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:24:44.522 INFO 15404 --- [ad | producer-3] > org.apache.kafka.clients.Metadata : Cluster ID: 0q-Ge1ROSyKNehyv1sVJCQ > 2019-04-05 12:24:44.634 INFO 15404 --- [ad | producer-3] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-3, > transactionalId=txc-] ProducerId set to 2 with epoch 0 > aborting > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.RecordTooLargeException: The request included > a message larger than the max message size the server will accept. 
> at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:81) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30) > at > com.example.So55510898Application.testSendFailureTxSameProducer(So55510898Application.java:176) > at > com.example.So55510898Application.lambda$0(So55510898Application.java:84) > at > org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) > at > org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:324) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) > at com.example.So55510898Application.main(So55510898Application.java:36) > Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The > request included a message larger than the max message size the server will > accept. > aborting > java.util.concurrent.TimeoutException: Timeout after waiting for 10000 ms. 
> at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:78) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30) > at > com.example.So55510898Application.testSendFailureTxSameProducer(So55510898Application.java:176) > at > com.example.So55510898Application.lambda$0(So55510898Application.java:84) > at > org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) > at > org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:324) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) > at com.example.So55510898Application.main(So55510898Application.java:36) > Thread stuck in abort after RecordTooLargeException: > "main" #1 prio=5 os_prio=31 tid=0x00007fc36b001800 nid=0x2603 waiting on > condition [0x0000700000b36000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x000000076b5b3208> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50) > at > org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:718) > at > 
com.example.So55510898Application.testSendFailureTxSameProducer(So55510898Application.java:186) > at > com.example.So55510898Application.lambda$0(So55510898Application.java:84) > at com.example.So55510898Application$$Lambda$204/1026483832.run(Unknown > Source) > at > org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) > at > org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:324) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) > at com.example.So55510898Application.main(So55510898Application.java:36) > {noformat} > inittrans.timeout.log > {noformat} > 2019-04-05 12:42:30.007 INFO 21503 --- [ main] > com.example.So55510898Application : Starting So55510898Application on > Gollum2.local with PID 21503 > (/Users/grussell/Development/stsws/so55510898/target/classes started by > grussell in /Users/grussell/Development/stsws/so55510898) > 2019-04-05 12:42:30.009 INFO 21503 --- [ main] > com.example.So55510898Application : No active profile set, falling > back to default profiles: default > 2019-04-05 12:42:30.323 INFO 21503 --- [ main] > trationDelegate$BeanPostProcessorChecker : Bean > 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type > [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$e567a345] > is not eligible for getting processed by all BeanPostProcessors (for > example: not eligible for auto-proxying) > 2019-04-05 12:42:30.414 INFO 21503 --- [ main] > o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values: > bootstrap.servers = [localhost:9092] > client.dns.lookup = default > client.id = > connections.max.idle.ms = 300000 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > 
metrics.sample.window.ms = 30000 > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 120000 > retries = 5 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > 2019-04-05 12:42:30.443 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:42:30.444 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:42:30.591 INFO 21503 --- [ main] > com.example.So55510898Application : Started So55510898Application in > 0.769 seconds (JVM running for 1.163) > 2019-04-05 12:42:30.806 INFO 21503 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > 
connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txa- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:42:30.811 INFO 
21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, > transactionalId=txa-] Instantiated a transactional producer. > 2019-04-05 12:42:30.825 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:42:30.825 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:42:30.826 INFO 21503 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, > transactionalId=txa-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:42:30.927 INFO 21503 --- [ad | producer-1] > org.apache.kafka.clients.Metadata : Cluster ID: u1qliw8hRg-kG3RA3GnspQ > 2019-04-05 12:42:31.792 INFO 21503 --- [ad | producer-1] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, > transactionalId=txa-] ProducerId set to 0 with epoch 0 > so55510898a-0@0 > so55510898a-0@2 > 2019-04-05 12:42:31.901 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, > transactionalId=txa-] Closing the Kafka producer with timeoutMillis = > 9223372036854775807 ms. 
> Successes (same producer) committed: 2 > 2019-04-05 12:42:31.938 INFO 21503 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > 
ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txb- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:42:31.938 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, > transactionalId=txb-] Instantiated a transactional producer. > 2019-04-05 12:42:31.943 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:42:31.943 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:42:31.943 INFO 21503 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, > transactionalId=txb-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:42:32.046 INFO 21503 --- [ad | producer-2] > org.apache.kafka.clients.Metadata : Cluster ID: u1qliw8hRg-kG3RA3GnspQ > 2019-04-05 12:42:32.157 INFO 21503 --- [ad | producer-2] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, > transactionalId=txb-] ProducerId set to 1 with epoch 0 > so55510898b-0@0 > aborting > so55510898b-0@2 > aborting > 2019-04-05 12:42:32.200 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2, > transactionalId=txb-] Closing the Kafka producer with timeoutMillis = > 9223372036854775807 ms. 
> App failures (same producer) rolled back: 2 > 2019-04-05 12:42:32.231 INFO 21503 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > 
ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txd- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:42:32.231 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-3, > transactionalId=txd-] Instantiated a transactional producer. > 2019-04-05 12:42:32.234 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:42:32.234 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:42:32.234 INFO 21503 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-3, > transactionalId=txd-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:42:32.339 INFO 21503 --- [ad | producer-3] > org.apache.kafka.clients.Metadata : Cluster ID: u1qliw8hRg-kG3RA3GnspQ > 2019-04-05 12:42:32.449 INFO 21503 --- [ad | producer-3] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-3, > transactionalId=txd-] ProducerId set to 2 with epoch 0 > aborting > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.RecordTooLargeException: The request included > a message larger than the max message size the server will accept. 
> at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:81) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30) > at > com.example.So55510898Application.testSendFailureTxNewProducer(So55510898Application.java:222) > at > com.example.So55510898Application.lambda$0(So55510898Application.java:87) > at > org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) > at > org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:324) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) > at > org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) > at com.example.So55510898Application.main(So55510898Application.java:36) > Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The > request included a message larger than the max message size the server will > accept. > 2019-04-05 12:42:32.463 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-3, > transactionalId=txd-] Closing the Kafka producer with timeoutMillis = > 9223372036854775807 ms. 
> 2019-04-05 12:42:32.466 INFO 21503 --- [ main] > o.a.k.clients.producer.ProducerConfig : ProducerConfig values: > acks = all > batch.size = 16384 > bootstrap.servers = [localhost:9092] > buffer.memory = 33554432 > client.dns.lookup = default > client.id = > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 5000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > 
ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 60000 > transactional.id = txd- > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > 2019-04-05 12:42:32.466 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-4, > transactionalId=txd-] Instantiated a transactional producer. > 2019-04-05 12:42:32.470 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.2.0 > 2019-04-05 12:42:32.470 INFO 21503 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 05fcfde8f69b0349 > 2019-04-05 12:42:32.470 INFO 21503 --- [ main] > o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-4, > transactionalId=txd-] ProducerId set to -1 with epoch -1 > 2019-04-05 12:42:32.574 INFO 21503 --- [ad | producer-4] > org.apache.kafka.clients.Metadata : Cluster ID: u1qliw8hRg-kG3RA3GnspQ > org.apache.kafka.common.errors.TimeoutException: Timeout expired while > initializing transactional state in 5000ms. > 2019-04-05 12:42:37.472 INFO 21503 --- [ main] > o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-4, > transactionalId=txd-] Closing the Kafka producer with timeoutMillis = > 9223372036854775807 ms. > Send failures (new producer) rolled back: 1 > {noformat} -- This message was sent by Atlassian Jira (v8.20.7#820007)