[ https://issues.apache.org/jira/browse/KAFKA-330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13884468#comment-13884468 ]

Neha Narkhede commented on KAFKA-330:
-------------------------------------

Delete topic is a pretty tricky feature and there are multiple ways to solve 
it. I will list the various approaches with their tradeoffs here. A few things 
to think about that make delete topic tricky -

1. How do you handle resuming topic deletion during controller failover?
2. How do you handle re-creating topics if brokers that host a subset of the 
replicas are down?
3. If a broker fails during topic deletion, how does it know, when it restarts, 
which version of the topic it has logs for? This is relevant if we allow 
re-creating topics while a broker is down.

Will address these one by one. 

#1 is pretty straightforward to handle and can be achieved in a way similar to 
partition reassignment (through an admin path in zookeeper indicating a topic 
deletion that has not finished).
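
To make the failover handling concrete, here is a minimal sketch of the resume 
step, assuming the zkclient library Kafka already uses; the admin path matches 
the patch, but resumePendingTopicDeletions and the enqueue hook are made-up 
names standing in for the actual controller code -

{code}
import org.I0Itec.zkclient.ZkClient
import scala.collection.JavaConverters._

// On controller failover, re-read any unfinished deletions from the admin
// path and hand them back to the deletion logic, mirroring how an unfinished
// partition reassignment is resumed.
def resumePendingTopicDeletions(zkClient: ZkClient, enqueue: String => Unit): Unit = {
  val path = "/admin/delete_topics"
  if (zkClient.exists(path))
    zkClient.getChildren(path).asScala.foreach(enqueue)
}
{code}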

#2 is an important policy decision that can affect the complexity of the design 
for this feature. If you allow topics to be deleted while brokers are down, the 
broker needs a way to know that its version of the topic is too old. This is 
mainly an issue since a topic can be re-created and written to while a broker 
is down. We need to ensure that a broker does not rejoin the quorum with an older 
version of the log. There are 2 ways to solve this problem that I could think 
of -
   1. Do not allow topic deletion to succeed if a broker hosting a replica is 
down. Here, the controller keeps track of the state of each replica during 
topic deletion (TopicDeletionStarted, TopicDeletionSuccessful, 
TopicDeletionFailed) and only marks the topic as deleted if all replicas for 
all partitions of that topic are successfully deleted.
   2. Allow a topic to be deleted while a broker is down and keep track of the 
"generation" of the topic in a fault tolerant, highly available and consistent 
log. This log can be either zookeeper or a Kafka topic. The main issue here is 
how many generations we would have to keep track of for a topic. In other 
words, can this "generation" information ever be garbage collected? There isn't 
a good bound on this, since it is unclear when the failed broker will come back 
online and when a topic will be re-created. That would mean keeping this 
generation information for potentially a very long time and incurring overhead 
during recovery or bootstrap of generation information during controller or 
broker failovers. This is especially a problem for use cases or tests that 
keep creating and deleting a lot of short-lived topics. Essentially, this 
solution is not scalable unless we figure out an intuitive way to garbage 
collect this topic metadata. It would require us to introduce a config for 
controlling when a topic's generation metadata can be garbage collected. Note 
that this config is different from the topic TTL feature, which controls when a 
topic that is currently not in use can be deleted. Overall, this alternative is 
unnecessarily complex for the benefit of deleting topics while a broker is 
down (a sketch of what it would take follows this list).
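
For concreteness, here is roughly what the rejected generation alternative 
would have to store per topic. This is purely a hypothetical sketch 
(TopicGeneration and shouldDiscardLocalLogs are made-up names; nothing like 
this is in the patch) -

{code}
// Hypothetical record for the "generation" alternative: one of these would
// have to live in zookeeper or a Kafka topic for every (topic, generation)
// pair until no broker could still hold logs for an older generation --
// which is the unbounded retention problem described above.
case class TopicGeneration(topic: String, generation: Long)

// A restarting broker would compare its local generation against the latest
// committed one and throw away stale logs before rejoining the quorum.
def shouldDiscardLocalLogs(local: TopicGeneration, committed: TopicGeneration): Boolean =
  local.topic == committed.topic && local.generation < committed.generation
{code}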

#3 is related to the policy decision made about #2. If a topic is not marked 
successfully deleted while a broker is down, the controller will automatically 
resume topic deletion when the broker restarts.

This patch follows the first approach above: a topic deletion is not declared 
successful until all replicas have confirmed the deletion of local state for 
that topic. This requires the following changes -
1. TopicCommand issues topic deletion by creating a new admin path 
/admin/delete_topics/<topic>
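
The command side is small; a minimal sketch, assuming the zkclient library 
(markTopicForDeletion is a made-up name; error handling and argument parsing 
elided) -

{code}
import org.I0Itec.zkclient.ZkClient

// Deletion is requested by creating a persistent znode under the admin path;
// the controller does all the actual work.
def markTopicForDeletion(zkClient: ZkClient, topic: String): Unit =
  zkClient.createPersistent("/admin/delete_topics/" + topic, true) // createParents = true
{code}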

2. The controller listens for child changes on /admin/delete_topics and starts 
topic deletion for the respective topics
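
A minimal sketch of that listener, again assuming zkclient 
(DeleteTopicsListener and onTopicDeletionRequested are stand-ins for the 
controller's real entry point) -

{code}
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
import scala.collection.JavaConverters._

// Child watch on /admin/delete_topics: fires whenever TopicCommand adds a
// topic, handing the current set of requested deletions to the controller.
class DeleteTopicsListener(onTopicDeletionRequested: Seq[String] => Unit) extends IZkChildListener {
  override def handleChildChange(parentPath: String, children: java.util.List[String]): Unit =
    onTopicDeletionRequested(Option(children).map(_.asScala.toSeq).getOrElse(Seq.empty))
}

def watchDeleteTopics(zkClient: ZkClient, listener: DeleteTopicsListener): Unit =
  zkClient.subscribeChildChanges("/admin/delete_topics", listener)
{code}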

3. The controller has a background thread that handles topic deletion. The 
purpose of having this background thread is to accommodate the TTL feature, 
when we have it. This thread is signaled whenever deletion for a topic needs to 
be started or resumed. Currently, a topic's deletion can only be started by the 
onPartitionDeletion callback on the controller. In the future, it can be 
triggered based on the configured TTL for the topic. A topic's deletion will be 
halted in the following scenarios (see the sketch after this list) -
* a broker hosting one of the replicas for that topic goes down
* partition reassignment for partitions of that topic is in progress
* preferred replica election for partitions of that topic is in progress 
(though this is not strictly required, since it holds the controller lock for 
the entire duration from start to end)
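
Here is the promised sketch of the thread's shape; DeleteTopicsThread, 
isDeletionHalted and deleteTopic are illustrative names, and synchronization 
of the topic set itself is elided -

{code}
import java.util.concurrent.locks.ReentrantLock

// Background deletion thread: sleeps on a condition until signaled, then
// works through queued topics whose deletion is not currently halted by one
// of the scenarios listed above.
class DeleteTopicsThread(queuedTopics: scala.collection.mutable.Set[String],
                         isDeletionHalted: String => Boolean,
                         deleteTopic: String => Unit) extends Thread {
  private val lock = new ReentrantLock()
  private val hasWork = lock.newCondition()

  def signal(): Unit = { lock.lock(); try hasWork.signal() finally lock.unlock() }

  override def run(): Unit =
    try {
      while (true) {
        lock.lock()
        try {
          while (queuedTopics.isEmpty || queuedTopics.forall(isDeletionHalted)) hasWork.await()
        } finally lock.unlock()
        queuedTopics.filterNot(isDeletionHalted).foreach(deleteTopic)
      }
    } catch { case _: InterruptedException => () } // interrupted == shutdown
}
{code}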

4. Topic deletion is resumed (see the note after this list) when -
* a broker hosting one of the replicas for that topic is started
* preferred replica election for partitions of that topic completes
* partition reassignment for partitions of that topic completes
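
With the thread sketched above, each of these resume triggers reduces to 
waking it up so it re-evaluates its halted checks, e.g. -

{code}
// Illustrative only: the controller's broker-startup, reassignment-complete
// and election-complete callbacks would each end with a wakeup like this.
def onBrokerStartup(deleteThread: DeleteTopicsThread): Unit =
  deleteThread.signal()
{code}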

5. Every replica for a topic being deleted is in one of 3 states (sketched 
after this list) - 
* TopicDeletionStarted (Replica enters TopicDeletionStarted phase when the 
onPartitionDeletion callback is invoked. This happens when the child change 
watch for /admin/delete_topics fires on the controller. As part of this state 
change, the controller sends StopReplicaRequests to all replicas. It registers 
a callback for the StopReplicaResponse when deletePartition=true thereby 
invoking a callback when a response for delete replica is received from every 
replica)
* TopicDeletionSuccessful (deleteTopicStopReplicaCallback() moves replicas from 
TopicDeletionStarted->TopicDeletionSuccessful depending on the error codes in 
StopReplicaResponse)
* TopicDeletionFailed (deleteTopicStopReplicaCallback() moves replicas from 
TopicDeletionStarted->TopicDeletionFailed depending on the error codes in the 
StopReplicaResponse. In general, if a broker dies and it hosted replicas for 
topics being deleted, the controller marks the respective replicas as 
TopicDeletionFailed in the onBrokerFailure callback. The reason is that if a 
broker fails after the replica enters the TopicDeletionStarted state but 
before the request is sent, the replica could mistakenly remain in the 
TopicDeletionStarted state and topic deletion would not be retried when the 
broker comes back up.)
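
A minimal sketch of those states and the response-driven transition (the 
sealed trait encoding and onStopReplicaResponse are illustrative; the patch 
tracks this inside the controller) -

{code}
sealed trait ReplicaDeletionState
case object TopicDeletionStarted extends ReplicaDeletionState
case object TopicDeletionSuccessful extends ReplicaDeletionState
case object TopicDeletionFailed extends ReplicaDeletionState

// deleteTopicStopReplicaCallback in a nutshell: a clean StopReplicaResponse
// moves Started -> Successful, an error code moves Started -> Failed.
def onStopReplicaResponse(current: ReplicaDeletionState, errorCode: Short): ReplicaDeletionState =
  current match {
    case TopicDeletionStarted if errorCode == 0 => TopicDeletionSuccessful
    case TopicDeletionStarted                   => TopicDeletionFailed
    case other                                  => other
  }
{code}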

6. The delete topic thread marks a topic as successfully deleted only if all 
replicas are in the TopicDeletionSuccessful state. It then starts the topic 
deletion teardown phase, in which it deletes all topic state from the 
controllerContext as well as from zookeeper. This is the only time the 
/brokers/topics/<topic> path gets deleted. 
On the other hand, if no replica is in the TopicDeletionStarted state and at 
least one replica is in the TopicDeletionFailed state, it marks the topic for 
a deletion retry.
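
Continuing the state sketch from point 5, the thread's completion check is 
essentially -

{code}
sealed trait DeletionOutcome
case object Teardown extends DeletionOutcome       // all replicas succeeded
case object Retry extends DeletionOutcome          // none in flight, some failed
case object AwaitResponses extends DeletionOutcome // still waiting on replicas

def evaluateDeletion(states: Seq[ReplicaDeletionState]): DeletionOutcome =
  if (states.nonEmpty && states.forall(_ == TopicDeletionSuccessful)) Teardown
  else if (!states.contains(TopicDeletionStarted) && states.contains(TopicDeletionFailed)) Retry
  else AwaitResponses
{code}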

7. I've introduced callbacks for controller-broker communication. Ideally, 
every callback should have the format (RequestOrResponse) => Unit. However, 
since StopReplicaResponse doesn't carry the replica id, this is handled in a 
somewhat hacky manner in the patch. The purpose is to fix the approach to 
upgrading controller-broker protocols in a reasonable way before having delete 
topic upgrade the StopReplica request in a one-off way. Will file a JIRA for 
that.
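
The workaround amounts to closing over the replica id when the callback is 
registered; a rough sketch (RequestOrResponse here is a stand-in for Kafka's 
actual wire classes) -

{code}
object ControllerChannelSketch {
  trait RequestOrResponse
  type Callback = RequestOrResponse => Unit

  // Since StopReplicaResponse lacks the replica id, the controller can bake
  // it into the callback at registration time -- the "somewhat hacky" part.
  def callbackForReplica(replicaId: Int, handle: (Int, RequestOrResponse) => Unit): Callback =
    response => handle(replicaId, response)
}
{code}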

Several integration tests were added for delete topic -

1. Topic deletion when all replica brokers are alive
2. Halt and resume topic deletion after a follower replica is restarted
3. Halt and resume topic deletion after a controller failover
4. Request handling during topic deletion
5. Topic deletion and partition reassignment in parallel
6. Topic deletion and preferred replica election in parallel
7. Topic deletion and per topic config changes in parallel

> Add delete topic support 
> -------------------------
>
>                 Key: KAFKA-330
>                 URL: https://issues.apache.org/jira/browse/KAFKA-330
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, log, replication
>    Affects Versions: 0.8.0, 0.8.1
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>              Labels: features, project
>         Attachments: KAFKA-330.patch, kafka-330-v1.patch, kafka-330-v2.patch
>
>
> One proposal of this API is here - 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2#KafkareplicationdetaileddesignV2-Deletetopic



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
