[ 
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851490#comment-17851490
 ] 

Ganesh Sadanala commented on KAFKA-16876:
-----------------------------------------

[~rohanpd] Can you share the `TaskMigratedException` stack trace as well?

> TaskManager.handleRevocation doesn't handle errors thrown from 
> task.prepareCommit
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-16876
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16876
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.6.0
>            Reporter: Rohan Desai
>            Assignee: Ganesh Sadanala
>            Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by 
> `task.prepareCommit`. In the particular instance I observed, `prepareCommit` 
> flushed caches which led to downstream `producer.send` calls that threw a 
> `TaskMigratedException`. This means that the tasks that need to be revoked 
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the 
> thrown exception and then moves on to the other task assignment callbacks. 
> One of these, `StreamsPartitionAssignor.onAssignment`, tries to close the tasks 
> and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks 
> if close fails, so we don't leak any tasks. I think there may be two bugs 
> here:
>  # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. 
> It should try not to leave any revoked tasks in an unsuspended state.
>  # The `ConsumerCoordinator` just throws the first exception that it sees. 
> But it seems bad to throw the `TaskMigratedException` and drop the 
> `IllegalStateException` (though in this case I think it's relatively benign). 
> I think on `IllegalStateException` we really want the streams thread to exit. 
> One idea here is to have `ConsumerCoordinator` throw an exception type that 
> includes the other exceptions that it has seen in another field. But this 
> breaks the contract for clients that catch specific exceptions. I'm not sure 
> of a clean solution, but I think it's at least worth recording that it would 
> be preferable to have the caller of `poll` handle all the thrown exceptions 
> rather than just the first one.
>  
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining tasks before re-throwing:
> [       508.535] [service_application2] [inf] java.lang.IllegalStateException: Illegal state RUNNING while closing active task 0_3
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673) ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546) ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295) ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630) [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at dev.responsive.kafka.internal.clients.DelegatingConsumer.poll(DelegatingConsumer.java:94) [kafka-client-0.24.0-dc9acd1.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014) [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954) [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766) [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) [kafka-streams-3.6.0.jar:?]
> {code}
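
For discussion, here is a rough sketch of the two ideas in the issue description: keep suspending revoked tasks even when `prepareCommit` throws, and avoid silently dropping later exceptions. It is not the Kafka Streams implementation. `SketchTask`, `DemoTask`, and `RevocationSketch` are hypothetical names made up for this illustration, and attaching later failures via `Throwable.addSuppressed` is only one possible option (an assumption here, not something proposed in the ticket) that leaves the type of the first thrown exception unchanged.

```java
import java.util.List;

final class RevocationSketch {
    private RevocationSketch() { }

    // Hypothetical stand-in for a Streams task; NOT the real Kafka Streams Task interface.
    interface SketchTask {
        void prepareCommit();   // may throw, e.g. when a cache flush hits a failing producer.send
        void suspend();
        boolean isSuspended();
    }

    // Demo-only task whose prepareCommit throws the given exception (if non-null).
    static final class DemoTask implements SketchTask {
        private final RuntimeException failure;
        private boolean suspended;

        DemoTask(RuntimeException failure) {
            this.failure = failure;
        }

        @Override public void prepareCommit() {
            if (failure != null) {
                throw failure;
            }
        }

        @Override public void suspend() {
            suspended = true;
        }

        @Override public boolean isSuspended() {
            return suspended;
        }
    }

    // Tries prepareCommit on every revoked task and suspends each task regardless
    // of the outcome. Afterwards rethrows the first failure, with any later
    // failures attached as suppressed exceptions.
    static void handleRevocation(List<? extends SketchTask> revoked) {
        RuntimeException first = null;
        for (SketchTask task : revoked) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                first = record(first, e);
            }
            try {
                task.suspend();
            } catch (RuntimeException e) {
                first = record(first, e);
            }
        }
        if (first != null) {
            throw first;
        }
    }

    // Keeps the first exception as the one to throw; later ones become suppressed.
    private static RuntimeException record(RuntimeException first, RuntimeException next) {
        if (first == null) {
            return next;
        }
        if (next != first) {   // addSuppressed rejects self-suppression
            first.addSuppressed(next);
        }
        return first;
    }
}
```

With this shape, a caller that catches a specific exception type still sees the first exception thrown, and can inspect `getSuppressed()` for anything that failed afterwards. Every task passed to `handleRevocation` ends up suspended unless `suspend` itself throws.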



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
