[ https://issues.apache.org/jira/browse/KAFKA-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Chen updated KAFKA-9999:
-------------------------------
Description: We spotted a case in a system test failure where the topic already exists but the admin client still attempts to recreate it:

{code:java}
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog. Topic is probably marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-uwin-cnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-cntByCnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Topics [SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog, SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made ready with 5 retries left (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,221] ERROR stream-thread [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
{code}

Looking closer, it seems that today we cannot tell whether a topic is pending deletion or healthy. We could discuss a follow-up effort to expose that information as part of the topic description result.

The current solution to this problem is to explicitly trigger a rebalance when we run out of retries, in order to unblock the group, since the short-term unavailability is most likely on the broker side.
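To make the gap concrete, here is a minimal standalone probe. This is not the InternalTopicManager code, just an illustrative sketch; the broker address and topic name are placeholders. It shows that createTopics fails with TopicExistsException even while the topic may still be mid async deletion, and describeTopics either returns ordinary partition metadata or UnknownTopicOrPartitionException, with nothing that marks the topic as pending deletion:

{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ChangelogTopicProbe {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local test broker
        String topic = "SmokeTest-uwin-cnt-changelog";     // example internal topic name

        try (AdminClient admin = AdminClient.create(props)) {
            try {
                // Ask the broker to create the topic, as Streams does for internal topics.
                admin.createTopics(Collections.singleton(new NewTopic(topic, 1, (short) 1)))
                     .all().get();
                System.out.println("Created " + topic);
            } catch (ExecutionException e) {
                if (e.getCause() instanceof TopicExistsException) {
                    // The broker reports the topic as existing, but it could be in the
                    // middle of an async delete. Describing it returns plain partition
                    // metadata (or UnknownTopicOrPartitionException once the delete
                    // completes); TopicDescription carries no "pending deletion" flag,
                    // which is the information gap discussed above.
                    try {
                        TopicDescription description =
                                admin.describeTopics(Collections.singleton(topic))
                                     .all().get().get(topic);
                        System.out.println(topic + " reports "
                                + description.partitions().size() + " partitions");
                    } catch (ExecutionException inner) {
                        if (inner.getCause() instanceof UnknownTopicOrPartitionException) {
                            System.out.println(topic
                                    + " disappeared between create and describe (async delete finished)");
                        } else {
                            throw inner;
                        }
                    }
                } else {
                    throw e;
                }
            }
        }
    }
}
{code}

Whichever shape the follow-up takes, exposing a deletion marker on the describe result versus having Streams request a rebalance once retries are exhausted, the sketch only illustrates why the client currently cannot distinguish the two topic states.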
> Internal topic creation failure should be non-fatal and trigger explicit rebalance
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9999
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9999
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin, streams
>    Affects Versions: 2.4.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)