If what you are saying is that we should define the java code and kind of
"automatically" generate the protocol off of that then I would be against
that as I really want the protocol definition to be the source of truth. It
should be 100% obvious when you are changing the protocol.

-Jay


On Fri, Feb 7, 2014 at 3:22 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hey Guozhang,
>
> 1,2. Yes, you need to define the new format and add it to the list of
> versions.
> 3. Not necessarily. It depends on the change. If the formats are totally
> different the code can take the version and just have some if statement to
> read one or the other. But most changes are just the simple addition or
> removal of fields. In this case the schema DSL supports default values that
> will be filled in to help ease backwards compatibility for simple cases.
> 4. For the client presumably you always use the latest version so the
> existing logic
> new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id))
> should suffice?
>
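To make point 3 concrete, here is a minimal sketch of defaults filling in a field that an older version never carried. The names here (DefaultingSchemaSketch, FieldDef, the "flags" field) are made up for illustration and are not the schema DSL itself; the real library may handle this differently.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy illustration only: hypothetical classes, not Kafka's Schema/Struct. */
final class DefaultingSchemaSketch {

    /** A field name plus the default used when the decoded data lacks it (null means required). */
    static final class FieldDef {
        final String name;
        final Object defaultValue;

        FieldDef(String name, Object defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    /** Returns the decoded values, with schema defaults filled in for absent fields. */
    static Map<String, Object> fill(FieldDef[] schema, Map<String, Object> decoded) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (FieldDef f : schema) {
            if (decoded.containsKey(f.name)) {
                out.put(f.name, decoded.get(f.name));
            } else if (f.defaultValue != null) {
                out.put(f.name, f.defaultValue);
            } else {
                throw new IllegalArgumentException("Missing required field: " + f.name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Data decoded from an older message that only has "topic".
        Map<String, Object> decoded = new HashMap<>();
        decoded.put("topic", "my-topic");

        // Newer schema adds a hypothetical "flags" field with a default of 0.
        FieldDef[] v1 = { new FieldDef("topic", null), new FieldDef("flags", 0) };
        System.out.println(fill(v1, decoded));
    }
}
```

With something like this the domain object only needs one parsing path for simple additions; the if-on-version branch is left for formats that really are different.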
> I'm not sure if I know what it is you are proposing. Care to elaborate?
>
> -Jay
>
>
> On Fri, Feb 7, 2014 at 2:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Just to clarify, in this case when we are evolving, say, MetadataResponse
>> to version 1, we will:
>>
>> 1. In Protocol.java, add another variable METADATA_RESPONSE_V1
>>
>> 2. In Protocol.java, change this line to:
>>
>> public static Schema[] METADATA_RESPONSE = new Schema[] {
>> METADATA_RESPONSE_V1 };
>>
>> 3. In MetadataResponse.java, change the bytes-to-object function (i.e.,
>> MetadataResponse(Struct struct)) to have both parsing logic for V0 and V1.
>>
>> 4. In MetadataResponse.java, change the object-to-bytes function (i.e.,
>> Struct toStruct() ) to V1.
>>
>> Is this correct?
>>
>> My feeling is that it would be good if we do not have to get the field
>> names like "topics" in multiple places: Protocol and MetadataResponse, for
>> example, and although the toStruct() only needs to maintain the logic for
>> the latest version, MetadataResponse(Struct struct) would probably need
>> to
>> handle all versions, which will make it very complicated. So I am
>> wondering
>> can these two functions be provided automatically? I am not advocating for
>> another round of compilation but would like to see if it is possible for
>> these procedures to be done in a more programmable way.
>>
>> Guozhang
>>
>>
>>
>> On Fri, Feb 7, 2014 at 12:56 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> > Okay this is the last discussion item for the new client code. :-)
>> >
>> > Previously to define an api you would implement a request and response
>> > scala object that read and wrote its own bytes. There were a few
>> problems
>> > with this:
>> > 1. The consistency of the protocol was very hard to maintain.
>> > 2. You ended up hand-coding size estimation which was very tedious and
>> > error prone
>> > 3. Error messages wouldn't give any field name information; you would
>> > just get some kind of BufferUnderflowException with no information about
>> > what or why. Fixing this was hard because each object would have to
>> > implement this good error handling.
>> > 4. There wasn't good support for api versioning. We have an api version
>> > that is included in the request, but there was no easy way to maintain
>> both
>> > the old format and the new format.
>> > 5. The header information was baked into each request and it was only
>> > through great care that we could keep the header standard throughout the
>> > requests.
>> > 6. The same class that defined the protocol was used throughout the
>> code.
>> > So what were intended to be dumb DTOs ended up getting loaded up with
>> > domain logic. Invariably aspects of this representation would end up
>> > leaking into the protocol.
>> > 7. It was very hard to figure out what the protocol was from the code
>> since
>> > the definition was embedded in byte munging code spread out over dozens
>> of
>> > files.
>> >
>> > So that was definitely bad.
>> >
>> > We considered moving to an off-the-shelf protocol definition language
>> like
>> > avro or protocol buffers. But prior experience with these is that they
>> are
>> > great for whipping together a quick service but for a stable protocol
>> it is
>> > actually better to define the protocol rather than specifying an
>> > implementation like avro or protocol buffers. This is similar to what is
>> > done with AMQP which I think does a fantastic job of providing a well
>> > specified messaging protocol (that protocol is not suitable for the
>> type of
>> > system we are building, but their method of specifying it I think is
>> very
>> > good).
>> >
>> > So the conclusion was to retain our BNF-specified protocol and instead
>> > implement a simple library for implementing this protocol. This would
>> have
>> > the advantage of letting us retain our existing protocol and also to
>> add a
>> > few Kafka-specific optimizations. This library is just a helper utility
>> for
>> > implementing our protocol spec; the spec remains the source of truth.
>> >
>> > I implemented this as part of the new client effort. I will describe
>> how my
>> > library works and the pattern I think we should use with it.
>> >
>> > The code for defining the protocol is in
>> > org.apache.kafka.common.protocol.types. Note that this is meant to be a
>> > stand-alone library for serialization, it doesn't know anything about
>> our
>> > actual request and responses or even that the messages being defined
>> will
>> > be sent over a network. Our protocol itself is defined in
>> > org.apache.kafka.common.protocol.Protocol; this is just the protocol
>> > and is decoupled from the network layer and everything else.
>> >
>> > We define a set of types that match our protocol, namely:
>> > - fixed length primitives: int8, int16, int32, int64
>> > - variable-length primitives: string, bytes
>> > - container types: arrayof, struct
>> >
>> > You define a message using types. All types extend
>> > org.apache.kafka.common.protocol.types.Type.java. Each type knows how to
>> > read, write, validate, and estimate the size of a single java object
>> type.
>> > Here is the correspondence:
>> >   Type.INT8: java.lang.Byte
>> >   Type.INT16: java.lang.Short
>> >   Type.INT32: java.lang.Integer
>> >   Type.INT64: java.lang.Long
>> >   Type.STRING: java.lang.String
>> >   Type.BYTES: java.nio.ByteBuffer
>> >   ArrayOf: Object[]
>> >   Schema: Struct
>> > The correspondence here can be thought of as that between a class and an
>> > object: the class specifies the layout of the object, the object is an
>> > instantiation of that class with particular values. Each message is
>> defined
>> > by a Schema, which can be used to read and write a Struct. The schema
>> > specifies the fields in the type, and the Struct is an "instantiation"
>> of
>> > those fields with actual values. A struct can be thought of as a special
>> > purpose hashmap.
>> >
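A minimal sketch of what "each type knows how to read, write, and estimate the size of a single java object" could look like. WireTypeSketch is a hypothetical stand-in, not the class in the patch, and the 2-byte length prefix for strings is an assumption made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical mini wire type; the real Type class may differ. */
abstract class WireTypeSketch {
    abstract void write(ByteBuffer buffer, Object value);
    abstract Object read(ByteBuffer buffer);
    abstract int sizeOf(Object value);

    /** Fixed-length primitive backed by java.lang.Short. */
    static final WireTypeSketch INT16 = new WireTypeSketch() {
        void write(ByteBuffer buffer, Object value) { buffer.putShort((Short) value); }
        Object read(ByteBuffer buffer) { return buffer.getShort(); }
        int sizeOf(Object value) { return 2; }
    };

    /** Variable-length primitive; assumes a 2-byte length followed by UTF-8 bytes. */
    static final WireTypeSketch STRING = new WireTypeSketch() {
        void write(ByteBuffer buffer, Object value) {
            byte[] bytes = ((String) value).getBytes(StandardCharsets.UTF_8);
            buffer.putShort((short) bytes.length);
            buffer.put(bytes);
        }
        Object read(ByteBuffer buffer) {
            int length = buffer.getShort();
            byte[] bytes = new byte[length];
            buffer.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
        int sizeOf(Object value) {
            return 2 + ((String) value).getBytes(StandardCharsets.UTF_8).length;
        }
    };
}
```

Because every type reports its own size, a buffer can be allocated exactly, e.g. ByteBuffer.allocate(INT16.sizeOf(key) + STRING.sizeOf(clientId)), rather than hand-coding the estimate per request.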
>> > An example will make this more clear. Here is how you define the request
>> > header schema:
>> >    new Schema(new Field("api_key", INT16, "The id of the request type."),
>> >               new Field("api_version", INT16, "The version of the API."),
>> >               new Field("correlation_id", INT32, "documentation string"),
>> >               new Field("client_id", STRING, "more documentation."));
>> >
>> > So a request header is a message that consists of a short api key
>> followed
>> > by a short api version followed by a correlation id and client id.
>> >
>> > Here is a more complex example, the producer response:
>> >
>> > new Schema(
>> >     new Field("responses", new ArrayOf(new Schema(
>> >         new Field("topic", STRING),
>> >         new Field("partition_responses", new ArrayOf(new Schema(
>> >             new Field("partition", INT32),
>> >             new Field("error_code", INT16),
>> >             new Field("base_offset", INT64))))))))
>> >
>> > (indentation in email is tricky). Note that this has a schema which
>> > contains an array of sub-records which in turn have a sub-array of
>> records.
>> > As this nesting gets more complicated it can get a bit hard to read, so
>> you
>> > can break it up using variables. An equivalent definition would be:
>> >
>> > Schema partitionResponse = new Schema(new Field("partition", INT32),
>> >                                       new Field("error_code", INT16),
>> >                                       new Field("base_offset", INT64));
>> >
>> > Schema topicResponse = new Schema(new Field("topic", STRING),
>> >                                   new Field("partition_responses",
>> >                                             new ArrayOf(partitionResponse)));
>> >
>> > Schema producerResponse = new Schema(new Field("responses",
>> >                                                new ArrayOf(topicResponse)));
>> >
>> > Note that this is exactly equivalent.
>> >
>> > Okay once such a schema is defined you can write an object in the
>> following
>> > way:
>> >
>> > Struct header = new Struct(headerSchema);
>> >
>> > header.set("api_key", (short) 1);
>> >
>> > header.set("api_version", (short) 0);
>> >
>> > ...
>> >
>> > headerSchema.write(buffer, header);
>> >
>> > And you can read an instance of a header by doing:
>> >
>> > Struct header = headerSchema.read(buffer);
>> >
>> > Short apiKey = (Short) header.get("api_key");
>> >
>> > Field apiVersionField = header.field("api_version");
>> >
>> > Short apiVersion = (Short) header.get(apiVersionField);
>> >
>> > Note the two different field access styles. Accessing a field by name
>> has
>> > the performance of a hash table lookup. However for performance critical
>> > situations you can get the Field object that represents that entry in
>> the
>> > struct. Getting this field object takes a hash table lookup but once you
>> > have it it will get that field out of any instance of that struct with
>> the
>> > performance of an array access. So this is useful in cases where you can
>> > statically fetch all the fields and then use them on every request (and
>> > assuming you need to optimize performance).
>> >
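The two access styles can be sketched roughly as follows. FieldSketch, SchemaSketch, and StructSketch are hypothetical stand-ins assuming an array-backed struct with a name-to-field map; the actual implementation may be organized differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical field handle: remembers the slot the field occupies in its schema. */
final class FieldSketch {
    final int index;
    final String name;

    FieldSketch(int index, String name) {
        this.index = index;
        this.name = name;
    }
}

/** Hypothetical schema: fields in declaration order plus a name index. */
final class SchemaSketch {
    private final Map<String, FieldSketch> byName = new HashMap<>();
    private final int size;

    SchemaSketch(String... names) {
        for (int i = 0; i < names.length; i++) {
            byName.put(names[i], new FieldSketch(i, names[i]));
        }
        this.size = names.length;
    }

    /** One hash lookup; callers on a hot path do this once and keep the result. */
    FieldSketch field(String name) {
        FieldSketch f = byName.get(name);
        if (f == null) throw new IllegalArgumentException("No such field: " + name);
        return f;
    }

    int size() { return size; }
}

/** Hypothetical struct: one value slot per schema field. */
final class StructSketch {
    private final SchemaSketch schema;
    private final Object[] values;

    StructSketch(SchemaSketch schema) {
        this.schema = schema;
        this.values = new Object[schema.size()];
    }

    StructSketch set(String name, Object value) {
        values[schema.field(name).index] = value;
        return this;
    }

    /** By name: hash lookup, then array access. */
    Object get(String name) { return values[schema.field(name).index]; }

    /** By field handle: array access only. */
    Object get(FieldSketch field) { return values[field.index]; }
}
```

The handle is tied to the schema rather than to one struct, which is why a field fetched once (say into a static final) works against every struct instance read with that schema.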
>> > These raw structs are logic-free and act as the "DTO" for data that
>> will be
>> > sent over the network.
>> >
>> > For the more complex requests and responses interacting with the raw
>> struct
>> > is not very pleasant. My recommendation is that we still maintain a java
>> > object that is the "domain object" for the request and knows how to read
>> > and write itself to the struct. This is what you would end up passing
>> down
>> > into KafkaApis. This will have all the convenience methods that people
>> were
>> > wanting to add to the protocol objects before. The downside of this is
>> that
>> > in some ways you define the request twice, but I think both of these
>> layers
>> > are actually needed and would evolve independently (the struct only when
>> > the protocol changes and the domain object with the needs of the code
>> that
>> > use it). I haven't actually done this for the produce request yet, in part
>> because I
>> > think to make these domain objects properly you need to use them on the
>> > server side too which we aren't ready for yet. However I did add a
>> version
>> > of this for metadata on KAFKA-1238 here:
>> >
>> >
>> >
>> https://issues.apache.org/jira/secure/attachment/12627654/KAFKA-1238-v1.patch
>> >
>> > Okay, it would be great to get feedback on this code and this general
>> > approach to protocol definition. If everyone likes it then I am going to
>> > consider all the discussion items for the new code wrapped up and move
>> on
>> > to the more detailed code review and testing.
>> >
>> > -Jay
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
