I think what you are saying is that in RequestChannel, we can start generating header/body for new request types and leave requestObj null. For existing requests, header/body will be null initially. Gradually, we can migrate each request type by populating header/body instead of requestObj. This makes sense to me since it serves two purposes: (1) not polluting the code base with duplicated request/response objects for new types of requests and (2) allowing the refactoring of existing requests to be done in smaller pieces.
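To make that concrete, here is a rough sketch of the parsing side. It is only an illustration under assumptions: `Header`, `Body`, `Request`, `RequestParsing` and the `migrated` set are hypothetical stand-ins rather than the real Kafka classes, the header is simplified to three fixed-width fields, and the api key in the set is an arbitrary example value. The point it shows is peeking at the request id and then rewinding, so that the header parser sees the id again.

```scala
import java.nio.ByteBuffer

// Hypothetical stand-ins for illustration only; not the real Kafka classes.
final case class Header(apiKey: Short, apiVersion: Short, correlationId: Int)
final case class Body(fields: Map[String, Any])
// A request carries either the old-style object or the new header/body pair.
final case class Request(requestObj: Option[String],
                         header: Option[Header],
                         body: Option[Body])

object RequestParsing {
  // Request ids already migrated to header/body (arbitrary example value).
  val migrated: Set[Short] = Set(12.toShort)

  // Simplified header layout (assumption): int16 key, int16 version, int32 correlation id.
  def parseHeader(buffer: ByteBuffer): Header =
    Header(buffer.getShort(), buffer.getShort(), buffer.getInt())

  def parse(buffer: ByteBuffer): Request = {
    buffer.mark()                     // remember where the request starts
    val requestId = buffer.getShort() // peek at the request id
    buffer.reset()                    // rewind: the header includes the id
    if (migrated.contains(requestId)) {
      val header = parseHeader(buffer)
      // Placeholder body: just records how many bytes follow the header.
      val body = Body(Map("remainingBytes" -> buffer.remaining()))
      Request(None, Some(header), Some(body))
    } else {
      buffer.getShort() // old path: consume the id, then hand off to the old deserializer
      Request(Some(s"legacy-request-$requestId"), None, None)
    }
  }
}
```

Using `mark()`/`reset()` keeps the rewind local to the id read. `buffer.rewind()` would also work if the request always starts at position 0.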
Could you try that approach and perhaps just migrate one existing request type (e.g. HeartBeatRequest) as an example? We probably need to rewind the buffer after reading the requestId when deserializing the header (since the header includes the request id).

Thanks,

Jun

On Mon, Mar 23, 2015 at 4:52 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

> I'm thinking of a different approach, that will not fix everything, but
> will allow adding new requests without code duplication (and therefore
> unblock KIP-4):
>
> RequestChannel.request currently takes a buffer and parses it into an "old"
> request object. Since the objects are byte-compatible, we should be able to
> parse existing requests into both old and new objects. New requests will
> only be parsed into new objects.
>
> Basically:
> val requestId = buffer.getShort()
> if (requestId in keyToNameAndDeserializerMap) {
>   requestObj = RequestKeys.deserializerForKey(requestId)(buffer)
>   header: RequestHeader = RequestHeader.parse(buffer)
>   body: Struct =
>     ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> } else {
>   requestObj = null
>   header: RequestHeader = RequestHeader.parse(buffer)
>   body: Struct =
>     ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> }
>
> This way existing KafkaApis will keep working as normal. The new Apis can
> implement just the new header/body requests.
> We'll do the same on the send-side: BoundedByteBufferSend can have a
> constructor that takes header/body instead of just a response object.
>
> Does that make sense?
>
> Once we have this in, we can move to:
> * Adding the missing request/response to the client code
> * Replacing requests that can be replaced
>
> It will also make life easier by having us review and test smaller chunks
> of work (the existing patch is *huge*, touches nearly every core component
> and I'm not done yet...)
>
> Gwen
>
> On Sun, Mar 22, 2015 at 10:24 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Ack, yeah, forgot about that.
> >
> > It's not just a difference of wrappers. The server side actually sends the
> > bytes lazily using FileChannel.transferTo. We need to make it possible to
> > carry over that optimization. In some sense what we want to be able to do
> > is set a value to a Send instead of a ByteBuffer.
> >
> > Let me try to add that support to the protocol definition stuff, will
> > probably take me a few days to free up time.
> >
> > -Jay
> >
> > On Sun, Mar 22, 2015 at 7:44 PM, Gwen Shapira <gshap...@cloudera.com>
> > wrote:
> >
> > > In case anyone is still following this thread, I need a bit of help :)
> > >
> > > The old FetchResponse.PartitionData included a MessageSet object.
> > > The new FetchResponse.PartitionData includes a ByteBuffer.
> > >
> > > However, when we read from logs, we return a MessageSet, and as far as I
> > > can see, these can't be converted to ByteBuffers (at least not without
> > > copying their data).
> > >
> > > Did anyone consider how to reconcile the MessageSets with the new
> > > FetchResponse objects?
> > >
> > > Gwen
> > >
> > > On Sat, Mar 21, 2015 at 6:54 PM, Gwen Shapira <gshap...@cloudera.com>
> > > wrote:
> > >
> > > > Note: I'm also treating ZkUtils as if it was a public API (i.e. converting
> > > > objects that are returned into o.a.k.common equivalents but not changing
> > > > ZkUtils itself).
> > > > I know it's not public, but I suspect I'm not the only developer here who
> > > > has tons of external code that uses it.
> > > >
> > > > Gwen
> > > >
> > > > On Wed, Mar 18, 2015 at 5:48 PM, Gwen Shapira <gshap...@cloudera.com>
> > > > wrote:
> > > >
> > > >> We can't rip them out completely, unfortunately - the SimpleConsumer uses
> > > >> them.
> > > >>
> > > >> So we'll need conversion at some point.
I'll try to make the > > > >> conversion point "just before hitting a public API that we can't > > > >> modify", and hopefully it won't look too arbitrary. > > > >> > > > >> > > > >> > > > >> On Wed, Mar 18, 2015 at 5:24 PM, Jay Kreps <jay.kr...@gmail.com> > > wrote: > > > >> > I think either approach is okay in the short term. However our > goal > > > >> should > > > >> > be to eventually get rid of that duplicate code, so if you are up > > for > > > >> just > > > >> > ripping and cutting that may get us there sooner. > > > >> > > > > >> > -Jay > > > >> > > > > >> > On Wed, Mar 18, 2015 at 5:19 PM, Gwen Shapira < > > gshap...@cloudera.com> > > > >> wrote: > > > >> > > > > >> >> Thanks! > > > >> >> > > > >> >> Another clarification: > > > >> >> The Common request/responses use slightly different > infrastructure > > > >> >> objects: Node instead of Broker, TopicPartition instead of > > > >> >> TopicAndPartition and few more. > > > >> >> > > > >> >> I can write utilities to convert Node to Broker to minimize the > > scope > > > >> >> of the change. > > > >> >> Or I can start replacing Brokers with Nodes across the board. > > > >> >> > > > >> >> I'm currently taking the second approach - i.e, if > MetadataRequest > > is > > > >> >> now returning Node, I'm changing the entire line of dependencies > to > > > >> >> use Nodes instead of broker. > > > >> >> > > > >> >> Is this acceptable, or do we want to take a more minimal approach > > for > > > >> >> this patch and do a larger replacement as a follow up? > > > >> >> > > > >> >> Gwen > > > >> >> > > > >> >> > > > >> >> > > > >> >> > > > >> >> On Wed, Mar 18, 2015 at 3:32 PM, Jay Kreps <jay.kr...@gmail.com> > > > >> wrote: > > > >> >> > Great. > > > >> >> > > > > >> >> > For (3) yeah I think we should just think through the > end-to-end > > > >> pattern > > > >> >> > for these versioned requests since it seems like we will have a > > > >> number of > > > >> >> > them. 
The serialization code used as you described gets us to > the > > > >> right > > > >> >> > Struct which the user would then wrap in something like > > > >> ProduceRequest. > > > >> >> > Presumably there would just be one ProduceRequest that would > > > >> internally > > > >> >> > fill in things like null or otherwise adapt the struct to a > > usable > > > >> >> object. > > > >> >> > On the response side we would have the version from the request > > to > > > >> use > > > >> >> for > > > >> >> > correct versioning. On question is whether this is enough or > > > whether > > > >> we > > > >> >> > need to have switches in KafkaApis to do things like > > > >> >> > if(produceRequest.version == 3) > > > >> >> > // do something > > > >> >> > else > > > >> >> > // do something else > > > >> >> > > > > >> >> > Basically it would be good to be able to write a quick wiki > that > > > was > > > >> like > > > >> >> > "how to add or modify a kafka api" that explained the right way > > to > > > >> do all > > > >> >> > this. > > > >> >> > > > > >> >> > I don't think any of this necessarily blocks this ticket since > at > > > the > > > >> >> > moment we don't have tons of versions of requests out there. > > > >> >> > > > > >> >> > -Jay > > > >> >> > > > > >> >> > On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira < > > > gshap...@cloudera.com > > > >> > > > > >> >> wrote: > > > >> >> > > > > >> >> >> See inline responses: > > > >> >> >> > > > >> >> >> On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps < > jay.kr...@gmail.com > > > > > > >> wrote: > > > >> >> >> > Hey Gwen, > > > >> >> >> > > > > >> >> >> > This makes sense to me. > > > >> >> >> > > > > >> >> >> > A couple of thoughts, mostly confirming what you said I > think: > > > >> >> >> > > > > >> >> >> > 1. Ideally we would move completely over to the new style > > of > > > >> >> request > > > >> >> >> > definition for server-side processing, even for the > > internal > > > >> >> >> requests. 
This > > > >> >> >> > way all requests would have the same header/body struct > > > stuff. > > > >> As > > > >> >> you > > > >> >> >> say > > > >> >> >> > for the internal requests we can just delete the scala > > code. > > > >> For > > > >> >> the > > > >> >> >> old > > > >> >> >> > clients they will continue to use their old request > > > definitions > > > >> >> until > > > >> >> >> we > > > >> >> >> > eol them. I would propose that new changes will go only > > into > > > >> the > > > >> >> new > > > >> >> >> > request/response objects and the old scala ones will be > > > >> permanently > > > >> >> >> stuck > > > >> >> >> > on their current version until discontinued. So after > this > > > >> change > > > >> >> >> that old > > > >> >> >> > scala code could be considered frozen. > > > >> >> >> > > > >> >> >> SimpleConsumer is obviously stuck with the old > request/response. > > > >> >> >> > > > >> >> >> The Producers can be converted to the common request/response > > > >> without > > > >> >> >> breaking compatibility. > > > >> >> >> I think we should do this (even though it requires fiddling > with > > > >> >> >> additional network serialization code), just so we can throw > the > > > old > > > >> >> >> ProduceRequest away. > > > >> >> >> > > > >> >> >> Does that make sense? > > > >> >> >> > > > >> >> >> > > > >> >> >> > 2. I think it would be reasonable to keep all the > requests > > > >> under > > > >> >> >> common, > > > >> >> >> > even though as you point out there is currently no use > for > > > >> some of > > > >> >> >> them > > > >> >> >> > beyond broker-to-broker communication at the moment. > > > >> >> >> > > > >> >> >> Yep. > > > >> >> >> > > > >> >> >> > 3. We should think a little about how versioning will > work. > > > >> Making > > > >> >> >> this > > > >> >> >> > convenient on the server side is an important goal for > the > > > new > > > >> >> style > > > >> >> >> of > > > >> >> >> > request definition. 
At the serialization level we now > > handle > > > >> >> >> versioning but > > > >> >> >> > the question we should discuss and work out is how this > > will > > > >> map to > > > >> >> >> the > > > >> >> >> > request objects (which I assume will remain unversioned). > > > >> >> >> > > > >> >> >> The way I see it working (I just started on this, so I may > have > > > >> gaps): > > > >> >> >> > > > >> >> >> * Request header contains the version > > > >> >> >> * When we read the request, we use ProtoUtils.requestSchema > > which > > > >> >> >> takes version as a parameter and is responsible to give us the > > > right > > > >> >> >> Schema, which we use to read the buffer and get the correct > > > struct. > > > >> >> >> * KafkaApis handlers have the header, so they can use it to > > access > > > >> the > > > >> >> >> correct fields, build the correct response, etc. > > > >> >> >> > > > >> >> >> Does that sound about right? > > > >> >> >> > > > >> >> >> > > > >> >> >> > 4. Ideally after this refactoring the network package > > should > > > >> not be > > > >> >> >> > dependent on the individual request objects. The > intention > > is > > > >> that > > > >> >> >> stuff in > > > >> >> >> > kafka.network is meant to be generic network > infrastructure > > > >> that > > > >> >> >> doesn't > > > >> >> >> > know about the particular fetch/produce apis we have > > > >> implemented on > > > >> >> >> top. > > > >> >> >> > > > >> >> >> I'll make a note to validate that this is the case. > > > >> >> >> > > > >> >> >> > > > > >> >> >> > -Jay > > > >> >> >> > > > > >> >> >> > On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira < > > > >> gshap...@cloudera.com > > > >> >> > > > > >> >> >> > wrote: > > > >> >> >> > > > > >> >> >> >> Hi Jun, > > > >> >> >> >> > > > >> >> >> >> I was taking a slightly different approach. Let me know if > it > > > >> makes > > > >> >> >> >> sense to you: > > > >> >> >> >> > > > >> >> >> >> 1. Get the bytes from network (kinda unavoidable...) 
> > > >> >> >> >> 2. Modify RequestChannel.Request to contain header and body > > > >> (instead > > > >> >> >> >> of a single object) > > > >> >> >> >> 3. Create the head and body from bytes as follow: > > > >> >> >> >> val header: RequestHeader = RequestHeader.parse(buffer) > > > >> >> >> >> val apiKey: Int = header.apiKey > > > >> >> >> >> val body: Struct = > > > >> >> >> >> > > > >> >> >> > > > >> >> > > > >> > > > > ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct] > > > >> >> >> >> 4. KafkaAPIs will continue getting RequestChannel.Request, > > but > > > >> will > > > >> >> >> >> now have access to body and header separately. > > > >> >> >> >> > > > >> >> >> >> I agree that I need a Request/Response objects that contain > > > only > > > >> the > > > >> >> >> >> body for all requests objects. > > > >> >> >> >> I'm thinking of implementing them in o.a.k.Common.Requests > in > > > >> Java > > > >> >> for > > > >> >> >> >> consistency. > > > >> >> >> >> > > > >> >> >> >> When we are discussing the requests/responses used in > > > >> SimpleConsumer, > > > >> >> >> >> we mean everything used in javaapi, right? > > > >> >> >> >> > > > >> >> >> >> Gwen > > > >> >> >> >> > > > >> >> >> >> > > > >> >> >> >> > > > >> >> >> >> On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao <j...@confluent.io > > > > > >> wrote: > > > >> >> >> >> > Hi, Gwen, > > > >> >> >> >> > > > > >> >> >> >> > I was thinking that we will be doing the following in > > > >> KAFKA-1927. > > > >> >> >> >> > > > > >> >> >> >> > 1. Get the bytes from network. > > > >> >> >> >> > 2. Use a new generic approach to convert bytes into > request > > > >> >> objects. > > > >> >> >> >> > 2.1 Read the fixed request header (using the util in > > client). > > > >> >> >> >> > 2.2 Based on the request id in the header, deserialize > the > > > >> rest of > > > >> >> the > > > >> >> >> >> > bytes into a request specific object (using the new java > > > >> objects). > > > >> >> >> >> > 3. 
We will then be passing a header and an > > > >> AbstractRequestResponse > > > >> >> to > > > >> >> >> >> > KafkaApis. > > > >> >> >> >> > > > > >> >> >> >> > In order to do that, we will need to create similar > > > >> >> request/response > > > >> >> >> >> > objects for internal requests such as StopReplica, > > > >> LeaderAndIsr, > > > >> >> >> >> > UpdateMetadata, ControlledShutdown. Not sure whether they > > > >> should be > > > >> >> >> >> written > > > >> >> >> >> > in java or scala, but perhaps they should be only in the > > core > > > >> >> project. > > > >> >> >> >> > > > > >> >> >> >> > Also note, there are some scala requests/responses used > > > >> directly in > > > >> >> >> >> > SimpleConsumer. Since that's our public api, we can't > > remove > > > >> those > > > >> >> >> scala > > > >> >> >> >> > objects until the old consumer is phased out. We can > remove > > > the > > > >> >> rest > > > >> >> >> of > > > >> >> >> >> the > > > >> >> >> >> > scala request objects. > > > >> >> >> >> > > > > >> >> >> >> > Thanks, > > > >> >> >> >> > > > > >> >> >> >> > Jun > > > >> >> >> >> > > > > >> >> >> >> > > > > >> >> >> >> > On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira < > > > >> >> gshap...@cloudera.com> > > > >> >> >> >> wrote: > > > >> >> >> >> > > > > >> >> >> >> >> Hi, > > > >> >> >> >> >> > > > >> >> >> >> >> I'm starting this thread for the various questions I run > > > into > > > >> >> while > > > >> >> >> >> >> refactoring the server to use client requests and > > responses. > > > >> >> >> >> >> > > > >> >> >> >> >> Help is appreciated :) > > > >> >> >> >> >> > > > >> >> >> >> >> First question: LEADER_AND_ISR request and STOP_REPLICA > > > >> request > > > >> >> are > > > >> >> >> >> >> unimplemented in the client. > > > >> >> >> >> >> > > > >> >> >> >> >> Do we want to implement them as part of this > refactoring? > > > >> >> >> >> >> Or should we continue using the scala implementation for > > > >> those? 
> > > >> >> >> >> >> > > > >> >> >> >> >> Gwen > > > >> >> >> >> >> > > > >> >> >> >> > > > >> >> >> > > > >> >> > > > >> > > > > > > > > > > > > > >