ijuma commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r659764053
##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -86,25 +88,30 @@ object ConfigEntityName {
*/
class DynamicConfigManager(private val zkClient: KafkaZkClient,
private val configHandlers: Map[String,
ConfigHandler],
- private val changeExpirationMs: Long = 15*60*1000,
+ private val changeExpirationMs: Long = 15 * 60 *
1000,
private val time: Time = Time.SYSTEM) extends
Logging {
val adminZkClient = new AdminZkClient(zkClient)
object ConfigChangedNotificationHandler extends NotificationHandler {
override def processNotification(jsonBytes: Array[Byte]) = {
- // Ignore non-json notifications because they can be from the deprecated
TopicConfigManager
- Json.parseBytes(jsonBytes).foreach { js =>
- val jsObject = js.asJsonObjectOption.getOrElse {
- throw new IllegalArgumentException("Config change notification has
an unexpected value. The format is:" +
- """{"version" : 1, "entity_type":"topics/clients", "entity_name" :
"topic_name/client_id"} or """ +
- """{"version" : 2, "entity_path":"entity_type/entity_name"}. """ +
- s"Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
- }
- jsObject("version").to[Int] match {
- case 1 => processEntityConfigChangeVersion1(jsonBytes, jsObject)
- case 2 => processEntityConfigChangeVersion2(jsonBytes, jsObject)
- case version => throw new IllegalArgumentException("Config change
notification has unsupported version " +
- s"'$version', supported versions are 1 and 2.")
+ val jsonValue = Json.parseBytes(jsonBytes)
+ if (jsonValue.isEmpty) {
+ // Ignore non-json notifications because they can be from the
deprecated TopicConfigManager
+ warn(s"The non-json notifications are ignored.")
Review comment:
What is the user expected to do with this information? Also, when would
we get a "not json" parameter?
##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -154,7 +156,7 @@ class DynamicConfigManager(private val zkClient:
KafkaZkClient,
}
private val configChangeListener = new
ZkNodeChangeNotificationListener(zkClient,
ConfigEntityChangeNotificationZNode.path,
- ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix,
ConfigChangedNotificationHandler)
+ ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix,
ConfigChangedNotificationHandler, changeExpirationMs, time)
Review comment:
Default arguments are dangerous for things like this; I would remove
them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]