Thanks for the review, Gwen.

> 1. The replica assignment protocol takes [replicas], there is the
> implicit assumption that the first replica is the leader. This matches
> current behavior elsewhere, but let's document it explicitly.

I added this to the wiki and will update the protocol doc string in the patch.

> 2. I like the timeout, but want to clarify why, since it may not be
> obvious to everyone:

I tried to describe why a timeout, even if not global, is useful in the
"Cluster Consistent Blocking
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-clusterconsistentblocking>"
section. I have a QA that links to that section in the Create Topic section
(fixed the broken link). Below is the relevant text from that section:

> The intermediate changes in KIP-4 should allow an easy transition to
> "complete blocking" when the work can be done. This is supported by
> providing optional local blocking in the meantime. This local blocking
> only blocks until the local state on the controller is correct. We will
> still provide a polling mechanism for users that do not want to block at
> all. A polling mechanism is required in the optimal implementation too
> because users still need a way to check state after a timeout occurs,
> since operations like "create topic" are not transactional. Local
> blocking has the added benefit of avoiding wasted poll requests to other
> brokers when it's impossible for the request to be completed. If the
> controller's state is not correct, then the other brokers' state can't be
> either. Clients who don't want to validate that the entire cluster state is
> correct can block on the controller and avoid polling altogether with
> reasonable confidence that, though they may get a retriable error on
> follow-up requests, the requested change was successful and the cluster
> will be accurate eventually.
>
> Because we already add a timeout field to the request wire protocols,
> changing the behavior to block until the cluster is consistent in the
> future would not require a protocol change, though the version could be
> bumped to indicate a behavior change.

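To make the client-side flow concrete, here is a rough Python sketch of
"block on the controller, poll only after a timeout". It is not from the
patch: FakeController, create_and_confirm, and the string error codes are
hypothetical stand-ins, and only the timeout > 0 case is modeled.

```python
import time

# Hypothetical error codes for illustration; not Kafka's real values.
NONE = "NONE"
REQUEST_TIMED_OUT = "REQUEST_TIMED_OUT"


class FakeController:
    """Stand-in controller: a topic's metadata is "complete" after a delay."""

    def __init__(self, completion_delay_s):
        self.completion_delay_s = completion_delay_s
        self._started = {}  # topic -> time creation was initiated

    def is_complete(self, topic):
        started = self._started.get(topic)
        if started is None:
            return False
        return time.monotonic() - started >= self.completion_delay_s

    def create_topic(self, topic, timeout_ms):
        """Initiate creation, then block locally for up to timeout_ms."""
        self._started.setdefault(topic, time.monotonic())
        deadline = time.monotonic() + timeout_ms / 1000.0
        while time.monotonic() < deadline:
            if self.is_complete(topic):
                return NONE
            time.sleep(0.005)
        # Not transactional: creation is not rolled back on timeout and
        # may still finish later.
        return NONE if self.is_complete(topic) else REQUEST_TIMED_OUT


def create_and_confirm(controller, topic, timeout_ms,
                       max_polls=50, poll_interval_s=0.01):
    """Block on the controller first; poll only if the request timed out.

    Returns (status, polls_used).
    """
    error = controller.create_topic(topic, timeout_ms)
    if error == NONE:
        return "created", 0  # local blocking succeeded, no polling needed
    if error != REQUEST_TIMED_OUT:
        return "failed", 0
    # After a timeout the client has to query for the state itself.
    for polls in range(1, max_polls + 1):
        if controller.is_complete(topic):
            return "created", polls
        time.sleep(poll_interval_s)
    return "unknown", max_polls
```

With a generous timeout the client never polls; with a timeout shorter than
the time the controller needs, the same call falls back to polling and still
observes the topic once it is complete.
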
Thanks,
Grant

On Fri, Jun 10, 2016 at 12:34 PM, Gwen Shapira <g...@confluent.io> wrote:

> Thank you for the clear proposal, Grant!
>
> I like the request/response objects and the timeout semantics. Two
> comments:
>
> 1. The replica assignment protocol takes [replicas], there is the
> implicit assumption that the first replica is the leader. This matches
> current behavior elsewhere, but let's document it explicitly.
>
> 2. I like the timeout, but want to clarify why, since it may not be
> obvious to everyone:
> Currently, the response is sent when the controller has sent the
> "update metadata" request to the brokers involved with the new topic.
> It is a rather weak guarantee, but if clients decide to poll the
> brokers for updates, it does reduce the time spent polling.
> More important, this behavior is a net improvement on the current state
> (completely async and ZK dependent) and when we do have a way to get an
> "ack" from replicas, we will be able to add the new behavior without
> changing the protocol (just the semantics of waiting).
>
> Gwen
>
> On Fri, Jun 10, 2016 at 7:21 PM, Grant Henke <ghe...@cloudera.com> wrote:
> > Now that Kafka 0.10 has been released I would like to start work on the
> > new protocol messages and client implementation for KIP-4. In order to
> > break up the discussion and feedback I would like to continue breaking
> > up the content into smaller pieces.
> >
> > This discussion thread is for the CreateTopic request/response and
> > server side implementation.
> > Details for this implementation can be read here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-CreateTopicRequest
> >
> > I have included the exact content below for clarity:
> >
> >> Create Topic Request (KAFKA-2945
> >> <https://issues.apache.org/jira/browse/KAFKA-2945>)
> >>
> >> CreateTopic Request (Version: 0) => [create_topic_requests] timeout
> >>   create_topic_requests => topic partitions replication_factor [replica_assignment] [configs]
> >>     topic => STRING
> >>     partitions => INT32
> >>     replication_factor => INT32
> >>     replica_assignment => partition_id [replicas]
> >>       partition_id => INT32
> >>       replicas => INT32
> >>     configs => config_key config_value
> >>       config_key => STRING
> >>       config_value => STRING
> >>   timeout => INT32
> >>
> >> CreateTopicRequest is a batch request to initiate topic creation with
> >> either predefined or automatic replica assignment and optionally topic
> >> configuration.
> >>
> >> Request semantics:
> >>
> >> 1. Must be sent to the controller broker
> >> 2. Multiple instructions for the same topic in one request will be
> >>    silently ignored; only the last from the list will be executed.
> >>    - This is because the list of topics is modeled server side as a
> >>      map with TopicName as the key
> >> 3. The principal must be authorized to the "Create" Operation on the
> >>    "Cluster" resource to create topics.
> >>    - Unauthorized requests will receive a ClusterAuthorizationException
> >> 4. Only one from ReplicaAssignment or (Partitions + ReplicationFactor)
> >>    can be defined in one instruction. If both parameters are specified,
> >>    ReplicaAssignment takes precedence.
> >>    - In the case ReplicaAssignment is defined, the number of partitions
> >>      and replicas will be calculated from the supplied ReplicaAssignment.
> >>    - In the case of defined (Partitions + ReplicationFactor), replica
> >>      assignment will be automatically generated by the server.
> >>    - One or the other must be defined. The existing broker side auto
> >>      create defaults will not be used
> >>      (default.replication.factor, num.partitions). The client
> >>      implementation can have defaults for these options when generating
> >>      the messages.
> >> 5. Setting a timeout > 0 will allow the request to block until the
> >>    topic metadata is "complete" on the controller node.
> >>    - Complete means the topic metadata has been completely populated
> >>      (leaders, replicas, ISRs)
> >>    - If a timeout error occurs, the topic could still be created
> >>      successfully at a later time. It's up to the client to query for
> >>      the state at that point.
> >> 6. The request is not transactional.
> >>    1. If an error occurs on one topic, the other could still be
> >>       created.
> >>    2. Errors are reported independently.
> >>
> >> QA:
> >>
> >> - Why is CreateTopicRequest a batch request?
> >>   - Scenarios where tools or admins want to create many topics should
> >>     be able to with fewer requests
> >>   - Example: MirrorMaker may want to create the topics downstream
> >> - What happens if some topics error immediately? Will it
> >>   return immediately?
> >>   - The request will block until all topics have either been created,
> >>     errored, or the timeout has been hit
> >>   - There is no "short circuiting" where 1 error stops the other
> >>     topics from being created
> >> - Why implement "partial blocking" instead of fully async or fully
> >>   consistent?
> >>   - See Cluster Consistent Blocking
> >>     <https://cwiki.apache.org/#KIP-4-Commandlineandcentralizedadministrativeoperations-clusterconsistentblocking>
> >>     below
> >> - Why require the request to go to the controller?
> >>   - The controller is responsible for the cluster metadata and
> >>     its propagation
> >>   - See Request Forwarding
> >>     <https://cwiki.apache.org/#KIP-4-Commandlineandcentralizedadministrativeoperations-request>
> >>     below
> >>
> >> Create Topic Response
> >>
> >> CreateTopic Response (Version: 0) => [topic_error_codes]
> >>   topic_error_codes => topic error_code
> >>     topic => STRING
> >>     error_code => INT16
> >>
> >> CreateTopicResponse contains a map between topic and topic creation
> >> result error code (see New Protocol Errors
> >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-NewProtocolErrors>).
> >
> > A sample PR is on github (https://github.com/apache/kafka/pull/1489)
> > though it could change drastically based on the feedback here.
> >
> > Thanks,
> > Grant
> >
> > --
> > Grant Henke
> > Software Engineer | Cloudera
> > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke