[
https://issues.apache.org/jira/browse/KAFKA-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16632280#comment-16632280
]
Zhanxiang (Patrick) Huang commented on KAFKA-6051:
--------------------------------------------------
[~mchinavan]
[~junrao]
I have a follow-up on this issue.
In our 2.0 deployment, we saw the following log when shutting down the
ReplicaManager during a clean broker shutdown:
{noformat}
2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
java.lang.IllegalArgumentException: null
    at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
    at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121]
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?]
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?]
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?]
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?]
    at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?]
    at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?]
    at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?]
    at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?]
    at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?]
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?]
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
    at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
{noformat}
After that, we noticed that some of the replica fetcher threads failed to shut down:
{noformat}
2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
java.nio.channels.ClosedChannelException: null
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121]
    at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121]
    at org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) ~[kafka-clients-2.0.0.22.jar:?]
    at org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) ~[kafka-clients-2.0.0.22.jar:?]
    at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
    at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
    at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log.roll(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1465) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log$$anonfun$append$2.apply(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log.append(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.log.Log.appendAsFollower(Log.scala:743) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.cluster.Partition$$anonfun$doAppendRecordsToFollowerOrFutureReplica$1.apply(Partition.scala:601) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:588) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:608) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:43) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:188) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
    at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.12.jar:?]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.12.jar:?]
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:115) ~[kafka_2.11-2.0.0.22.jar:?]
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) ~[kafka_2.11-2.0.0.22.jar:?]
{noformat}
Worse still, we found that if an exception is thrown during the
ReplicaFetcherManager shutdown, we skip the purgatory shutdowns and the HW
checkpoint entirely; in our case we never saw the "Shut down completely" log:
{code:java}
def shutdown(checkpointHW: Boolean = true) {
  info("Shutting down")
  removeMetrics()
  if (logDirFailureHandler != null)
    logDirFailureHandler.shutdown()
  replicaFetcherManager.shutdown()
  replicaAlterLogDirsManager.shutdown()
  delayedFetchPurgatory.shutdown()
  delayedProducePurgatory.shutdown()
  delayedDeleteRecordsPurgatory.shutdown()
  if (checkpointHW)
    checkpointHighWatermarks()
  info("Shut down completely")
}
{code}
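For illustration only, here is a minimal sketch (assuming the 2.0 structure above; this is not the fix I am proposing below) of how guarding the fetcher manager calls would keep the later steps running even when a fetcher fails to close:
{code:java}
def shutdown(checkpointHW: Boolean = true) {
  info("Shutting down")
  removeMetrics()
  if (logDirFailureHandler != null)
    logDirFailureHandler.shutdown()
  try {
    replicaFetcherManager.shutdown()
    replicaAlterLogDirsManager.shutdown()
  } catch {
    case e: Throwable =>
      // Sketch only: log and continue so that the purgatory shutdowns and the
      // HW checkpoint below always run during a clean shutdown.
      error("Error shutting down fetcher managers", e)
  }
  delayedFetchPurgatory.shutdown()
  delayedProducePurgatory.shutdown()
  delayedDeleteRecordsPurgatory.shutdown()
  if (checkpointHW)
    checkpointHighWatermarks()
  info("Shut down completely")
}
{code}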
We see this because we close leaderEndpoint in the replica fetcher thread's
initiateShutdown to preempt the in-progress fetch request and accelerate the
replica fetcher thread shutdown. However, leaderEndpoint can throw an
exception while the replica fetcher thread is still actively fetching. I am
wondering whether we should catch the exception thrown by
"leaderEndpoint.close()" instead of letting it propagate up the call stack
(see the sketch after the list below). In my opinion, it is safe to do so
because ReplicaFetcherThread.initiateShutdown is only called on:
1. Server shutdown -- in this case we will shut down the process anyway, so
even if we fail to close the leader endpoint cleanly there is no harm.
2. shutdownIdleFetcherThreads -- in this case the fetcher thread is idle and
we will not use it again, so there is no harm either.
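A minimal sketch of the change I have in mind, assuming the 2.0 shape of ReplicaFetcherThread.initiateShutdown (illustrative, not an actual patch):
{code:java}
override def initiateShutdown(): Boolean = {
  val justShutdown = super.initiateShutdown()
  if (justShutdown) {
    try {
      // Preempt any in-progress fetch so the thread can exit quickly.
      leaderEndpoint.close()
    } catch {
      // Swallowing is safe here: the broker is shutting down (case 1) or the
      // thread is idle and will never be reused (case 2).
      case t: Throwable =>
        warn("Failed to close leader endpoint while initiating shutdown", t)
    }
  }
  justShutdown
}
{code}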
> ReplicaFetcherThread should close the ReplicaFetcherBlockingSend earlier on
> shutdown
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-6051
> URL: https://issues.apache.org/jira/browse/KAFKA-6051
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0,
> 0.10.2.1, 0.11.0.0
> Reporter: Maytee Chinavanichkit
> Assignee: Maytee Chinavanichkit
> Priority: Major
> Fix For: 1.1.0
>
>
> The ReplicaFetcherBlockingSend works as designed and will block until it is
> able to get data. This becomes a problem when we are gracefully shutting down
> a broker. The controller will attempt to shut down the fetchers and elect new
> leaders. When the last fetched partition is removed, the
> {{replicaManager.becomeLeaderOrFollower}} call proceeds to shut down any
> idle ReplicaFetcherThread. The shutdown process here can block until the last
> fetch request completes. This blocking delay is a big problem because the
> {{replicaStateChangeLock}} and {{mapLock}} in {{AbstractFetcherManager}} are
> still held, causing latency spikes on multiple brokers.
> At this point in time, we do not need the last response, as the fetcher is
> shutting down. We should close the leaderEndpoint early during
> {{initiateShutdown()}} instead of after {{super.shutdown()}}.
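> As a sketch, the shape of the change would be roughly the following
> (illustrative only; the exact code may differ):
> {code}
> // Before: the endpoint is closed only after super.shutdown(), which waits
> // for the in-flight (blocking) fetch to finish first.
> override def shutdown() {
>   super.shutdown()
>   leaderEndpoint.close()
> }
>
> // After: close the endpoint in initiateShutdown() so the blocking fetch is
> // preempted before we wait for the thread to stop.
> override def initiateShutdown(): Boolean = {
>   val justShutdown = super.initiateShutdown()
>   if (justShutdown)
>     leaderEndpoint.close()
>   justShutdown
> }
> {code}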
> For example, we can see here that the shutdown blocked the broker from
> processing further replica changes for ~500 ms:
> {code}
> [2017-09-01 18:11:42,879] INFO [ReplicaFetcherThread-0-2], Shutting down
> (kafka.server.ReplicaFetcherThread)
> [2017-09-01 18:11:43,314] INFO [ReplicaFetcherThread-0-2], Stopped
> (kafka.server.ReplicaFetcherThread)
> [2017-09-01 18:11:43,314] INFO [ReplicaFetcherThread-0-2], Shutdown completed
> (kafka.server.ReplicaFetcherThread)
> {code}