Hey Mangat,

A. With at-least-once, Streams does not guarantee atomicity of 1) / 2); with exactly-once, atomicity is indeed guaranteed through transactional messaging.
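For reference, switching to exactly-once is just a configuration change on the Streams app; a minimal sketch (the application id and bootstrap servers below are placeholders, not values from this thread):

```
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfig {

    // Builds Streams properties with the processing guarantee switched from the
    // default at-least-once to exactly-once.
    public static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // With EXACTLY_ONCE, a task's changelog writes, sink writes and input
        // offset commit are committed together in a single transaction.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}
```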
B. If you are using the Processor API, then I'm assuming you wrote your own group-by processor, right? In that case, the partition key would just be the record key when you are sending to the repartition topic.

Guozhang

On Thu, Apr 8, 2021 at 9:00 AM mangat rai <mangatm...@gmail.com> wrote:

> Thanks again, that makes things clear. I still have some questions here then.
>
> A. For each record we read, we do two updates:
> 1. Changelog topic of the state store.
> 2. Output topic aka sink.
> Does the Kafka Streams app make sure that either both are committed or neither?
>
> B. Our input topic actually has the key as (a,b,c), but we partition with only (a). We do this because our compaction requirements differ from our partitioning requirements. It will still work, as all (a,b,c) records will go to the same partition. Now in the aggregation we group by (a,b,c). In such a case, what will be the partition key for the changelog topic?
>
> Note that we use the low-level Processor API and don't commit ourselves.
>
> Regards,
> Mangat
>
> On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Mangat,
> >
> > Please see my replies inline below.
> >
> > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <mangatm...@gmail.com> wrote:
> >
> > > @Guozhang Wang
> > >
> > > Thanks for the reply. Indeed I am finding it difficult to explain this state. I checked the code many times. There can be a bug, but I fail to see it. There are several things about Kafka Streams that I don't understand, which makes it harder for me to reason.
> > >
> > > 1. What is the partition key for the changelog topics? Is it the same as the input key or the state store key? Or maybe the thread specifies the partition as it knows the input partition it is subscribed to? If the input topic and the state store are differently partitioned, then we can explain the issue here.
> > >
> >
> > In Kafka Streams' changelog, the "partition key" of Kafka messages is the same as the "message key" itself. And the message key is the same as the state store key.
> >
> > Since the state store here should be storing the running aggregate, it means that the partition key is the same as the aggregated key.
> >
> > If you are doing a group-by aggregation here, where the group-by keys are different from the source input topic's keys, then the state store keys would be different from the input topic keys.
> >
> > > 2. Is there a background thread that persists the state store when caching is disabled? When will the app commit the offsets for the input topic? Is it when the sink writes into the output topic or when the state store writes into the changelog topic? Because if the app commits the record before the data was written to the changelog topic, then we can again explain this state.
> > >
> >
> > The commit happens *after* the local state store, as well as the changelog records sent by the Streams' producers, have been flushed. I.e., if there's a failure in between, you would re-process some source records and hence cause duplicates, but no data loss (a.k.a. the at_least_once semantics).
> >
> > > > You may also consider upgrading to 2.6.x or a higher version and see if this issue goes away.
> > > Do you mean the client or the Kafka broker? I will be upgrading the client to 2.7.0 soon.
> > >
> >
> > I meant the client.
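Tying point B back to the (a,b,c) re-keying question above: a hypothetical Processor API sketch of such a group-by step (the class name, topic names and the way the (a,b,c) key is built are made up). Whatever key is forwarded to the sink that writes the repartition topic becomes that topic's partition key, and the same key is then used for the state store and its changelog.

```
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class RekeyByAbcProcessor extends AbstractProcessor<String, String> {

    @Override
    public void process(final String inputKey, final String value) {
        // The input topic is partitioned by (a) only; rebuild the full (a,b,c)
        // aggregation key before sending downstream (helper is a placeholder).
        final String abcKey = buildAbcKey(inputKey, value);
        context().forward(abcKey, value);
    }

    private String buildAbcKey(final String aKey, final String value) {
        // Placeholder: in the real app, b and c would be extracted from the value.
        return aKey + "|" + value;
    }

    public static Topology topology() {
        final Topology topology = new Topology();
        topology.addSource("source", "input-topic");                       // keyed by (a)
        topology.addProcessor("rekey", RekeyByAbcProcessor::new, "source");
        topology.addSink("repartition-sink", "abc-repartition", "rekey");  // keyed by (a,b,c)
        return topology;
    }
}
```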
> > > Sadly, looking into the timestamp will not help much, as we use some business time field to set the record timestamp. If I am right, there is now no way to know when a producer wrote a record to a Kafka topic.
> > >
> > > Regards,
> > > Mangat
> > >
> > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hello Mangat,
> > > >
> > > > With at-least-once, although some records may be processed multiple times, their processing order should not be violated, so what you observed is not expected. What caught my eye is this section in your output changelog (highlighted):
> > > >
> > > > Key1, V1
> > > > Key1, null
> > > > Key1, V1
> > > > Key1, null (processed again)
> > > > Key1, V2
> > > > Key1, null
> > > > *Key1, V1*
> > > > *Key1, V2*
> > > > Key1, V2+V1 (I guess we didn't process the V2 tombstone yet but reprocessed V1 again due to reassignment)
> > > >
> > > > They seem to be the result of first receiving a tombstone which removes V1 and then a new record that adds V2. However, since caching is disabled you should get
> > > >
> > > > *Key1, V1*
> > > > *Key1, null*
> > > > *Key1, V2*
> > > >
> > > > instead; without the actual code snippet I cannot tell more about what's happening here. If you can look into the logs, you can record, each time a partition migrates, how many records from the changelog were replayed to restore the store, and from which offset on the input topic Streams resumes processing. You may also consider upgrading to 2.6.x or a higher version and see if this issue goes away.
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <mangatm...@gmail.com> wrote:
> > > >
> > > > > Hey,
> > > > >
> > > > > We have the following setup in our infrastructure.
> > > > >
> > > > > 1. Kafka - 2.5.1
> > > > > 2. Apps use the Kafka Streams (`org.apache.kafka`) library, version 2.5.1
> > > > > 3. The low-level Processor API is used with *at-least-once* semantics
> > > > > 4. State stores are *in-memory* with *caching disabled* and *changelog enabled*
> > > > >
> > > > > Is it possible that during state replication and partition reassignment, the input data is not always applied to the state store?
> > > > >
> > > > > 1. Let's say the input topic has records like the following:
> > > > >
> > > > > ```
> > > > > Key1, V1
> > > > > Key1, null (tombstone)
> > > > > Key1, V2
> > > > > Key1, null
> > > > > Key1, V3
> > > > > Key1, V4
> > > > > ```
> > > > > 2. The app has an aggregation function which takes these records and updates the state store, so the changelog shall be:
> > > > >
> > > > > ```
> > > > > Key1, V1
> > > > > Key1, null (tombstone)
> > > > > Key1, V2
> > > > > Key1, null
> > > > > Key1, V3
> > > > > Key1, V3 + V4
> > > > > ```
> > > > > Let's say the partition responsible for processing the above key was reallocated several times to different threads due to some infra issues we are having (in Kubernetes, where we run the app, not the Kafka cluster).
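To make step 2 above concrete, a rough Processor API sketch of this kind of tombstone-aware aggregation could look as follows; the store name "agg-store" and the string concatenation are stand-ins for the real aggregation logic, which is not shown in the thread.

```
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class AggregatingProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        // The store would be registered as in-memory with logging (changelog) enabled.
        store = (KeyValueStore<String, String>) context.getStateStore("agg-store");
    }

    @Override
    public void process(final String key, final String value) {
        if (value == null) {
            // Tombstone: drop the running aggregate; delete() also writes a
            // tombstone to the changelog topic.
            store.delete(key);
            context().forward(key, null);
            return;
        }
        final String current = store.get(key);
        final String updated = (current == null) ? value : current + " + " + value; // e.g. "V3 + V4"
        store.put(key, updated);   // the changelog record key is exactly this store key
        context().forward(key, updated);
    }
}
```

The store itself would be wired up with something like `Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("agg-store"), Serdes.String(), Serdes.String()).withLoggingEnabled(...)` and connected to this processor node.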
> > > > > I see the following records in the changelog:
> > > > >
> > > > > ```
> > > > > Key1, V1
> > > > > Key1, null
> > > > > Key1, V1
> > > > > Key1, null (processed again)
> > > > > Key1, V2
> > > > > Key1, null
> > > > > Key1, V1
> > > > > Key1, V2
> > > > > Key1, V2+V1 (I guess we didn't process the V2 tombstone yet but reprocessed V1 again due to reassignment)
> > > > > Key1, V1 (V2 is gone as there was a tombstone, but then the V1 tombstone should have been applied also!!)
> > > > > Key1, V2+V1 (it is back!!!)
> > > > > Key1, V1
> > > > > Key1, V1 + V2 + V3 (This is the final state!)
> > > > > ```
> > > > >
> > > > > If you see this, it means several things:
> > > > > 1. The state is always correctly applied locally (on a developer laptop), where there were no reassignments.
> > > > > 2. The records are processed multiple times, which is understandable as we have at-least-once semantics here.
> > > > > 3. As long as we re-apply the same events in the same order we are golden, but it looks like some records are skipped; it looks as if we have multiple consumers reading and updating the same topics, leading to race conditions.
> > > > >
> > > > > Is there any way Kafka Streams' state replication could lead to such a race condition?
> > > > >
> > > > > Regards,
> > > > > Mangat
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> >
> >
> > --
> > -- Guozhang


--
-- Guozhang
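As a footnote to Guozhang's suggestion above to record, on each partition migration, how many changelog records are replayed: a minimal sketch of a restore listener that logs this (the class name and log format are made up), which would be registered on the `KafkaStreams` instance via `setGlobalStateRestoreListener` before calling `start()`.

```
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class LoggingRestoreListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        System.out.printf("Restore of %s for %s starting: offsets %d..%d%n",
                          storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        System.out.printf("Restored a batch of %d records into %s (up to offset %d)%n",
                          numRestored, storeName, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) {
        System.out.printf("Restore of %s for %s finished: %d records replayed%n",
                          storeName, partition, totalRestored);
    }
}
```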