
Andy Coates updated KAFKA-3919:
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

        Fatal error during KafkaServerStartable startup. Prepare to shutdown
        kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
        at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
        at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
        at kafka.log.LogSegment.recover(LogSegment.scala:188)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at kafka.log.Log.loadSegments(Log.scala:160)
        at kafka.log.Log.<init>(Log.scala:90)
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:745)

The only way to recover the brokers was to delete the log files that contained 
non-monotonically incrementing offsets.

We've spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue (though this is based on some 
assumptions I've made about the way Kafka works, which may be wrong).

First off, we have unclean leader elections disabled (we did later enable 
them to help get around some other issues we were having, but this was several 
hours after this issue manifested), and we're producing to the topic with gzip 
compression and acks=1.

We looked through the data logs that were causing the brokers to not start. 
What we found is that the initial part of the log has monotonically increasing 
offsets, where each compressed batch normally contains one or two records. Then 
there is a batch that contains many records, whose first records have an offset 
below the last offset of the previous batch and whose last record has an offset 
above it. Following on from this there is a period of large batches with 
monotonically increasing offsets, and then the log returns to batches with one 
or two records.
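
A minimal sketch of the kind of check that surfaces this pattern, assuming each 
batch has already been decoded into its first and last offset. The Batch type, 
the find_offset_dips name and the offsets in the example are hypothetical and 
only illustrate the shape of the problem; they are not taken from our logs.

```python
from typing import List, NamedTuple, Tuple


class Batch(NamedTuple):
    """Hypothetical view of one compressed batch: its first and last offset."""
    first_offset: int
    last_offset: int


def find_offset_dips(batches: List[Batch]) -> List[Tuple[int, Batch, int]]:
    """Return (index, batch, previous_last_offset) for every batch whose
    first offset is not greater than the last offset of the batch before it."""
    dips = []
    prev_last = None
    for i, batch in enumerate(batches):
        if prev_last is not None and batch.first_offset <= prev_last:
            dips.append((i, batch, prev_last))
        prev_last = batch.last_offset
    return dips


# Illustrative data only: small batches, one large batch that dips below the
# previous batch and ends above it, then large batches, then small ones again.
example = [
    Batch(100, 101),
    Batch(102, 102),
    Batch(98, 140),
    Batch(141, 200),
    Batch(201, 201),
]
dips = find_offset_dips(example)
```

Here dips holds a single entry, for the third batch, whose first offset (98) is 
not above the previous batch's last offset (102).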

Our working assumption here is that the period before the offset dip, with the 
small batches, is pre-outage normal operation. The period of larger batches is 
from just after the outage, where producers have a backlog to process when 
the partition becomes available, and then things return to normal batch sizes 
again once the backlog clears.

We also looked through Kafka's application logs to try and piece together 
the series of events leading up to this. Here’s what we know happened, with 
regard to one partition that has issues, from the logs:

Prior to outage:
Replicas for the partition are brokers 2011, 2012, 2024, with 2024 being the 
preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first (also as the Controller), recovers unflushed log segment 
1239444214, completes load with offset 1239740602, and becomes leader of the 
partition.
2012 comes up next, recovers its log, recovers unflushed log segment 
1239444214, truncates to offset 1239742830 (that's 2,228 records ahead of the 
recovered offset of the current leader), and starts following.
2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, 
truncates to offset 1239742250 (that's 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
The Controller adds 2024 to the replica set just before 2024 halts due to 
another partition having an offset greater than the leader.
The Controller adds 2012 to the replica set just before 2012 halts due to 
another partition having an offset greater than the leader.
When 2012 is next restarted, it fails to fully start as it is complaining of 
invalid offsets in the log.

You’ll notice that the offsets the brokers truncate to are different for each 
of the three brokers.
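
The gaps quoted in the timeline can be checked directly from the offsets in the 
application logs. This is just the arithmetic, with variable names of my own 
choosing:

```python
# Offset the leader (2011) finished loading with, per its application log.
leader_recovered_offset = 1239740602

# Offsets the two followers truncated to on start up, keyed by broker id.
follower_truncation_offsets = {2012: 1239742830, 2024: 1239742250}

# How many records each follower was ahead of the recovered leader.
records_ahead = {
    broker: offset - leader_recovered_offset
    for broker, offset in follower_truncation_offsets.items()
}
```

records_ahead comes out as 2228 for broker 2012 and 1648 for broker 2024, i.e. 
both followers held records that the new leader no longer had.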

We're assuming that when 2012 starts up and follows the leader it requests 
records from its truncated offset, but that the logs have diverged on these 
two brokers to the point that the requested offset corresponds within the 
leader's log to the middle of a compressed record set, not to a record set 
boundary.  The leader then returns the whole compressed set, which the follower 
appends to its log, unknowingly introducing a dip in its otherwise 
monotonically incrementing offsets.
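
To make the assumed mechanism concrete, here is a toy model of it. It is a 
sketch of our hypothesis, not of Kafka's actual fetch path: each compressed 
record set is modelled as a plain list of offsets, the fetch function and the 
offsets are made up, and the follower is assumed to append whatever it is given 
without trimming.

```python
from typing import List

Batch = List[int]  # a compressed record set, modelled as the offsets it holds


def fetch(leader_log: List[Batch], fetch_offset: int) -> List[Batch]:
    """Return every batch from the one containing fetch_offset onwards.

    The batch that contains fetch_offset is returned whole, even if
    fetch_offset falls in the middle of it (the assumption under test)."""
    for i, batch in enumerate(leader_log):
        if batch[-1] >= fetch_offset:
            return leader_log[i:]
    return []


# Leader: recovered to a lower offset, then took one large batch (12..20)
# from producers working through their backlog.
leader_log = [[10], [11], list(range(12, 21)), [21]]

# Follower: diverged, it still holds its own records up to offset 13.
follower_log = [[10], [11, 12], [13]]

next_offset = follower_log[-1][-1] + 1  # follower asks for offset 14
follower_log = follower_log + fetch(leader_log, next_offset)

# Flatten to see the offsets as they now sit in the follower's log.
offsets = [o for batch in follower_log for o in batch]
```

After the fetch the follower's offsets run 10, 11, 12, 13 and then restart at 
12, because offset 14 sits in the middle of the leader's 12..20 batch. That is 
the same shape as the dip we see in the data logs.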

Several of our brokers were unlucky enough to have this dip at the 4K boundary 
used by the offset indexer, causing a protracted outage.  We’ve written a 
little utility that shows several more brokers have a dip outside of the 4K 
boundary.
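
A simplified model of why only some brokers fail, assuming the 4K boundary is 
the index interval (Kafka's index.interval.bytes defaults to 4096) and that 
recovery only adds an index entry once more than that many bytes have passed 
since the last entry. The rebuild_index function, the error class and the batch 
sizes are hypothetical; this is not the code in OffsetIndex or LogSegment.

```python
from typing import List, Tuple


class InvalidOffsetError(Exception):
    """Stand-in for the InvalidOffsetException seen at broker start up."""


def rebuild_index(
    batches: List[Tuple[int, int]], index_interval_bytes: int = 4096
) -> List[Tuple[int, int]]:
    """Rebuild a sparse offset index from (first_offset, size_in_bytes) batches.

    An entry (offset, position) is added only when more than
    index_interval_bytes have been read since the previous entry, and each
    new entry must have a larger offset than the one before it."""
    index: List[Tuple[int, int]] = []
    position = 0
    last_indexed_position = 0
    for first_offset, size in batches:
        if position - last_indexed_position > index_interval_bytes:
            if index and first_offset <= index[-1][0]:
                raise InvalidOffsetError(
                    f"offset {first_offset} at position {position} is no "
                    f"larger than last indexed offset {index[-1][0]}"
                )
            index.append((first_offset, position))
            last_indexed_position = position
        position += size
    return index


# The dip (offset 98) falls between index entries: recovery succeeds.
lucky = [(100, 1500), (101, 1500), (102, 1500), (103, 1500),
         (98, 1500), (141, 1500), (142, 1500)]
lucky_index = rebuild_index(lucky)

# The dip lands exactly where an index entry is due: recovery fails.
unlucky = [(100, 1500), (101, 1500), (102, 1500), (103, 1500),
           (104, 1500), (105, 1500), (98, 1500), (141, 1500)]
try:
    rebuild_index(unlucky)
    unlucky_failed = False
except InvalidOffsetError:
    unlucky_failed = True
```

Both logs contain the same kind of dip, but only the second one trips the 
monotonicity check, which would explain why some of our brokers restarted fine 
while carrying the same corruption.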

There are some assumptions in there, which I’ve not got around to confirming / 
denying. (A quick attempt to recreate this failed and I've not found the time 
to invest more).

Of course I'd really appreciate the community / experts stepping in and 
commenting on whether our assumptions are right or wrong, or if there is 
another explanation to the problem. 

The fact that the broker got into this state and then won’t start is 
obviously a bug, and one I’d like to fix.  A Kafka broker should not corrupt 
its own log during normal operation to the point that it can’t restart! :D

A secondary issue is whether we think the divergent logs are acceptable. This 
may be deemed acceptable given the producers have chosen availability over 
consistency by producing with acks=1.  Though personally, the system having 
diverging replicas of an immutable commit log just doesn't sit right.

I see us having a few options here:
* Have the replicas detect the divergence of their logs, e.g. a follower 
compares the checksum of its last record with the same offset on the leader. 
The follower can then work out that its log has diverged from the leader, at 
which point it could either halt, stop replicating that partition, or search 
backwards to find the point of divergence, truncate and recover (possibly 
saving the truncated part somewhere). This would be a protocol change for 
Kafka.  This solution trades availability (you’ve got fewer ISRs during the 
extended re-sync process) for consistency.
* Leave the logs as they are and have the indexing of offsets in the log on 
start up handle such a situation gracefully.  This leaves logs in a divergent 
state between replicas (meaning replays would yield different messages 
depending on whether the leader was up or down), but gives better availability 
(no time spent not being an ISR while it repairs any divergence).
* Support multiple options and allow it to be tuned, ideally by topic.
* Something else...
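
To give the first option a concrete shape, here is a rough sketch of the 
checksum comparison and backwards search. It is only an illustration under 
heavy assumptions: both logs are modelled as in-memory lists where the list 
index is the offset, the follower can read the leader's checksum at any offset 
(which is the protocol change mentioned above), and find_divergence is a 
hypothetical name, not an existing Kafka API.

```python
import zlib
from typing import List, Optional


def checksum(payload: bytes) -> int:
    """CRC32 of a record payload, standing in for the record checksum."""
    return zlib.crc32(payload)


def find_divergence(follower: List[bytes], leader: List[bytes]) -> Optional[int]:
    """Return the first offset at which the two logs differ, or None.

    Compares the last offset present in both logs first. If that matches,
    the logs are assumed consistent. Otherwise walk backwards until the
    record just before the candidate offset matches on both sides."""
    last_common = min(len(follower), len(leader)) - 1
    if last_common < 0:
        return None
    if checksum(follower[last_common]) == checksum(leader[last_common]):
        return None
    offset = last_common
    while offset > 0 and checksum(follower[offset - 1]) != checksum(leader[offset - 1]):
        offset -= 1
    return offset


# Illustrative logs: identical up to offset 2, different from offset 3 onwards.
leader = [b"a", b"b", b"c", b"X", b"Y"]
follower = [b"a", b"b", b"c", b"d", b"e", b"f"]

divergence = find_divergence(follower, leader)

# The follower would truncate to the divergence point and re-fetch from there.
truncated = follower[:divergence] if divergence is not None else follower
```

In this example the follower finds the divergence at offset 3 and keeps only 
its first three records. A linear walk backwards is the simplest thing that 
works; a real implementation would presumably want something cheaper over a 
long divergent range.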

I’m happy/keen to contribute here. But I’d like to first discuss which option 
should be investigated.


