[ https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajini Sivaram resolved KAFKA-7576.
-----------------------------------
    Resolution: Fixed
      Reviewer: Jason Gustafson

> Dynamic update of replica fetcher threads may fail to start/close fetchers
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7576
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.1, 2.0.1, 2.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.1.2, 2.1.1, 2.0.2
>
>
> KAFKA-6051 moved the ReplicaFetcherBlockingSend shutdown earlier in the shutdown sequence of ReplicaFetcherThread. As a result, shutdown of replica fetchers can now throw an exception, because the Selector may be closed on one thread while data is being written to it on another. KAFKA-7464 changed this behaviour for 2.0.1 and 2.1.0: the exception during close() is now logged and not propagated, to avoid exceptions during broker shutdown.
> When a config update notification for `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads. Existing fetchers are shut down before new ones are created. This migration is performed on the thread processing the ZK change notification. Closing the Selector of an existing fetcher on that thread is not safe, since the replica fetcher thread may be processing data using the same Selector at the time.
> Without the fix from KAFKA-7464, if shutdown encounters an exception, another update of the config or a broker restart is required to restart the replica fetchers after a dynamic config update.
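> The safe pattern described above (the config-update thread only signals shutdown; the Selector is closed by the fetcher thread that owns it) can be sketched as follows. This is an illustrative sketch with hypothetical class names, not Kafka's actual implementation:
> {code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: the resource (standing in for the fetcher's Selector) is closed
// only by the thread that uses it, so a dynamic-config thread requesting
// shutdown never races with in-flight I/O on the same object.
public class SafeShutdownSketch {

    // Stands in for org.apache.kafka.common.network.Selector (hypothetical).
    static class FakeSelector {
        private boolean closed = false;
        synchronized void poll() {
            if (closed) throw new IllegalStateException("selector is closed");
        }
        synchronized void close() { closed = true; }
        synchronized boolean isClosed() { return closed; }
    }

    static class FetcherThread extends Thread {
        final FakeSelector selector = new FakeSelector();
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final CountDownLatch done = new CountDownLatch(1);

        // Called from the config-update thread: only signals, never
        // touches the selector directly.
        void initiateShutdown() { running.set(false); }

        void awaitShutdown() throws InterruptedException { done.await(); }

        @Override public void run() {
            try {
                while (running.get()) {
                    selector.poll(); // I/O happens only on this thread
                }
            } finally {
                selector.close();    // closed by the owning thread, after the loop exits
                done.countDown();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FetcherThread fetcher = new FetcherThread();
        fetcher.start();
        fetcher.initiateShutdown(); // e.g. triggered by a num.replica.fetchers update
        fetcher.awaitShutdown();
        System.out.println("selector closed: " + fetcher.selector.isClosed());
    }
}
> {code}
> Note this also avoids the resource leak: the selector is always closed exactly once, on the thread whose I/O it serves.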
> Exception stack trace:
> {code:java}
> java.lang.IllegalArgumentException
>     at java.nio.Buffer.position(Buffer.java:244)
>     at sun.nio.ch.IOUtil.write(IOUtil.java:68)
>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>     at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
>     at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
>     at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
>     at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
>     at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
>     at org.apache.kafka.common.network.Selector.close(Selector.java:736)
>     at org.apache.kafka.common.network.Selector.close(Selector.java:698)
>     at org.apache.kafka.common.network.Selector.close(Selector.java:314)
>     at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
>     at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
>     at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
>     at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
>     at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
>     at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
>     at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
>     at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
>     at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
>     at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
>     at scala.collection.immutable.List.foreach(List.scala:392)
>     at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
>     <SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with the creation of replica fetchers during dynamic update. But even for those branches, we should clean up the Selector to avoid a resource leak in the dynamic config update case (discarding the exception may be sufficient when the broker is shut down).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)