Dragos Manolescu created KAFKA-807:
--------------------------------------

             Summary: LineMessageReader doesn't correctly parse the key 
separator
                 Key: KAFKA-807
                 URL: https://issues.apache.org/jira/browse/KAFKA-807
             Project: Kafka
          Issue Type: Bug
          Components: tools
    Affects Versions: 0.8
            Reporter: Dragos Manolescu
            Priority: Trivial
             Fix For: 0.8


Typo in key name prevents extracting the key separator. The patch follows; 
what's the recommended way to submit patches?

Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
<+>/**\n * Licensed to the Apache Software Foundation (ASF) under one or more\n 
* contributor license agreements.  See the NOTICE file distributed with\n * 
this work for additional information regarding copyright ownership.\n * The ASF 
licenses this file to You under the Apache License, Version 2.0\n * (the 
\"License\"); you may not use this file except in compliance with\n * the 
License.  You may obtain a copy of the License at\n * \n *    
http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by 
applicable law or agreed to in writing, software\n * distributed under the 
License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the 
specific language governing permissions and\n * limitations under the 
License.\n */\n\npackage kafka.producer\n\nimport 
scala.collection.JavaConversions._\nimport joptsimple._\nimport 
java.util.Properties\nimport java.io._\nimport kafka.common._\nimport 
kafka.message._\nimport kafka.serializer._\n\nobject ConsoleProducer { \n\n  
def main(args: Array[String]) { \n    val parser = new OptionParser\n    val 
topicOpt = parser.accepts(\"topic\", \"REQUIRED: The topic id to produce 
messages to.\")\n                           .withRequiredArg\n                  
         .describedAs(\"topic\")\n                           
.ofType(classOf[String])\n    val brokerListOpt = 
parser.accepts(\"broker-list\", \"REQUIRED: The broker list string in the form 
HOST1:PORT1,HOST2:PORT2.\")\n                           .withRequiredArg\n      
                     .describedAs(\"broker-list\")\n                           
.ofType(classOf[String])\n    val syncOpt = parser.accepts(\"sync\", \"If set 
message send requests to the brokers are synchronously, one at a time as they 
arrive.\")\n    val compressOpt = parser.accepts(\"compress\", \"If set, 
messages batches are sent compressed\")\n    val batchSizeOpt = 
parser.accepts(\"batch-size\", \"Number of messages to send in a single batch 
if they are not being sent synchronously.\")\n                             
.withRequiredArg\n                             .describedAs(\"size\")\n         
                    .ofType(classOf[java.lang.Integer])\n                       
      .defaultsTo(200)\n    val sendTimeoutOpt = parser.accepts(\"timeout\", 
\"If set and the producer is running in asynchronous mode, this gives the 
maximum amount of time\" + \n                                                   
\" a message will queue awaiting suffient batch size. The value is given in 
ms.\")\n                               .withRequiredArg\n                       
        .describedAs(\"timeout_ms\")\n                               
.ofType(classOf[java.lang.Long])\n                               
.defaultsTo(1000)\n    val queueSizeOpt = parser.accepts(\"queue-size\", \"If 
set and the producer is running in asynchronous mode, this gives the maximum 
amount of \" + \n                                                   \" messages 
will queue awaiting suffient batch size.\")\n                               
.withRequiredArg\n                               .describedAs(\"queue_size\")\n 
                              .ofType(classOf[java.lang.Long])\n                
               .defaultsTo(10000)\n    val queueEnqueueTimeoutMsOpt = 
parser.accepts(\"queue-enqueuetimeout-ms\", \"Timeout for event enqueue\")\n    
                           .withRequiredArg\n                               
.describedAs(\"queue enqueuetimeout ms\")\n                               
.ofType(classOf[java.lang.Long])\n                               
.defaultsTo(0)\n    val requestRequiredAcksOpt = 
parser.accepts(\"request-required-acks\", \"The required acks of the producer 
requests\")\n                               .withRequiredArg\n                  
             .describedAs(\"request required acks\")\n                          
     .ofType(classOf[java.lang.Integer])\n                               
.defaultsTo(0)\n    val requestTimeoutMsOpt = 
parser.accepts(\"request-timeout-ms\", \"The ack timeout of the producer 
requests. Value must be non-negative and non-zero\")\n                          
     .withRequiredArg\n                               .describedAs(\"request 
timeout ms\")\n                               
.ofType(classOf[java.lang.Integer])\n                               
.defaultsTo(1500)\n    val valueEncoderOpt = 
parser.accepts(\"value-serializer\", \"The class name of the message encoder 
implementation to use for serializing values.\")\n                              
   .withRequiredArg\n                                 
.describedAs(\"encoder_class\")\n                                 
.ofType(classOf[java.lang.String])\n                                 
.defaultsTo(classOf[StringEncoder].getName)\n    val keyEncoderOpt = 
parser.accepts(\"key-serializer\", \"The class name of the message encoder 
implementation to use for serializing keys.\")\n                                
 .withRequiredArg\n                                 
.describedAs(\"encoder_class\")\n                                 
.ofType(classOf[java.lang.String])\n                                 
.defaultsTo(classOf[StringEncoder].getName)\n    val messageReaderOpt = 
parser.accepts(\"line-reader\", \"The class name of the class to use for 
reading lines from standard in. \" + \n                                         
                 \"By default each line is read as a separate message.\")\n     
                             .withRequiredArg\n                                 
 .describedAs(\"reader_class\")\n                                  
.ofType(classOf[java.lang.String])\n                                  
.defaultsTo(classOf[LineMessageReader].getName)\n    val socketBufferSizeOpt = 
parser.accepts(\"socket-buffer-size\", \"The size of the tcp RECV size.\")\n    
                              .withRequiredArg\n                                
  .describedAs(\"size\")\n                                  
.ofType(classOf[java.lang.Integer])\n                                  
.defaultsTo(1024*100)\n    val propertyOpt = parser.accepts(\"property\", \"A 
mechanism to pass user-defined properties in the form key=value to the message 
reader. \" +\n                                                 \"This allows 
custom configuration for a user-defined message reader.\")\n                    
        .withRequiredArg\n                            .describedAs(\"prop\")\n  
                          .ofType(classOf[String])\n\n\n    val options = 
parser.parse(args : _*)\n    for(arg <- List(topicOpt, brokerListOpt)) {\n      
if(!options.has(arg)) {\n        System.err.println(\"Missing required argument 
\\\"\" + arg + \"\\\"\")\n        parser.printHelpOn(System.err)\n        
System.exit(1)\n      }\n    }\n\n    val topic = options.valueOf(topicOpt)\n   
 val brokerList = options.valueOf(brokerListOpt)\n    val sync = 
options.has(syncOpt)\n    val compress = options.has(compressOpt)\n    val 
batchSize = options.valueOf(batchSizeOpt)\n    val sendTimeout = 
options.valueOf(sendTimeoutOpt)\n    val queueSize = 
options.valueOf(queueSizeOpt)\n    val queueEnqueueTimeoutMs = 
options.valueOf(queueEnqueueTimeoutMsOpt)\n    val requestRequiredAcks = 
options.valueOf(requestRequiredAcksOpt)\n    val requestTimeoutMs = 
options.valueOf(requestTimeoutMsOpt)\n    val keyEncoderClass = 
options.valueOf(keyEncoderOpt)\n    val valueEncoderClass = 
options.valueOf(valueEncoderOpt)\n    val readerClass = 
options.valueOf(messageReaderOpt)\n    val socketBuffer = 
options.valueOf(socketBufferSizeOpt)\n    val cmdLineProps = 
parseLineReaderArgs(options.valuesOf(propertyOpt))\n    
cmdLineProps.put(\"topic\", topic)\n\n    val props = new Properties()\n    
props.put(\"broker.list\", brokerList)\n    val codec = if(compress) 
DefaultCompressionCodec.codec else NoCompressionCodec.codec\n    
props.put(\"compression.codec\", codec.toString)\n    
props.put(\"producer.type\", if(sync) \"sync\" else \"async\")\n    
if(options.has(batchSizeOpt))\n      props.put(\"batch.num.messages\", 
batchSize.toString)\n    props.put(\"queue.buffering.max.ms\", 
sendTimeout.toString)\n    props.put(\"queue.buffering.max.messages\", 
queueSize.toString)\n    props.put(\"queue.enqueue.timeout.ms\", 
queueEnqueueTimeoutMs.toString)\n    props.put(\"request.required.acks\", 
requestRequiredAcks.toString)\n    props.put(\"request.timeout.ms\", 
requestTimeoutMs.toString)\n    props.put(\"key.serializer.class\", 
keyEncoderClass)\n    props.put(\"serializer.class\", valueEncoderClass)\n    
props.put(\"send.buffer.bytes\", socketBuffer.toString)\n    val reader = 
Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, 
AnyRef]]\n    reader.init(System.in, cmdLineProps)\n\n    try {\n        val 
producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))\n\n        
Runtime.getRuntime.addShutdownHook(new Thread() {\n          override def run() 
{\n            producer.close()\n          }\n        })\n\n        var 
message: KeyedMessage[AnyRef, AnyRef] = null\n        do {\n          message = 
reader.readMessage()\n          if(message != null)\n            
producer.send(message)\n        } while(message != null)\n    } catch {\n      
case e: Exception =>\n        e.printStackTrace\n        System.exit(1)\n    
}\n    System.exit(0)\n  }\n\n  def parseLineReaderArgs(args: 
Iterable[String]): Properties = {\n    val splits = args.map(_ split 
\"=\").filterNot(_ == null).filterNot(_.length == 0)\n    
if(!splits.forall(_.length == 2)) {\n      System.err.println(\"Invalid line 
reader properties: \" + args.mkString(\" \"))\n      System.exit(1)\n    }\n    
val props = new Properties\n    for(a <- splits)\n      props.put(a(0), a(1))\n 
   props\n  }\n\n  trait MessageReader[K,V] { \n    def init(inputStream: 
InputStream, props: Properties) {}\n    def readMessage(): KeyedMessage[K,V]\n  
  def close() {}\n  }\n\n  class LineMessageReader extends 
MessageReader[String, String] {\n    var topic: String = null\n    var reader: 
BufferedReader = null\n    var parseKey = false\n    var keySeparator = 
\"\\t\"\n    var ignoreError = false\n    var lineNumber = 0\n\n    override 
def init(inputStream: InputStream, props: Properties) {\n      topic = 
props.getProperty(\"topic\")\n      if(props.containsKey(\"parse.key\"))\n      
  parseKey = 
props.getProperty(\"parse.key\").trim.toLowerCase.equals(\"true\")\n      
if(props.containsKey(\"key.seperator\"))\n        keySeparator = 
props.getProperty(\"key.separator\")\n      
if(props.containsKey(\"ignore.error\"))\n        ignoreError = 
props.getProperty(\"ignore.error\").trim.toLowerCase.equals(\"true\")\n      
reader = new BufferedReader(new InputStreamReader(inputStream))\n    }\n\n    
override def readMessage() = {\n      lineNumber += 1\n      
(reader.readLine(), parseKey) match {\n        case (null, _) => null\n        
case (line, true) =>\n          line.indexOf(keySeparator) match {\n            
case -1 =>\n              if(ignoreError)\n                new 
KeyedMessage(topic, line)\n              else\n                throw new 
KafkaException(\"No key found on line \" + lineNumber + \": \" + line)\n        
    case n =>\n              new KeyedMessage(topic,\n                          
   line.substring(0, n), \n                             if(n + 
keySeparator.size > line.size) \"\" else line.substring(n + 
keySeparator.size))\n          }\n        case (line, false) =>\n          new 
KeyedMessage(topic, line)\n      }\n    }\n  }\n}\n
===================================================================
--- core/src/main/scala/kafka/producer/ConsoleProducer.scala    (revision 
290d5e0eac38e9917c64353a131154821b899f26)
+++ core/src/main/scala/kafka/producer/ConsoleProducer.scala    (revision )
@@ -196,7 +196,7 @@
       topic = props.getProperty("topic")
       if(props.containsKey("parse.key"))
         parseKey = 
props.getProperty("parse.key").trim.toLowerCase.equals("true")
-      if(props.containsKey("key.seperator"))
+      if(props.containsKey("key.separator"))
         keySeparator = props.getProperty("key.separator")
       if(props.containsKey("ignore.error"))
         ignoreError = 
props.getProperty("ignore.error").trim.toLowerCase.equals("true")


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to