[ https://issues.apache.org/jira/browse/KAFKA-4811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887119#comment-15887119 ]
Jun Rao commented on KAFKA-4811:
--------------------------------

In AbstractFetcherManager, we only create a new ReplicaFetcherThread when its key is not already in the map. However, the map is keyed on BrokerAndFetcherId, which includes the host. So when a broker's host changes, we may need to create a new ReplicaFetcherThread for the new host even though the ReplicaFetcherThread for the old host is still present. If that happens, creating the new ReplicaFetcherThread hits the IllegalArgumentException in the description: the metric tags contain only broker-id and fetcher-id, not the host, so the new thread tries to register metrics whose names already belong to the old thread.

One way to fix this is to key fetcherThreadMap in AbstractFetcherManager on just brokerIdAndFetcherId. When adding a partition, if a fetcherThread for that key already exists, we check whether its host name matches the host from the input. If so, we reuse the fetcherThread. Otherwise, we shut down the old fetcherThread and create a new one for the new host name.

> ReplicaFetchThread may fail to create due to existing metric
> ------------------------------------------------------------
>
>                 Key: KAFKA-4811
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4811
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.0
>            Reporter: Jun Rao
>
> Someone reported the following error.
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-close-rate, group=replica-fetcher-metrics, description=Connections closed per second in the window., tags={broker-id=1, fetcher-id=0}]' already exists, can't register another one.
>     at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:433)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
>     at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:680)
>     at org.apache.kafka.common.network.Selector.<init>(Selector.java:140)
>     at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:86)
>     at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:35)
>     at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:83)
>     at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78)
>     at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:882)
>     at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:700)
>     at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:84)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:62)
>     at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
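The fix proposed in Jun Rao's comment can be sketched roughly as follows. This is a simplified, hypothetical Java model, not Kafka's actual AbstractFetcherManager (which is Scala): the names FetcherManagerSketch, FetcherThread, BrokerIdAndFetcherId and addFetcher are illustrative, a plain Set<String> stands in for the metrics registry, and only the one metric from the report is modeled. It shows why a second thread with the same broker-id and fetcher-id collides on the metric name, and how keying the map on broker id and fetcher id, then reusing or replacing the thread based on the host, avoids that collision.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified model of the proposed AbstractFetcherManager change.
class FetcherManagerSketch {

    // Stand-in for the metrics registry: rejects a second metric whose
    // name and tags match an existing one, like the error in the report.
    final Set<String> metrics = new HashSet<>();

    static String metricName(int brokerId, int fetcherId) {
        // Tags carry only broker-id and fetcher-id; the host is not included.
        return "connection-close-rate{broker-id=" + brokerId + ", fetcher-id=" + fetcherId + "}";
    }

    // Proposed map key: broker id and fetcher id only, without the host.
    static final class BrokerIdAndFetcherId {
        final int brokerId;
        final int fetcherId;

        BrokerIdAndFetcherId(int brokerId, int fetcherId) {
            this.brokerId = brokerId;
            this.fetcherId = fetcherId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof BrokerIdAndFetcherId)) return false;
            BrokerIdAndFetcherId other = (BrokerIdAndFetcherId) o;
            return brokerId == other.brokerId && fetcherId == other.fetcherId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(brokerId, fetcherId);
        }
    }

    // Hypothetical fetcher thread: registering its metric on construction
    // mirrors the Selector metrics registered in ReplicaFetcherThread's constructor.
    final class FetcherThread {
        final int brokerId;
        final int fetcherId;
        final String host;
        boolean running = true;

        FetcherThread(int brokerId, int fetcherId, String host) {
            String name = metricName(brokerId, fetcherId);
            if (!metrics.add(name)) {
                throw new IllegalArgumentException("A metric named '" + name + "' already exists");
            }
            this.brokerId = brokerId;
            this.fetcherId = fetcherId;
            this.host = host;
        }

        void shutdown() {
            running = false;
            metrics.remove(metricName(brokerId, fetcherId)); // frees the metric name
        }
    }

    final Map<BrokerIdAndFetcherId, FetcherThread> fetcherThreadMap = new HashMap<>();

    // Reuse the existing thread if its host is unchanged; otherwise shut it
    // down first, then create a replacement for the new host.
    FetcherThread addFetcher(int brokerId, int fetcherId, String host) {
        BrokerIdAndFetcherId key = new BrokerIdAndFetcherId(brokerId, fetcherId);
        FetcherThread existing = fetcherThreadMap.get(key);
        if (existing != null) {
            if (existing.host.equals(host)) {
                return existing;
            }
            existing.shutdown();
            fetcherThreadMap.remove(key);
        }
        FetcherThread created = new FetcherThread(brokerId, fetcherId, host);
        fetcherThreadMap.put(key, created);
        return created;
    }

    public static void main(String[] args) {
        FetcherManagerSketch manager = new FetcherManagerSketch();
        FetcherThread a = manager.addFetcher(1, 0, "old-host");
        FetcherThread b = manager.addFetcher(1, 0, "new-host"); // broker host changed
        System.out.println(a.running + " " + b.running + " " + b.host); // prints "false true new-host"
    }
}
```

In this sketch, shutting down the old thread before constructing the new one is what frees the metric name. Constructing the replacement while the old thread is still registered (for example, `manager.new FetcherThread(1, 0, "new-host")` right after the first `addFetcher`), which is effectively what a host-keyed map allows, throws the same IllegalArgumentException as in the report.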