Hi [~junrao], has there been any more discussion or progress on this issue?
Thanks,
Andy

On Tue, 12 Jul 2016 at 10:11, Andy Coates (JIRA) <j...@apache.org> wrote:

> [ https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372565#comment-15372565 ]
>
> Andy Coates commented on KAFKA-3919:
> ------------------------------------
>
> [~junrao] Good stuff. Look forward to hearing from you and getting involved more =)
>
> > Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs
> > ------------------------------------------------------------------------------------------------------
> >
> >                 Key: KAFKA-3919
> >                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
> >             Project: Kafka
> >          Issue Type: Bug
> >          Components: core
> >    Affects Versions: 0.9.0.1
> >            Reporter: Andy Coates
> >
> > Hi All,
> >
> > I encountered an issue with Kafka following a power outage that saw a proportion of our cluster disappear. When the power came back on, several brokers halted on start-up with the error:
> >
> > {noformat}
> > Fatal error during KafkaServerStartable startup. Prepare to shutdown
> > kafka.common.InvalidOffsetException: Attempt to append an offset (1239742691) to position 35728 no larger than the last offset appended (1239742822) to /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
> >         at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
> >         at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> >         at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> >         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> >         at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
> >         at kafka.log.LogSegment.recover(LogSegment.scala:188)
> >         at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
> >         at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
> >         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >         at kafka.log.Log.loadSegments(Log.scala:160)
> >         at kafka.log.Log.<init>(Log.scala:90)
> >         at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
> >         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
> >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >         at java.lang.Thread.run(Thread.java:745)
> > {noformat}
> >
> > The only way to recover the brokers was to delete the log files that contained non-monotonically incrementing offsets.
> >
> > We've spent some time digging through the logs and I feel I may have worked out the sequence of events leading to this issue (though this is based on some assumptions I've made about the way Kafka is working, which may be wrong).
> >
> > First off, we have unclean leader elections disabled (we did later enable them to help get around some other issues we were having, but that was several hours after this issue manifested), and we're producing to the topic with gzip compression and acks=1.
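For context, the producer side at the time boils down to roughly the following. This is only a sketch: the bootstrap address is a placeholder, and the topic name is taken from the segment path in the stack trace above.

{code:scala}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ItsEventProducerSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "broker2011:9092") // placeholder address
  props.put("acks", "1")                            // leader-only ack: availability chosen over consistency
  props.put("compression.type", "gzip")             // each producer batch lands on the broker as one compressed record set
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  producer.send(new ProducerRecord[String, String]("mt_xp_its_music_main_itsevent", "some-key", "some-value"))
  producer.close()
}
{code}

On the broker side, unclean.leader.election.enable=false is the setting behind "unclean leader elections disabled".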
> > We looked through the data logs that were causing the brokers to not start. We found that the initial part of the log has monotonically increasing offsets, where each compressed batch normally contains one or two records. Then there is a batch that contains many records, whose first records have an offset below the previous batch and whose last record has an offset above the previous batch. Following on from this there continues a period of large batches, with monotonically increasing offsets, and then the log returns to batches with one or two records.
> >
> > Our working assumption here is that the period before the offset dip, with the small batches, is pre-outage normal operation. The period of larger batches is from just after the outage, where producers have a backlog to process once the partition becomes available, and then things return to normal batch sizes again once the backlog clears.
> >
> > We also looked through Kafka's application logs to try and piece together the series of events leading up to this. Here's what we know happened, with regard to one partition that has issues, from the logs:
> >
> > Prior to outage:
> > * Replicas for the partition are brokers 2011, 2012 and 2024, with 2024 being the preferred leader.
> > * Producers using acks=1, compression=gzip.
> > * Brokers configured with unclean.elections=false, zk.session-timeout=36s.
> >
> > Post outage:
> > * 2011 comes up first (also as the Controller), recovers unflushed log segment 1239444214, completes the load with offset 1239740602, and becomes leader of the partition.
> > * 2012 comes up next, recovers unflushed log segment 1239444214, truncates to offset 1239742830 (that's 2,228 records ahead of the recovered offset of the current leader), and starts following.
> > * 2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, truncates to offset 1239742250 (that's 1,648 records ahead of the recovered offset of the current leader), and starts following.
> > * The Controller adds 2024 to the replica set just before 2024 halts due to another partition having an offset greater than the leader's.
> > * The Controller adds 2012 to the replica set just before 2012 halts due to another partition having an offset greater than the leader's.
> > * When 2012 is next restarted, it fails to fully start as it's complaining of invalid offsets in the log.
> >
> > You'll notice that the offsets the brokers truncate to are different for each of the three brokers.
> >
> > We're assuming that when 2012 starts up and follows the leader it requests records from its truncated offset, but that the logs have diverged on these two brokers to the point that the requested offset falls, within the leader's log, in the middle of a compressed record set rather than on a record-set boundary. The leader then returns the whole compressed set, which the follower appends to its log, unknowingly introducing a dip in its otherwise monotonically incrementing offsets.
> >
> > Several of our brokers were unlucky enough to have this dip at the 4K boundary used by the offset indexer, causing a protracted outage. We've written a little utility that shows several more brokers have a dip outside of the 4K boundary.
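The utility itself isn't attached here, but the gist of the check is something like the sketch below. It assumes the output of kafka.tools.DumpLogSegments over the suspect segment (its per-message "offset: N position: P ..." lines) is piped in on stdin.

{code:scala}
import scala.io.Source

// Rough sketch: scan DumpLogSegments output for offsets that stop incrementing monotonically.
object OffsetDipScanner extends App {
  val OffsetLine = """.*offset:\s+(\d+).*""".r
  var last = -1L

  for (line <- Source.stdin.getLines()) line match {
    case OffsetLine(o) =>
      val offset = o.toLong
      if (last >= 0 && offset <= last)
        println(s"Non-monotonic offset: $offset appended after $last")
      last = offset
    case _ => // ignore lines without an offset
  }
}
{code}

As I understand it, an index entry is only written roughly every index.interval.bytes (4096 by default), which is why only the dips that happen to coincide with such a boundary blow up segment recovery with the InvalidOffsetException above, while the others sit undetected in the log.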
> > There are some assumptions in there which I've not got around to confirming or denying. (A quick attempt to recreate this failed and I've not found the time to invest more.)
> >
> > Of course I'd really appreciate the community / experts stepping in and commenting on whether our assumptions are right or wrong, or if there is another explanation for the problem.
> >
> > Obviously, the fact that the broker got into this state and then won't start is a bug, and one I'd like to fix. A Kafka broker should not corrupt its own log during normal operation to the point that it can't restart! :D
> >
> > A secondary issue is whether we think the divergent logs are acceptable. This may be deemed acceptable given the producers have chosen availability over consistency when they produced with acks=1. Though personally, the system having diverging replicas of an immutable commit log just doesn't sit right.
> >
> > I see us having a few options here:
> >
> > * Have the replicas detect the divergence of their logs, e.g. a follower compares the checksum of its last record with the record at the same offset on the leader. The follower can then work out that its log has diverged from the leader's. At that point it could either halt, stop replicating that partition, or search backwards to find the point of divergence, truncate and recover (possibly saving the truncated part somewhere). This would be a protocol change for Kafka. This solution trades availability (you've got fewer ISRs during the extended re-sync process) for consistency.
> >
> > * Leave the logs as they are and have the indexing of offsets in the log on start-up handle such a situation gracefully. This leaves logs in a divergent state between replicas (meaning replays would yield different messages depending on whether the leader was up or down), but gives better availability (no time spent not being an ISR while it repairs any divergence).
> >
> > * Support multiple options and allow the behaviour to be tuned, ideally per topic.
> >
> > * Something else...
> >
> > I'm happy/keen to contribute here, but I'd like to first discuss which option should be investigated.
> >
> > Andy
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
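Coming back to the first option above, here's a very rough sketch of the kind of follower-side check I have in mind, just to make the idea concrete. Record and LeaderLookup are hypothetical stand-ins, not real Kafka APIs, and this glosses over the protocol changes that would actually be needed.

{code:scala}
object DivergenceCheckSketch {
  // Hypothetical stand-ins: a record identified by its offset plus a checksum of its content,
  // and a way for a follower to ask the leader for the record at a given offset.
  final case class Record(offset: Long, checksum: Long)
  type LeaderLookup = Long => Option[Record]

  // Option 1: the follower compares the checksum of its last appended record with the record
  // at the same offset on the leader. A mismatch (or a missing offset) means the logs have diverged.
  def hasDiverged(lastLocal: Record, leaderRecordAt: LeaderLookup): Boolean =
    leaderRecordAt(lastLocal.offset) match {
      case Some(leaderRecord) => leaderRecord.checksum != lastLocal.checksum
      case None               => true // follower is ahead of (or disjoint from) the leader
    }

  // On divergence, walk backwards through the local tail to the newest offset where the checksums
  // still agree; that's the point to truncate to (saving the truncated part) before re-fetching.
  def divergencePoint(localTailAscending: Seq[Record], leaderRecordAt: LeaderLookup): Option[Long] =
    localTailAscending.reverse
      .find(r => leaderRecordAt(r.offset).exists(_.checksum == r.checksum))
      .map(_.offset)
}
{code}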