Today the Kafka broker does not have any tools for moving data between volumes on a single broker. It also doesn't do any fancy calculation to determine where to place new partitions; it simply places them round-robin across the disks.
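To make that placement behavior concrete, here is a minimal sketch of round-robin selection across the directories in log.dirs. It is an illustration of the behavior described above, not Kafka's actual implementation; note that nothing in it checks free space, which is how one volume can hit 100% while the others still have room.

    // Minimal sketch (an assumption for illustration, not Kafka's real code)
    // of round-robin placement of new partitions across log.dirs entries.
    class RoundRobinPlacer(logDirs: IndexedSeq[String]) {
      private var next = 0

      // Pick a directory for a newly created partition. There is no
      // free-space check, so a nearly full volume is chosen as readily
      // as an empty one.
      def nextLogDir(): String = synchronized {
        val dir = logDirs(next % logDirs.size)
        next += 1
        dir
      }
    }

    // Usage, assuming eight data volumes like the reporter's setup:
    //   val placer = new RoundRobinPlacer((1 to 8).map(i => s"/data$i/kafka-logs"))
    //   placer.nextLogDir()  // "/data1/kafka-logs", then "/data2/kafka-logs", ...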
The way we tackle this at LinkedIn is to run with a single volume that is in RAID 10. This gets rid of any single-disk size limitation and pushes the problem up to the broker level. Kafka does have tools to move partitions around between brokers; a sketch of that workflow follows the quoted thread below. Let me know if you would like more details.

-Clark

On Wed, Mar 25, 2015 at 2:48 PM, K Zakee (JIRA) <j...@apache.org> wrote:

>
>     [ https://issues.apache.org/jira/browse/KAFKA-2038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380850#comment-14380850 ]
>
> K Zakee commented on KAFKA-2038:
> --------------------------------
>
> Thanks, I was able to start the broker after manually moving some partitions to directories on other volumes.
>
> Wondering if we will always have to take manual steps every time we reach a certain disk usage on a volume/directory, or whether there is a better way to distribute the storage.
>
> In our case, out of 40 topics, only 6 will hold about 80 percent of the total data. So what is the best way to choose the configuration so that we can avoid the manual steps going forward?
>
> > Unable to restart brokers after it went down with no space left on disk
> > -----------------------------------------------------------------------
> >
> > Key: KAFKA-2038
> > URL: https://issues.apache.org/jira/browse/KAFKA-2038
> > Project: Kafka
> > Issue Type: Bug
> > Components: core
> > Affects Versions: 0.8.2.1
> > Reporter: K Zakee
> > Priority: Blocker
> >
> > What should happen if one of the log directories configured for a broker is 100% full? Is it expected that brokers will shut themselves down?
> > We ran into full disk space on one of the volumes (out of 8) on each of 5 brokers, and the brokers shut themselves down. We still have about 60% of the total disk space provided by the 8 volumes/directories. Shouldn't the brokers continue to function as long as they have space left in at least one log directory?
> > In this case, how do I fix and restart the broker? Trying to restart also failed with a fatal error.
> > Error stack traces:
> > =================
> > [2015-03-21 03:12:21,433] FATAL [app=broker] [ReplicaFetcherThread-6-3] [ReplicaFetcherThread-6-3], Disk error while replicating data. (kafka.server.ReplicaFetcherThread)
> > kafka.common.KafkaStorageException: I/O exception in append to log 'Topic-11'
> >     at kafka.log.Log.append(Log.scala:266)
> >     at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:54)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
> >     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
> >     at kafka.utils.Utils$.inLock(Utils.scala:535)
> >     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
> >     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > Caused by: java.io.IOException: No space left on device
> >     at sun.nio.ch.FileDispatcher.write0(Native Method)
> >     at sun.nio.ch.FileDispatcher.write(FileDispatcher.java:39)
> >     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:69)
> >     at sun.nio.ch.IOUtil.write(IOUtil.java:40)
> >     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:198)
> >     at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:133)
> >     at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
> >     at kafka.log.LogSegment.append(LogSegment.scala:85)
> >     at kafka.log.Log.append(Log.scala:309)
> >     ... 12 more
> > =================
> > [2015-03-21 10:38:25,244] INFO [app=broker] [main] [Kafka Server 5], shut down completed (kafka.server.KafkaServer)
> > [2015-03-21 10:38:25,245] FATAL [app=broker] [main] Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> > java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code
> >     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:39)
> >     at java.nio.ByteBuffer.allocate(ByteBuffer.java:312)
> >     at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
> >     at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> >     at kafka.log.LogSegment.recover(LogSegment.scala:175)
> >     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:162)
> >     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:141)
> >     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >     at kafka.log.Log.loadSegments(Log.scala:141)
> >     at kafka.log.Log.<init>(Log.scala:67)
> >     at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
> >     at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
> >     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> >     at java.lang.Thread.run(Thread.java:662)
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
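For completeness, here is the sketch of the between-broker tooling mentioned above: the partition reassignment tool that ships with Kafka 0.8.x. The ZooKeeper address, broker IDs, and file names are placeholders for illustration, not values from this cluster.

    # topics.json lists the topics to move, e.g.:
    #   {"version": 1, "topics": [{"topic": "Topic-11"}]}
    # 1. Generate a candidate assignment across brokers 1, 2 and 3:
    bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 \
      --topics-to-move-json-file topics.json --broker-list "1,2,3" --generate
    # 2. Save the proposed assignment it prints as reassign.json, then execute it:
    bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 \
      --reassignment-json-file reassign.json --execute
    # 3. Check progress until every partition reports completed successfully:
    bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 \
      --reassignment-json-file reassign.json --verify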
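On the quoted question about choosing configuration to avoid the manual steps: one option, offered here only as an illustrative assumption about this setup and not a complete answer, is to cap the on-disk size of the six heavy topics with a per-topic retention.bytes override so no topic can grow without bound. Note that retention.bytes applies per partition, so the total per broker depends on how many partitions of the topic it hosts; the topic name and size below are placeholders.

    # Cap each partition of a heavy topic at 50 GiB (50 * 1024^3 bytes):
    bin/kafka-topics.sh --zookeeper zkhost:2181 --alter \
      --topic Topic-11 --config retention.bytes=53687091200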