[~junrao] will double check

On Tue, 5 Jul 2016, 18:53 Jun Rao (JIRA), <j...@apache.org> wrote:

>
>     [
> https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362865#comment-15362865
> ]
>
> Jun Rao commented on KAFKA-3919:
> --------------------------------
>
> [~BigAndy], thanks for the investigation and the additional information.
>
> Let me first explain how log reconciliation normally works. Each broker
> maintains the last committed offset for each partition and stores that
> information in a checkpoint file replication-offset-checkpoint. A message
> is only considered committed if it's received by all in-sync replicas. The
> leader advances the last committed offset and propagates it to the
> followers. So, the follower's last committed offset is always <= the
> leader's. When a replica becomes a leader, it won't do any truncation to
> its log and will instead try to commit all messages in its local log. When
> a replica becomes a follower, it will first truncate its log to the last
> committed offset stored in its local checkpoint file and then start
> replicating from that offset. If unclean leader election is disabled, after
> truncation, the follower's last offset should always be <= the leader's
> last offset.
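>
> As a rough illustration of that reconciliation (a sketch with made-up names,
> not the actual broker code):
>
> {noformat}
> // Sketch only: models the become-follower / become-leader behaviour above.
> case class ReplicaState(logEndOffset: Long, highWatermark: Long)
>
> // Becoming a follower: truncate to the locally checkpointed last committed
> // offset (high watermark) and resume fetching from there. Since a message is
> // committed only once all in-sync replicas have it, this offset is <= the
> // leader's last offset as long as unclean leader election is disabled.
> def becomeFollower(local: ReplicaState): Long = {
>   // truncateLogTo(local.highWatermark)  // hypothetical helper: drop the uncommitted suffix
>   local.highWatermark
> }
>
> // Becoming a leader: no truncation; the replica tries to commit everything
> // already in its local log.
> def becomeLeader(local: ReplicaState): Long = local.logEndOffset
> {noformat}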
>
> Another thing we do is that if a broker is shut down forcefully, on
> startup, we will do log recovery to remove any corrupted messages. In your
> case, it seems what happens is that when the new leader (2011) comes up,
> its log is actually corrupted and therefore it has to truncate some
> messages. This potentially will remove messages that have been previously
> committed. Then, when broker 2012 comes up and becomes the follower, it's
> possible that after the follower truncates its log to its local last
> committed offset, that offset is actually larger than the last recovered
> offset in the leader. If no new messages have been appended to the new
> leader, the follower will realize that its offset is out of range and will
> truncate its log again to the leader's last offset. However, in this case,
> it seems some new messages have been published to the new leader and the
> follower's offset is actually in range. It's just that the follower's
> offset may now point to a completely new set of messages. In this case if
> the follower's offset points to the middle of a compressed message set, the
> follower will get the whole compressed message set and append it to its
> local log. Currently, the follower only ensures that the last offset in a
> compressed message set is larger than the last offset in the log, but it
> does not check the first offset. This seems to be the situation that you are
> in.
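>
> In other words (an illustrative sketch, not the real append path):
>
> {noformat}
> // The follower validates only the last offset of a fetched compressed set
> // against the last offset in its log, so a set that starts before that
> // offset but ends after it is still appended, producing the dip.
> case class CompressedSet(firstOffset: Long, lastOffset: Long)
>
> def currentCheck(lastOffsetInLog: Long, set: CompressedSet): Boolean =
>   set.lastOffset > lastOffsetInLog    // only the last offset is checked
>
> def stricterCheck(lastOffsetInLog: Long, set: CompressedSet): Boolean =
>   set.firstOffset > lastOffsetInLog   // would also reject an overlapping set
> {noformat}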
>
> There is still one thing not very clear to me. When building indexes
> during log recovery, we actually only add index entries at the boundaries of
> compressed message sets. So as long as the last offset of each compressed
> set keeps increasing, we won't hit the InvalidOffsetException in the
> description. Could you check whether 1239742691 is the last offset of a
> compressed set and, if so, whether there is a case where the last offset of a
> compressed set is out of order in the log?
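>
> For reference, roughly how the recovery-time index rebuild behaves (a
> simplified sketch, not the real LogSegment.recover):
>
> {noformat}
> // Index entries are added only at shallow (compressed-set) boundaries, at
> // most one per index.interval.bytes (4096 by default) of log data, so the
> // rebuild can only fail if the last offset of some compressed set is <= a
> // previously indexed last offset.
> case class ShallowEntry(lastOffset: Long, sizeInBytes: Int)
>
> def rebuildIndex(entries: Seq[ShallowEntry], indexIntervalBytes: Int = 4096): Unit = {
>   var lastIndexedOffset = -1L
>   var bytesSinceLastIndexEntry = 0
>   for (e <- entries) {
>     if (bytesSinceLastIndexEntry > indexIntervalBytes) {
>       require(e.lastOffset > lastIndexedOffset,
>         s"Attempt to append an offset (${e.lastOffset}) no larger than ($lastIndexedOffset)")
>       lastIndexedOffset = e.lastOffset
>       bytesSinceLastIndexEntry = 0
>     }
>     bytesSinceLastIndexEntry += e.sizeInBytes
>   }
> }
> {noformat}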
>
> > Broker fails to start after ungraceful shutdown due to
> non-monotonically incrementing offsets in logs
> >
> ------------------------------------------------------------------------------------------------------
> >
> >                 Key: KAFKA-3919
> >                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
> >             Project: Kafka
> >          Issue Type: Bug
> >          Components: core
> >    Affects Versions: 0.9.0.1
> >            Reporter: Andy Coates
> >
> > Hi All,
> > I encountered an issue with Kafka following a power outage that saw a
> proportion of our cluster disappear. When the power came back on several
> brokers halted on start up with the error:
> > {noformat}
> >       Fatal error during KafkaServerStartable startup. Prepare to
> shutdown
> >       kafka.common.InvalidOffsetException: Attempt to append an offset
> (1239742691) to position 35728 no larger than the last offset appended
> (1239742822) to
> /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
> >       at
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
> >       at
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> >       at
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> >       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> >       at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
> >       at kafka.log.LogSegment.recover(LogSegment.scala:188)
> >       at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
> >       at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
> >       at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >       at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >       at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >       at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >       at kafka.log.Log.loadSegments(Log.scala:160)
> >       at kafka.log.Log.<init>(Log.scala:90)
> >       at
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
> >       at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
> >       at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >       at java.lang.Thread.run(Thread.java:745)
> > {noformat}
> > The only way to recover the brokers was to delete the log files that
> contained non-monotonically incrementing offsets.
> > We've spent some time digging through the logs and I feel I may have
> worked out the sequence of events leading to this issue, (though this is
> based on some assumptions I've made about the way Kafka is working, which
> may be wrong).
> > First off, we have unclean leadership elections disabled. (We did later
> enable them to help get around some other issues we were having, but this
> was several hours after this issue manifested.) We're producing to the
> topic with gzip compression and acks=1.
> > We looked through the data logs that were causing the brokers not to
> start. What we found is that the initial part of the log has monotonically
> increasing offsets, where each compressed batch normally contained one or
> two records. Then there is a batch that contains many records, whose first
> record has an offset below the previous batch and whose last record has
> an offset above the previous batch. Following on from this there continues
> a period of large batches, with monotonically increasing offsets, and then
> the log returns to batches with one or two records.
> > Our working assumption here is that the period before the offset dip,
> with the small batches, is pre-outage normal operation. The period of
> larger batches is from just after the outage, where producers have a
> backlog to process when the partition becomes available, and then things
> return to normal batch sizes again once the backlog clears.
> > We also looked through Kafka's application logs to try and piece
> together the series of events leading up to this. Here’s what we know
> happened, with regard to one partition that has issues, from the logs:
> > Prior to outage:
> > * Replicas for the partition are brokers 2011, 2012, and 2024, with 2024
> being the preferred leader.
> > * Producers using acks=1, compression=gzip
> > * Brokers configured with unclean.elections=false, zk.session-timeout=36s
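> > (For reference, a minimal sketch of the 0.9-era config keys these
> > shorthands presumably map to, not our actual config files:)
> > {noformat}
> > import java.util.Properties
> >
> > // Producer side, as described above.
> > val producerProps = new Properties()
> > producerProps.put("acks", "1")                 // availability over consistency
> > producerProps.put("compression.type", "gzip")
> >
> > // Broker side (server.properties equivalents).
> > val brokerProps = new Properties()
> > brokerProps.put("unclean.leader.election.enable", "false")
> > brokerProps.put("zookeeper.session.timeout.ms", "36000")  // 36s
> > {noformat}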
> > Post outage:
> > * 2011 comes up first, (also as the Controller), recovers unflushed log
> segment 1239444214, completes load with offset 1239740602, and becomes
> leader of the partition.
> > * 2012 comes up next, recovers unflushed log segment
> 1239444214, truncates to offset 1239742830, (that's 2,228 records ahead of
> the recovered offset of the current leader), and starts following.
> > * 2024 comes up quickly after 2012, recovers unflushed log segment
> 1239444214, truncates to offset 1239742250, (that's 1,648 records ahead of
> the recovered offset of the current leader), and starts following.
> > * The Controller adds 2024 to the replica set just before 2024 halts due
> to another partition having an offset greater than the leader's.
> > * The Controller adds 2012 to the replica set just before 2012 halts due
> to another partition having an offset greater than the leader's.
> > * When 2012 is next restarted, it fails to fully start as it's
> complaining of invalid offsets in the log.
> > You’ll notice that the offsets the brokers truncate to are different for
> each of the three brokers.
> > We're assuming that when 2012 starts up and follows the leader it
> requests records from its truncated offset, but that the logs have diverged
> on these two brokers to the point that the requested offset corresponds,
> within the leader's log, to the middle of a compressed record set, not to a
> record set boundary.  The leader then returns the whole compressed set,
> which the follower appends to its log - unknowingly introducing a dip in
> its otherwise monotonically incrementing offsets.
> > Several of our brokers were unlucky enough to have this dip at the 4K
> boundary used by the offset indexer, causing a protracted outage.  We’ve
> written a little utility that shows several more brokers have a dip outside
> of the 4K boundary.
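> > For anyone wanting to check their own segments, something along these
> > lines works, (a stand-alone sketch rather than our actual utility; it
> > assumes the 0.9 on-disk format of an 8-byte offset plus 4-byte size per
> > shallow entry, where a compressed set's shallow offset is its last inner
> > offset):
> > {noformat}
> > import java.io.{File, RandomAccessFile}
> >
> > object OffsetDipScanner {
> >   def main(args: Array[String]): Unit = {
> >     val raf = new RandomAccessFile(new File(args(0)), "r")  // path to a .log segment
> >     try {
> >       var last = -1L
> >       while (raf.getFilePointer + 12 <= raf.length) {
> >         val offset = raf.readLong()   // shallow offset (last offset of the set)
> >         val size   = raf.readInt()    // size of the (possibly compressed) message
> >         if (size <= 0) return         // hit zero padding at the end of the segment
> >         if (offset <= last)
> >           println(s"dip at position ${raf.getFilePointer - 12}: $offset follows $last")
> >         last = offset
> >         raf.seek(raf.getFilePointer + size)  // skip over the message payload
> >       }
> >     } finally raf.close()
> >   }
> > }
> > {noformat}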
> > There are some assumptions in there, which I’ve not got around to
> confirming / denying. (A quick attempt to recreate this failed and I've not
> found the time to invest more).
> > Of course I'd really appreciate the community / experts stepping in and
> commenting on whether our assumptions are right or wrong, or if there is
> another explanation to the problem.
> > The fact that the broker got into this state and then won’t start
> is obviously a bug, and one I’d like to fix.  A Kafka broker should not
> corrupt its own log during normal operation to the point that it can’t
> restart! :D
> > A secondary issue is whether we think the divergent logs are acceptable. This
> may be deemed acceptable given the producers have chosen availability over
> consistency when they produced with acks = 1.  Though personally, a
> system having diverging replicas of an immutable commit log just doesn't
> sit right with me.
> > I see us having a few options here:
> > * Have the replicas detect the divergence of their logs, e.g. a follower
> compares the checksum of its last record with that of the record at the same
> offset on the leader. The follower can then work out that its log has diverged
> from the leader.  At which point it could either halt, stop replicating that
> partition, or search backwards to find the point of divergence, truncate and
> recover, (possibly saving the truncated part somewhere). This would be a
> protocol change for Kafka.  This solution trades availability, (you’ve got
> fewer ISRs during the extended re-sync process), for consistency. (A rough
> sketch of this idea follows the list below.)
> > * Leave the logs as they are and have the indexing of offsets in the log
> on startup handle such a situation gracefully.  This leaves logs in a
> divergent state between replicas, (meaning replays would yield different
> messages if the leader were to go down), but gives better availability, (no
> time spent not being an ISR while it repairs any divergence).
> > * Support multiple options and allow it to be tuned, ideally by topic.
> > * Something else...
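> > The first option could look something like the following, (purely a
> > hypothetical sketch; the two checksum lookups stand in for a local read and
> > a new fetch/API call to the leader, neither of which exists today):
> > {noformat}
> > // Walk backwards from the follower's last offset, comparing record
> > // checksums with the leader's, until the two logs agree; everything after
> > // that point would be truncated and re-fetched from the leader.
> > def findDivergencePoint(lastOffset: Long,
> >                         localCrc:  Long => Option[Long],
> >                         leaderCrc: Long => Option[Long]): Long = {
> >   var offset = lastOffset
> >   while (offset >= 0 && localCrc(offset) != leaderCrc(offset))
> >     offset -= 1
> >   offset  // last offset at which the logs still agree, or -1 if they never do
> > }
> > {noformat}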
> > I’m happy/keen to contribute here. But I’d like to first discuss which
> option should be investigated.
> > Andy
>
>
>
