Hey Guozhang,

1,2. Yes, you need to define the new format and add it to the list of
versions.
3. Not necessarily. It depends on the change. If the formats are totally
different the code can take the version and just have some if statement to
read one or the other. But most changes are just the simple addition or
removal of fields. In this case the schema DSL supports default values that
will be filled in to help ease backwards compatability for simple cases.
4. For the client presumably you always use the latest version so the
existing logic new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.
id)) should suffice?

I'm not sure if I know what it is you are proposing. Care to elaborate?

-Jay


On Fri, Feb 7, 2014 at 2:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Just to clarify, in this case when we are evolving, say MetadataResponse to
> version 1, We will:
>
> 1. In Protocol.java, add another variable METADATA_RESPONSE_V1
>
> 2. In Protocol.java, change this line to:
>
> public static Schema[] METADATA_RESPONSE = new Schema[] {
> METADATA_RESPONSE_V1 };
>
> 3. In MetadataResponse.java, change the bytes-to-object function (i.e.,
> MetadataResponse(Struct struct)) to have both parsing logic for V0 and V1.
>
> 4. In MetadataResponse.java, change the object-to-bytes function (i.e.,
> Struct toStruct() ) to V1.
>
> Is this correct?
>
> My feeling is that it would be good if we do not have to get the field
> names like "topics" in multiple places: Protocol and MetadataResponse, for
> example, and although the toStruct() only needs to maintain the logic for
> the latest version, MetadataResponse(Struct struct) would probably needs to
> handle all versions, which will make it very complicated. So I am wondering
> can these two functions be provided automatically? I am not advocating for
> another round of compilation but would like to see if it is possible for
> these procedures be done in a more programmable way.
>
> Guozhang
>
>
>
> On Fri, Feb 7, 2014 at 12:56 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Okay this is the last discussion item for the new client code. :-)
> >
> > Previously to define an api you would implement a request and response
> > scala object that read and wrote its own bytes. There were a few problems
> > with this:
> > 1. The consistency of the protocol was very hard to maintain.
> > 2. You ended up hand-coding size estimation which was very tedious and
> > error prone
> > 3. Error messages wouldn't give any field name information you would just
> > get some kind of BufferUnderflowException with no information about what
> or
> > why. Fixing these were hard because each object would have to implement
> > this good error handling.
> > 4. There wasn't good support for api versioning. We have an api version
> > that is included in the request, but there was no easy way to maintain
> both
> > the old format and the new format.
> > 5. The header information was baked into each request and it was only
> > though great care that we could keep the header standard throughout the
> > requests.
> > 6. The same class that defined the protocol was used throughout the code.
> > So what were intended to be dumb DTOs ended up getting loaded up with
> > domain logic. Invariably aspects of this representation would end up
> > leaking into the protocol.
> > 7. It was very hard to figure out what the protocol was from the code
> since
> > the definition was embedded in byte munging code spread out over dozens
> of
> > files.
> >
> > So that was definitely bad.
> >
> > We considered moving to an off-the-shelf protocol definition language
> like
> > avro or protocol buffers. But prior experience with these is that they
> are
> > great for whipping together a quick service but for a stable protocol it
> is
> > actually better to define the protocol rather than specifying an
> > implementation like avro or protocol buffers. This is similar to what is
> > done with AMQP which I think does a fantastic job of providing a well
> > specified messaging protocol (that protocol is not suitable for the type
> of
> > system we are building, but their method of specifying it I think is very
> > good).
> >
> > So the conclusion was to retain our BNF-specified protocol and instead
> > implement a simple library for implementing this protocol. This would
> have
> > the advantage of letting us retain our existing protocol and also to add
> a
> > few Kafka-specific optimizations. This library is just a helper utility
> for
> > implementing our protocol spec, the spec remains the source of truth.
> >
> > I implemented this as part of the new client effort. I will describe how
> my
> > library works and the pattern I think we should use with it.
> >
> > The code for defining the protocol is in
> > org.apache.kafka.common.protocol.types. Note that this is meant to be a
> > stand-alone library for serialization, it doesn't know anything about our
> > actual request and responses or even that the messages being defined will
> > be sent over a network. The definition of our protocol is defined in
> > org.apache.kafka.common.protocol.Protocol, this is just the protocol and
> is
> > decoupled from the network layer and everything else.
> >
> > We define a set of types that match our protocol, namely:
> > - fixed length primitives: int8, int16, int32, int64
> > - variable-length primitives: string, bytes
> > - container types: arrayof, struct
> >
> > You define a message using types. All types extend
> > org.apache.kafka.common.protocol.types.Type.java. Each type knows how to
> > read, write, validate, and estimate the size of a single java object
> type.
> > Here is the correspondence
> >   Type.INT8: java.lang.Byte
> >   Type.INT16: java.lang.Short
> >   Type.INT32: java.lang.Integer
> >   Type.INT32: java.lang.Long
> >   Type.STRING: java.lang.String
> >   Type.BYTES: java.nio.ByteBuffer
> >   ArrayOf: Object[]
> >   Schema: Struct
> > The correspondence here can be thought of as that between a class and an
> > object: the class specifies the layout of the object, the object is an
> > instantiation of that class with particular values. Each message is
> defined
> > by a Schema, which can be used to read and write a Struct. The schema
> > specifies the fields in the type, and the Struct is an "instantiation" of
> > those fields with actual values. A struct can be thought of as a special
> > purpose hashmap.
> >
> > An example will make this more clear. Here is how you define the request
> > header schema:
> >    new Schema(new Field("api_key", INT16, "The id of the request type."),
> >               new Field("api_version", INT16, "The version of the API."),
> >               new Field("correlation_id", INT32, "documentation string"),
> >               new Field("client_id", STRING, "more documentation."));
> >
> > So a request header is a message that consists of a short api key
> followed
> > by a short api version followed by a correlation id and client id.
> >
> > Here is a more complex example, the producer response:
> >
> > new Schema(new Field("responses", new ArrayOf(new Schema(new
> Field("topic",
> > STRING), new Field("partition_responses", new ArrayOf(new Schema(new
> Field(
> > "partition", INT32), new Field("error_code", INT16), new
> > Field("base_offset"
> > , INT64))))))))
> >
> > (indentation in email is tricky). Note that this has a schema which
> > contains an array of sub-records which in turn have a sub-array of
> records.
> > As this nesting gets more complicated it can get a bit hard to read, so
> you
> > can break it up using variables. An equivalent definition would be:
> >
> > Schema partitionResponse = new Schema(new Field("partition", INT32),
> >
> >                                       new Field("error_code", INT16),
> >
> >                                       new Field("base_offset", INT64));
> >
> > Schema topicResponse = new Schema(new Field("topic", STRING),
> >
> >                                   new Field("partition_responses", new
> > ArrayOf(partitionResponse)));
> >
> > Schema producerResposne = new Schema(new Field("responses", new
> >  ArrayOf(topicResponse)));
> >
> > Note that this is exactly equivalent.
> >
> > Okay once such a schema is defined you can write an object in the
> following
> > way:
> >
> > Struct header = new Struct(headerSchema);
> >
> > header.set("api_key", (short) 1);
> >
> > header.set("api_version", (short), 0);
> >
> > ...
> >
> > headerSchema.write(buffer, header);
> >
> > And you can read an instance of a header by doing:
> >
> > Struct header = headerSchema.read(buffer);
> >
> > Short apiKey = (Short) header.get("api_key");
> >
> > Field apiVersionField = header.field("api_version");
> >
> > Short apiKey = header.get(apiVersionField);
> >
> > Note the two different field access styles. Accessing a field by name has
> > the performance of a hash table lookup. However for performance critical
> > situations you can get the Field object that represents that entry in the
> > struct. Getting this field object takes a hash table lookup but once you
> > have it it will get that field out of any instance of that struct with
> the
> > performance of an array access. So this is useful in cases where you can
> > statically fetch all the fields and then use them on every request (and
> > assuming you need to optimize performance).
> >
> > These raw structs are logic-free and act as the "DTO" for data that will
> be
> > sent over the network.
> >
> > For the more complex requests and responses interacting with the raw
> struct
> > is not very pleasent. My recommendation is that we still maintain a java
> > object that is the "domain object" for the request and knows how to read
> > and write itself to the struct. This is what you would end up passing
> down
> > into KafkaApis. This will have all the convenience methods that people
> were
> > wanting to add to the protocol objects before. The downside of this is
> that
> > in some ways you define the request twice, but I think both of these
> layers
> > are actually needed and would evolve independently (the struct only when
> > the protocol changes and the domain object with the needs of the code
> that
> > use it). I haven't actually done this in the produce yet, in part
> because I
> > think to make these domain objects properly you need to use them on the
> > server side too which we aren't ready for yet. However I did add a
> version
> > of this for metadata on KAFKA-1238 here:
> >
> >
> >
> https://issues.apache.org/jira/secure/attachment/12627654/KAFKA-1238-v1.patch
> >
> > Okay, it would be great to get feedback on this code and this general
> > approach to protocol definition. If everyone likes it then I am going to
> > consider all the discussion items for the new code wrapped up and move on
> > to the more detailed code review and testing.
> >
> > -Jay
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to