ijuma commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r659764053



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -86,25 +88,30 @@ object ConfigEntityName {
  */
 class DynamicConfigManager(private val zkClient: KafkaZkClient,
                            private val configHandlers: Map[String, 
ConfigHandler],
-                           private val changeExpirationMs: Long = 15*60*1000,
+                           private val changeExpirationMs: Long = 15 * 60 * 
1000,
                            private val time: Time = Time.SYSTEM) extends 
Logging {
   val adminZkClient = new AdminZkClient(zkClient)
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(jsonBytes: Array[Byte]) = {
-      // Ignore non-json notifications because they can be from the deprecated 
TopicConfigManager
-      Json.parseBytes(jsonBytes).foreach { js =>
-        val jsObject = js.asJsonObjectOption.getOrElse {
-          throw new IllegalArgumentException("Config change notification has 
an unexpected value. The format is:" +
-            """{"version" : 1, "entity_type":"topics/clients", "entity_name" : 
"topic_name/client_id"} or """ +
-            """{"version" : 2, "entity_path":"entity_type/entity_name"}. """ +
-            s"Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
-        }
-        jsObject("version").to[Int] match {
-          case 1 => processEntityConfigChangeVersion1(jsonBytes, jsObject)
-          case 2 => processEntityConfigChangeVersion2(jsonBytes, jsObject)
-          case version => throw new IllegalArgumentException("Config change 
notification has unsupported version " +
-            s"'$version', supported versions are 1 and 2.")
+      val jsonValue = Json.parseBytes(jsonBytes)
+      if (jsonValue.isEmpty) {
+        // Ignore non-json notifications because they can be from the 
deprecated TopicConfigManager
+        warn(s"The non-json notifications are ignored.")

Review comment:
       What is the user expected to do with this information? Also, when would 
we get a "not json" parameter?

##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -154,7 +156,7 @@ class DynamicConfigManager(private val zkClient: 
KafkaZkClient,
   }
 
   private val configChangeListener = new 
ZkNodeChangeNotificationListener(zkClient, 
ConfigEntityChangeNotificationZNode.path,
-    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, 
ConfigChangedNotificationHandler)
+    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, 
ConfigChangedNotificationHandler, changeExpirationMs, time)

Review comment:
       Default arguments are dangerous for things like this; I would remove 
them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to