Hi, Mark, Thanks for pointing this out. This issue is fixed in 0.10.0.0 in https://issues.apache.org/jira/browse/KAFKA-725.
In 0.9.0, what's going to happen is the consumer will get an unknown error. Normally, the consumer will only reset the offset if it gets an OffsetOutOfRangeException. If it gets an unknown error, it's supposed to just retry the same fetch request without resetting the offset. Since Connect uses the java consumer, I'd expect it to behave in the same way. Were you able to verify that it behaves differently? In any case, do you think you can try the latest stable release? Jun On Mon, Jan 23, 2017 at 11:53 AM, Mark Smith <m...@qq.is> wrote: > Jun, > > Thanks for the reply. This makes sense, I think. One followup question: > > During the failover when the new-leader has a stale HWM, is it possible > for this broker to return an error to consumers who are consuming > between HWM and LEO? > > I saw a comment on a document that says the leader could, legally, > return an empty response temporarily until the HWM has caught up -- but > I've examined the code (0.9) and this does not seem to be implemented. > Instead, it looks like we get IllegalArgumentException. > > 1) Consumer fetches from original leader, sees latest HWM > 2) Failover happens > 3) New leader has stale HWM > 4) Consumer sends next fetch to new leader, with start offset = latest > HWM (from step 1) > 5) New leader returns IllegalArgumentException because start offset > > stale HWM (from step 3) > > It looks like what then happened is that the client, Kafka Connect in > our case, reset its offset to 0 and started reconsuming from the head of > the log. This causes significant message churn and is unnecessary, > although I'm not entirely sure of the correct fix. > > I'm basing this on the code I'm looking at from 0.9: > > * fetchMessages is called, and fetchOnlyCommitted is true (request is > from a consumer, not a replica) > > * readFromLocalLog is called, sets maxOffsetOpt: > > val maxOffsetOpt = if (readOnlyCommitted) > Some(localReplica.highWatermark.messageOffset) > else > None > > * read is later called: > > log.read(offset, fetchSize, maxOffsetOpt) > > * read then throws the exception: > > // calculate the length of the message set to read based on whether > or not they gave us a maxOffset > val length = > maxOffset match { > case Some(offset) => { > // there is a max offset, translate it to a file position and > use that to calculate the max read size > if(offset < startOffset) > throw new IllegalArgumentException("Attempt to read with a > maximum offset (%d) less than the start offset > (%d).".format(offset, startOffset)) > > * readFromLocalLog catches and returns: > > case e: Throwable => > BrokerTopicStats.getBrokerTopicStats(topic). > failedFetchRequestRate.mark() > BrokerTopicStats.getBrokerAllTopicsStats(). > failedFetchRequestRate.mark() > error("Error processing fetch operation on partition [%s,%d] > offset %d".format(topic, partition, offset), e) > LogReadResult(FetchDataInfo(LogOffsetMetadata. > UnknownOffsetMetadata, > MessageSet.Empty), -1L, fetchSize, false, Some(e)) > > The consumer then restarts back to the beginning. This looks to be the > source of our 'data loss', which isn't actually loss but a bad > interaction of failover and catching a stale HWM leading to errors being > thrown by the broker when it maybe doesn't need to. > > Thoughts? > > -- > Mark Smith > m...@qq.is > > > On Wed, Jan 18, 2017, at 02:11 PM, Jun Rao wrote: > > Hi, Mark, > > > > Your understanding about HWM transition is correct. When the leader > > changes > > due to preferred leader election, the new leader will have all the > > committed messages, but a potentially stale HWM. The new leader won't do > > any truncation to its local log though. Instead, it will try to commit > > all > > messages in its local log. It will advance HWM after all followers's LEO > > have advanced. > > > > Thanks, > > > > Jun > > > > On Wed, Jan 18, 2017 at 12:13 AM, Mark Smith <m...@qq.is> wrote: > > > > > Hi all, > > > > > > Here at Dropbox we're still (off and on) trying to get to the bottom of > > > the data loss that's been hitting our largest cluster during preferred > > > replica elections. It unfortunately has repeated a few times, so now we > > > have a question. > > > > > > To make sure we're understanding, message commit status (replication) > > > basically goes through three phases: > > > > > > * Messages Uncommitted; the leader has received new production, and the > > > followers receive the messages in a Fetch request. Everybody's LEO is > > > incremented, but HWMs are unchanged. No messages are considered > > > committed. > > > > > > * Leader Committed; in the subsequent Fetch, the follower says "my LEO > > > is now X" and the leader records that and then updates its HWM to be > the > > > minimum of all follower's LEO. After this stage, the messages are > > > committed -- only on the leader. The follower's HWM is still in the > > > past. > > > > > > * Leader+Follower Committed; or HWM Replication Fetch; in this final > > > fetch, the follower is informed of the new HWM by the leader and > > > increments its own HWM accordingly. > > > > > > These aren't actually distinct phases of course, they're just part of > > > individual fetch requests, but I think logically the HWM transitions > can > > > be thought of in those way. I.e., "message uncommitted", "leader > > > committed", "leader+follower committed". > > > > > > Is this understanding accurate? > > > > > > If so, is it just a fact that (right now) a preferred replica election > > > has a small change of electing a follower and causing message loss of > > > any "leader committed" messages (i.e., messages that are not considered > > > committed on the follower that is now getting promoted)? > > > > > > We can't find anything in the protocol that would guard against this. > > > I've also been reading KIP-101 and it looks like this is being referred > > > to sort-of in Scenario 1, however, that scenario is mentioning broker > > > failure -- and my concern is that data loss is possible even in the > > > normal scenario with no broker failures. > > > > > > Any thoughts? > > > > > > -- > > > Mark Smith > > > m...@qq.is > > > >