[ https://issues.apache.org/jira/browse/KAFKA-15100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
José Armando García Sancio resolved KAFKA-15100. ------------------------------------------------ Resolution: Fixed > Unsafe to call tryCompleteFetchResponse on request timeout > ---------------------------------------------------------- > > Key: KAFKA-15100 > URL: https://issues.apache.org/jira/browse/KAFKA-15100 > Project: Kafka > Issue Type: Bug > Components: kraft > Reporter: José Armando García Sancio > Assignee: José Armando García Sancio > Priority: Major > Fix For: 3.6.0, 3.4.2, 3.5.2 > > > When the fetch request times out the future is completed from the > "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that > tryCompleteFetchResponse is always called from the same thread. This > invariant is violated in this case. > {code:java} > return future.handle((completionTimeMs, exception) -> { > if (exception != null) { > Throwable cause = exception instanceof ExecutionException ? > exception.getCause() : exception; // > If the fetch timed out in purgatory, it means no new data is available, > // and we will complete the fetch successfully. Otherwise, > if there was > // any other error, we need to return it. > Errors error = Errors.forException(cause); > if (error != Errors.REQUEST_TIMED_OUT) { > logger.info("Failed to handle fetch from {} at {} due > to {}", > replicaId, fetchPartition.fetchOffset(), error); > return buildEmptyFetchResponse(error, Optional.empty()); > } > } // FIXME: `completionTimeMs`, which can be null > logger.trace("Completing delayed fetch from {} starting at > offset {} at {}", > replicaId, fetchPartition.fetchOffset(), completionTimeMs); > return tryCompleteFetchRequest(replicaId, fetchPartition, > time.milliseconds()); > }); > {code} > One solution is to always build an empty response if the future was completed > exceptionally. This works because the ExpirationService completes the future > with a `TimeoutException`. > A longer-term solution is to use a more flexible event executor service. This > would be a service that allows more kinds of event to get scheduled/submitted > to the KRaft thread. -- This message was sent by Atlassian Jira (v8.20.10#820010)