dajac commented on a change in pull request #11695:
URL: https://github.com/apache/kafka/pull/11695#discussion_r795497752



##########
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##########
@@ -362,11 +375,21 @@ object ConsoleProducer {
     }
 
     private def splitHeaders(headers: String): Array[(String, Array[Byte])] = {
-      headerSeparatorPattern.split(headers).map { pair =>
-        (pair.indexOf(headerKeySeparator), ignoreError) match {
+      headersSeparatorPattern.split(headers).map { pair =>
+        (pair.indexOf(headersKeySeparator), ignoreError) match {
           case (-1, false) => throw new KafkaException(s"No header key 
separator found in pair '$pair' on line number $lineNumber")
           case (-1, true) => (pair, null)
-          case (i, _) => (pair.substring(0, i), pair.substring(i + 
headerKeySeparator.length).getBytes(StandardCharsets.UTF_8))
+          case (i, _) =>
+            val headerKey = pair.substring(0, i) match {
+              case k =>
+                if (k == nullMarker)
+                  throw new KafkaException(s"Header keys should not be equal 
to the null marker '$nullMarker' as they can't be null")
+                else k

Review comment:
       nit: We could use `case k if (k == nullMarker) =>` followed by `case k 
=>` for the default case. That seems a bit more intuitive when using pattern 
matching.

##########
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##########
@@ -302,22 +304,33 @@ object ConsoleProducer {
       if (props.containsKey("key.separator"))
         keySeparator = props.getProperty("key.separator")
       if (props.containsKey("parse.headers"))
-        parseHeader = 
props.getProperty("parse.headers").trim.equalsIgnoreCase("true")
+        parseHeaders = 
props.getProperty("parse.headers").trim.equalsIgnoreCase("true")
       if (props.containsKey("headers.delimiter"))
         headersDelimiter = props.getProperty("headers.delimiter")
       if (props.containsKey("headers.separator"))
         headersSeparator = props.getProperty("headers.separator")
-        headerSeparatorPattern = Pattern.compile(headersSeparator)
+        headersSeparatorPattern = Pattern.compile(headersSeparator)
       if (props.containsKey("headers.key.separator"))
-        headerKeySeparator = props.getProperty("headers.key.separator")
+        headersKeySeparator = props.getProperty("headers.key.separator")
       if (props.containsKey("ignore.error"))
         ignoreError = 
props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
       if (headersDelimiter == headersSeparator)
         throw new KafkaException("headers.delimiter and headers.separator may 
not be equal")
-      if (headersDelimiter == headerKeySeparator)
+      if (headersDelimiter == headersKeySeparator)
         throw new KafkaException("headers.delimiter and headers.key.separator 
may not be equal")
-      if (headersSeparator == headerKeySeparator)
+      if (headersSeparator == headersKeySeparator)
         throw new KafkaException("headers.separator and headers.key.separator 
may not be equal")
+      if (props.containsKey("null.marker")) {
+        nullMarker = props.getProperty("null.marker")
+      }

Review comment:
       nit: I would remove the curly braces here to stay in line with the style 
of the other `if`s.

##########
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##########
@@ -362,11 +375,21 @@ object ConsoleProducer {
     }
 
     private def splitHeaders(headers: String): Array[(String, Array[Byte])] = {
-      headerSeparatorPattern.split(headers).map { pair =>
-        (pair.indexOf(headerKeySeparator), ignoreError) match {
+      headersSeparatorPattern.split(headers).map { pair =>
+        (pair.indexOf(headersKeySeparator), ignoreError) match {
           case (-1, false) => throw new KafkaException(s"No header key 
separator found in pair '$pair' on line number $lineNumber")
           case (-1, true) => (pair, null)
-          case (i, _) => (pair.substring(0, i), pair.substring(i + 
headerKeySeparator.length).getBytes(StandardCharsets.UTF_8))
+          case (i, _) =>
+            val headerKey = pair.substring(0, i) match {
+              case k =>
+                if (k == nullMarker)
+                  throw new KafkaException(s"Header keys should not be equal 
to the null marker '$nullMarker' as they can't be null")
+                else k
+            }
+            val headerValue = pair.substring(i + headersKeySeparator.length) 
match {
+              case v => if (v == nullMarker) null else 
v.getBytes(StandardCharsets.UTF_8)

Review comment:
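       nit: ditto. Here as well, a guard such as `case v if v == nullMarker => null` 
followed by a default `case v => v.getBytes(StandardCharsets.UTF_8)` would read 
more naturally than an `if`/`else` inside a single `case v`.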




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to