[ https://issues.apache.org/jira/browse/KAFKA-8066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787597#comment-16787597 ]

ASF GitHub Bot commented on KAFKA-8066:
---------------------------------------

hzxa21 commented on pull request #6402: KAFKA-8066: Always close the sensors in 
Selector.close()
URL: https://github.com/apache/kafka/pull/6402
 
 
   When shutting down the ReplicaFetcher thread, we may fail to unregister the 
sensors in selector.close(). When that happens, we cannot start up a 
ReplicaFetcherThread with the same fetcher id again, because sensor 
registration throws an IllegalArgumentException. This causes persistent URPs 
(under-replicated partitions) in the cluster because the ReplicaFetcherThread 
is gone.
   
   This patch addresses the issue by introducing a try-finally block in 
selector.close() so that the sensors are always unregistered when shutting 
down ReplicaFetcherThreads.
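   
   The intended control flow can be sketched as follows. This is a minimal, 
simplified illustration of the try-finally change, not the actual patch; the 
channel map and sensor flag below are toy stand-ins for the real Selector 
internals:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class SelectorCloseSketch {
    final Map<String, Object> channels = new HashMap<>();
    boolean sensorsClosed = false;

    // Stand-in for Selector.close(id), which can throw while the replica
    // fetcher thread is still actively fetching.
    void close(String id) {
        throw new RuntimeException("failed to close channel " + id);
    }

    // Stand-in for sensors.close(), which unregisters the per-fetcher metrics.
    void closeSensors() {
        sensorsClosed = true;
    }

    public void close() {
        try {
            for (String id : new ArrayList<>(channels.keySet()))
                close(id);
        } finally {
            // Runs even when close(id) throws, so the metrics are always
            // unregistered and a fetcher with the same id can be restarted.
            closeSensors();
        }
    }

    public static void main(String[] args) {
        SelectorCloseSketch selector = new SelectorCloseSketch();
        selector.channels.put("broker-29712-fetcher-3", new Object());
        try {
            selector.close();
        } catch (RuntimeException e) {
            System.out.println("channel close failed: " + e.getMessage());
        }
        System.out.println("sensors closed: " + selector.sensorsClosed);
    }
}
```

   Even though the channel close still fails, the finally block guarantees the 
sensors are cleaned up.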
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReplicaFetcherThread fails to start up because it fails to register the 
> metric.
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-8066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8066
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 1.1.1, 1.1.2, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 
> 2.0.2, 2.1.2
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Major
>
> After KAFKA-6051, we close leaderEndPoint in the replica fetcher thread's 
> initiateShutdown to try to preempt the in-progress fetch request and 
> accelerate replica fetcher thread shutdown. However, the selector may fail 
> to close the channel and throw an exception when the replica fetcher thread 
> is still actively fetching, in which case the sensors will not be cleaned 
> up. Basically, if `close(id)` throws an exception in `Selector.close()`, 
> then `sensors.close()` will not be called and thus the sensors will not get 
> unregistered (see the code below).
> {code:java}
>     public void close() {
>         List<String> connections = new ArrayList<>(channels.keySet());
>         for (String id : connections)
>             close(id);
>         try {
>             this.nioSelector.close();
>         } catch (IOException | SecurityException e) {
>             log.error("Exception closing nioSelector:", e);
>         }
>         sensors.close();
>         channelBuilder.close();
>     }
> {code}
> If this happens, when the broker wants to start up a ReplicaFetcherThread 
> with the same fetcher id to the same destination broker again (e.g. due to 
> leadership changes or newly created partitions), the ReplicaFetcherThread 
> will fail to start up because the selector throws an 
> IllegalArgumentException when a metric with the same name already exists:
> {noformat}
> 2019/02/27 10:24:26.938 ERROR [KafkaApis] [kafka-request-handler-6] [kafka-server] [] [KafkaApi-38031] Error when handling request {}
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-count, group=replica-fetcher-metrics, description=The current number of active connections., tags={broker-id=29712, fetcher-id=3}]' already exists, can't register another one.
>         at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:559) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:502) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:485) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:470) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:963) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:170) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:188) ~[kafka-clients-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherBlockingSend.<init>(ReplicaFetcherBlockingSend.scala:61) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68) ~[kafka_2.11-2.0.0.66.jar:?]
>         at scala.Option.getOrElse(Option.scala:121) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:67) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:32) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.AbstractFetcherManager.kafka$server$AbstractFetcherManager$$addAndStartFetcherThread$1(AbstractFetcherManager.scala:132) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:146) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:137) ~[kafka_2.11-2.0.0.66.jar:?]
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:137) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:1333) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1107) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:194) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:110) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) ~[kafka_2.11-2.0.0.66.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> {noformat}
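> The duplicate-name check that produces this exception can be sketched with 
> a toy registry. This is a simplified, hypothetical stand-in for 
> org.apache.kafka.common.metrics.Metrics (class and method bodies below are 
> illustrative, not the real implementation):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> 
> public class MetricRegistrySketch {
>     private final Map<String, Double> metrics = new HashMap<>();
> 
>     // Mirrors the duplicate check in Metrics.registerMetric: registering a
>     // name that already exists is rejected rather than overwritten.
>     void registerMetric(String name, double value) {
>         if (metrics.containsKey(name))
>             throw new IllegalArgumentException("A metric named '" + name
>                     + "' already exists, can't register another one.");
>         metrics.put(name, value);
>     }
> 
>     // Stand-in for sensor cleanup; if this is skipped during shutdown, the
>     // name stays registered.
>     void removeMetric(String name) {
>         metrics.remove(name);
>     }
> 
>     public static void main(String[] args) {
>         MetricRegistrySketch registry = new MetricRegistrySketch();
>         registry.registerMetric("connection-count,fetcher-id=3", 0.0);
>         try {
>             // A restarted fetcher with the same fetcher id registers the
>             // same metric name again, which fails if cleanup was skipped.
>             registry.registerMetric("connection-count,fetcher-id=3", 0.0);
>         } catch (IllegalArgumentException e) {
>             System.out.println(e.getMessage());
>         }
>     }
> }
> {code}
> This is why the leaked sensors must be unregistered on shutdown: only after 
> removal can the same metric name be registered again.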
> The fix should be to add a try-finally block in selector.close() to make 
> sure sensors.close() is called even if an exception is thrown.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
