----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23516/ -----------------------------------------------------------
(Updated July 15, 2014, 6:36 p.m.) Review request for kafka. Bugs: KAFKA-1462 https://issues.apache.org/jira/browse/KAFKA-1462 Repository: kafka Description (updated) ------- 1. I kept the request objects in the server separated from those in the client. This is because (1) some of the existing request objects are part of old client api (FetechRequest, OffsetCommitRequest, etc) and we can't remove them until the old clients are removed, (2) changing existing request objects on the server side requires significant refactoring. 2. On the client side, I refactored existing request objects a bit. Now, every request/response object extends from a GenericStruct. GenericStruct provides a standard way for doing serialization and toString so that we don't have to do that on every request. Each request/response can be constructed in two ways: (1) by providing request specific fields; (2) by providing a struct. The latter is used for getting a request/response from its serialized format. 3. On the server side. What I did is to keep the existing requests more or less untouched. For new types of requests, create a thin wrapper on the server side so that it can leverage the request objects created on the client side. This way the server side object will share the serialization and the toString logic with the client side object. In order to do this, I removed correlationId from the RequestOrResponse object. There is only one place where correlationId is directly referenced and it is not necessary. 4. Multi-version support. We now need to support two versions of OffsetCommitRequest since for the new consumer work, we added two extra fields in the request. For simplicity, both versions are converted to the same request object. Since the old version doesn't have the new fields, defaults will be used. 5. The new requests/responses are based on the format described in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Requestformats. I made some minor changes to the wiki so that the new requests follow the current standard. 6. Added some missing util functions and added unit test for testing the serialization/deserialization logic. Diffs ----- clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java a016269512b6d6d6e0fd3fab997e9c8265024eb4 clients/src/main/java/org/apache/kafka/common/Cluster.java c62707ab3aba26771fc4b993df28bf8c44f32309 clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 6fe7573973832615976defa37fe0dfbb8f911939 clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 044b03061802ee5e8ea4f1995fb0988e1a70e9a7 clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 8cecba50bf067713184208552af36469962cd628 clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java f35bd87cf0c52a30ed779b25d60bbe64a60b9502 clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 2652c32f123b3bc4b0456d4bc9fbba52c051724c clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 6036f6af1c55c1b0a15471e79b229b17f50ce31c clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 6cf4fb714916f1a318d788cde8fc0aad9dfb83ca clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java 66cc2fea6443968e525419a203dbc4227e0b1cdf clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java 257b8287757e40349ea041ed7a651993007a55a8 clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 2f98192b064d1ce7c0779e901293edb8c3801915 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java PRE-CREATION core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala dfad6e6534dd9b00099d110804899080e8d832ab core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala c72ca14708a3625cb89d5fb92630138d2afa2bf0 core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 7dacb2023788064b736df8b775aaf12281d545b5 core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 46ec3db28f88bbf9e0b0de2133807dc552bcae13 core/src/main/scala/kafka/api/FetchRequest.scala a8b73acd1a813284744359e8434cb52d22063c99 core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala PRE-CREATION core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala PRE-CREATION core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala PRE-CREATION core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala PRE-CREATION core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala PRE-CREATION core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 3e408174dcc7e8dd9097bae41277ee4f7160afb3 core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala f63644448bb5a1d560f79427284ccbac9d46b789 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 630768ab57afb579049bcbc5d44ee6823b0e7cc2 core/src/main/scala/kafka/api/OffsetCommitResponse.scala 4946e9729ecbf3da35bdab5c832d26977c107e9e core/src/main/scala/kafka/api/OffsetFetchRequest.scala a32f8588ff02f5fb3c99fb8e5508f462923e8edc core/src/main/scala/kafka/api/OffsetFetchResponse.scala c1222f422ddb6413bbb2e5da2980903ee70b9156 core/src/main/scala/kafka/api/OffsetRequest.scala 7cbc26c6e38420aa57046a76087fe6d15df72477 core/src/main/scala/kafka/api/OffsetResponse.scala 0e1d6e362a1cec8250cf3930d3046058be4ae192 core/src/main/scala/kafka/api/ProducerRequest.scala 0c295a2fe6712a77cd24719cb42015e2f787b08d core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039 core/src/main/scala/kafka/api/RequestKeys.scala fbfc9d3aeaffed4ca85902125fcc1050086835db core/src/main/scala/kafka/api/RequestOrResponse.scala 57f87a48c5e87220e7f377b23d2bbfa0d16350dc core/src/main/scala/kafka/api/StopReplicaRequest.scala 68fc1389ee71122adb716d9d821dd8987a78ecee core/src/main/scala/kafka/api/StopReplicaResponse.scala c90ddee3d820472236ab554cddd2e0db24233ae3 core/src/main/scala/kafka/api/TopicMetadataRequest.scala a319f2f438bfd84c63edb330685b2b41d4b08aa0 core/src/main/scala/kafka/api/TopicMetadataResponse.scala f6b7429faeab34d0938cb2f78ce91021be7c4b85 core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 543e262b25a946abd84dd58dc5fcee67c6252375 core/src/main/scala/kafka/api/UpdateMetadataResponse.scala c583c1f00c89a993fb9dc280f190c32ea895dca5 core/src/main/scala/kafka/controller/ControllerChannelManager.scala 8763968fbff697e4c5c98ab1274627c192a4d26a core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 08dcc5553ccac7fbec0ea2662b402e2cec079e48 core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 7e6da164a26b1893c26c624a9998d4fedf8af95e core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala a2117b34c2ee3554602fe068eed0c90b075958c1 Diff: https://reviews.apache.org/r/23516/diff/ Testing ------- Thanks, Jun Rao