[ https://issues.apache.org/jira/browse/KAFKA-1558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14165358#comment-14165358 ]

Sriharsha Chintalapani commented on KAFKA-1558:
-----------------------------------------------

[~nehanarkhede] [~guozhang] [~junrao] I ran tests that run the preferred
replica election tool and delete multiple topics at the same time.
I kept running into KAFKA-1305, but after increasing
controller.message.queue.size to 1000 I was able to run these tests
successfully.
While testing, a couple of things caught my eye.
The following code is in KafkaController.PreferredReplicaElectionListener:
{code}
      val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
      val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
      if(partitionsForTopicsToBeDeleted.size > 0) {
        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
          .format(partitionsForTopicsToBeDeleted))
      }
      else
        controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
    }
{code}
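The else branch is not needed there: the call to onPreferredReplicaElection
already removes partitionsForTopicsToBeDeleted, so it can run unconditionally.
With the else in place, the election is skipped for every requested partition
whenever any one of them belongs to a topic queued for deletion.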
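
A minimal sketch of the listener logic without the else branch, written as a
standalone function so it can be run outside the controller. The names
TopicAndPartition (a local stand-in case class), partitionsToElect, and the
isQueuedForDeletion parameter are assumptions made for illustration; they are
not the actual KafkaController code.

```scala
object PreferredReplicaElectionSketch {
  // Local stand-in for Kafka's topic/partition pair; hypothetical, for illustration only.
  final case class TopicAndPartition(topic: String, partition: Int)

  /** Returns the partitions that should go through preferred replica election.
    *
    * `isQueuedForDeletion` stands in for
    * controller.deleteTopicManager.isTopicQueuedUpForDeletion.
    */
  def partitionsToElect(requested: Set[TopicAndPartition],
                        alreadyInProgress: Set[TopicAndPartition],
                        isQueuedForDeletion: String => Boolean): Set[TopicAndPartition] = {
    val partitions = requested -- alreadyInProgress
    val partitionsForTopicsToBeDeleted =
      partitions.filter(p => isQueuedForDeletion(p.topic))
    // The real listener logs an error here when partitionsForTopicsToBeDeleted is non-empty.
    // No else branch: the remaining partitions are always returned for election.
    partitions -- partitionsForTopicsToBeDeleted
  }
}
```

With three requested partitions, one already undergoing election and one on a
topic queued for deletion, this returns only the third partition instead of
skipping the election altogether.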

In PartitionStateMachine.DeleteTopicListener
{code}
        if(topicsToBeDeleted.size > 0) {
          info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
          // add topic to deletion list
          controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
          // mark topic ineligible for deletion if other state changes are in progress
          topicsToBeDeleted.foreach { topic =>
            val preferredReplicaElectionInProgress =
              controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
            val partitionReassignmentInProgress =
              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
            if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
              controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
          }
        }
{code}

The code above calls enqueueTopicsForDeletion, which calls
resumeTopicDeletionThread() to start deleting the topics, before it marks any
topic ineligible. The mark-ineligible step should come before
enqueueTopicsForDeletion. That way deletion of a topic won't start while a
preferred replica election or partition reassignment is in progress for that
topic. I am testing these changes.
Let me know what you think of these changes. Thanks.
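
To illustrate the ordering concern, here is a toy model, not Kafka's real
delete-topic manager. ToyDeleteTopicManager, handleDeleteRequest, and
busyTopics are hypothetical names, and the model assumes that enqueueing
immediately starts deletion for every queued topic that is not marked
ineligible. The real manager's internals may differ, so treat this only as a
sketch of why the order of the two calls matters.

```scala
object DeletionOrderSketch {
  // Toy stand-in for the controller's delete-topic manager (assumption, not Kafka code).
  final class ToyDeleteTopicManager {
    private var queued = Set.empty[String]
    private var ineligible = Set.empty[String]
    private var started = Set.empty[String]

    /** Topics for which deletion has actually begun in this toy model. */
    def deletionStarted: Set[String] = started

    def markTopicIneligibleForDeletion(topics: Set[String]): Unit =
      ineligible ++= topics

    def enqueueTopicsForDeletion(topics: Set[String]): Unit = {
      queued ++= topics
      resumeTopicDeletion() // models resumeTopicDeletionThread() being triggered on enqueue
    }

    // Starts deletion for every queued topic that is currently eligible.
    private def resumeTopicDeletion(): Unit =
      started ++= (queued -- ineligible)
  }

  /** Proposed order: mark busy topics ineligible first, then enqueue.
    * `busyTopics` models topics with an election or reassignment in progress.
    */
  def handleDeleteRequest(manager: ToyDeleteTopicManager,
                          topicsToBeDeleted: Set[String],
                          busyTopics: Set[String]): Unit = {
    manager.markTopicIneligibleForDeletion(topicsToBeDeleted & busyTopics)
    manager.enqueueTopicsForDeletion(topicsToBeDeleted)
  }
}
```

In this model, calling handleDeleteRequest with topics t1 and t2 while t2 is
busy starts deletion only for t1. Enqueueing first and marking afterwards
would start deletion for both.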


> AdminUtils.deleteTopic does not work
> ------------------------------------
>
>                 Key: KAFKA-1558
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1558
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.1.1
>            Reporter: Henning Schmiedehausen
>            Assignee: Sriharsha Chintalapani
>            Priority: Blocker
>             Fix For: 0.8.2
>
>         Attachments: kafka-thread-dump.log
>
>
> The AdminUtils.deleteTopic method is implemented as
> {code}
>     def deleteTopic(zkClient: ZkClient, topic: String) {
>         ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
>     }
> {code}
> but the DeleteTopicCommand actually does
> {code}
>     zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
>     zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
> {code}
> so I guess that the 'createPersistentPath' above should actually be
> {code}
>     def deleteTopic(zkClient: ZkClient, topic: String) {
>         ZkUtils.deletePathRecursive(zkClient, ZkUtils.getTopicPath(topic))
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
