-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33711
-----------------------------------------------------------


In the follow-up patch that serialize all the admin tasks in the back ground 
thread, I would suggest switching away from using the callbacks to trigger 
state change while executing the process but depending on some ZK path change, 
as we did for partition re-assignment. Since the controller-broker 
communication is already async, I think it is OK to not retry the 
stopReplicaRequest, but let the brokers to detect some replicas it currently 
holds have already be deleted through MetadataRequest, which will become the 
source of truth anyways.


core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
<https://reviews.apache.org/r/17460/#comment63321>

    Ditto as below



core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
<https://reviews.apache.org/r/17460/#comment63320>

    This import may be removed: this is the only change in this file.



core/src/main/scala/kafka/api/StopReplicaResponse.scala
<https://reviews.apache.org/r/17460/#comment63330>

    Could you change to (topicAndPartition, errorCode) <- responseMap ?



core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
<https://reviews.apache.org/r/17460/#comment63331>

    Ditto as above.



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63334>

    Instead of checking the replicaId == -1 case here, I feel it is better to 
handle it in ReplicaStateMachine.handleStateChange function, for indicating the 
devs that this is possible that the leader becomes -1.



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63335>

    Ditto as above.



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63355>

    When the broker is down, the RequestSendThread will just keep trying 
resend, and the callback function will not be executed until the broker is back 
and a receive is returned from channel.receive(), is that correct? If yes, then 
will the process be blocked during the time the broker is down?



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63342>

    I remember the coding principle for this function is to omit ()?



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63341>

    It is an exception case if topicsTobeDeleted.!contains(topics).



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63343>

    Ditto above



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63352>

    Use inLock here?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63350>

    On general comment is that since for each test we need to setup ZK and 
Broker before and tear them down after, which could be dominant time consuming 
in running these tests, maybe we can merge some of the testcases into one?


- Guozhang Wang


On Feb. 5, 2014, 5:31 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 5, 2014, 5:31 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock 
> instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to 
> TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, 
> unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup 
> of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests 
> pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the 
> new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that 
> are queued up for deletion, since topic deletetion is halted until partition 
> reassignment can finish anyway. We need to let partition reassignment finish 
> (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking 
> around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 
> a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala 
> d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
> fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
> ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
> 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 
> d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
> 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala 
> b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 
> 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala 
> b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
> 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
> 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>

Reply via email to