[ 
https://issues.apache.org/jira/browse/KAFKA-18943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-18943:
------------------------------------
    Description: 
We found a very rare edge case in the Kafka Streams commit logic that may lead 
to duplicates for EOSv2 under certain circumstances.

Background:

When tasks are revoked cleanly, we need to commit the progress the revoked 
tasks made before we can release them. For EOSv2, if we commit, a thread always 
needs to commit all tasks it owns. Thus, during revocation, if a single revoked 
task made progress, we need to include all non-revoked tasks in the commit.

To avoid unnecessary commits (which are expensive), we have an optimization in 
place that is supposed to skip the commit if no revoked task made progress 
(i.e., revoked tasks w/o progress can be released safely w/o a commit, and if 
all tasks can be revoked w/o a commit, we don't commit at all).

This optimization has a bug and may commit an open TX even if no revoked task 
made progress. Because the commit is done accidentally, we do not add offsets 
to the TX. Thus, we may commit result data of non-revoked tasks w/o including 
the non-revoked tasks' advanced offsets.

If this happens, and a fatal error occurs right afterwards (i.e., before a 
subsequent commit for the same non-revoked tasks was done, which would mask 
the error by committing the correct offsets with the subsequent TX), we would 
seek to an incorrect (too old) offset after recovery and re-process the same 
input data a second time, producing duplicate results.
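
To make the failure sequence concrete, here is a minimal, self-contained 
simulation. It is not Kafka Streams code: the class `AccidentalCommitDemo`, the 
method `run`, and the in-memory "log" are hypothetical and only model a single 
non-revoked task that commits output with or w/o its offsets and then recovers 
from a fatal error.

```java
import java.util.ArrayList;
import java.util.List;

final class AccidentalCommitDemo {

    /**
     * Simulates one non-revoked task reading input offsets 0..2.
     * Returns the committed output after: process, commit, crash, recover.
     * (Hypothetical model; all names are assumptions for illustration.)
     */
    static List<Long> run(boolean offsetsIncludedInCommit) {
        final long endOffset = 3L;                  // input holds offsets 0, 1, 2
        long committedOffset = 0L;                  // where we seek to after recovery
        List<Long> committedOutput = new ArrayList<>();

        // First pass: process all input; results sit in the open TX.
        List<Long> openTx = new ArrayList<>();
        long position = committedOffset;
        while (position < endOffset) {
            openTx.add(position);
            position++;
        }

        // Commit during revocation. In the buggy case the TX is committed
        // w/o the advanced offsets of the non-revoked task.
        committedOutput.addAll(openTx);
        if (offsetsIncludedInCommit) {
            committedOffset = position;
        }

        // Fatal error before the next commit: recover from the committed offset
        // and re-process whatever lies between it and the end of the input.
        position = committedOffset;
        while (position < endOffset) {
            committedOutput.add(position);
            position++;
        }
        return committedOutput;
    }
}
```

In this model, `run(true)` yields each result once, while `run(false)` yields 
every result twice, which corresponds to the duplicates described above.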

This issue results in duplicates if the following conditions are all met at 
the same time:
 * a thread has more than one task assigned
 * when tasks are revoked, at least one task is not revoked
 * no revoked task made any progress
 * at least one non-revoked task made progress
 * after the incorrect commit, and before the next successful commit, a fatal 
error happens, triggering a rebalance and task re-assignment, and a "fetch 
offsets" happens after the rebalance
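
The conditions above can be written down as a simple predicate. This is only a 
sketch to make the combination explicit; `TaskState` and `DuplicateRiskCheck` 
are hypothetical names and do not exist in Kafka Streams.

```java
import java.util.List;

/** Hypothetical, simplified view of one task owned by a thread. */
final class TaskState {
    final String id;
    final boolean revoked;
    final boolean madeProgress;

    TaskState(String id, boolean revoked, boolean madeProgress) {
        this.id = id;
        this.revoked = revoked;
        this.madeProgress = madeProgress;
    }
}

final class DuplicateRiskCheck {

    /** True if the task-level conditions for the incorrect commit are met. */
    static boolean preconditionsMet(List<TaskState> ownedTasks) {
        if (ownedTasks.size() < 2) {
            return false;                       // thread needs more than one task
        }
        boolean anyRevoked = false;             // assumption: a revocation is in progress
        boolean anyKept = false;
        boolean revokedMadeProgress = false;
        boolean keptMadeProgress = false;
        for (TaskState task : ownedTasks) {
            if (task.revoked) {
                anyRevoked = true;
                revokedMadeProgress |= task.madeProgress;
            } else {
                anyKept = true;
                keptMadeProgress |= task.madeProgress;
            }
        }
        return anyRevoked && anyKept && !revokedMadeProgress && keptMadeProgress;
    }

    /** Duplicates additionally require a fatal error before the next successful commit. */
    static boolean mayProduceDuplicates(List<TaskState> ownedTasks,
                                        boolean fatalErrorBeforeNextCommit) {
        return preconditionsMet(ownedTasks) && fatalErrorBeforeNextCommit;
    }
}
```

For example, a thread owning a revoked task w/o progress and a non-revoked task 
with progress satisfies `preconditionsMet`, but duplicates only become possible 
if the fatal error also hits before the next successful commit.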

The bug was introduced via https://issues.apache.org/jira/browse/KAFKA-14294 in 
the 3.4.0 release.

Looking into the details, there actually seems to be another issue that 
KAFKA-14294 did not address: KAFKA-14294 attempts to commit tasks after output 
was produced by a punctuation, even if offsets did not advance. However, it 
applies this logic only to regular commits; in the task-revoked case, we don't 
commit. Thus, if a revoked task did not advance offsets but did produce output 
data, no commit happens (while it seems we should commit the open TX for this 
case, too; this might even apply to the ALOS case).

Thus, there are two possible fixes:
 # Try to identify all corner cases and patch them up
 # Drop the optimization and just commit all tasks on 
`TaskManager#handleRevocation()`
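
As a rough illustration of the second option, the sketch below contrasts the 
intended behavior of the optimization with simply committing everything on 
revocation. It is not the actual `TaskManager` code; `RevocationCommitPlan`, 
`Task`, and `tasksToCommit` are hypothetical names, and `commitNeeded` stands in 
for "the task made progress".

```java
import java.util.ArrayList;
import java.util.List;

final class RevocationCommitPlan {

    /** Hypothetical, simplified view of a task; not a Kafka Streams class. */
    static final class Task {
        final String id;
        final boolean revoked;
        final boolean commitNeeded;

        Task(String id, boolean revoked, boolean commitNeeded) {
            this.id = id;
            this.revoked = revoked;
            this.commitNeeded = commitNeeded;
        }
    }

    /**
     * Returns the ids of the tasks to include in the commit during revocation
     * under EOSv2, where a commit always covers all tasks the thread owns.
     *
     * keepOptimization = true:  skip the commit if no revoked task needs one.
     * keepOptimization = false: option 2, always commit all owned tasks.
     */
    static List<String> tasksToCommit(List<Task> ownedTasks, boolean keepOptimization) {
        boolean revokedTaskNeedsCommit = false;
        for (Task task : ownedTasks) {
            if (task.revoked && task.commitNeeded) {
                revokedTaskNeedsCommit = true;
            }
        }

        List<String> toCommit = new ArrayList<>();
        if (keepOptimization && !revokedTaskNeedsCommit) {
            return toCommit;                    // nothing is committed at all
        }
        for (Task task : ownedTasks) {
            toCommit.add(task.id);              // commit covers every owned task
        }
        return toCommit;
    }
}
```

The point of option 2 is that there is no "skip" branch left to get wrong: 
every commit during revocation includes all owned tasks and thus their offsets, 
at the cost of some commits that would not strictly be needed.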

  was:
We found a very rare edge case in Kafka Streams commit logic, that may lead to 
duplicates for EOSv2 under certain circumstances.

Background:

When tasks are revoked cleanly, we need to commit the progress the revoked 
tasks made before we can release them. For EOSv2, if we commit, a thread always 
needs to commit all tasks it owns. Thus, during revocation, if a single revoked 
tasks did make progress, we need to include all non-revoked task in the commit.

To avoid unnecessary commits (which are expensive), we have an optimization in 
place, that is supposed to skip the commit, if no revoked task did make 
progress (ie, for revoked tasks w/o progress, we can release them safely w/o 
the need to commit, and if all tasks can be revoked w/o a commit, we don't 
commit at all).

This optimization has a bug, and may commit an open TX, even if no revoked task 
did make progress, and because the commit is done accidentally, we do not add 
offsets to the TX. Thus, we may commit result data of non-revoked tasks, w/o 
including the non-revoked tasks advanced offsets.

If this happens, and an fatal error happens right afterwards (ie before a 
consecutive commit for the same non-revoked tasks was done, which would mask 
the error by commit the correct offsets with the consecutive TX), we would seek 
to an incorrect (too old) offset after recovery, and re-process the same input 
data a second time, producing duplicate results.

These issue results in duplicates if the following conditions are all met at 
the same time:
 * a thread has more than one task assigned
 * when tasks are revoked, at least one task is not revoked
 * all revoked tasks did not make any progress (or to rephrase: no revoked task 
did make progress)
 * at least one non-revoked task did make progress
 * after the incorrect commit, and before the next successful commit, a fatal 
error happen, triggering a rebalance, task re-assignment and a "fetch offsets" 
happen after the rebalance

The bug was introduced via https://issues.apache.org/jira/browse/KAFKA-14294 in 
3.4.0 release.

Looking into the details, it actually seems that there is some other issue, 
that KAFKA-14294 did not address: KAFKA-14294 attempts to commit tasks after 
output was produced by a punctuation, even if offsets did not advance. However, 
it does apply this logic only for regular commits, but for the 
task-revoked-case, we don't commit. Thus, if a revoked task did not advance 
offsets, but did produce output data, no commit happens (while it seems we 
should commit the open TX for this case, too; this might even apply to the 
ALOS, case, too).

Thus, there is two possible fixes.
 # Try to identify all corner case and patch it up
 # Drop the optimization and just commit all tasks on 
`TaskManager#handleRevocation()`


> Kafka Streams incorrectly commits TX during task revokation
> -----------------------------------------------------------
>
>                 Key: KAFKA-18943
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18943
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Blocker



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
