Jay,

This looks very cool, and will certainly make writing future APIs more sane and maintainable.

Few questions/comments:

* There's one part of the current API (I forget where) that does not use a length-prefixed repeating element. How would (or would you) represent this using Schema? IMO, we should be consistent and always use length prefixing for variable-length things.
* I've been meaning to ask for a while, but will there be a way to interrogate Kafka for the current API version(s)? Clients will definitely need this.
* Looks like Schema#read eagerly reads values; do you think there's any reason you'd want to do this lazily?
* Now that Struct is in the mix, it seems we have 2x the objects. We should keep an eye on the GC impact of this.
* Using Struct, which is more of a dynamic thing, the conversion to a typed response class (like MetadataResponse) looks a little hairy and probably error prone. Maybe it just needs to be decomposed a bit, or maybe Struct could return a "sub-Struct" for nested/repeating elements so the parsing logic could be organized better (rough sketch below).
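To make that last suggestion concrete, here's roughly what I'm picturing (getStructArray is a made-up helper, not something in the patch; producerResponseSchema is the schema from your example):

    Struct response = producerResponseSchema.read(buffer);
    for (Struct topicResponse : response.getStructArray("responses")) {          // hypothetical helper returning sub-Structs
        String topic = (String) topicResponse.get("topic");
        for (Struct partitionResponse : topicResponse.getStructArray("partition_responses")) {
            int partition = (Integer) partitionResponse.get("partition");
            short errorCode = (Short) partitionResponse.get("error_code");
            // build the typed response object (e.g. MetadataResponse) from these pieces
        }
    }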

While we are looking at protocols, maybe we should think about what we could do to make Kafka more amenable to large payloads. I'm not sure how we could do that with a nice abstraction like you have, but it's worth thinking about.

As for storing the protocol definitions, it would be really sweet if we could keep them in a text form (something like BNF, if not just BNF itself). This would allow other languages to write parsers for it and automatically generate protocols. If not using a text form as the source of truth, maybe Schema could simply generate a BNF text file.

Again, this is really cool and could definitely be its own standalone library :)

-David


On 2/7/14 3:56 PM, Jay Kreps wrote:
Okay this is the last discussion item for the new client code. :-)

Previously, to define an api you would implement request and response scala objects that read and wrote their own bytes. There were a few problems with this:
1. The consistency of the protocol was very hard to maintain.
2. You ended up hand-coding size estimation, which was very tedious and error-prone.
3. Error messages wouldn't give any field name information; you would just get some kind of BufferUnderflowException with no information about what went wrong or why. Fixing this was hard because each object would have had to implement that error handling itself.
4. There wasn't good support for api versioning. We have an api version
that is included in the request, but there was no easy way to maintain both
the old format and the new format.
5. The header information was baked into each request, and it was only through great care that we could keep the header standard across all the requests.
6. The same class that defined the protocol was used throughout the code.
So what were intended to be dumb DTOs ended up getting loaded up with
domain logic. Invariably aspects of this representation would end up
leaking into the protocol.
7. It was very hard to figure out what the protocol was from the code since
the definition was embedded in byte munging code spread out over dozens of
files.

So that was definitely bad.

We considered moving to an off-the-shelf protocol definition language like avro or protocol buffers. But prior experience with these is that they are great for whipping together a quick service, while for a stable, long-lived protocol it is actually better to specify the protocol itself rather than to specify an implementation like avro or protocol buffers. This is similar to what is done with AMQP, which I think does a fantastic job of providing a well-specified messaging protocol (that protocol is not suitable for the type of system we are building, but their method of specifying it is, I think, very good).

So the conclusion was to retain our BNF-specified protocol and instead
implement a simple library for implementing this protocol. This would have
the advantage of letting us retain our existing protocol and also to add a
few Kafka-specific optimizations. This library is just a helper utility for
implementing our protocol spec, the spec remains the source of truth.

I implemented this as part of the new client effort. I will describe how my
library works and the pattern I think we should use with it.

The code for defining the protocol is in org.apache.kafka.common.protocol.types. Note that this is meant to be a stand-alone library for serialization; it doesn't know anything about our actual requests and responses, or even that the messages being defined will be sent over a network. Our protocol itself is defined in org.apache.kafka.common.protocol.Protocol; this is just the protocol definition and is decoupled from the network layer and everything else.

We define a set of types that match our protocol, namely:
- fixed length primitives: int8, int16, int32, int64
- variable-length primitives: string, bytes
- container types: arrayof, struct

You define a message using types. All types extend
org.apache.kafka.common.protocol.types.Type.java. Each type knows how to
read, write, validate, and estimate the size of a single java object type.
Here is the correspondence
   Type.INT8: java.lang.Byte
   Type.INT16: java.lang.Short
   Type.INT32: java.lang.Integer
   Type.INT64: java.lang.Long
   Type.STRING: java.lang.String
   Type.BYTES: java.nio.ByteBuffer
   ArrayOf: Object[]
   Schema: Struct
The correspondence here can be thought of as that between a class and an
object: the class specifies the layout of the object, the object is an
instantiation of that class with particular values. Each message is defined
by a Schema, which can be used to read and write a Struct. The schema
specifies the fields in the type, and the Struct is an "instantiation" of
those fields with actual values. A struct can be thought of as a special
purpose hashmap.

An example will make this more clear. Here is how you define the request
header schema:
    new Schema(new Field("api_key", INT16, "The id of the request type."),
               new Field("api_version", INT16, "The version of the API."),
               new Field("correlation_id", INT32, "documentation string"),
               new Field("client_id", STRING, "more documentation."));

So a request header is a message that consists of a short api key followed
by a short api version followed by a correlation id and client id.

Here is a more complex example, the producer response:

new Schema(new Field("responses",
    new ArrayOf(new Schema(new Field("topic", STRING),
                           new Field("partition_responses",
                               new ArrayOf(new Schema(new Field("partition", INT32),
                                                      new Field("error_code", INT16),
                                                      new Field("base_offset", INT64))))))))

(indentation in email is tricky). Note that this has a schema which
contains an array of sub-records which in turn have a sub-array of records.
As this nesting gets more complicated it can get a bit hard to read, so you
can break it up using variables. An equivalent definition would be:

Schema partitionResponse = new Schema(new Field("partition", INT32),
                                      new Field("error_code", INT16),
                                      new Field("base_offset", INT64));

Schema topicResponse = new Schema(new Field("topic", STRING),
                                  new Field("partition_responses", new ArrayOf(partitionResponse)));

Schema producerResponse = new Schema(new Field("responses", new ArrayOf(topicResponse)));

Note that this is exactly equivalent.

Okay once such a schema is defined you can write an object in the following
way:

Struct header = new Struct(headerSchema);
header.set("api_key", (short) 1);
header.set("api_version", (short) 0);
...
headerSchema.write(buffer, header);
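A note on the buffer: since every type can estimate the size of its value, the schema can size the buffer for you instead of the hand-coded size estimation we had before. Assuming that estimate is exposed as something like sizeOf(Object), allocation looks like:

    ByteBuffer buffer = ByteBuffer.allocate(headerSchema.sizeOf(header)); // size comes from the schema's own estimate
    headerSchema.write(buffer, header);
    buffer.flip(); // buffer is now ready to be sent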

And you can read an instance of a header by doing:

Struct header = headerSchema.read(buffer);
Short apiKey = (Short) header.get("api_key");

Field apiVersionField = header.field("api_version");
Short apiVersion = (Short) header.get(apiVersionField);

Note the two different field access styles. Accessing a field by name has the performance of a hash table lookup. However, for performance-critical situations you can get the Field object that represents that entry in the struct. Getting this Field object takes a hash table lookup, but once you have it, it will get that field out of any instance of that struct with the performance of an array access. So this is useful in cases where you can statically fetch all the fields up front and then reuse them on every request (assuming you need to optimize performance).
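To spell that out, the intended pattern is to do the hash lookups once and then reuse the Field objects for every struct you read against that schema, roughly like this (reusing the header example above):

    // one-time hash lookups; keep the Field objects around
    Field apiKeyField = header.field("api_key");
    Field apiVersionField = header.field("api_version");

    // later, for each header you read, access by Field instead of by name
    Struct anotherHeader = headerSchema.read(buffer);
    Short apiKey = (Short) anotherHeader.get(apiKeyField);
    Short apiVersion = (Short) anotherHeader.get(apiVersionField);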

These raw structs are logic-free and act as the "DTO" for data that will be
sent over the network.

For the more complex requests and responses, interacting with the raw struct is not very pleasant. My recommendation is that we still maintain a java object that is the "domain object" for the request and knows how to read and write itself to the struct. This is what you would end up passing down into KafkaApis. This will have all the convenience methods that people were wanting to add to the protocol objects before. The downside of this is that in some ways you define the request twice, but I think both of these layers are actually needed and would evolve independently (the struct only when the protocol changes, and the domain object with the needs of the code that uses it). I haven't actually done this for the producer yet, in part because I think to make these domain objects properly you need to use them on the server side too, which we aren't ready for yet. However, I did add a version of this for metadata on KAFKA-1238 here:

https://issues.apache.org/jira/secure/attachment/12627654/KAFKA-1238-v1.patch
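To sketch what I mean (class and accessor names here are purely illustrative, and I'm only using the Struct calls shown above), a domain object for the request header might look like:

    public class RequestHeader {
        private final short apiKey;
        private final short apiVersion;
        private final int correlationId;
        private final String clientId;

        // read the domain object out of the wire-level struct
        public RequestHeader(Struct struct) {
            this.apiKey = (Short) struct.get("api_key");
            this.apiVersion = (Short) struct.get("api_version");
            this.correlationId = (Integer) struct.get("correlation_id");
            this.clientId = (String) struct.get("client_id");
        }

        // write the domain object back out as a struct for sending
        public Struct toStruct() {
            Struct struct = new Struct(headerSchema);
            struct.set("api_key", apiKey);
            struct.set("api_version", apiVersion);
            struct.set("correlation_id", correlationId);
            struct.set("client_id", clientId);
            return struct;
        }

        public short apiKey() { return apiKey; }
        // ...plus whatever convenience methods KafkaApis ends up wanting
    }

The constructor and toStruct are the only places that know the field names, so when the protocol evolves only this mapping layer needs to change.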

Okay, it would be great to get feedback on this code and this general
approach to protocol definition. If everyone likes it then I am going to
consider all the discussion items for the new code wrapped up and move on
to the more detailed code review and testing.

-Jay

