[ https://issues.apache.org/jira/browse/KAFKA-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steve Niemitz updated KAFKA-4523:
---------------------------------
    Description: 
If I begin a controlled shutdown of a broker that is a coordinator for a 
consumer group, often the shutdown will fail with the following error:

{code}
[2016-12-12 16:24:15,424] INFO [Replica Manager on Broker 10]: Shut down 
completely (kafka.server.ReplicaManager)
[2016-12-12 16:24:15,424] INFO [ExpirationReaper-10], Shutting down 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Stopped  
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Shutdown completed 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-12 16:24:15,451] INFO Shutting down. (kafka.log.LogManager)
[2016-12-12 16:24:31,241] INFO [GroupCoordinator 10]: Preparing to restabilize 
group my-consumer-group with old generation 2673 
(kafka.coordinator.GroupCoordinator)
[2016-12-12 16:24:32,499] INFO [GroupCoordinator 10]: Group my-consumer-group 
with generation 2674 is now empty (kafka.coordinator.GroupCoordinator)
[2016-12-12 16:24:32,515] FATAL [Replica Manager on Broker 10]: Halting due to 
unrecoverable I/O error while handling produce request:  
(kafka.server.ReplicaManager)
kafka.common.KafkaStorageException: I/O exception in append to log 
'__consumer_offsets-33'
        at kafka.log.Log.append(Log.scala:349)
        at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
        at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
        at 
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
        at 
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
        at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
        at 
kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:251)
        at 
kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
        at 
kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
        at scala.Option.foreach(Option.scala:236)
        at 
kafka.coordinator.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:726)
        at kafka.coordinator.DelayedJoin.onComplete(DelayedJoin.scala:39)
        at 
kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
        at 
kafka.coordinator.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:37)
        at 
kafka.coordinator.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:672)
        at kafka.coordinator.DelayedJoin.tryComplete(DelayedJoin.scala:37)
        at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:315)
        at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
        at 
kafka.coordinator.GroupCoordinator.onMemberFailure(GroupCoordinator.scala:665)
        at 
kafka.coordinator.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:740)
        at 
kafka.coordinator.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:33)
        at kafka.server.DelayedOperation.run(DelayedOperation.scala:107)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
        at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300)
        at kafka.log.FileMessageSet.truncateTo(FileMessageSet.scala:405)
        at kafka.log.FileMessageSet.trim(FileMessageSet.scala:378)
        at kafka.log.Log.roll(Log.scala:773)
        at kafka.log.Log.maybeRoll(Log.scala:742)
        at kafka.log.Log.append(Log.scala:405)
        ... 35 more
{code}

This then causes the broker to attempt to run recovery on all log segments on 
the next startup, which obviously is not ideal.  It looks like the group 
coordinator is shut down after the log manager [1]; should the order be reversed?

[1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L588
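To illustrate the ordering problem, here is a minimal sketch (illustrative Scala only; the classes below are stand-ins I made up, not Kafka's real LogManager/GroupCoordinator/Log APIs): once the log manager has closed the file channel backing the __consumer_offsets segment, a delayed join/heartbeat that the still-running group coordinator completes will try to append group metadata and hit the closed channel, which the broker treats as a fatal storage error. Stopping the coordinator first would drain those appends before the files are closed.

{code}
// Illustrative sketch only -- stand-in classes, not Kafka's real APIs.
import java.nio.channels.ClosedChannelException

class OffsetsLogStub {
  @volatile private var open = true
  def append(record: String): Unit =
    if (open) println(s"appended: $record")
    else throw new ClosedChannelException        // what Log.append ultimately hits
  def close(): Unit = open = false               // file channels closed
}

class LogManagerStub(log: OffsetsLogStub) {
  def shutdown(): Unit = log.close()
}

class GroupCoordinatorStub(log: OffsetsLogStub) {
  // A pending delayed join/heartbeat that still wants to persist group metadata.
  def completePendingRebalance(): Unit =
    log.append("group my-consumer-group, generation 2674")
  def shutdown(): Unit = ()                      // after this, no more appends are attempted
}

object ShutdownOrderSketch extends App {
  val offsetsLog  = new OffsetsLogStub
  val logManager  = new LogManagerStub(offsetsLog)
  val coordinator = new GroupCoordinatorStub(offsetsLog)

  // Ordering as reported: logs are closed while the coordinator is still live.
  logManager.shutdown()
  try coordinator.completePendingRebalance()     // -> ClosedChannelException, broker halts
  catch { case _: ClosedChannelException => println("append after close -> fatal I/O error") }

  // Proposed ordering: drain the coordinator first, then close the logs.
  // coordinator.shutdown(); logManager.shutdown()
}
{code}

In the real broker the suggestion above would presumably amount to reordering the corresponding shutdown calls in KafkaServer.scala so the group coordinator stops before the log manager does.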

  was:
If I begin a controlled shutdown of a broker that is a coordinator for a 
consumer group, often the shutdown will fail with the following error:

{code}
[2016-12-12 16:24:15,424] INFO [Replica Manager on Broker 10]: Shut down 
completely (kafka.server.ReplicaManager)
[2016-12-12 16:24:15,424] INFO [ExpirationReaper-10], Shutting down 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Stopped  
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Shutdown completed 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-12 16:24:15,451] INFO Shutting down. (kafka.log.LogManager)
[2016-12-12 16:24:31,241] INFO [GroupCoordinator 10]: Preparing to restabilize 
group my-consumer-group with old generation 2673 
(kafka.coordinator.GroupCoordinator)
[2016-12-12 16:24:32,499] INFO [GroupCoordinator 10]: Group my-consumer-group 
with generation 2674 is now empty (kafka.coordinator.GroupCoordinator)
[2016-12-12 16:24:32,515] FATAL [Replica Manager on Broker 10]: Halting due to 
unrecoverable I/O error while handling produce request:  
(kafka.server.ReplicaManager)
kafka.common.KafkaStorageException: I/O exception in append to log 
'__consumer_offsets-33'
        at kafka.log.Log.append(Log.scala:349)
        at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
        at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
        at 
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
        at 
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
        at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
        at 
kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:251)
        at 
kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
        at 
kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
        at scala.Option.foreach(Option.scala:236)
        at 
kafka.coordinator.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:726)
        at kafka.coordinator.DelayedJoin.onComplete(DelayedJoin.scala:39)
        at 
kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
        at 
kafka.coordinator.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:37)
        at 
kafka.coordinator.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:672)
        at kafka.coordinator.DelayedJoin.tryComplete(DelayedJoin.scala:37)
        at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:315)
        at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
        at 
kafka.coordinator.GroupCoordinator.onMemberFailure(GroupCoordinator.scala:665)
        at 
kafka.coordinator.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:740)
        at 
kafka.coordinator.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:33)
        at kafka.server.DelayedOperation.run(DelayedOperation.scala:107)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
        at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300)
        at kafka.log.FileMessageSet.truncateTo(FileMessageSet.scala:405)
        at kafka.log.FileMessageSet.trim(FileMessageSet.scala:378)
        at kafka.log.Log.roll(Log.scala:773)
        at kafka.log.Log.maybeRoll(Log.scala:742)
        at kafka.log.Log.append(Log.scala:405)
        ... 35 more
{code}

This then causes the broker to attempt to run recovery on all log segments on 
the next startup, which obviously is not ideal.


> Controlled shutdown fails if consumer group restabilizes during shutdown
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-4523
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4523
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.0
>            Reporter: Steve Niemitz
>
> If I begin a controlled shutdown of a broker that is a coordinator for a 
> consumer group, often the shutdown will fail with the following error:
> {code}
> [2016-12-12 16:24:15,424] INFO [Replica Manager on Broker 10]: Shut down 
> completely (kafka.server.ReplicaManager)
> [2016-12-12 16:24:15,424] INFO [ExpirationReaper-10], Shutting down 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Stopped  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Shutdown completed 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-12-12 16:24:15,451] INFO Shutting down. (kafka.log.LogManager)
> [2016-12-12 16:24:31,241] INFO [GroupCoordinator 10]: Preparing to 
> restabilize group my-consumer-group with old generation 2673 
> (kafka.coordinator.GroupCoordinator)
> [2016-12-12 16:24:32,499] INFO [GroupCoordinator 10]: Group my-consumer-group 
> with generation 2674 is now empty (kafka.coordinator.GroupCoordinator)
> [2016-12-12 16:24:32,515] FATAL [Replica Manager on Broker 10]: Halting due 
> to unrecoverable I/O error while handling produce request:  
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> '__consumer_offsets-33'
>       at kafka.log.Log.append(Log.scala:349)
>       at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>       at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>       at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>       at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>       at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>       at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>       at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
>       at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
>       at 
> kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:251)
>       at 
> kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
>       at 
> kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> kafka.coordinator.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:726)
>       at kafka.coordinator.DelayedJoin.onComplete(DelayedJoin.scala:39)
>       at 
> kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>       at 
> kafka.coordinator.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:37)
>       at 
> kafka.coordinator.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:672)
>       at kafka.coordinator.DelayedJoin.tryComplete(DelayedJoin.scala:37)
>       at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:315)
>       at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
>       at 
> kafka.coordinator.GroupCoordinator.onMemberFailure(GroupCoordinator.scala:665)
>       at 
> kafka.coordinator.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:740)
>       at 
> kafka.coordinator.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:33)
>       at kafka.server.DelayedOperation.run(DelayedOperation.scala:107)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
>       at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
>       at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300)
>       at kafka.log.FileMessageSet.truncateTo(FileMessageSet.scala:405)
>       at kafka.log.FileMessageSet.trim(FileMessageSet.scala:378)
>       at kafka.log.Log.roll(Log.scala:773)
>       at kafka.log.Log.maybeRoll(Log.scala:742)
>       at kafka.log.Log.append(Log.scala:405)
>       ... 35 more
> {code}
> This then causes the broker to attempt to run recovery on all log segments on 
> the next startup, which obviously is not ideal.  It looks like the group 
> coordinator is shut down after the log manager [1]; should the order be 
> reversed?
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L588



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
