> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, line 
> > 220
> > <https://reviews.apache.org/r/17460/diff/7/?file=462654#file462654line220>
> >
> >     Why not only limit to the topic-partitions relevant to this 
> > leaderAndIsrRequest?

With this patch, UpdateMetadata request is the mechanism used by the brokers to 
know which topics are live and which topics it will reject incoming requests 
for. Hence, this list always has to be the full list of active topics in the 
cluster. As such, it will not include topics in /admin/delete_topics


> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, line 
> > 304
> > <https://reviews.apache.org/r/17460/diff/7/?file=462654#file462654line304>
> >
> >     You mean just put all in a single StopReplicaRequest? If so, any reason 
> > not to do it now?

Was planning on making the change after I received a more detailed review. Will 
probably include it in the next patch.


> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, line 555
> > <https://reviews.apache.org/r/17460/diff/7/?file=462656#file462656line555>
> >
> >     I'm unclear whether this and the call from onPreferredReplica.. are 
> > really required. i.e., prior to those operations beginning we do check if 
> > those topics are being deleted but I haven't fully thought this through.

The comment above it -

// signal delete topic thread if reassignment for some partitions belonging to 
topics being deleted just completed

Basically, if reassignment was in progress when delete topic is issued, we 
pause/halt the deletion of that topic until the reassignment finishes. Same 
with preferred replica election. Hence the resume() call.


> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, line 101
> > <https://reviews.apache.org/r/17460/diff/7/?file=462660#file462660line101>
> >
> >     Can you fix up the comment a bit? i.e., the lock will in fact be 
> > released while awaiting, so maybe you should just say that the lock should 
> > be acquired before calling.

I'm certain you have OCD :)


> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, line 1195
> > <https://reviews.apache.org/r/17460/diff/7/?file=462656#file462656line1195>
> >
> >     From the lock javadoc:
> >     Note that Lock instances are just normal objects and can themselves be 
> > used as the target in a synchronized statement. Acquiring the monitor lock 
> > of a Lock instance has no specified relationship with invoking any of the 
> > lock() methods of that instance. It is recommended that to avoid confusion 
> > you never use Lock instances in this way, except within their own 
> > implementation. 
> >     
> >     So you may need to still synchronize on this lock object before calling 
> > controllerLock.lock - or switch to lock/unlock everywhere we currently use 
> > synchronized.

Good point! Will change all those instances to lock() instead of synchronized.


> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/DeleteTopicsThread.scala, line 30
> > <https://reviews.apache.org/r/17460/diff/7/?file=462655#file462655line30>
> >
> >     Why is the Set.empty[String] needed?

That's done to initialize an immutable set with a set of values.


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33668
-----------------------------------------------------------


On Feb. 1, 2014, 10:58 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 1, 2014, 10:58 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Removed init() API from TopicDeletionManager and added docs to 
> TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, 
> unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup 
> of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests 
> pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the 
> new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that 
> are queued up for deletion, since topic deletetion is halted until partition 
> reassignment can finish anyway. We need to let partition reassignment finish 
> (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking 
> around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 
> a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala 
> d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
> fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
> ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
> 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 
> d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
> 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala 
> b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 
> 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala 
> b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
> 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
> 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>

Reply via email to