-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33711
-----------------------------------------------------------
In the follow-up patch that serializes all the admin tasks in the background thread, I would suggest moving away from callbacks to trigger state changes during the process. Instead, the process could depend on ZK path changes, as we did for partition reassignment. Since the controller-broker communication is already async, I think it is OK not to retry the stopReplicaRequest. The brokers can instead detect through MetadataRequest that some replicas they currently hold have already been deleted, since the metadata will become the source of truth anyway.


core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
<https://reviews.apache.org/r/17460/#comment63321>

    Ditto as below.


core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
<https://reviews.apache.org/r/17460/#comment63320>

    This import may be removed: this is the only change in this file.


core/src/main/scala/kafka/api/StopReplicaResponse.scala
<https://reviews.apache.org/r/17460/#comment63330>

    Could you change to (topicAndPartition, errorCode) <- responseMap ?


core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
<https://reviews.apache.org/r/17460/#comment63331>

    Ditto as above.


core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63334>

    Instead of checking the replicaId == -1 case here, I feel it is better to handle it in the ReplicaStateMachine.handleStateChange function. That makes it clear to developers that the leader can become -1.


core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63335>

    Ditto as above.


core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63355>

    When the broker is down, the RequestSendThread will just keep retrying the send. The callback function will not be executed until the broker is back and a receive is returned from channel.receive(). Is that correct? If so, will the process be blocked for as long as the broker is down?


core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63342>

    Doesn't our coding convention omit the () for this kind of function?


core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63341>

    It is an exceptional case if !topicsTobeDeleted.contains(topics).


core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63343>

    Ditto as above.


core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63352>

    Use inLock here?


core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63350>

    One general comment: each test needs to set up ZK and the brokers before it runs and tear them down afterwards, which likely dominates the running time of these tests. Maybe we can merge some of the test cases into one?


- Guozhang Wang


On Feb. 5, 2014, 5:31 p.m., Neha Narkhede wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
>
> (Updated Feb. 5, 2014, 5:31 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Joel's review suggestions - Changed the controllerLock instances to inLock
> instead of synchronized, fixed some logging
>
>
> Removed init() API from TopicDeletionManager and added docs to
> TopicDeletionManager to describe the lifecycle of topic deletion
>
>
> Updated docs for the new states.
> Removed the changes to log4j.properties
>
>
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs,
> unit tests working
>
>
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup
> of some APIs pending
>
>
> Changed controller to reference APIs in TopicDeletionManager. All unit tests
> pass
>
>
> Introduced a TopicDeletionManager. KafkaController changes pending to use the
> new TopicDeletionManager
>
>
> Addressed Guozhang's review comments
>
>
> Fixed docs in a few places
>
>
> Fixed the resume logic for partition reassignment to also include topics that
> are queued up for deletion, since topic deletion is halted until partition
> reassignment can finish anyway. We need to let partition reassignment finish
> (since it started before topic deletion) so that topic deletion can resume
>
>
> Organized imports
>
>
> Moved offline replica handling to controller failover
>
>
> Reading replica assignment from zookeeper instead of local cache
>
>
> Deleting unused APIs
>
>
> Reverting the change to the stop replica request protocol. Instead, hacking
> around with callbacks
>
>
> All functionality and all unit tests working
>
>
> Rebased with trunk after controller cleanup patch
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7
>
> Diff: https://reviews.apache.org/r/17460/diff/
>
>
> Testing
> -------
>
> Several integration tests added to test -
>
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
>
>
> Thanks,
>
> Neha Narkhede
>
>
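For the StopReplicaResponse.scala comment, here is a minimal sketch of the suggested style next to the positional style it would replace. The suggested style destructures each map entry into named values, (topicAndPartition, errorCode), directly in the for-comprehension generator, rather than reading entry._1 and entry._2. TopicAndPartition is redefined locally as a hypothetical stand-in. The Short error code and the summary string format are assumptions for illustration only.

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition, defined here only so the sketch compiles.
case class TopicAndPartition(topic: String, partition: Int)

object ResponseMapStyle {
  // Positional style: entry._1 / entry._2 hide what each tuple element means.
  def summaryPositional(responseMap: Map[TopicAndPartition, Short]): Seq[String] =
    (for (entry <- responseMap.toSeq)
      yield s"${entry._1.topic}-${entry._1.partition}:${entry._2}").sorted

  // Suggested style: destructure each entry into named values in the generator itself.
  def summary(responseMap: Map[TopicAndPartition, Short]): Seq[String] =
    (for ((topicAndPartition, errorCode) <- responseMap.toSeq)
      yield s"${topicAndPartition.topic}-${topicAndPartition.partition}:$errorCode").sorted
}
```

Both methods compute the same result. The second simply names the key and the error code where they are bound, so the body reads without tuple indices.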
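For the "Use inLock here?" comment on TopicDeletionManager.scala, here is a sketch of the pattern the description refers to ("Changed the controllerLock instances to inLock instead of synchronized"). The helper acquires an explicit java.util.concurrent.locks.Lock, runs the body, and releases the lock in a finally block. This is a self-contained re-implementation written for illustration, not copied from Kafka's utility code. DeletionQueue and its methods are hypothetical.

```scala
import java.util.concurrent.locks.{Lock, ReentrantLock}

object LockingStyle {
  // Runs `fun` while holding `lock`; the finally block releases the lock
  // even if `fun` throws. Illustrative re-implementation, not Kafka's code.
  def inLock[T](lock: Lock)(fun: => T): T = {
    lock.lock()
    try {
      fun
    } finally {
      lock.unlock()
    }
  }
}

// Hypothetical class standing in for code that would otherwise use `synchronized`.
class DeletionQueue {
  private val lock = new ReentrantLock()
  private var pending = Set.empty[String]

  // Adds topics to the pending set while holding the explicit lock.
  def enqueue(topics: Set[String]): Unit = LockingStyle.inLock(lock) {
    pending ++= topics
  }

  // Side-effect-free query, also guarded by the same lock.
  def isQueued(topic: String): Boolean = LockingStyle.inLock(lock) {
    pending.contains(topic)
  }

  // True only while some thread holds the lock.
  def isLocked: Boolean = lock.isLocked
}
```

Compared with synchronized, this works with an explicit Lock object, so one lock (such as the controllerLock) can be shared across classes. The finally block still guarantees release when the body throws.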
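For the first suggestion (a follow-up patch that serializes all admin tasks in a background thread), here is a minimal sketch of only the serialization part, assuming a plain single-thread executor. It does not model the ZK-path-driven state changes suggested above. SerializedAdminTaskRunner is a hypothetical name, not a class in this patch.

```scala
import java.util.concurrent.{ExecutorService, Executors, Future, TimeUnit}

// Hypothetical runner: a single background thread executes admin tasks one at a time,
// in submission order, so two tasks (e.g. a topic deletion and a partition
// reassignment) never interleave their steps.
class SerializedAdminTaskRunner {
  private val executor: ExecutorService = Executors.newSingleThreadExecutor()

  // Queue a task; it starts only after every previously submitted task has finished.
  def submit(task: => Unit): Future[_] =
    executor.submit(new Runnable { def run(): Unit = task })

  // Stop accepting new tasks and wait for queued ones; true if all finished in time.
  def shutdown(timeoutSeconds: Long): Boolean = {
    executor.shutdown()
    executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)
  }
}
```

Because one thread drains the queue in FIFO order, no extra locking is needed between admin tasks. A task that blocks, for example while waiting on a down broker, also delays every task queued behind it.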