[ https://issues.apache.org/jira/browse/KAFKA-17455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890484#comment-17890484 ]

David Jacot commented on KAFKA-17455:
-------------------------------------

We were able to root cause the issue based on logs shared by [~coltmcnealy-lh] 
privately. Posting my analysis here for the record.

In the logs, I see the following two entries:

1.
{code:java}
17:21:09 TRACE [KAFKA] TransactionManager - [Producer clientId=my-cluster-1-core-StreamThread-1-producer, transactionalId=my-cluster-core-00000001-0000-0000-0000-000000000000-1] Request AddOffsetsToTxnRequestData(transactionalId='my-cluster-core-00000001-0000-0000-0000-000000000000-1', producerId=2, producerEpoch=32, groupId='my-cluster-core') dequeued for sending{code}

2.
{code:java}
17:22:09 DEBUG [KAFKA] Sender - [Producer clientId=my-cluster-1-core-StreamThread-1-producer, transactionalId=my-cluster-core-00000001-0000-0000-0000-000000000000-1] Sending transactional request AddOffsetsToTxnRequestData(transactionalId='my-cluster-core-00000001-0000-0000-0000-000000000000-1', producerId=2, producerEpoch=32, groupId='my-cluster-core') to node localhost:9092 (id: 1 rack: null) with correlation ID 360{code}
Here is the code between those two log lines:
{code:java}
        TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());
        if (nextRequestHandler == null)
            return false;
        AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
        Node targetNode = null;
        try {
            FindCoordinatorRequest.CoordinatorType coordinatorType = nextRequestHandler.coordinatorType();
            targetNode = coordinatorType != null ?
                    transactionManager.coordinator(coordinatorType) :
                    client.leastLoadedNode(time.milliseconds()).node();
            if (targetNode != null) {
                if (!awaitNodeReady(targetNode, coordinatorType)) {
                    log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode);
                    maybeFindCoordinatorAndRetry(nextRequestHandler);
                    return true;
                }
            } else if (coordinatorType != null) {
                log.trace("Coordinator not known for {}, will retry {} after finding coordinator.", coordinatorType, requestBuilder.apiKey());
                maybeFindCoordinatorAndRetry(nextRequestHandler);
                return true;
            } else {
                log.trace("No nodes available to send requests, will poll and retry when until a node is ready.");
                transactionManager.retry(nextRequestHandler);
                client.poll(retryBackoffMs, time.milliseconds());
                return true;
            }
            if (nextRequestHandler.isRetry())
                time.sleep(nextRequestHandler.retryBackoffMs());
            long currentTimeMs = time.milliseconds();
            ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), requestBuilder, currentTimeMs,
                true, requestTimeoutMs, nextRequestHandler);
            log.debug("Sending transactional request {} to node {} with correlation ID {}", requestBuilder, targetNode, clientRequest.correlationId());
            client.send(clientRequest, currentTimeMs);
{code}

The first log line is printed in transactionManager.nextRequest.

Hence, I wonder whether the sender waits in awaitNodeReady. That method calls 
NetworkClientUtils.awaitReady, which looks like this:
{code:java}
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("Timeout needs to be greater than 0");
        }
        long startTime = time.milliseconds();
        if (isReady(client, node, startTime) ||  client.ready(node, startTime))
            return true;
        long attemptStartTime = time.milliseconds();
        while (!client.isReady(node, attemptStartTime) && attemptStartTime - startTime < timeoutMs) {
            if (client.connectionFailed(node)) {
                throw new IOException("Connection to " + node + " failed.");
            }
            long pollTimeout = timeoutMs - (attemptStartTime - startTime); // initialize in this order to avoid overflow
            client.poll(pollTimeout, attemptStartTime);
            if (client.authenticationException(node) != null)
                throw client.authenticationException(node);
            attemptStartTime = time.milliseconds();
        }
        return client.isReady(node, attemptStartTime);
{code}
 

Basically, if the node is not ready, we call client.poll with pollTimeout, which 
is essentially the timeoutMs received by the method at the beginning. Guess 
what? The timeout is 60000 ms, and there are 60 seconds between the two log 
lines. So if the sender ends up in that loop and no responses are received in 
the meantime, it blocks until the timeout expires.
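
For illustration, here is a minimal sketch of that arithmetic. The concrete values are assumptions taken from the logs and the reported TimeoutException, not from additional instrumentation:
{code:java}
public class AwaitReadyTimeoutSketch {
    public static void main(String[] args) {
        // Assumption: request.timeout.ms is 60000, matching the
        // "Timeout expired after 60000ms" in the reported exception.
        long timeoutMs = 60_000;

        // startTime is captured once when awaitReady is entered (~17:21:09 here).
        long startTime = 0;               // hypothetical reference point
        long attemptStartTime = startTime;

        // First iteration of the loop: nothing has elapsed yet, so the whole
        // budget is handed to client.poll.
        long pollTimeout = timeoutMs - (attemptStartTime - startTime); // = 60000 ms

        // If client.poll(pollTimeout, ...) has no responses or connection events
        // to process, that single call can block for the full 60 seconds, which
        // matches the one-minute gap between the TRACE (17:21:09) and
        // DEBUG (17:22:09) log lines above.
        System.out.println("pollTimeout on first iteration = " + pollTimeout + " ms");
    }
}
{code}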

Why would the node not be ready? I saw the following log line a bit before:
{code:java}
17:21:09 TRACE [KAFKA] NetworkClient - [Producer clientId=my-cluster-1-core-StreamThread-1-producer, transactionalId=my-cluster-core-00000001-0000-0000-0000-000000000000-1] Connection to node 1 is throttled for 13 ms until timestamp 1729038069730{code}
It means that the connection was throttled. In client.ready, we check the 
throttle time and consider the node not ready if the throttle deadline has not 
passed yet (sketched below).
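
As a rough illustration, here is a simplified sketch of that check (not the verbatim NetworkClient/ClusterConnectionStates code): the per-connection state keeps a throttle deadline, and the node only counts as ready once that deadline has passed.
{code:java}
// Hypothetical, simplified model of the per-connection throttle bookkeeping.
final class ConnectionStateSketch {
    private long throttleUntilTimeMs = 0;

    // Called when a broker response carries a non-zero throttle time,
    // as in the "throttled for 13 ms until timestamp ..." log line above.
    void throttle(long untilTimeMs) {
        this.throttleUntilTimeMs = Math.max(this.throttleUntilTimeMs, untilTimeMs);
    }

    // ready()/isReady()-style check: an established connection that is still
    // within its throttle window is reported as not ready, so awaitNodeReady
    // keeps polling instead of sending the transactional request.
    boolean isReady(long nowMs) {
        return nowMs >= throttleUntilTimeMs;
    }
}
{code}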

So my suspicion is that the node was not ready because of the throttle time, and 
the sender therefore ended up waiting in poll for the full request timeout.

> `TaskCorruptedException` After Client Quota Throttling
> ------------------------------------------------------
>
>                 Key: KAFKA-17455
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17455
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, streams
>    Affects Versions: 3.8.0
>            Reporter: Colt McNealy
>            Assignee: Colt McNealy
>            Priority: Major
>
> When running a Kafka Streams EOS app that goes slightly above a configured 
> user quota, we can reliably reproduce `TaskCorruptedException`s after 
> throttling. This is the case even with an application that goes only 5-10% 
> above the configured quota.
>  
> The root cause is a `TimeoutException` encountered in the 
> `TaskExecutor.commitOffsetsOrTransaction`.
>  
> Stacktrace provided below:
>  
> ```
> 19:45:28 ERROR [KAFKA] TaskExecutor - stream-thread [basic-tls-0-core-StreamThread-2] Committing task(s) 1_2 failed.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting AddOffsetsToTxn
> 19:45:28 WARN [KAFKA] StreamThread - stream-thread [basic-tls-0-core-StreamThread-2] Detected the states of tasks [1_2] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [1_2] are corrupted and hence need to be re-initialized
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:249) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1915) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1882) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1384) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1033) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711) [server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) [server.jar:?]
> ```



