[ 
https://issues.apache.org/jira/browse/KAFKA-1079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostya Golikov updated KAFKA-1079:
----------------------------------

    Fix Version/s: 0.8.1
           Status: Patch Available  (was: Open)

>From 7845a5af42ee44f9786542178c0789d93c66429f Mon Sep 17 00:00:00 2001
From: Kostya Golikov <johnysilv...@gmail.com>
Date: Thu, 24 Oct 2013 00:48:56 +0400
Subject: [PATCH] Fixed liars in primitive api test: now checking is done not
 only against no-compression producer, but against with-compression producer
 as well.

---
 .../unit/kafka/integration/PrimitiveApiTest.scala  | 141 ++++++---------------
 1 file changed, 40 insertions(+), 101 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 5f331d2..c001b4e 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -35,12 +35,12 @@ import kafka.utils.{TestUtils, Utils}
  * End to end tests of the primitive apis against a local server
  */
 class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness 
with ZooKeeperTestHarness {
+  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
-  val port = TestUtils.choosePort
+  val port = TestUtils.choosePort()
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props)
   val configs = List(config)
-  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
@@ -100,7 +100,7 @@ class PrimitiveApiTest extends JUnit3Suite with 
ProducerConsumerTestHarness with
     val stringProducer1 = new Producer[String, String](config)
     stringProducer1.send(new KeyedMessage[String, String](topic, 
"test-message"))
 
-    var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 
0, 10000).build())
+    val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 
0, 10000).build())
     val messageSet = fetched.messageSet(topic, 0)
     assertTrue(messageSet.iterator.hasNext)
 
@@ -108,8 +108,8 @@ class PrimitiveApiTest extends JUnit3Suite with 
ProducerConsumerTestHarness with
     assertEquals("test-message", 
Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
   }
 
-  def testProduceAndMultiFetch() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", 
"test4"), config.brokerId)
+  private def produceAndMultiFetch(producer: Producer[String, String]) {
+    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", 
"test4"))
 
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@@ -171,117 +171,56 @@ class PrimitiveApiTest extends JUnit3Suite with 
ProducerConsumerTestHarness with
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
-  def testProduceAndMultiFetchWithCompression() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", 
"test4"), config.brokerId)
-
-    // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    {
-      val messages = new mutable.HashMap[String, Seq[String]]
-      val builder = new FetchRequestBuilder()
-      for( (topic, partition) <- topics) {
-        val messageList = List("a_" + topic, "b_" + topic)
-        val producerData = messageList.map(new KeyedMessage[String, 
String](topic, topic, _))
-        messages += topic -> messageList
-        producer.send(producerData:_*)
-        builder.addFetch(topic, partition, 0, 10000)
-      }
-
-      // wait a bit for produced message to be available
-      val request = builder.build()
-      val response = consumer.fetch(request)
-      for( (topic, partition) <- topics) {
-        val fetched = response.messageSet(topic, partition)
-        assertEquals(messages(topic), fetched.map(messageAndOffset => 
Utils.readString(messageAndOffset.message.payload)))
-      }
-    }
-
-    // temporarily set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.FATAL)
-
-    {
-      // send some invalid offsets
-      val builder = new FetchRequestBuilder()
-      for( (topic, partition) <- topics)
-        builder.addFetch(topic, partition, -1, 10000)
-
-      try {
-        val request = builder.build()
-        val response = consumer.fetch(request)
-        response.data.values.foreach(pdata => 
ErrorMapping.maybeThrowException(pdata.error))
-        fail("Expected exception when fetching message with invalid offset")
-      } catch {
-        case e: OffsetOutOfRangeException => "this is good"
-      }
-    }
-
-    {
-      // send some invalid partitions
-      val builder = new FetchRequestBuilder()
-      for( (topic, _) <- topics)
-        builder.addFetch(topic, -1, 0, 10000)
-
-      try {
-        val request = builder.build()
-        val response = consumer.fetch(request)
-        response.data.values.foreach(pdata => 
ErrorMapping.maybeThrowException(pdata.error))
-        fail("Expected exception when fetching message with invalid partition")
-      } catch {
-        case e: UnknownTopicOrPartitionException => "this is good"
-      }
-    }
+  def testProduceAndMultiFetch() {
+    val props = producer.config.props.props
+    val config = new ProducerConfig(props)
+    val noCompressionProducer = new Producer[String, String](config)
+    produceAndMultiFetch(noCompressionProducer)
+  }
 
-    // restore set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.ERROR)
+  def testProduceAndMultiFetchWithCompression() {
+    val props = producer.config.props.props
+    props.put("compression", "true")
+    val config = new ProducerConfig(props)
+    val producerWithCompression = new Producer[String, String](config)
+    produceAndMultiFetch(producerWithCompression)
   }
 
-  def testMultiProduce() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", 
"test4"), config.brokerId)
+  private def multiProduce(producer: Producer[String, String]) {
+    val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
+    createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
 
-    // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
-    for( (topic, partition) <- topics) {
+    for((topic, partition) <- topics) {
       val messageList = List("a_" + topic, "b_" + topic)
       val producerData = messageList.map(new KeyedMessage[String, 
String](topic, topic, _))
       messages += topic -> messageList
       producer.send(producerData:_*)
       builder.addFetch(topic, partition, 0, 10000)
     }
-    producer.send(produceList: _*)
 
     val request = builder.build()
     val response = consumer.fetch(request)
-    for( (topic, partition) <- topics) {
+    for((topic, partition) <- topics) {
       val fetched = response.messageSet(topic, partition)
       assertEquals(messages(topic), fetched.map(messageAndOffset => 
Utils.readString(messageAndOffset.message.payload)))
     }
   }
 
-  def testMultiProduceWithCompression() {
-    // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    val messages = new mutable.HashMap[String, Seq[String]]
-    val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
-    for( (topic, partition) <- topics) {
-      val messageList = List("a_" + topic, "b_" + topic)
-      val producerData = messageList.map(new KeyedMessage[String, 
String](topic, topic, _))
-      messages += topic -> messageList
-      producer.send(producerData:_*)
-      builder.addFetch(topic, partition, 0, 10000)
-    }
-    producer.send(produceList: _*)
+  def testMultiProduce() {
+    val props = producer.config.props.props
+    val config = new ProducerConfig(props)
+    val noCompressionProducer = new Producer[String, String](config)
+    multiProduce(noCompressionProducer)
+  }
 
-    // wait a bit for produced message to be available
-    val request = builder.build()
-    val response = consumer.fetch(request)
-    for( (topic, partition) <- topics) {
-      val fetched = response.messageSet(topic, 0)
-      assertEquals(messages(topic), fetched.map(messageAndOffset => 
Utils.readString(messageAndOffset.message.payload)))
-    }
+  def testMultiProduceWithCompression() {
+    val props = producer.config.props.props
+    props.put("compression", "true")
+    val config = new ProducerConfig(props)
+    val producerWithCompression = new Producer[String, String](config)
+    multiProduce(producerWithCompression)
   }
 
   def testConsumerEmptyTopic() {
@@ -294,16 +233,15 @@ class PrimitiveApiTest extends JUnit3Suite with 
ProducerConsumerTestHarness with
   }
 
   def testPipelinedProduceRequests() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", 
"test4"), config.brokerId)
+    val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
+    createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
     val props = producer.config.props.props
     props.put("request.required.acks", "0")
     val pipelinedProducer: Producer[String, String] = new Producer(new 
ProducerConfig(props))
 
     // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
     for( (topic, partition) <- topics) {
       val messageList = List("a_" + topic, "b_" + topic)
       val producerData = messageList.map(new KeyedMessage[String, 
String](topic, topic, _))
@@ -338,10 +276,11 @@ class PrimitiveApiTest extends JUnit3Suite with 
ProducerConsumerTestHarness with
    * For testing purposes, just create these topics each with one partition 
and one replica for
    * which the provided broker should the leader for.  Create and wait for 
broker to lead.  Simple.
    */
-  def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: 
Seq[String], brokerId: Int) {
+  private def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: 
Iterable[String]) {
     for( topic <- topics ) {
-      AdminUtils.createTopic(zkClient, topic, 1, 1)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+      AdminUtils.deleteTopic(zkClient, topic)
+      AdminUtils.createTopic(zkClient, topic, partitions = 1, 
replicationFactor = 1)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 
0, timeoutMs = 500)
     }
   }
 }
-- 
1.8.5.1


> Liars in PrimitiveApiTest that promise to test api in compression mode, but 
> don't do this actually
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1079
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1079
>             Project: Kafka
>          Issue Type: Test
>          Components: core
>    Affects Versions: 0.8.0
>            Reporter: Kostya Golikov
>            Priority: Minor
>              Labels: newbie, test
>             Fix For: 0.8.1
>
>         Attachments: testing-with-compression-producer.patch
>
>
> A long time ago (0.7) we had ByteBufferMessageSet as part of the API, and it
> allowed us to control compression. Time has gone on, and now PrimitiveApiTest
> has methods that promise to test the API with compression enabled, but in fact
> they don't. Moreover, these methods almost entirely copy their counterparts
> without compression. In particular, I'm talking about the
> `testProduceAndMultiFetch` / `testProduceAndMultiFetchWithCompression` and
> `testMultiProduce` / `testMultiProduceWithCompression` pairs.
> The fix could be super easy and sound -- just parameterize the methods with a
> producer of each type (with/without compression). Sadly, that isn't feasible
> with JUnit 3, so the straightforward solution is to do the same ugly thing as
> the `testDefaultEncoderProducerAndFetchWithCompression` method does -- forget
> about the class-wide producer and roll its own. I will attach a patch if this
> is indeed a problem.



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)

Reply via email to