[ https://issues.apache.org/jira/browse/KAFKA-330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13621189#comment-13621189 ]

Neha Narkhede commented on KAFKA-330:
-------------------------------------

Thanks for the patch! Some suggestions -

1. In the controller, it is important not to let a long delete topics 
operation block critical state changes like leader election. To make this 
possible, relinquish the controller lock between the deletes for individual 
topics.
2. If you do relinquish the lock as suggested above, you now need to take 
care to avoid leader elections for partitions that are being deleted.
3. Since you will now handle topic deletion for individual topics, it might be 
worth changing the zookeeper structure for delete topics so that the status of 
each individual topic delete gets reported accordingly. One way to do this is 
to introduce a path that indicates the admin tool has initiated a delete 
operation for some topics (/admin/delete_topics_updated), and to create child 
nodes under /admin/delete_topics, one per topic. As you complete an individual 
topic deletion, you delete the /admin/delete_topics/<topic> path. The admin 
tool creates the /admin/delete_topics/<topic> path and updates 
/admin/delete_topics_updated. The controller only registers a data change 
watcher on /admin/delete_topics_updated. When this watcher fires, it reads the 
children of /admin/delete_topics and starts topic deletion (see the sketch 
after this list).
4. On startup/failover, the controller registers a data change watch on 
/admin/delete_topics_updated, and then reads the list of topics under 
/admin/delete_topics.
5. The admin tool never errors out since it just adds to the list of topics to 
be deleted.
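
Roughly, the zookeeper interaction could look like the sketch below (Scala, 
using ZkClient). Only the two /admin paths come from the proposal above; the 
class names and the deleteTopic callback are just illustrative, not actual 
controller code.

// Sketch only: class names and the deleteTopic callback are hypothetical;
// the two /admin paths are the ones proposed above.
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import scala.collection.JavaConversions._

object DeleteTopicsPaths {
  val DeleteTopicsPath = "/admin/delete_topics"
  val DeleteTopicsUpdatedPath = "/admin/delete_topics_updated"
}

// Admin tool side: create one child per topic, then touch the "updated" path
// so the controller's single data change watch fires. It never errors out.
class DeleteTopicsAdmin(zk: ZkClient) {
  import DeleteTopicsPaths._
  def deleteTopics(topics: Seq[String]) {
    topics.foreach { topic =>
      val path = DeleteTopicsPath + "/" + topic
      if (!zk.exists(path)) zk.createPersistent(path, true) // true = create parents
    }
    if (!zk.exists(DeleteTopicsUpdatedPath)) zk.createPersistent(DeleteTopicsUpdatedPath)
    zk.writeData(DeleteTopicsUpdatedPath, System.currentTimeMillis.toString)
  }
}

// Controller side: one data change watch on /admin/delete_topics_updated.
// When it fires, read the children of /admin/delete_topics, delete topics one
// at a time (relinquishing the controller lock between topics), and remove
// each child path once that topic's deletion completes.
class DeleteTopicsListener(zk: ZkClient, deleteTopic: String => Unit) extends IZkDataListener {
  import DeleteTopicsPaths._
  def handleDataChange(dataPath: String, data: Object) {
    zk.getChildren(DeleteTopicsPath).foreach { topic =>
      deleteTopic(topic)                        // per-topic delete in the controller
      zk.delete(DeleteTopicsPath + "/" + topic) // reports completion for this topic
    }
  }
  def handleDataDeleted(dataPath: String) {}
}

// On controller startup/failover, register the watch first and then read the
// existing children of /admin/delete_topics:
//   zk.subscribeDataChanges(DeleteTopicsPaths.DeleteTopicsUpdatedPath, listener)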

On the broker side, there are a few things to be done correctly -

1. KafkaApis
After receiving a stop replica request, the request handler should reject 
produce/fetch requests for partitions that are to be deleted by returning a 
PartitionBeingDeleted error code. Once the delete is complete, the partition 
can be removed from this list; after that it will return the 
UnknownTopicOrPartition error code (see the sketch below).
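
A minimal sketch of that check. The tracking set and the PartitionBeingDeleted 
code are being proposed here, they are not existing APIs; only 
UnknownTopicOrPartition exists today.

// Sketch only: the tracker and the PartitionBeingDeleted code are proposed,
// not existing APIs.
import scala.collection.mutable

object DeletionErrorCodes {
  val NoError: Short = 0
  val UnknownTopicOrPartitionCode: Short = 3   // existing code in ErrorMapping
  val PartitionBeingDeletedCode: Short = 100   // hypothetical new error code
}

class PartitionDeletionTracker {
  private val beingDeleted = mutable.Set[(String, Int)]()

  // Called when the stop replica request (with delete) arrives.
  def markBeingDeleted(topic: String, partition: Int) =
    beingDeleted.synchronized { beingDeleted += ((topic, partition)) }

  // Called once the delete has completed and the partition is removed.
  def deletionComplete(topic: String, partition: Int) =
    beingDeleted.synchronized { beingDeleted -= ((topic, partition)) }

  // Used by the produce/fetch paths in KafkaApis: reject with
  // PartitionBeingDeleted while the delete is in flight; after it completes,
  // the normal lookup fails and UnknownTopicOrPartition is returned.
  def errorCodeFor(topic: String, partition: Int, partitionExists: Boolean): Short =
    beingDeleted.synchronized {
      if (beingDeleted.contains((topic, partition)))
        DeletionErrorCodes.PartitionBeingDeletedCode
      else if (!partitionExists)
        DeletionErrorCodes.UnknownTopicOrPartitionCode
      else
        DeletionErrorCodes.NoError
    }
}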

2. ReplicaManager
2.1 Remove unused variable leaderBrokerId from makeFollower()
2.2 Fix the comment inside recordFollowerPosition to say "partition hasn't been 
created or has been deleted"
2.3 Let the partition do the delete() operation. This ensures that the 
leaderAndIsrUpdateLock is acquired for the duration of the delete, which 
avoids interleaving leader/isr requests with stop replica requests and 
simplifies reasoning about log truncate/high watermark update operations (see 
the sketch below).
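
In other words, something like this (names are illustrative; the real stop 
replica path has more to it):

// Sketch of 2.3 only. The point is that stopReplica removes the partition
// from allPartitions and then lets Partition.delete() do the work while
// holding the leader/isr lock internally (see item 3 below).
import scala.collection.mutable

trait DeletablePartition {
  def delete(): Unit   // acquires leaderIsrUpdateLock inside
}

class ReplicaManagerSketch(allPartitions: mutable.Map[(String, Int), DeletablePartition]) {
  def stopReplica(topic: String, partitionId: Int) {
    allPartitions.remove((topic, partitionId)) match {
      case Some(partition) => partition.delete() // no leader/isr request can interleave
      case None            => ()                 // already removed; nothing to do
    }
  }
}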

3. Partition - Introduce a new delete() API that works like this (see the 
sketch below) -
1. Acquire leaderIsrUpdateLock so that creating the log does not interfere 
with deleting it, and so that removing/adding a fetcher does not interfere 
with the delete either.
2. Remove the fetcher for the partition.
3. Invoke delete() on the log. Be careful about how in-flight read/write 
requests will be affected.
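
A minimal sketch of that delete(); the traits and the plain object monitor are 
stand-ins for the real fetcher manager, Log, and leaderIsrUpdateLock.

// Sketch only: FetcherManagerLike/LogLike are stand-ins for the real
// ReplicaFetcherManager and Log; a plain monitor stands in for the real lock.
trait FetcherManagerLike { def removeFetcher(topic: String, partitionId: Int): Unit }
trait LogLike            { def delete(): Unit }

class PartitionSketch(topic: String, partitionId: Int,
                      fetcherManager: FetcherManagerLike, log: LogLike) {
  private val leaderIsrUpdateLock = new Object

  def delete() {
    leaderIsrUpdateLock.synchronized {
      // 1. Holding leaderIsrUpdateLock keeps create-log and add/remove-fetcher
      //    operations from interfering with the delete.
      // 2. Remove the fetcher first so no follower fetch touches the log
      //    while it is being deleted.
      fetcherManager.removeFetcher(topic, partitionId)
      // 3. Delete the log; how in-flight read/write requests are handled is
      //    covered under item 5 and the questions below.
      log.delete()
    }
  }
}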

4. LogManager (see the sketch below)
1. When deleteLogs() is invoked, remove the logs from allLogs. This prevents 
flush from being invoked on a log that is about to be deleted.
2. Invoke log.delete() on every individual log.
3. log.markDeletedWhile(_ => true) will leave an extra rolled-over segment in 
the in-memory segment list.
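
Sketched out (allLogs here is a plain map keyed by topic/partition, standing 
in for the real pool):

// Sketch only: DeletableLog and the map-based allLogs are stand-ins.
import scala.collection.mutable

trait DeletableLog { def delete(): Unit }

class LogManagerSketch {
  private val allLogs = mutable.Map[(String, Int), DeletableLog]()

  def deleteLogs(topicPartitions: Seq[(String, Int)]) {
    topicPartitions.foreach { tp =>
      // 1. Remove from allLogs first so the scheduled flush no longer sees a
      //    log that is about to be deleted.
      allLogs.remove(tp) match {
        // 2. Then invoke delete() on each individual log.
        case Some(log) => log.delete()
        case None      => ()
      }
    }
  }
}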

5. Log (see the sketch below)
1. Log delete should acquire the "lock" to prevent interleaving with 
append/truncate/roll/flush etc. The following steps need to be taken during 
log.delete():
2. Invoke log.close()
3. Invoke segmentList.delete(), where SegmentList.delete() only does 
contents.set(new Array[T](0))
4. Invoke segment.delete() on each segment
5. Set a deleted flag to true
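
Put together, log.delete() could look roughly like this. SegmentLike and the 
plain list stand in for the real LogSegment and SegmentList (whose delete() 
just does contents.set(new Array[T](0)), as in item 8 below).

// Sketch only: SegmentLike and the List-backed segment list are stand-ins.
trait SegmentLike { def delete(): Unit }

class LogDeleteSketch(initialSegments: List[SegmentLike]) {
  @volatile private var segments: List[SegmentLike] = initialSegments
  private val lock = new Object     // same "lock" used by append/roll/flush/truncate
  @volatile var deleted = false

  def delete() {
    lock.synchronized {             // 1. no interleaving with append/truncate/roll/flush
      val old = segments
      close()                       // 2. close the log (its open file channels)
      segments = Nil                // 3. segmentList.delete(): empty the in-memory list
      old.foreach(_.delete())       // 4. delete each underlying segment
      deleted = true                // 5. flag checked by roll()/markDeletedWhile()
    }
  }

  def close() { /* close each segment's channel; omitted in this sketch */ }
}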

A few questions to be thought about -

- Are any changes required to roll()? If the deleted flag is true, then skip 
roll().
- Are any changes required to markDeletedWhile()? Same as roll: if the deleted 
flag is true, skip it.
- Are any changes required to flush()? This can be invoked either during roll 
or by append. It cannot be invoked by the flush thread since that is disabled 
for logs being deleted. This needs to be handled by using lastOption.
- See what to do with truncateTo(). This is used during makeFollower() in 
Partition. It won't interfere with delete since Partition's delete acquires 
the leaderIsrUpdateLock. The other place that uses truncateTo() is 
handleOffsetOutOfRange on the follower. That won't interleave either, since 
the replica fetcher was already removed before attempting to delete the log.
- See what to do with truncateAndStartWithNewOffset(). This won't interleave 
with the log delete since the replica fetcher was already removed before 
attempting to delete the log.
- What if the broker is writing to the log when stop replica is deleting it? 
Since log.delete() acquires the "lock", the append starts either before or 
after the delete. If it starts after, then the changes mentioned in #7 and #9 
should be made.
- What if the broker is about to write to a log that is under deletion? Same 
as above.
- What if the broker is reading from a log that is being deleted? It will get 
a ClosedChannelException, I think. This needs to be confirmed. The test can 
run a consumer that consumes data from the beginning of a log while you invoke 
delete topic.
- What if the broker is about to read from a log that is being deleted? It 
will try reading from a file channel that is closed and run into a 
ClosedChannelException. Should we catch ClosedChannelException, log an 
appropriate error, and send a PartitionDeleted error code when that happens?
- What happens to the partition's entry in the high watermark file when the 
partition is being deleted? Once the partition is removed from allPartitions, 
the next high watermark checkpoint removes the partition's entry from the high 
watermark file.
- What happens to requests in the purgatory when the partition has been 
deleted? Once a partition has been removed from allPartitions, the requests in 
the purgatory will send UnknownTopicOrPartitionCode back to the client.

6. Log.read()
val first = view.head.start
This needs to change to use headOption. Return an empty message set when it 
returns None.

7. Log.flush()
segments.view.last.flush()
Need to change the above to segments.view.lastOption. If that returns None, 
then return without flushing. 

8. SegmentList.delete()
contents.set(new Array[T](0))

9. Log.append()
Fix this to use lastOption - val segment = maybeRoll(segments.view.last)
If that returns None, then return (-2, -2) to signify that the log was deleted 
(see the sketch below).
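
The changes in 6, 7 and 9 all follow the same headOption/lastOption pattern; a 
rough sketch with a simplified stand-in segment type:

// Sketch only: SegmentStub and the Vector-based view are simplified stand-ins
// for the real LogSegment and segments.view.
case class SegmentStub(start: Long) { def flush() {} }

class LogViewSketch(initial: Vector[SegmentStub]) {
  private var view: Vector[SegmentStub] = initial   // emptied by delete()

  // 6. Log.read(): headOption instead of view.head; None means the log was
  //    deleted, so the caller returns an empty message set.
  def firstOffsetToRead(): Option[Long] = view.headOption.map(_.start)

  // 7. Log.flush(): lastOption instead of view.last; nothing to flush once
  //    the segment list has been emptied by delete().
  def flush() { view.lastOption.foreach(_.flush()) }

  // 9. Log.append(): lastOption instead of view.last; (-2, -2) signals that
  //    the log was deleted while the append was in flight.
  def append(): (Long, Long) = view.lastOption match {
    case Some(last) => (last.start, last.start)  // placeholder for maybeRoll(last) + real append
    case None       => (-2L, -2L)
  }
}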


> Add delete topic support 
> -------------------------
>
>                 Key: KAFKA-330
>                 URL: https://issues.apache.org/jira/browse/KAFKA-330
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Swapnil Ghike
>            Priority: Blocker
>              Labels: features, kafka-0.8, p2, project
>             Fix For: 0.8
>
>         Attachments: kafka-330-v1.patch
>
>
> One proposal of this API is here - 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2#KafkareplicationdetaileddesignV2-Deletetopic
