> On July 16, 2014, 11:09 p.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/common/Cluster.java, line 18 > > <https://reviews.apache.org/r/23516/diff/1/?file=632640#file632640line18> > > > > Do we ever want to use * in imports?
IDE did the optimization since there are too many imports from the same package. > On July 16, 2014, 11:09 p.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java, line > > 117 > > <https://reviews.apache.org/r/23516/diff/1/?file=632642#file632642line117> > > > > What is this field used for? This is to stick any user specific metadata (e.g., external offset in an indexer that needs to match the Kafka offset). > On July 16, 2014, 11:09 p.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java, line > > 177 > > <https://reviews.apache.org/r/23516/diff/1/?file=632642#file632642line177> > > > > I thought the broker will also check consumer id and generation id for > > offset fetch request? Those fields are needed in OffsetCommitRequests since they are changing the offset values. Not sure if we need to do the check for the reader though. > On July 16, 2014, 11:09 p.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java, line > > 224 > > <https://reviews.apache.org/r/23516/diff/1/?file=632642#file632642line224> > > > > This is a long lasting problem: replica_id here can not only be the id > > of a follower replica, but also one of the two values: "-1" for ordinary > > consumer, or "-2" for debugging consumer. We use this field into folds, 1) > > logging for trouble shooting, which can be helpful only when this is from a > > follower replica, 2) deciding if it is from the consumer or a replica, for > > this purpose we do not really care about this id value. We should probably > > rename this field. We can do that, but that needs to be done consistently in other places. Could you file a separate jira? > On July 16, 2014, 11:09 p.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java, line > > 328 > > <https://reviews.apache.org/r/23516/diff/1/?file=632642#file632642line328> > > > > For the customized rebalancer, will that be supported as: 1) user > > implement their rebalancer and re-deploy the server with this new code, 2) > > bounce the consumer setting the new rebalancer? Here I assume the > > "strategy" will just be the class name? We will probably have a few predefined strategies. Adding customized strategies requires a broker bounce. Not sure if we will use the class name or some numerical identifier. - Jun ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23516/#review47940 ----------------------------------------------------------- On July 15, 2014, 6:36 p.m., Jun Rao wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/23516/ > ----------------------------------------------------------- > > (Updated July 15, 2014, 6:36 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1462 > https://issues.apache.org/jira/browse/KAFKA-1462 > > > Repository: kafka > > > Description > ------- > > 1. I kept the request objects in the server separated from those in the > client. This is because (1) some of the existing request objects are part of > old client api (FetechRequest, OffsetCommitRequest, etc) and we can't remove > them until the old clients are removed, (2) changing existing request objects > on the server side requires significant refactoring. > > 2. On the client side, I refactored existing request objects a bit. Now, > every request/response object extends from a GenericStruct. GenericStruct > provides a standard way for doing serialization and toString so that we don't > have to do that on every request. Each request/response can be constructed in > two ways: (1) by providing request specific fields; (2) by providing a > struct. The latter is used for getting a request/response from its serialized > format. > > 3. On the server side. What I did is to keep the existing requests more or > less untouched. For new types of requests, create a thin wrapper on the > server side so that it can leverage the request objects created on the client > side. This way the server side object will share the serialization and the > toString logic with the client side object. In order to do this, I removed > correlationId from the RequestOrResponse object. There is only one place > where correlationId is directly referenced and it is not necessary. > > 4. Multi-version support. We now need to support two versions of > OffsetCommitRequest since for the new consumer work, we added two extra > fields in the request. For simplicity, both versions are converted to the > same request object. Since the old version doesn't have the new fields, > defaults will be used. > > 5. The new requests/responses are based on the format described in > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Requestformats. > I made some minor changes to the wiki so that the new requests follow the > current standard. > > 6. Added some missing util functions and added unit test for testing the > serialization/deserialization logic. > > > Diffs > ----- > > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > a016269512b6d6d6e0fd3fab997e9c8265024eb4 > clients/src/main/java/org/apache/kafka/common/Cluster.java > c62707ab3aba26771fc4b993df28bf8c44f32309 > clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java > 6fe7573973832615976defa37fe0dfbb8f911939 > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java > 044b03061802ee5e8ea4f1995fb0988e1a70e9a7 > clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java > 8cecba50bf067713184208552af36469962cd628 > > clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java > f35bd87cf0c52a30ed779b25d60bbe64a60b9502 > > clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java > 2652c32f123b3bc4b0456d4bc9fbba52c051724c > > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java > 6036f6af1c55c1b0a15471e79b229b17f50ce31c > clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java > 6cf4fb714916f1a318d788cde8fc0aad9dfb83ca > clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java > 66cc2fea6443968e525419a203dbc4227e0b1cdf > clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java > 257b8287757e40349ea041ed7a651993007a55a8 > clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java > 2f98192b064d1ce7c0779e901293edb8c3801915 > > clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java > PRE-CREATION > core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala > dfad6e6534dd9b00099d110804899080e8d832ab > core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala > c72ca14708a3625cb89d5fb92630138d2afa2bf0 > core/src/main/scala/kafka/api/ControlledShutdownRequest.scala > 7dacb2023788064b736df8b775aaf12281d545b5 > core/src/main/scala/kafka/api/ControlledShutdownResponse.scala > 46ec3db28f88bbf9e0b0de2133807dc552bcae13 > core/src/main/scala/kafka/api/FetchRequest.scala > a8b73acd1a813284744359e8434cb52d22063c99 > core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala > PRE-CREATION > core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala PRE-CREATION > core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala PRE-CREATION > core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala PRE-CREATION > core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala PRE-CREATION > core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala > 3e408174dcc7e8dd9097bae41277ee4f7160afb3 > core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala > f63644448bb5a1d560f79427284ccbac9d46b789 > core/src/main/scala/kafka/api/OffsetCommitRequest.scala > 630768ab57afb579049bcbc5d44ee6823b0e7cc2 > core/src/main/scala/kafka/api/OffsetCommitResponse.scala > 4946e9729ecbf3da35bdab5c832d26977c107e9e > core/src/main/scala/kafka/api/OffsetFetchRequest.scala > a32f8588ff02f5fb3c99fb8e5508f462923e8edc > core/src/main/scala/kafka/api/OffsetFetchResponse.scala > c1222f422ddb6413bbb2e5da2980903ee70b9156 > core/src/main/scala/kafka/api/OffsetRequest.scala > 7cbc26c6e38420aa57046a76087fe6d15df72477 > core/src/main/scala/kafka/api/OffsetResponse.scala > 0e1d6e362a1cec8250cf3930d3046058be4ae192 > core/src/main/scala/kafka/api/ProducerRequest.scala > 0c295a2fe6712a77cd24719cb42015e2f787b08d > core/src/main/scala/kafka/api/ProducerResponse.scala > 5a1d8015379b1f5d9130d9edca89544ee7dd0039 > core/src/main/scala/kafka/api/RequestKeys.scala > fbfc9d3aeaffed4ca85902125fcc1050086835db > core/src/main/scala/kafka/api/RequestOrResponse.scala > 57f87a48c5e87220e7f377b23d2bbfa0d16350dc > core/src/main/scala/kafka/api/StopReplicaRequest.scala > 68fc1389ee71122adb716d9d821dd8987a78ecee > core/src/main/scala/kafka/api/StopReplicaResponse.scala > c90ddee3d820472236ab554cddd2e0db24233ae3 > core/src/main/scala/kafka/api/TopicMetadataRequest.scala > a319f2f438bfd84c63edb330685b2b41d4b08aa0 > core/src/main/scala/kafka/api/TopicMetadataResponse.scala > f6b7429faeab34d0938cb2f78ce91021be7c4b85 > core/src/main/scala/kafka/api/UpdateMetadataRequest.scala > 543e262b25a946abd84dd58dc5fcee67c6252375 > core/src/main/scala/kafka/api/UpdateMetadataResponse.scala > c583c1f00c89a993fb9dc280f190c32ea895dca5 > core/src/main/scala/kafka/controller/ControllerChannelManager.scala > 8763968fbff697e4c5c98ab1274627c192a4d26a > core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala > 08dcc5553ccac7fbec0ea2662b402e2cec079e48 > core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala > 7e6da164a26b1893c26c624a9998d4fedf8af95e > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala > a2117b34c2ee3554602fe068eed0c90b075958c1 > > Diff: https://reviews.apache.org/r/23516/diff/ > > > Testing > ------- > > > Thanks, > > Jun Rao > >