[ https://issues.apache.org/jira/browse/KAFKA-330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13621189#comment-13621189 ]
Neha Narkhede commented on KAFKA-330:
-------------------------------------

Thanks for the patch! Some suggestions on the controller side:

1. In the controller, it is important that a long delete-topics operation not block critical state changes like leader election. To make this possible, relinquish the lock between the deletes of individual topics.
2. If you relinquish the lock as suggested above, you now need to take care to avoid leader elections for partitions that are being deleted.
3. Since topic deletion will now be handled per topic, it might be worth changing the ZooKeeper structure for delete topics so that the status of individual topic deletes gets reported accordingly. One way to do this is to introduce a path indicating that the admin tool has initiated a delete operation for some topics (/admin/delete_topics_updated), and to create child nodes under /admin/delete_topics, one per topic. As you complete each individual topic deletion, you delete the /admin/delete_topics/<topic> path. The admin tool creates the /admin/delete_topics/<topic> path and updates /admin/delete_topics_updated. The controller registers a data change watcher only on /admin/delete_topics_updated; when this watcher fires, it reads the children of /admin/delete_topics and starts topic deletion.
4. On startup/failover, the controller registers a data change watch on /admin/delete_topics_updated, and then reads the list of topics under /admin/delete_topics.
5. The admin tool never errors out, since it just adds to the list of topics to be deleted.

On the broker side, there are a few things to be done correctly:

1. KafkaApis
After receiving a stop replica request, the request handler should reject produce/fetch requests for partitions to be deleted by returning the PartitionBeingDeleted error code. Once the delete is complete, the partition can be removed from this list; after that, it will return the UnknownTopicOrPartition error code.
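The KafkaApis behavior described in item 1 can be sketched roughly as below. This is an illustrative stand-in, not the actual Kafka code: the class, method names, and numeric error codes (in particular PartitionBeingDeleted, which does not exist yet) are hypothetical.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical error codes; the real values would come from ErrorMapping.
object ErrorCodes {
  val NoError = 0
  val UnknownTopicOrPartition = 3
  val PartitionBeingDeleted = 100 // placeholder for the proposed new code
}

// Tracks which partitions are live and which are mid-deletion, so that the
// request handler can decide what to return for a produce/fetch request.
class PartitionDeleteTracker {
  private val beingDeleted = ConcurrentHashMap.newKeySet[(String, Int)]()
  private val live = ConcurrentHashMap.newKeySet[(String, Int)]()

  def addPartition(topic: String, partition: Int): Unit =
    live.add((topic, partition))

  // Stop replica request received: reject reads/writes from now on.
  def markDeleting(topic: String, partition: Int): Unit = {
    live.remove((topic, partition))
    beingDeleted.add((topic, partition))
  }

  // Delete finished: drop the partition entirely.
  def deleteComplete(topic: String, partition: Int): Unit =
    beingDeleted.remove((topic, partition))

  // Consulted on every produce/fetch request before touching the log.
  def errorCodeFor(topic: String, partition: Int): Int =
    if (beingDeleted.contains((topic, partition))) ErrorCodes.PartitionBeingDeleted
    else if (!live.contains((topic, partition))) ErrorCodes.UnknownTopicOrPartition
    else ErrorCodes.NoError
}
```

The point of the two-phase tracking is that clients get a retriable, informative error while the delete is in flight, and the standard UnknownTopicOrPartition afterwards.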
2. ReplicaManager
2.1 Remove the unused variable leaderBrokerId from makeFollower().
2.2 Fix the comment inside recordFollowerPosition to say "partition hasn't been created or has been deleted".
2.3 Let the partition do the delete() operation. This ensures that the leaderAndIsrUpdateLock is held for the duration of the delete, which avoids interleaving leader/isr requests with stop replica requests and simplifies reasoning about log truncate/high watermark update operations.

3. Partition
Introduce a new delete() API that works like this:
3.1 Acquire leaderIsrUpdateLock so that create log does not interfere with delete log, and so that remove/add fetcher does not interfere with delete log.
3.2 Remove the fetcher for the partition.
3.3 Invoke delete() on the log. Be careful about how in-flight read/write requests will be affected.

4. LogManager
4.1 When deleteLogs() is invoked, remove the logs from allLogs. This prevents flush from being invoked on a log that is about to be deleted.
4.2 Invoke log.delete() on every individual log.
4.3 Note that log.markDeletedWhile(_ => true) will leave an extra rolled-over segment in the in-memory segment list.

5. Log
Log delete should acquire "lock" to prevent interleaving with append/truncate/roll/flush etc. The following steps need to be taken during log.delete():
5.1 Invoke log.close().
5.2 Invoke segmentList.delete(), where SegmentList.delete() only does contents.set(new Array[T](0)).
5.3 Invoke segment.delete().
5.4 Update a flag deleted = true.

A few questions to think about:
- Are any changes required to roll()? If the deleted flag is true, skip roll().
- Are any changes required to markDeletedWhile()? Same as roll: if the deleted flag is true, skip it.
- Are any changes required to flush()? It can be invoked either during roll or by append. It cannot be invoked by the flush thread, since that is disabled for logs to be deleted. This needs to be handled by using lastOption.
- See what to do with truncateTo(). It is used during make follower in Partition; this won't interfere with delete, since Partition's delete acquires the leaderIsrUpdateLock. The other place that uses truncateTo() is handleOffsetOutOfRange on the follower; this won't interleave either, since the replica fetcher was already removed before attempting to delete the log.
- See what to do with truncateAndStartWithNewOffset(). This won't interleave with delete log, since the replica fetcher was already removed before attempting to delete the log.
- What if the broker is writing to the log while stop replica is deleting it? Since log.delete() acquires "lock", the append starts either before or after the delete. If it starts after, then the changes mentioned in #7 and #9 should be made.
- What if the broker is about to write to a log that is under deletion? Same as above.
- What if the broker is reading from a log that is being deleted? It will probably get a ClosedChannelException; this needs to be confirmed. The test can run a consumer that is consuming data from the beginning of a log while you invoke delete topic.
- What if the broker is about to read from a log that is being deleted? It will try reading from a file channel that is closed and run into a ClosedChannelException. Should we catch ClosedChannelException, log an appropriate error, and send a PartitionDeleted error code when that happens?
- What happens to the partition's entry in the high watermark file when the partition is being deleted? Once the partition is removed from allPartitions, the next high watermark checkpoint removes the partition's entry from the high watermark file.
- What happens to requests in the purgatory when a partition has been deleted? Once a partition has been removed from allPartitions, the requests in the purgatory will send UnknownTopicOrPartitionCode back to the client.

6. Log.read()
val first = view.head.start needs to change to headOption; return an empty message set when headOption returns None.
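The delete() sequence and the deleted-flag guards discussed above can be sketched with simplified stand-in types. This is not the real Log/LogSegment code: FakeLog and FakeSegment are illustrative, and only the locking and ordering mirror the steps in the comment.

```scala
import java.util.concurrent.atomic.AtomicReference

// Stand-in for a log segment; tracks whether its file channel was
// closed and whether its file was deleted.
class FakeSegment(val start: Long) {
  var closed = false
  var deletedFile = false
  def close(): Unit = closed = true
  def delete(): Unit = deletedFile = true
}

// Stand-in for Log, showing the proposed delete() ordering under "lock".
class FakeLog {
  private val lock = new Object
  val segments = new AtomicReference(Array(new FakeSegment(0L)))
  @volatile var deleted = false

  private def close(): Unit = segments.get.foreach(_.close())

  def delete(): Unit = lock.synchronized {
    val old = segments.get
    close()                                  // close open file channels
    segments.set(new Array[FakeSegment](0))  // SegmentList.delete(): empty the list
    old.foreach(_.delete())                  // delete the segment files
    deleted = true                           // flag consulted by roll()/markDeletedWhile()
  }

  // roll() (and likewise markDeletedWhile()) becomes a no-op once the
  // deleted flag is set, so it never resurrects a segment.
  def roll(): Unit = lock.synchronized {
    if (!deleted) {
      // ... normal roll logic ...
    }
  }
}
```

Because both delete() and roll() take the same lock, an append or roll either completes entirely before the delete or observes the deleted flag afterwards, which is the interleaving argument made above.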
7. Log.flush()
segments.view.last.flush() needs to change to segments.view.lastOption; if that returns None, return without flushing.

8. SegmentList.delete()
contents.set(new Array[T](0))

9. Log.append()
Fix this to use lastOption: val segment = maybeRoll(segments.view.last). If lastOption returns None, return (-2, -2) to signify that the log was deleted.

> Add delete topic support
> -------------------------
>
> Key: KAFKA-330
> URL: https://issues.apache.org/jira/browse/KAFKA-330
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Neha Narkhede
> Assignee: Swapnil Ghike
> Priority: Blocker
> Labels: features, kafka-0.8, p2, project
> Fix For: 0.8
>
> Attachments: kafka-330-v1.patch
>
> One proposal of this API is here -
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2#KafkareplicationdetaileddesignV2-Deletetopic
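The headOption/lastOption pattern from items 6, 7 and 9 can be sketched as follows. The Seg type and the read/flush/append signatures here are simplified stand-ins, not the real Kafka classes; only the empty-list handling is the point.

```scala
// Simplified segment stand-in: once delete() has emptied the segment
// list, view.head / view.last would throw NoSuchElementException, so
// every caller must handle the empty case explicitly.
case class Seg(start: Long, bytes: Array[Byte]) {
  def flush(): Unit = () // placeholder for flushing the file channel
}

// Item 6: Log.read() returns an empty message set when no segments remain.
def read(view: Seq[Seg]): Array[Byte] =
  view.headOption match {
    case Some(first) => first.bytes
    case None        => Array.empty[Byte]
  }

// Item 7: Log.flush() returns without flushing when no segments remain.
def flush(view: Seq[Seg]): Unit =
  view.lastOption.foreach(_.flush())

// Item 9: Log.append() returns (-2, -2) to signal that the log was deleted.
def append(view: Seq[Seg]): (Long, Long) =
  view.lastOption match {
    case Some(last) => (last.start, last.start + last.bytes.length)
    case None       => (-2L, -2L)
  }
```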