Guozhang Wang created KAFKA-7285:
------------------------------------

             Summary: Streams should be more fencing-sensitive during task 
suspension under EOS
                 Key: KAFKA-7285
                 URL: https://issues.apache.org/jira/browse/KAFKA-7285
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Guozhang Wang


When EOS is turned on, Streams performs the following steps:

1. InitTxn in task creation.
2. BeginTxn in topology initialization.
3. AbortTxn in clean shutdown.
4. CommitTxn in commit(), which is called in suspend() as well.

Now consider this situation, with two threads (Ta and Tb) and one task:

1. Originally Ta owns the task, and the consumer generation is 1.
2. Ta fails to send heartbeats in time and gets kicked out of the group. A new 
generation 2 is formed with Tb in it. The task is migrated to Tb without Ta 
knowing.
3. Ta finally calls `consumer.poll` and becomes aware of the rebalance. It 
re-joins the group, forming a new generation 3, and during the rebalance the 
leader decides to assign the task back to Ta.
4.a) Ta calls onPartitionRevoked on the task, suspending it and calling commit. 
However, if no data has been sent since `BeginTxn`, this commit call becomes a 
no-op.
4.b) Ta then calls onPartitionAssigned on the task, resuming it, and then calls 
BeginTxn. It then encounters a ProducerFencedException, incorrectly.

The root cause is that Ta does not trigger InitTxn to claim "I'm the newest 
for this txnId, and am going to fence everyone else with the same txnId", so it 
is mistakenly treated as an older client than Tb.
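
The scenario can be reproduced in a toy model. The sketch below is not Kafka 
code: ToyCoordinator, ToyProducer and the txnId string are hypothetical names, 
and the model assumes that fencing is a per-txnId epoch comparison, that a 
commit with no data sent never reaches the coordinator, and that the stale 
epoch is detected at BeginTxn as described above. The real client may detect 
fencing at a different call.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for the transaction coordinator (hypothetical, not a Kafka class). */
class ToyCoordinator {
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    /** Models InitTxn: bump the epoch, fencing older producers with the same txnId. */
    int initTxn(String txnId) {
        return latestEpoch.merge(txnId, 1, Integer::sum);
    }

    /** Rejects any request that carries an epoch older than the latest one. */
    void checkEpoch(String txnId, int epoch) {
        if (epoch < latestEpoch.getOrDefault(txnId, 0)) {
            throw new IllegalStateException("fenced: " + txnId + " epoch " + epoch);
        }
    }
}

/** Toy producer (hypothetical): InitTxn runs once, at construction. */
class ToyProducer {
    private final ToyCoordinator coordinator;
    private final String txnId;
    private final int epoch;
    private boolean dataSent = false;

    ToyProducer(ToyCoordinator coordinator, String txnId) {
        this.coordinator = coordinator;
        this.txnId = txnId;
        this.epoch = coordinator.initTxn(txnId);
    }

    void beginTxn() {
        coordinator.checkEpoch(txnId, epoch);
        dataSent = false;
    }

    void send() {
        coordinator.checkEpoch(txnId, epoch);
        dataSent = true;
    }

    /** Returns true only if the commit actually contacted the coordinator. */
    boolean commitTxn() {
        if (!dataSent) {
            return false; // assumption: empty txn, commit is a no-op
        }
        coordinator.checkEpoch(txnId, epoch);
        dataSent = false;
        return true;
    }
}

class FencingScenario {
    static String run() {
        ToyCoordinator coordinator = new ToyCoordinator();
        String txnId = "txn-id-for-task"; // hypothetical id shared by both threads

        ToyProducer ta = new ToyProducer(coordinator, txnId); // generation 1
        ta.beginTxn();
        ToyProducer tb = new ToyProducer(coordinator, txnId); // generation 2 fences Ta
        tb.beginTxn();

        // Generation 3: Ta suspends (no-op commit) and resumes without InitTxn.
        boolean contacted = ta.commitTxn();
        try {
            ta.beginTxn();
            return "not fenced";
        } catch (IllegalStateException e) {
            return "fenced at beginTxn; commit contacted coordinator: " + contacted;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model Ta is rejected at beginTxn even though it is the rightful owner 
in generation 3, because its epoch was never refreshed after Tb's InitTxn.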

Note that this issue is not common, since it requires a txn that did not send 
any data at all, so that its commitTxn call becomes a no-op and Ta is not 
fenced earlier on.

One proposal for this issue is to close the producer and recreate a new one in 
`suspend` after the commitTxn call has succeeded and `startNewTxn` is false, so 
that the new producer will always `initTxn` to fence others.
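
A minimal sketch of this proposal follows, again as a toy model rather than 
the actual Streams change. EpochRegistry, SketchProducer and SketchTask are 
hypothetical names, and fencing is assumed to be a per-txnId epoch comparison 
in which constructing a producer stands in for `initTxn`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical epoch bookkeeping, standing in for the transaction coordinator. */
class EpochRegistry {
    private final Map<String, Integer> latest = new HashMap<>();

    int bump(String txnId) {
        return latest.merge(txnId, 1, Integer::sum);
    }

    boolean isStale(String txnId, int epoch) {
        return epoch < latest.getOrDefault(txnId, 0);
    }
}

/** Hypothetical producer: construction models initTxn and claims the newest epoch. */
class SketchProducer {
    final int epoch;
    private final EpochRegistry registry;
    private final String txnId;

    SketchProducer(EpochRegistry registry, String txnId) {
        this.registry = registry;
        this.txnId = txnId;
        this.epoch = registry.bump(txnId);
    }

    void beginTxn() {
        if (registry.isStale(txnId, epoch)) {
            throw new IllegalStateException("fenced: epoch " + epoch);
        }
    }

    void commitTxn() {
        // The sketch never sends data, so the commit is modelled as a no-op.
    }
}

/** Hypothetical task whose suspend() follows the proposal. */
class SketchTask {
    private final EpochRegistry registry;
    private final String txnId;
    private SketchProducer producer;

    SketchTask(EpochRegistry registry, String txnId) {
        this.registry = registry;
        this.txnId = txnId;
        this.producer = new SketchProducer(registry, txnId);
        this.producer.beginTxn();
    }

    void suspend(boolean startNewTxn) {
        producer.commitTxn();
        if (startNewTxn) {
            producer.beginTxn();
        } else {
            // Proposal: drop the old producer and create a new one, which
            // re-runs initTxn and fences every other holder of this txnId.
            producer = new SketchProducer(registry, txnId);
        }
    }

    void resume() {
        producer.beginTxn();
    }

    int producerEpoch() {
        return producer.epoch;
    }
}
```

With this change, a task that was migrated away and back again resumes with 
the newest epoch in the model, so BeginTxn in `resume` is no longer rejected.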



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
