> On Feb. 3, 2015, 7:14 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/TopicCommandHelper.scala, lines 1-17
> > <https://reviews.apache.org/r/29301/diff/7/?file=821380#file821380line1>
> >
> >     One general comment:
> >
> >     For some topic commands, why use AdminUtils to write the ZK path again instead of handling it via the controller directly? Or is this still WIP?
>
> Andrii Biletskyi wrote:
>     Not sure I understand you. You mean technically calling the ZK client from the Controller class, not through TopicCommandHelper? If so - it's just to leave KafkaApi clean and small.
>
> Guozhang Wang wrote:
>     For example, upon receiving a create-topic request, the helper class will call AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(), which will just write this request to the ZK admin path for it to be captured by the controller; however, since only the broker with the active controller will receive such requests, why don't we just hand off the request from KafkaApi to the controller to handle it?
>
>     One question, though, is that we need to make sure concurrency is correct for the controller handling multiple such tasks, and we have some thoughts about how to deal with such cases (see Jiangjie's and my comments in KAFKA-1305).

Thanks for the explanation. So instead of the current workflow:

CreateTopicRequest -> Helper class -> AdminUtils -> ZK path is created -> Controller's changeTopicListener picks up the change -> topic is created

you propose:

CreateTopicRequest -> Controller directly executes the logic from ChangeTopicListener?

Very interesting idea! Can we make a separate ticket for that? I tried to port TopicCommand "as is" so that, at least for now, we have a working end-to-end infrastructure for handling admin commands. I believe this is more like a refactoring of TopicCommand (delete- and alterTopic should probably be changed too). I'm a bit concerned that adding this refactoring will require additional testing effort (especially taking into account your note about KAFKA-1305) and time to agree on the approach we will use to address this issue.


> On Feb. 3, 2015, 7:14 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, lines 301-310
> > <https://reviews.apache.org/r/29301/diff/7/?file=821376#file821376line301>
> >
> >     Do not understand the rationale behind this: could you add some comments? Particularly, why do we want to send an empty metadata map to the brokers with forceSendBrokerInfo?
>
> Andrii Biletskyi wrote:
>     Thanks, this is done because on startup we don't send UpdateMetadataRequest (updateMetadataRequestMap is empty) and thus the brokers' cache is not filled with brokers and controller. As a result, ClusterMetadataRequest can't be served correctly.
>     I'm not sure this is the best way to do it, open for suggestions.
>
> Guozhang Wang wrote:
>     In this case can we just use addUpdateMetadataRequestForBrokers() before calling sendRequestsToBrokers()?

If I understood correctly, addUpdateMetadataRequestForBrokers() is already called; it's just that nothing is added to UpdateMetadata. The steps are the following:
1. A one-broker cluster is started (no topics)
2. KafkaController.onControllerFailover() is called
3. sendUpdateMetadataRequest()
4. addUpdateMetadataRequest(): updateMetadataRequest is created foreach controllerContext.partitionLeadershipInfo.keySet (which is empty)
5. sendRequestsToBrokers(): we send UpdateMetadata foreach broker from updateMetadataRequestMap (which is empty)
-> the broker holding the controller's role doesn't receive UpdateMetadataRequest

So essentially the problem is that UpdateMetadataRequest holds data about the controller, brokers _and_ partitionState, but we send UpdateMetadataRequest only if there is a partitionState update to be sent.


> On Feb. 3, 2015, 7:14 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/admin/AbstractAdminRequest.java, lines 1-28
> > <https://reviews.apache.org/r/29301/diff/7/?file=821321#file821321line1>
> >
> >     Wondering if an abstract admin request is necessary, as it does not have many common interface functions.
>
> Andrii Biletskyi wrote:
>     This is needed to avoid code duplication in admin clients. See RequestDispatcher for example.
>     You will need to call an admin request and get a response of that type. Having AbstractAdminRequest (specifically createResponseCounterpart) lets you have:
>     ```
>     public <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> abstractRequest) throws Exception {
>     ```
>     Instead of sendCreateTopicRequest, sendAlter... etc. If there is a better and cleaner way to achieve this - please let me know.
>
> Guozhang Wang wrote:
>     I see. How about changing "sendAdminRequest(AbstractAdminRequest<T>)" to "sendRequest(ClientRequest)" and the caller like AlterTopicCommand.execute() will be:
>
>     AlterTopicRequest alterTopicRequest = // create the request
>     ClientRequest request = new ClientRequest(new RequestSend(...) ...)
>     dispatcher.sendRequest(request)
>
>     This way we are duplicating the second line here in every upper-level class, while saving the admin interface. I actually do not know which one is better..

Yes, but you will also need a typed response.
Let me continue your example:

AlterTopicRequest alterTopicRequest = // create the request
ClientRequest request = new ClientRequest(new RequestSend(...) ...)
__ClientResponse response = dispatcher.sendRequest(request, ApiKeys.ALTER_TOPIC)__
__AlterTopicResponse alterTopicResponse = new AlterTopicResponse(response.responseBody())__
alterTopicResponse.// now get what you need from typed response

And you will have this NetworkClient-related stuff (RequestSend, ClientRequest ...) everywhere in your client code. It also looks pretty strange that you can't have a generic method to send a request and immediately get a response of the required type.
So really RequestDispatcher already has sendRequest() as you suggest; with sendAdminRequest I tried to address the issue of getting the response counterpart. But I agree that this solution might mislead people, so if it isn't worth it, I'm okay with removing the intermediate AbstractAdminRequest/Response.


- Andrii


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29301/#review70790
-----------------------------------------------------------


On Jan. 14, 2015, 4:07 p.m., Andrii Biletskyi wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29301/
> -----------------------------------------------------------
>
> (Updated Jan. 14, 2015, 4:07 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1694
>     https://issues.apache.org/jira/browse/KAFKA-1694
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1694 - introduced new type for Wire protocol, ported ClusterMetadataResponse to it
>
>
> KAFKA-1694 - Split Admin RQ/RP to separate messages
>
>
> KAFKA-1694 - Admin commands can be handled only by controller; DeleteTopicCommand NPE fix
>
>
> KAFKA-1776 - Ported ConsumerGroupOffsetChecker
>
>
> KAFKA-1776 - Ported PreferredReplicaElectionTool and ReassignPartitionsTool to CLI
>
>
> KAFKA-1694 - kafka-tools is uploaded on uploadAllArchives
>
>
> KAFKA-1694 - ReviewBoard 29301 code review fixes
>
>
> KAFKA-1694 - Data for ReassignPartitions and PreferredReplicaLeaderElection is in json string
>
>
> KAFKA-1694 - Added logging
>
>
> KAFKA-1694 - fixed misprint in schema
>
>
> KAFKA-1694 - DescribeTopicCommand supports all flags that TopicCommand does
>
>
> KAFKA-1694 - Fixed compile error for new Selector constructor
>
>
> KAFKA-1694 - Fixed ConsumerGroupChecker sends DescribeTopicResponse instead of ConsumerGroupOffsetsResponse
>
>
> KAFKA-1694 - Introduced AbstractAdminRequest/Response to avoid code duplication for sending admin requests / receiving response.
>
>
> Diffs
> -----
>
>   bin/kafka.sh PRE-CREATION
>   bin/windows/kafka.bat PRE-CREATION
>   build.gradle c9ac43378c3bf5443f0f47c8ba76067237ecb348
>   clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 109fc965e09b2ed186a073351bd037ac8af20a4c
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a
>   clients/src/main/java/org/apache/kafka/common/protocol/types/MaybeOf.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc
>   clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/AbstractAdminRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/AbstractAdminResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/AlterTopicResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ConsumerGroupOffsetsResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/CreateTopicResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/DeleteTopicResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicOutput.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/DescribeTopicResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsOutput.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ListTopicsResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/PreferredReplicaLeaderElectionRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/PreferredReplicaLeaderElectionResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ReassignPartitionsRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/ReassignPartitionsResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/TopicConfigDetails.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/TopicPartitionDetails.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyPreferredReplicaLeaderElectionRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyPreferredReplicaLeaderElectionResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyReassignPartitionsRequest.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/requests/admin/VerifyReassignPartitionsResponse.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4
>   config/tools-log4j.properties 52f07c96019b4083fc78f62cfb0a81080327e847
>   core/src/main/scala/kafka/api/ApiUtils.scala 1f80de1638978901500df808ca5133308c9d1fca
>   core/src/main/scala/kafka/api/ClusterMetadataRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/ClusterMetadataResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/RequestKeys.scala c24c0345feedc7b9e2e9f40af11bfa1b8d328c43
>   core/src/main/scala/kafka/api/admin/AlterTopicRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/AlterTopicResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ConsumerGroupOffsetsRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ConsumerGroupOffsetsResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/CreateTopicRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/CreateTopicResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/DeleteTopicRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/DeleteTopicResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/DescribeTopicRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/DescribeTopicResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ListTopicsRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ListTopicsResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/PreferredReplicaLeaderElectionRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/PreferredReplicaLeaderElectionResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ReassignPartitionsRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/ReassignPartitionsResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/VerifyPreferredReplicaLeaderElectionRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/VerifyPreferredReplicaLeaderElectionResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/VerifyReassignPartitionsRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/admin/VerifyReassignPartitionsResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/common/AdminRequestFailedException.scala PRE-CREATION
>   core/src/main/scala/kafka/common/ErrorMapping.scala eedc2f5f21dd8755fba891998456351622e17047
>   core/src/main/scala/kafka/common/InvalidRequestTargetException.scala PRE-CREATION
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala eb492f00449744bc8d63f55b393e2a1659d38454
>   core/src/main/scala/kafka/controller/KafkaController.scala 66df6d2fbdbdd556da6bea0df84f93e0472c8fbf
>   core/src/main/scala/kafka/server/KafkaApis.scala c011a1b79bd6c4e832fe7d097daacb0d647d1cd4
>   core/src/main/scala/kafka/server/MetadataCache.scala bf81a1ab88c14be8697b441eedbeb28fa0112643
>   core/src/main/scala/kafka/server/TopicCommandHelper.scala PRE-CREATION
>   core/src/main/scala/kafka/tools/ConsumerOffsetCheckerHelper.scala PRE-CREATION
>   core/src/main/scala/kafka/tools/PreferredReplicaLeaderElectionHelper.scala PRE-CREATION
>   core/src/main/scala/kafka/tools/ReassignPartitionsHelper.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343
>   settings.gradle 83f764e6a4a15a5fdba232dce74a369870f26b45
>   tools/src/main/java/org/apache/kafka/cli/BaseCommandOpts.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/Boot.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/RequestDispatcher.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/Shell.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/AlterTopicCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/ClearScreenCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/Command.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/CreateTopicCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/DeleteTopicCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/DescribeTopicCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/ExitCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/ListTopicsCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/PreferredReplicaLeaderElectionCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/PrintConsumerGroupOffsetsCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/PrintHelpCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/ReassignPartitionsCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/command/TopicSwitchCommand.java PRE-CREATION
>   tools/src/main/java/org/apache/kafka/cli/util/StringUtils.java PRE-CREATION
>
> Diff: https://reviews.apache.org/r/29301/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Andrii Biletskyi
>
>
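
P.S. To make the AbstractAdminRequest point from the thread above more concrete, here is a minimal, self-contained sketch of the idea: each request type knows how to build its own typed response, so a single generic send method can return the right response class without per-command methods. This is only an illustration under assumptions, not the code in the patch. The classes TypedDispatchSketch, AdminRequest, AdminResponse and Dispatcher, the map-based response body, and the "error_code" field are hypothetical stand-ins; only the method names sendAdminRequest, sendRequest and createResponseCounterpart come from the discussion, and their signatures are simplified here (no NetworkClient, no real wire format).

```java
import java.util.HashMap;
import java.util.Map;

public class TypedDispatchSketch {

    /** Hypothetical stand-in for AbstractAdminResponse: wraps an untyped body. */
    static abstract class AdminResponse {
        final Map<String, Object> body;
        AdminResponse(Map<String, Object> body) { this.body = body; }
    }

    /** Hypothetical stand-in for AbstractAdminRequest<T>: knows its response type. */
    static abstract class AdminRequest<T extends AdminResponse> {
        abstract String apiName();
        /** Builds the typed response that corresponds to this request. */
        abstract T createResponseCounterpart(Map<String, Object> rawBody);
    }

    static final class AlterTopicResponse extends AdminResponse {
        AlterTopicResponse(Map<String, Object> body) { super(body); }
        short errorCode() { return (Short) body.get("error_code"); }
    }

    static final class AlterTopicRequest extends AdminRequest<AlterTopicResponse> {
        final String topic;
        AlterTopicRequest(String topic) { this.topic = topic; }
        @Override String apiName() { return "ALTER_TOPIC"; }
        @Override AlterTopicResponse createResponseCounterpart(Map<String, Object> rawBody) {
            return new AlterTopicResponse(rawBody);
        }
    }

    /** Fake dispatcher: no network, it just fabricates a successful untyped body. */
    static final class Dispatcher {
        /** Untyped path: the caller would have to wrap the body by hand. */
        Map<String, Object> sendRequest(String apiName) {
            Map<String, Object> body = new HashMap<>();
            body.put("api", apiName);
            body.put("error_code", (short) 0);
            return body;
        }

        /** Typed path: one generic method serves every request/response pair. */
        <T extends AdminResponse> T sendAdminRequest(AdminRequest<T> request) {
            return request.createResponseCounterpart(sendRequest(request.apiName()));
        }
    }

    public static void main(String[] args) {
        Dispatcher dispatcher = new Dispatcher();
        // No cast and no manual wrapping at the call site.
        AlterTopicResponse response =
                dispatcher.sendAdminRequest(new AlterTopicRequest("my-topic"));
        System.out.println("error code: " + response.errorCode());
    }
}
```

The trade-off is the one raised in the review: the generic method keeps NetworkClient-level details out of every command class, at the cost of an extra abstract request/response layer that may not pull its weight if the requests share little else.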