[ 
https://issues.apache.org/jira/browse/KAFKA-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14157511#comment-14157511
 ] 

Sriharsha Chintalapani commented on KAFKA-1663:
-----------------------------------------------

[~nehanarkhede] I have a question about the TopicDeletionManager code.
I think the idea is to start the DeleteTopicsThread and have it wait for events, 
such as topics being added to the delete set, which then trigger the doWork() method. 
Right now the doWork() method calls awaitTopicDeletionNotification():

  private def awaitTopicDeletionNotification() {
    inLock(deleteLock) {
      while(!deleteTopicsThread.isRunning.get() && !deleteTopicStateChanged.compareAndSet(true, false)) {
        deleteTopicsCond.await()
      }
    }
  }

This condition seems to be wrong. Once the TopicDeletionManager starts 
deleteTopicsThread, isRunning is true, so the loop body is never entered: 
the DeleteTopicsThread is not blocked and doWork() continues to execute.

The above condition should instead wait while the thread is running and no 
state change is pending:

  while(deleteTopicsThread.isRunning.get() && !deleteTopicStateChanged.compareAndSet(true, false)) {
    deleteTopicsCond.await()
  }

Whenever a topic is added we call resumeTopicDeletionThread(), which sets 
deleteTopicStateChanged to true and calls deleteTopicsCond.signal(). That 
should wake up doWork() and continue with deletion of the topic.
I am testing with this change and will update with the results. 
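
For illustration, here is a minimal, self-contained sketch of the wait/signal 
pattern discussed above. It is not the Kafka code: DeletionGate, 
awaitNotification(), resume() and shutdown() are hypothetical names, and the 
sketch assumes the waiter should block while the thread is running and no 
state change is pending.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for the lock/condition/flag state in TopicDeletionManager.
class DeletionGate {
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()
  val isRunning = new AtomicBoolean(true)
  private val stateChanged = new AtomicBoolean(false)

  // Block while the thread is running and no state change is pending.
  // compareAndSet(true, false) consumes a pending change exactly once.
  def awaitNotification(): Unit = {
    lock.lock()
    try {
      while (isRunning.get() && !stateChanged.compareAndSet(true, false)) {
        cond.await()
      }
    } finally {
      lock.unlock()
    }
  }

  // Analogous to resumeTopicDeletionThread(): set the flag, then signal.
  def resume(): Unit = {
    stateChanged.set(true)
    lock.lock()
    try cond.signal() finally lock.unlock()
  }

  // On shutdown, clear isRunning and wake the waiter so it can exit.
  def shutdown(): Unit = {
    isRunning.set(false)
    resume()
  }
}
```

With this shape, a waiter stays blocked until resume() or shutdown() is 
called, and a resume() that happens before the waiter arrives is not lost 
because the flag stays set until it is consumed.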


> Controller unable to shutdown after a soft failure
> --------------------------------------------------
>
>                 Key: KAFKA-1663
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1663
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Sriharsha Chintalapani
>            Priority: Blocker
>             Fix For: 0.8.2
>
>
> As part of testing KAFKA-1558 I came across a case where inducing a soft 
> failure in the current controller elects a new controller, but the old 
> controller doesn't shut down properly.
> Steps to reproduce:
> 1) 5-broker cluster
> 2) High number of topics (I tested with 1000 topics)
> 3) On the current controller, run kill -SIGSTOP pid (the broker's process id)
> 4) Wait a bit longer than the ZooKeeper timeout (server.properties)
> 5) kill -SIGCONT pid
> 6) A new controller will be elected. Check the old controller's log:
> [2014-09-30 15:59:53,398] INFO [SessionExpirationListener on 1], ZK expired; 
> shut down all controller components and try to re-elect 
> (kafka.controller.KafkaController$SessionExpirationListener)
> [2014-09-30 15:59:53,400] INFO [delete-topics-thread-1], Shutting down 
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> If it stops there and the broker log keeps printing 
> Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> then the controller shutdown never completes.


