If what you are saying is that we should define the Java code and then "automatically" generate the protocol off of that, then I would be against it, as I really want the protocol definition to be the source of truth. It should be 100% obvious when you are changing the protocol.
-Jay

On Fri, Feb 7, 2014 at 3:22 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Hey Guozhang,

1, 2. Yes, you need to define the new format and add it to the list of versions.

3. Not necessarily. It depends on the change. If the formats are totally different, the code can take the version and just have some if statement to read one or the other. But most changes are just the simple addition or removal of fields. In this case the schema DSL supports default values that will be filled in to help ease backwards compatibility for simple cases.

4. For the client, presumably you always use the latest version, so the existing logic new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id)) should suffice?

I'm not sure I know what it is you are proposing. Care to elaborate?

-Jay

On Fri, Feb 7, 2014 at 2:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Just to clarify, in this case when we are evolving, say, MetadataResponse to version 1, we will:

1. In Protocol.java, add another variable METADATA_RESPONSE_V1.

2. In Protocol.java, change this line to:

    public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V1 };

3. In MetadataResponse.java, change the bytes-to-object function (i.e., MetadataResponse(Struct struct)) to have parsing logic for both V0 and V1.

4. In MetadataResponse.java, change the object-to-bytes function (i.e., Struct toStruct()) to produce V1.

Is this correct?

My feeling is that it would be good if we did not have to repeat field names like "topics" in multiple places, e.g. in both Protocol and MetadataResponse. And although toStruct() only needs to maintain the logic for the latest version, MetadataResponse(Struct struct) would probably need to handle all versions, which will make it very complicated. So I am wondering whether these two functions can be provided automatically. I am not advocating for another round of compilation, but I would like to see whether it is possible for these procedures to be done in a more programmatic way.

Guozhang
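(For illustration only, here is a rough sketch of what steps 1 and 2 above could look like in Protocol.java, plus the "if statement on the version" idea from point 3. The V1 field list, the BROKER and TOPIC_METADATA_V1 constants, and the parseV0/parseV1 helpers are all placeholders made up for the sketch, not real definitions; only the versioning pattern is the point.)

    // Step 1: define the new schema version alongside the old one
    // (field names here are invented for the sketch).
    public static Schema METADATA_RESPONSE_V1 =
        new Schema(new Field("brokers", new ArrayOf(BROKER)),
                   new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));

    // Step 2: the versions array lists every supported version;
    // the array index doubles as the api version number.
    public static Schema[] METADATA_RESPONSE =
        new Schema[] { METADATA_RESPONSE_V0, METADATA_RESPONSE_V1 };

    // Point 3: if the formats really diverge, the bytes-to-object side
    // can simply branch on the version carried in the request header.
    public static MetadataResponse parse(Struct struct, int version) {
        return version == 0 ? parseV0(struct) : parseV1(struct);
    }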
On Fri, Feb 7, 2014 at 12:56 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Okay, this is the last discussion item for the new client code. :-)

Previously, to define an api you would implement a request and response scala object that read and wrote its own bytes. There were a few problems with this:

1. The consistency of the protocol was very hard to maintain.
2. You ended up hand-coding size estimation, which was very tedious and error prone.
3. Error messages wouldn't give any field name information; you would just get some kind of BufferUnderflowException with no information about what or why. Fixing this was hard because each object would have to implement the good error handling itself.
4. There wasn't good support for api versioning. We have an api version that is included in the request, but there was no easy way to maintain both the old format and the new format.
5. The header information was baked into each request, and it was only through great care that we could keep the header standard throughout the requests.
6. The same class that defined the protocol was used throughout the code. So what were intended to be dumb DTOs ended up getting loaded up with domain logic. Invariably, aspects of this representation would end up leaking into the protocol.
7. It was very hard to figure out what the protocol was from the code, since the definition was embedded in byte-munging code spread out over dozens of files.

So that was definitely bad.

We considered moving to an off-the-shelf protocol definition language like Avro or Protocol Buffers. But prior experience with these is that they are great for whipping together a quick service, but for a stable protocol it is actually better to define the protocol itself rather than specifying an implementation like Avro or Protocol Buffers. This is similar to what is done with AMQP, which I think does a fantastic job of providing a well-specified messaging protocol (that protocol is not suitable for the type of system we are building, but their method of specifying it is, I think, very good).

So the conclusion was to retain our BNF-specified protocol and instead implement a simple library for implementing this protocol. This would have the advantage of letting us retain our existing protocol and also add a few Kafka-specific optimizations. This library is just a helper utility for implementing our protocol spec; the spec remains the source of truth.

I implemented this as part of the new client effort. I will describe how my library works and the pattern I think we should use with it.

The code for defining the protocol is in org.apache.kafka.common.protocol.types. Note that this is meant to be a stand-alone library for serialization; it doesn't know anything about our actual requests and responses, or even that the messages being defined will be sent over a network. The definition of our protocol is in org.apache.kafka.common.protocol.Protocol; this is just the protocol and is decoupled from the network layer and everything else.

We define a set of types that match our protocol, namely:
- fixed-length primitives: int8, int16, int32, int64
- variable-length primitives: string, bytes
- container types: arrayof, struct

You define a message using types. All types extend org.apache.kafka.common.protocol.types.Type.java. Each type knows how to read, write, validate, and estimate the size of a single java object type. Here is the correspondence:

    Type.INT8:   java.lang.Byte
    Type.INT16:  java.lang.Short
    Type.INT32:  java.lang.Integer
    Type.INT64:  java.lang.Long
    Type.STRING: java.lang.String
    Type.BYTES:  java.nio.ByteBuffer
    ArrayOf:     Object[]
    Schema:      Struct

The correspondence here can be thought of as that between a class and an object: the class specifies the layout of the object, and the object is an instantiation of that class with particular values. Each message is defined by a Schema, which can be used to read and write a Struct. The schema specifies the fields in the type, and the Struct is an "instantiation" of those fields with actual values. A struct can be thought of as a special-purpose hashmap.
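Before the full schema example below, a quick sketch of that correspondence: the primitive types can be used directly for stand-alone serialization, including the size estimation that used to be hand-coded. (This is just an illustration of the Type API described above, not code from the patch.)

    // Each Type knows the size of, writes, and reads the java object it maps to.
    int size = Type.INT32.sizeOf(42) + Type.STRING.sizeOf("metadata");
    ByteBuffer buffer = ByteBuffer.allocate(size);

    Type.INT32.write(buffer, 42);            // java.lang.Integer
    Type.STRING.write(buffer, "metadata");   // java.lang.String

    buffer.flip();
    Integer i = (Integer) Type.INT32.read(buffer);
    String s = (String) Type.STRING.read(buffer);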
An example will make this more clear. Here is how you define the request header schema:

    new Schema(new Field("api_key", INT16, "The id of the request type."),
               new Field("api_version", INT16, "The version of the API."),
               new Field("correlation_id", INT32, "documentation string"),
               new Field("client_id", STRING, "more documentation."));

So a request header is a message that consists of a short api key, followed by a short api version, followed by a correlation id and a client id.

Here is a more complex example, the producer response:

    new Schema(new Field("responses",
        new ArrayOf(new Schema(new Field("topic", STRING),
                               new Field("partition_responses",
                                   new ArrayOf(new Schema(new Field("partition", INT32),
                                                          new Field("error_code", INT16),
                                                          new Field("base_offset", INT64))))))));

(Indentation in email is tricky.) Note that this is a schema which contains an array of sub-records, which in turn have a sub-array of records. As this nesting gets more complicated it can get a bit hard to read, so you can break it up using variables. An equivalent definition would be:

    Schema partitionResponse = new Schema(new Field("partition", INT32),
                                          new Field("error_code", INT16),
                                          new Field("base_offset", INT64));

    Schema topicResponse = new Schema(new Field("topic", STRING),
                                      new Field("partition_responses", new ArrayOf(partitionResponse)));

    Schema producerResponse = new Schema(new Field("responses", new ArrayOf(topicResponse)));

Note that this is exactly equivalent.

Okay, once such a schema is defined, you can write an object in the following way:

    Struct header = new Struct(headerSchema);
    header.set("api_key", (short) 1);
    header.set("api_version", (short) 0);
    ...
    headerSchema.write(buffer, header);

And you can read an instance of a header by doing:

    Struct header = headerSchema.read(buffer);
    Short apiKey = (Short) header.get("api_key");

    Field apiVersionField = header.field("api_version");
    Short apiVersion = (Short) header.get(apiVersionField);

Note the two different field access styles. Accessing a field by name has the performance of a hash table lookup. However, for performance-critical situations you can get the Field object that represents that entry in the struct. Getting this Field object takes a hash table lookup, but once you have it, it will get that field out of any instance of that struct with the performance of an array access. So this is useful in cases where you can statically fetch all the fields and then use them on every request (assuming you need to optimize performance).

These raw structs are logic-free and act as the "DTO" for data that will be sent over the network.

For the more complex requests and responses, interacting with the raw struct is not very pleasant. My recommendation is that we still maintain a java object that is the "domain object" for the request and knows how to read and write itself to the struct. This is what you would end up passing down into KafkaApis, and it will have all the convenience methods that people were wanting to add to the protocol objects before. The downside is that in some ways you define the request twice, but I think both of these layers are actually needed and would evolve independently (the struct changes only when the protocol changes, and the domain object changes with the needs of the code that uses it).
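For concreteness, here is a rough sketch of what such a domain object could look like for the request header defined earlier, with headerSchema referring to that schema. (This is a simplified illustration, not code from the patch.)

    public class RequestHeader {
        private final short apiKey;
        private final short apiVersion;
        private final int correlationId;
        private final String clientId;

        public RequestHeader(short apiKey, short apiVersion, int correlationId, String clientId) {
            this.apiKey = apiKey;
            this.apiVersion = apiVersion;
            this.correlationId = correlationId;
            this.clientId = clientId;
        }

        // Convenience accessors and any domain logic live here, not on the Struct.
        public short apiKey() { return apiKey; }

        // The domain object knows how to write itself into the logic-free struct...
        public Struct toStruct() {
            Struct struct = new Struct(headerSchema);
            struct.set("api_key", apiKey);
            struct.set("api_version", apiVersion);
            struct.set("correlation_id", correlationId);
            struct.set("client_id", clientId);
            return struct;
        }

        // ...and how to rebuild itself from one read off the network.
        public static RequestHeader fromStruct(Struct struct) {
            return new RequestHeader((Short) struct.get("api_key"),
                                     (Short) struct.get("api_version"),
                                     (Integer) struct.get("correlation_id"),
                                     (String) struct.get("client_id"));
        }
    }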
I haven't actually done this in the producer yet, in part because I think that to make these domain objects properly you need to use them on the server side too, which we aren't ready for yet. However, I did add a version of this for metadata on KAFKA-1238 here:

https://issues.apache.org/jira/secure/attachment/12627654/KAFKA-1238-v1.patch

Okay, it would be great to get feedback on this code and this general approach to protocol definition. If everyone likes it, then I am going to consider all the discussion items for the new code wrapped up and move on to the more detailed code review and testing.

-Jay