Jun,

The shallow iterator actually does not check the CRC, right? The CRC is
only checked when log.append() is called, which is why the exception was
thrown from processPartitionData().
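
A rough, self-contained sketch of the two code paths (simplified, with
made-up types rather than the actual kafka.message classes): the shallow
pass never recomputes the checksum, while the append path does, which is
where the "Message is corrupt" exception comes from.

import java.util.zip.CRC32

object CrcSketch {
  case class Msg(payload: Array[Byte], storedCrc: Long)

  def computeCrc(payload: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(payload)
    crc.getValue
  }

  // Shallow pass (what the replica fetcher does): message sets are copied
  // as-is, so a corrupt payload goes through unnoticed.
  def shallowCopy(batch: Seq[Msg]): Seq[Msg] = batch

  // Append path (log.append): the checksum is recomputed and compared here,
  // which is why the exception surfaces from processPartitionData().
  def append(batch: Seq[Msg]): Unit =
    batch.foreach { m =>
      val computed = computeCrc(m.payload)
      if (computed != m.storedCrc)
        throw new RuntimeException(
          s"Message is corrupt (stored crc = ${m.storedCrc}, computed crc = $computed)")
    }
}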

Thanks,

Jiangjie (Becket) Qin


On Thu, Nov 10, 2016 at 4:27 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks Jiangjie. Makes sense.
>
> Currently we cannot simply distinguish between "corrupted messages over the
> wire, and hence we should retry" and "corrupted messages on leader's log
> files, and hence we should probably just fail fast and notify admin". So
> https://issues.apache.org/jira/browse/KAFKA-4384's proposed changes look
> reasonable to me for now.
>
>
> Guozhang
>
>
> On Thu, Nov 10, 2016 at 11:26 AM, Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > The reason the fetcher thread died is that it expects the FetchRequest to
> > always fetch from the current log end offset. We could loosen this
> > restriction.
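> >
> > For context, the guard that trips is roughly the following (paraphrased
> > from processPartitionData; names are from memory, so treat them as
> > approximate):
> >
> >   // paraphrase of the offset guard in the follower's processPartitionData
> >   def checkFetchOffset(fetchOffset: Long, logEndOffset: Long): Unit =
> >     if (fetchOffset != logEndOffset)
> >       throw new RuntimeException(
> >         s"Offset mismatch: fetched offset = $fetchOffset, log end offset = $logEndOffset.")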
> >
> > I agree with the current design that the replica fetcher thread should not
> > die, so the other partitions will not lag behind. I am not sure about the
> > right thing to do for the partition that contains a corrupted message.
> > Leaving a hole in the followers is a simple solution, but it seems to break
> > the guarantee provided by Kafka. From the consumer side it is probably
> > fine, because a consumer cannot process the corrupted message anyway. But
> > from the producer's perspective this might be a problem if people really
> > care about the message and send with acks=-1. If we let the partition stop
> > making progress, the producer will not be able to make progress and will
> > notice that something went wrong. This would indeed have a bigger impact,
> > though.
> >
> > Theoretically we could have some auto-recovery mechanism, e.g. if a
> > follower keeps fetching from the same offset three times, let the leader
> > check its log to ensure the messages are not corrupted; if they are,
> > truncate the log and give up the leadership. But this would be a bigger
> > change.
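> >
> > Just to make the idea concrete, a rough sketch (all names are hypothetical;
> > nothing like this exists in the code today):
> >
> >   import scala.collection.mutable
> >
> >   object LeaderSelfCheck {
> >     // (topic, partition, offset) -> consecutive fetches seen at that offset
> >     val repeatedFetches = mutable.Map.empty[(String, Int, Long), Int]
> >     val maxRepeats = 3
> >
> >     // called by the leader on every follower fetch
> >     def onFollowerFetch(topic: String, partition: Int, offset: Long): Unit = {
> >       val key = (topic, partition, offset)
> >       val count = repeatedFetches.getOrElse(key, 0) + 1
> >       repeatedFetches(key) = count
> >       if (count >= maxRepeats && logIsCorruptAt(topic, partition, offset)) {
> >         truncateLogTo(topic, partition, offset)
> >         resignLeadership(topic, partition)
> >       }
> >     }
> >
> >     // placeholders for the leader-side operations the idea relies on
> >     def logIsCorruptAt(topic: String, partition: Int, offset: Long): Boolean = false
> >     def truncateLogTo(topic: String, partition: Int, offset: Long): Unit = ()
> >     def resignLeadership(topic: String, partition: Int): Unit = ()
> >   }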
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Wed, Nov 9, 2016 at 10:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Jun,
> > >
> > > Thanks for reporting this issue. I looked through the code and I agree
> > > that the logic you found in 0.9.0.1 also exists in 0.10.0+. However, I
> > > think the process is intentionally designed like this to skip the message
> > > set that is causing such a fatal error, since otherwise the thread would
> > > keep fetching this message set and throwing the same exceptions. It will
> > > cause the follower to have a "hole" in its log, but this is arguably
> > > better than leaving the replica fetcher thread in a bad state.
> > >
> > > As for the out-of-range issue that follows the skipping of the message
> > > set, I think it should be handled in the "handleOffsetOutOfRange" logic
> > > of the ReplicaFetcherThread class, which resets the fetching offset to
> > > the log end offset of the leader and then retries. So I do not know why
> > > you observed that the thread actually stopped because of this issue.
> > > Could you check the source code as well as the stack trace to see why
> > > this happens?
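> > >
> > > For reference, a simplified sketch of that recovery path (method and
> > > parameter names are assumptions; the real ReplicaFetcherThread
> > > implementation differs):
> > >
> > >   // decide where the follower should resume when the leader reports
> > >   // an out-of-range fetch offset
> > >   def resolveOutOfRange(localLogEndOffset: Long,
> > >                         leaderLogEndOffset: Long,
> > >                         leaderLogStartOffset: Long): Long =
> > >     if (leaderLogEndOffset < localLogEndOffset)
> > >       leaderLogEndOffset   // follower is ahead of the leader: truncate and resume here
> > >     else
> > >       leaderLogStartOffset // follower fell behind retention: restart from the earliest offset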
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Nov 2, 2016 at 4:38 AM, Jun H. <junhe...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > We recently discovered an issue in Kafka 0.9.0.1 where the
> > > > ReplicaFetcherThread stopped after it received a corrupted message. As
> > > > the same logic also exists in Kafka 0.10.0.0 and 0.10.0.1, they may
> > > > have a similar issue.
> > > >
> > > > Here are system logs related to this issue.
> > > >
> > > > > 2016-10-30 02:33:05,606 ERROR ReplicaFetcherThread-5-1590174474
> > > > > ReplicaFetcherThread.apply - Found invalid messages during fetch for
> > > > > partition [logs,41] offset 39021512238 error Message is corrupt
> > > > > (stored crc = 2028421553, computed crc = 3577227678)
> > > > > 2016-10-30 02:33:06,582 ERROR ReplicaFetcherThread-5-1590174474
> > > > > ReplicaFetcherThread.error - [ReplicaFetcherThread-5-1590174474],
> > > > > Error due to kafka.common.KafkaException: error processing data for
> > > > > partition [logs,41] offset 39021512301
> > > > > Caused by: java.lang.RuntimeException: Offset mismatch: fetched
> > > > > offset = 39021512301, log end offset = 39021512238.
> > > >
> > > >
> > > > First, the ReplicaFetcherThread got a corrupted message (offset
> > > > 39021512238) due to some blip.
> > > >
> > > > Line
> > > > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L138
> > > > threw an exception.
> > > >
> > > > Then line
> > > > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L145
> > > > caught it and logged this error.
> > > >
> > > > However,
> > > > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L134
> > > > had already updated the topic partition offset in partitionMap to the
> > > > latest fetched one, so the ReplicaFetcherThread skipped the batch with
> > > > the corrupted messages.
> > > >
> > > > Based on
> > > > https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L84,
> > > > the ReplicaFetcherThread then directly fetched the next batch of
> > > > messages (with offset 39021512301).
> > > >
> > > > Next, the ReplicaFetcherThread stopped because the log end offset (still
> > > > 39021512238) did not match the fetched message (offset 39021512301).
> > > >
> > > > A quick fix is to move line 134 to after line 138.
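> > > >
> > > > Sketching the intent of that reordering (identifiers paraphrased from
> > > > processFetchRequest rather than a verbatim diff): advance the fetch
> > > > state only after the batch has been processed, so a CRC failure leaves
> > > > the old offset in place and the same batch is re-fetched instead of
> > > > being skipped.
> > > >
> > > >   try {
> > > >     // old line 138: throws if the batch fails CRC validation
> > > >     processPartitionData(topicAndPartition, currentOffset, partitionData)
> > > >     // old line 134, moved here: only advance the fetch offset on success
> > > >     partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
> > > >   } catch {
> > > >     case e: Exception =>
> > > >       // old line 145: log and continue; the offset is unchanged, so the
> > > >       // corrupted batch will be re-fetched rather than skipped
> > > >       error("Found invalid messages during fetch for partition " + topicAndPartition, e)
> > > >   }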
> > > >
> > > > It would be great to have your comments, and please let me know if a
> > > > Jira issue is needed. Thanks.
> > > >
> > > > Best,
> > > >
> > > > Jun
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
