jolshan commented on code in PR #14702: URL: https://github.com/apache/kafka/pull/14702#discussion_r1409765577
########## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ########## @@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness { assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTransactionsWithCompression(quorum: String): Unit = { + val numRecords = 50 + val numProducersWithCompression = 5 + val numTransactions = 40 + val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + + for (i <- 0 until numProducersWithCompression) { + transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy") + } + + // KAFKA-15653 is triggered more easily with replication factor 1 Review Comment: This made sense to me before, but I'm trying to remember why. 😅 Is it possible that, while we are building and sending the response, we receive the callback request, which means it gets handled on a different thread? Whereas with acks=all, we free the thread after sending out the verification, so it is available to receive the callback?
########## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ########## @@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness { assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTransactionsWithCompression(quorum: String): Unit = { + val numRecords = 50 + val numProducersWithCompression = 5 + val numTransactions = 40 + val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + + for (i <- 0 until numProducersWithCompression) { + transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy") + } + + // KAFKA-15653 is triggered more easily with replication factor 1 Review Comment: This made sense to me before, but I'm trying to remember why. 😅 Is it possible that, while we are building and sending the response, we receive the callback request, which means it gets handled on a different thread? Whereas with acks=all, we free the thread after sending out the verification, so it is available to receive the callback? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org