[ 
https://issues.apache.org/jira/browse/KAFKA-807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dragos Manolescu updated KAFKA-807:
-----------------------------------

    Labels: patch producer  (was: )
    Status: Patch Available  (was: Open)

Index: core/build.sbt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/build.sbt      (date 1364252653000)
+++ core/build.sbt      (date 1364254450000)
@@ -18,8 +18,9 @@
 
 libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
   deps :+ (sv match {
-    case "2.8.0" => "org.scalatest" %  "scalatest" % "1.2" % "test"
+    case "2.8.0" => "org.scalatest" %  "scalatest" % "1.2"   % "test"
+    case "2.9.2" => "org.scalatest" %% "scalatest" % "1.9.1" % "test"
-    case _       => "org.scalatest" %% "scalatest" % "1.8" % "test"
+    case _       => "org.scalatest" %% "scalatest" % "1.8"   % "test"
   })
 }
 
Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/producer/ConsoleProducer.scala    (date 
1364252653000)
+++ core/src/main/scala/kafka/producer/ConsoleProducer.scala    (date 
1364254450000)
@@ -196,7 +196,7 @@
       topic = props.getProperty("topic")
       if(props.containsKey("parse.key"))
         parseKey = 
props.getProperty("parse.key").trim.toLowerCase.equals("true")
-      if(props.containsKey("key.seperator"))
+      if(props.containsKey("key.separator"))
         keySeparator = props.getProperty("key.separator")
       if(props.containsKey("ignore.error"))
         ignoreError = 
props.getProperty("ignore.error").trim.toLowerCase.equals("true")

                
> LineMessageReader doesn't correctly parse the key separator
> -----------------------------------------------------------
>
>                 Key: KAFKA-807
>                 URL: https://issues.apache.org/jira/browse/KAFKA-807
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 0.8
>            Reporter: Dragos Manolescu
>            Priority: Trivial
>              Labels: producer, patch
>             Fix For: 0.8
>
>
> Typo in key name prevents extracting the key separator. The patch follows; 
> what's the recommended way to submit patches?
> Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
> IDEA additional info:
> Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
> <+>UTF-8
> Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
> <+>/**\n * Licensed to the Apache Software Foundation (ASF) under one or 
> more\n * contributor license agreements.  See the NOTICE file distributed 
> with\n * this work for additional information regarding copyright 
> ownership.\n * The ASF licenses this file to You under the Apache License, 
> Version 2.0\n * (the \"License\"); you may not use this file except in 
> compliance with\n * the License.  You may obtain a copy of the License at\n * 
> \n *    http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by 
> applicable law or agreed to in writing, software\n * distributed under the 
> License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR 
> CONDITIONS OF ANY KIND, either express or implied.\n * See the License for 
> the specific language governing permissions and\n * limitations under the 
> License.\n */\n\npackage kafka.producer\n\nimport 
> scala.collection.JavaConversions._\nimport joptsimple._\nimport 
> java.util.Properties\nimport java.io._\nimport kafka.common._\nimport 
> kafka.message._\nimport kafka.serializer._\n\nobject ConsoleProducer { \n\n  
> def main(args: Array[String]) { \n    val parser = new OptionParser\n    val 
> topicOpt = parser.accepts(\"topic\", \"REQUIRED: The topic id to produce 
> messages to.\")\n                           .withRequiredArg\n                
>            .describedAs(\"topic\")\n                           
> .ofType(classOf[String])\n    val brokerListOpt = 
> parser.accepts(\"broker-list\", \"REQUIRED: The broker list string in the 
> form HOST1:PORT1,HOST2:PORT2.\")\n                           
> .withRequiredArg\n                           .describedAs(\"broker-list\")\n  
>                          .ofType(classOf[String])\n    val syncOpt = 
> parser.accepts(\"sync\", \"If set message send requests to the brokers are 
> synchronously, one at a time as they arrive.\")\n    val compressOpt = 
> parser.accepts(\"compress\", \"If set, messages batches are sent 
> compressed\")\n    val batchSizeOpt = parser.accepts(\"batch-size\", \"Number 
> of messages to send in a single batch if they are not being sent 
> synchronously.\")\n                             .withRequiredArg\n            
>                  .describedAs(\"size\")\n                             
> .ofType(classOf[java.lang.Integer])\n                             
> .defaultsTo(200)\n    val sendTimeoutOpt = parser.accepts(\"timeout\", \"If 
> set and the producer is running in asynchronous mode, this gives the maximum 
> amount of time\" + \n                                                   \" a 
> message will queue awaiting suffient batch size. The value is given in 
> ms.\")\n                               .withRequiredArg\n                     
>           .describedAs(\"timeout_ms\")\n                               
> .ofType(classOf[java.lang.Long])\n                               
> .defaultsTo(1000)\n    val queueSizeOpt = parser.accepts(\"queue-size\", \"If 
> set and the producer is running in asynchronous mode, this gives the maximum 
> amount of \" + \n                                                   \" 
> messages will queue awaiting suffient batch size.\")\n                        
>        .withRequiredArg\n                               
> .describedAs(\"queue_size\")\n                               
> .ofType(classOf[java.lang.Long])\n                               
> .defaultsTo(10000)\n    val queueEnqueueTimeoutMsOpt = 
> parser.accepts(\"queue-enqueuetimeout-ms\", \"Timeout for event enqueue\")\n  
>                              .withRequiredArg\n                               
> .describedAs(\"queue enqueuetimeout ms\")\n                               
> .ofType(classOf[java.lang.Long])\n                               
> .defaultsTo(0)\n    val requestRequiredAcksOpt = 
> parser.accepts(\"request-required-acks\", \"The required acks of the producer 
> requests\")\n                               .withRequiredArg\n                
>                .describedAs(\"request required acks\")\n                      
>          .ofType(classOf[java.lang.Integer])\n                               
> .defaultsTo(0)\n    val requestTimeoutMsOpt = 
> parser.accepts(\"request-timeout-ms\", \"The ack timeout of the producer 
> requests. Value must be non-negative and non-zero\")\n                        
>        .withRequiredArg\n                               
> .describedAs(\"request timeout ms\")\n                               
> .ofType(classOf[java.lang.Integer])\n                               
> .defaultsTo(1500)\n    val valueEncoderOpt = 
> parser.accepts(\"value-serializer\", \"The class name of the message encoder 
> implementation to use for serializing values.\")\n                            
>      .withRequiredArg\n                                 
> .describedAs(\"encoder_class\")\n                                 
> .ofType(classOf[java.lang.String])\n                                 
> .defaultsTo(classOf[StringEncoder].getName)\n    val keyEncoderOpt = 
> parser.accepts(\"key-serializer\", \"The class name of the message encoder 
> implementation to use for serializing keys.\")\n                              
>    .withRequiredArg\n                                 
> .describedAs(\"encoder_class\")\n                                 
> .ofType(classOf[java.lang.String])\n                                 
> .defaultsTo(classOf[StringEncoder].getName)\n    val messageReaderOpt = 
> parser.accepts(\"line-reader\", \"The class name of the class to use for 
> reading lines from standard in. \" + \n                                       
>                    \"By default each line is read as a separate message.\")\n 
>                                  .withRequiredArg\n                           
>        .describedAs(\"reader_class\")\n                                  
> .ofType(classOf[java.lang.String])\n                                  
> .defaultsTo(classOf[LineMessageReader].getName)\n    val socketBufferSizeOpt 
> = parser.accepts(\"socket-buffer-size\", \"The size of the tcp RECV 
> size.\")\n                                  .withRequiredArg\n                
>                   .describedAs(\"size\")\n                                  
> .ofType(classOf[java.lang.Integer])\n                                  
> .defaultsTo(1024*100)\n    val propertyOpt = parser.accepts(\"property\", \"A 
> mechanism to pass user-defined properties in the form key=value to the 
> message reader. \" +\n                                                 \"This 
> allows custom configuration for a user-defined message reader.\")\n           
>                  .withRequiredArg\n                            
> .describedAs(\"prop\")\n                            
> .ofType(classOf[String])\n\n\n    val options = parser.parse(args : _*)\n    
> for(arg <- List(topicOpt, brokerListOpt)) {\n      if(!options.has(arg)) {\n  
>       System.err.println(\"Missing required argument \\\"\" + arg + 
> \"\\\"\")\n        parser.printHelpOn(System.err)\n        System.exit(1)\n   
>    }\n    }\n\n    val topic = options.valueOf(topicOpt)\n    val brokerList 
> = options.valueOf(brokerListOpt)\n    val sync = options.has(syncOpt)\n    
> val compress = options.has(compressOpt)\n    val batchSize = 
> options.valueOf(batchSizeOpt)\n    val sendTimeout = 
> options.valueOf(sendTimeoutOpt)\n    val queueSize = 
> options.valueOf(queueSizeOpt)\n    val queueEnqueueTimeoutMs = 
> options.valueOf(queueEnqueueTimeoutMsOpt)\n    val requestRequiredAcks = 
> options.valueOf(requestRequiredAcksOpt)\n    val requestTimeoutMs = 
> options.valueOf(requestTimeoutMsOpt)\n    val keyEncoderClass = 
> options.valueOf(keyEncoderOpt)\n    val valueEncoderClass = 
> options.valueOf(valueEncoderOpt)\n    val readerClass = 
> options.valueOf(messageReaderOpt)\n    val socketBuffer = 
> options.valueOf(socketBufferSizeOpt)\n    val cmdLineProps = 
> parseLineReaderArgs(options.valuesOf(propertyOpt))\n    
> cmdLineProps.put(\"topic\", topic)\n\n    val props = new Properties()\n    
> props.put(\"broker.list\", brokerList)\n    val codec = if(compress) 
> DefaultCompressionCodec.codec else NoCompressionCodec.codec\n    
> props.put(\"compression.codec\", codec.toString)\n    
> props.put(\"producer.type\", if(sync) \"sync\" else \"async\")\n    
> if(options.has(batchSizeOpt))\n      props.put(\"batch.num.messages\", 
> batchSize.toString)\n    props.put(\"queue.buffering.max.ms\", 
> sendTimeout.toString)\n    props.put(\"queue.buffering.max.messages\", 
> queueSize.toString)\n    props.put(\"queue.enqueue.timeout.ms\", 
> queueEnqueueTimeoutMs.toString)\n    props.put(\"request.required.acks\", 
> requestRequiredAcks.toString)\n    props.put(\"request.timeout.ms\", 
> requestTimeoutMs.toString)\n    props.put(\"key.serializer.class\", 
> keyEncoderClass)\n    props.put(\"serializer.class\", valueEncoderClass)\n    
> props.put(\"send.buffer.bytes\", socketBuffer.toString)\n    val reader = 
> Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, 
> AnyRef]]\n    reader.init(System.in, cmdLineProps)\n\n    try {\n        val 
> producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))\n\n        
> Runtime.getRuntime.addShutdownHook(new Thread() {\n          override def 
> run() {\n            producer.close()\n          }\n        })\n\n        var 
> message: KeyedMessage[AnyRef, AnyRef] = null\n        do {\n          message 
> = reader.readMessage()\n          if(message != null)\n            
> producer.send(message)\n        } while(message != null)\n    } catch {\n     
>  case e: Exception =>\n        e.printStackTrace\n        System.exit(1)\n    
> }\n    System.exit(0)\n  }\n\n  def parseLineReaderArgs(args: 
> Iterable[String]): Properties = {\n    val splits = args.map(_ split 
> \"=\").filterNot(_ == null).filterNot(_.length == 0)\n    
> if(!splits.forall(_.length == 2)) {\n      System.err.println(\"Invalid line 
> reader properties: \" + args.mkString(\" \"))\n      System.exit(1)\n    }\n  
>   val props = new Properties\n    for(a <- splits)\n      props.put(a(0), 
> a(1))\n    props\n  }\n\n  trait MessageReader[K,V] { \n    def 
> init(inputStream: InputStream, props: Properties) {}\n    def readMessage(): 
> KeyedMessage[K,V]\n    def close() {}\n  }\n\n  class LineMessageReader 
> extends MessageReader[String, String] {\n    var topic: String = null\n    
> var reader: BufferedReader = null\n    var parseKey = false\n    var 
> keySeparator = \"\\t\"\n    var ignoreError = false\n    var lineNumber = 
> 0\n\n    override def init(inputStream: InputStream, props: Properties) {\n   
>    topic = props.getProperty(\"topic\")\n      
> if(props.containsKey(\"parse.key\"))\n        parseKey = 
> props.getProperty(\"parse.key\").trim.toLowerCase.equals(\"true\")\n      
> if(props.containsKey(\"key.seperator\"))\n        keySeparator = 
> props.getProperty(\"key.separator\")\n      
> if(props.containsKey(\"ignore.error\"))\n        ignoreError = 
> props.getProperty(\"ignore.error\").trim.toLowerCase.equals(\"true\")\n      
> reader = new BufferedReader(new InputStreamReader(inputStream))\n    }\n\n    
> override def readMessage() = {\n      lineNumber += 1\n      
> (reader.readLine(), parseKey) match {\n        case (null, _) => null\n       
>  case (line, true) =>\n          line.indexOf(keySeparator) match {\n         
>    case -1 =>\n              if(ignoreError)\n                new 
> KeyedMessage(topic, line)\n              else\n                throw new 
> KafkaException(\"No key found on line \" + lineNumber + \": \" + line)\n      
>       case n =>\n              new KeyedMessage(topic,\n                      
>        line.substring(0, n), \n                             if(n + 
> keySeparator.size > line.size) \"\" else line.substring(n + 
> keySeparator.size))\n          }\n        case (line, false) =>\n          
> new KeyedMessage(topic, line)\n      }\n    }\n  }\n}\n
> ===================================================================
> --- core/src/main/scala/kafka/producer/ConsoleProducer.scala  (revision 
> 290d5e0eac38e9917c64353a131154821b899f26)
> +++ core/src/main/scala/kafka/producer/ConsoleProducer.scala  (revision )
> @@ -196,7 +196,7 @@
>        topic = props.getProperty("topic")
>        if(props.containsKey("parse.key"))
>          parseKey = 
> props.getProperty("parse.key").trim.toLowerCase.equals("true")
> -      if(props.containsKey("key.seperator"))
> +      if(props.containsKey("key.separator"))
>          keySeparator = props.getProperty("key.separator")
>        if(props.containsKey("ignore.error"))
>          ignoreError = 
> props.getProperty("ignore.error").trim.toLowerCase.equals("true")

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to