[ https://issues.apache.org/jira/browse/KAFKA-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16987992#comment-16987992 ]
John Roesler commented on KAFKA-9231:
-------------------------------------

Opened https://issues.apache.org/jira/browse/KAFKA-9268 as a follow-on for earlier branches.

> Streams Threads may die from recoverable errors with EOS enabled
> ----------------------------------------------------------------
>
>                 Key: KAFKA-9231
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9231
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Blocker
>             Fix For: 2.4.0
>
>
> While testing Streams in EOS mode under frequent and heavy network partitions, I've encountered the following error, which leads to thread death:
> {noformat}
> [2019-11-26 04:54:02,650] ERROR [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Failed to rebalance.
> 	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:739)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] failed to suspend stream tasks
> 	at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:253)
> 	at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:116)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.enforceRebalance(StreamThread.java:716)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:710)
> 	... 1 more
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000007
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:279)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:175)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:581)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:535)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:660)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:628)
> 	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendRunningTasks(AssignedStreamsTasks.java:145)
> 	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendOrCloseTasks(AssignedStreamsTasks.java:128)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:246)
> 	... 7 more
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> [2019-11-26 04:54:02,650] INFO [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
> [2019-11-26 04:54:02,650] INFO [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
> [2019-11-26 04:54:02,650] INFO [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] [Consumer clientId=stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2019-11-26 04:54:02,653] INFO [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread)
> {noformat}
> Elsewhere in the code, we catch ProducerFencedExceptions and trigger a rebalance instead of killing the thread. It seems like one possible avenue has slipped through the cracks: here the fencing surfaces while flushing a state store during task suspension in onPartitionsRevoked, gets wrapped in a ProcessorStateException and then a StreamsException, and is therefore treated as fatal.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
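For illustration only, here is a minimal, self-contained Java sketch of the handling pattern the description refers to: a fencing error raised while suspending tasks is converted into a recoverable signal, and the run loop answers that signal with a rebalance instead of letting the thread die. It deliberately avoids the real Kafka classes. FencedException is a hypothetical stand-in for ProducerFencedException, and TaskMigratedSignal, Task, suspendAll, and runOnce are hypothetical names, not Streams internals. This is a sketch under those assumptions, not the patch that resolved KAFKA-9231.

```java
import java.util.List;

class SuspendSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.ProducerFencedException.
    static class FencedException extends RuntimeException {
        FencedException(String msg) { super(msg); }
    }

    // Hypothetical recoverable signal: "this task moved elsewhere, rejoin the group".
    static class TaskMigratedSignal extends RuntimeException {
        TaskMigratedSignal(String taskId, Throwable cause) {
            super("task [" + taskId + "] migrated; rejoin the group", cause);
        }
    }

    // Hypothetical task whose commit (flush + transaction) may be fenced.
    interface Task {
        String id();
        void commit();
    }

    // Suspends every task. A fencing error is NOT wrapped as a fatal error:
    // the remaining tasks are still suspended, then one recoverable signal is raised.
    static void suspendAll(List<Task> tasks) {
        TaskMigratedSignal migrated = null;
        for (Task task : tasks) {
            try {
                task.commit();
            } catch (FencedException e) {
                if (migrated == null) {
                    migrated = new TaskMigratedSignal(task.id(), e);
                }
            }
        }
        if (migrated != null) {
            throw migrated;
        }
    }

    // Simplified run-loop step: a recoverable signal leads to a rebalance.
    // Any other exception still propagates, i.e. the thread would die.
    static String runOnce(List<Task> tasks) {
        try {
            suspendAll(tasks);
            return "RUNNING";
        } catch (TaskMigratedSignal e) {
            return "REBALANCE";
        }
    }

    // Test helper: a task whose commit either succeeds or is fenced.
    static Task task(String id, boolean fenced) {
        return new Task() {
            public String id() { return id; }
            public void commit() {
                if (fenced) {
                    throw new FencedException("Producer attempted an operation with an old epoch.");
                }
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(runOnce(List.of(task("1_0", false), task("1_1", true))));
        System.out.println(runOnce(List.of(task("1_0", false))));
    }
}
```

Running main prints REBALANCE and then RUNNING. Continuing to suspend the other tasks after the first fencing error and raising a single signal at the end is an assumed design choice, so that unaffected tasks are still committed; any exception other than the fencing stand-in still propagates out of runOnce, as before.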