badaiaqrandista commented on a change in pull request #9099: URL: https://github.com/apache/kafka/pull/9099#discussion_r465105476
########## File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala ########## @@ -459,48 +466,32 @@ class DefaultMessageFormatter extends MessageFormatter { var printKey = false var printValue = true var printPartition = false - var keySeparator = "\t".getBytes(StandardCharsets.UTF_8) - var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8) + var printOffset = false + var printHeaders = false + var keySeparator = utfBytes("\t") + var lineSeparator = utfBytes("\n") + var headersSeparator = utfBytes(",") + var nullLiteral = utfBytes("null") var keyDeserializer: Option[Deserializer[_]] = None var valueDeserializer: Option[Deserializer[_]] = None - - override def configure(configs: Map[String, _]): Unit = { - val props = new java.util.Properties() - configs.asScala.foreach { case (key, value) => props.put(key, value.toString) } - if (props.containsKey("print.timestamp")) - printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true") - if (props.containsKey("print.key")) - printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true") - if (props.containsKey("print.value")) - printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true") - if (props.containsKey("print.partition")) - printPartition = props.getProperty("print.partition").trim.equalsIgnoreCase("true") - if (props.containsKey("key.separator")) - keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8) - if (props.containsKey("line.separator")) - lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8) - // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` - if (props.containsKey("key.deserializer")) { - keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor() - .newInstance().asInstanceOf[Deserializer[_]]) - keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, 
true) - } - // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` - if (props.containsKey("value.deserializer")) { - valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor() - .newInstance().asInstanceOf[Deserializer[_]]) - valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false) - } - } - - private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = { - val newProps = new Properties() - props.asScala.foreach { case (key, value) => - if (key.startsWith(prefix) && key.length > prefix.length) - newProps.put(key.substring(prefix.length), value) - } - newProps + var headersDeserializer: Option[Deserializer[_]] = None + + override def init(props: Properties): Unit = { Review comment: @dajac I have replaced `init` with `configure` and changed the code to extract the values directly from the `Map`. Please review again. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org