[ https://issues.apache.org/jira/browse/KAFKA-8066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-8066.
------------------------------------
    Resolution: Fixed
    Fix Version/s: 2.2.1
                   2.1.2
                   2.0.2

> ReplicaFetcherThread fails to start up because of failing to register the metric.
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-8066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8066
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 1.1.1, 1.1.2, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2, 2.1.2
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Major
>             Fix For: 2.0.2, 2.1.2, 2.2.1
>
> After KAFKA-6051, we close leaderEndPoint in the replica fetcher thread's initiateShutdown to try to preempt in-progress fetch requests and accelerate replica fetcher thread shutdown. However, the selector may fail to close a channel and throw an exception while the replica fetcher thread is still actively fetching, and in that case the sensors are not cleaned up. Concretely, if `close(id)` throws an exception in `Selector.close()`, then `sensors.close()` is never called and the sensors are not unregistered (see the code below).
> {code:java}
> public void close() {
>     List<String> connections = new ArrayList<>(channels.keySet());
>     for (String id : connections)
>         close(id);
>     try {
>         this.nioSelector.close();
>     } catch (IOException | SecurityException e) {
>         log.error("Exception closing nioSelector:", e);
>     }
>     sensors.close();
>     channelBuilder.close();
> }
> {code}
> If this happens, when the broker later wants to start a ReplicaFetcherThread with the same fetcher id to the same destination broker (e.g. due to leadership changes or newly created partitions), the thread fails to start up because the selector throws an IllegalArgumentException when a metric with the same name already exists:
> {noformat}
> 2019/02/27 10:24:26.938 ERROR [KafkaApis] [kafka-request-handler-6] [kafka-server] [] [KafkaApi-38031] Error when handling request {}
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-count, group=replica-fetcher-metrics, description=The current number of active connections., tags={broker-id=29712, fetcher-id=3}]' already exists, can't register another one.
>     at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:559) ~[kafka-clients-2.0.0.66.jar:?]
>     at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:502) ~[kafka-clients-2.0.0.66.jar:?]
>     at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:485) ~[kafka-clients-2.0.0.66.jar:?]
>     at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:470) ~[kafka-clients-2.0.0.66.jar:?]
>     at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:963) ~[kafka-clients-2.0.0.66.jar:?]
>     at org.apache.kafka.common.network.Selector.<init>(Selector.java:170) ~[kafka-clients-2.0.0.66.jar:?]
>     at org.apache.kafka.common.network.Selector.<init>(Selector.java:188) ~[kafka-clients-2.0.0.66.jar:?]
>     at kafka.server.ReplicaFetcherBlockingSend.<init>(ReplicaFetcherBlockingSend.scala:61) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68) ~[kafka_2.11-2.0.0.66.jar:?]
>     at scala.Option.getOrElse(Option.scala:121) ~[scala-library-2.11.12.jar:?]
>     at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:67) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:32) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.AbstractFetcherManager.kafka$server$AbstractFetcherManager$$addAndStartFetcherThread$1(AbstractFetcherManager.scala:132) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:146) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:137) ~[kafka_2.11-2.0.0.66.jar:?]
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) ~[scala-library-2.11.12.jar:?]
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
>     at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:137) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:1333) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1107) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:194) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:110) ~[kafka_2.11-2.0.0.66.jar:?]
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) ~[kafka_2.11-2.0.0.66.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> {noformat}
> The fix should be to add a try-finally block around the channel-closing logic in Selector.close() so that sensors.close() is still called even if an exception is thrown.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
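The failure mode in the stack trace above can be reproduced outside the broker. The following standalone sketch (a hypothetical demo class, using only Kafka's public Metrics/MetricName/Measurable API; the metric name, group, and tags mirror the log but are otherwise arbitrary) shows that a second registration under the same MetricName throws IllegalArgumentException:

{code:java}
import java.util.Collections;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;

// Illustrative only: registering two metrics under an identical MetricName makes
// Metrics.addMetric() throw IllegalArgumentException ("... already exists, can't
// register another one."), which is what aborts the new ReplicaFetcherThread's Selector.
public class DuplicateMetricDemo {
    public static void main(String[] args) {
        try (Metrics metrics = new Metrics()) {
            MetricName name = metrics.metricName("connection-count", "replica-fetcher-metrics",
                    "The current number of active connections.",
                    Collections.singletonMap("fetcher-id", "3"));
            Measurable zero = (config, now) -> 0.0;
            metrics.addMetric(name, zero); // first registration succeeds
            metrics.addMetric(name, zero); // second registration throws IllegalArgumentException
        }
    }
}
{code}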
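And a minimal sketch of the try-finally shape proposed in the description, written against the same Selector fields as the quoted close() method. This only illustrates the idea; it is not necessarily the exact patch that shipped in 2.0.2, 2.1.2, and 2.2.1:

{code:java}
// Sketch only: same method as quoted above, but the sensor/channelBuilder cleanup is
// moved into a finally block so it runs even if close(id) or nioSelector.close() throws.
public void close() {
    List<String> connections = new ArrayList<>(channels.keySet());
    try {
        for (String id : connections)
            close(id);
        this.nioSelector.close();
    } catch (IOException | SecurityException e) {
        log.error("Exception closing nioSelector:", e);
    } finally {
        // Always unregister the selector's sensors so that a replacement ReplicaFetcherThread
        // with the same fetcher id can create its Selector and register its metrics again.
        sensors.close();
        channelBuilder.close();
    }
}
{code}

With the cleanup in the finally block, a runtime exception from close(id) can still propagate to the caller, but the per-fetcher sensors are unregistered first, which is what the metric re-registration needs.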