Hi Guozhang,

The reason the fetcher thread died is that it expects every FetchRequest
to fetch from the current log end offset. We could relax this
restriction.
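
For reference, the check I am referring to lives in
ReplicaFetcherThread.processPartitionData; from memory it is roughly the
following (a paraphrase, not the exact 0.9.0.1 source):

    // The follower refuses to append a fetched message set unless it starts
    // exactly at its current log end offset; this is the restriction above.
    if (fetchOffset != replica.logEndOffset.messageOffset)
      throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d."
        .format(fetchOffset, replica.logEndOffset.messageOffset))

Relaxing the restriction would mean tolerating a gap here instead of throwing.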

I agree with the current design that the replica fetcher thread should not
die, so the other partitions will not lag behind. I am not sure about the
right thing to do for the partition that contains a corrupted message.
Leaving a hole in the followers is a simple solution, but it seems to break
the guarantee provided by Kafka. From the consumer side it is probably fine,
because a consumer cannot process the corrupted message anyway. But from
the producer's perspective this might be a problem if people really care
about the message and send with acks=-1. If we instead let the partition stop
making progress, the producer will not be able to make progress either and
will notice that something went wrong. That would have a bigger impact, though.
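
Just to make the acks=-1 case concrete, a minimal producer sketch (the broker
address is a placeholder, and the topic name only mirrors the one in the logs
below). With acks=-1 the broker acknowledges only once the full ISR has the
message, so a partition that stops making progress eventually surfaces to the
producer as a timeout rather than a silent success:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")  // placeholder broker address
    props.put("acks", "-1")                         // equivalent to "all": wait for the full ISR
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // get() blocks until the ISR has acknowledged the write or the request times out.
    producer.send(new ProducerRecord[String, String]("logs", "key", "value")).get()
    producer.close()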

Theoretically we could have some auto-recovery mechanism, e.g. if a follower
keeps fetching from the same offset three times in a row, the leader checks its
log to ensure the messages are not corrupted; if they are, it truncates the log
and gives up the leadership. But this would be a bigger change.
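
Purely to illustrate the idea (the TopicAndPartition shape mirrors the existing
Kafka class, but the threshold and all helper functions below are hypothetical
and do not exist today):

    case class TopicAndPartition(topic: String, partition: Int)

    val maxRepeatedFetches = 3  // hypothetical threshold

    // Hypothetical stubs standing in for real broker internals.
    def leaderLogIsCorruptedAt(tp: TopicAndPartition, offset: Long): Boolean = false
    def truncateLeaderLogTo(tp: TopicAndPartition, offset: Long): Unit = ()
    def resignLeadership(tp: TopicAndPartition): Unit = ()

    // Called on the leader when it notices the same follower fetch offset repeating.
    def maybeRecover(tp: TopicAndPartition, fetchOffset: Long, repeatCount: Int): Unit = {
      if (repeatCount >= maxRepeatedFetches && leaderLogIsCorruptedAt(tp, fetchOffset)) {
        truncateLeaderLogTo(tp, fetchOffset)  // drop the corrupted suffix
        resignLeadership(tp)                  // trigger a leader re-election
      }
    }

The hard parts would be doing the re-validation cheaply and coordinating the
leadership change, which is why this would be a bigger change.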

Thanks,

Jiangjie (Becket) Qin


On Wed, Nov 9, 2016 at 10:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Jun,
>
> Thanks for reporting this issue. I looked through the code and I agree that the
> logic you found in 0.9.0.1 also exists in 0.10.0+. However, I think the
> process is intentionally designed this way to skip the message set that is
> causing such a fatal error, since otherwise the thread would keep fetching
> this message set and throwing the same exception. It will cause the
> follower to have a "hole" in its log, but this is arguably better than
> letting the replica fetcher thread fall into a bad state.
>
> As for the out-of-range issue that follows the skipping of the message set,
> I think it should be handled by the "handleOffsetOutOfRange" logic of
> the ReplicaFetcherThread class, which resets the fetching offset to the log end
> offset of the leader and then retries. So I do not know why you observed
> the thread actually stopping because of this issue. Could you check the
> source code as well as the stack trace to see why this happens?
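>
> For reference, my mental model of that handler is roughly the following; this is
> a simplified sketch with approximate names, not the exact source:
>
>     // Returns the offset the follower should resume fetching from.
>     def handleOffsetOutOfRange(tp: TopicAndPartition): Long = {
>       val leaderEndOffset = fetchLeaderLogEndOffset(tp)        // approximate helper name
>       if (leaderEndOffset < replicaLogEndOffset(tp)) {
>         // Follower is ahead of the leader: truncate back to the leader's end offset.
>         truncateTo(tp, leaderEndOffset)
>         leaderEndOffset
>       } else {
>         // Follower fell behind the leader's earliest retained offset: start over there.
>         val leaderStartOffset = fetchLeaderLogStartOffset(tp)  // approximate helper name
>         truncateFullyAndStartAt(tp, leaderStartOffset)
>         leaderStartOffset
>       }
>     }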
>
>
> Guozhang
>
>
> On Wed, Nov 2, 2016 at 4:38 AM, Jun H. <junhe...@gmail.com> wrote:
>
> > Hi all,
> >
> > We recently discovered an issue in Kafka 0.9.0.1 where the
> > ReplicaFetcherThread stopped after it received a corrupted message. As the
> > same logic also exists in Kafka 0.10.0.0 and 0.10.0.1, they may have the
> > same issue.
> >
> > Here are system logs related to this issue.
> >
> > > 2016-10-30 02:33:05,606 ERROR ReplicaFetcherThread-5-1590174474
> > > ReplicaFetcherThread.apply - Found invalid messages during fetch for
> > > partition [logs,41] offset 39021512238 error Message is corrupt
> > > (stored crc = 2028421553, computed crc = 3577227678)
> > >
> > > 2016-10-30 02:33:06,582 ERROR ReplicaFetcherThread-5-1590174474
> > > ReplicaFetcherThread.error - [ReplicaFetcherThread-5-1590174474], Error due
> > > to kafka.common.KafkaException: error processing data for partition
> > > [logs,41] offset 39021512301
> > > Caused by: java.lang.RuntimeException: Offset mismatch: fetched offset =
> > > 39021512301, log end offset = 39021512238.
> >
> >
> > First, ReplicaFetcherThread got a corrupted message (offset 39021512238)
> > due to some blip.
> >
> > The line at
> > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L138
> > threw an exception.
> >
> > Then the line at
> > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L145
> > caught it and logged the error above.
> >
> > Because
> > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L134
> > had already updated the partition's offset in partitionMap to the latest
> > fetched offset, ReplicaFetcherThread skipped the batch containing the
> > corrupted messages.
> >
> > Based on
> > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L84,
> > the ReplicaFetcherThread then directly fetched the next batch of messages
> > (with offset 39021512301).
> >
> > Next, ReplicaFetcherThread stopped because the log end offset (still
> > 39021512238) didn't match the fetched message (offset 39021512301).
> >
> > A quick fix is to move line 134 to after line 138, so that the fetch offset in
> > partitionMap is only advanced once the message set has been processed
> > successfully.
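> >
> > Roughly, the reordered block would look like this (a simplified sketch with
> > approximate names, not the exact 0.9.0.1 source):
> >
> >     // Derive the next fetch offset from the fetched message set.
> >     val newOffset = messages.shallowIterator.toSeq.lastOption
> >       .map(_.nextOffset)
> >       .getOrElse(currentPartitionFetchState.offset)
> >
> >     // Validate/append the fetched data first; this is the call that throws
> >     // on a corrupt message (current line 138).
> >     processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
> >
> >     // Only advance the fetch offset after the data was processed successfully
> >     // (current line 134), so a corrupt batch is re-fetched instead of skipped.
> >     partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))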
> >
> > It would be great to have your comments, and please let me know if a Jira
> > issue is needed. Thanks.
> >
> > Best,
> >
> > Jun
> >
>
>
>
> --
> -- Guozhang
>
