> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > Already checked-in so this is really a follow-up review.
> > 
> > My overall take on the implementation is that it is (perhaps - because I'm
> > not 100 percent sure myself) complex mainly to handle corner cases which are
> > rare but I think recoverable. i.e., if we assume (and it may not be a valid
> > assumption) that topics will not/should never be deleted when there is live
> > traffic to a topic then just the call-backs and user-issued re-attempts on
> > failed deletion would be sufficient. We can talk more about that, but what
> > you have implemented is definitely more convenient and complete for the 
> > user.
> > 
> > Also, I encountered this problem while trying it out:
> > - bring up a broker
> > - ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc --sync
> >   < send a few messages >
> > - ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic abc
> > - I looked at state-change.log and made sure deletion completed
> > - ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc --sync
> >   < can never produce >
> > 
> > I see these on the server:
> > 
> > [2014-02-07 18:34:21,229] WARN [KafkaApi-0] Produce request with correlation id 2 from client  on partition [abc,0] failed due to Partition [abc,0] doesn't exist on 0 (kafka.server.KafkaApis)
> > [2014-02-07 18:34:21,229] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 2, topicAndPartition = [abc,0]] with Ack=0 (kafka.server.KafkaApis)
> > [2014-02-07 18:34:26,755] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > [2014-02-07 18:34:26,756] WARN [KafkaApi-0] Produce request with correlation id 9 from client  on partition [abc,1] failed due to Partition [abc,1] doesn't exist on 0 (kafka.server.KafkaApis)
> > [2014-02-07 18:34:26,756] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 9, topicAndPartition = [abc,1]] with Ack=0 (kafka.server.KafkaApis)
> > 
> > I had to bounce the broker to be able to produce again.
> > 
> > What did I do wrong? I can debug this later, but I'm going home soon :)
> >

Regarding your test, I realized that it is a bug that needs to be fixed. Will 
upload a patch later.

The case you mention treats delete topic as just an admin-initiated command 
that the admin can retry later. That only works if there aren't other state 
changes going on in the cluster during the retry as well. Frequent state 
changes aren't as rare as you think, which makes manual retries undesirable. 
For example, we have an automatic preferred replica election feature that 
detects imbalance and triggers a preferred replica election one partition at a 
time. Now imagine you happen to try topic deletion during this time, and it 
fails due to one partition undergoing preferred replica election. You say, 
well, fine, let me retry delete topic (hoping it succeeds this time), and it 
fails again, this time due to some other partition for that topic undergoing 
preferred replica election. This is ignoring the fact that we have no clear 
way to tell the user why the delete topic failed in the first place. Now 
extend this problem to when we add the automatic TTL-based topic deletion 
feature. Topic deletion will get triggered by some TTL while other state 
changes are being issued by the admin at the same time. Of course, this time 
again, the admin has no way to tell that there is a TTL-based topic deletion 
about to take place. This is still fine. You can argue that the controller can 
retry the TTL-based topic deletion until the topic is deleted. But if you 
don't maintain the states of the respective replicas, how do you know when to 
retry? If you retry arbitrarily, then you risk colliding with other 
automatically triggered state changes (such as preferred replica elections).

Overall, you can see how, as we introduce these various periodic state 
changes, retrying admin commands that fail with no particular explanation of 
what happened is going to cause more operational overhead. I would rather have 
the controller be defensive enough to not get itself into a bad state by 
letting these state changes interleave, and I also think that the guarantee 
that a delete topic, once issued, will always complete leads to a better user 
experience. Once we have the cancel feature added to all admin commands, a 
topic will be deleted unless the operation is cancelled before starting.
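
To illustrate the point about knowing when to retry, here is a minimal sketch 
of retries driven by tracked per-replica state. It is not the controller code: 
Replica, the three state objects, and both helper methods are hypothetical 
names chosen for this example, and the only halt cause modelled is a dead 
broker.

```scala
object DeletionRetry {
  // Hypothetical identifier for one replica of the topic being deleted.
  case class Replica(partition: Int, brokerId: Int)

  // Hypothetical per-replica deletion states (names are assumptions).
  sealed trait ReplicaDeletionState
  case object DeletionStarted extends ReplicaDeletionState
  case object DeletionSuccessful extends ReplicaDeletionState
  case object DeletionIneligible extends ReplicaDeletionState

  // The topic is gone only once every tracked replica has been deleted.
  def isTopicDeleted(states: Map[Replica, ReplicaDeletionState]): Boolean =
    states.nonEmpty && states.values.forall(_ == DeletionSuccessful)

  // Retry only replicas that were halted and whose broker is alive again,
  // instead of retrying the whole topic at arbitrary times.
  def replicasToRetry(
      states: Map[Replica, ReplicaDeletionState],
      liveBrokerIds: Set[Int]): Set[Replica] =
    states.collect {
      case (replica, DeletionIneligible)
          if liveBrokerIds.contains(replica.brokerId) => replica
    }.toSet
}
```

With the state map in hand, the retry decision becomes a lookup rather than a 
guess, which is the property that arbitrary retries lack.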


> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, line 758
> > <https://reviews.apache.org/r/17460/diff/12/?file=471027#file471027line758>
> >
> >     I think foldLeft's of this form can be simplified and made clearer by 
> > using exists.
> >     
> >     e.g., here:
> >     .filter(r => r._2.exists((res, r) => 
> > !controllerContext.liveBrokerIds.contains(r)))

I think you meant replicas.exists(r => 
!controllerContext.liveBrokerIds.contains(r)). Fixed it.
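
For readers following along, the two forms can be compared in a small 
standalone sketch. The foldLeft variant is only a guess at the shape of the 
original code, and DeadReplicaCheck, TopicAndPartition, and the method names 
are hypothetical stand-ins for the controller's cached state.

```scala
object DeadReplicaCheck {
  // Hypothetical stand-in for a (topic, partition) key.
  type TopicAndPartition = (String, Int)

  // foldLeft form: threads a Boolean accumulator through every replica.
  def hasDeadReplicaFold(replicas: Seq[Int], liveBrokerIds: Set[Int]): Boolean =
    replicas.foldLeft(false)((res, r) => res || !liveBrokerIds.contains(r))

  // exists form: same result, and it stops at the first dead replica.
  def hasDeadReplica(replicas: Seq[Int], liveBrokerIds: Set[Int]): Boolean =
    replicas.exists(r => !liveBrokerIds.contains(r))

  // Partitions that have at least one replica assigned to a dead broker.
  def partitionsWithDeadReplicas(
      assignment: Map[TopicAndPartition, Seq[Int]],
      liveBrokerIds: Set[Int]): Set[TopicAndPartition] =
    assignment.filter { case (_, replicas) =>
      hasDeadReplica(replicas, liveBrokerIds)
    }.keySet
}
```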


> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, line 765
> > <https://reviews.apache.org/r/17460/diff/12/?file=471027#file471027line765>
> >
> >     You mean topics halted (due to replicas on dead brokers) or ineligible 
> > (due to reassignment/preferred leader election) correct? Can you update the 
> > message?

Halted <-> ineligible. Halted as a result of being ineligible. There are 3 
conditions when topics become ineligible and as a result halted for deletion -
1. broker hosting at least one replica goes down
2. preferred replica election is in progress
3. partition reassignment is in progress
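
The three conditions above can be read as a single eligibility check. The 
following is a sketch under assumptions: TopicView, HaltReason, and the method 
names are hypothetical and do not come from TopicDeletionManager.

```scala
object DeletionEligibility {
  // Hypothetical snapshot of the controller state relevant to one topic.
  case class TopicView(
      replicaBrokerIds: Set[Int],
      liveBrokerIds: Set[Int],
      preferredReplicaElectionInProgress: Boolean,
      partitionReassignmentInProgress: Boolean)

  sealed trait HaltReason
  case object ReplicaBrokerDown extends HaltReason
  case object PreferredReplicaElection extends HaltReason
  case object PartitionReassignment extends HaltReason

  // Collects every condition that currently makes the topic ineligible.
  def haltReasons(view: TopicView): Seq[HaltReason] =
    Seq[(Boolean, HaltReason)](
      (!view.replicaBrokerIds.subsetOf(view.liveBrokerIds), ReplicaBrokerDown),
      (view.preferredReplicaElectionInProgress, PreferredReplicaElection),
      (view.partitionReassignmentInProgress, PartitionReassignment)
    ).collect { case (true, reason) => reason }

  // Deletion proceeds only when nothing is holding the topic back.
  def isEligibleForDeletion(view: TopicView): Boolean =
    haltReasons(view).isEmpty
}
```

Returning the reasons rather than a bare Boolean also gives the log message 
something concrete to report.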


> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/PartitionStateMachine.scala, line 416
> > <https://reviews.apache.org/r/17460/diff/12/?file=471029#file471029line416>
> >
> >     yay..
> >

I can't seem to find this TODO anywhere in PartitionStateMachine. Were you 
looking at the latest patch or what's in trunk right now?


> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/PartitionStateMachine.scala, line 195
> > <https://reviews.apache.org/r/17460/diff/12/?file=471029#file471029line195>
> >
> >     logging can be updated - i.e., not necessary online -> offline.
> >     
> >     Should probably use (%s to %s).format (currState, targetState) here and 
> > elsewhere in handleStateChange.

Good point. Fixed.
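
A sketch of what the suggested logging form might look like. The message 
wording, the StateChangeLog object, and the state objects are assumptions 
made for this example, not the text in handleStateChange.

```scala
object StateChangeLog {
  // Hypothetical partition states; a case object prints as its own name.
  sealed trait PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Formats the actual transition instead of hard-coding "online -> offline".
  def transitionMessage(
      topic: String,
      partition: Int,
      currState: PartitionState,
      targetState: PartitionState): String =
    "Changed partition [%s,%d] state from %s to %s"
      .format(topic, partition, currState, targetState)
}
```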


> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/ReplicaStateMachine.scala, line 361
> > <https://reviews.apache.org/r/17460/diff/12/?file=471030#file471030line361>
> >
> >     Would prefer ReplicaDeletionIneligible:
> >     - since it isn't really a failure... i.e., we should eventually resume.
> >     - and I prefer "Ineligible" to "Halted" because I think it is weird to 
> > have replicas on dead brokers to come back up _have_ to go through a state 
> > called ReplicaDeletion_Failed_ if there was in fact no attempt at deletion.

Well, in some cases it is a failure: if the broker returns a response with 
some error code, the replica is marked as failed. I also think it is weird to 
go from a Started state to Ineligible instead of Failed. I can see it both 
ways, though. I agree with your point about dead brokers having to go through 
a state called ReplicaDeletionFailed. Will change this.


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33961
-----------------------------------------------------------


On Feb. 6, 2014, 7:37 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 7:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's follow up comments
> 
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress 
> set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, 
> which means that successfully deleted replicas will not be retried unless 
> there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor 
> pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock 
> instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to 
> TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, 
> unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup 
> of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests 
> pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the 
> new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that 
> are queued up for deletion, since topic deletion is halted until partition 
> reassignment can finish anyway. We need to let partition reassignment finish 
> (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking 
> around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 
> a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala 
> d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
> fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
> ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
> 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 
> d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
> 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala 
> b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 
> 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala 
> b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
> 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
> 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>
