[ https://issues.apache.org/jira/browse/KAFKA-7181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16548365#comment-16548365 ]
Matthias J. Sax commented on KAFKA-7181:
----------------------------------------

Thanks for reporting the issue. A couple of comments:

 - If a thread dies, it is correct that we don't start a new thread, and so far there are no plans to change this (it might be a feature request, but it is not a bug).
 - It's questionable whether we should set the state to RUNNING, because RUNNING indicates that all threads are running; however, one thread died, and thus this condition is no longer met.

While I agree that we should allow restarting a dead thread in general, I am not sure this should happen automatically. For the state transition, I also don't think we should do anything automatically. However, it might make sense to allow users to "acknowledge" a dead thread; this "acknowledgment" could be used either to restart the dead thread or just to clean it up and remove it from the pool, and thus it would allow us to transition back into state RUNNING. The reasoning is that users should decide how to proceed if a thread dies, and if a user tells Kafka Streams to clean up and resume processing, it's OK to transition back to RUNNING; otherwise, the state should indicate that something is wrong.

I agree that REBALANCING is not a good indicator for a dead thread. However, one should register an uncaught exception handler to learn about dying threads.

Just my 2 cents. Would this work for you?

> Kafka Streams State stuck in rebalancing after one of the StreamThread encounters IllegalStateException
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7181
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7181
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Romil Kumar Vasani
>            Priority: Major
>
> One of the StreamThreads encounters an IllegalStateException, is marked DEAD, and shuts down.
> The application doesn't spawn a new thread in its place; the partitions of that thread are assigned to a different thread and it synchronizes. But the application is stuck in REBALANCING state, as not all StreamThreads are in RUNNING state.
>
> Expected: A new thread should come up, and after synchronization/rebalancing the KafkaStreams.State should be RUNNING.
>
> Since all the active threads (that are not marked DEAD) are in RUNNING state, the KafkaStreams.State should be RUNNING.
>
> P.S. I am reporting an issue for the first time. If more information is needed, I can provide it.
>
> Below are the logs from the IllegalStateException:
>
> 2018-07-18 03:02:27.510 ERROR 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [prd1565.prod.nuke.ops.v1-StreamThread-2] Encountered the following error during processing:
> java.lang.IllegalStateException: No current assignment for partition consumerGroup-stateStore-changelog-10
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>     at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
> 2018-07-18 03:02:27.511 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN
> 2018-07-18 03:02:27.511 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] Shutting down
> 2018-07-18 03:02:27.571 INFO 1 --- [-StreamThread-2] o.a.k.clients.producer.KafkaProducer : [Producer clientId=consumerGroup-StreamThread-2-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-07-18 03:02:27.579 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD
> 2018-07-18 03:02:27.579 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] Shutdown complete
> 2018-07-18 03:02:27.579 ERROR 1 --- [-StreamThread-2] xxx.xxx.xxx.AppRunner : Unhandled exception in thread: 43:consumerGroup-StreamThread-2
> java.lang.IllegalStateException: No current assignment for partition consumerGroup-inventoryStore-changelog-10
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>     at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
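
For reference, the comment's suggestion to register an uncaught exception handler could look roughly like the sketch below. It is only an illustration under stated assumptions: `DeadThreadTracker` and its methods are hypothetical names, not part of Kafka Streams, and the demo uses a plain Java thread so that it runs without a Kafka dependency. In the affected version (1.1.0), such a handler would be passed to `KafkaStreams#setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)` before calling `start()`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Kafka Streams class): records which threads died and why,
// so the application can decide how to react instead of relying on the REBALANCING state.
public class DeadThreadTracker implements Thread.UncaughtExceptionHandler {

    // Thread name -> exception that killed the thread.
    private final Map<String, Throwable> deadThreads = new ConcurrentHashMap<>();

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        deadThreads.put(thread.getName(), error);
        System.err.println("Thread died: " + thread.getName() + " (" + error + ")");
    }

    public boolean hasDeadThreads() {
        return !deadThreads.isEmpty();
    }

    public Map<String, Throwable> deadThreads() {
        return Collections.unmodifiableMap(deadThreads);
    }

    public static void main(String[] args) throws InterruptedException {
        DeadThreadTracker tracker = new DeadThreadTracker();

        // Stand-in for a StreamThread that fails during processing.
        // With Kafka Streams 1.1.0 one would instead call
        // streams.setUncaughtExceptionHandler(tracker) before streams.start().
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("No current assignment for partition");
        }, "demo-StreamThread-2");
        worker.setUncaughtExceptionHandler(tracker);
        worker.start();
        worker.join(); // the handler runs on the dying thread before it terminates

        if (tracker.hasDeadThreads()) {
            System.out.println("Dead threads: " + tracker.deadThreads().keySet());
        }
    }
}
```

What the application does once a dead thread is recorded (alerting, closing and recreating the `KafkaStreams` instance, or exiting the process) is left open here, in line with the comment's point that the user should decide how to proceed.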