> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > In the follow-up patch that serializes all the admin tasks in the
> > background thread, I would suggest switching away from using callbacks to
> > trigger state changes while executing the process, and depending on some ZK
> > path change instead, as we did for partition re-assignment. Since the
> > controller-broker communication is already async, I think it is OK to not
> > retry the stopReplicaRequest, but let the brokers detect through
> > MetadataRequest that some replicas they currently hold have already been
> > deleted; that will become the source of truth anyway.

Ya, I thought about the signal vs. zookeeper path, and I didn't see a point in going through zookeeper for something that doesn't require any distributed coordination at all; using zookeeper as a database and signal mechanism is not a good idea. Having said that, the reason we use the zookeeper path notification for partition reassignment is that the common ISR path for a partition can be updated not just by the controller but also by the brokers, thereby requiring a zookeeper-based watch mechanism. Delete topic, on the other hand, requires the controller itself to maintain internal state and signal based off that. Also, having the broker delete the logs (as I mentioned while explaining the different design choices for delete topic on the JIRA) is infeasible since it doesn't know the version of the topic. It is easier to have the controller not allow topic deletion to complete until all replicas have acknowledged the delete from their respective brokers.


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala, line 29
> > <https://reviews.apache.org/r/17460/diff/8/?file=470124#file470124line29>
> >
> > This import may be removed: this is the only change in this file.

Nope. It cannot be removed; it is required to distinguish between Set from Predef and Set from collection.


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, line 307
> > <https://reviews.apache.org/r/17460/diff/8/?file=470128#file470128line307>
> >
> > When the broker is down, the RequestSendThread will just keep trying to
> > resend, and the callback function will not be executed until the broker is
> > back and a receive is returned from channel.receive(), is that correct? If
> > yes, then will the process be blocked during the time the broker is down?

Nope. The sendRequest is non-blocking. It will not block unless the queue of requests sent to that broker grows very large. The controller doesn't send too many requests to dead brokers, so I don't see how that can happen unless there is a bug. Topic deletion is halted when at least one replica goes down, which will prevent the controller from sending requests to the dead broker.


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, line 119
> > <https://reviews.apache.org/r/17460/diff/8/?file=470134#file470134line119>
> >
> > I remember the coding principle for this function is to omit ()?

That applies only if the API makes no internal state changes, and this one does. So I included the ().


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, line 131
> > <https://reviews.apache.org/r/17460/diff/8/?file=470134#file470134line131>
> >
> > It is an exception case if topicsTobeDeleted.!contains(topics).

How? It is set intersection. Either your sets intersect and you have a resulting set of non-zero size, or they don't, in which case you end up with an empty set.


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, line 20
> > <https://reviews.apache.org/r/17460/diff/8/?file=470144#file470144line20>
> >
> > One general comment is that since for each test we need to set up ZK and
> > Broker before and tear them down after, which could dominate the time
> > spent running these tests, maybe we can merge some of the testcases
> > into one?

Ya, agree that these tests run longer. At the same time, these tests also validate some important cases. I guess once we improve our system tests, we can move some of these tests there. Until then, it is useful to have these tests act as integration tests.
We could also merge some of them together, but if we do, we have to put up with really large testcases, which will be hard to understand and maintain. Taking your point though, the tests for validating different request types could be clubbed together without losing readability.


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33711
-----------------------------------------------------------


On Feb. 6, 2014, 3:49 p.m., Neha Narkhede wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
>
> (Updated Feb. 6, 2014, 3:49 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Refactored tests per Jun and Guozhang's feedback
>
>
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
>
>
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
>
>
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
>
>
> Updated docs for the new states. Removed the changes to log4j.properties
>
>
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
>
>
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
>
>
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
>
>
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
>
>
> Addressed Guozhang's review comments
>
>
> Fixed docs in a few places
>
>
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
>
>
> Organized imports
>
>
> Moved offline replica handling to controller failover
>
>
> Reading replica assignment from zookeeper instead of local cache
>
>
> Deleting unused APIs
>
>
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
>
>
> All functionality and all unit tests working
>
>
> Rebased with trunk after controller cleanup patch
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7
>
> Diff: https://reviews.apache.org/r/17460/diff/
>
>
> Testing
> -------
>
> Several integration tests added to test -
>
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
>
>
> Thanks,
>
> Neha Narkhede
>
>
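
P.S. On the Set import in LeaderAndIsrRequest.scala discussed above: the following is a minimal sketch of why such an import can matter, not the code from the patch. By default, Set in Scala resolves through Predef to scala.collection.immutable.Set, while an explicit import of scala.collection.Set makes the name refer to the more general trait that mutable sets also extend. The object SetImportSketch and the helper countReplicas are hypothetical names used only for illustration.

```scala
// An explicit import takes precedence over the Predef alias, so in this file
// `Set` means scala.collection.Set rather than scala.collection.immutable.Set.
import scala.collection.Set
import scala.collection.mutable

object SetImportSketch {
  // Hypothetical helper (not part of the patch). Its parameter type is the
  // general collection.Set, so both mutable and immutable sets are accepted.
  def countReplicas(replicas: Set[Int]): Int = replicas.size

  def main(args: Array[String]): Unit = {
    val fromMutable = mutable.Set(1, 2, 3)
    val fromImmutable = scala.collection.immutable.Set(4, 5)
    // Without the import above, passing fromMutable would be a type mismatch,
    // because Predef's Set is the immutable one.
    println(countReplicas(fromMutable))
    println(countReplicas(fromImmutable))
  }
}
```

Removing the import would silently narrow every unqualified Set in the file to the immutable variant, which is presumably why it cannot be dropped even though it is the only change in that file.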