[ https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367781#comment-15367781 ]
Jun Rao commented on KAFKA-3919:
--------------------------------

[~BigAndy], yes, the fix could be involved and we haven't nailed down a detailed design yet. Zookeeper has to deal with a similar issue. Let me discuss this with [~fpj] a bit. Once we are comfortable with a design, we can see if you want to pick it up or not.

> Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Andy Coates
>
> Hi All,
>
> I encountered an issue with Kafka following a power outage that saw a proportion of our cluster disappear. When the power came back on, several brokers halted on start-up with the error:
>
> {noformat}
> Fatal error during KafkaServerStartable startup. Prepare to shutdown
> kafka.common.InvalidOffsetException: Attempt to append an offset (1239742691) to position 35728 no larger than the last offset appended (1239742822) to /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>     at kafka.log.LogSegment.recover(LogSegment.scala:188)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka.log.Log.loadSegments(Log.scala:160)
>     at kafka.log.Log.<init>(Log.scala:90)
>     at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
>
> The only way to recover the brokers was to delete the log files that contained non-monotonically incrementing offsets.
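For context, below is a minimal sketch (in Scala) of the kind of monotonicity check that produces the InvalidOffsetException above. It is not the actual kafka.log.OffsetIndex code and the class and field names are illustrative only: the real index is a memory-mapped file of sparse (relative offset, file position) entries that gets rebuilt while recovering an unflushed segment, and an append is rejected unless the new offset is strictly larger than the last one appended.

{code:scala}
// Toy in-memory stand-in for the offset index; illustrative only, not kafka.log.OffsetIndex.
class InvalidOffsetException(message: String) extends RuntimeException(message)

class ToyOffsetIndex(val baseOffset: Long) {
  // (absolute offset, physical position in the segment file)
  private var entries = Vector.empty[(Long, Int)]

  def lastOffset: Long = entries.lastOption.map(_._1).getOrElse(baseOffset)

  /** Reject any entry whose offset is not strictly larger than the last one appended. */
  def append(offset: Long, position: Int): Unit = {
    if (entries.nonEmpty && offset <= lastOffset)
      throw new InvalidOffsetException(
        s"Attempt to append an offset ($offset) to position ${entries.size} " +
        s"no larger than the last offset appended ($lastOffset).")
    entries = entries :+ (offset -> position)
  }
}
{code}

Because the index is sparse (by default an entry is added roughly every 4K bytes of log), only a dip that happens to coincide with an index entry trips this check, which is consistent with the 4K-boundary observation further down.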
> We've spent some time digging through the logs and I feel I may have worked out the sequence of events leading to this issue (though this is based on some assumptions I've made about the way Kafka works, which may be wrong).
>
> First off, we have unclean leader elections disabled. (We did later enable them to help get around some other issues we were having, but this was several hours after this issue manifested.) We're producing to the topic with gzip compression and acks=1.
>
> We looked through the data logs that were causing the brokers to not start. What we found is that the initial part of the log has monotonically increasing offsets, where each compressed batch normally contains one or two records. Then there is a batch that contains many records, whose first record has an offset below the previous batch and whose last record has an offset above the previous batch. Following on from this there continues a period of large batches with monotonically increasing offsets, and then the log returns to batches with one or two records.
>
> Our working assumption here is that the period before the offset dip, with the small batches, is pre-outage normal operation. The period of larger batches is from just after the outage, where producers have a backlog to process once the partition becomes available, and then things return to normal batch sizes again once the backlog clears.
>
> We also looked through Kafka's application logs to try and piece together the series of events leading up to this. Here's what we know happened, with regards to one partition that has issues, from the logs:
>
> Prior to outage:
> * Replicas for the partition are brokers 2011, 2012, 2024, with 2024 being the preferred leader.
> * Producers using acks=1, compression=gzip.
> * Brokers configured with unclean.elections=false, zk.session-timeout=36s.
>
> Post outage:
> * 2011 comes up first (also as the Controller), recovers unflushed log segment 1239444214, completes load with offset 1239740602, and becomes leader of the partition.
> * 2012 comes up next, recovers unflushed log segment 1239444214, truncates to offset 1239742830 (that's 2,228 records ahead of the recovered offset of the current leader), and starts following.
> * 2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, truncates to offset 1239742250 (that's 1,648 records ahead of the recovered offset of the current leader), and starts following.
> * The Controller adds 2024 to the replica set just before 2024 halts due to another partition having an offset greater than the leader.
> * The Controller adds 2012 to the replica set just before 2012 halts due to another partition having an offset greater than the leader.
> * When 2012 is next restarted, it fails to fully start as it's complaining of invalid offsets in the log.
>
> You'll notice that the offsets the brokers truncate to are different for each of the three brokers.
>
> We're assuming that when 2012 starts up and follows the leader it requests records from its truncated offset, but that the logs have diverged on these two brokers to the point that the requested offset corresponds, within the leader's log, to the middle of a compressed record set rather than a record-set boundary. The leader then returns the whole compressed set, which the follower appends to its log - unknowingly introducing a dip in its otherwise monotonically incrementing offsets.
>
> Several of our brokers were unlucky enough to have this dip at the 4K boundary used by the offset indexer, causing a protracted outage. We've written a little utility that shows several more brokers have a dip outside of the 4K boundary.
>
> There are some assumptions in there which I've not got around to confirming or denying. (A quick attempt to recreate this failed and I've not found the time to invest more.)
>
> Of course I'd really appreciate the community / experts stepping in and commenting on whether our assumptions are right or wrong, or if there is another explanation to the problem.
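As an aside on the "little utility" mentioned above, here is a rough sketch of the sort of scan such a tool might perform (this is not the reporter's actual utility). It reads the text output of kafka.tools.DumpLogSegments on stdin, pulls out each record's offset, and reports every place the offsets fail to increase. The "offset: <n>" regex is an assumption about the dump format, which varies between Kafka versions.

{code:scala}
import scala.io.Source

// Scan DumpLogSegments output (piped to stdin) for non-monotonic offsets.
object OffsetDipScanner {
  // Assumes each dumped record line contains an "offset: <n>" token.
  private val OffsetPattern = """offset:\s*(\d+)""".r

  def main(args: Array[String]): Unit = {
    var highest = -1L   // highest offset seen so far
    var dips    = 0L    // number of offsets that failed to increase

    for {
      line    <- Source.stdin.getLines()
      matched <- OffsetPattern.findFirstMatchIn(line)
    } {
      val offset = matched.group(1).toLong
      if (highest >= 0 && offset <= highest) {
        dips += 1
        println(s"dip: offset $offset appears after $highest")
      }
      highest = math.max(highest, offset)
    }
    println(s"scan complete: $dips non-monotonic offset(s) found")
  }
}
{code}

Running DumpLogSegments with --deep-iteration makes it print the records inside compressed sets, which is where the dips described above would show up.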
> The fact that the broker got into this state and then won't start is obviously a bug, and one I'd like to fix. A Kafka broker should not corrupt its own log during normal operation to the point that it can't restart! :D
>
> A secondary issue is whether we think the divergent logs are acceptable. This may be deemed acceptable, given that the producers chose availability over consistency when they produced with acks=1. Though personally, a system having diverging replicas of an immutable commit log just doesn't sit right with me.
>
> I see us having a few options here:
> * Have the replicas detect the divergence of their logs, e.g. a follower compares the checksum of its last record with the record at the same offset on the leader (a rough sketch of such a check follows at the end of this message). The follower can then work out that its log has diverged from the leader. At that point it could either halt, stop replicating that partition, or search backwards to find the point of divergence, then truncate and recover (possibly saving the truncated part somewhere). This would be a protocol change for Kafka. This solution trades availability (you've got fewer ISRs during the extended re-sync process) for consistency.
> * Leave the logs as they are and have the indexing of offsets in the log on start-up handle such a situation gracefully. This leaves the logs in a divergent state between replicas (meaning replays would yield different messages if the leader were to go down), but gives better availability (no time spent not being an ISR while it repairs any divergence).
> * Support multiple options and allow it to be tuned, ideally per topic.
> * Something else...
>
> I'm happy/keen to contribute here. But I'd like to first discuss which option should be investigated.
>
> Andy

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
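On the first bullet in the option list above, here is a hand-wavy sketch of what a checksum-based divergence check on the follower could look like. The names and types are assumptions, not a concrete protocol proposal: given a way to look up the CRC of the record at a given offset both locally and on the leader, walk back from the follower's last offset to find the highest offset at which the two logs still agree, which is the natural truncation point.

{code:scala}
object DivergenceCheck {
  /** Returns the CRC of the record at `offset`, or None if the log has no such record.
    * How the leader-side lookup is served (some new inter-broker request) is left open. */
  type CrcLookup = Long => Option[Long]

  /** Highest offset <= lastLocalOffset at which follower and leader CRCs match, if any.
    * A linear backwards walk for clarity; a real implementation could binary-search. */
  def lastAgreedOffset(follower: CrcLookup,
                       leader: CrcLookup,
                       lastLocalOffset: Long): Option[Long] =
    Iterator.iterate(lastLocalOffset)(_ - 1)
      .takeWhile(_ >= 0)
      .find { offset =>
        (follower(offset), leader(offset)) match {
          case (Some(f), Some(l)) => f == l
          case _                  => false   // missing on either side counts as disagreement
        }
      }

  /** True if the follower's last record does not match the leader's record at the same offset.
    * Note this also fires when the follower is simply ahead of the leader, which under
    * acks=1 is exactly the case where truncation is wanted anyway. */
  def hasDiverged(follower: CrcLookup, leader: CrcLookup, lastLocalOffset: Long): Boolean =
    !lastAgreedOffset(follower, leader, lastLocalOffset).contains(lastLocalOffset)
}
{code}

The trade-off called out in that bullet still applies: while the follower searches backwards, truncates and re-fetches, it is not in sync, so this buys consistency at the cost of availability.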