[~junrao] I thought that the indexer took the offset of the first record in a compressed set. Looking at `LogSegment.recover` in the 0.9.0.1 code base, that does indeed seem to be the case.
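To make that concrete, here's a minimal, self-contained Scala sketch of how I'm reading the recovery-time indexing decision. It's a paraphrase, not the actual Kafka code: `Batch`, `RecoveryIndexSketch` and the 4096-byte interval are my own illustration. The point is just that when an index entry is due, the offset recorded is that of the first record of the (possibly compressed) batch, not the wrapper's last offset.

{code:scala}
// Hypothetical model of the recovery-time indexing decision (not Kafka source).
// A Batch stands in for one shallow (possibly compressed) message set in a segment;
// firstOffset is the offset of the first inner record after decompression.
final case class Batch(firstOffset: Long, lastOffset: Long, sizeInBytes: Int)

object RecoveryIndexSketch {
  // Returns the (offset, filePosition) pairs that would be written to the offset index.
  def indexEntries(batches: Seq[Batch], indexIntervalBytes: Int = 4096): Seq[(Long, Int)] = {
    var position = 0            // byte position of the current batch within the segment
    var lastIndexedPosition = 0 // position at which the previous index entry was taken
    val entries = Seq.newBuilder[(Long, Int)]
    for (batch <- batches) {
      if (position - lastIndexedPosition > indexIntervalBytes) {
        // Key observation: the index entry records the FIRST offset of the batch
        // (for a compressed set, the first deep record), not the last.
        entries += ((batch.firstOffset, position))
        lastIndexedPosition = position
      }
      position += batch.sizeInBytes
    }
    entries.result()
  }
}
{code}

If that reading is right, a fetched batch whose first offset dips below the previously indexed offsets will fail recovery as soon as it happens to land on an index interval boundary, which is the failure described in the quoted thread below. Two more sketches follow the quoted thread: one for the index append check itself, and one for the divergence-detection option discussed at the end.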
I haven't dumped the offsets again, but I can if you still need it.

On Wed, 6 Jul 2016, 09:04 Andrew Coates, <big.andy.coa...@gmail.com> wrote:

> [~junrao] will double check
>
> On Tue, 5 Jul 2016, 18:53 Jun Rao (JIRA), <j...@apache.org> wrote:
>
>> [ https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362865#comment-15362865 ]
>>
>> Jun Rao commented on KAFKA-3919:
>> --------------------------------
>>
>> [~BigAndy], thanks for the investigation and the additional information.
>>
>> Let me first explain how log reconciliation normally works. Each broker maintains the last committed offset for each partition and stores that information in a checkpoint file, replication-offset-checkpoint. A message is only considered committed if it's received by all in-sync replicas. The leader advances the last committed offset and propagates it to the followers, so the follower's last committed offset is always <= the leader's. When a replica becomes a leader, it won't do any truncation of its log and will instead try to commit all messages in its local log. When a replica becomes a follower, it will first truncate its log to the last committed offset stored in its local checkpoint file and then start replicating from that offset. If unclean leader election is disabled, after truncation the follower's last offset should always be <= the leader's last offset.
>>
>> Another thing we do is that if a broker is shut down forcefully, on startup we will do log recovery to remove any corrupted messages. In your case, it seems what happens is that when the new leader (2011) comes up, its log is actually corrupted and it therefore has to truncate some messages. This can potentially remove messages that have previously been committed. Then broker 2012 comes up and becomes the follower. It's possible that after the follower truncates its log to its local last committed offset, that offset is actually larger than the last recovered offset in the leader. If no new messages have been appended to the new leader, the follower will realize that its offset is out of range and will truncate its log again to the leader's last offset. However, in this case it seems some new messages have been published to the new leader and the follower's offset is actually in range. It's just that the follower's offset may now point to a completely new set of messages. If the follower's offset points to the middle of a compressed message set, the follower will get the whole compressed message set and append it to its local log. Currently, the follower only ensures that the last offset in a compressed message set is larger than the last offset in the log, but not the first offset. This seems to be the situation that you are in.
>>
>> There is still one thing not very clear to me. When building indexes during log recovery, we only add index entries at the boundary of a compressed message set. So as long as the last offset of each compressed set keeps increasing, we won't hit the InvalidOffsetException in the description. Could you check whether 1239742691 is the last offset of a compressed set, and if so, whether there is a case where the last offset of a compressed set is out of order in the log?
>>
>>
>> > Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs
>> > ------------------------------------------------------------------------------------------------------
>> >
>> >                 Key: KAFKA-3919
>> >                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>> >             Project: Kafka
>> >          Issue Type: Bug
>> >          Components: core
>> >    Affects Versions: 0.9.0.1
>> >            Reporter: Andy Coates
>> >
>> > Hi All,
>> > I encountered an issue with Kafka following a power outage that saw a proportion of our cluster disappear. When the power came back on, several brokers halted on start up with the error:
>> > {noformat}
>> > Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>> > kafka.common.InvalidOffsetException: Attempt to append an offset (1239742691) to position 35728 no larger than the last offset appended (1239742822) to /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
>> >     at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>> >     at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>> >     at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>> >     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>> >     at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>> >     at kafka.log.LogSegment.recover(LogSegment.scala:188)
>> >     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>> >     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>> >     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>> >     at kafka.log.Log.loadSegments(Log.scala:160)
>> >     at kafka.log.Log.<init>(Log.scala:90)
>> >     at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>> >     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> >     at java.lang.Thread.run(Thread.java:745)
>> > {noformat}
>> > The only way to recover the brokers was to delete the log files that contained non-monotonically incrementing offsets.
>> > We've spent some time digging through the logs and I feel I may have worked out the sequence of events leading to this issue, (though this is based on some assumptions I've made about the way Kafka is working, which may be wrong).
>> > First off, we have unclean leadership elections disabled. (We did later enable them to help get around some other issues we were having, but this was several hours after this issue manifested), and we're producing to the topic with gzip compression and acks=1.
>> > We looked through the data logs that were causing the brokers to not start. What we found is that the initial part of the log has monotonically increasing offsets, where each compressed batch normally contains one or two records.
>> > Then there is a batch that contains many records, whose first record has an offset below the previous batch and whose last record has an offset above the previous batch. Following on from this there continues a period of large batches, with monotonically increasing offsets, and then the log returns to batches with one or two records.
>> > Our working assumption here is that the period before the offset dip, with the small batches, is pre-outage normal operation. The period of larger batches is from just after the outage, where producers have a backlog to process when the partition becomes available, and then things return to normal batch sizes again once the backlog clears.
>> > We did also look through Kafka's application logs to try and piece together the series of events leading up to this. Here's what we know happened, with regards to one partition that has issues, from the logs:
>> > Prior to outage:
>> > * Replicas for the partition are brokers 2011, 2012, 2024, with 2024 being the preferred leader.
>> > * Producers using acks=1, compression=gzip.
>> > * Brokers configured with unclean.elections=false, zk.session-timeout=36s.
>> > Post outage:
>> > * 2011 comes up first, (also as the Controller), recovers unflushed log segment 1239444214, completes load with offset 1239740602, and becomes leader of the partition.
>> > * 2012 comes up next, recovers its log, recovers unflushed log segment 1239444214, truncates to offset 1239742830, (that's 2,228 records ahead of the recovered offset of the current leader), and starts following.
>> > * 2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, truncates to offset 1239742250, (that's 1,648 records ahead of the recovered offset of the current leader), and starts following.
>> > * The Controller adds 2024 to the replica set just before 2024 halts due to another partition having an offset greater than the leader.
>> > * The Controller adds 2012 to the replica set just before 2012 halts due to another partition having an offset greater than the leader.
>> > * When 2012 is next restarted, it fails to fully start as it's complaining of invalid offsets in the log.
>> > You'll notice that the offsets the brokers truncate to are different for each of the three brokers.
>> > We're assuming that when 2012 starts up and follows the leader it requests records from its truncated offset, but that the logs have diverged on these two brokers to the point that the requested offset corresponds, within the leader's log, to the middle of a compressed record set, not a record set boundary. The leader then returns the whole compressed set, which the follower appends to its log - unknowingly introducing a dip in its otherwise monotonically incrementing offsets.
>> > Several of our brokers were unlucky enough to have this dip at the 4K boundary used by the offset indexer, causing a protracted outage. We've written a little utility that shows several more brokers have a dip outside of the 4K boundary.
>> > There are some assumptions in there, which I've not got around to confirming / denying. (A quick attempt to recreate this failed and I've not found the time to invest more.)
>> > Of course I'd really appreciate the community / experts stepping in and commenting on whether our assumptions are right or wrong, or if there is another explanation to the problem.
>> > Obviously, the fact that the broker got into this state and then won't start is a bug, and one I'd like to fix. A Kafka broker should not corrupt its own log during normal operation to the point that it can't restart! :D
>> > A secondary issue is whether we think the divergent logs are acceptable. This may be deemed acceptable given the producers have chosen availability over consistency when they produced with acks=1. Though personally, the system having diverging replicas of an immutable commit log just doesn't sit right.
>> > I see us having a few options here:
>> > * Have the replicas detect the divergence of their logs, e.g. a follower compares the checksum of its last record with the record at the same offset on the leader. The follower can then work out that its log has diverged from the leader. At that point it could either halt, stop replicating that partition, or search backwards to find the point of divergence, truncate and recover, (possibly saving the truncated part somewhere). This would be a protocol change for Kafka. This solution trades availability, (you've got fewer ISRs during the extended re-sync process), for consistency.
>> > * Leave the logs as they are and have the indexing of offsets in the log on start up handle such a situation gracefully. This leaves logs in a divergent state between replicas, (meaning replays would yield different messages if the leader were to change), but gives better availability, (no time spent not being an ISR while it repairs any divergence).
>> > * Support multiple options and allow it to be tuned, ideally by topic.
>> > * Something else...
>> > I'm happy/keen to contribute here. But I'd like to first discuss which option should be investigated.
>> > Andy
>>
>>
>>
>> --
>> This message was sent by Atlassian JIRA
>> (v6.3.4#6332)
>>
>
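For anyone following along, here is a minimal Scala sketch of the strictly-increasing rule I believe the offset index is enforcing when `OffsetIndex.append` throws the InvalidOffsetException in the stack trace above. `SimpleOffsetIndex` and its exception are my own simplified stand-ins, not Kafka's implementation; the offsets in the demo are just the two from our broker's log.

{code:scala}
import scala.collection.mutable.ArrayBuffer

// Hypothetical, much-simplified stand-in for an offset index (not Kafka's OffsetIndex).
final class SimpleOffsetIndex {
  private var lastOffset: Long = -1L
  private val entries = ArrayBuffer.empty[(Long, Int)]

  def append(offset: Long, position: Int): Unit = {
    // The rule that matters for this ticket: offsets written to the index must be
    // strictly increasing; violating it aborts broker start-up during recovery.
    if (entries.nonEmpty && offset <= lastOffset)
      throw new IllegalStateException(
        s"Attempt to append an offset ($offset) no larger than the last offset appended ($lastOffset)")
    entries += ((offset, position))
    lastOffset = offset
  }
}

object IndexDipDemo extends App {
  val index = new SimpleOffsetIndex
  index.append(1239742822L, 0)      // last index entry before the dip
  index.append(1239742691L, 35728)  // first offset of the dipped batch -> rejected
}
{code}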
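And here is a rough sketch of the first option above (follower-side divergence detection by comparing record checksums with the leader). Everything in it is hypothetical: `localChecksumAt` / `leaderChecksumAt` stand in for whatever local read and new fetch RPC we would actually need, so this only shows the shape of the check, not a concrete protocol proposal.

{code:scala}
// Hypothetical follower-side divergence check; nothing here exists in Kafka today.
// localChecksumAt / leaderChecksumAt stand in for reading a record's CRC from the
// local log and fetching it from the leader (the latter would need a protocol change).
object DivergenceCheckSketch {

  sealed trait Action
  case object InSync extends Action
  final case class TruncateTo(offset: Long) extends Action

  def reconcile(lastLocalOffset: Long,
                localChecksumAt: Long => Option[Long],
                leaderChecksumAt: Long => Option[Long]): Action = {
    // Walk back from the follower's last offset until both logs agree on a record;
    // everything after that point has diverged and would be truncated and re-fetched.
    var offset = lastLocalOffset
    while (offset >= 0) {
      (localChecksumAt(offset), leaderChecksumAt(offset)) match {
        case (Some(l), Some(r)) if l == r =>
          return if (offset == lastLocalOffset) InSync else TruncateTo(offset + 1)
        case _ =>
          offset -= 1
      }
    }
    TruncateTo(0L) // no common record found: replicate from the start of the log
  }
}
{code}

A linear walk back is obviously too slow for a real implementation (a binary search over index entries would be the first refinement), but either way the trade-off is the one described above: time spent out of the ISR while the divergence is repaired, in exchange for consistency.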