Hey Jay,

3,4. What I meant is that you still need to set/get the field values by their field names:

// in toStruct
body.set("topics", topics.toArray());

// in toMetadataResponse
int nodeId = (Integer) broker.get("node_id");

And I was proposing just the opposite of your understanding: after the protocol file is
defined, auto-generate the toStruct and parseRequest functions; if that cannot be done, I
would rather require programmers to use only the field object references and not allow
field names, since a string value may refer to a different field after a version change.
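Concretely, something along these lines (just a sketch -- the Field constants are
hypothetical and would have to be defined next to the schema in Protocol.java, and it
assumes Schema exposes a get(name) lookup and that set() accepts a Field the same way
get() does):

// hypothetical field handles, resolved once against the V0 schemas
public static final Field TOPICS = METADATA_RESPONSE_V0.get("topics");
public static final Field NODE_ID = BROKER_V0.get("node_id");

// in toStruct: no string literal at the call site
body.set(TOPICS, topics.toArray());

// in toMetadataResponse
int nodeId = (Integer) broker.get(NODE_ID);

A renamed or re-versioned field then surfaces as an error in one place (the constant
definition) instead of at every string literal scattered through the request classes.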
Guozhang

On Fri, Feb 7, 2014 at 3:23 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> If what you are saying is that we should define the java code and kind of
> "automatically" generate the protocol off of that, then I would be against
> that, as I really want the protocol definition to be the source of truth. It
> should be 100% obvious when you are changing the protocol.
>
> -Jay
>
>
> On Fri, Feb 7, 2014 at 3:22 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Hey Guozhang,
> >
> > 1,2. Yes, you need to define the new format and add it to the list of
> > versions.
> > 3. Not necessarily. It depends on the change. If the formats are totally
> > different, the code can take the version and just have an if statement to
> > read one or the other. But most changes are the simple addition or removal
> > of fields. In this case the schema DSL supports default values that will be
> > filled in to help ease backwards compatibility for simple cases.
> > 4. For the client, presumably you always use the latest version, so the
> > existing logic new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id))
> > should suffice?
> >
> > I'm not sure I know what it is you are proposing. Care to elaborate?
> >
> > -Jay
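To make 1, 2 and the default-value point concrete, evolving the metadata response in
Protocol.java might look roughly like this (a sketch only -- the "controller_id" field,
the nested BROKER_V0 / TOPIC_METADATA_V0 schema names, and the four-argument Field
constructor with a default value are illustrative assumptions, not the actual patch):

public static final Schema METADATA_RESPONSE_V1 =
    new Schema(new Field("brokers", new ArrayOf(BROKER_V0), "All brokers known to the cluster."),
               new Field("controller_id", INT32, "Illustrative new field.", -1),
               new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0)));

// keep every version in the list of versions, not just the newest one
public static final Schema[] METADATA_RESPONSE =
    new Schema[] { METADATA_RESPONSE_V0, METADATA_RESPONSE_V1 };

The default (-1 here) is presumably what code asking for the new field gets back when it
reads a message written in the old format, which is the "simple cases" easing described
above.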
> >
> > On Fri, Feb 7, 2014 at 2:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Just to clarify, in this case when we are evolving, say, MetadataResponse to
> >> version 1, we will:
> >>
> >> 1. In Protocol.java, add another variable METADATA_RESPONSE_V1.
> >>
> >> 2. In Protocol.java, change this line to:
> >>
> >> public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V1 };
> >>
> >> 3. In MetadataResponse.java, change the bytes-to-object function (i.e.,
> >> MetadataResponse(Struct struct)) to have parsing logic for both V0 and V1.
> >>
> >> 4. In MetadataResponse.java, change the object-to-bytes function (i.e.,
> >> Struct toStruct()) to V1.
> >>
> >> Is this correct?
> >>
> >> My feeling is that it would be good if we did not have to repeat field
> >> names like "topics" in multiple places (Protocol and MetadataResponse, for
> >> example), and although toStruct() only needs to maintain the logic for the
> >> latest version, MetadataResponse(Struct struct) would probably need to
> >> handle all versions, which will make it very complicated. So I am wondering
> >> whether these two functions can be provided automatically. I am not
> >> advocating another round of compilation, but would like to see if it is
> >> possible for these procedures to be done in a more programmatic way.
> >>
> >> Guozhang
> >>
> >>
> >> On Fri, Feb 7, 2014 at 12:56 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>
> >> > Okay, this is the last discussion item for the new client code. :-)
> >> >
> >> > Previously, to define an api you would implement a request and response
> >> > scala object that read and wrote its own bytes. There were a few problems
> >> > with this:
> >> > 1. The consistency of the protocol was very hard to maintain.
> >> > 2. You ended up hand-coding size estimation, which was very tedious and
> >> > error prone.
> >> > 3. Error messages wouldn't give any field name information; you would just
> >> > get some kind of BufferUnderflowException with no information about what
> >> > or why. Fixing this was hard because each object would have to implement
> >> > its own good error handling.
> >> > 4. There wasn't good support for api versioning. We have an api version
> >> > that is included in the request, but there was no easy way to maintain
> >> > both the old format and the new format.
> >> > 5. The header information was baked into each request, and it was only
> >> > through great care that we could keep the header standard throughout the
> >> > requests.
> >> > 6. The same class that defined the protocol was used throughout the code.
> >> > So what were intended to be dumb DTOs ended up getting loaded up with
> >> > domain logic. Invariably aspects of this representation would end up
> >> > leaking into the protocol.
> >> > 7. It was very hard to figure out what the protocol was from the code,
> >> > since the definition was embedded in byte-munging code spread out over
> >> > dozens of files.
> >> >
> >> > So that was definitely bad.
> >> >
> >> > We considered moving to an off-the-shelf protocol definition language like
> >> > avro or protocol buffers. But prior experience with these is that they are
> >> > great for whipping together a quick service, but for a stable protocol it
> >> > is actually better to specify the protocol itself rather than an
> >> > implementation like avro or protocol buffers. This is similar to what is
> >> > done with AMQP, which I think does a fantastic job of providing a well
> >> > specified messaging protocol (that protocol is not suitable for the type
> >> > of system we are building, but their method of specifying it I think is
> >> > very good).
> >> >
> >> > So the conclusion was to retain our BNF-specified protocol and instead
> >> > implement a simple library for implementing this protocol. This has the
> >> > advantage of letting us retain our existing protocol and also add a few
> >> > Kafka-specific optimizations. This library is just a helper utility for
> >> > implementing our protocol spec; the spec remains the source of truth.
> >> >
> >> > I implemented this as part of the new client effort. I will describe how
> >> > my library works and the pattern I think we should use with it.
> >> >
> >> > The code for defining the protocol is in
> >> > org.apache.kafka.common.protocol.types. Note that this is meant to be a
> >> > stand-alone library for serialization; it doesn't know anything about our
> >> > actual requests and responses or even that the messages being defined will
> >> > be sent over a network. Our protocol is defined in
> >> > org.apache.kafka.common.protocol.Protocol; this is just the protocol and
> >> > is decoupled from the network layer and everything else.
> >> >
> >> > We define a set of types that match our protocol, namely:
> >> > - fixed-length primitives: int8, int16, int32, int64
> >> > - variable-length primitives: string, bytes
> >> > - container types: arrayof, struct
> >> >
> >> > You define a message using types. All types extend
> >> > org.apache.kafka.common.protocol.types.Type.java. Each type knows how to
> >> > read, write, validate, and estimate the size of a single java object type.
> >> > Here is the correspondence:
> >> > Type.INT8: java.lang.Byte
> >> > Type.INT16: java.lang.Short
> >> > Type.INT32: java.lang.Integer
> >> > Type.INT64: java.lang.Long
> >> > Type.STRING: java.lang.String
> >> > Type.BYTES: java.nio.ByteBuffer
> >> > ArrayOf: Object[]
> >> > Schema: Struct
> >> > The correspondence here can be thought of as that between a class and an
> >> > object: the class specifies the layout of the object, and the object is an
> >> > instantiation of that class with particular values. Each message is
> >> > defined by a Schema, which can be used to read and write a Struct. The
> >> > schema specifies the fields in the type, and the Struct is an
> >> > "instantiation" of those fields with actual values. A struct can be
> >> > thought of as a special-purpose hashmap.
> >> >
> >> > An example will make this more clear. Here is how you define the request
> >> > header schema:
> >> >
> >> > new Schema(new Field("api_key", INT16, "The id of the request type."),
> >> >            new Field("api_version", INT16, "The version of the API."),
> >> >            new Field("correlation_id", INT32, "documentation string"),
> >> >            new Field("client_id", STRING, "more documentation."));
> >> >
> >> > So a request header is a message that consists of a short api key,
> >> > followed by a short api version, followed by a correlation id and a
> >> > client id.
> >> >
> >> > Here is a more complex example, the producer response:
> >> >
> >> > new Schema(new Field("responses", new ArrayOf(new Schema(new Field("topic",
> >> > STRING), new Field("partition_responses", new ArrayOf(new Schema(new Field(
> >> > "partition", INT32), new Field("error_code", INT16), new Field("base_offset",
> >> > INT64))))))))
> >> >
> >> > (indentation in email is tricky). Note that this has a schema which
> >> > contains an array of sub-records which in turn have a sub-array of
> >> > records. As this nesting gets more complicated it can get a bit hard to
> >> > read, so you can break it up using variables. An equivalent definition
> >> > would be:
> >> >
> >> > Schema partitionResponse = new Schema(new Field("partition", INT32),
> >> >                                       new Field("error_code", INT16),
> >> >                                       new Field("base_offset", INT64));
> >> >
> >> > Schema topicResponse = new Schema(new Field("topic", STRING),
> >> >                                   new Field("partition_responses", new ArrayOf(partitionResponse)));
> >> >
> >> > Schema producerResponse = new Schema(new Field("responses", new ArrayOf(topicResponse)));
> >> >
> >> > Note that this is exactly equivalent.
> >> >
> >> > Okay, once such a schema is defined you can write an object in the
> >> > following way:
> >> >
> >> > Struct header = new Struct(headerSchema);
> >> > header.set("api_key", (short) 1);
> >> > header.set("api_version", (short) 0);
> >> > ...
> >> > headerSchema.write(buffer, header);
> >> >
> >> > And you can read an instance of a header by doing:
> >> >
> >> > Struct header = headerSchema.read(buffer);
> >> > Short apiKey = (Short) header.get("api_key");
> >> > Field apiVersionField = header.field("api_version");
> >> > Short apiVersion = (Short) header.get(apiVersionField);
> >> >
> >> > Note the two different field access styles. Accessing a field by name has
> >> > the performance of a hash table lookup. However, for performance-critical
> >> > situations you can get the Field object that represents that entry in the
> >> > struct. Getting this field object takes a hash table lookup, but once you
> >> > have it, it will get that field out of any instance of that struct with
> >> > the performance of an array access. So this is useful in cases where you
> >> > can statically fetch all the fields and then use them on every request
> >> > (and assuming you need to optimize performance).
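To illustrate that second access style: the Field handles can be looked up once and then
reused across every struct of the same schema. A rough sketch -- the surrounding loop and
the firstHeader/headers names are mine; only the field()/get(Field) calls come from the
description above:

// one-time lookups (hash access); the returned Field works for any Struct of this schema
Field apiKeyField = firstHeader.field("api_key");
Field apiVersionField = firstHeader.field("api_version");

// per-message access is then an array index, not a hash lookup
for (Struct header : headers) {
    short apiKey = (Short) header.get(apiKeyField);
    short apiVersion = (Short) header.get(apiVersionField);
    // ... dispatch on apiKey / apiVersion ...
}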
> >> >
> >> > These raw structs are logic-free and act as the "DTO" for data that will
> >> > be sent over the network.
> >> >
> >> > For the more complex requests and responses, interacting with the raw
> >> > struct is not very pleasant. My recommendation is that we still maintain
> >> > a java object that is the "domain object" for the request and knows how
> >> > to read and write itself to the struct. This is what you would end up
> >> > passing down into KafkaApis. This will have all the convenience methods
> >> > that people were wanting to add to the protocol objects before. The
> >> > downside of this is that in some ways you define the request twice, but I
> >> > think both of these layers are actually needed and would evolve
> >> > independently (the struct only when the protocol changes and the domain
> >> > object with the needs of the code that uses it). I haven't actually done
> >> > this for produce yet, in part because I think to make these domain
> >> > objects properly you need to use them on the server side too, which we
> >> > aren't ready for yet. However, I did add a version of this for metadata
> >> > on KAFKA-1238 here:
> >> >
> >> > https://issues.apache.org/jira/secure/attachment/12627654/KAFKA-1238-v1.patch
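For a sense of the shape of such a domain object, here is a minimal sketch built around
the header schema shown earlier (this is illustrative only, not the metadata code in the
KAFKA-1238 patch; headerSchema refers to the schema defined above):

public class RequestHeader {
    private final short apiKey;
    private final short apiVersion;
    private final int correlationId;
    private final String clientId;

    // struct-to-domain: the only place that parses the raw fields
    public RequestHeader(Struct struct) {
        this.apiKey = (Short) struct.get("api_key");
        this.apiVersion = (Short) struct.get("api_version");
        this.correlationId = (Integer) struct.get("correlation_id");
        this.clientId = (String) struct.get("client_id");
    }

    // domain-to-struct: evolves only when the protocol does
    public Struct toStruct() {
        Struct struct = new Struct(headerSchema);
        struct.set("api_key", apiKey);
        struct.set("api_version", apiVersion);
        struct.set("correlation_id", correlationId);
        struct.set("client_id", clientId);
        return struct;
    }

    public short apiKey() { return apiKey; }
    public String clientId() { return clientId; }
    // ... plus whatever convenience methods the code using it needs ...
}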
> >> >
> >> > Okay, it would be great to get feedback on this code and this general
> >> > approach to protocol definition. If everyone likes it then I am going to
> >> > consider all the discussion items for the new code wrapped up and move on
> >> > to the more detailed code review and testing.
> >> >
> >> > -Jay
> >>
> >> --
> >> -- Guozhang
>

--
-- Guozhang