[ https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15618009#comment-15618009 ]
Michal Borowiecki edited comment on KAFKA-4355 at 10/29/16 12:18 PM:
---------------------------------------------------------------------

Trying to work around this issue by calling System.exit from the UncaughtExceptionHandler (once the app dies, it will be re-started by our infrastructure). We are adding a shutdown hook as per the example here: http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
{code:java}
Runtime.getRuntime().addShutdownHook(new Thread(schedulerStreams::close));
{code}
However, even though both stream threads report completion of shutdown:
{noformat}
[2016-10-29 12:32:10,616] INFO [StreamThread-2] stream-thread [StreamThread-2] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread)
[2016-10-29 12:32:20,490] INFO [StreamThread-1] stream-thread [StreamThread-1] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
and before that report the closing of their producers and consumers, the app is not stopped. At least the following 2 threads remain active and keep logging:
{noformat}
[2016-10-29 12:37:05,625] DEBUG [main-SendThread(localhost:19374)] Got ping response for sessionid: 0x158101fc9590021 after 0ms (org.apache.zookeeper.ClientCnxn)
[2016-10-29 12:37:09,815] DEBUG [kafka-producer-network-thread | producer-1] Sending metadata request {topics=[scheduler]} to node 0 (org.apache.kafka.clients.NetworkClient)
[2016-10-29 12:37:09,818] DEBUG [kafka-producer-network-thread | producer-1] Updated cluster metadata version 15 to Cluster(id = enenZ_SbQKaRlOyJKQMn_g, nodes = [lp02485.openbet:19373 (id: 0 rack: null)], partitions = [Partition(topic = scheduler, partition = 0, leader = 0, replicas = [0,], isr = [0,])]) (org.apache.kafka.clients.Metadata)
[2016-10-29 12:37:12,945] DEBUG [main-SendThread(localhost:19374)] Got ping response for sessionid: 0x158101fc9590022 after 0ms (org.apache.zookeeper.ClientCnxn)
{noformat}
"Stopped Kafka Stream process" is never logged, so I suspect the close method remains blocked on the join here: https://github.com/apache/kafka/blob/e876df8b37fc6ea54b0a0571306c4a833c919cda/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L227

PS. When we don't add the shutdown hook to call close(), the app exits correctly on System.exit(). I think it is pretty bad behaviour for the close() method to block indefinitely, so I'll raise a separate ticket unless I find one already exists for that. It should hopefully be easier to reproduce.
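A fuller sketch of the workaround we're aiming for is below, assuming the KafkaStreams instance is called schedulerStreams as in the snippet above. The 30-second grace period and the Runtime.halt() fallback are only illustrative additions to keep a blocked close() from hanging JVM shutdown forever; they are not KafkaStreams API behaviour.
{code:java}
// Sketch only: combines System.exit-on-crash with a shutdown hook that gives
// close() a bounded window, so a close() stuck on Thread.join() cannot block
// JVM shutdown indefinitely. The 30s timeout and the halt() fallback are
// illustrative assumptions, not KafkaStreams API behaviour.
// Typically registered before schedulerStreams.start().
schedulerStreams.setUncaughtExceptionHandler((thread, throwable) -> {
    // Any stream thread dying takes the whole process down; our infrastructure restarts it.
    System.exit(1);
});

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    Thread closer = new Thread(schedulerStreams::close, "streams-closer");
    closer.setDaemon(true); // must not keep the JVM alive on its own
    closer.start();
    try {
        closer.join(30_000L); // bounded wait for a clean close
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    if (closer.isAlive()) {
        // close() is still blocked; System.exit() would deadlock inside a
        // shutdown hook, so force the JVM down instead.
        Runtime.getRuntime().halt(1);
    }
}));
{code}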
> StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4355
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4355
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0, 0.10.0.0
>         Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>            Reporter: Michal Borowiecki
>            Assignee: Guozhang Wang
>
> When (a) starting the kafka streams app before the broker, or
> (b) restarting the broker while the streams app is running,
> the stream thread intermittently dies with a "Topic not found during partition assignment" StreamsException.
> This happens roughly between one in 5 and one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Topic not found during partition assignment: scheduler
>     at org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>     at org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> {noformat}
> Our app has 2 streams in it, consuming from 2 different topics.
> Sometimes the exception happens on both stream threads. Sometimes only on one of the stream threads.
> The exception is preceded by:
> {noformat}
> [2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-scheduler. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,342] INFO [StreamThread-2] (Re-)joining group pool-scheduler (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] Completed validating internal topics in partition assignor (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor)
> [2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread [StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,358] INFO [StreamThread-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
> [2016-10-28 16:17:55,364] INFO [StreamThread-2] stream-thread [StreamThread-2] Removing all active tasks [[]] (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,364] INFO [StreamThread-2] stream-thread [StreamThread-2] Removing all active tasks [[]] (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,365] INFO [StreamThread-2] stream-thread [StreamThread-2] Removing all standby tasks [[]] (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,365] INFO [StreamThread-2] stream-thread [StreamThread-2] Removing all standby tasks [[]] (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,365] INFO [StreamThread-2] stream-thread [StreamThread-2] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,365] INFO [StreamThread-2] stream-thread [StreamThread-2] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread)
> Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Topic not found during partition assignment: scheduler
> {noformat}
> This is happening regardless of whether we use kafka streams and broker versions 0.10.0.0 or 0.10.1.0.
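For context on the failure itself: the top frame of the stack trace above is DefaultPartitionGrouper.maxNumPartitions, which in the 0.10.x sources performs roughly the following check. This is a paraphrased sketch from memory of the class, not a verbatim copy.
{code:java}
// Paraphrased sketch of DefaultPartitionGrouper.maxNumPartitions in 0.10.x
// (not a verbatim copy of the Kafka Streams source).
protected int maxNumPartitions(final Cluster metadata, final Set<String> topics) {
    int maxNumPartitions = 0;
    for (final String topic : topics) {
        final List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);

        // If the consumer's current cluster metadata does not know the topic at all
        // (e.g. briefly after a broker restart), the assignment is aborted outright
        // rather than retried.
        if (partitions == null)
            throw new StreamsException("Topic not found during partition assignment: " + topic);

        maxNumPartitions = Math.max(maxNumPartitions, partitions.size());
    }
    return maxNumPartitions;
}
{code}
If that reading is right, it would also explain the intermittency: whether the thread dies depends on whether the rebalance triggered by the broker restart runs before or after the topic reappears in the assignment leader's metadata.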