[https://issues.apache.org/jira/browse/KAFKA-17455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890484#comment-17890484]
David Jacot commented on KAFKA-17455:
-------------------------------------
We were able to root-cause the issue based on logs shared privately by
[~coltmcnealy-lh]. Posting my analysis here for the record.
In the logs, I see the following two entries:
1.
{code:java}
17:21:09 TRACE [KAFKA] TransactionManager - [Producer clientId=my-cluster-1-core-StreamThread-1-producer, transactionalId=my-cluster-core-00000001-0000-0000-0000-000000000000-1] Request AddOffsetsToTxnRequestData(transactionalId='my-cluster-core-00000001-0000-0000-0000-000000000000-1', producerId=2, producerEpoch=32, groupId='my-cluster-core') dequeued for sending
{code}
2.
{code:java}
17:22:09 DEBUG [KAFKA] Sender - [Producer clientId=my-cluster-1-core-StreamThread-1-producer, transactionalId=my-cluster-core-00000001-0000-0000-0000-000000000000-1] Sending transactional request AddOffsetsToTxnRequestData(transactionalId='my-cluster-core-00000001-0000-0000-0000-000000000000-1', producerId=2, producerEpoch=32, groupId='my-cluster-core') to node localhost:9092 (id: 1 rack: null) with correlation ID 360
{code}
Here is the code between those two log lines:
{code:java}
TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());
if (nextRequestHandler == null)
    return false;

AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
Node targetNode = null;
try {
    FindCoordinatorRequest.CoordinatorType coordinatorType = nextRequestHandler.coordinatorType();
    targetNode = coordinatorType != null ?
            transactionManager.coordinator(coordinatorType) :
            client.leastLoadedNode(time.milliseconds()).node();
    if (targetNode != null) {
        if (!awaitNodeReady(targetNode, coordinatorType)) {
            log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode);
            maybeFindCoordinatorAndRetry(nextRequestHandler);
            return true;
        }
    } else if (coordinatorType != null) {
        log.trace("Coordinator not known for {}, will retry {} after finding coordinator.", coordinatorType, requestBuilder.apiKey());
        maybeFindCoordinatorAndRetry(nextRequestHandler);
        return true;
    } else {
        log.trace("No nodes available to send requests, will poll and retry when until a node is ready.");
        transactionManager.retry(nextRequestHandler);
        client.poll(retryBackoffMs, time.milliseconds());
        return true;
    }

    if (nextRequestHandler.isRetry())
        time.sleep(nextRequestHandler.retryBackoffMs());

    long currentTimeMs = time.milliseconds();
    ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), requestBuilder, currentTimeMs, true, requestTimeoutMs, nextRequestHandler);
    log.debug("Sending transactional request {} to node {} with correlation ID {}", requestBuilder, targetNode, clientRequest.correlationId());
    client.send(clientRequest, currentTimeMs);
{code}
The first log line is printed in transactionManager.nextRequest, so I suspect
the sender spends the time in awaitNodeReady. That method calls
NetworkClientUtils.awaitReady, which looks like this:
{code:java}
if (timeoutMs < 0) {
    throw new IllegalArgumentException("Timeout needs to be greater than 0");
}

long startTime = time.milliseconds();
if (isReady(client, node, startTime) || client.ready(node, startTime))
    return true;

long attemptStartTime = time.milliseconds();
while (!client.isReady(node, attemptStartTime) && attemptStartTime - startTime < timeoutMs) {
    if (client.connectionFailed(node)) {
        throw new IOException("Connection to " + node + " failed.");
    }
    long pollTimeout = timeoutMs - (attemptStartTime - startTime); // initialize in this order to avoid overflow
    client.poll(pollTimeout, attemptStartTime);
    if (client.authenticationException(node) != null)
        throw client.authenticationException(node);
    attemptStartTime = time.milliseconds();
}
return client.isReady(node, attemptStartTime);
{code}
Basically, if the node is not ready, we call client.poll with pollTimeout,
which on the first iteration equals the timeoutMs the method received. That
timeout is 60000 ms, and there are exactly 60 seconds between the two log
lines. So if the sender ends up in this loop and no responses arrive while it
polls, it blocks for the full request timeout.
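The blocking effect can be sketched with a tiny simulation (the class and
method names below are mine, not Kafka's API): on the first loop iteration,
pollTimeout equals the full timeoutMs, so one poll during which the node never
becomes ready consumes the entire request timeout.
{code:java}
// Hypothetical sketch of the awaitReady loop, assuming the node never
// becomes ready (e.g. still throttled) and each poll blocks for the full
// timeout it is given because no responses arrive.
public class AwaitReadySketch {

    // Returns how long (in ms) the loop waits before giving up.
    static long simulateAwaitReady(long timeoutMs) {
        long now = 0;              // simulated clock
        long startTime = now;
        long attemptStartTime = now;
        boolean nodeReady = false; // node stays unready throughout

        while (!nodeReady && attemptStartTime - startTime < timeoutMs) {
            long pollTimeout = timeoutMs - (attemptStartTime - startTime);
            // client.poll(pollTimeout, ...) with no responses blocks for
            // pollTimeout; on the first iteration that is the full timeoutMs.
            now += pollTimeout;
            attemptStartTime = now;
        }
        return attemptStartTime - startTime;
    }

    public static void main(String[] args) {
        // Matches the observation: 60000 ms timeout, 60 s between log lines.
        System.out.println(simulateAwaitReady(60_000)); // prints 60000
    }
}
{code}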
Why would the node not be ready? I saw the following log line a bit before:
{code:java}
17:21:09 TRACE [KAFKA] NetworkClient - [Producer clientId=my-cluster-1-core-StreamThread-1-producer, transactionalId=my-cluster-core-00000001-0000-0000-0000-000000000000-1] Connection to node 1 is throttled for 13 ms until timestamp 1729038069730
{code}
This means the connection was throttled. client.ready checks the throttle time
and considers the node not ready until the throttle deadline has passed.
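As a simplified sketch of that check (the real logic lives in NetworkClient
and ClusterConnectionStates; the helper names here are illustrative, not
Kafka's API), the readiness test amounts to:
{code:java}
// Illustrative sketch of the throttle-aware readiness check: a connected
// node is reported "not ready" until its throttle deadline has passed.
public class ThrottleReadySketch {

    static boolean isThrottled(long throttleUntilMs, long nowMs) {
        return throttleUntilMs > nowMs;
    }

    // Mirrors the idea that client.ready returns false while the throttle
    // time has not elapsed, even for an otherwise healthy connection.
    static boolean ready(boolean connected, long throttleUntilMs, long nowMs) {
        return connected && !isThrottled(throttleUntilMs, nowMs);
    }

    public static void main(String[] args) {
        long throttleUntil = 1729038069730L; // timestamp from the log line
        // 13 ms before the deadline: connected but throttled, so not ready.
        System.out.println(ready(true, throttleUntil, throttleUntil - 13)); // false
        // Once the deadline has passed, the node is ready again.
        System.out.println(ready(true, throttleUntil, throttleUntil));      // true
    }
}
{code}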
So my suspicion is that the node was not ready due to the throttle, and the
sender therefore ended up waiting in poll for the entire request timeout.
> `TaskCorruptedException` After Client Quota Throttling
> ------------------------------------------------------
>
> Key: KAFKA-17455
> URL: https://issues.apache.org/jira/browse/KAFKA-17455
> Project: Kafka
> Issue Type: Bug
> Components: clients, streams
> Affects Versions: 3.8.0
> Reporter: Colt McNealy
> Assignee: Colt McNealy
> Priority: Major
>
> When running a Kafka Streams EOS app that goes slightly above a configured
> user quota, we can reliably reproduce `TaskCorruptedException`s after
> throttling. This is the case even with an application that goes only 5-10%
> above the configured quota.
>
> The root cause is a `TimeoutException` encountered in the
> `TaskExecutor.commitOffsetsOrTransaction`.
>
> Stacktrace provided below:
>
> ```
> 19:45:28 ERROR [KAFKA] TaskExecutor - stream-thread [basic-tls-0-core-StreamThread-2] Committing task(s) 1_2 failed.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting AddOffsetsToTxn
> 19:45:28 WARN [KAFKA] StreamThread - stream-thread [basic-tls-0-core-StreamThread-2] Detected the states of tasks [1_2] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [1_2] are corrupted and hence need to be re-initialized
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:249) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1915) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1882) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1384) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1033) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711) [server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) [server.jar:?]
> ```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)