[ https://issues.apache.org/jira/browse/KAFKA-330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13620816#comment-13620816 ]

Swapnil Ghike commented on KAFKA-330:
-------------------------------------

Patch v1 attached. 

How topics are deleted: 
1. The DeleteTopicsCommand writes the topics to /admin/delete_topics in zk and 
exits (a standalone sketch of this step follows this list).
2. The DeleteTopicsCommand complains if any of the topics being deleted is 
absent from zookeeper. It refuses to run even if at least one of the specified 
topics is actually present in zookeeper. 
3. A DeleteTopicsListener is triggered in the controller. It moves the replicas 
and partitions to the Offline -> NonExistent states, deletes the partitions from 
the controller's memory, and sends StopReplicaRequests with deletePartition=true.
4. On receiving the StopReplicaRequest, brokers remove the partition from their 
own memory and delete the logs.
5. If all the partitions were successfully deleted, the topic path is deleted 
from zookeeper.
6. The controller always deletes the /admin/delete_topics path at the end. In 
removeFromTopicsBeingDeleted() it checks whether each topic has been deleted 
from zookeeper, at which point it declares victory or logs a warning of shame.
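
For illustration, here is a minimal standalone sketch of step 1 using the raw 
zookeeper client rather than the actual DeleteTopicsCommand code. The 
comma-separated encoding, the connect string and the object name are 
assumptions for the sketch, not necessarily what the patch does:

    import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

    object DeleteTopicsSketch {
      def main(args: Array[String]): Unit = {
        // Connect to zookeeper (connect string assumed for the sketch).
        val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
          override def process(event: WatchedEvent): Unit = ()  // no-op watcher
        })
        try {
          val path = "/admin/delete_topics"
          // Encode the topics to delete; the real patch may use a different format.
          val data = args.mkString(",").getBytes("UTF-8")
          if (zk.exists(path, false) == null)
            zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
          else
            zk.setData(path, data, -1)
        } finally zk.close()
      }
    }

Once the controller's watch on /admin/delete_topics fires, steps 3-6 above do 
the actual work.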


How to validate that the topics have been deleted:
1. Rerun the DeleteTopicsCommand; it should complain that the topics are absent 
from zookeeper. A direct zookeeper check is also sketched below.
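
Independently of the tool, deletion can be spot-checked by verifying that the 
per-topic registration path is gone from zookeeper. This assumes the standard 
/brokers/topics/<topic> layout; the object name is illustrative:

    import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

    object VerifyTopicsDeleted {
      def main(args: Array[String]): Unit = {
        val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
          override def process(event: WatchedEvent): Unit = ()  // no-op watcher
        })
        try {
          args.foreach { topic =>
            // A deleted topic should have no /brokers/topics/<topic> node left.
            val stillThere = zk.exists("/brokers/topics/" + topic, false) != null
            println(topic + " deleted: " + !stillThere)
          }
        } finally zk.close()
      }
    }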


Special comments:
A. TopicChangeListener:
1. I think that we should not handle deleted topics here. We should rather 
modify the controller's memory in the NonExistentPartition state change. This is 
because the controller will release its lock between the DeleteTopics listener 
and the TopicChangeListener, and we want the controller's memory to be 
up-to-date when the lock is released at the completion of the DeleteTopics 
listener.
2. Probably there is no need to add the new topics' partition-replica 
assignment to controllerContext.partitionReplicaAssignment, because 
onNewTopicCreation() will do that. I put a TODO there. Please correct me if I am 
wrong.


Handling failures:

A. What happens when controller fails:
1. Before OfflineReplica state change: New controller context will be 
initialized and initializeAndMaybeTriggerTopicDeletion() will delete the topics.
2. After OfflineReplica state change and before OfflinePartition state change: 
Initialization of controller context will re-insert replicas into ISR, and 
initializeAndMaybeTriggerTopicDeletion() will delete the topics.
3. After OfflinePartition state change and before NonExistentReplica state 
change: Ditto as 2.
4. After NonExistentReplica state change and before NonExistentPartition state 
change: The replicas that were deleted will be restarted on individual brokers, 
then the topics will be deleted.
5. After NonExistentPartition state change and before deleting topics from zk: 
Ditto as 3. (The NonExistentPartition state change in the partition state 
machine currently does not delete the partitions from zk; it assumes that the 
controller will delete them, which is similar to what we do for some other 
state changes as of now.)
I think the deletion should proceed smoothly even if the controller fails over 
in the middle of 1, 2, 3, 4 or 5 above; a rough sketch of the re-trigger idea 
follows.
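
To make the failover reasoning concrete, here is a rough sketch of the idea 
behind initializeAndMaybeTriggerTopicDeletion(). The helper names, the object 
wrapper and the comma-separated encoding are hypothetical stand-ins, not the 
patch's actual code:

    import org.apache.zookeeper.ZooKeeper

    object ControllerFailoverSketch {
      // Sketch only: on becoming controller, re-read any pending deletions from
      // /admin/delete_topics and re-enter the normal deletion path, so a failover
      // in the middle of cases 1-5 above simply restarts the remaining work.
      def initializeAndMaybeTriggerTopicDeletion(zk: ZooKeeper): Unit = {
        val path = "/admin/delete_topics"
        if (zk.exists(path, false) != null) {
          val raw = new String(zk.getData(path, false, null), "UTF-8")
          val pendingTopics = raw.split(",").filter(_.nonEmpty).toSet
          if (pendingTopics.nonEmpty)
            onTopicDeletion(pendingTopics)
        }
      }

      // Stand-in for the controller logic that drives the state machines (steps 3-6).
      def onTopicDeletion(topics: Set[String]): Unit =
        println("would re-run deletion for: " + topics.mkString(", "))
    }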

B. What happens if a topic is deleted when a broker that has a replica of that 
topic's partition is down? =>
i. When the broker comes back up and the topic has been deleted from zk, the 
controller can only tell the broker which topics are currently alive. The 
broker should delete the dead logs when it receives the first leaderAndIsr 
request. This can be done just before starting the hw checkpointing thread. 
ii. This will also be useful in replica reassignment for a partition. When the 
replica reassignment algorithm sends a StopReplica request with delete=true, 
the receiving broker could be down. After the broker is back up, it will 
realize that it needs to delete the logs for certain partitions that are no 
longer assigned to it. A sketch of this cleanup is below.
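
A sketch of that reconciliation on broker startup. The "<topic>-<partition>" 
directory naming is the standard log layout; the set of assigned partitions and 
the helper names are illustrative inputs, not the actual broker code:

    import java.io.File

    object DeadLogCleanupSketch {
      // Any local log directory whose "<topic>-<partition>" name is not in the
      // set the broker is told it should host can be deleted: it belongs to a
      // topic that was deleted (or a partition reassigned away) while the broker
      // was down.
      def deleteDeadLogs(logDir: File, assignedTopicPartitions: Set[String]): Unit =
        Option(logDir.listFiles()).getOrElse(Array.empty[File])
          .filter(_.isDirectory)
          .filterNot(dir => assignedTopicPartitions.contains(dir.getName))
          .foreach(deleteRecursively)

      private def deleteRecursively(f: File): Unit = {
        if (f.isDirectory)
          Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
        f.delete()
      }
    }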


Possible corner cases:
1. What happens to hw checkpointing for deleted partitions? => 
checkpointHighWatermarks() reads the current allPartitions() on a broker and 
writes the hw. So the hw for deleted partitions will disappear.
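
A minimal illustration of why those entries drop out. TopicAndPartition here is 
just a plain case class and hwFor an illustrative accessor, not the broker's 
actual types:

    object HwCheckpointSketch {
      case class TopicAndPartition(topic: String, partition: Int)

      // The checkpoint is rebuilt from the partitions the broker currently hosts,
      // so partitions that were deleted simply never make it into the new file.
      def checkpointHighWatermarks(currentPartitions: Set[TopicAndPartition],
                                   hwFor: TopicAndPartition => Long): Map[TopicAndPartition, Long] =
        currentPartitions.map(tp => tp -> hwFor(tp)).toMap
    }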

2. What happens to Produce/Fetch requests in purgatory? => 
i. After the topics have been deleted, produce requests in purgatory will 
expire because there will be no fetchers, and fetch requests will expire 
because producer requests will fail in appendToLocalLog() and no more data will 
be appended.
ii. Expiration of producer requests is harmless. 
iii. Expiration of fetch requests will try to send whatever data is remaining, 
but it will not be able to send any data because the replica would be dead. We 
could think of forcing the delayed fetch requests to expire before the replica 
is deleted and removing the expired requests from the delayed queue, but that 
would probably require synchronizing on the delayed queue. Thoughts?


Other unrelated changes: 
A. ReplicaStateMachine
1. Moved NonExistentReplica to the bottom of cases to maintain the same order 
as PartitionStateMachine.
2. Deleted a redundant replicaState.put(replica,OnlineReplica) statement.
3. Even if a replica is not in the ISR, it should always be moved to 
OfflineReplica state.

B. Utils.scala:
1. Bug fix in seqToJson().  

Testing done:
1. Bring up one broker, create topics, delete topics, verify zk, verify that 
logs are gone. 
2. Bring up two brokers, create topics, delete topics, verify zk, verify that 
logs are gone from both brokers.
3. Repeat the above 1 and 2 with more than one partition per topic.
4. Write to /admin/delete_topics, bring up the controller, and watch the topic 
and logs get deleted.
5. Bring up two brokers, create two topics with a replication factor of two, 
and verify that the logs get created. Now, shut down broker 1 and delete a 
topic. Verify that the topic disappears from zk and from the logs of broker 0. 
Bring up broker 1 and verify that the topic disappears from the logs of broker 
1, because the controller (broker 0) will send a leaderAndIsr request for the 
remaining topic.
6. Validate error inputs.
7. Validate that the tool prints error when a non-existent topic is being 
deleted.

Is it ok if I write unit tests after this patch is checked in, in case there 
are modifications?
                
> Add delete topic support 
> -------------------------
>
>                 Key: KAFKA-330
>                 URL: https://issues.apache.org/jira/browse/KAFKA-330
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Swapnil Ghike
>            Priority: Blocker
>              Labels: features, kafka-0.8, p2, project
>
> One proposal of this API is here - 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2#KafkareplicationdetaileddesignV2-Deletetopic
