Hello Mangat,

What you've encountered is a "zombie writer" issue, that is, Thread-2 did
not know there's already a new rebalance and hence its partitions have been
migrated out, until it tries to commit and then got notified of the
illegal-generation error and realize itself is the "zombie" already. This
case would still persist even with incremental rebalancing.

I've filed https://issues.apache.org/jira/browse/KAFKA-12693 to summarize
the situation. Please LMK if that explanation is clear to you.

On Mon, Apr 19, 2021 at 12:58 AM mangat rai <mangatm...@gmail.com> wrote:

> Thanks, Guozhang,
>
> I was knocking myself with Kafka's various consumer rebalancing algorithms
> in the last 2 days. Could I generalize this problem as
>
>
>
> *Any in-memory state store backed by a changelog topic will always risk
> having interleaved writes from two different writers during rebalancing?*
> In our case, CPU throttling made it worse as thread-2 didn't try to commit
> for a long time. Also,
>
> 1. Do you think if we disable the incremental rebalancing, we will not have
> this issue because If I understood correctly Thread-4 will not start
> processing until the state is completely transferred from Thread-2.
> 2. If yes, how can we disable it without downgrading the client?
>
> Since we have a very low scale and no real-time computing requirement, we
> will be happy to sacrifice the availability to have consistency.
>
> Regards,
> Mangat
>
>
>
> On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Mangat:
> >
> > I think I found the issue of your problem here.
> >
> > It seems thread-2's partition was assigned to thread-4 while thread-2 was
> > not aware (because it missed a rebalance, this is normal scenario); in
> > other words, thread2 becomes a "zombie". It would stay in that zombie
> state
> > until it tried to commit, in which it would get an error from the brokers
> > and realize its zombie identity and re-joins the group.
> >
> > During that period of time, before the commit was issued, it would
> continue
> > trying to write to its local states; here are several scenarios:
> >
> > 1) if thread-2/4 are belonging to two different nodes then that is fine,
> > since they will write to different local state stores.
> > 2) if they belong to the same nodes, and
> >    a) the state stores are persistent then they would have risks of
> > contention; this is guarded by the state directory locks (as file locks)
> in
> > which case the new owner thread-4 should not be able to get on the local
> > state files.
> >    b) the state stores are in-memory, in which case that is fine since
> the
> > in-memory stores are kept separate as well.
> >
> > In your case: 2.b), the issue is that the changelog would still be shared
> > between the two --- but note that this is the same case as in case 1) as
> > well. And this means at that time the changelog is shared by two writers
> > sending records interleaving. And if there’s a tombstone that was
> intended
> > for a record A, but when it was written interleaving and there’s another
> > record B in between, that tombstone would effectively delete record B.
> The
> > key here is that, when we replay the changelogs, we replay it completely
> > following offset ordering.
> >
> >
> >
> > On Thu, Apr 15, 2021 at 2:28 AM mangat rai <mangatm...@gmail.com> wrote:
> >
> > > Guozhang,
> > >
> > > Yes, you are correct. We have our own group processor. I have more
> > > information now.
> > >
> > > 1. I added ThreadId in the data when the app persists into the
> changelog
> > > topic.
> > > 2. Thread-2 which was working with partition-0 had a timeout issue.
> > > 4. Thread-4 picked up this partition-0 as I can see its Id in the
> > > changelog.
> > > 5. *But then Thread-2 and Thread-4 both were writing into the
> partition-0
> > > of the changelog, that too for the same key.*
> > >
> > > So I was clearly able to see that two threads were overwriting data of
> > one
> > > another into the state store leading to a corrupted state. This
> confirms
> > my
> > > theory that it was an issue of concurrent update. This was something
> > > totally unexpected. I suspect that Thread-2 continues to persist its
> > > in-memory state, maybe because It wasn't stopped after the timeout
> > > exception. Is there a configuration possible in the Kafka stream which
> > > could lead to this?
> > >
> > > There was no network issue, our CPU was highly throttled by Kubernetes.
> > We
> > > gave more resources, also decreased the fetch-size so we have more I/O
> to
> > > Cpu time ratio than before, and then there was no timeout issue, hence
> no
> > > reassignment and hence no corrupted state.
> > >
> > > I really appreciate your help here...
> > > Thanks!
> > > Mangat
> > >
> > >
> > > On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Hey Mangat,
> > > >
> > > > A. With at least once, Streams does not make sure atomicity of 1) /
> 2);
> > > > with exactly once, atomicity is indeed guaranteed with transactional
> > > > messaging.
> > > >
> > > > B. If you are using processor API, then I'm assuming you did your own
> > > > group-by processor right? In that case, the partition key would just
> be
> > > the
> > > > record key when you are sending to the repartition topic.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > > On Thu, Apr 8, 2021 at 9:00 AM mangat rai <mangatm...@gmail.com>
> > wrote:
> > > >
> > > > > Thanks again, that makes things clear. I still have some questions
> > here
> > > > > then.
> > > > >
> > > > > A.  For each record we read, we do two updates
> > > > >       1. Changelog topic of the state store.
> > > > >       2. Output topic aka sink.
> > > > >       Does the Kafka stream app make sure that either both are
> > > committed
> > > > or
> > > > > neither?
> > > > >
> > > > > B.  Out Input topic actually has the as (a,b,c), but we partition
> > with
> > > > only
> > > > > (a). We do this because we have different compaction requirements
> > than
> > > > the
> > > > > partitions. It will still work as all (a,b,c) records will go to
> the
> > > same
> > > > > partition. Now in aggregation, we group by (a,b,c). In such case
> what
> > > > will
> > > > > be the partition key for the changelog topic?
> > > > >
> > > > > Note that we use low-level processor API and don't commit
> ourselves.
> > > > >
> > > > > Regards,
> > > > > Mangat
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Mangat,
> > > > > >
> > > > > > Please see my replies inline below.
> > > > > >
> > > > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <mangatm...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > @Guozhang Wang
> > > > > > >
> > > > > > > Thanks for the reply.  Indeed I am finding it difficult to
> > explain
> > > > this
> > > > > > > state. I checked the code many times. There can be a bug but I
> > fail
> > > > to
> > > > > > see
> > > > > > > it. There are several things about the Kafka streams that I
> don't
> > > > > > > understand, which makes it harder for me to reason.
> > > > > > >
> > > > > > > 1. What is the partition key for the changelog topics? Is it
> the
> > > same
> > > > > as
> > > > > > > the Input key or the state store key? Or maybe the thread
> > specifies
> > > > the
> > > > > > > partition as it knows the input partition it is subscribed to?
> If
> > > the
> > > > > > input
> > > > > > > topic and state store are differently partitioned then we can
> > > explain
> > > > > the
> > > > > > > issue here.
> > > > > > >
> > > > > >
> > > > > > In Kafka Stream's changelog, the "partition key" of Kafka
> messages
> > is
> > > > the
> > > > > > same as the "message key" itself. And the message key is the same
> > as
> > > > the
> > > > > > state store key.
> > > > > >
> > > > > > Since the state store here should be storing the running
> aggregate,
> > > it
> > > > > > means that the partition key is the same as the aggregated key.
> > > > > >
> > > > > > If you are doing a group-by aggregation here, where the group-by
> > keys
> > > > are
> > > > > > different from the source input topic's keys, hence the state
> store
> > > > keys
> > > > > > would be different with the input topic keys.
> > > > > >
> > > > > >
> > > > > > > 2. Is there a background thread to persist in the state store
> > when
> > > > > > caching
> > > > > > > is disabled? When will the app commit the log for the input
> > topic?
> > > Is
> > > > > it
> > > > > > > when sink writes into the output topic or when the state store
> > > writes
> > > > > > into
> > > > > > > the changelog topic? Because, if the app commits the record
> > before
> > > > the
> > > > > > data
> > > > > > > was written to changelog topic then we can again explain this
> > state
> > > > > > >
> > > > > > > The commit happens *after* the local state store, as well as
> the
> > > > > > changelog records sent by the Streams' producers, have been
> > flushed.
> > > > I.e.
> > > > > > if there's a failure in between, you would re-process some source
> > > > records
> > > > > > and hence cause duplicates, but no data loss (a.k.a. the
> > > at_least_once
> > > > > > semantics).
> > > > > >
> > > > > >
> > > > > >
> > > > > > > >You may also consider upgrading to 2.6.x or higher version and
> > see
> > > > if
> > > > > > this
> > > > > > > issue goes away.
> > > > > > > Do you mean the client or the Kafka broker? I will be upgrading
> > the
> > > > > > client
> > > > > > > to 2.7.0 soon.
> > > > > > >
> > > > > > > I meant the client.
> > > > > >
> > > > > >
> > > > > > > Sadly looking into the timestamp will not help much as we use
> > some
> > > > > > business
> > > > > > > time field to set the record timestamp. If I am right, there is
> > no
> > > > way
> > > > > > now
> > > > > > > to know that when a Producer wrote a record in a Kafka topic.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Mangat
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <
> wangg...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hello Mangat,
> > > > > > > >
> > > > > > > > With at least once, although some records maybe processed
> > > multiple
> > > > > > times
> > > > > > > > their process ordering should not be violated, so what you
> > > observed
> > > > > is
> > > > > > > not
> > > > > > > > expected. What caught my eyes are this section in your output
> > > > > > changelogs
> > > > > > > > (high-lighted):
> > > > > > > >
> > > > > > > > Key1, V1
> > > > > > > > Key1, null
> > > > > > > > Key1, V1
> > > > > > > > Key1, null  (processed again)
> > > > > > > > Key1, V2
> > > > > > > > Key1, null
> > > > > > > >
> > > > > > > > *Key1, V1Key1,V2*
> > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > > > reprocessed
> > > > > > > V1
> > > > > > > > again due to reassignment)
> > > > > > > >
> > > > > > > > They seem to be the result of first receiving a tombstone
> which
> > > > > removes
> > > > > > > V1
> > > > > > > > and then a new record that adds V2. However, since caching is
> > > > > disabled
> > > > > > > you
> > > > > > > > should get
> > > > > > > >
> > > > > > > > *Key1,V1*
> > > > > > > > *Key1,null*
> > > > > > > > *Key1,V2*
> > > > > > > >
> > > > > > > > instead; without the actual code snippet I cannot tell more
> > > what's
> > > > > > > > happening here. If you can look into the logs you can record
> > each
> > > > > time
> > > > > > > when
> > > > > > > > partition migrates, how many records from the changelog was
> > > > replayed
> > > > > to
> > > > > > > > restore the store, and from which offset on the input topic
> > does
> > > > > > Streams
> > > > > > > > resume processing. You may also consider upgrading to 2.6.x
> or
> > > > higher
> > > > > > > > version and see if this issue goes away.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <
> > mangatm...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey,
> > > > > > > > >
> > > > > > > > > We have the following setup in our infrastructure.
> > > > > > > > >
> > > > > > > > >    1. Kafka - 2.5.1
> > > > > > > > >    2. Apps use kafka streams `org.apache.kafka` version
> 2.5.1
> > > > > library
> > > > > > > > >    3. Low level processor API is used with *atleast-once*
> > > > semantics
> > > > > > > > >    4. State stores are *in-memory* with *caching disabled*
> > and
> > > > > > > *changelog
> > > > > > > > >    enabled*
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Is it possible that during state replication and partition
> > > > > > > reassignment,
> > > > > > > > > the input data is not always applied to the state store?
> > > > > > > > >
> > > > > > > > > 1. Let's say the input topic is having records like
> following
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > Key1, V1
> > > > > > > > > Key1, null (tombstone)
> > > > > > > > > Key1, V2
> > > > > > > > > Key1, null
> > > > > > > > > Key1, V3
> > > > > > > > > Key1, V4
> > > > > > > > > ```
> > > > > > > > > 2. The app has an aggregation function which takes these
> > record
> > > > and
> > > > > > > > update
> > > > > > > > > the state store so that changelog shall be
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > Key1, V1
> > > > > > > > > Key1, null (tombstone)
> > > > > > > > > Key1, V2
> > > > > > > > > Key1, null
> > > > > > > > > Key1, V3
> > > > > > > > > Key1, V3 + V4
> > > > > > > > > ```
> > > > > > > > > Let's say the partition responsible for processing the
> above
> > > key
> > > > > was
> > > > > > > > > several times reallocated to different threads due to some
> > > infra
> > > > > > issues
> > > > > > > > we
> > > > > > > > > are having(in Kubernetes where we run the app, not the
> Kafka
> > > > > > cluster).
> > > > > > > > >
> > > > > > > > > I see the following record in the changelogs
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > Key1, V1
> > > > > > > > > Key1, null
> > > > > > > > > Key1, V1
> > > > > > > > > Key1, null  (processed again)
> > > > > > > > > Key1, V2
> > > > > > > > > Key1, null
> > > > > > > > > Key1, V1
> > > > > > > > > Key1,V2
> > > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > > > > reprocessed
> > > > > > > > V1
> > > > > > > > > again due to reassignment)
> > > > > > > > > Key1,V1 (V2 is gone as there was a tombstone, but then V1
> > > > tombstone
> > > > > > > > should
> > > > > > > > > have been applied also!!)
> > > > > > > > > Key1, V2+V1 (it is back!!!)
> > > > > > > > > Key1,V1
> > > > > > > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > If you see this means several things
> > > > > > > > > 1. The state is always correctly applied locally (in
> > developer
> > > > > > laptop),
> > > > > > > > > where there were no reassignments.
> > > > > > > > > 2. The records are processed multiple times, which is
> > > > > understandable
> > > > > > as
> > > > > > > > we
> > > > > > > > > have at least symantics here.
> > > > > > > > > 3. As long as we re-apply the same events in the same
> orders
> > we
> > > > are
> > > > > > > > golden
> > > > > > > > > but looks like some records are skipped, but here it looks
> as
> > > if
> > > > we
> > > > > > > have
> > > > > > > > > multiple consumers reading and update the same topics,
> > leading
> > > to
> > > > > > race
> > > > > > > > > conditions.
> > > > > > > > >
> > > > > > > > > Is there any way, Kafka streams' state replication could
> lead
> > > to
> > > > > > such a
> > > > > > > > > race condition?
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Mangat
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Reply via email to