[ https://issues.apache.org/jira/browse/KAFKA-1079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kostya Golikov updated KAFKA-1079:
----------------------------------
    Fix Version/s: 0.8.1
           Status: Patch Available  (was: Open)

>From 7845a5af42ee44f9786542178c0789d93c66429f Mon Sep 17 00:00:00 2001
From: Kostya Golikov <johnysilv...@gmail.com>
Date: Thu, 24 Oct 2013 00:48:56 +0400
Subject: [PATCH] Fixed liars in PrimitiveApiTest: checking is now done not
 only against the no-compression producer, but also against the
 with-compression producer.

---
 .../unit/kafka/integration/PrimitiveApiTest.scala | 141 ++++++---------------
 1 file changed, 40 insertions(+), 101 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 5f331d2..c001b4e 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -35,12 +35,12 @@ import kafka.utils.{TestUtils, Utils}
  * End to end tests of the primitive apis against a local server
  */
 class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
+  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
-  val port = TestUtils.choosePort
+  val port = TestUtils.choosePort()
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props)
   val configs = List(config)
-  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
@@ -100,7 +100,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     val stringProducer1 = new Producer[String, String](config)
     stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
 
-    var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+    val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
     val messageSet = fetched.messageSet(topic, 0)
     assertTrue(messageSet.iterator.hasNext)
 
@@ -108,8 +108,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
   }
 
-  def testProduceAndMultiFetch() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+  private def produceAndMultiFetch(producer: Producer[String, String]) {
+    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"))
 
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@@ -171,117 +171,56 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
-  def testProduceAndMultiFetchWithCompression() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
-
-    // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    {
-      val messages = new mutable.HashMap[String, Seq[String]]
-      val builder = new FetchRequestBuilder()
-      for( (topic, partition) <- topics) {
-        val messageList = List("a_" + topic, "b_" + topic)
-        val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
-        messages += topic -> messageList
-        producer.send(producerData:_*)
-        builder.addFetch(topic, partition, 0, 10000)
-      }
-
-      // wait a bit for produced message to be available
-      val request = builder.build()
-      val response = consumer.fetch(request)
-      for( (topic, partition) <- topics) {
-        val fetched = response.messageSet(topic, partition)
-        assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
-      }
-    }
-
-    // temporarily set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.FATAL)
-
-    {
-      // send some invalid offsets
-      val builder = new FetchRequestBuilder()
-      for( (topic, partition) <- topics)
-        builder.addFetch(topic, partition, -1, 10000)
-
-      try {
-        val request = builder.build()
-        val response = consumer.fetch(request)
-        response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
-        fail("Expected exception when fetching message with invalid offset")
-      } catch {
-        case e: OffsetOutOfRangeException => "this is good"
-      }
-    }
-
-    {
-      // send some invalid partitions
-      val builder = new FetchRequestBuilder()
-      for( (topic, _) <- topics)
-        builder.addFetch(topic, -1, 0, 10000)
-
-      try {
-        val request = builder.build()
-        val response = consumer.fetch(request)
-        response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
-        fail("Expected exception when fetching message with invalid partition")
-      } catch {
-        case e: UnknownTopicOrPartitionException => "this is good"
-      }
-    }
+  def testProduceAndMultiFetch() {
+    val props = producer.config.props.props
+    val config = new ProducerConfig(props)
+    val noCompressionProducer = new Producer[String, String](config)
+    produceAndMultiFetch(noCompressionProducer)
+  }
 
-    // restore set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.ERROR)
+  def testProduceAndMultiFetchWithCompression() {
+    val props = producer.config.props.props
+    props.put("compression", "true")
+    val config = new ProducerConfig(props)
+    val producerWithCompression = new Producer[String, String](config)
+    produceAndMultiFetch(producerWithCompression)
   }
 
-  def testMultiProduce() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+  private def multiProduce(producer: Producer[String, String]) {
+    val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
+    createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
 
-    // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
-    for( (topic, partition) <- topics) {
+    for((topic, partition) <- topics) {
       val messageList = List("a_" + topic, "b_" + topic)
       val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
       messages += topic -> messageList
       producer.send(producerData:_*)
       builder.addFetch(topic, partition, 0, 10000)
     }
-    producer.send(produceList: _*)
 
     val request = builder.build()
     val response = consumer.fetch(request)
-    for( (topic, partition) <- topics) {
+    for((topic, partition) <- topics) {
       val fetched = response.messageSet(topic, partition)
       assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
     }
   }
 
-  def testMultiProduceWithCompression() {
-    // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    val messages = new mutable.HashMap[String, Seq[String]]
-    val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
-    for( (topic, partition) <- topics) {
-      val messageList = List("a_" + topic, "b_" + topic)
-      val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
-      messages += topic -> messageList
-      producer.send(producerData:_*)
-      builder.addFetch(topic, partition, 0, 10000)
-    }
-    producer.send(produceList: _*)
+  def testMultiProduce() {
+    val props = producer.config.props.props
+    val config = new ProducerConfig(props)
+    val noCompressionProducer = new Producer[String, String](config)
+    multiProduce(noCompressionProducer)
+  }
 
-    // wait a bit for produced message to be available
-    val request = builder.build()
-    val response = consumer.fetch(request)
-    for( (topic, partition) <- topics) {
-      val fetched = response.messageSet(topic, 0)
-      assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
-    }
+  def testMultiProduceWithCompression() {
+    val props = producer.config.props.props
+    props.put("compression", "true")
+    val config = new ProducerConfig(props)
+    val producerWithCompression = new Producer[String, String](config)
+    multiProduce(producerWithCompression)
   }
 
   def testConsumerEmptyTopic() {
@@ -294,16 +233,15 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   }
 
   def testPipelinedProduceRequests() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+    val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
+    createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
     val props = producer.config.props.props
     props.put("request.required.acks", "0")
     val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props))
 
     // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
     for( (topic, partition) <- topics) {
       val messageList = List("a_" + topic, "b_" + topic)
       val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
@@ -338,10 +276,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
    * For testing purposes, just create these topics each with one partition and one replica for
    * which the provided broker should the leader for. Create and wait for broker to lead. Simple.
    */
-  def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
+  private def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) {
     for( topic <- topics ) {
-      AdminUtils.createTopic(zkClient, topic, 1, 1)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+      AdminUtils.deleteTopic(zkClient, topic)
+      AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0, timeoutMs = 500)
     }
   }
 }
-- 
1.8.5.1


> Liars in PrimitiveApiTest that promise to test api in compression mode, but don't do this actually
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1079
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1079
>             Project: Kafka
>          Issue Type: Test
>          Components: core
>    Affects Versions: 0.8.0
>            Reporter: Kostya Golikov
>            Priority: Minor
>              Labels: newbie, test
>             Fix For: 0.8.1
>
>         Attachments: testing-with-compression-producer.patch
>
>
> A long time ago (0.7) we had ByteBufferMessageSet as part of the api, and it
> allowed us to control compression. Time went on, and now PrimitiveApiTest has
> methods that promise to test the api with compression enabled, but in fact
> they don't. Moreover, these methods almost entirely copy their counterparts
> without compression. In particular I'm talking about the
> `testProduceAndMultiFetch` / `testProduceAndMultiFetchWithCompression` and
> `testMultiProduce` / `testMultiProduceWithCompression` pairs.
> The fix could be super easy and sound -- just parameterize the methods with a
> producer of each type (with/without compression). Sadly, that isn't feasible
> with junit3, so the straightforward solution is to do the same ugly thing the
> `testDefaultEncoderProducerAndFetchWithCompression` method does -- forget
> about the class-wide producer and roll out its own. I will attach a patch if
> this is indeed a problem.



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)
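
For reference, the shape of the refactoring the patch applies, as a minimal
sketch (the class name, the abstract `harnessProducer` member, and the
`produceAndVerify` body are illustrative stand-ins for what PrimitiveApiTest
already gets from ProducerConsumerTestHarness; only the property copying and
the "compression" flag mirror the patch itself):

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// The shared test body takes the producer under test as a parameter; each
// public test method builds its own producer from the harness producer's
// properties and delegates to the shared body.
abstract class CompressionParameterizationSketch {

  // Supplied by the surrounding test harness in the real test.
  def harnessProducer: Producer[String, String]

  // Everything that used to be duplicated between the plain and the
  // "...WithCompression" tests lives here once.
  private def produceAndVerify(producer: Producer[String, String]) {
    producer.send(new KeyedMessage[String, String]("test1", "test1", "a_test1"))
    // ... fetch the message back with the consumer and assert on its payload ...
  }

  def testWithoutCompression() {
    // Reuse the harness producer's properties unchanged.
    val props = harnessProducer.config.props.props
    val noCompressionProducer = new Producer[String, String](new ProducerConfig(props))
    produceAndVerify(noCompressionProducer)
  }

  def testWithCompression() {
    // Same properties, with compression switched on -- the only difference.
    val props = harnessProducer.config.props.props
    props.put("compression", "true")
    val producerWithCompression = new Producer[String, String](new ProducerConfig(props))
    produceAndVerify(producerWithCompression)
  }
}

Duplicating the producer construction in each test is less elegant than a truly
parameterized test, but as the description above notes, junit3 offers no clean
way to parameterize, so a shared private body plus a per-test producer is the
pragmatic middle ground the patch settles on.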