Youcef Sebiat created KAFKA-14374:
-------------------------------------

             Summary: Kafka Streams losing messages in State Store during first 
launch of app
                 Key: KAFKA-14374
                 URL: https://issues.apache.org/jira/browse/KAFKA-14374
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.2.0, 2.8.0
            Reporter: Youcef Sebiat
         Attachments: Screenshot 2022-11-09 at 14.56.00.png

We have been using Kafka Streams to implement a CDC-based app. Attached is the 
sub-topology of interest.

The `table2` topic is created by Debezium, which is connected to a SQL database. It 
contains 26K records. From `table2` we derive a new key, which is simply the existing 
key converted from `string` to `int`. We should therefore expect 
#table2=#repartition-topic=#state-store, but this does not hold. What we actually 
end up with is #table2=#repartition-topic, but #repartition-topic>#state-store. We 
lose messages and thus corrupt the state store, which leaves the app in an incorrect 
state. (Please note that there are no new insertions into `table2`, as we paused the 
connector in order to verify the cardinalities.)
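
For reference, here is a minimal sketch (Java, Kafka Streams DSL) of what that 
sub-topology boils down to; the topic name `table2` is real, but the store name 
`table2-store` and the `String` value type are illustrative placeholders, not our 
exact code:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class Table2Topology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.<String, String>stream("table2",
                        Consumed.with(Serdes.String(), Serdes.String()))
               // Re-key: only a conversion of the key from string to int.
               // Changing the key forces Streams to go through a repartition topic.
               .selectKey((key, value) -> Integer.parseInt(key))
               // Materialize into a state store backed by a changelog topic.
               .toTable(Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as("table2-store")
                       .withKeySerde(Serdes.Integer())
                       .withValueSerde(Serdes.String()));

        return builder.build();
    }
}
```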

The above happens only during the first launch, i.e. when the app has never been 
launched before and the internal topics do not exist yet. Restarting a pre-existing 
app does not cause any problems.

We have:
1. Broker on Kafka 3.2.
2. Client app on 2.8|3.2 (we tried both and faced the same issue).
3. All parameters left at their defaults, except `CACHE_MAX_BYTES_BUFFERING_CONFIG` 
set to `0` and `NUM_STREAM_THREADS_CONFIG` set to `>1` (see the sketch below).
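
For completeness, the non-default part of the configuration looks roughly like the 
sketch below; the application id, bootstrap servers and exact thread count are 
placeholders:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {

    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cdc-app");           // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder brokers
        // The only settings changed from their defaults:
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);               // any value > 1 reproduces the issue
        return props;
    }
}
```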

 

*What actually worked*
1. Using a single thread for the first launch: with one stream thread the problem 
disappears, and #table2=#repartition-topic=#state-store holds.
2. Pre-creating the Kafka internal topics (see the sketch after this list): we 
noticed that whenever a rebalance occurs during the first launch of the Kafka 
Streams app, the state stores end up missing values. This also happens when 
launching multiple pods in K8s, for example. Reading through the logs, we saw that 
a rebalance is triggered during the first launch, because the internal topics are 
created and assigned at that point. By creating the internal topics beforehand, we 
avoid that rebalance and end up with #table2=#repartition-topic=#state-store.
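
A rough sketch of the pre-creation workaround, using the plain `Admin` client; the 
internal topic names, partition counts and replication factors below are placeholders 
and need to be taken from `Topology#describe()` for the real application:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class PreCreateInternalTopics {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Internal topic names follow <application.id>-<store-name>-(repartition|changelog);
            // the names, partition count and replication factor here are placeholders and must
            // match what the Streams app would otherwise create on its own.
            List<NewTopic> internalTopics = List.of(
                    new NewTopic("cdc-app-table2-store-repartition", 6, (short) 3),
                    new NewTopic("cdc-app-table2-store-changelog", 6, (short) 3));
            admin.createTopics(internalTopics).all().get();
        }
    }
}
```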


*What we noticed from the logs*
In multi-threaded mode, the partitions that suffer the data loss are the ones 
assigned to the thread chosen by the coordinator as the leader of the consumers. 
What we think is happening is the following:
1. The consumer threads are launched and register with the coordinator.
2. The coordinator assigns topics and chooses the leader among the threads.
3. The app creates the internal topics.
4. Consumers/producers process data. In particular, the consumer leader consumes 
from the repartition topic, which triggers the deletion of those messages without 
flushing them to the changelog topic.
5. The leader is notified of the new assignment that includes the internal topics, 
which triggers a rebalance.
6. The leader pauses its partitions.
7. The rebalance finishes and the leader resumes its partitions.
8. The leader fetches the earliest offset of the repartition partitions it was 
assigned. It does not start from zero, but from where it was interrupted in step 4, 
so the initial chunk of messages is lost.

Please note that in single-threaded mode there is no data loss, which is odd since 
the leader is then the only thread.

So my questions are:
1. Are we misunderstanding what is happening?
2. What could be the origin of this problem?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
