Hi Andrii,

Good point on the constructor for MetadataResponse. Perhaps in V2 of MetadataResponse we can add a new constructor that takes both a cluster and a topicConfig map. Currently, Cluster doesn't really need topic configs, so we can leave it as it is.
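A minimal sketch of what such a constructor pair could look like. Everything here is hypothetical: ClusterSketch and MetadataResponseSketch are stand-ins rather than the real Kafka classes, and the version constants are placeholders.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the real Cluster class; it carries no topic configs.
final class ClusterSketch {
    final String id;

    ClusterSketch(String id) {
        this.id = id;
    }
}

// Hypothetical response class; the version numbers are placeholders.
final class MetadataResponseSketch {
    static final int CLUSTER_ONLY_VERSION = 0;
    static final int TOPIC_CONFIGS_VERSION = 2;

    final int version;
    final ClusterSketch cluster;
    final Map<String, Map<String, String>> topicConfigs;

    // Existing form: cluster only, topic configs default to an empty map.
    MetadataResponseSketch(ClusterSketch cluster) {
        this(CLUSTER_ONLY_VERSION, cluster,
             Collections.<String, Map<String, String>>emptyMap());
    }

    // New form: topic configs travel next to the cluster, not inside it.
    MetadataResponseSketch(ClusterSketch cluster,
                           Map<String, Map<String, String>> topicConfigs) {
        this(TOPIC_CONFIGS_VERSION, cluster, topicConfigs);
    }

    private MetadataResponseSketch(int version, ClusterSketch cluster,
                                   Map<String, Map<String, String>> topicConfigs) {
        this.version = version;
        this.cluster = cluster;
        // Defensive copy so later changes to the caller's map are not visible here.
        this.topicConfigs = Collections.unmodifiableMap(new HashMap<>(topicConfigs));
    }
}
```

The two public constructors differ in their parameter lists, so the version stays implied by the arguments and the cluster type itself does not need to change.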
Thanks,
Jun

On Mon, May 18, 2015 at 11:10 AM, Andrii Biletskyi <andrii.bilets...@stealth.ly> wrote:

> Jun,
>
> Thanks for the explanation. I believe my understanding is close to what you have written. I still think this approach is somewhat limiting (what if you add a field of type int in V1 and then remove another field of type int in V2? Method overloading for the V0 and V2 constructors will not compile), but in any case we need to follow this approach.
>
> OK, then I believe I will have to remove all the "error" constructors which were added as part of this sub-task. Instead, in getErrorResponse(versionId, throwable) I will pattern-match on the version and get the right response version by calling the constructor with the right arguments.
>
> Also, one small issue with this approach. Currently we create MetadataResponse from a Cluster object. As you remember, in KIP-4 we planned to evolve it to include topic-level configs. We agreed to add this to the Cluster class directly. In this case it will break our pattern of one constructor per version, since the constructor won't change (it simply accepts a Cluster instance in both cases).
> What is the preferable solution in this case? I can explicitly add a topicConfigs param to the signature of the V1 constructor, but that seems inconsistent because Cluster would already encapsulate topicConfigs at that point.
>
> Thanks,
> Andrii Biletskyi
>
> On Mon, May 18, 2015 at 8:28 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Andrii,
> >
> > Let me clarify a bit how things work now. You can see if this fits your need or if it can be improved. If you look at OffsetCommitRequest, our convention is the following.
> >
> > 1. The request object can be constructed from a set of required fields. The client typically constructs a request object this way. There will be one constructor for each version. The version id is not specified explicitly since it's implied by the input parameters.
> > Every time we introduce a new version, we will add a new constructor of this form. We will leave the old constructors as they are, but mark them as deprecated. Code compiled with the old Kafka jar will still work with the new Kafka jar until we actually remove the deprecated constructors.
> >
> > 2. The request object can also be constructed from a struct. This is typically used by the broker to convert network bytes into a request object. Currently, the constructor looks for specific fields in the struct to distinguish which version it corresponds to.
> >
> > 3. In both cases, the request object always tries to reflect the fields in the latest version. We use the following convention when mapping older versions to the latest version in the request object: if a new field is added, we try to use a default for the missing field in the old version. If a field is removed, we simply ignore it in the old version.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, May 18, 2015 at 8:41 AM, Andrii Biletskyi <andrii.bilets...@stealth.ly> wrote:
> >
> > > Hi all,
> > >
> > > I started working on it and it seems we are going the wrong way. It appears we need to distinguish constructors by version in request/response (so we can set the correct schema).
> > > Request/Response classes will look like:
> > >
> > > class SomeRequest extends AbstractRequest {
> > >     SomeRequest(versionId, <request-specific params>)
> > >
> > >     // for the latest version
> > >     SomeRequest(<request-specific params>)
> > > }
> > >
> > > Now, what if in SomeRequest_V1 we remove some field from the schema? Well, we can leave the constructor signature and simply check programmatically whether the schema that was set contains the given field and, if not, simply ignore it. Thus the mentioned constructor can support V0 & V1.
> > > Now, suppose in V2 we add some field. There's nothing we can do: we need to add a new parameter and thus add a new constructor:
> > > SomeRequest(versionId, <request-specific params for V2>)
> > >
> > > But it's a bit strange to introduce constructors which may fail only at runtime because you used the wrong constructor for your request version.
> > > Overall, in my opinion, this approach suggests we are trying to give clients factory-like methods but implemented as class constructors...
> > >
> > > Another thing is the versionId-less constructor (used for the latest version).
> > > Again, suppose in V1 we extend the schema with an additional value. We will have to change the constructor without versionId, because this becomes the latest version.
> > > But would it be considered backward-compatible? Client code that uses V0 and upgrades will not compile in this case.
> > >
> > > Thoughts?
> > >
> > > Thanks,
> > > Andrii Biletskyi
> > >
> > > On Fri, May 15, 2015 at 4:31 PM, Andrii Biletskyi <andrii.bilets...@stealth.ly> wrote:
> > >
> > > > Okay,
> > > > I can pick that. I'll create a sub-task under KAFKA-2044.
> > > >
> > > > Thanks,
> > > > Andrii Biletskyi
> > > >
> > > > On Fri, May 15, 2015 at 4:27 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >
> > > >> Agree that you need version in getErrorResponse too (so you'll get the correct error), which means you'll need to add versionId to constructors of every response object...
> > > >>
> > > >> You'll want to keep two interfaces, one with version and one with CURR_VERSION as default, so you won't need to modify every single call...
> > > >>
> > > >> On Fri, May 15, 2015 at 4:03 PM, Andrii Biletskyi <andrii.bilets...@stealth.ly> wrote:
> > > >>
> > > >> > Correct, I think we are on the same page.
> > > >> > This way we can fix the RequestChannel part (where it uses AbstractRequest.getRequest).
> > > >> >
> > > >> > But would it be okay to add versionId to the AbstractRequest.getErrorResponse signature too?
> > > >> > I'm a bit lost in the hierarchy of all those Abstract... objects and not sure whether it's the right solution.
> > > >> >
> > > >> > Thanks,
> > > >> > Andrii Biletskyi
> > > >> >
> > > >> > On Fri, May 15, 2015 at 3:47 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >> >
> > > >> > > I agree, we currently don't handle versions correctly when de-serializing into java objects. This will be an issue for every req/resp we move to use the java objects.
> > > >> > >
> > > >> > > It looks like this requires:
> > > >> > > 1. Add versionId parameter to all parse functions in Java req/resp objects
> > > >> > > 2. Modify getRequest to pass it along
> > > >> > > 3. Modify RequestChannel to get the version out of the header and use it when de-serializing the body.
> > > >> > >
> > > >> > > Did I get that correct? I want to make sure we are talking about the same issue.
> > > >> > >
> > > >> > > Gwen
> > > >> > >
> > > >> > > On Fri, May 15, 2015 at 1:45 PM, Andrii Biletskyi <andrii.bilets...@stealth.ly> wrote:
> > > >> > >
> > > >> > > > Gwen,
> > > >> > > >
> > > >> > > > I didn't find this in the answers above, so apologies if this was discussed. It's about the way we would like to handle request versions.
> > > >> > > >
> > > >> > > > As I understood from Jun's answer, we generally should try using the same java object while evolving the request. I believe the only example of an evolved request now, OffsetCommitRequest, follows this approach.
> > > >> > > >
> > > >> > > > I'm trying to evolve MetadataRequest to the next version as part of KIP-4 and am not sure the current AbstractRequest api (which is the basis for requests ported to java) is sufficient.
> > > >> > > >
> > > >> > > > The problem is: in order to deserialize bytes into the correct object you need to know its version. Suppose KafkaApi serves OffsetCommitRequestV0 and V2 (current).
> > > >> > > > For such cases the OffsetCommitRequest class has two parse methods:
> > > >> > > >
> > > >> > > > public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId)
> > > >> > > > AND
> > > >> > > > public static OffsetCommitRequest parse(ByteBuffer buffer)
> > > >> > > >
> > > >> > > > The latter one will simply pick the "current" schema version.
> > > >> > > > Now AbstractRequest.getRequest, which is the entry point for deserializing requests for KafkaApi, matches only on RequestHeader.apiKey (and thus uses the second OffsetCommitRequest parse method), which is not sufficient because we also need RequestHeader.apiVersion in the case of an old request version.
> > > >> > > >
> > > >> > > > The same problem appears in AbstractRequest.getErrorResponse(Throwable e): to construct the right error response object we need to know which apiVersion to respond to.
> > > >> > > >
> > > >> > > > I think this can affect other tasks under KAFKA-1927 (replacing separate RQ/RP), so maybe it makes sense to decide/fix it once.
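To make the dispatch problem above concrete, here is a small sketch in which the entry point passes the header's version through to the parse method. All names and the toy wire layout (V0 carries one int, later versions append one long) are assumptions for illustration only, not the real OffsetCommitRequest schema.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified request: V0 carries one int, later versions append one long.
final class OffsetCommitRequestSketch {
    static final int CURRENT_VERSION = 2;
    static final long DEFAULT_EXTRA = -1L; // default for the field V0 does not carry

    final int version;
    final int value;
    final long extra;

    private OffsetCommitRequestSketch(int version, int value, long extra) {
        this.version = version;
        this.value = value;
        this.extra = extra;
    }

    // Version-aware parse: reads only the fields that the given version puts on the wire.
    static OffsetCommitRequestSketch parse(ByteBuffer buffer, int versionId) {
        int value = buffer.getInt();
        long extra = versionId >= 1 ? buffer.getLong() : DEFAULT_EXTRA;
        return new OffsetCommitRequestSketch(versionId, value, extra);
    }

    // Convenience overload that assumes the current version.
    static OffsetCommitRequestSketch parse(ByteBuffer buffer) {
        return parse(buffer, CURRENT_VERSION);
    }
}

// Hypothetical entry point that matches on both api key and api version.
final class RequestDispatchSketch {
    static final short OFFSET_COMMIT_KEY = 8; // illustrative constant

    static OffsetCommitRequestSketch getRequest(short apiKey, short apiVersion, ByteBuffer body) {
        switch (apiKey) {
            case OFFSET_COMMIT_KEY:
                return OffsetCommitRequestSketch.parse(body, apiVersion);
            default:
                throw new IllegalArgumentException("Unknown api key: " + apiKey);
        }
    }
}
```

In this toy layout, feeding V0 bytes to the version-less parse overload runs out of bytes, which is the kind of failure a key-only dispatch risks for old clients.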
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > > Andrii Biletskyi
> > > >> > > >
> > > >> > > > On Wed, Mar 25, 2015 at 12:42 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >> > > >
> > > >> > > > > OK, I posted a working patch on KAFKA-2044 and https://reviews.apache.org/r/32459/diff/.
> > > >> > > > >
> > > >> > > > > There are a few decisions there that can be up for discussion (factory method on AbstractRequestResponse, the new handleErrors in request API), but as far as support for o.a.k.common requests in core goes, it does what it needs to do.
> > > >> > > > >
> > > >> > > > > Please review!
> > > >> > > > >
> > > >> > > > > Gwen
> > > >> > > > >
> > > >> > > > > On Tue, Mar 24, 2015 at 10:59 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >> > > > >
> > > >> > > > > > Hi,
> > > >> > > > > >
> > > >> > > > > > I uploaded a (very) preliminary patch with my idea.
> > > >> > > > > >
> > > >> > > > > > One thing that's missing:
> > > >> > > > > > RequestResponse had a handleError method that all requests implemented, typically generating an appropriate error Response for the request and sending it along. It's used by KafkaApis to handle all protocol errors for valid requests that are not handled elsewhere.
> > > >> > > > > > AbstractRequestResponse doesn't have such a method.
> > > >> > > > > >
> > > >> > > > > > I can, of course, add it.
> > > >> > > > > > But before I jump into this, I'm wondering if there was another plan for handling Api errors.
> > > >> > > > > >
> > > >> > > > > > Gwen
> > > >> > > > > >
> > > >> > > > > > On Mon, Mar 23, 2015 at 6:16 PM, Jun Rao <j...@confluent.io> wrote:
> > > >> > > > > >
> > > >> > > > > >> I think what you are saying is that in RequestChannel, we can start generating header/body for new request types and leave requestObj null. For existing requests, header/body will be null initially. Gradually, we can migrate each type of request by populating header/body instead of requestObj. This makes sense to me since it serves two purposes: (1) not polluting the code base with duplicated request/response objects for new types of requests and (2) allowing the refactoring of existing requests to be done in smaller pieces.
> > > >> > > > > >>
> > > >> > > > > >> Could you try that approach and perhaps just migrate one existing request type (e.g. HeartBeatRequest) as an example? We probably need to rewind the buffer after reading the requestId when deserializing the header (since the header includes the request id).
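A short sketch of the rewind point, using only standard java.nio.ByteBuffer calls. The class and method names are made up; the only assumption taken from the thread is that the request id is the leading short in the buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing two ways to look at the leading request id
// without consuming it, so a later header parse still sees those bytes.
final class HeaderPeekSketch {

    // Relative read followed by reset(): the position returns to the mark.
    static short readRequestIdAndRewind(ByteBuffer buffer) {
        buffer.mark();
        short requestId = buffer.getShort();
        buffer.reset();
        return requestId;
    }

    // Absolute read: getShort(int index) never moves the position.
    static short peekRequestId(ByteBuffer buffer) {
        return buffer.getShort(buffer.position());
    }
}
```

Either variant leaves the buffer positioned at the start of the header, so the header parser can read the request id again.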
> > > >> > > > > >>
> > > >> > > > > >> Thanks,
> > > >> > > > > >>
> > > >> > > > > >> Jun
> > > >> > > > > >>
> > > >> > > > > >> On Mon, Mar 23, 2015 at 4:52 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >> > > > > >>
> > > >> > > > > >> > I'm thinking of a different approach, that will not fix everything, but will allow adding new requests without code duplication (and therefore unblock KIP-4):
> > > >> > > > > >> >
> > > >> > > > > >> > RequestChannel.request currently takes a buffer and parses it into an "old" request object. Since the objects are byte-compatible, we should be able to parse existing requests into both old and new objects. New requests will only be parsed into new objects.
> > > >> > > > > >> >
> > > >> > > > > >> > Basically:
> > > >> > > > > >> > val requestId = buffer.getShort()
> > > >> > > > > >> > if (requestId in keyToNameAndDeserializerMap) {
> > > >> > > > > >> >   requestObj = RequestKeys.deserializerForKey(requestId)(buffer)
> > > >> > > > > >> >   header: RequestHeader = RequestHeader.parse(buffer)
> > > >> > > > > >> >   body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> > > >> > > > > >> > } else {
> > > >> > > > > >> >   requestObj = null
> > > >> > > > > >> >   header: RequestHeader = RequestHeader.parse(buffer)
> > > >> > > > > >> >   body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> > > >> > > > > >> > }
> > > >> > > > > >> >
> > > >> > > > > >> > This way existing KafkaApis will keep working as normal. The new Apis can implement just the new header/body requests.
> > > >> > > > > >> > We'll do the same on the send-side: BoundedByteBufferSend can have a constructor that takes header/body instead of just a response object.
> > > >> > > > > >> >
> > > >> > > > > >> > Does that make sense?
> > > >> > > > > >> >
> > > >> > > > > >> > Once we have this in, we can move to:
> > > >> > > > > >> > * Adding the missing request/response to the client code
> > > >> > > > > >> > * Replacing requests that can be replaced
> > > >> > > > > >> >
> > > >> > > > > >> > It will also make life easier by having us review and test smaller chunks of work (the existing patch is *huge*, touches nearly every core component and I'm not done yet...)
> > > >> > > > > >> >
> > > >> > > > > >> > Gwen
> > > >> > > > > >> >
> > > >> > > > > >> > On Sun, Mar 22, 2015 at 10:24 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >> > > > > >> >
> > > >> > > > > >> > > Ack, yeah, forgot about that.
> > > >> > > > > >> > >
> > > >> > > > > >> > > It's not just a difference of wrappers. The server side actually sends the bytes lazily using FileChannel.transferTo. We need to make it possible to carry over that optimization. In some sense what we want to be able to do is set a value to a Send instead of a ByteBuffer.
> > > >> > > > > >> > >
> > > >> > > > > >> > > Let me try to add that support to the protocol definition stuff, will probably take me a few days to free up time.
> > > >> > > > > >> > >
> > > >> > > > > >> > > -Jay
> > > >> > > > > >> > >
> > > >> > > > > >> > > On Sun, Mar 22, 2015 at 7:44 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >> > > > > >> > >
> > > >> > > > > >> > > > In case anyone is still following this thread, I need a bit of help :)
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > The old FetchResponse.PartitionData included a MessageSet object.
> > > >> > > > > >> > > > The new FetchResponse.PartitionData includes a ByteBuffer.
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > However, when we read from logs, we return a MessageSet, and as far as I can see, these can't be converted to ByteBuffers (at least not without copying their data).
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > Did anyone consider how to reconcile the MessageSets with the new FetchResponse objects?
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > Gwen
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > On Sat, Mar 21, 2015 at 6:54 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > > Note: I'm also treating ZkUtils as if it was a public API (i.e. converting objects that are returned into o.a.k.common equivalents but not changing ZkUtils itself).
> > > >> > > > > >> > > > > I know it's not public, but I suspect I'm not the only developer here who has tons of external code that uses it.
> > > >> > > > > >> > > > >
> > > >> > > > > >> > > > > Gwen
> > > >> > > > > >> > > > >
> > > >> > > > > >> > > > > On Wed, Mar 18, 2015 at 5:48 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >> > > > > >> > > > >
> > > >> > > > > >> > > > >> We can't rip them out completely, unfortunately - the SimpleConsumer uses them.
> > > >> > > > > >> > > > >>
> > > >> > > > > >> > > > >> So we'll need conversion at some point. I'll try to make the conversion point "just before hitting a public API that we can't modify", and hopefully it won't look too arbitrary.
> > > >> > > > > >> > > > >>
> > > >> > > > > >> > > > >> On Wed, Mar 18, 2015 at 5:24 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >> > > > > >> > > > >> > I think either approach is okay in the short term. However our goal should be to eventually get rid of that duplicate code, so if you are up for just ripping and cutting that may get us there sooner.
> > > >> > > > > >> > > > >> >
> > > >> > > > > >> > > > >> > -Jay
> > > >> > > > > >> > > > >> >
> > > >> > > > > >> > > > >> > On Wed, Mar 18, 2015 at 5:19 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >> > > > > >> > > > >> >
> > > >> > > > > >> > > > >> >> Thanks!
> > > >> > > > > >> > > > >> >>
> > > >> > > > > >> > > > >> >> Another clarification:
> > > >> > > > > >> > > > >> >> The Common request/responses use slightly different infrastructure objects: Node instead of Broker, TopicPartition instead of TopicAndPartition and a few more.
> > > >> > > > > >> > > > >> >>
> > > >> > > > > >> > > > >> >> I can write utilities to convert Node to Broker to minimize the scope of the change.
> > > >> > > > > >> > > > >> >> Or I can start replacing Brokers with Nodes across the board.
> > > >> > > > > >> > > > >> >>
> > > >> > > > > >> > > > >> >> I'm currently taking the second approach - i.e., if MetadataRequest is now returning Node, I'm changing the entire line of dependencies to use Nodes instead of Brokers.
> > > >> > > > > >> > > > >> >>
> > > >> > > > > >> > > > >> >> Is this acceptable, or do we want to take a more minimal approach for this patch and do a larger replacement as a follow up?
> > > >> > > > > >> > > > >> >>
> > > >> > > > > >> > > > >> >> Gwen
> > > >> > > > > >> > > > >> >>
> > > >> > > > > >> > > > >> >> On Wed, Mar 18, 2015 at 3:32 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >> > > > > >> > > > >> >> > Great.
> > > >> > > > > >> > > > >> >> >
> > > >> > > > > >> > > > >> >> > For (3) yeah I think we should just think through the end-to-end pattern for these versioned requests since it seems like we will have a number of them. The serialization code used as you described gets us to the right Struct which the user would then wrap in something like ProduceRequest. Presumably there would just be one ProduceRequest that would internally fill in things like null or otherwise adapt the struct to a usable object. On the response side we would have the version from the request to use for correct versioning.
> > > >> > > > > >> > > > >> >> > One question is whether this is enough or whether we need to have switches in KafkaApis to do things like
> > > >> > > > > >> > > > >> >> >   if(produceRequest.version == 3)
> > > >> > > > > >> > > > >> >> >     // do something
> > > >> > > > > >> > > > >> >> >   else
> > > >> > > > > >> > > > >> >> >     // do something else
> > > >> > > > > >> > > > >> >> >
> > > >> > > > > >> > > > >> >> > Basically it would be good to be able to write a quick wiki that was like "how to add or modify a kafka api" that explained the right way to do all this.
> > > >> > > > > >> > > > >> >> >
> > > >> > > > > >> > > > >> >> > I don't think any of this necessarily blocks this ticket since at the moment we don't have tons of versions of requests out there.
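One way to picture the trade-off discussed above, as a rough sketch only. The request object fills in a default for a field that an older version lacks, so the handler reads fields without a version switch and consults the version only when shaping the response. The class names, the field names "topic" and "new_field", the default value, and the Map standing in for a Struct are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single request class that adapts version-specific fields to one object.
final class ProduceRequestSketch {
    static final int DEFAULT_NEW_FIELD = -1; // made-up default for versions without the field

    final int version;
    final String topic;
    final int newField;

    // `fields` stands in for the version-specific Struct read off the wire.
    ProduceRequestSketch(int version, Map<String, Object> fields) {
        this.version = version;
        this.topic = (String) fields.get("topic");
        Object raw = fields.get("new_field");
        // Older versions do not carry new_field, so fall back to the default.
        this.newField = raw == null ? DEFAULT_NEW_FIELD : (Integer) raw;
    }
}

// Hypothetical handler standing in for a KafkaApis method.
final class HandlerSketch {
    static String handle(ProduceRequestSketch request) {
        // Request side: no version switch, the defaults are already filled in.
        String result = "topic=" + request.topic + " newField=" + request.newField;
        // Response side: the request's version selects the response format.
        return request.version >= 1 ? "v1:" + result : "v0:" + result;
    }
}
```

Whether defaults alone are enough depends on the change: a field whose absence alters behaviour would still need an explicit branch in the handler.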
> > > >> > > > > >> > > > >> >> >
> > > >> > > > > >> > > > >> >> > -Jay
> > > >> > > > > >> > > > >> >> >
> > > >> > > > > >> > > > >> >> > On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >> > > > > >> > > > >> >> >
> > > >> > > > > >> > > > >> >> >> See inline responses:
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >> > > > > >> > > > >> >> >> > Hey Gwen,
> > > >> > > > > >> > > > >> >> >> >
> > > >> > > > > >> > > > >> >> >> > This makes sense to me.
> > > >> > > > > >> > > > >> >> >> >
> > > >> > > > > >> > > > >> >> >> > A couple of thoughts, mostly confirming what you said I think:
> > > >> > > > > >> > > > >> >> >> >
> > > >> > > > > >> > > > >> >> >> >    1. Ideally we would move completely over to the new style of request definition for server-side processing, even for the internal requests. This way all requests would have the same header/body struct stuff. As you say for the internal requests we can just delete the scala code.
> > > >> > > > > >> > > > >> >> >> >    For the old clients they will continue to use their old request definitions until we eol them. I would propose that new changes will go only into the new request/response objects and the old scala ones will be permanently stuck on their current version until discontinued. So after this change that old scala code could be considered frozen.
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> SimpleConsumer is obviously stuck with the old request/response.
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> The Producers can be converted to the common request/response without breaking compatibility.
> > > >> > > > > >> > > > >> >> >> I think we should do this (even though it requires fiddling with additional network serialization code), just so we can throw the old ProduceRequest away.
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> Does that make sense?
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> >    2. I think it would be reasonable to keep all the requests under common, even though as you point out there is currently no use for some of them beyond broker-to-broker communication at the moment.
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> Yep.
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> >    3. We should think a little about how versioning will work. Making this convenient on the server side is an important goal for the new style of request definition. At the serialization level we now handle versioning but the question we should discuss and work out is how this will map to the request objects (which I assume will remain unversioned).
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> The way I see it working (I just started on this, so I may have gaps):
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> * Request header contains the version
> > > >> > > > > >> > > > >> >> >> * When we read the request, we use ProtoUtils.requestSchema which takes version as a parameter and is responsible to give us the right Schema, which we use to read the buffer and get the correct struct.
> > > >> > > > > >> > > > >> >> >> * KafkaApis handlers have the header, so they can use it to access the correct fields, build the correct response, etc.
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> Does that sound about right?
> > > >> > > > > >> > > > >> >> >>
> > > >> > > > > >> > > > >> >> >> >    4. Ideally after this refactoring the network package should not be dependent on the individual request objects.
> > The intention is that stuff in kafka.network is meant to be generic
> > network infrastructure that doesn't know about the particular
> > fetch/produce apis we have implemented on top.
>
> I'll make a note to validate that this is the case.
>
> > -Jay
> >
> > On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira <gshap...@cloudera.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > I was taking a slightly different approach. Let me know if it makes
> > > sense to you:
> > >
> > > 1. Get the bytes from network (kinda unavoidable...)
> > > 2. Modify RequestChannel.Request to contain header and body (instead
> > >    of a single object)
> > > 3. Create the header and body from bytes as follows:
> > >      val header: RequestHeader = RequestHeader.parse(buffer)
> > >      val apiKey: Int = header.apiKey
> > >      val body: Struct =
> > >        ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> > > 4. KafkaApis will continue getting RequestChannel.Request, but will
> > >    now have access to body and header separately.
> > >
> > > I agree that I need Request/Response objects that contain only the
> > > body for all request objects.
> > > I'm thinking of implementing them in o.a.k.Common.Requests in Java for
> > > consistency.
> > >
> > > When we are discussing the requests/responses used in SimpleConsumer,
> > > we mean everything used in javaapi, right?
> > >
> > > Gwen
> > >
> > > On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Gwen,
> > > >
> > > > I was thinking that we will be doing the following in KAFKA-1927.
> > > >
> > > > 1. Get the bytes from network.
> > > > 2. Use a new generic approach to convert bytes into request objects.
> > > > 2.1 Read the fixed request header (using the util in client).
> > > > 2.2 Based on the request id in the header, deserialize the rest of
> > > > the bytes into a request specific object (using the new java objects).
> > > > 3. We will then be passing a header and an AbstractRequestResponse to
> > > > KafkaApis.
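
The header-then-body flow outlined in steps 2.1 and 2.2 above can be sketched in plain Java. This is a minimal, self-contained illustration and not Kafka's code: ToyHeader, ToyRequestParser, the three-field header layout, and the api key values are all hypothetical stand-ins for RequestHeader and the real request classes.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy stand-in for a fixed request header (hypothetical layout, not Kafka's RequestHeader).
final class ToyHeader {
    final short apiKey;
    final short apiVersion;
    final int correlationId;

    ToyHeader(short apiKey, short apiVersion, int correlationId) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
        this.correlationId = correlationId;
    }

    // Step 2.1: the header layout is fixed, so it can be read
    // without knowing which request type follows.
    static ToyHeader parse(ByteBuffer buffer) {
        return new ToyHeader(buffer.getShort(), buffer.getShort(), buffer.getInt());
    }
}

// Toy dispatcher (hypothetical): maps an api key to a parser for the rest of the buffer.
final class ToyRequestParser {
    private final Map<Short, Function<ByteBuffer, Object>> bodyParsers = new HashMap<>();

    void register(short apiKey, Function<ByteBuffer, Object> parser) {
        bodyParsers.put(apiKey, parser);
    }

    // Step 2.2: choose the body parser from the api key carried in the header.
    Object parseBody(ToyHeader header, ByteBuffer buffer) {
        Function<ByteBuffer, Object> parser = bodyParsers.get(header.apiKey);
        if (parser == null) {
            throw new IllegalArgumentException("Unknown api key: " + header.apiKey);
        }
        return parser.apply(buffer);
    }
}
```

With this shape, the network layer only needs to hand over a buffer; the header and the request-specific body are produced separately and can both be passed on to the handler.
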
> > > >
> > > > In order to do that, we will need to create similar request/response
> > > > objects for internal requests such as StopReplica, LeaderAndIsr,
> > > > UpdateMetadata, ControlledShutdown. Not sure whether they should be
> > > > written in java or scala, but perhaps they should be only in the core
> > > > project.
> > > >
> > > > Also note, there are some scala requests/responses used directly in
> > > > SimpleConsumer. Since that's our public api, we can't remove those
> > > > scala objects until the old consumer is phased out. We can remove the
> > > > rest of the scala request objects.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira <gshap...@cloudera.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I'm starting this thread for the various questions I run into while
> > > > > refactoring the server to use client requests and responses.
> > > > >
> > > > > Help is appreciated :)
> > > > >
> > > > > First question: LEADER_AND_ISR request and STOP_REPLICA request are
> > > > > unimplemented in the client.
> > > > >
> > > > > Do we want to implement them as part of this refactoring?
> > > > > Or should we continue using the scala implementation for those?
> > > > >
> > > > > Gwen
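
The versioning flow discussed earlier in the thread (the request header carries the version, and a lookup in the spirit of ProtoUtils.requestSchema returns the schema for that version) could look roughly like the sketch below. Everything here is an assumption made for illustration: ToySchema, ToySchemas, the int-only fields, and the "offset"/"timestamp" field names are hypothetical, and Kafka's real Schema and Struct types are considerably richer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy schema (hypothetical): an ordered list of int32 field names.
final class ToySchema {
    final List<String> fields;

    ToySchema(String... fields) {
        this.fields = Arrays.asList(fields);
    }

    // Reads one int per declared field, in declaration order.
    Map<String, Integer> read(ByteBuffer buffer) {
        Map<String, Integer> struct = new LinkedHashMap<>();
        for (String field : fields) {
            struct.put(field, buffer.getInt());
        }
        return struct;
    }
}

// Toy registry (hypothetical) playing the role the thread gives to ProtoUtils.requestSchema.
final class ToySchemas {
    private final Map<String, ToySchema> byKeyAndVersion = new HashMap<>();

    void register(int apiKey, int version, ToySchema schema) {
        byKeyAndVersion.put(apiKey + ":" + version, schema);
    }

    // The version comes from the request header; the caller never guesses it.
    ToySchema requestSchema(int apiKey, int version) {
        ToySchema schema = byKeyAndVersion.get(apiKey + ":" + version);
        if (schema == null) {
            throw new IllegalArgumentException(
                "No schema for api key " + apiKey + ", version " + version);
        }
        return schema;
    }
}
```

A handler that also has the header can then check which fields exist for that version (for example, whether the parsed map contains a field added in a later version) before building the matching response.
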