[~junrao] will double check

On Tue, 5 Jul 2016, 18:53 Jun Rao (JIRA), <j...@apache.org> wrote:
    [ https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362865#comment-15362865 ]

Jun Rao commented on KAFKA-3919:
--------------------------------

[~BigAndy], thanks for the investigation and the additional information.

Let me first explain how log reconciliation normally works. Each broker maintains the last committed offset for each partition and stores that information in a checkpoint file, replication-offset-checkpoint. A message is only considered committed if it's received by all in-sync replicas. The leader advances the last committed offset and propagates it to the followers, so the follower's last committed offset is always <= the leader's. When a replica becomes a leader, it won't do any truncation to its log and will instead try to commit all messages in its local log. When a replica becomes a follower, it will first truncate its log to the last committed offset stored in its local checkpoint file and then start replicating from that offset. If unclean leader election is disabled, after truncation, the follower's last offset should always be <= the leader's last offset.

Another thing we do is that if a broker is shut down forcefully, on startup we do log recovery to remove any corrupted messages. In your case, it seems what happens is that when the new leader (2011) comes up, its log is actually corrupted and therefore it has to truncate some messages. This potentially removes messages that have previously been committed. Then, when broker 2012 comes up and becomes the follower, it's possible that after the follower truncates its log to its local last committed offset, that offset is actually larger than the last recovered offset in the leader. If no new messages have been appended to the new leader, the follower will realize that its offset is out of range and will truncate its log again to the leader's last offset. However, in this case, it seems some new messages have been published to the new leader and the follower's offset is actually in range. It's just that the follower's offset may now point to a completely new set of messages. In this case, if the follower's offset points to the middle of a compressed message set, the follower will get the whole compressed message set and append it to its local log. Currently, the follower only ensures that the last offset in a compressed message set is larger than the last offset in the log, but not the first offset. This seems to be the situation that you are in.

There is still one thing not very clear to me. When building indexes during log recovery, we only add index entries at the boundary of a compressed message set. So as long as the last offset of each compressed set keeps increasing, we won't hit the InvalidOffsetException in the description. Could you check whether 1239742691 is the last offset of a compressed set and, if so, whether there is a case where the last offset of a compressed set is out of order in the log?
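To make the check described above concrete, here is a minimal sketch (plain Scala, not Kafka's actual replica fetcher or Log.append code) of an append path that validates only the last offset of a compressed message set against the log end offset; under that assumption, a set whose first offset dips below the previous batch is still appended:

{code:scala}
// Minimal sketch only (not Kafka's actual code): the follower-side append
// validates only the LAST offset of a compressed message set against the
// current log end offset, so a set whose FIRST offset dips below the
// previous batch is still accepted.
import scala.collection.mutable.ArrayBuffer

case class CompressedSet(firstOffset: Long, lastOffset: Long)

object FollowerAppendSketch {
  def append(log: ArrayBuffer[CompressedSet], set: CompressedSet): Unit = {
    val logEndOffset = log.lastOption.map(_.lastOffset + 1).getOrElse(0L)
    // Only the last offset is checked against the log end offset...
    require(set.lastOffset >= logEndOffset,
      s"last offset ${set.lastOffset} is behind log end offset $logEndOffset")
    // ...nothing rejects set.firstOffset < logEndOffset, which is how a
    // non-monotonic "dip" can enter the follower's log.
    log += set
  }

  def main(args: Array[String]): Unit = {
    val log = ArrayBuffer(CompressedSet(100L, 101L))
    // Accepted even though the set's first offset (95) is behind the
    // log end offset (102): this mirrors the dip described above.
    append(log, CompressedSet(95L, 150L))
    println(log)
  }
}
{code}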
Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs
------------------------------------------------------------------------------------------------------

                 Key: KAFKA-3919
                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 0.9.0.1
            Reporter: Andy Coates

Hi All,

I encountered an issue with Kafka following a power outage that saw a proportion of our cluster disappear. When the power came back on, several brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown
kafka.common.InvalidOffsetException: Attempt to append an offset (1239742691) to position 35728 no larger than the last offset appended (1239742822) to /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
        at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
        at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
        at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
        at kafka.log.LogSegment.recover(LogSegment.scala:188)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.log.Log.loadSegments(Log.scala:160)
        at kafka.log.Log.<init>(Log.scala:90)
        at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained the non-monotonically incrementing offsets.
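For context on where that exception comes from: during log recovery the broker rebuilds the offset index, and the index rejects any entry whose offset is not larger than the last entry appended. The following is only a rough, illustrative sketch of that invariant; the real kafka.log.OffsetIndex stores relative offsets in a memory-mapped file, so this is not its actual implementation:

{code:scala}
// Illustrative sketch of the index invariant behind the exception above.
class InvalidOffsetException(msg: String) extends RuntimeException(msg)

class OffsetIndexSketch {
  // (offset, physical position in the log segment)
  private var entries = Vector.empty[(Long, Int)]

  def append(offset: Long, position: Int): Unit = {
    val lastOffset = entries.lastOption.map(_._1).getOrElse(-1L)
    // Reject any append whose offset does not advance past the last entry.
    if (entries.nonEmpty && offset <= lastOffset)
      throw new InvalidOffsetException(
        s"Attempt to append an offset ($offset) to position ${entries.size} " +
          s"no larger than the last offset appended ($lastOffset)")
    entries = entries :+ ((offset, position))
  }
}
{code}

A log whose batch offsets dip backwards only trips this check if an index entry happens to be added at the dip, which is why the failure depends on where the dip falls relative to the indexing boundary discussed below.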
We've spent some time digging through the logs and I feel I may have worked out the sequence of events leading to this issue (though this is based on some assumptions I've made about the way Kafka is working, which may be wrong).

First off, we have unclean leader elections disabled. (We did later enable them to help get around some other issues we were having, but this was several hours after this issue manifested.) We're producing to the topic with gzip compression and acks=1.

We looked through the data logs that were causing the brokers to not start. What we found was that the initial part of the log has monotonically increasing offsets, where each compressed batch normally contained one or two records. Then there is a batch that contains many records, whose first records have offsets below the previous batch and whose last record has an offset above the previous batch. Following on from this there continues a period of large batches with monotonically increasing offsets, and then the log returns to batches with one or two records.

Our working assumption here is that the period before the offset dip, with the small batches, is pre-outage normal operation. The period of larger batches is from just after the outage, where producers have a backlog to process when the partition becomes available, and then things return to normal batch sizes again once the backlog clears.

We also looked through Kafka's application logs to try and piece together the series of events leading up to this. Here's what we know happened, with regard to one partition that has issues, from the logs:

Prior to outage:

* Replicas for the partition are brokers 2011, 2012 and 2024, with 2024 being the preferred leader.
* Producers using acks=1, compression=gzip.
* Brokers configured with unclean.elections=false, zk.session-timeout=36s.

Post outage:

* 2011 comes up first (also as the Controller), recovers unflushed log segment 1239444214, completes load with offset 1239740602, and becomes leader of the partition.
* 2012 comes up next, recovers its log (recovering unflushed log segment 1239444214), truncates to offset 1239742830 (that's 2,228 records ahead of the recovered offset of the current leader), and starts following.
* 2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, truncates to offset 1239742250 (that's 1,648 records ahead of the recovered offset of the current leader), and starts following.
* The Controller adds 2024 to the replica set just before 2024 halts due to another partition having an offset greater than the leader's.
* The Controller adds 2012 to the replica set just before 2012 halts due to another partition having an offset greater than the leader's.
* When 2012 is next restarted, it fails to fully start as it's complaining of invalid offsets in the log.

You'll notice that the offsets the brokers truncate to are different for each of the three brokers.

We're assuming that when 2012 starts up and follows the leader, it requests records from its truncated offset, but that the logs have diverged on these two brokers to the point that the requested offset corresponds, within the leader's log, to the middle of a compressed record set rather than a record set boundary. The leader then returns the whole compressed set, which the follower appends to its log, unknowingly introducing a dip in its otherwise monotonically incrementing offsets.

Several of our brokers were unlucky enough to have this dip at the 4K boundary used by the offset indexer, causing a protracted outage. We've written a little utility that shows several more brokers have a dip outside of the 4K boundary.
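That utility isn't reproduced here, but purely as an illustration, a dip check over the (firstOffset, lastOffset) ranges of successive batches might be sketched as follows (the Batch type and the example offsets are hypothetical, not taken from the affected logs):

{code:scala}
// Illustration only (not the utility referred to above): given the
// (firstOffset, lastOffset) range of each successive batch in a log, report
// any batch whose first offset dips back into or below the previous batch.
object OffsetDipScan {
  case class Batch(firstOffset: Long, lastOffset: Long)

  def findDips(batches: Seq[Batch]): Seq[(Batch, Batch)] =
    batches.sliding(2).collect {
      case Seq(prev, cur) if cur.firstOffset <= prev.lastOffset => (prev, cur)
    }.toSeq

  def main(args: Array[String]): Unit = {
    val batches = Seq(Batch(100, 101), Batch(102, 103), Batch(95, 150), Batch(151, 152))
    findDips(batches).foreach { case (prev, cur) =>
      println(s"dip: batch [${cur.firstOffset}, ${cur.lastOffset}] starts below previous batch end ${prev.lastOffset}")
    }
  }
}
{code}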
There are some assumptions in there which I've not got around to confirming / denying. (A quick attempt to recreate this failed and I've not found the time to invest more.)

Of course I'd really appreciate the community / experts stepping in and commenting on whether our assumptions are right or wrong, or if there is another explanation to the problem.

The fact that the broker got into this state and then won't start is obviously a bug, and one I'd like to fix. A Kafka broker should not corrupt its own log during normal operation to the point that it can't restart! :D

A secondary issue is whether we think the divergent logs are acceptable. This may be deemed acceptable given the producers have chosen availability over consistency when they produced with acks=1. Though personally, the system having diverging replicas of an immutable commit log just doesn't sit right.

I see us having a few options here:

* Have the replicas detect the divergence of their logs, e.g. a follower compares the checksum of its last record with the record at the same offset on the leader. The follower can then work out that its log has diverged from the leader's. At that point it could either halt, stop replicating that partition, or search backwards to find the point of divergence, truncate and recover (possibly saving the truncated part somewhere). This would be a protocol change for Kafka. This solution trades availability (you've got fewer ISRs during the extended re-sync process) for consistency.
* Leave the logs as they are and have the indexing of offsets in the log on start up handle such a situation gracefully. This leaves logs in a divergent state between replicas (meaning replays would yield different messages if the leader were to change), but gives better availability (no time spent not being an ISR while it repairs any divergence).
* Support multiple options and allow it to be tuned, ideally per topic.
* Something else...

I'm happy/keen to contribute here, but I'd like to first discuss which option should be investigated.

Andy
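As a rough, non-authoritative sketch of the first option above (a follower comparing the checksum of its last record with the leader's record at the same offset), assuming a hypothetical fetchLeaderChecksum helper that is not an existing Kafka API:

{code:scala}
// Rough sketch of the divergence-detection option. `fetchLeaderChecksum` is a
// hypothetical helper (e.g. backed by a fetch of the single record at the
// given offset from the leader); it is not an existing Kafka API.
object DivergenceCheckSketch {
  def hasDiverged(followerLastOffset: Long,
                  followerLastChecksum: Long,
                  fetchLeaderChecksum: Long => Option[Long]): Boolean =
    fetchLeaderChecksum(followerLastOffset) match {
      // Leader has a record at this offset but with a different CRC: diverged.
      case Some(leaderChecksum) => leaderChecksum != followerLastChecksum
      // Offset not present on the leader: the existing out-of-range handling
      // (truncate to the leader's log end offset) already covers this case.
      case None => false
    }
}
{code}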