[ https://issues.apache.org/jira/browse/KAFKA-1558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14165358#comment-14165358 ]
Sriharsha Chintalapani commented on KAFKA-1558: ----------------------------------------------- [~nehanarkhede] [~guozhang] [~junrao] I ran tests for simultaneously running preferred replica election tool and deleting multiple topics. I kept running into KAFKA-1305 but by increasing controller.message.queue.size to 1000 I was able to run these tests successfully. While testing this couple of things caught my eye. following code in KafkaController.PreferredReplicaElectionListener {code} val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic)) if(partitionsForTopicsToBeDeleted.size > 0) { error("Skipping preferred replica election for partitions %s since the respective topics are being deleted" .format(partitionsForTopicsToBeDeleted)) } else controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted) } {code} It doesn't need a else part there since its calling onPreferredReplicaElection by removing partitionsForTopicsToBeDeleted In PartitionStateMachine.DeleteTopicListener {code} if(topicsToBeDeleted.size > 0) { info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(",")) // add topic to deletion list controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted) // mark topic ineligible for deletion if other state changes are in progress topicsToBeDeleted.foreach { topic => val preferredReplicaElectionInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic) val partitionReassignmentInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic) if(preferredReplicaElectionInProgress || partitionReassignmentInProgress) controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic)) } } {code} The above code enqueueTopicsForDeletion which calls resumeTopicDeletionThread() to start the deletion of topics mark topic ineligible should be before the enqueueTopicsForDeletion. This way deletion of the topic won't happen if there is preferred replica election or partitions reassignment going for the said topics. I am testing these changes. Let me know what you think of these changes. Thanks. > AdminUtils.deleteTopic does not work > ------------------------------------ > > Key: KAFKA-1558 > URL: https://issues.apache.org/jira/browse/KAFKA-1558 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.8.1.1 > Reporter: Henning Schmiedehausen > Assignee: Sriharsha Chintalapani > Priority: Blocker > Fix For: 0.8.2 > > Attachments: kafka-thread-dump.log > > > the AdminUtils:.deleteTopic method is implemented as > {code} > def deleteTopic(zkClient: ZkClient, topic: String) { > ZkUtils.createPersistentPath(zkClient, > ZkUtils.getDeleteTopicPath(topic)) > } > {code} > but the DeleteTopicCommand actually does > {code} > zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) > zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) > {code} > so I guess, that the 'createPersistentPath' above should actually be > {code} > def deleteTopic(zkClient: ZkClient, topic: String) { > ZkUtils.deletePathRecursive(zkClient, ZkUtils.getTopicPath(topic)) > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)