Hi Guozhang,

The reason the fetcher thread died is that it expects the FetchRequest to
always fetch from the current log end offset. We could loosen this
restriction.
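Roughly, the expectation that gets violated looks like the following (a
simplified, self-contained sketch with made-up names and types, not the
actual ReplicaFetcherThread code):

import scala.collection.mutable

// Sketch only: stand-ins for the fetched batch and the follower's state.
case class FetchedBatch(startOffset: Long, nextOffset: Long, corrupt: Boolean)

class FetcherSketch {
  // The follower's view of its own log end offset per partition.
  private val logEndOffset = mutable.Map[String, Long]().withDefaultValue(0L)

  def processPartitionData(partition: String, batch: FetchedBatch): Unit = {
    // The strict expectation: the fetched batch must start exactly at the
    // follower's current log end offset, otherwise the thread dies.
    if (batch.startOffset != logEndOffset(partition))
      throw new RuntimeException(
        s"Offset mismatch: fetched offset = ${batch.startOffset}, " +
        s"log end offset = ${logEndOffset(partition)}")

    // Appending a corrupt batch fails, so the log end offset does not move.
    if (batch.corrupt)
      throw new RuntimeException("Message is corrupt")

    logEndOffset(partition) = batch.nextOffset
  }
}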
I agree with the current design that the replica fetcher thread should not
die, so that the other partitions do not lag behind. I am not sure about the
right thing to do for the partition that contains a corrupted message,
though. Leaving a hole in the followers' logs is a simple solution, but it
seems to break the guarantee Kafka provides. From the consumer's side it is
probably fine, because a consumer cannot process the corrupted message
anyway. But from the producer's perspective this might be a problem if
people really care about the message and send with acks=-1. If we instead
let the partition stop making progress, the producer will not be able to
make progress either and will notice that something went wrong. That would
indeed have a bigger impact, though.

Theoretically we could have some auto-recovery mechanism, e.g. if a follower
keeps fetching from the same offset three times, let the leader check its
log to make sure the messages are not corrupted; if they are, truncate the
log and give up the leadership. But this would be a bigger change. A rough
sketch of the idea is below.
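Something along these lines on the leader side (a hypothetical sketch with
made-up names; the helper calls at the bottom are placeholders, not existing
Kafka APIs):

import scala.collection.mutable

class LeaderRecoverySketch(maxRepeatedFetches: Int = 3) {
  // How many times a follower has fetched the same offset of a partition.
  private val repeatedFetches =
    mutable.Map[(String, Long), Int]().withDefaultValue(0)

  def onFollowerFetch(partition: String, fetchOffset: Long): Unit = {
    val key = (partition, fetchOffset)
    repeatedFetches(key) += 1
    if (repeatedFetches(key) >= maxRepeatedFetches &&
        isCorruptAt(partition, fetchOffset)) {
      // The leader's own copy is corrupted: truncate back to the bad offset
      // and resign so another in-sync replica can take over the leadership.
      truncateTo(partition, fetchOffset)
      resignLeadership(partition)
    }
  }

  // Placeholders for leader-side log inspection and controller interaction.
  private def isCorruptAt(partition: String, offset: Long): Boolean = false
  private def truncateTo(partition: String, offset: Long): Unit = ()
  private def resignLeadership(partition: String): Unit = ()
}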
Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 9, 2016 at 10:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Jun,
>
> Thanks for reporting this issue. I looked through the code and I agree that
> the logic you found in 0.9.0.1 also exists in 0.10.0+. However, I think the
> process is intentionally designed this way, to skip the message set that is
> causing such a fatal error, since otherwise the thread would keep fetching
> the same message set and throwing the same exception. It does cause the
> follower to have a "hole" in its log, but that is arguably better than
> leaving the replica fetcher thread in a bad state.
>
> As for the out-of-range issue that follows the skipping of the message set,
> I think it should be handled by the "handleOffsetOutOfRange" logic of the
> ReplicaFetcherThread class, which resets the fetch offset to the log end
> offset of the leader and then retries. So I do not know why you observed
> that the thread actually stopped because of this issue. Could you check the
> source code as well as the stack trace to see why this happens?
>
>
> Guozhang
>
>
> On Wed, Nov 2, 2016 at 4:38 AM, Jun H. <junhe...@gmail.com> wrote:
>
> > Hi all,
> >
> > We recently discovered an issue in Kafka 0.9.0.1 (), where the
> > ReplicaFetcherThread stopped after it received a corrupted message. As
> > the same logic also exists in Kafka 0.10.0.0 and 0.10.0.1, they may have
> > the same issue.
> >
> > Here are the system logs related to this issue.
> >
> > > 2016-10-30 02:33:05,606 ERROR ReplicaFetcherThread-5-1590174474
> > > ReplicaFetcherThread.apply - Found invalid messages during fetch for
> > > partition [logs,41] offset 39021512238 error Message is corrupt
> > > (stored crc = 2028421553, computed crc = 3577227678)
> > > 2016-10-30 02:33:06,582 ERROR ReplicaFetcherThread-5-1590174474
> > > ReplicaFetcherThread.error - [ReplicaFetcherThread-5-1590174474], Error
> > > due to kafka.common.KafkaException: error processing data for partition
> > > [logs,41] offset 39021512301
> > > Caused by: java.lang.RuntimeException: Offset mismatch: fetched offset
> > > = 39021512301, log end offset = 39021512238.
> >
> > First, ReplicaFetcherThread got a corrupted message (offset 39021512238)
> > due to some blip.
> >
> > Line
> > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L138
> > threw an exception.
> >
> > Then, line
> > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L145
> > caught it and logged this error.
> >
> > Because
> > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L134
> > had already updated the topic partition's offset in partitionMap to the
> > latest fetched offset, ReplicaFetcherThread skipped the batch with the
> > corrupted messages.
> >
> > Based on
> > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L84,
> > the ReplicaFetcherThread then directly fetched the next batch of
> > messages (with offset 39021512301).
> >
> > Next, ReplicaFetcherThread stopped because the log end offset (still
> > 39021512238) didn't match the fetched message (offset 39021512301).
> >
> > A quick fix is to move line 134 to after line 138.
> >
> > It would be great to have your comments, and please let me know if a
> > Jira issue is needed. Thanks.
> >
> > Best,
> >
> > Jun
>
>
> --
> -- Guozhang
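For illustration, the reordering Jun suggests would amount to something like
the following (a simplified, self-contained sketch with stand-in names, not
the actual AbstractFetcherThread code): advance the fetch offset only after
the batch has been processed, so a corrupt batch is re-fetched rather than
silently skipped.

import scala.collection.mutable

case class Batch(startOffset: Long, nextOffset: Long, corrupt: Boolean)

class FetcherLoopSketch {
  // Plays the role of partitionMap: the next offset to fetch per partition.
  private val partitionMap = mutable.Map[String, Long]().withDefaultValue(0L)

  private def append(partition: String, batch: Batch): Unit =
    if (batch.corrupt) throw new RuntimeException("Message is corrupt")

  def handle(partition: String, batch: Batch): Unit = {
    try {
      // 0.9.0.1 advances the offset before this call (old line 134 runs
      // before old line 138), which is what skips the corrupt batch.
      append(partition, batch)
      // Advancing only after a successful append keeps partitionMap in sync
      // with the follower's log end offset.
      partitionMap(partition) = batch.nextOffset
    } catch {
      case e: Exception =>
        // partitionMap is unchanged, so the next fetch retries this offset.
        println(s"Found invalid messages for $partition at offset " +
          s"${batch.startOffset}: ${e.getMessage}")
    }
  }
}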