> On July 16, 2014, 11:09 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/Cluster.java, line 18
> > <https://reviews.apache.org/r/23516/diff/1/?file=632640#file632640line18>
> >
> >     Do we ever want to use * in imports?

IDE did the optimization since there are too many imports from the same package.


> On July 16, 2014, 11:09 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java, line 
> > 117
> > <https://reviews.apache.org/r/23516/diff/1/?file=632642#file632642line117>
> >
> >     What is this field used for?

This is to stick any user specific metadata (e.g., external offset in an 
indexer that needs to match the Kafka offset).


> On July 16, 2014, 11:09 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java, line 
> > 177
> > <https://reviews.apache.org/r/23516/diff/1/?file=632642#file632642line177>
> >
> >     I thought the broker will also check consumer id and generation id for 
> > offset fetch request?

Those fields are needed in OffsetCommitRequests since they are changing the 
offset values. Not sure if we need to do the check for the reader though.


> On July 16, 2014, 11:09 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java, line 
> > 224
> > <https://reviews.apache.org/r/23516/diff/1/?file=632642#file632642line224>
> >
> >     This is a long lasting problem: replica_id here can not only be the id 
> > of a follower replica, but also one of the two values: "-1" for ordinary 
> > consumer, or "-2" for debugging consumer. We use this field into folds, 1) 
> > logging for trouble shooting, which can be helpful only when this is from a 
> > follower replica, 2) deciding if it is from the consumer or a replica, for 
> > this purpose we do not really care about this id value. We should probably 
> > rename this field.

We can do that, but that needs to be done consistently in other places. Could 
you file a separate jira?


> On July 16, 2014, 11:09 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java, line 
> > 328
> > <https://reviews.apache.org/r/23516/diff/1/?file=632642#file632642line328>
> >
> >     For the customized rebalancer, will that be supported as: 1) user 
> > implement their rebalancer and re-deploy the server with this new code, 2) 
> > bounce the consumer setting the new rebalancer? Here I assume the 
> > "strategy" will just be the class name?

We will probably have a few predefined strategies. Adding customized strategies 
requires a broker bounce. Not sure if we will use the class name or some 
numerical identifier.


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23516/#review47940
-----------------------------------------------------------


On July 15, 2014, 6:36 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23516/
> -----------------------------------------------------------
> 
> (Updated July 15, 2014, 6:36 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1462
>     https://issues.apache.org/jira/browse/KAFKA-1462
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> 1. I kept the request objects in the server separated from those in the 
> client. This is because (1) some of the existing request objects are part of 
> old client api (FetechRequest, OffsetCommitRequest, etc) and we can't remove 
> them until the old clients are removed, (2) changing existing request objects 
> on the server side requires significant refactoring.
> 
> 2. On the client side, I refactored existing request objects a bit. Now, 
> every request/response object extends from a GenericStruct. GenericStruct 
> provides a standard way for doing serialization and toString so that we don't 
> have to do that on every request. Each request/response can be constructed in 
> two ways: (1) by providing request specific fields; (2) by providing a 
> struct. The latter is used for getting a request/response from its serialized 
> format.
> 
> 3. On the server side. What I did is to keep the existing requests more or 
> less untouched. For new types of requests, create a thin wrapper on the 
> server side so that it can leverage the request objects created on the client 
> side. This way the server side object will share the serialization and the 
> toString logic with the client side object. In order to do this, I removed 
> correlationId from the RequestOrResponse object. There is only one place 
> where correlationId is directly referenced and it is not necessary.
> 
> 4. Multi-version support. We now need to support two versions of 
> OffsetCommitRequest since for the new consumer work, we added two extra 
> fields in the request. For simplicity, both versions are converted to the 
> same request object. Since the old version doesn't have the new fields, 
> defaults will be used.
> 
> 5. The new requests/responses are based on the format described in 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Requestformats.
>  I made some minor changes to the wiki so that the new requests follow the 
> current standard.
> 
> 6. Added some missing util functions and added unit test for testing the 
> serialization/deserialization logic.
> 
> 
> Diffs
> -----
> 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> a016269512b6d6d6e0fd3fab997e9c8265024eb4 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> c62707ab3aba26771fc4b993df28bf8c44f32309 
>   clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
> 6fe7573973832615976defa37fe0dfbb8f911939 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
> 044b03061802ee5e8ea4f1995fb0988e1a70e9a7 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
> 8cecba50bf067713184208552af36469962cd628 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
> f35bd87cf0c52a30ed779b25d60bbe64a60b9502 
>   
> clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
> 2652c32f123b3bc4b0456d4bc9fbba52c051724c 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
> 6036f6af1c55c1b0a15471e79b229b17f50ce31c 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
> 6cf4fb714916f1a318d788cde8fc0aad9dfb83ca 
>   clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java 
> 66cc2fea6443968e525419a203dbc4227e0b1cdf 
>   clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java 
> 257b8287757e40349ea041ed7a651993007a55a8 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 2f98192b064d1ce7c0779e901293edb8c3801915 
>   
> clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
>  PRE-CREATION 
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
> dfad6e6534dd9b00099d110804899080e8d832ab 
>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 
> c72ca14708a3625cb89d5fb92630138d2afa2bf0 
>   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
> 7dacb2023788064b736df8b775aaf12281d545b5 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 
> 46ec3db28f88bbf9e0b0de2133807dc552bcae13 
>   core/src/main/scala/kafka/api/FetchRequest.scala 
> a8b73acd1a813284744359e8434cb52d22063c99 
>   core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> 3e408174dcc7e8dd9097bae41277ee4f7160afb3 
>   core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala 
> f63644448bb5a1d560f79427284ccbac9d46b789 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
> 630768ab57afb579049bcbc5d44ee6823b0e7cc2 
>   core/src/main/scala/kafka/api/OffsetCommitResponse.scala 
> 4946e9729ecbf3da35bdab5c832d26977c107e9e 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
> a32f8588ff02f5fb3c99fb8e5508f462923e8edc 
>   core/src/main/scala/kafka/api/OffsetFetchResponse.scala 
> c1222f422ddb6413bbb2e5da2980903ee70b9156 
>   core/src/main/scala/kafka/api/OffsetRequest.scala 
> 7cbc26c6e38420aa57046a76087fe6d15df72477 
>   core/src/main/scala/kafka/api/OffsetResponse.scala 
> 0e1d6e362a1cec8250cf3930d3046058be4ae192 
>   core/src/main/scala/kafka/api/ProducerRequest.scala 
> 0c295a2fe6712a77cd24719cb42015e2f787b08d 
>   core/src/main/scala/kafka/api/ProducerResponse.scala 
> 5a1d8015379b1f5d9130d9edca89544ee7dd0039 
>   core/src/main/scala/kafka/api/RequestKeys.scala 
> fbfc9d3aeaffed4ca85902125fcc1050086835db 
>   core/src/main/scala/kafka/api/RequestOrResponse.scala 
> 57f87a48c5e87220e7f377b23d2bbfa0d16350dc 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 68fc1389ee71122adb716d9d821dd8987a78ecee 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala 
> c90ddee3d820472236ab554cddd2e0db24233ae3 
>   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
> a319f2f438bfd84c63edb330685b2b41d4b08aa0 
>   core/src/main/scala/kafka/api/TopicMetadataResponse.scala 
> f6b7429faeab34d0938cb2f78ce91021be7c4b85 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 543e262b25a946abd84dd58dc5fcee67c6252375 
>   core/src/main/scala/kafka/api/UpdateMetadataResponse.scala 
> c583c1f00c89a993fb9dc280f190c32ea895dca5 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> 8763968fbff697e4c5c98ab1274627c192a4d26a 
>   core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 
> 08dcc5553ccac7fbec0ea2662b402e2cec079e48 
>   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
> 7e6da164a26b1893c26c624a9998d4fedf8af95e 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> a2117b34c2ee3554602fe068eed0c90b075958c1 
> 
> Diff: https://reviews.apache.org/r/23516/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>

Reply via email to