[ https://issues.apache.org/jira/browse/KAFKA-4811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887119#comment-15887119 ]

Jun Rao commented on KAFKA-4811:
--------------------------------

In AbstractFetcherManager, we only create a new ReplicaFetcherThread when its
key is not already in the map. However, the map is keyed on
BrokerAndFetcherId, which includes the broker's host. So when a broker's host
changes, we may need to create a new ReplicaFetcherThread for the new host
while the ReplicaFetcherThread for the old host is still present. If that
happens, creating the new ReplicaFetcherThread hits the
IllegalArgumentException in the description, because the metric tags contain
only broker-id and fetcher-id (not the host), so both threads try to register
metrics with identical names and tags.
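The collision can be reproduced with a minimal Scala sketch. It is not Kafka's
code: ToyMetrics, ToyFetcherThread and ToyFetcherManager are hypothetical
stand-ins, and BrokerEndPoint/BrokerAndFetcherId are simplified models of the
real classes. The only Kafka behaviour it mimics is the one visible in the
stack trace: registering a metric whose name and tags already exist throws
IllegalArgumentException.

```scala
import scala.collection.mutable

// Simplified models of Kafka's classes (illustrative fields only).
final case class BrokerEndPoint(id: Int, host: String, port: Int)
final case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int)

// Hypothetical registry: like Kafka's Metrics, it rejects a duplicate name + tags.
final class ToyMetrics {
  private val registered = mutable.Set.empty[(String, Map[String, String])]

  def register(name: String, tags: Map[String, String]): Unit = {
    if (registered.contains((name, tags)))
      throw new IllegalArgumentException(s"A metric named '$name' with tags $tags already exists")
    registered += ((name, tags))
  }
}

// Hypothetical fetcher thread: its metric tags carry broker-id and fetcher-id, never the host.
final class ToyFetcherThread(val key: BrokerAndFetcherId, metrics: ToyMetrics) {
  metrics.register(
    "connection-close-rate",
    Map("broker-id" -> key.broker.id.toString, "fetcher-id" -> key.fetcherId.toString))
}

// Hypothetical manager mirroring the pre-fix behaviour: the map key includes the host.
final class ToyFetcherManager(metrics: ToyMetrics) {
  val fetcherThreadMap = mutable.Map.empty[BrokerAndFetcherId, ToyFetcherThread]

  // Creates a thread only when the (broker including host, fetcherId) key is absent.
  def addFetcher(broker: BrokerEndPoint, fetcherId: Int): ToyFetcherThread = {
    val key = BrokerAndFetcherId(broker, fetcherId)
    fetcherThreadMap.getOrElseUpdate(key, new ToyFetcherThread(key, metrics))
  }
}
```

Adding broker 1 at host-a and then broker 1 at host-b yields two different
map keys, so the manager builds a second thread, and its tags
{broker-id=1, fetcher-id=0} collide with the first thread's.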

One way to fix this is to key fetcherThreadMap in AbstractFetcherManager on
just brokerIdAndFetcherId. When adding a partition, if a fetcher thread
already exists for that key, we check whether its host matches the host from
the input. If so, we reuse that fetcher thread. Otherwise, we shut down the
old fetcher thread and create a new one for the new host.
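The proposed fix can be sketched the same way, again with hypothetical toy
classes rather than Kafka's real ones. The key class BrokerIdAndFetcherId is
modelled on the name in the comment and omits the host. The toy thread's
shutdown() removes its metric; that is an assumption standing in for whatever
cleanup the real ReplicaFetcherThread performs on shutdown.

```scala
import scala.collection.mutable

// Simplified model of Kafka's BrokerEndPoint (illustrative only).
final case class BrokerEndPoint(id: Int, host: String, port: Int)
// Hypothetical map key without the host, as proposed in the comment.
final case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)

// Hypothetical registry that rejects duplicate name + tags and supports removal.
final class ToyMetrics {
  private val registered = mutable.Set.empty[(String, Map[String, String])]

  def register(name: String, tags: Map[String, String]): Unit = {
    if (registered.contains((name, tags)))
      throw new IllegalArgumentException(s"A metric named '$name' with tags $tags already exists")
    registered += ((name, tags))
  }

  def remove(name: String, tags: Map[String, String]): Unit = {
    registered -= ((name, tags))
  }

  def size: Int = registered.size
}

// Hypothetical fetcher thread; shutdown() releases its metric (assumed cleanup).
final class ToyFetcherThread(val broker: BrokerEndPoint, val fetcherId: Int, metrics: ToyMetrics) {
  private val tags = Map("broker-id" -> broker.id.toString, "fetcher-id" -> fetcherId.toString)
  metrics.register("connection-close-rate", tags)
  var isRunning = true

  def shutdown(): Unit = {
    isRunning = false
    metrics.remove("connection-close-rate", tags)
  }
}

final class ToyFetcherManager(metrics: ToyMetrics) {
  val fetcherThreadMap = mutable.Map.empty[BrokerIdAndFetcherId, ToyFetcherThread]

  def addFetcher(broker: BrokerEndPoint, fetcherId: Int): ToyFetcherThread = {
    val key = BrokerIdAndFetcherId(broker.id, fetcherId)
    fetcherThreadMap.get(key) match {
      // A thread for this broker id and fetcher id already targets the same host: reuse it.
      case Some(existing) if existing.broker.host == broker.host => existing
      // No thread yet, or the host changed: stop any stale thread, then create a new one.
      case stale =>
        stale.foreach(_.shutdown())
        val created = new ToyFetcherThread(broker, fetcherId, metrics)
        fetcherThreadMap(key) = created
        created
    }
  }
}
```

Because the key no longer includes the host, at most one thread exists per
broker id and fetcher id, and the old thread releases its metric before the
replacement registers the same tags.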

> ReplicaFetchThread may fail to create due to existing metric
> ------------------------------------------------------------
>
>                 Key: KAFKA-4811
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4811
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.0
>            Reporter: Jun Rao
>
> Someone reported the following error.
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-close-rate, group=replica-fetcher-metrics, description=Connections closed per second in the window., tags={broker-id=1, fetcher-id=0}]' already exists, can't register another one.
>         at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:433)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
>         at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:680)
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:140)
>         at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:86)
>         at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:35)
>         at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:83)
>         at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78)
>         at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:882)
>         at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:700)
>         at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:84)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:62)
>         at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
