AlexeyASF created KAFKA-16319:
---------------------------------
Summary: Wrong broker destinations for DeleteRecords requests when
more than one topic is involved and the topics/partitions are led by different
brokers
Key: KAFKA-16319
URL: https://issues.apache.org/jira/browse/KAFKA-16319
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.6.1
Reporter: AlexeyASF
h2. Context
Kafka Streams applications periodically send {{DeleteRecords}} requests via the
{{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}}
method. Such a request may involve more than one topic (or partition), and it is
supposed to be sent to the leader broker of each partition involved.
h2. Observed behaviour
When a {{DeleteRecords}} request includes more than one topic (say,
{{topic1}} and {{topic2}}) and these topics are led by different brokers
({{broker1}} and {{broker2}} respectively), the whole request is sent to only
one broker (say, {{broker1}}), which results in partial
{{NOT_LEADER_OR_FOLLOWER}} errors. Because the request only partially succeeded
({{topic1}} is fine, but {{topic2}} is not), it is retried with the _same_
arguments against the _same_ broker ({{broker1}}), so the response is partially
faulty again and again. It can also happen (and does) that there is a
“mirrored” half-faulty request, in this case sent to {{broker2}}, where the
{{topic2}} operation succeeds but the {{topic1}} operation fails.
Here are anonymised logs from a production system (“direct” and “mirrored”
requests, one after another):
{code:java}
[AdminClient clientId=worker-admin]
Sending DeleteRecordsRequestData(topics=[
DeleteRecordsTopic(
name='topic1',
partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
),
DeleteRecordsTopic(
name='topic2',
partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
)], timeoutMs=60000)
to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1
correlationId=42003907, timeoutMs=30000
[AdminClient clientId=worker-admin]
Sending DeleteRecordsRequestData(topics=[
DeleteRecordsTopic(
name='topic1',
partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
),
DeleteRecordsTopic(
name='topic2',
partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
)], timeoutMs=60000)
to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2
correlationId=42003906, timeoutMs=30000 {code}
Such a request results in the following response (shown here only for the
"direct" request):
{code:java}
[AdminClient clientId=worker-admin]
Call(
callName=deleteRecords(api=DELETE_RECORDS),
deadlineMs=...,
tries=..., // Can be hundreds
nextAllowedTryMs=...)
got response DeleteRecordsResponseData(
throttleTimeMs=0,
topics=[
DeleteRecordsTopicResult(
name='topic2',
partitions=[DeleteRecordsPartitionResult(
partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note errorCode 6, which is NOT_LEADER_OR_FOLLOWER
DeleteRecordsTopicResult(
name='topic1',
partitions=[DeleteRecordsPartitionResult(
partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note errorCode 0, which means the operation was successful
]
) {code}
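To make the retry loop concrete, here is a minimal, self-contained Java sketch of the behaviour described above. It is a toy model and does not use the Kafka client: {{PartialFailureModel}}, {{handleDeleteRecords}} and the {{"topic-partition"}} string keys are hypothetical names, and the leader map simply assumes {{topic1}} is led by broker id 2 and {{topic2}} by broker id 4, as in the logs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Kafka code): a broker rejects partitions it does not lead. */
public class PartialFailureModel {
    public static final short NONE = 0;
    public static final short NOT_LEADER_OR_FOLLOWER = 6;

    /** Returns one error code per requested partition, as seen from the given broker. */
    public static Map<String, Short> handleDeleteRecords(int brokerId,
                                                         List<String> partitions,
                                                         Map<String, Integer> leaders) {
        Map<String, Short> result = new LinkedHashMap<>();
        for (String tp : partitions) {
            boolean isLeader = Integer.valueOf(brokerId).equals(leaders.get(tp));
            result.put(tp, isLeader ? NONE : NOT_LEADER_OR_FOLLOWER);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed leadership, mirroring the logs: broker1 has id 2, broker2 has id 4.
        Map<String, Integer> leaders = Map.of("topic1-5", 2, "topic2-5", 4);
        List<String> request = List.of("topic1-5", "topic2-5");

        // Retrying the identical request against the identical broker cannot converge:
        // topic2-5 is rejected on every attempt because broker 2 never becomes its leader.
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("attempt " + attempt + " -> "
                    + handleDeleteRecords(2, request, leaders));
        }
    }
}
```

Under these assumptions every attempt yields the same mixed result, which matches the "tries" counter growing into the hundreds.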
h2. Expected behaviour
When more than one topic/partition is involved and they are led by different
brokers, {{DeleteRecords}} requests are sent to the leader broker of each
partition.
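The expected routing can be sketched as a grouping step: partition the requested offsets by leader broker and send one request per leader. The sketch below is plain Java, not the actual AdminClient code; {{DeleteRecordsRouting}}, {{groupByLeader}} and the {{"topic-partition"}} string keys are hypothetical, and the leader ids are assumed from the logs above.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: splits a multi-partition delete into one batch per leader broker. */
public class DeleteRecordsRouting {

    /**
     * Groups "topic-partition -> offset" entries by the broker id that leads each partition.
     * Fails fast if a partition has no known leader in the supplied metadata.
     */
    public static Map<Integer, Map<String, Long>> groupByLeader(
            Map<String, Long> offsets, Map<String, Integer> leaders) {
        Map<Integer, Map<String, Long>> perBroker = new TreeMap<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            Integer leader = leaders.get(e.getKey());
            if (leader == null) {
                throw new IllegalStateException("No known leader for " + e.getKey());
            }
            perBroker.computeIfAbsent(leader, id -> new TreeMap<>())
                     .put(e.getKey(), e.getValue());
        }
        return perBroker;
    }

    public static void main(String[] args) {
        // Offsets taken from the logged request; leader ids are an assumption.
        Map<String, Long> offsets = Map.of("topic1-5", 88017574L, "topic2-5", 243841L);
        Map<String, Integer> leaders = Map.of("topic1-5", 2, "topic2-5", 4);

        // One batch per leader: each broker only receives partitions it leads.
        groupByLeader(offsets, leaders).forEach((broker, batch) ->
                System.out.println("broker " + broker + " <- " + batch));
    }
}
```

With this grouping, the request for {{topic1}} would go only to broker id 2 and the request for {{topic2}} only to broker id 4, so neither broker would have a reason to answer with error code 6.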
h2. Notes
* _Presumably_ introduced in 3.6.1 via
[https://github.com/apache/kafka/pull/13760].