[ https://issues.apache.org/jira/browse/KAFKA-807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dragos Manolescu updated KAFKA-807: ----------------------------------- Labels: patch producer (was: ) Status: Patch Available (was: Open) Index: core/build.sbt IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/build.sbt (date 1364252653000) +++ core/build.sbt (date 1364254450000) @@ -18,8 +18,9 @@ libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) => deps :+ (sv match { - case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test" + case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test" + case "2.9.2" => "org.scalatest" %% "scalatest" % "1.9.1" % "test" - case _ => "org.scalatest" %% "scalatest" % "1.8" % "test" + case _ => "org.scalatest" %% "scalatest" % "1.8" % "test" }) } Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/producer/ConsoleProducer.scala (date 1364252653000) +++ core/src/main/scala/kafka/producer/ConsoleProducer.scala (date 1364254450000) @@ -196,7 +196,7 @@ topic = props.getProperty("topic") if(props.containsKey("parse.key")) parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true") - if(props.containsKey("key.seperator")) + if(props.containsKey("key.separator")) keySeparator = props.getProperty("key.separator") if(props.containsKey("ignore.error")) ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true") > LineMessageReader doesn't correctly parse the key separator > ----------------------------------------------------------- > > Key: KAFKA-807 > URL: https://issues.apache.org/jira/browse/KAFKA-807 > Project: Kafka > Issue Type: Bug > Components: tools > Affects Versions: 0.8 > Reporter: Dragos Manolescu > Priority: Trivial > Labels: producer, patch > Fix For: 0.8 > > > Typo in key name prevents extracting the key separator. The patch follows; > what's the recommended way to submit patches? > Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala > IDEA additional info: > Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP > <+>UTF-8 > Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP > <+>/**\n * Licensed to the Apache Software Foundation (ASF) under one or > more\n * contributor license agreements. See the NOTICE file distributed > with\n * this work for additional information regarding copyright > ownership.\n * The ASF licenses this file to You under the Apache License, > Version 2.0\n * (the \"License\"); you may not use this file except in > compliance with\n * the License. You may obtain a copy of the License at\n * > \n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by > applicable law or agreed to in writing, software\n * distributed under the > License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR > CONDITIONS OF ANY KIND, either express or implied.\n * See the License for > the specific language governing permissions and\n * limitations under the > License.\n */\n\npackage kafka.producer\n\nimport > scala.collection.JavaConversions._\nimport joptsimple._\nimport > java.util.Properties\nimport java.io._\nimport kafka.common._\nimport > kafka.message._\nimport kafka.serializer._\n\nobject ConsoleProducer { \n\n > def main(args: Array[String]) { \n val parser = new OptionParser\n val > topicOpt = parser.accepts(\"topic\", \"REQUIRED: The topic id to produce > messages to.\")\n .withRequiredArg\n > .describedAs(\"topic\")\n > .ofType(classOf[String])\n val brokerListOpt = > parser.accepts(\"broker-list\", \"REQUIRED: The broker list string in the > form HOST1:PORT1,HOST2:PORT2.\")\n > .withRequiredArg\n .describedAs(\"broker-list\")\n > .ofType(classOf[String])\n val syncOpt = > parser.accepts(\"sync\", \"If set message send requests to the brokers are > synchronously, one at a time as they arrive.\")\n val compressOpt = > parser.accepts(\"compress\", \"If set, messages batches are sent > compressed\")\n val batchSizeOpt = parser.accepts(\"batch-size\", \"Number > of messages to send in a single batch if they are not being sent > synchronously.\")\n .withRequiredArg\n > .describedAs(\"size\")\n > .ofType(classOf[java.lang.Integer])\n > .defaultsTo(200)\n val sendTimeoutOpt = parser.accepts(\"timeout\", \"If > set and the producer is running in asynchronous mode, this gives the maximum > amount of time\" + \n \" a > message will queue awaiting suffient batch size. The value is given in > ms.\")\n .withRequiredArg\n > .describedAs(\"timeout_ms\")\n > .ofType(classOf[java.lang.Long])\n > .defaultsTo(1000)\n val queueSizeOpt = parser.accepts(\"queue-size\", \"If > set and the producer is running in asynchronous mode, this gives the maximum > amount of \" + \n \" > messages will queue awaiting suffient batch size.\")\n > .withRequiredArg\n > .describedAs(\"queue_size\")\n > .ofType(classOf[java.lang.Long])\n > .defaultsTo(10000)\n val queueEnqueueTimeoutMsOpt = > parser.accepts(\"queue-enqueuetimeout-ms\", \"Timeout for event enqueue\")\n > .withRequiredArg\n > .describedAs(\"queue enqueuetimeout ms\")\n > .ofType(classOf[java.lang.Long])\n > .defaultsTo(0)\n val requestRequiredAcksOpt = > parser.accepts(\"request-required-acks\", \"The required acks of the producer > requests\")\n .withRequiredArg\n > .describedAs(\"request required acks\")\n > .ofType(classOf[java.lang.Integer])\n > .defaultsTo(0)\n val requestTimeoutMsOpt = > parser.accepts(\"request-timeout-ms\", \"The ack timeout of the producer > requests. Value must be non-negative and non-zero\")\n > .withRequiredArg\n > .describedAs(\"request timeout ms\")\n > .ofType(classOf[java.lang.Integer])\n > .defaultsTo(1500)\n val valueEncoderOpt = > parser.accepts(\"value-serializer\", \"The class name of the message encoder > implementation to use for serializing values.\")\n > .withRequiredArg\n > .describedAs(\"encoder_class\")\n > .ofType(classOf[java.lang.String])\n > .defaultsTo(classOf[StringEncoder].getName)\n val keyEncoderOpt = > parser.accepts(\"key-serializer\", \"The class name of the message encoder > implementation to use for serializing keys.\")\n > .withRequiredArg\n > .describedAs(\"encoder_class\")\n > .ofType(classOf[java.lang.String])\n > .defaultsTo(classOf[StringEncoder].getName)\n val messageReaderOpt = > parser.accepts(\"line-reader\", \"The class name of the class to use for > reading lines from standard in. \" + \n > \"By default each line is read as a separate message.\")\n > .withRequiredArg\n > .describedAs(\"reader_class\")\n > .ofType(classOf[java.lang.String])\n > .defaultsTo(classOf[LineMessageReader].getName)\n val socketBufferSizeOpt > = parser.accepts(\"socket-buffer-size\", \"The size of the tcp RECV > size.\")\n .withRequiredArg\n > .describedAs(\"size\")\n > .ofType(classOf[java.lang.Integer])\n > .defaultsTo(1024*100)\n val propertyOpt = parser.accepts(\"property\", \"A > mechanism to pass user-defined properties in the form key=value to the > message reader. \" +\n \"This > allows custom configuration for a user-defined message reader.\")\n > .withRequiredArg\n > .describedAs(\"prop\")\n > .ofType(classOf[String])\n\n\n val options = parser.parse(args : _*)\n > for(arg <- List(topicOpt, brokerListOpt)) {\n if(!options.has(arg)) {\n > System.err.println(\"Missing required argument \\\"\" + arg + > \"\\\"\")\n parser.printHelpOn(System.err)\n System.exit(1)\n > }\n }\n\n val topic = options.valueOf(topicOpt)\n val brokerList > = options.valueOf(brokerListOpt)\n val sync = options.has(syncOpt)\n > val compress = options.has(compressOpt)\n val batchSize = > options.valueOf(batchSizeOpt)\n val sendTimeout = > options.valueOf(sendTimeoutOpt)\n val queueSize = > options.valueOf(queueSizeOpt)\n val queueEnqueueTimeoutMs = > options.valueOf(queueEnqueueTimeoutMsOpt)\n val requestRequiredAcks = > options.valueOf(requestRequiredAcksOpt)\n val requestTimeoutMs = > options.valueOf(requestTimeoutMsOpt)\n val keyEncoderClass = > options.valueOf(keyEncoderOpt)\n val valueEncoderClass = > options.valueOf(valueEncoderOpt)\n val readerClass = > options.valueOf(messageReaderOpt)\n val socketBuffer = > options.valueOf(socketBufferSizeOpt)\n val cmdLineProps = > parseLineReaderArgs(options.valuesOf(propertyOpt))\n > cmdLineProps.put(\"topic\", topic)\n\n val props = new Properties()\n > props.put(\"broker.list\", brokerList)\n val codec = if(compress) > DefaultCompressionCodec.codec else NoCompressionCodec.codec\n > props.put(\"compression.codec\", codec.toString)\n > props.put(\"producer.type\", if(sync) \"sync\" else \"async\")\n > if(options.has(batchSizeOpt))\n props.put(\"batch.num.messages\", > batchSize.toString)\n props.put(\"queue.buffering.max.ms\", > sendTimeout.toString)\n props.put(\"queue.buffering.max.messages\", > queueSize.toString)\n props.put(\"queue.enqueue.timeout.ms\", > queueEnqueueTimeoutMs.toString)\n props.put(\"request.required.acks\", > requestRequiredAcks.toString)\n props.put(\"request.timeout.ms\", > requestTimeoutMs.toString)\n props.put(\"key.serializer.class\", > keyEncoderClass)\n props.put(\"serializer.class\", valueEncoderClass)\n > props.put(\"send.buffer.bytes\", socketBuffer.toString)\n val reader = > Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, > AnyRef]]\n reader.init(System.in, cmdLineProps)\n\n try {\n val > producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))\n\n > Runtime.getRuntime.addShutdownHook(new Thread() {\n override def > run() {\n producer.close()\n }\n })\n\n var > message: KeyedMessage[AnyRef, AnyRef] = null\n do {\n message > = reader.readMessage()\n if(message != null)\n > producer.send(message)\n } while(message != null)\n } catch {\n > case e: Exception =>\n e.printStackTrace\n System.exit(1)\n > }\n System.exit(0)\n }\n\n def parseLineReaderArgs(args: > Iterable[String]): Properties = {\n val splits = args.map(_ split > \"=\").filterNot(_ == null).filterNot(_.length == 0)\n > if(!splits.forall(_.length == 2)) {\n System.err.println(\"Invalid line > reader properties: \" + args.mkString(\" \"))\n System.exit(1)\n }\n > val props = new Properties\n for(a <- splits)\n props.put(a(0), > a(1))\n props\n }\n\n trait MessageReader[K,V] { \n def > init(inputStream: InputStream, props: Properties) {}\n def readMessage(): > KeyedMessage[K,V]\n def close() {}\n }\n\n class LineMessageReader > extends MessageReader[String, String] {\n var topic: String = null\n > var reader: BufferedReader = null\n var parseKey = false\n var > keySeparator = \"\\t\"\n var ignoreError = false\n var lineNumber = > 0\n\n override def init(inputStream: InputStream, props: Properties) {\n > topic = props.getProperty(\"topic\")\n > if(props.containsKey(\"parse.key\"))\n parseKey = > props.getProperty(\"parse.key\").trim.toLowerCase.equals(\"true\")\n > if(props.containsKey(\"key.seperator\"))\n keySeparator = > props.getProperty(\"key.separator\")\n > if(props.containsKey(\"ignore.error\"))\n ignoreError = > props.getProperty(\"ignore.error\").trim.toLowerCase.equals(\"true\")\n > reader = new BufferedReader(new InputStreamReader(inputStream))\n }\n\n > override def readMessage() = {\n lineNumber += 1\n > (reader.readLine(), parseKey) match {\n case (null, _) => null\n > case (line, true) =>\n line.indexOf(keySeparator) match {\n > case -1 =>\n if(ignoreError)\n new > KeyedMessage(topic, line)\n else\n throw new > KafkaException(\"No key found on line \" + lineNumber + \": \" + line)\n > case n =>\n new KeyedMessage(topic,\n > line.substring(0, n), \n if(n + > keySeparator.size > line.size) \"\" else line.substring(n + > keySeparator.size))\n }\n case (line, false) =>\n > new KeyedMessage(topic, line)\n }\n }\n }\n}\n > =================================================================== > --- core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision > 290d5e0eac38e9917c64353a131154821b899f26) > +++ core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision ) > @@ -196,7 +196,7 @@ > topic = props.getProperty("topic") > if(props.containsKey("parse.key")) > parseKey = > props.getProperty("parse.key").trim.toLowerCase.equals("true") > - if(props.containsKey("key.seperator")) > + if(props.containsKey("key.separator")) > keySeparator = props.getProperty("key.separator") > if(props.containsKey("ignore.error")) > ignoreError = > props.getProperty("ignore.error").trim.toLowerCase.equals("true") -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira