----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/17460/#review33179 -----------------------------------------------------------
A few high level comments: 1. Do we need to block adding partitions when a topic is being deleted? 2. StopReplica callback: When the callback is created, we can probably let it reference a local val for replica id. This way, we can have all callbacks use the same callback api. 3. We are now managing replica states in two places, one in ReplicaStateMachine and another in controllerContext.topicDeletionProgressInfo. This makes the code a bit hard to read since everytime we want to do a replica state transition, we have to check the current replica state in two places. Would it be better to integrate topicDeletion related states to ReplicaStateMachine? 4. The synchronization on deleteTopicsCond: It seems that it would be better if we create a separate class like TopicDeletionManager that manages this condition and provide all needed apis for synchronization. That way, only TopicDeletionManager needs to access the condition. We can probably move the delete topic thread there too. core/src/main/scala/kafka/controller/ControllerChannelManager.scala <https://reviews.apache.org/r/17460/#comment62514> Is the check on b>=0 needed since broker id config already checks that? core/src/main/scala/kafka/controller/KafkaController.scala <https://reviews.apache.org/r/17460/#comment62494> What about replicas in the deleted topic? Should we move them to onlineReplica state? core/src/main/scala/kafka/controller/KafkaController.scala <https://reviews.apache.org/r/17460/#comment62492> Should this be logged inside the if statement below? core/src/main/scala/kafka/controller/KafkaController.scala <https://reviews.apache.org/r/17460/#comment62498> There are trade-offs btw deleting the deleteTopicPath before or after deleting the topic path itself. If we delete the deleteTopicPath after and we fail in the middle, a topic can be auto created and then the delete logic resumes, which deletes the re-created topic. If we delete the deleteTopicPath before and we fail in the middle, we won't actually delete the topic. Both cases are rare and I am not sure which one is noticeably better than the other. It seems that the latter is sightly better since we can always try to delete the topic again. In the former, we may have deleted some useful data after topic re-creation. core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/17460/#comment62500> Could we use map {case(topicAndPartition, partitionStateInfo) => ... }? core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/17460/#comment62501> Could we use map {case(topicAndPartition, partitionStateInfo) => ... }? core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/17460/#comment62505> Ditto as the above. core/src/main/scala/kafka/server/KafkaApis.scala <https://reviews.apache.org/r/17460/#comment62502> Could we use map {case(topicAndPartition, partitionStateInfo) => ... }? - Jun Rao On Jan. 29, 2014, 6:01 a.m., Neha Narkhede wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/17460/ > ----------------------------------------------------------- > > (Updated Jan. 29, 2014, 6:01 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-330 > https://issues.apache.org/jira/browse/KAFKA-330 > > > Repository: kafka > > > Description > ------- > > Addressed Guozhang's review comments > > > Fixed docs in a few places > > > Fixed the resume logic for partition reassignment to also include topics that > are queued up for deletion, since topic deletetion is halted until partition > reassignment can finish anyway. We need to let partition reassignment finish > (since it started before topic deletion) so that topic deletion can resume > > > Organized imports > > > Moved offline replica handling to controller failover > > > Reading replica assignment from zookeeper instead of local cache > > > Deleting unused APIs > > > Reverting the change to the stop replica request protocol. Instead hacking > around with callbacks > > > All functionality and all unit tests working > > > Rebased with trunk after controller cleanup patch > > > Diffs > ----- > > core/src/main/scala/kafka/admin/AdminUtils.scala > a167756f0fd358574c8ccb42c5c96aaf13def4f5 > core/src/main/scala/kafka/admin/TopicCommand.scala > 842c11047cca0531fbc572fdb25523244ba2b626 > core/src/main/scala/kafka/api/StopReplicaRequest.scala > 820f0f57b00849a588a840358d07f3a4a31772d4 > core/src/main/scala/kafka/api/StopReplicaResponse.scala > d7e36308263aec2298e8adff8f22e18212e33fca > core/src/main/scala/kafka/controller/ControllerChannelManager.scala > ea8485b479155b479c575ebc89a4f73086c872cb > core/src/main/scala/kafka/controller/KafkaController.scala > a0267ae2670e8d5f365e49ec0fa5db1f62b815bf > core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala > fd9200f3bf941aab54df798bb5899eeb552ea3a3 > core/src/main/scala/kafka/controller/PartitionStateMachine.scala > ac4262a403fc73edaecbddf55858703c640b11c0 > core/src/main/scala/kafka/controller/ReplicaStateMachine.scala > 483559aa64726c51320d18b64a1b48f8fb2905a0 > core/src/main/scala/kafka/server/KafkaApis.scala > bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e > core/src/main/scala/kafka/server/KafkaHealthcheck.scala > 9dca55c9254948f1196ba17e1d3ebacdcd66be0c > core/src/main/scala/kafka/server/OffsetCheckpoint.scala > b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b > core/src/main/scala/kafka/server/ReplicaManager.scala > f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 > core/src/main/scala/kafka/server/TopicConfigManager.scala > 42e98dd66f3269e6e3a8210934dabfd65df2dba9 > core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala > b189619bdc1b0d2bba8e8f88467fce014be96ccd > core/src/main/scala/kafka/utils/ZkUtils.scala > b42e52b8e5668383b287b2a86385df65e51b5108 > core/src/test/scala/unit/kafka/admin/AdminTest.scala > 59de1b469fece0b28e1d04dcd7b7015c12576abb > core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala > 8df0982a1e71e3f50a073c4ae181096d32914f3e > core/src/test/scala/unit/kafka/server/LogOffsetTest.scala > 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 > core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala > c0475d07a778ff957ad266c08a7a81ea500debd2 > core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala > 03e6266ffdad5891ec81df786bd094066b78b4c0 > core/src/test/scala/unit/kafka/utils/TestUtils.scala > d88b6c3e8fd8098d540998b6a82a65cec8a1dcb0 > > Diff: https://reviews.apache.org/r/17460/diff/ > > > Testing > ------- > > Several integration tests added to test - > > 1. Topic deletion when all replica brokers are alive > 2. Halt and resume topic deletion after a follower replica is restarted > 3. Halt and resume topic deletion after a controller failover > 4. Request handling during topic deletion > 5. Topic deletion and partition reassignment in parallel > 6. Topic deletion and preferred replica election in parallel > 7. Topic deletion and per topic config changes in parallel > > > Thanks, > > Neha Narkhede > >