Thanks, everyone. I'll try to clean up the disk space and try again.

On Sun, Nov 23, 2014 at 8:47 AM, Jason Rosenberg <j...@squareup.com> wrote:
> Rajiv,
>
> So, any time a broker's disk fills up, it will shut itself down immediately
> (it will do this in response to any I/O error when writing to disk).
> Unfortunately, this means the node cannot do any housecleaning before
> shutdown, which makes it an 'unclean' shutdown. This means that when it
> restarts, it needs to reset the data to the last known checkpoint. If the
> partition is replicated and it can restore it from another broker, it will
> try to do that (but it doesn't sound like it can in your case, since all
> the other nodes are down too).
>
> There is a fix coming in 0.8.2 that will allow a broker to restore multiple
> partitions in parallel (the current behavior in 0.8.1.1 and earlier is to
> restore partitions one by one). See:
> https://issues.apache.org/jira/browse/KAFKA-1414. This fix should speed
> things up greatly when you have a large number of partitions.
>
> If a disk is full, the broker will refuse to even start up (or will fail
> immediately on the first write attempt and shut itself down). So,
> generally, in this event, you need to clear some disk space before trying
> to restart the server.
>
> The bottom line is that you don't want any of your brokers to run out of
> disk space (so you need good monitoring/alerting for advance warning on
> this). Kafka doesn't attempt to detect that it's about to run out of space
> and die, so you have to manage that and guard against it outside of Kafka.
>
> Jason
>
> On Sat, Nov 22, 2014 at 5:27 PM, Harsha <ka...@harsha.io> wrote:
>
> > It might be the logs; check your Kafka logs dir (server logs). Kafka can
> > produce a lot of logs in a short time, so make sure that's what's in
> > play here.
> > -Harsha
> >
> > On Sat, Nov 22, 2014, at 01:37 PM, Rajiv Kurian wrote:
> > > Actually, I see a bunch of errors. One of the brokers is out of space,
> > > and this might be causing everything to spin out of control.
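Jason's last point is that Kafka won't warn you about disk pressure itself, so the check has to live outside the broker. A minimal sketch of such a check (Python; the `/tmp/kafka-logs` path, which is only Kafka's default `log.dirs` value, and the 15% threshold are assumptions you'd tune for your brokers):

```python
import shutil

def check_log_dirs(log_dirs, min_free_fraction=0.15):
    """Return (dir, free_bytes) pairs for every log dir whose filesystem
    has less than min_free_fraction of its space free."""
    low = []
    for d in log_dirs:
        usage = shutil.disk_usage(d)  # named tuple: total, used, free (bytes)
        if usage.free / usage.total < min_free_fraction:
            low.append((d, usage.free))
    return low

# Hypothetical usage: run from cron and page on any output.
# for d, free in check_log_dirs(["/tmp/kafka-logs"]):
#     print(f"WARNING: {d} has only {free / 2**30:.1f} GiB free")
```

Hooking something like this into whatever alerting you already have gives the "advance warning" Jason describes, so a broker can be drained or expanded before it hits the unclean-shutdown path.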
> > >
> > > Some logs:
> > >
> > > On *broker 1* (the one that has run out of space):
> > >
> > > 2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13] [kafka.server.ReplicaFetcherThread]: [ReplicaFetcherThread-1-13], Disk error while replicating data.
> > >
> > > kafka.common.KafkaStorageException: I/O exception in append to log 'mytopic-633'
> > >     at kafka.log.Log.append(Log.scala:283)
> > >     at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52)
> > >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> > >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> > >     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> > >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> > >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> > >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> > >     at kafka.utils.Utils$.inLock(Utils.scala:538)
> > >     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> > >     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > Caused by: java.io.IOException: No space left on device
> > >     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> > >     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
> > >     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> > >     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> > >     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
> > >     at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132)
> > >     at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
> > >     at kafka.log.LogSegment.append(LogSegment.scala:80)
> > >     at kafka.log.Log.append(Log.scala:269)
> > >     ... 13 more
> > >
> > > 2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13] [kafka.server.ReplicaFetcherThread]: [ReplicaFetcherThread-2-13], Error getting offset for partition [myTopic,0] to broker 13
> > >
> > > java.io.IOException: No space left on device
> > >     at java.io.FileOutputStream.writeBytes(Native Method)
> > >     at java.io.FileOutputStream.write(FileOutputStream.java:345)
> > >     at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> > >     at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> > >     at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> > >     at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
> > >     at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:129)
> > >     at java.io.BufferedWriter.write(BufferedWriter.java:230)
> > >     at java.io.Writer.write(Writer.java:157)
> > >     at kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:50)
> > >     at kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:49)
> > >     at scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
> > >     at scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
> > >     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> > >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >     at scala.collection.immutable.MapLike$$anon$2.foreach(MapLike.scala:106)
> > >     at kafka.server.OffsetCheckpoint.liftedTree1$1(OffsetCheckpoint.scala:49)
> > >     at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:39)
> > >     at kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:234)
> > >     at kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:231)
> > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > >     at kafka.log.LogManager.checkpointRecoveryPointOffsets(LogManager.scala:231)
> > >     at kafka.log.LogManager.truncateTo(LogManager.scala:204)
> > >     at kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:84)
> > >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:144)
> > >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> > >     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> > >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> > >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> > >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> > >     at kafka.utils.Utils$.inLock(Utils.scala:538)
> > >     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> > >     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > >
> > > *On broker 2:*
> > >
> > > 2014-11-22T21:20:19.629Z ERROR [request-expiration-task] [kafka.server.KafkaApis]: [KafkaApi-12] Error when processing fetch request for partition [myTopic,265] offset 415659 from follower with correlation id 0
> > >
> > > kafka.common.OffsetOutOfRangeException: Request for offset 415659 but we only have log segments in the range 0 to 410453.
> > >     at kafka.log.Log.read(Log.scala:377)
> > >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:401)
> > >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:347)
> > >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:343)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> > >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > >     at scala.collection.immutable.HashMap.map(HashMap.scala:35)
> > >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:343)
> > >     at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:704)
> > >     at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:686)
> > >     at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
> > >
> > > *On broker 3*:
> > >
> > > 2014-11-22T21:26:48.216Z INFO [kafka-request-handler-3] [fka.controller.PartitionStateMachine]: [Partition state machine on Controller 13]: Invoking state change to OnlinePartition for partitions [myTopic,122]
> > >
> > > 2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3] [state.change.logger]: Controller 13 epoch 132 encountered error while electing leader for partition [myTopic,122] due to: No other replicas in ISR 13 for [myTopic,122] besides shutting down brokers 13.
> > >
> > > 2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3] [state.change.logger]: Controller 13 epoch 132 initiated state change for partition [myTopic,122] from OnlinePartition to OnlinePartition failed
> > >
> > > kafka.common.StateChangeFailedException: encountered error while electing leader for partition [myTopic,122] due to: No other replicas in ISR 13 for [myTopic,122] besides shutting down brokers 13.
> > >     at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:344)
> > >     at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:171)
> > >     at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
> > >     at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
> > >     at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
> > >     at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
> > >     at kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:227)
> > >     at kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:223)
> > >     at scala.Option.foreach(Option.scala:121)
> > >     at kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:223)
> > >     at kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:219)
> > >     at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)
> > >     at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> > >     at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> > >     at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> > >     at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:219)
> > >     at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:140)
> > >     at kafka.server.KafkaApis.handle(KafkaApis.scala:77)
> > >     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >     at java.lang.Thread.run(Thread.java:744)
> > > Caused by: kafka.common.StateChangeFailedException: No other replicas in ISR 13 for [myTopic,122] besides shutting down brokers 13
> > >     at kafka.controller.ControlledShutdownLeaderSelector.selectLeader(PartitionLeaderSelector.scala:181)
> > >     at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:320)
> > >     ... 19 more
> > >
> > > On Sat, Nov 22, 2014 at 1:17 PM, Harsha <ka...@harsha.io> wrote:
> > >
> > > > Rajiv,
> > > >     Which version of Kafka are you using, and do you see any errors
> > > > when the server goes down after sending a few messages?
> > > > -Harsha
> > > >
> > > > On Sat, Nov 22, 2014, at 01:05 PM, Rajiv Kurian wrote:
> > > > > The brokers also seem unavailable while this is going on. Each of
> > > > > these log messages takes 2-3 seconds, so at about 1200 partitions
> > > > > it takes up quite a bit of time. Ultimately it does recover, but
> > > > > sadly it goes down soon enough after I start sending it messages.
> > > > >
> > > > > On Sat, Nov 22, 2014 at 11:23 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > > > >
> > > > > > A 3-node Kafka broker cluster went down yesterday (all nodes) and
> > > > > > I just noticed it this morning. When I restarted it, I see a
> > > > > > lengthy list of messages like this:
> > > > > >
> > > > > > Loading log 'mytopic-partitionNum'
> > > > > > Recovering unflushed segment 'some number' in log mytopic-partitionNum.
> > > > > > Completed load of log mytopic-partitionNum with log end offset someOffset
> > > > > >
> > > > > > It's been going on for more than 30 minutes since I restarted the
> > > > > > broker. I have quite a few partitions (over 1000), but I still
> > > > > > wouldn't expect it to take such a long time.
> > > > > >
> > > > > > Any ideas on how I should investigate the problem?
> > > > > >
> > > > > > Thanks!
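Since 0.8.1.1 recovers partitions one by one, the restart time in the thread above scales with the number of partition directories and their unflushed segments. A rough way to size that workload before restarting (a sketch, assuming a single `log.dirs` path on the broker's local disk):

```python
import os

def recovery_workload(log_dir):
    """Rough tally of what an unclean restart has to walk through:
    one entry per partition directory, mapped to its .log segment count."""
    workload = {}
    for name in sorted(os.listdir(log_dir)):
        partition_dir = os.path.join(log_dir, name)
        if not os.path.isdir(partition_dir):
            continue  # skip checkpoint files and other non-partition entries
        segments = [f for f in os.listdir(partition_dir) if f.endswith(".log")]
        workload[name] = len(segments)
    return workload
```

At the 2-3 seconds per partition Rajiv reports, roughly 1200 partitions recovered sequentially works out to about an hour, which is consistent with the 30+ minutes he observed.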