[
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851514#comment-17851514
]
Ganesh Sadanala edited comment on KAFKA-16876 at 6/3/24 4:21 AM:
-----------------------------------------------------------------
[~rohanpd] Thank you for confirming it! As I understand it, the flow is
`TaskManager#handleRevocation` -> `TaskManager#prepareCommitAndAddOffsetsToMap`
-> `StreamTask#prepareCommit` -> `StreamTask#flush` ->
`ProcessorStateManager#flushCache`,
and the registered state stores are iterated inside `flushCache`. In your case,
a `TaskMigratedException` is thrown there and caught here:
{code:java}
catch (final RuntimeException exception) {
    if (firstException == null) {
        // do NOT wrap the error if it is actually caused by Streams itself
        if (exception instanceof StreamsException) {
            firstException = exception;
        }
        // ... (rest of the handler omitted)
    }
}
{code}
The exception is finally rethrown to `StreamTask#flush`, where, as far as I can
see, it is not caught or handled. Hence the exception propagates up the whole
call chain, and the active tasks are not properly revoked. Please correct me if
I am wrong.
So would you want it to be handled appropriately inside `StreamTask#flush`?
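A simplified, self-contained model of the "keep only the first exception" loop described above may help. It assumes, as the excerpt suggests, that `flushCache` keeps iterating over the remaining stores after a failure and only rethrows at the end. `FlushCacheSketch`, `Store` and `StreamsLikeException` are hypothetical stand-ins, not Kafka classes (in Kafka, `TaskMigratedException` is a `StreamsException`, so it is not wrapped).

```java
import java.util.ArrayList;
import java.util.List;

class FlushCacheSketch {

    // Hypothetical stand-in for Kafka's StreamsException hierarchy.
    static class StreamsLikeException extends RuntimeException {
        StreamsLikeException(String message) {
            super(message);
        }

        StreamsLikeException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical state store whose cache flush may fail.
    static final class Store {
        final String name;
        final RuntimeException failure; // null means the flush succeeds

        Store(String name, RuntimeException failure) {
            this.name = name;
            this.failure = failure;
        }

        void flushCache() {
            if (failure != null) {
                throw failure;
            }
        }
    }

    // Flushes every store, remembers only the first failure and rethrows it
    // at the end, so the caller (StreamTask#flush in the real code) receives it.
    static void flushAll(List<Store> stores, List<String> flushed) {
        RuntimeException firstException = null;
        for (Store store : stores) {
            try {
                store.flushCache();
                flushed.add(store.name);
            } catch (RuntimeException exception) {
                if (firstException == null) {
                    // Streams' own exceptions are kept as-is; anything else is wrapped.
                    firstException = exception instanceof StreamsLikeException
                        ? exception
                        : new StreamsLikeException("Failed to flush cache of store " + store.name, exception);
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    public static void main(String[] args) {
        List<String> flushed = new ArrayList<>();
        List<Store> stores = List.of(
            new Store("a", null),
            new Store("b", new StreamsLikeException("task migrated")),
            new Store("c", null));
        try {
            flushAll(stores, flushed);
        } catch (RuntimeException e) {
            System.out.println("rethrown: " + e.getMessage() + ", flushed: " + flushed);
        }
    }
}
```

Running `main` prints `rethrown: task migrated, flushed: [a, c]`: store `c` is still flushed, but the caller only ever sees the first failure.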
Also, could you explain how you produced those exceptions? I would like to
reproduce them locally to get a better picture.
Anything else you can share would also be helpful.
Thank you!
> TaskManager.handleRevocation doesn't handle errors thrown from
> task.prepareCommit
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-16876
> URL: https://issues.apache.org/jira/browse/KAFKA-16876
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.6.0
> Reporter: Rohan Desai
> Assignee: Ganesh Sadanala
> Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by
> `task.prepareCommit`. In the particular instance I observed, `prepareCommit`
> flushed caches, which led to downstream `producer.send` calls that threw a
> `TaskMigratedException`. This means that the tasks that need to be revoked
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the
> thrown exception and then moves on to the other task assignment callbacks.
> One of these, `StreamsPartitionAssignor.onAssignment`, tries to close the
> tasks and raises an `IllegalStateException`. Fortunately, it dirty-closes the
> tasks if the clean close fails, so we don't leak any tasks. I think there may
> be two bugs here:
> # `TaskManager.handleRevocation` should handle errors from `prepareCommit`.
> It should try not to leave any revoked tasks in an unsuspended state.
> # The `ConsumerCoordinator` just throws the first exception that it sees.
> But it seems bad to throw the `TaskMigratedException` and drop the
> `IllegalStateException` (though in this case I think it's relatively benign).
> I think on `IllegalStateException` we really want the stream thread to exit.
> One idea here is to have `ConsumerCoordinator` throw an exception type that
> includes the other exceptions it has seen in another field. But this
> breaks the contract for clients that catch specific exceptions. I'm not sure
> of a clean solution, but I think it's at least worth recording that it would
> be preferable to have the caller of `poll` handle all the thrown exceptions
> rather than just the first one.
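For point 1, here is a minimal sketch, under simplifying assumptions, of a revocation path that still suspends every revoked task when `prepareCommit` throws. `RevocationSketch`, its `Task` class and the `RUNNING`/`SUSPENDED`/`CLOSED` states are hypothetical; the `closeClean` check only mimics the "Illegal state RUNNING while closing active task" error in the stack trace, and committing offsets is omitted entirely.

```java
import java.util.List;

class RevocationSketch {

    enum State { RUNNING, SUSPENDED, CLOSED }

    // Hypothetical, heavily simplified task; not Kafka's StreamTask.
    static final class Task {
        final String id;
        final RuntimeException commitFailure; // null means prepareCommit succeeds
        State state = State.RUNNING;

        Task(String id, RuntimeException commitFailure) {
            this.id = id;
            this.commitFailure = commitFailure;
        }

        void prepareCommit() {
            if (commitFailure != null) {
                throw commitFailure;
            }
        }

        void suspend() {
            state = State.SUSPENDED;
        }

        // Mimics the state check behind the IllegalStateException in the trace.
        void closeClean() {
            if (state != State.SUSPENDED) {
                throw new IllegalStateException("Illegal state " + state + " while closing active task " + id);
            }
            state = State.CLOSED;
        }
    }

    // Models the reported behaviour: the first prepareCommit failure escapes
    // before any revoked task is suspended.
    static void handleRevocationUnguarded(List<Task> revoked) {
        for (Task task : revoked) task.prepareCommit();
        for (Task task : revoked) task.suspend();
    }

    // Sketch of point 1: remember the first failure, always suspend, then rethrow.
    static void handleRevocation(List<Task> revoked) {
        RuntimeException firstException = null;
        for (Task task : revoked) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                if (firstException == null) firstException = e;
            }
        }
        for (Task task : revoked) {
            task.suspend(); // no revoked task is left RUNNING
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    public static void main(String[] args) {
        List<Task> before = List.of(new Task("0_3", new RuntimeException("task migrated")));
        try {
            handleRevocationUnguarded(before);
        } catch (RuntimeException e) {
            System.out.println("unguarded: " + e.getMessage());
        }
        try {
            before.get(0).closeClean(); // fails: the task is still RUNNING
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        List<Task> after = List.of(
            new Task("0_3", new RuntimeException("task migrated")),
            new Task("0_4", null));
        try {
            handleRevocation(after);
        } catch (RuntimeException e) {
            System.out.println("guarded: " + e.getMessage());
        }
        for (Task task : after) {
            task.closeClean(); // succeeds: every revoked task was suspended
            System.out.println(task.id + " -> " + task.state);
        }
    }
}
```

The two-phase shape (prepare all, then suspend all) is only one choice; suspending each task in a `finally` right after its own `prepareCommit` would also keep revoked tasks from staying `RUNNING`.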
>
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [ 508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining tasks before re-throwing:
> [ 508.535] [service_application2] [inf] java.lang.IllegalStateException: Illegal state RUNNING while closing active task 0_3
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673) ~[kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546) ~[kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295) ~[kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at dev.responsive.kafka.internal.clients.DelegatingConsumer.poll(DelegatingConsumer.java:94) [kafka-client-0.24.0-dc9acd1.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) [kafka-streams-3.6.0.jar:?] {code}
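The trace shows the `IllegalStateException` raised from the assignment callback, which is the exception that gets dropped in favour of the earlier `TaskMigratedException`. For point 2, one option that the issue does not itself propose is sketched here under stated assumptions: rethrow the first exception unchanged, so its type (and the contract for callers that catch specific exceptions) is preserved, and attach any later callback failures with the JDK's `Throwable#addSuppressed` instead of discarding them. `CallbackErrorsSketch` and `TaskMigratedLikeException` are hypothetical, and this is not how `ConsumerCoordinator` behaves today.

```java
import java.util.List;

class CallbackErrorsSketch {

    // Hypothetical stand-in for Kafka's TaskMigratedException.
    static class TaskMigratedLikeException extends RuntimeException {
        TaskMigratedLikeException(String message) {
            super(message);
        }
    }

    // Runs every callback. The first failure is rethrown with its original type;
    // later failures are attached to it as suppressed exceptions instead of being lost.
    static void invokeAll(List<Runnable> callbacks) {
        RuntimeException first = null;
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else if (e != first) { // addSuppressed rejects self-suppression
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        List<Runnable> callbacks = List.of(
            () -> { throw new TaskMigratedLikeException("task migrated"); },
            () -> { throw new IllegalStateException("Illegal state RUNNING while closing active task 0_3"); });
        try {
            invokeAll(callbacks);
        } catch (TaskMigratedLikeException e) {
            // A caller that catches the specific type still works...
            System.out.println("caught: " + e.getMessage());
            // ...and the later failure is still available for inspection.
            for (Throwable suppressed : e.getSuppressed()) {
                System.out.println("suppressed: " + suppressed);
            }
        }
    }
}
```

Callers would still have to inspect `getSuppressed()` to act on the `IllegalStateException`, so this only avoids losing the information; by itself it does not make the stream thread exit.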
--
This message was sent by Atlassian Jira
(v8.20.10#820010)