Hmm, was the corruption detected inside processPartitionData()? Typically, the corruption should be detected at https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L130 when we do a shallow iteration of the message set. If the corruption is detected there, the offset won't be moved and the next fetch will retry the same offset. The idea is that if the corruption happened during the network transfer, hopefully it won't happen again in the next transfer.
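To make the ordering concrete, here is a rough sketch of the happy path in processFetchRequest() (a paraphrase, not the actual 0.9.0.1 source; the names follow AbstractFetcherThread.scala, but the surrounding code is simplified):

  try {
    // The shallow iteration walks the wrapper messages; this is where a
    // corrupt message set should fail, per the check around line 130.
    val newOffset = messages.shallowIterator.toSeq.lastOption match {
      case Some(m) => m.nextOffset
      case None    => currentOffset
    }
    // Only reached if the iteration above succeeded, so a corruption
    // detected there never advances the fetch offset ...
    partitionMap.put(topicAndPartition, newOffset)
    processPartitionData(topicAndPartition, currentOffset, partitionData)
  } catch {
    case e: InvalidMessageException =>
      // ... and the next fetch simply retries the same offset.
      error("Found invalid messages during fetch for partition " +
        topicAndPartition + " at offset " + currentOffset, e)
  }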
In your case, was the message set already corrupted at the leader?

Thanks,

Jun

On Wed, Nov 2, 2016 at 1:38 AM, Jun H. <junhe...@gmail.com> wrote:

> Hi all,
>
> We recently discovered an issue in Kafka 0.9.0.1 where
> ReplicaFetcherThread stopped after it received a corrupted message. As
> the same logic also exists in Kafka 0.10.0.0 and 0.10.0.1, they may have
> a similar issue.
>
> Here are the system logs related to this issue:
>
> > 2016-10-30 02:33:05,606 ERROR ReplicaFetcherThread-5-1590174474
> > ReplicaFetcherThread.apply - Found invalid messages during fetch for
> > partition [logs,41] offset 39021512238 error Message is corrupt
> > (stored crc = 2028421553, computed crc = 3577227678)
> >
> > 2016-10-30 02:33:06,582 ERROR ReplicaFetcherThread-5-1590174474
> > ReplicaFetcherThread.error - [ReplicaFetcherThread-5-1590174474], Error
> > due to kafka.common.KafkaException: error processing data for partition
> > [logs,41] offset 39021512301
> > Caused by: java.lang.RuntimeException: Offset mismatch: fetched offset
> > = 39021512301, log end offset = 39021512238.
>
> First, ReplicaFetcherThread fetched a corrupted message (offset
> 39021512238) due to some blip.
>
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L138
> then threw an exception, and
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L145
> caught it and logged the error above.
>
> However,
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L134
> had already updated the topic partition offset in partitionMap to the
> latest fetched offset, so ReplicaFetcherThread skipped the batch
> containing the corrupted messages.
>
> Based on
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L84,
> ReplicaFetcherThread then directly fetched the next batch of messages
> (with offset 39021512301).
>
> Next, ReplicaFetcherThread stopped because the log end offset (still
> 39021512238) didn't match the offset of the fetched message
> (39021512301).
>
> A quick fix is to move line 134 to after line 138.
>
> It would be great to have your comments, and please let me know if a
> Jira issue is needed. Thanks.
>
> Best,
>
> Jun
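For concreteness, the quick fix proposed above amounts to swapping two calls inside processFetchRequest(). A minimal sketch of the reordering (paraphrased from the report's description, not an actual patch; names simplified as before):

  // 0.9.0.1 ordering per the report: the offset is advanced before the
  // data is handed to the subclass, so an exception thrown inside
  // processPartitionData() leaves the offset already moved past the
  // corrupted batch.
  partitionMap.put(topicAndPartition, newOffset)                        // line 134
  processPartitionData(topicAndPartition, currentOffset, partitionData) // line 138

  // Proposed ordering: advance the offset only after the data has been
  // processed successfully, so a corrupt batch is re-fetched at the same
  // offset instead of being skipped.
  processPartitionData(topicAndPartition, currentOffset, partitionData)
  partitionMap.put(topicAndPartition, newOffset)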