For #1, one potential cause is that someone was consuming from a malformed topic name (e.g., datapoints-wr/ite-34). In 0.7, this would accidentally create a directory named datapoints-wr in the Kafka log root dir. The problem has been fixed in 0.8 by restricting topic names to alphanumeric characters plus "-" and "_".
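To make the restriction concrete, here is a minimal sketch of that kind of check, written in Python for brevity (Kafka itself is Scala). The names is_legal_topic and partition_dir are hypothetical, and the character set is simply the rule as stated above, not code copied from the 0.8 source:

```python
import re

# Rule as stated above: letters, digits, "-" and "_" only.
# Illustrative check only; this is not Kafka's actual validation code.
LEGAL_TOPIC = re.compile(r"[A-Za-z0-9_-]+")


def is_legal_topic(name: str) -> bool:
    """Return True if the whole name uses only the allowed characters."""
    return LEGAL_TOPIC.fullmatch(name) is not None


def partition_dir(topic: str, partition: int) -> str:
    """Hypothetical helper: build a '<topic>-<partition>' directory name,
    refusing topics that could escape into a nested path."""
    if not is_legal_topic(topic):
        raise ValueError(f"illegal topic name: {topic!r}")
    return f"{topic}-{partition}"
```

With a check like this, a request for "datapoints-wr/ite" is rejected up front instead of producing a path that contains a slash.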
Thanks,

Jun

On Fri, Jul 19, 2013 at 7:45 AM, Blake Smith <blake.sm...@tempo-db.com> wrote:

> Just to close the loop here for posterity:
>
> 1. For the directory topic name corruption, it looks like there's still an
> outstanding issue in JIRA: https://issues.apache.org/jira/browse/KAFKA-411
> 2. Ensuring log recovery is run seems to be fixed in commit
> 75fc5eab35aa33cffd9c09a2070dfe287db0ef4e
> (https://issues.apache.org/jira/browse/KAFKA-188)
>
> Thanks for your help Jay, it looks like we'll have to start the 0.7.1 ->
> 0.8 upgrade process sooner than I thought.
>
>
> On Wed, Jul 17, 2013 at 12:32 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:
>
> > 1. Cool. For now I'll write this off as a fluke side-effect of an unsafe
> > hardware shutdown. If I come up with a reproducible test case, I'll send
> > it (or a patch) your way.
> >
> > 2. After the process came up after the crash, recovery tried to run on
> > each partition, but because of the malformed directory issue it looks
> > like recovery never ran on partition 11 (the log with the corrupt
> > message).
> >
> > 40472 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable mode, recovery true
> > 44032 [main] INFO kafka.message.FileMessageSet - recover high water mark:199897864
> > 44032 [main] INFO kafka.message.FileMessageSet - Recovery succeeded in 3 seconds. 0 bytes truncated.
> > 44033 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-35'
> > 44044 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable mode, recovery true
> > 48836 [main] INFO kafka.message.FileMessageSet - recover high water mark:268123698
> > 48836 [main] INFO kafka.message.FileMessageSet - Recovery succeeded in 4 seconds. 0 bytes truncated.
> > 48859 [main] INFO kafka.log.LogManager - Loading log 'datapoints-wr'
> > 48911 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup. Prepare to shutdown
> > java.lang.NumberFormatException: For input string: "wr"
> >     at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> >     at java.lang.Integer.parseInt(Integer.java:481)
> >     at java.lang.Integer.parseInt(Integer.java:514)
> >     at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
> >     at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> >     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> >     at kafka.log.LogManager.<init>(LogManager.scala:65)
> >     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> >     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> >     at kafka.Kafka$.main(Kafka.scala:50)
> >     at kafka.Kafka.main(Kafka.scala)
> > 48912 [main] INFO kafka.server.KafkaServer - Shutting down Kafka server
> > 48913 [main] INFO kafka.utils.KafkaScheduler - shutdown scheduler kafka-logcleaner-
> > 48913 [main] INFO kafka.server.KafkaServer - Kafka server shut down completed
> > 0 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-write : 50
> > 2 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-increment : 50
> > 2 [main] INFO kafka.utils.Utils$ - The number of partitions for topic series-index : 5
> > 5 [main] INFO kafka.server.KafkaServer - Starting Kafka server...
> > 17 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-18'
> > 32 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable mode, recovery false
> > 46 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-15'
> > 46 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-15/00000000000000000000.kafka in mutable mode, recovery false
> > 47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-25'
> > 47 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-25/00000000000000000000.kafka in mutable mode, recovery false
> > 47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-27'
> > 48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable mode, recovery false
> >
> > How is recovery state tracked? From the behavior I'm seeing, it looks like
> > it is only triggered on the first restart after an unclean shutdown.
> >
> > Blake
> >
> >
> > On Tue, Jul 16, 2013 at 6:52 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> >> 1. I don't think this is something kafka can do. We never rename
> >> directories once we create them. I'm a little at a loss as to how/why the
> >> os would do it either, though.
> >> 2. Can you check if it ran recovery or not (you would see a bunch of
> >> messages about recovering each topic)? This process goes through each
> >> message sequentially and checks the CRC. If that did happen it is unlikely
> >> that there was a problem in the log itself.
> >>
> >> -Jay
> >>
> >>
> >> On Tue, Jul 16, 2013 at 3:04 PM, Blake Smith <blake.sm...@tempo-db.com> wrote:
> >>
> >> > Thanks for the response Jay,
> >> >
> >> > 1. Do you think the corrupt partition directory names are happening at the
> >> > kafka level, or the file system level?
> >> > If it's at the file system level, perhaps we can investigate either
> >> > changing FS options or selecting a different filesystem for the log
> >> > files. Is there a recommended filesystem / filesystem options?
> >> >
> >> > 2. I'm a little confused about terminology, but from what I observed the
> >> > disk corruption was in the last segment for the partition. At the time I
> >> > investigated, the last segment file for the bad partition was ~300mb
> >> > and the corrupt message was at around the 288mb byte offset (our segment
> >> > files seem to get rotated at 512mb, which I believe is the default). When
> >> > the server came back up, the corrupt last segment file was still being
> >> > written to beyond the corrupt message. Is this the behavior you're
> >> > describing?
> >> >
> >> > Blake
> >> >
> >> >
> >> > On Tue, Jul 16, 2013 at 3:29 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >> >
> >> > > 1. Corrupt directory names are not something we can really handle. The
> >> > > names have to work with zookeeper so if weird characters get inserted I
> >> > > think the best we can do is give an error. I believe we have fixed the 0
> >> > > byte file problem in 0.8.
> >> > > 2. The assumption for log recovery is that data that has been flushed to
> >> > > disk is stable, but data which has not been flushed can be in any state
> >> > > (garbage bytes, missing, partial writes, etc). Recovery runs only on the
> >> > > last segment of unflushed data (e.g. the file with the highest number)
> >> > > and only in the case of an unclean shutdown. This is basically an
> >> > > optimization--if we recover all the log on startup, startup can take
> >> > > hours for large logs (e.g. 5TB at say 10MB/sec). From what you are saying
> >> > > it sounds like you had disk corruption prior to the last segment--in
> >> > > other words flushed data became corrupt. This is not handled in 0.7.
In > 0.8 > >> you > >> > > would have the option of just deleting the problematic data and > >> restoring > >> > > from replicas. > >> > > > >> > > -Jay > >> > > > >> > > > >> > > On Tue, Jul 16, 2013 at 1:10 PM, Blake Smith < > >> blake.sm...@tempo-db.com > >> > > >wrote: > >> > > > >> > > > Hi everyone, > >> > > > > >> > > > Last week, one of our production Kafka 0.7.1 servers had a > hardware > >> > > failure > >> > > > that resulted in an unclean restart. When the server came back up > 5 > >> > > minutes > >> > > > later, there were two topic corruption problems that we had to > >> handle > >> > to > >> > > > get the pipeline working again. > >> > > > > >> > > > 1. The kafka log directory had malformed partition directory names > >> [1]. > >> > > The > >> > > > partition directories were shortened names of our topic naming > >> scheme. > >> > > > Inside some of these directories was a 0 byte .kafka file. This > >> > prevented > >> > > > the kafka server process from starting. To resolve this, we > manually > >> > > > removed the malformed topic directories and the empty log files. > >> > > > > >> > > > Is this a known issue? If so, has it been addressed in 0.8? This > >> seems > >> > > like > >> > > > a bug. > >> > > > > >> > > > 2. One of the partitions had a corrupt log message (crc32 check > >> > failure) > >> > > > that prevented consumers from advancing on that partition [2]. To > >> > resolve > >> > > > this issue we had to hexdump the log file seeking to the byte > offset > >> > > > provided by the consumer exception and remove the corrupt message > at > >> > the > >> > > > frame boundary. After removing the corrupt message, we created a > >> task > >> > to > >> > > > parse the log file and put it back into our producer / consumer > >> > pipeline. 
> >> > > > The InvalidMessageException never bubbled to our consumer thread,
> >> > > > preventing us from handling this error without manual manipulation of
> >> > > > the log files (our consumer code was catching InvalidMessageException,
> >> > > > but it never reached that point).
> >> > > >
> >> > > > Is a corrupt log file a state that Kafka is supposed to handle, or is
> >> > > > our consumer code supposed to handle it? I'm confused about how we can
> >> > > > avoid manually cleaning up log files with a hex editor if we get into
> >> > > > this state again.
> >> > > >
> >> > > > Thanks!
> >> > > >
> >> > > > Blake
> >> > > >
> >> > > > [1] Corrupt partition directories in the broker data directories:
> >> > > >
> >> > > > 0 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-write : 50
> >> > > > 1 [main] INFO kafka.utils.Utils$ - The number of partitions for topic datapoints-increment : 50
> >> > > > 2 [main] INFO kafka.utils.Utils$ - The number of partitions for topic series-index : 5
> >> > > > 5 [main] INFO kafka.server.KafkaServer - Starting Kafka server...
> >> > > > 17 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-18'
> >> > > > 32 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-18/00000000095029068903.kafka in mutable mode, recovery false
> >> > > > 46 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-15'
> >> > > > 46 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-15/00000000000000000000.kafka in mutable mode, recovery false
> >> > > > 47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-25'
> >> > > > 47 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-25/00000000000000000000.kafka in mutable mode, recovery false
> >> > > > 47 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-27'
> >> > > > 48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-27/00000000093417873484.kafka in mutable mode, recovery false
> >> > > > 48 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-9'
> >> > > > 48 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-9/00000000092880448733.kafka in mutable mode, recovery false
> >> > > > 49 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-8'
> >> > > > 49 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-8/00000000000000000000.kafka in mutable mode, recovery false
> >> > > > 49 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-29'
> >> > > > 49 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-29/00000000008053064625.kafka in mutable mode, recovery false
> >> > > > 50 [main] INFO kafka.log.LogManager - Loading log
> >> > > > 'datapoints-increment-34'
> >> > > > 50 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-34/00000000000000000000.kafka in mutable mode, recovery false
> >> > > > 50 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-19'
> >> > > > 51 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-19/00000000000000000000.kafka in mutable mode, recovery false
> >> > > > 51 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-16'
> >> > > > 51 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-16/00000000000000000000.kafka in mutable mode, recovery false
> >> > > > 51 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-27'
> >> > > > 52 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-27/00000000000000000000.kafka in mutable mode, recovery false
> >> > > > 52 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-34'
> >> > > > 52 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-34/00000000092880904403.kafka in mutable mode, recovery false
> >> > > > 53 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-21'
> >> > > > 53 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-21/00000000095028969469.kafka in mutable mode, recovery false
> >> > > > 53 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-48'
> >> > > > 54 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-48/00000000094491048876.kafka in mutable mode, recovery false
> >> > > > 54 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-40'
> >> > > > 55
> >> > > > [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-40/00000000000000000000.kafka in mutable mode, recovery false
> >> > > > 55 [main] INFO kafka.log.LogManager - Loading log 'datapoints-increment-6'
> >> > > > 55 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-increment-6/00000000000000000000.kafka in mutable mode, recovery false
> >> > > > 56 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-25'
> >> > > > 56 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-25/00000000093417394574.kafka in mutable mode, recovery false
> >> > > > 56 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-33'
> >> > > > 57 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-33/00000000093417473922.kafka in mutable mode, recovery false
> >> > > > 57 [main] INFO kafka.log.LogManager - Loading log 'datapoints-write-35'
> >> > > > 58 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-write-35/00000000092880891040.kafka in mutable mode, recovery false
> >> > > > 58 [main] INFO kafka.log.LogManager - Loading log 'datapoints-wr'
> >> > > > 58 [main] INFO kafka.log.Log - Loading the last segment /var/kafka/datapoints-wr/00000000000000000000.kafka in mutable mode, recovery false
> >> > > > 60 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup.
> >> > > > Prepare to shutdown
> >> > > > java.lang.NumberFormatException: For input string: "wr"
> >> > > >     at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> >> > > >     at java.lang.Integer.parseInt(Integer.java:481)
> >> > > >     at java.lang.Integer.parseInt(Integer.java:514)
> >> > > >     at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
> >> > > >     at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> >> > > >     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> >> > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> >> > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> >> > > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> >> > > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> >> > > >     at kafka.log.LogManager.<init>(LogManager.scala:65)
> >> > > >     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> >> > > >     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> >> > > >     at kafka.Kafka$.main(Kafka.scala:50)
> >> > > >     at kafka.Kafka.main(Kafka.scala)
> >> > > > 61 [main] INFO kafka.server.KafkaServer - Shutting down Kafka server
> >> > > > 62 [main] INFO kafka.utils.KafkaScheduler - shutdown scheduler kafka-logcleaner-
> >> > > > 62 [main] INFO kafka.server.KafkaServer - Kafka server shut down completed
> >> > > >
> >> > > > [blake@kafka03 kafka]$ ls
> >> > > > dat                       datapoints-increment-40   datapoints-write-26
> >> > > > datapoints-increment-0    datapoints-increment-41   datapoints-write-27
> >> > > > datapoints-increment-1    datapoints-increment-42   datapoints-write-28
> >> > > > datapoints-increment-10   datapoints-increment-43   datapoints-write-29
> >> > > > datapoints-increment-11   datapoints-increment-44
> >> > > >                                                     datapoints-write-3
> >> > > > datapoints-increment-12   datapoints-increment-45   datapoints-write-30
> >> > > > datapoints-increment-13   datapoints-increment-46   datapoints-write-31
> >> > > > datapoints-increment-14   datapoints-increment-47   datapoints-write-32
> >> > > > datapoints-increment-15   datapoints-increment-48   datapoints-write-33
> >> > > > datapoints-increment-16   datapoints-increment-49   datapoints-write-34
> >> > > > datapoints-increment-17   datapoints-increment-5    datapoints-write-35
> >> > > > datapoints-increment-18   datapoints-increment-6    datapoints-write-36
> >> > > > datapoints-increment-19   datapoints-increment-7    datapoints-write-37
> >> > > > datapoints-increment-2    datapoints-increment-8    datapoints-write-38
> >> > > > datapoints-increment-20   datapoints-increment-9    datapoints-write-39
> >> > > > datapoints-increment-21   datapoints-wr             datapoints-write-4
> >> > > > datapoints-increment-22   datapoints-writ           datapoints-write-40
> >> > > > datapoints-increment-23   datapoints-write-0        datapoints-write-41
> >> > > > datapoints-increment-24   datapoints-write-1        datapoints-write-42
> >> > > > datapoints-increment-25   datapoints-write-10       datapoints-write-43
> >> > > > datapoints-increment-26   datapoints-write-11       datapoints-write-44
> >> > > > datapoints-increment-27   datapoints-write-12       datapoints-write-45
> >> > > > datapoints-increment-28   datapoints-write-13       datapoints-write-46
> >> > > > datapoints-increment-29   datapoints-write-14       datapoints-write-47
> >> > > > datapoints-increment-3    datapoints-write-15       datapoints-write-48
> >> > > > datapoints-increment-30   datapoints-write-16       datapoints-write-49
> >> > > > datapoints-increment-31   datapoints-write-17       datapoints-write-5
> >> > > > datapoints-increment-32   datapoints-write-18       datapoints-write-6
> >> > > > datapoints-increment-33   datapoints-write-19       datapoints-write-7
> >> > > > datapoints-increment-34   datapoints-write-2
> >> > > >                                                     datapoints-write-8
> >> > > > datapoints-increment-35   datapoints-write-20       datapoints-write-9
> >> > > > datapoints-increment-36   datapoints-write-21       series-index-0
> >> > > > datapoints-increment-37   datapoints-write-22       series-index-1
> >> > > > datapoints-increment-38   datapoints-write-23       series-index-2
> >> > > > datapoints-increment-39   datapoints-write-24       series-index-3
> >> > > > datapoints-increment-4    datapoints-write-25       series-index-4
> >> > > > [blake@kafka03 kafka]$ cd datapoints-wr
> >> > > > [blake@kafka03 datapoints-wr]$ ls -lah
> >> > > > total 8.0K
> >> > > > drwxr-xr-x 2 kafka kafka 4.0K Jul 11 18:51 .
> >> > > > drwxr-xr-x 110 kafka kafka 4.0K Jul 11 19:05 ..
> >> > > > -rw-r--r-- 1 kafka kafka 0 Jul 11 18:51 00000000000000000000.kafka
> >> > > >
> >> > > >
> >> > > > [2] In our consumer logs:
> >> > > >
> >> > > > 013-07-11 21:33:01,136 61867 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable - error in FetcherRunnable
> >> > > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 79 curr offset: 93706191248 init offset: 93706191165
> >> > > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >> > > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> >> > > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >> > > >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >> > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >> > > >     at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> >> > > >     at
> >> > > >     kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> >> > > >     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> >> > > >     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> >> > > >     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> >> > > >     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> >> > > >     at scala.collection.immutable.List.foreach(List.scala:76)
> >> > > >     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
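
For anyone following the recovery discussion in this thread, here is a minimal sketch of the idea described above: walk the last segment message by message, check each CRC, and stop at the first bad or partial message so that everything after it can be truncated. The frame layout used here (4-byte length, 4-byte CRC32, payload) is an assumption made for illustration and is not Kafka 0.7's on-disk format; frame and valid_prefix_length are hypothetical names.

```python
import struct
import zlib

# Hypothetical frame header: payload length, then CRC32 of the payload,
# both as big-endian unsigned 32-bit integers.
HEADER = struct.Struct(">II")


def frame(payload: bytes) -> bytes:
    """Wrap a payload in the hypothetical length + CRC32 header."""
    return HEADER.pack(len(payload), zlib.crc32(payload)) + payload


def valid_prefix_length(segment: bytes) -> int:
    """Return how many leading bytes of the segment hold intact frames.

    Scanning stops at the first frame that is cut short or whose CRC
    does not match; a recovery pass would truncate the file there.
    """
    pos = 0
    while pos + HEADER.size <= len(segment):
        length, crc = HEADER.unpack_from(segment, pos)
        start = pos + HEADER.size
        end = start + length
        if end > len(segment):
            break  # partial write: the payload runs past end of file
        if zlib.crc32(segment[start:end]) != crc:
            break  # corrupt payload: CRC mismatch
        pos = end
    return pos
```

A scan like this only helps if it is actually run on the affected segment, which is the gap discussed in the thread: when startup aborts on a malformed directory name, later partitions never get scanned.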