[ https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851490#comment-17851490 ]
Ganesh Sadanala commented on KAFKA-16876:
-----------------------------------------

[~rohanpd] Can you share the `TaskMigratedException` stack trace as well?

> TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-16876
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16876
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.6.0
>            Reporter: Rohan Desai
>            Assignee: Ganesh Sadanala
>            Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by
> `task.prepareCommit`. In the particular instance I observed, `prepareCommit`
> flushed caches, which led to downstream `producer.send` calls that threw a
> `TaskMigratedException`. As a result, the tasks that need to be revoked
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the
> thrown exception and then moves on to the other task assignment callbacks.
> One of these, `StreamsPartitionAssignor.onAssignment`, tries to close the
> tasks and raises an `IllegalStateException`. Fortunately, it dirty-closes the
> tasks if the clean close fails, so we don't leak any tasks. I think there may
> be two bugs here:
> # `TaskManager.handleRevocation` should handle errors from `prepareCommit`.
> It should try not to leave any revoked tasks in an unsuspended state.
> # The `ConsumerCoordinator` just throws the first exception that it sees.
> But it seems bad to throw the `TaskMigratedException` and drop the
> `IllegalStateException` (though in this case I think it's relatively benign).
> I think on `IllegalStateException` we really want the streams thread to exit.
> One idea here is to have `ConsumerCoordinator` throw an exception type that
> includes the other exceptions that it has seen in another field. But this
> breaks the contract for clients that catch specific exceptions.
> I'm not sure of a clean solution, but I think it's at least worth recording
> that it would be preferable to have the caller of `poll` handle all the
> thrown exceptions rather than just the first one.
>
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [ 508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining tasks before re-throwing:
> [ 508.535] [service_application2] [inf] java.lang.IllegalStateException: Illegal state RUNNING while closing active task 0_3
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673) ~[kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546) ~[kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295) ~[kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at dev.responsive.kafka.internal.clients.DelegatingConsumer.poll(DelegatingConsumer.java:94) [kafka-client-0.24.0-dc9acd1.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) [kafka-streams-3.6.0.jar:?]
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
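
For discussion only, here is a minimal sketch of the first idea in the description: keep going after a `prepareCommit` failure so that every revoked task still gets suspended, then rethrow. It is not the actual `TaskManager` code and not a proposed patch. `SketchTask`, `RecordingTask`, and `RevocationSketch` are hypothetical stand-ins invented for this example; only `Throwable.addSuppressed` is a real JDK API. The sketch rethrows the first exception unchanged and attaches later ones as suppressed exceptions, which is one possible way to keep the original exception type for callers that catch specific exceptions while not dropping the others.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Streams task; NOT the real Kafka Streams Task interface. */
interface SketchTask {
    String id();
    void prepareCommit(); // may throw, e.g. when a cache flush triggers a failing send
    void suspend();
}

/** Hypothetical fake task that records whether it was suspended. */
final class RecordingTask implements SketchTask {
    private final String id;
    private final boolean failOnPrepare;
    boolean suspended = false;

    RecordingTask(String id, boolean failOnPrepare) {
        this.id = id;
        this.failOnPrepare = failOnPrepare;
    }

    @Override public String id() { return id; }

    @Override public void prepareCommit() {
        if (failOnPrepare) {
            throw new RuntimeException("simulated prepareCommit failure for task " + id);
        }
    }

    @Override public void suspend() { suspended = true; }
}

final class RevocationSketch {
    private RevocationSketch() { }

    /**
     * Calls prepareCommit on every revoked task, then suspends every revoked task
     * even if some prepareCommit calls failed. The first failure is rethrown at the
     * end; any later failures are attached to it as suppressed exceptions.
     */
    static void handleRevocation(List<? extends SketchTask> revoked) {
        RuntimeException first = null;
        for (SketchTask task : revoked) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                first = record(first, e);
            }
        }
        for (SketchTask task : revoked) {
            try {
                task.suspend();
            } catch (RuntimeException e) {
                first = record(first, e);
            }
        }
        if (first != null) {
            throw first;
        }
    }

    /** Keeps the first exception and attaches later ones as suppressed. */
    private static RuntimeException record(RuntimeException first, RuntimeException next) {
        if (first == null) {
            return next;
        }
        if (first != next) { // addSuppressed rejects self-suppression
            first.addSuppressed(next);
        }
        return first;
    }

    public static void main(String[] args) {
        List<RecordingTask> tasks = new ArrayList<>();
        tasks.add(new RecordingTask("0_3", true));
        tasks.add(new RecordingTask("0_1", false));
        try {
            handleRevocation(tasks);
        } catch (RuntimeException e) {
            System.out.println("rethrown after suspending all tasks: " + e.getMessage());
        }
        for (RecordingTask t : tasks) {
            System.out.println(t.id() + " suspended=" + t.suspended);
        }
    }
}
```

Whether suppressed exceptions are acceptable for the `ConsumerCoordinator` side is a separate question; the sketch only shows that the revocation path can finish suspending tasks before surfacing the error.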