I'm thinking of a different approach that will not fix everything, but
will allow adding new requests without code duplication (and therefore
unblock KIP-4):

RequestChannel.Request currently takes a buffer and parses it into an "old"
request object. Since the objects are byte-compatible, we should be able to
parse existing requests into both old and new objects. New requests will
only be parsed into new objects.

Basically:
val requestId = buffer.getShort()
// Existing requests are still parsed into the old object; new ones are not.
val requestObj =
  if (keyToNameAndDeserializerMap.contains(requestId))
    RequestKeys.deserializerForKey(requestId)(buffer)
  else null
// Both reads above advance the buffer, so go back to the start of the
// request (assuming it begins at position 0) before parsing the new objects.
buffer.rewind()
val header: RequestHeader = RequestHeader.parse(buffer)
val body: Struct =
  ProtoUtils.currentRequestSchema(requestId).read(buffer).asInstanceOf[Struct]

This way existing KafkaApis will keep working as normal. The new Apis can
implement just the new header/body requests.
We'll do the same on the send-side: BoundedByteBufferSend can have a
constructor that takes header/body instead of just a response object.
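
To make the receive side concrete, here is a minimal, self-contained sketch
in plain Java with no Kafka classes. Header, ParsedRequest and the
deserializer map are hypothetical stand-ins for illustration only, not the
real RequestHeader / RequestKeys API, and the header is reduced to just the
api key and version:

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Function;

final class DualParseSketch {
    // Hypothetical stand-in for the fixed request header (api key + version only).
    static final class Header {
        final short apiKey;
        final short apiVersion;
        Header(short apiKey, short apiVersion) {
            this.apiKey = apiKey;
            this.apiVersion = apiVersion;
        }
    }

    // legacy is null for requests that have no old-style deserializer.
    static final class ParsedRequest {
        final Object legacy;
        final Header header;
        final ByteBuffer body;
        ParsedRequest(Object legacy, Header header, ByteBuffer body) {
            this.legacy = legacy;
            this.header = header;
            this.body = body;
        }
    }

    static ParsedRequest parse(ByteBuffer buffer,
                               Map<Short, Function<ByteBuffer, Object>> legacyDeserializers) {
        // Absolute read: peeks at the api key without moving the position.
        short apiKey = buffer.getShort(buffer.position());

        Object legacy = null;
        Function<ByteBuffer, Object> deserializer = legacyDeserializers.get(apiKey);
        if (deserializer != null) {
            // duplicate() shares the bytes but has its own position, so the
            // old-style parse cannot disturb the new-style parse below.
            ByteBuffer view = buffer.duplicate();
            view.getShort(); // old deserializers expect the api key already consumed
            legacy = deserializer.apply(view);
        }

        // New-style parse: header first, then the remaining bytes are the body.
        Header header = new Header(buffer.getShort(), buffer.getShort());
        return new ParsedRequest(legacy, header, buffer.slice());
    }
}
```

Existing handlers would keep reading the legacy object, while new handlers
would only look at header and body. Using duplicate() instead of rewinding
is just one way to let both parsers see the same bytes.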

Does that make sense?

Once we have this in, we can move to:
* Adding the missing request/response to the client code
* Replacing requests that can be replaced

It will also make life easier by letting us review and test smaller chunks
of work (the existing patch is *huge*, touches nearly every core component,
and I'm not done yet...)

Gwen




On Sun, Mar 22, 2015 at 10:24 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Ack, yeah, forgot about that.
>
> It's not just a difference of wrappers. The server side actually sends the
> bytes lazily using FileChannel.transferTo. We need to make it possible to
> carry over that optimization. In some sense what we want to be able to do
> is set a value to a Send instead of a ByteBuffer.
>
> Let me try to add that support to the protocol definition stuff, will
> probably take me a few days to free up time.
>
> -Jay
>
> On Sun, Mar 22, 2015 at 7:44 PM, Gwen Shapira <gshap...@cloudera.com>
> wrote:
>
> > In case anyone is still following this thread, I need a bit of help :)
> >
> > The old FetchResponse.PartitionData included a MessageSet object.
> > The new FetchResponse.PartitionData includes a ByteBuffer.
> >
> > However, when we read from logs, we return a MessageSet, and as far as I
> > can see, these can't be converted to ByteBuffers (at least not without
> > copying their data).
> >
> > Did anyone consider how to reconcile the MessageSets with the new
> > FetchResponse objects?
> >
> > Gwen
> >
> >
> > On Sat, Mar 21, 2015 at 6:54 PM, Gwen Shapira <gshap...@cloudera.com>
> > wrote:
> >
> > > Note: I'm also treating ZkUtils as if it was a public API (i.e.
> > converting
> > > objects that are returned into o.a.k.common equivalents but not
> changing
> > > ZkUtils itself).
> > > I know it's not public, but I suspect I'm not the only developer here
> who
> > > has tons of external code that uses it.
> > >
> > > Gwen
> > >
> > > On Wed, Mar 18, 2015 at 5:48 PM, Gwen Shapira <gshap...@cloudera.com>
> > > wrote:
> > >
> > >> We can't rip them out completely, unfortunately - the SimpleConsumer
> > uses
> > >> them.
> > >>
> > >> So we'll need conversion at some point. I'll try to make the
> > >> conversion point "just before hitting a public API that we can't
> > >> modify", and hopefully it won't look too arbitrary.
> > >>
> > >>
> > >>
> > >> On Wed, Mar 18, 2015 at 5:24 PM, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> > >> > I think either approach is okay in the short term. However our goal
> > >> should
> > >> > be to eventually get rid of that duplicate code, so if you are up
> for
> > >> just
> > >> > ripping and cutting that may get us there sooner.
> > >> >
> > >> > -Jay
> > >> >
> > >> > On Wed, Mar 18, 2015 at 5:19 PM, Gwen Shapira <
> gshap...@cloudera.com>
> > >> wrote:
> > >> >
> > >> >> Thanks!
> > >> >>
> > >> >> Another clarification:
> > >> >> The Common request/responses use slightly different infrastructure
> > >> >> objects: Node instead of Broker, TopicPartition instead of
> > >> >> TopicAndPartition and a few more.
> > >> >>
> > >> >> I can write utilities to convert Node to Broker to minimize the
> scope
> > >> >> of the change.
> > >> >> Or I can start replacing Brokers with Nodes across the board.
> > >> >>
> > >> >> I'm currently taking the second approach - i.e, if MetadataRequest
> is
> > >> >> now returning Node, I'm changing the entire line of dependencies to
> > >> >> use Nodes instead of broker.
> > >> >>
> > >> >> Is this acceptable, or do we want to take a more minimal approach
> for
> > >> >> this patch and do a larger replacement as a follow up?
> > >> >>
> > >> >> Gwen
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >> On Wed, Mar 18, 2015 at 3:32 PM, Jay Kreps <jay.kr...@gmail.com>
> > >> wrote:
> > >> >> > Great.
> > >> >> >
> > >> >> > For (3) yeah I think we should just think through the end-to-end
> > >> pattern
> > >> >> > for these versioned requests since it seems like we will have a
> > >> number of
> > >> >> > them. The serialization code used as you described gets us to the
> > >> right
> > >> >> > Struct which the user would then wrap in something like
> > >> ProduceRequest.
> > >> >> > Presumably there would just be one ProduceRequest that would
> > >> internally
> > >> >> > fill in things like null or otherwise adapt the struct to a
> usable
> > >> >> object.
> > >> >> > On the response side we would have the version from the request
> to
> > >> use
> > >> >> for
> > >> >> > correct versioning. One question is whether this is enough or
> > whether
> > >> we
> > >> >> > need to have switches in KafkaApis to do things like
> > >> >> >    if(produceRequest.version == 3)
> > >> >> >        // do something
> > >> >> >    else
> > >> >> >       // do something else
> > >> >> >
> > >> >> > Basically it would be good to be able to write a quick wiki that
> > was
> > >> like
> > >> >> > "how to add or modify a kafka api" that explained the right way
> to
> > >> do all
> > >> >> > this.
> > >> >> >
> > >> >> > I don't think any of this necessarily blocks this ticket since at
> > the
> > >> >> > moment we don't have tons of versions of requests out there.
> > >> >> >
> > >> >> > -Jay
> > >> >> >
> > >> >> > On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira <
> > gshap...@cloudera.com
> > >> >
> > >> >> wrote:
> > >> >> >
> > >> >> >> See inline responses:
> > >> >> >>
> > >> >> >> On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps <jay.kr...@gmail.com
> >
> > >> wrote:
> > >> >> >> > Hey Gwen,
> > >> >> >> >
> > >> >> >> > This makes sense to me.
> > >> >> >> >
> > >> >> >> > A couple of thoughts, mostly confirming what you said I think:
> > >> >> >> >
> > >> >> >> >    1. Ideally we would move completely over to the new style
> of
> > >> >> request
> > >> >> >> >    definition for server-side processing, even for the
> internal
> > >> >> >> requests. This
> > >> >> >> >    way all requests would have the same header/body struct
> > stuff.
> > >> As
> > >> >> you
> > >> >> >> say
> > >> >> >> >    for the internal requests we can just delete the scala
> code.
> > >> For
> > >> >> the
> > >> >> >> old
> > >> >> >> >    clients they will continue to use their old request
> > definitions
> > >> >> until
> > >> >> >> we
> > >> >> >> >    eol them. I would propose that new changes will go only
> into
> > >> the
> > >> >> new
> > >> >> >> >    request/response objects and the old scala ones will be
> > >> permanently
> > >> >> >> stuck
> > >> >> >> >    on their current version until discontinued. So after this
> > >> change
> > >> >> >> that old
> > >> >> >> >    scala code could be considered frozen.
> > >> >> >>
> > >> >> >> SimpleConsumer is obviously stuck with the old request/response.
> > >> >> >>
> > >> >> >> The Producers can be converted to the common request/response
> > >> without
> > >> >> >> breaking compatibility.
> > >> >> >> I think we should do this (even though it requires fiddling with
> > >> >> >> additional network serialization code), just so we can throw the
> > old
> > >> >> >> ProduceRequest away.
> > >> >> >>
> > >> >> >> Does that make sense?
> > >> >> >>
> > >> >> >>
> > >> >> >> >    2. I think it would be reasonable to keep all the requests
> > >> under
> > >> >> >> common,
> > >> >> >> >    even though as you point out there is currently no use for
> > >> some of
> > >> >> >> them
> > >> >> >> >    beyond broker-to-broker communication at the moment.
> > >> >> >>
> > >> >> >> Yep.
> > >> >> >>
> > >> >> >> >    3. We should think a little about how versioning will work.
> > >> Making
> > >> >> >> this
> > >> >> >> >    convenient on the server side is an important goal for the
> > new
> > >> >> style
> > >> >> >> of
> > >> >> >> >    request definition. At the serialization level we now
> handle
> > >> >> >> versioning but
> > >> >> >> >    the question we should discuss and work out is how this
> will
> > >> map to
> > >> >> >> the
> > >> >> >> >    request objects (which I assume will remain unversioned).
> > >> >> >>
> > >> >> >> The way I see it working (I just started on this, so I may have
> > >> gaps):
> > >> >> >>
> > >> >> >> * Request header contains the version
> > >> >> >> * When we read the request, we use ProtoUtils.requestSchema
> which
> > >> >> >> takes version as a parameter and is responsible to give us the
> > right
> > >> >> >> Schema, which we use to read the buffer and get the correct
> > struct.
> > >> >> >> * KafkaApis handlers have the header, so they can use it to
> access
> > >> the
> > >> >> >> correct fields, build the correct response, etc.
> > >> >> >>
> > >> >> >> Does that sound about right?
> > >> >> >>
> > >> >> >>
> > >> >> >> >    4. Ideally after this refactoring the network package
> should
> > >> not be
> > >> >> >> >    dependent on the individual request objects. The intention
> is
> > >> that
> > >> >> >> stuff in
> > >> >> >> >    kafka.network is meant to be generic network infrastructure
> > >> that
> > >> >> >> doesn't
> > >> >> >> >    know about the particular fetch/produce apis we have
> > >> implemented on
> > >> >> >> top.
> > >> >> >>
> > >> >> >> I'll make a note to validate that this is the case.
> > >> >> >>
> > >> >> >> >
> > >> >> >> > -Jay
> > >> >> >> >
> > >> >> >> > On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira <
> > >> gshap...@cloudera.com
> > >> >> >
> > >> >> >> > wrote:
> > >> >> >> >
> > >> >> >> >> Hi Jun,
> > >> >> >> >>
> > >> >> >> >> I was taking a slightly different approach. Let me know if it
> > >> makes
> > >> >> >> >> sense to you:
> > >> >> >> >>
> > >> >> >> >> 1. Get the bytes from network (kinda unavoidable...)
> > >> >> >> >> 2. Modify RequestChannel.Request to contain header and body
> > >> (instead
> > >> >> >> >> of a single object)
> > >> >> >> >> 3. Create the header and body from bytes as follows:
> > >> >> >> >>     val header: RequestHeader = RequestHeader.parse(buffer)
> > >> >> >> >>     val apiKey: Int = header.apiKey
> > >> >> >> >>     val body: Struct =
> > >> >> >> >>       ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> > >> >> >> >> 4. KafkaAPIs will continue getting RequestChannel.Request,
> but
> > >> will
> > >> >> >> >> now have access to body and header separately.
> > >> >> >> >>
> > >> >> >> >> I agree that I need Request/Response objects that contain only the
> > >> >> >> >> body for all request objects.
> > >> >> >> >> I'm thinking of implementing them in o.a.k.Common.Requests in
> > >> Java
> > >> >> for
> > >> >> >> >> consistency.
> > >> >> >> >>
> > >> >> >> >> When we are discussing the requests/responses used in
> > >> SimpleConsumer,
> > >> >> >> >> we mean everything used in javaapi, right?
> > >> >> >> >>
> > >> >> >> >> Gwen
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >> On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao <j...@confluent.io>
> > >> wrote:
> > >> >> >> >> > Hi, Gwen,
> > >> >> >> >> >
> > >> >> >> >> > I was thinking that we will be doing the following in
> > >> KAFKA-1927.
> > >> >> >> >> >
> > >> >> >> >> > 1. Get the bytes from network.
> > >> >> >> >> > 2. Use a new generic approach to convert bytes into request
> > >> >> objects.
> > >> >> >> >> > 2.1 Read the fixed request header (using the util in
> client).
> > >> >> >> >> > 2.2 Based on the request id in the header, deserialize the
> > >> rest of
> > >> >> the
> > >> >> >> >> > bytes into a request specific object (using the new java
> > >> objects).
> > >> >> >> >> > 3. We will then be passing a header and an
> > >> AbstractRequestResponse
> > >> >> to
> > >> >> >> >> > KafkaApis.
> > >> >> >> >> >
> > >> >> >> >> > In order to do that, we will need to create similar
> > >> >> request/response
> > >> >> >> >> > objects for internal requests such as StopReplica,
> > >> LeaderAndIsr,
> > >> >> >> >> > UpdateMetadata, ControlledShutdown. Not sure whether they
> > >> should be
> > >> >> >> >> written
> > >> >> >> >> > in java or scala, but perhaps they should be only in the
> core
> > >> >> project.
> > >> >> >> >> >
> > >> >> >> >> > Also note, there are some scala requests/responses used
> > >> directly in
> > >> >> >> >> > SimpleConsumer. Since that's our public api, we can't
> remove
> > >> those
> > >> >> >> scala
> > >> >> >> >> > objects until the old consumer is phased out. We can remove
> > the
> > >> >> rest
> > >> >> >> of
> > >> >> >> >> the
> > >> >> >> >> > scala request objects.
> > >> >> >> >> >
> > >> >> >> >> > Thanks,
> > >> >> >> >> >
> > >> >> >> >> > Jun
> > >> >> >> >> >
> > >> >> >> >> >
> > >> >> >> >> > On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira <
> > >> >> gshap...@cloudera.com>
> > >> >> >> >> wrote:
> > >> >> >> >> >
> > >> >> >> >> >> Hi,
> > >> >> >> >> >>
> > >> >> >> >> >> I'm starting this thread for the various questions I run
> > into
> > >> >> while
> > >> >> >> >> >> refactoring the server to use client requests and
> responses.
> > >> >> >> >> >>
> > >> >> >> >> >> Help is appreciated :)
> > >> >> >> >> >>
> > >> >> >> >> >> First question: LEADER_AND_ISR request and STOP_REPLICA
> > >> request
> > >> >> are
> > >> >> >> >> >> unimplemented in the client.
> > >> >> >> >> >>
> > >> >> >> >> >> Do we want to implement them as part of this refactoring?
> > >> >> >> >> >> Or should we continue using the scala implementation for
> > >> those?
> > >> >> >> >> >>
> > >> >> >> >> >> Gwen
> > >> >> >> >> >>
> > >> >> >> >>
> > >> >> >>
> > >> >>
> > >>
> > >
> > >
> >
>
