Herbert Wespi created KAFKA-17635:
-------------------------------------

             Summary: Lost events on internal repartition topic when exactly_once_v2 is set and producer is fenced
                 Key: KAFKA-17635
                 URL: https://issues.apache.org/jira/browse/KAFKA-17635
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.7.1
            Reporter: Herbert Wespi


In some of our Kafka Streams applications we observed that events were lost during processing when the processing guarantee was set to exactly_once_v2.

This happened in different Kafka Streams applications at different places. The common pattern is that an internal repartition topic was always involved (e.g. FK joins and aggregations on a new key).

With the following simplified example we could reproduce the problem:
{code:java}
inputStream
  .groupBy((k, v) -> v, Grouped.with(String(), String()).withName("group"))
  .count(Materialized.as("count").withKeySerde(String()).withValueSerde(Long()));
{code}
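For reference, a minimal, self-contained sketch of how we wire this topology up. The String()/Long() helpers above stand for Serdes.String()/Serdes.Long(); the application id, input topic and bootstrap servers below are placeholders, not our real configuration:
{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processTest-1");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");  // placeholder
        // the guarantee under which we observe the lost events
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> inputStream =
                builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); // placeholder topic

        // group by value -> forces an internal repartition topic, then count into the "count" store
        inputStream
                .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String()).withName("group"))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
{code}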
The analysis showed the following:
 * the events exist in the input topic
 * after repartitioning, the changelog topic does not always contain all aggregated events

It happens only occasionally in the production environment while processing millions of events during the initial load.

We were only seldom able to reproduce the problem in a local environment in debugging mode.

Our assumption is that there is a problem with the purging of records from the repartition topic.
The StreamTask keeps track of consumedOffsets (used for purging internal repartition topics).
After a TaskMigratedException (e.g. a transaction timeout or similar), the stream task is migrated and closed dirty.
When the task is restored, the consumedOffsets are not cleared, so they may still contain offsets from aborted transactions.
On the next purge cycle, offsets that were never committed may then be deleted from the repartition topic.
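To make the suspected purge path concrete, here is a minimal, self-contained sketch of the deletion step using the public Admin API. This is our own illustration, not the actual Streams internals: deleteRecordsBelow is a hypothetical helper, and the bootstrap address, repartition topic name and offset are made up for the example.
{code:java}
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class RepartitionPurgeSketch {

    // Hypothetical helper: truncate each partition below the given consumed offset,
    // analogous to how Streams purges fully processed records from repartition topics.
    static void deleteRecordsBelow(Admin admin, Map<TopicPartition, Long> consumedOffsets) throws Exception {
        Map<TopicPartition, RecordsToDelete> toDelete = consumedOffsets.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                        e -> RecordsToDelete.beforeOffset(e.getValue())));
        admin.deleteRecords(toDelete).all().get();
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Suppose the task consumed up to offset 915 inside a transaction that was later
            // ABORTED because the producer got fenced. If that offset survives the dirty close,
            // the next purge cycle truncates records that were never successfully processed:
            TopicPartition repartition = new TopicPartition("processTest-1-group-repartition", 3);
            deleteRecordsBelow(admin, Map.of(repartition, 915L));
        }
    }
}
{code}
The log below shows the producer fencing, the dirty close of task 1_3 and the subsequent restoration.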
{code:java}
2024-09-27T11:35:10.021+02:00  WARN 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Detected 
that the thread is being fenced. This implies that this thread missed a 
rebalance and dropped out of the consumer group. Will close out all assigned 
tasks and rejoin the consumer group.

org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced 
trying to commit a transaction [stream-thread [main]]; it means all tasks 
belonging to this thread should be migrated.
        at 
org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:304)
 ~[kafka-streams-3.7.1.jar:na]
        at 
org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:203)
 ~[kafka-streams-3.7.1.jar:na]
        at 
org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154)
 ~[kafka-streams-3.7.1.jar:na]
        at 
org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1875)
 ~[kafka-streams-3.7.1.jar:na]
        at 
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1842)
 ~[kafka-streams-3.7.1.jar:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1337)
 ~[kafka-streams-3.7.1.jar:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:986)
 ~[kafka-streams-3.7.1.jar:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
 ~[kafka-streams-3.7.1.jar:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
 ~[kafka-streams-3.7.1.jar:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction 
offset Commit failed due to consumer group metadata mismatch: The coordinator 
is not aware of this member.
        at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1689)
 ~[kafka-clients-3.7.1.jar:na]
        at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1236)
 ~[kafka-clients-3.7.1.jar:na]
        at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) 
~[kafka-clients-3.7.1.jar:na]
        at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
 ~[kafka-clients-3.7.1.jar:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600) 
~[kafka-clients-3.7.1.jar:na]
        at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:463)
 ~[kafka-clients-3.7.1.jar:na]
        at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:339) 
~[kafka-clients-3.7.1.jar:na]
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:253) 
~[kafka-clients-3.7.1.jar:na]
        at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

2024-09-27T11:35:10.021+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] 
Suspended from RUNNING
2024-09-27T11:35:11.420+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] 
Closed dirty
2024-09-27T11:37:06.782+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] stream-task 
[1_3] State store count did not find checkpoint offset, hence would default to 
the starting offset at changelog processTest-1-count-changelog-3
2024-09-27T11:37:06.783+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] 
Initialized
2024-09-27T11:37:06.787+02:00  INFO 38644 --- [sandbox] [-StreamThread-1] 
o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store count in regular mode
2024-09-27T11:37:06.843+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.s.p.i.StoreChangelogReader         : stream-thread 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] End offset 
for changelog processTest-1-count-changelog-3 initialized as 916.
2024-09-27T11:37:06.843+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer 
clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer,
 groupId=null] Assigned to partition(s): processTest-1-count-changelog-3, 
processTest-1-count-changelog-1
2024-09-27T11:37:06.843+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.c.c.internals.SubscriptionState    : [Consumer 
clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer,
 groupId=null] Seeking to earliest offset of partition 
processTest-1-count-changelog-1
2024-09-27T11:37:06.844+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.c.c.internals.SubscriptionState    : [Consumer 
clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer,
 groupId=null] Resetting offset for partition processTest-1-count-changelog-3 
to position FetchPosition{offset=0, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:9093 (id: 2 rack: 
null)], epoch=0}}.
2024-09-27T11:37:06.850+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.s.p.i.StoreChangelogReader         : stream-thread 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Finished 
restoring changelog processTest-1-count-changelog-3 to store count with a total 
number of 456 records
2024-09-27T11:37:06.851+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] 
Restored and ready to run
2024-09-27T11:37:06.854+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Restoration 
took 334 ms for all active tasks [0_3, 1_3, 0_1, 1_1]
2024-09-27T11:37:06.854+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] State 
transition from PARTITIONS_ASSIGNED to RUNNING
2024-09-27T11:37:06.854+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] 
org.apache.kafka.streams.KafkaStreams    : stream-client 
[processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7] State transition from 
REBALANCING to RUNNING
{code}
In our test we produced the same number of events to each of the 4 partitions.

In the sample test we just count the events, so all 4 partitions should eventually have the same count.
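One way we sanity-check that no events were lost is to sum the per-key counts via interactive queries. A sketch, assuming the application is in the RUNNING state and the store is named "count" as above; the resulting total should equal the number of events produced to the input topic:
{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class CountVerification {

    // Sum all per-key counts in the "count" store; the total should match the number of produced events.
    static long totalCount(KafkaStreams streams) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType("count", QueryableStoreTypes.keyValueStore()));
        long total = 0;
        try (KeyValueIterator<String, Long> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, Long> kv = it.next();
                total += kv.value;
            }
        }
        return total;
    }
}
{code}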


