jolshan commented on code in PR #14702:
URL: https://github.com/apache/kafka/pull/14702#discussion_r1409765577


##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
     assertEquals((initialProducerEpoch + 1).toShort, 
producerStateEntry.producerEpoch)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+    val numRecords = 50
+    val numProducersWithCompression = 5
+    val numTransactions = 40
+    val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], 
Array[Byte]]]()
+
+    for (i <- 0 until numProducersWithCompression) {
+      transactionalCompressionProducers += 
createTransactionalProducer("transactional-compression-producer-" + i.toString, 
 compressionType = "snappy")
+    }
+
+    // KAFKA-15653 is triggered more easily with replication factor 1

Review Comment:
   This made sense to me before, but I'm trying to remember why. 😅 
   
   Is it possible that when we are building and sending the response we could 
receive the callback request and that means it will be handled on a different 
thread? Vs when acks=all, we free the thread after sending out the 
verification, so it can be available to receive the callback?



##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -820,6 +823,37 @@ class TransactionsTest extends IntegrationTestHarness {
     assertEquals((initialProducerEpoch + 1).toShort, 
producerStateEntry.producerEpoch)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionsWithCompression(quorum: String): Unit = {
+    val numRecords = 50
+    val numProducersWithCompression = 5
+    val numTransactions = 40
+    val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], 
Array[Byte]]]()
+
+    for (i <- 0 until numProducersWithCompression) {
+      transactionalCompressionProducers += 
createTransactionalProducer("transactional-compression-producer-" + i.toString, 
 compressionType = "snappy")
+    }
+
+    // KAFKA-15653 is triggered more easily with replication factor 1

Review Comment:
   This made sense to me before, but I'm trying to remember why. 😅 
   
   Is it possible that when we are building and sending the response we could 
receive the callback request and that means it will be handled on a different 
thread? Vs when acks=all, we free the thread after sending out the 
verification, so it can be available to receive the callback?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to