Hey Jay,

3,4. What I meant is that you still need to set/get the field values
by their field names:

// in toStruct
body.set("topics", topics.toArray());

// in toMetadataResponse
int nodeId = (Integer) broker.get("node_id");

And I was proposing just the opposite of your understanding: after the
protocol file is defined, auto-generate the toStruct and parseRequest
functions; if this cannot be done I would rather enforce that programmers
only use the field object references and not allow field names, since a
string value may end up referring to a different field after a version change.
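
For example, something like the following sketch (it assumes Struct also has
a set(Field, Object) counterpart to the get(Field) accessor):

// look the Field objects up once
Field topicsField = body.field("topics");
Field nodeIdField = broker.field("node_id");

// in toStruct
body.set(topicsField, topics.toArray());

// in toMetadataResponse
int nodeId = (Integer) broker.get(nodeIdField);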

Guozhang





On Fri, Feb 7, 2014 at 3:23 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> If what you are saying is that we should define the java code and kind of
> "automatically" generate the protocol off of that then I would be against
> that as I really want the protocol definition to be the source of truth. It
> should be 100% obvious when you are changing the protocol.
>
> -Jay
>
>
> On Fri, Feb 7, 2014 at 3:22 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Hey Guozhang,
> >
> > 1,2. Yes, you need to define the new format and add it to the list of
> > versions.
> > 3. Not necessarily. It depends on the change. If the formats are totally
> > different the code can take the version and just have some if statement to
> > read one or the other. But most changes are just the simple addition or
> > removal of fields. In this case the schema DSL supports default values that
> > will be filled in to help ease backwards compatibility for simple cases (see
> > the sketch below).
> > 4. For the client presumably you always use the latest version so the
> > existing logic new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id))
> > should suffice?
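> >
> > For example, evolving the metadata response might look roughly like the
> > sketch below (the sub-schemas and the new field name here are made up, and
> > it assumes the Field constructor accepts a default value as its last
> > argument):
> >
> > public static Schema METADATA_RESPONSE_V1 =
> >     new Schema(new Field("brokers", new ArrayOf(BROKER)),
> >                new Field("topics", new ArrayOf(TOPIC_METADATA)),
> >                new Field("a_new_v1_field", INT32, "A field added in V1.", -1));
> >
> > public static Schema[] METADATA_RESPONSE =
> >     new Schema[] { METADATA_RESPONSE_V0, METADATA_RESPONSE_V1 };
> >
> > Code that never sets the new field would then still produce a valid V1
> > struct because the default gets filled in.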
> >
> > I'm not sure if I know what it is you are proposing. Care to elaborate?
> >
> > -Jay
> >
> >
> > On Fri, Feb 7, 2014 at 2:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Just to clarify, in this case when we are evolving, say, MetadataResponse to
> >> version 1, we will:
> >>
> >> 1. In Protocol.java, add another variable METADATA_RESPONSE_V1
> >>
> >> 2. In Protocol.java, change this line to:
> >>
> >> public static Schema[] METADATA_RESPONSE = new Schema[] {
> >> METADATA_RESPONSE_V1 };
> >>
> >> 3. In MetadataResponse.java, change the bytes-to-object function (i.e.,
> >> MetadataResponse(Struct struct)) to have parsing logic for both V0 and V1.
> >>
> >> 4. In MetadataResponse.java, change the object-to-bytes function (i.e.,
> >> Struct toStruct() ) to V1.
> >>
> >> Is this correct?
> >>
> >> My feeling is that it would be good if we did not have to repeat the field
> >> names like "topics" in multiple places, e.g. in both Protocol and
> >> MetadataResponse. And although toStruct() only needs to maintain the logic
> >> for the latest version, MetadataResponse(Struct struct) would probably need
> >> to handle all versions, which will make it very complicated. So I am
> >> wondering whether these two functions could be provided automatically. I am
> >> not advocating for another round of compilation, but would like to see if
> >> these procedures can be done in a more programmatic way.
> >>
> >> Guozhang
> >>
> >>
> >>
> >> On Fri, Feb 7, 2014 at 12:56 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>
> >> > Okay this is the last discussion item for the new client code. :-)
> >> >
> >> > Previously, to define an api you would implement a request and response
> >> > scala object that read and wrote its own bytes. There were a few problems
> >> > with this:
> >> > 1. The consistency of the protocol was very hard to maintain.
> >> > 2. You ended up hand-coding size estimation, which was very tedious and
> >> > error prone.
> >> > 3. Error messages wouldn't give any field name information; you would just
> >> > get some kind of BufferUnderflowException with no information about what or
> >> > why. Fixing this was hard because each object would have to implement
> >> > this good error handling.
> >> > 4. There wasn't good support for api versioning. We have an api version
> >> > that is included in the request, but there was no easy way to maintain both
> >> > the old format and the new format.
> >> > 5. The header information was baked into each request and it was only
> >> > through great care that we could keep the header standard throughout the
> >> > requests.
> >> > 6. The same class that defined the protocol was used throughout the code.
> >> > So what were intended to be dumb DTOs ended up getting loaded up with
> >> > domain logic. Invariably aspects of this representation would end up
> >> > leaking into the protocol.
> >> > 7. It was very hard to figure out what the protocol was from the code,
> >> > since the definition was embedded in byte munging code spread out over
> >> > dozens of files.
> >> >
> >> > So that was definitely bad.
> >> >
> >> > We considered moving to an off-the-shelf protocol definition language like
> >> > avro or protocol buffers. But prior experience with these is that they are
> >> > great for whipping together a quick service, but for a stable protocol it is
> >> > actually better to define the protocol itself rather than to specify it via
> >> > an implementation like avro or protocol buffers. This is similar to what is
> >> > done with AMQP, which I think does a fantastic job of providing a well
> >> > specified messaging protocol (that protocol is not suitable for the type of
> >> > system we are building, but their method of specifying it I think is very
> >> > good).
> >> >
> >> > So the conclusion was to retain our BNF-specified protocol and instead
> >> > implement a simple library for implementing this protocol. This would have
> >> > the advantage of letting us retain our existing protocol and also add a
> >> > few Kafka-specific optimizations. This library is just a helper utility for
> >> > implementing our protocol spec; the spec remains the source of truth.
> >> >
> >> > I implemented this as part of the new client effort. I will describe how my
> >> > library works and the pattern I think we should use with it.
> >> >
> >> > The code for defining the protocol is in
> >> > org.apache.kafka.common.protocol.types. Note that this is meant to be a
> >> > stand-alone library for serialization; it doesn't know anything about our
> >> > actual requests and responses or even that the messages being defined will
> >> > be sent over a network. Our protocol itself is defined in
> >> > org.apache.kafka.common.protocol.Protocol, which is just the protocol and is
> >> > decoupled from the network layer and everything else.
> >> >
> >> > We define a set of types that match our protocol, namely:
> >> > - fixed length primitives: int8, int16, int32, int64
> >> > - variable-length primitives: string, bytes
> >> > - container types: arrayof, struct
> >> >
> >> > You define a message using types. All types extend
> >> > org.apache.kafka.common.protocol.types.Type.java. Each type knows how to
> >> > read, write, validate, and estimate the size of a single java object type.
> >> > Here is the correspondence:
> >> >   Type.INT8: java.lang.Byte
> >> >   Type.INT16: java.lang.Short
> >> >   Type.INT32: java.lang.Integer
> >> >   Type.INT64: java.lang.Long
> >> >   Type.STRING: java.lang.String
> >> >   Type.BYTES: java.nio.ByteBuffer
> >> >   ArrayOf: Object[]
> >> >   Schema: Struct
> >> > The correspondence here can be thought of as that between a class and an
> >> > object: the class specifies the layout of the object, the object is an
> >> > instantiation of that class with particular values. Each message is defined
> >> > by a Schema, which can be used to read and write a Struct. The schema
> >> > specifies the fields in the type, and the Struct is an "instantiation" of
> >> > those fields with actual values. A struct can be thought of as a special
> >> > purpose hashmap.
> >> >
> >> > An example will make this more clear. Here is how you define the request
> >> > header schema:
> >> >    new Schema(new Field("api_key", INT16, "The id of the request type."),
> >> >               new Field("api_version", INT16, "The version of the API."),
> >> >               new Field("correlation_id", INT32, "documentation string"),
> >> >               new Field("client_id", STRING, "more documentation."));
> >> >
> >> > So a request header is a message that consists of a short api key followed
> >> > by a short api version followed by a correlation id and client id.
> >> >
> >> > Here is a more complex example, the producer response:
> >> >
> >> > new Schema(new Field("responses", new ArrayOf(new Schema(
> >> >     new Field("topic", STRING),
> >> >     new Field("partition_responses", new ArrayOf(new Schema(
> >> >         new Field("partition", INT32),
> >> >         new Field("error_code", INT16),
> >> >         new Field("base_offset", INT64))))))))
> >> >
> >> > (indentation in email is tricky). Note that this has a schema which
> >> > contains an array of sub-records which in turn have a sub-array of records.
> >> > As this nesting gets more complicated it can get a bit hard to read, so you
> >> > can break it up using variables. An equivalent definition would be:
> >> >
> >> > Schema partitionResponse = new Schema(new Field("partition", INT32),
> >> >                                       new Field("error_code", INT16),
> >> >                                       new Field("base_offset", INT64));
> >> >
> >> > Schema topicResponse = new Schema(new Field("topic", STRING),
> >> >                                   new Field("partition_responses",
> >> >                                             new ArrayOf(partitionResponse)));
> >> >
> >> > Schema producerResponse = new Schema(new Field("responses",
> >> >                                                new ArrayOf(topicResponse)));
> >> >
> >> > Note that this is exactly equivalent.
> >> >
> >> > Okay once such a schema is defined you can write an object in the
> >> > following way:
> >> >
> >> > Struct header = new Struct(headerSchema);
> >> > header.set("api_key", (short) 1);
> >> > header.set("api_version", (short) 0);
> >> > ...
> >> > headerSchema.write(buffer, header);
> >> >
> >> > And you can read an instance of a header by doing:
> >> >
> >> > Struct header = headerSchema.read(buffer);
> >> > Short apiKey = (Short) header.get("api_key");
> >> > Field apiVersionField = header.field("api_version");
> >> > Short apiVersion = (Short) header.get(apiVersionField);
> >> >
> >> > Note the two different field access styles. Accessing a field by name has
> >> > the performance of a hash table lookup. However, for performance-critical
> >> > situations you can get the Field object that represents that entry in the
> >> > struct. Getting this field object takes a hash table lookup, but once you
> >> > have it, it will get that field out of any instance of that struct with the
> >> > performance of an array access. So this is useful in cases where you can
> >> > statically fetch all the fields and then use them on every request (and
> >> > assuming you need to optimize performance).
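> >> >
> >> > For example, something along these lines (just a sketch building on the
> >> > header example above):
> >> >
> >> > // fetch the Field objects once (one hash lookup each)
> >> > Field apiKeyField = header.field("api_key");
> >> > Field correlationIdField = header.field("correlation_id");
> >> >
> >> > // then reuse them for each header read off the wire
> >> > Struct nextHeader = headerSchema.read(buffer);
> >> > short apiKey = (Short) nextHeader.get(apiKeyField);
> >> > int correlationId = (Integer) nextHeader.get(correlationIdField);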
> >> >
> >> > These raw structs are logic-free and act as the "DTO" for data that will be
> >> > sent over the network.
> >> >
> >> > For the more complex requests and responses, interacting with the raw struct
> >> > is not very pleasant. My recommendation is that we still maintain a java
> >> > object that is the "domain object" for the request and knows how to read
> >> > and write itself to the struct. This is what you would end up passing down
> >> > into KafkaApis. This will have all the convenience methods that people were
> >> > wanting to add to the protocol objects before. The downside of this is that
> >> > in some ways you define the request twice, but I think both of these layers
> >> > are actually needed and would evolve independently (the struct only when
> >> > the protocol changes and the domain object with the needs of the code that
> >> > uses it). I haven't actually done this for the produce request yet, in part
> >> > because I think to make these domain objects properly you need to use them
> >> > on the server side too, which we aren't ready for yet. However I did add a
> >> > version of this for metadata on KAFKA-1238 here:
> >> >
> >> > https://issues.apache.org/jira/secure/attachment/12627654/KAFKA-1238-v1.patch
> >> >
> >> > Okay, it would be great to get feedback on this code and this general
> >> > approach to protocol definition. If everyone likes it then I am going to
> >> > consider all the discussion items for the new code wrapped up and move on
> >> > to the more detailed code review and testing.
> >> >
> >> > -Jay
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
