[~junrao] I thought that the indexer took the offset of the first record in
a compressed set. Looking at `LogSegment.recover` in the 0.9.0.1 code base,
that does indeed seem to be the case.
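
For reference, the relevant part of `LogSegment.recover` (paraphrased from my
reading of the 0.9.0.1 source, so treat the exact names as approximate) looks
like this; for a compressed wrapper message it deep-iterates and indexes the
offset of the *first* inner record:

{noformat}
// Paraphrased sketch of the index-rebuild loop in kafka.log.LogSegment.recover (0.9.0.1).
var validBytes = 0
var lastIndexEntry = 0
val iter = log.iterator(maxMessageSize)
while (iter.hasNext) {
  val entry = iter.next()
  entry.message.ensureValid()
  if (validBytes - lastIndexEntry > indexIntervalBytes) {
    val startOffset = entry.message.compressionCodec match {
      case NoCompressionCodec => entry.offset
      // Compressed set: index the offset of the FIRST inner message.
      case _ => ByteBufferMessageSet.deepIterator(entry.message).next().offset
    }
    // Throws InvalidOffsetException if startOffset <= the last offset already indexed.
    index.append(startOffset, validBytes)
    lastIndexEntry = validBytes
  }
  validBytes += MessageSet.entrySize(entry.message)
}
{noformat}

So if the dip in offsets happens to land on an index boundary, the first inner
offset of that large batch can be lower than the offset already in the index,
which would produce exactly the InvalidOffsetException from the description.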

I haven't dumped the offsets again, but I can if you still need them.
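
If it helps in the meantime, the check I'd run is along these lines (the path
is the .log counterpart of the .index file in the stack trace below; flags are
from memory of the 0.9 tooling, so may need tweaking):

{noformat}
# Dump the per-record (deep) offsets of the suspect segment; run again without
# --deep-iteration to see only the wrapper offsets of each compressed set.
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --deep-iteration \
  --files /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.log
{noformat}

That should show whether 1239742691 is the first or last offset of its
compressed set, and whether the last offsets of successive sets ever go
backwards.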

On Wed, 6 Jul 2016, 09:04 Andrew Coates, <big.andy.coa...@gmail.com> wrote:

> [~junrao] will double check
>
> On Tue, 5 Jul 2016, 18:53 Jun Rao (JIRA), <j...@apache.org> wrote:
>
>>
>>     [
>> https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362865#comment-15362865
>> ]
>>
>> Jun Rao commented on KAFKA-3919:
>> --------------------------------
>>
>> [~BigAndy], thanks for the investigation and the additional information.
>>
>> Let me first explain how log reconciliation normally works. Each broker
>> maintains the last committed offset for each partition and stores that
>> information in a checkpoint file replication-offset-checkpoint. A message
>> is only considered committed if it's received by all in-sync replicas. The
>> leader advances the last committed offset and propagates it to the
>> followers. So, the follower's last committed offset is always <= the
>> leader's. When a replica becomes a leader, it won't do any truncation to
>> its log and will instead try to commit all messages in its local log. When
>> a replica becomes a follower, it will first truncate its log to the last
>> committed offset stored in its local checkpoint file and then start
>> replicating from that offset. If unclean leader election is disabled, after
>> truncation, the follower's last offset should always be <= the leader's
>> last offset.
>>
>> Another thing we do is that if a broker is shut down forcefully, on
>> startup, we will do log recovery to remove any corrupted messages. In your
>> case, it seems what happens is that when the new leader (2011) comes up,
>> its log is actually corrupted and therefore it has to truncate some
>> messages. This potentially will remove messages that have been previously
>> committed. Then, when broker 2012 comes up and becomes the follower, it's
>> possible that after the follower truncates its log to its local last
>> committed offset, that offset is actually larger than the last recovered
>> offset in the leader. If no new messages have been appended to the new
>> leader, the follower will realize that its offset is out of range and will
>> truncate its log again to the leader's last offset. However, in this case,
>> it seems some new messages have been published to the new leader and the
>> follower's offset is actually in range. It's just that the follower's
>> offset may now point to a completely new set of messages. In this case if
>> the follower's offset points to the middle of a compressed message set, the
>> follower will get the whole compressed message set and append it to its
>> local log. Currently, the follower only ensures that the last
>> offset in a compressed message set is larger than the last offset in the
>> log, but not the first offset. This seems to be the situation that you are
>> in.
>>
>> There is still one thing not very clear to me. When building indexes
>> during log recovery, we actually only add index entries at the boundary of
>> a compressed message set. So as long as the last offset of each compressed
>> set keeps increasing, we won't hit the InvalidOffsetException in the
>> description. Could you check whether 1239742691 is the last offset of a
>> compressed set and if so whether there is a case that the last offset of a
>> compressed set is out of order in the log?
>>
>> > Broker fails to start after ungraceful shutdown due to
>> non-monotonically incrementing offsets in logs
>> >
>> ------------------------------------------------------------------------------------------------------
>> >
>> >                 Key: KAFKA-3919
>> >                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>> >             Project: Kafka
>> >          Issue Type: Bug
>> >          Components: core
>> >    Affects Versions: 0.9.0.1
>> >            Reporter: Andy Coates
>> >
>> > Hi All,
>> > I encountered an issue with Kafka following a power outage that saw a
>> proportion of our cluster disappear. When the power came back on, several
>> brokers halted on startup with the error:
>> > {noformat}
>> >       Fatal error during KafkaServerStartable startup. Prepare to
>> shutdown”
>> >       kafka.common.InvalidOffsetException: Attempt to append an offset
>> (1239742691) to position 35728 no larger than the last offset appended
>> (1239742822) to
>> /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
>> >       at
>> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>> >       at
>> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>> >       at
>> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>> >       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>> >       at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>> >       at kafka.log.LogSegment.recover(LogSegment.scala:188)
>> >       at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>> >       at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>> >       at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> >       at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >       at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >       at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>> >       at kafka.log.Log.loadSegments(Log.scala:160)
>> >       at kafka.log.Log.<init>(Log.scala:90)
>> >       at
>> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>> >       at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>> >       at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> >       at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> >       at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> >       at java.lang.Thread.run(Thread.java:745)
>> > {noformat}
>> > The only way to recover the brokers was to delete the log files that
>> contained non-monotonically incrementing offsets.
>> > We've spent some time digging through the logs and I feel I may have
>> worked out the sequence of events leading to this issue (though this is
>> based on some assumptions I've made about the way Kafka is working, which
>> may be wrong).
>> > First off, we have unclean leadership elections disabled. (We did later
>> enable them to help get around some other issues we were having, but this
>> was several hours after this issue manifested), and we're producing to the
>> topic with gzip compression and acks=1.
>> > We looked through the data logs that were causing the brokers to not
>> start. What we found was that the initial part of the log has monotonically
>> increasing offsets, where each compressed batch normally contained one or
>> two records. Then there is a batch that contains many records, whose first
>> record has an offset below the previous batch and whose last record has
>> an offset above the previous batch. Following on from this there continues
>> a period of large batches, with monotonically increasing offsets, and then
>> the log returns to batches with one or two records.
>> > Our working assumption here is that the period before the offset dip,
>> with the small batches, is pre-outage normal operation. The period of
>> larger batches is from just after the outage, where producers have a
>> backlog to process when the partition becomes available, and then things
>> return to normal batch sizes again once the backlog clears.
>> > We did also look through Kafka's application logs to try and piece
>> together the series of events leading up to this. Here’s what we know
>> happened, with regards to one partition that has issues, from the logs:
>> > Prior to outage:
>> > * Replicas for the partition are brokers 2011, 2012,  2024, with 2024
>> being the preferred leader.
>> > * Producers using acks=1, compression=gzip
>> > * Brokers configured with unclean.elections=false,
>> zk.session-timeout=36s
>> > Post outage:
>> > * 2011 comes up first (also as the Controller), recovers unflushed log
>> segment 1239444214, completes load with offset 1239740602, and becomes
>> leader of the partition.
>> > * 2012 comes up next, recovers its log, recovers unflushed log segment
>> 1239444214, truncates to offset 1239742830 (that's 2,228 records ahead of
>> the recovered offset of the current leader), and starts following.
>> > * 2024 comes up quickly after 2012, recovers unflushed log segment
>> 1239444214, truncates to offset 1239742250 (that's 1,648 records ahead of
>> the recovered offset of the current leader), and starts following.
>> > * The Controller adds 2024 to the replica set just before 2024 halts
>> due to another partition having an offset greater than the leader.
>> > * The Controller adds 2012 to the replica set just before 2012 halts
>> due to another partition having an offset greater than the leader.
>> > * When 2012 is next restarted, it fails to fully start as it's
>> complaining of invalid offsets in the log.
>> > You’ll notice that the offsets the brokers truncate to are different for
>> each of the three brokers.
>> > We're assuming that when 2012 starts up and follows the leader it
>> requests records from its truncated offsets, but that the logs have diverged
>> on these two brokers to the point that the requested offset corresponds
>> within the leader's log to the middle of a compressed record set, not at a
>> record set boundary.  The leader then returns the whole compressed set,
>> which the follower appends to its log - unknowingly introducing a dip in
>> its otherwise monotonically incrementing offsets.
>> > Several of our brokers were unlucky enough to have this dip at the 4K
>> boundary used by the offset indexer, causing a protracted outage.  We’ve
>> written a little utility that shows several more brokers have a dip outside
>> of the 4K boundary.
>> > There are some assumptions in there, which I’ve not got around to
>> confirming / denying. (A quick attempt to recreate this failed and I've not
>> found the time to invest more).
>> > Of course I'd really appreciate the community / experts stepping in and
>> commenting on whether our assumptions are right or wrong, or if there is
>> another explanation to the problem.
>> > Obviously, the fact that the broker got into this state and then won’t
>> start is a bug, and one I’d like to fix.  A Kafka broker should not
>> corrupt its own log during normal operation to the point that it can’t
>> restart! :D
>> > A secondary issue is whether we think the divergent logs are acceptable.
>> This may be deemed acceptable given the producers have chosen availability
>> over consistency when they produced with acks = 1?  Though personally, the
>> system having diverging replicas of an immutable commit log just doesn't
>> sit right.
>> > I see us having a few options here:
>> > * Have the replicas detect the divergence of their logs, e.g. a follower
>> compares the checksum of its last record with the same offset on the
>> leader. The follower can then work out that its log has diverged from the
>> leader, at which point it could either halt, stop replicating that
>> partition or search backwards to find the point of divergence, truncate and
>> recover. (possibly saving the truncated part somewhere). This would be a
>> protocol change for Kafka.  This solution trades availability (you’ve got
>> fewer ISRs during the extended re-sync process) for consistency.
>> > * Leave the logs as they are and have the indexing of offsets in the
>> log on startup handle such a situation gracefully.  This leaves logs in a
>> divergent state between replicas (meaning replays could yield different
>> messages depending on which replica is leading), but gives better
>> availability (no time spent not being an ISR while it repairs any
>> divergence).
>> > * Support multiple options and allow it to be tuned, ideally per topic.
>> > * Something else...
>> > I’m happy/keen to contribute here. But I’d like to first discuss which
>> option should be investigated.
>> > Andy
>>
>>
>>
>
