Jay Kreps created KAFKA-643:
-------------------------------

             Summary: Refactor api definition layer
                 Key: KAFKA-643
                 URL: https://issues.apache.org/jira/browse/KAFKA-643
             Project: Kafka
          Issue Type: Improvement
    Affects Versions: 0.8.1
            Reporter: Jay Kreps
            Assignee: Jay Kreps


The way we are defining our protocol is really a bit embarrassing. It is full 
of ad hoc serialization code for each API. This code is very fiddly and opaque, 
and when it has errors they are hard to debug. Since it is all done one-off it 
is also very easy for it to become inconsistent. This was tolerable when there 
were only two apis with a few fields each, but now there are a half dozen more 
complex apis. By my count there are now over 1000 lines of code in kafka.apis.*.

One option would be to use protocol buffers or thrift or another 
schema-oriented code-gen RPC language. However I think this is probably the 
wrong direction for a couple of reasons. One is that we want something that 
works well with our I/O model, both network and disk, which is very 
NIO-centric, so it should work directly with ByteBuffers. Second, I feel that 
these systems complicate the specification of the protocol. They give a 
schema, which is a great high-level description, but the translation of that 
schema to bytes is essentially whatever their code-gen engine chooses to do. 
These things are a great way to build application services, but not great for 
something like what we are building.

Instead I think we should keep doing what we have done: specify the protocol 
on the wiki. However, we should write a little helper code to make our lives 
easier.

Here is my recommendation for how this code would work. We add two helper 
classes: Schema and Record.

You define message formats like this:
import Types._
val FetchRequestProtocol = 
  Schema("ReplicaId"->int32, 
               "MaxWaitTime"->int32, 
               "MinBytes"->int32,
               Seq("TopicName"->utf8,
                      Seq("Partition"->int32, 
                             "FetchOffset"->int64, 
                             "MaxBytes"->int32)))

Note that this almost exactly matches the BNF for the fetch request: 
  
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
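
To make this concrete, the primitive types referenced above (int32, int64, 
utf8) could be defined roughly as follows. This is only a sketch of the shape 
of the thing, not a final implementation; the exact encodings (e.g. a two-byte 
length prefix for strings) are illustrative assumptions.

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Each primitive type knows how to read, write, and size itself against a
// ByteBuffer.
trait Type {
  def read(buffer: ByteBuffer): Any
  def write(buffer: ByteBuffer, value: Any): Unit
  def sizeOf(value: Any): Int
}

object Types {
  val int32: Type = new Type {
    def read(buffer: ByteBuffer): Any = buffer.getInt()
    def write(buffer: ByteBuffer, value: Any): Unit = buffer.putInt(value.asInstanceOf[Int])
    def sizeOf(value: Any): Int = 4
  }
  val int64: Type = new Type {
    def read(buffer: ByteBuffer): Any = buffer.getLong()
    def write(buffer: ByteBuffer, value: Any): Unit = buffer.putLong(value.asInstanceOf[Long])
    def sizeOf(value: Any): Int = 8
  }
  // Strings as a two-byte length followed by UTF-8 bytes (an assumed encoding).
  val utf8: Type = new Type {
    def read(buffer: ByteBuffer): Any = {
      val len = buffer.getShort() & 0xffff
      val bytes = new Array[Byte](len)
      buffer.get(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
    def write(buffer: ByteBuffer, value: Any): Unit = {
      val bytes = value.asInstanceOf[String].getBytes(StandardCharsets.UTF_8)
      buffer.putShort(bytes.length.toShort)
      buffer.put(bytes)
    }
    def sizeOf(value: Any): Int =
      2 + value.asInstanceOf[String].getBytes(StandardCharsets.UTF_8).length
  }
}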

Once defined, this schema can be used to parse messages:
  val record: Record = FetchRequestProtocol.readFrom(buffer)
A record is just a wrapper around an array. The readFrom method parses out the 
fields specified in the schema and populates the array. Fields in the record 
can be accessed by name, e.g. 
  record("ReplicaId")
For common access this is probably good enough. However, since each field's 
position is fixed, it is also possible to get the element by a Field object, 
which avoids the hashmap lookup and goes directly to the right slot. E.g.
  val ReplicaIdField = FetchRequestProtocol("ReplicaId") // do this as a global variable
  ...
  record(ReplicaIdField)
This will be for cases where we are a bit performance-conscious and don't want 
to do umpteen hashmap lookups to resolve string field names.
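
For reference, here is one possible shape for the Schema, Field, and Record 
classes, building on the Type sketch above. It leaves out the repeated groups 
from the fetch example (the nested Seq(...) blocks) for brevity, and it 
includes the writeTo and size methods described in the next paragraph so the 
sketch hangs together; all names and signatures are illustrative, not final.

import java.nio.ByteBuffer

// A Schema is an ordered list of named fields; a Record is the schema plus a
// plain Array[Any] holding one value per field.
case class Field(index: Int, name: String, fieldType: Type)

object Schema {
  def apply(fieldSpecs: (String, Type)*): Schema = new Schema(fieldSpecs: _*)
}

class Schema(fieldSpecs: (String, Type)*) {
  val fields: Array[Field] =
    fieldSpecs.zipWithIndex.map { case ((name, t), i) => Field(i, name, t) }.toArray
  private val byName: Map[String, Field] = fields.map(f => f.name -> f).toMap

  // Resolve a field once (e.g. into a global val) to skip the map lookup later.
  def apply(name: String): Field = byName(name)

  // Parse the fields in schema order and populate the value array.
  def readFrom(buffer: ByteBuffer): Record = {
    val values = new Array[Any](fields.length)
    for (f <- fields)
      values(f.index) = f.fieldType.read(buffer)
    new Record(this, values)
  }
}

class Record(val schema: Schema, values: Array[Any]) {
  // Access by name: convenient, but pays a hashmap lookup.
  def apply(name: String): Any = values(schema(name).index)

  // Access by a pre-resolved Field: goes straight to the array slot.
  def apply(field: Field): Any = values(field.index)

  // Set a field, e.g. when building a record for output.
  def update(field: Field, value: Any): Unit = values(field.index) = value

  // Write all fields back out in schema order.
  def writeTo(buffer: ByteBuffer): Unit =
    for (f <- schema.fields) f.fieldType.write(buffer, values(f.index))

  // Total serialized size in bytes.
  def size: Int =
    schema.fields.map(f => f.fieldType.sizeOf(values(f.index))).sum
}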

Likewise the other direction, to write out a record:
  record.writeTo(buffer)
and to get the size in bytes:
  record.size
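
Putting the pieces together, a round trip might look like the following. The 
flat three-field schema here is purely hypothetical and just stands in for a 
real request so the example stays short.

import java.nio.ByteBuffer
import Types._

// A made-up flat schema; a real request would also carry nested topic/partition arrays.
val ExampleProtocol = Schema("ReplicaId" -> int32,
                             "MaxWaitTime" -> int32,
                             "MinBytes" -> int32)
val ReplicaIdField = ExampleProtocol("ReplicaId")   // resolved once, as a global

// Build a record, size a buffer exactly, and write it out...
val record = new Record(ExampleProtocol, Array[Any](0, 1000, 64 * 1024))
val buffer = ByteBuffer.allocate(record.size)
record.writeTo(buffer)
buffer.flip()

// ...then parse it back and read fields by name or by pre-resolved Field.
val parsed = ExampleProtocol.readFrom(buffer)
assert(parsed("MinBytes") == 64 * 1024)
assert(parsed(ReplicaIdField) == 0)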

Implementing a single read, write, and size method against generic schemas 
will not only make the underlying protocol clearly defined but also ensure 
good error handling, error reporting, etc. It will be a bit slower, though 
probably not by much, since we can optimize this code.

I do realize that this is essentially what Avro or Thrift or Protocol Buffers 
do, but I think this is much simpler, and can be implemented in a few hundred 
lines of code with no dependencies. Furthermore, it is a way to implement our 
protocol, not a way to define a protocol.

In terms of how we use this, this is what I have in mind:

I think we should split the apis into a generic and a specific portion, with 
the generic piece being the header shared by all requests and responses, and 
the specific portion being the bits for that particular message. I recommend 
we officially implement versioning by allowing multiple versions of each 
schema and always looking up the right schema for the incoming and outgoing 
messages. I think we can keep the existing case classes, and just map the 
scala objects to and from the record instances in a wrapper layer in front of 
the existing KafkaApis. The KafkaApis.handle method would disappear; instead 
this wrapper would handle message deserialization and call the right method 
with the right request object.
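
To sketch what that wrapper layer might look like: the api key value, the 
FetchRequest case class, and the handler name below are made up for 
illustration, and the nested topic/partition portion of the fetch schema is 
again elided.

import java.nio.ByteBuffer
import Types._

// Keep every version of every schema and pick the right one per message.
object ProtocolSchemas {
  // hypothetical (apiKey, version) -> schema table; old versions stay registered
  private val schemas: Map[(Short, Short), Schema] = Map(
    (1.toShort, 0.toShort) -> Schema("ReplicaId" -> int32,
                                     "MaxWaitTime" -> int32,
                                     "MinBytes" -> int32))
  def schemaFor(apiKey: Short, version: Short): Schema = schemas((apiKey, version))
}

// The existing case classes stay; the wrapper maps records to and from them.
case class FetchRequest(replicaId: Int, maxWait: Int, minBytes: Int)

def handleFetchRequest(request: FetchRequest): Unit = { /* existing handler logic */ }

// Replaces KafkaApis.handle: read the shared header, look up the versioned body
// schema, build the request object, and dispatch to the right handler method.
def handle(buffer: ByteBuffer): Unit = {
  val apiKey = buffer.getShort()
  val version = buffer.getShort()
  val body = ProtocolSchemas.schemaFor(apiKey, version).readFrom(buffer)
  apiKey.toInt match {
    case 1 =>
      handleFetchRequest(FetchRequest(
        replicaId = body("ReplicaId").asInstanceOf[Int],
        maxWait = body("MaxWaitTime").asInstanceOf[Int],
        minBytes = body("MinBytes").asInstanceOf[Int]))
    case _ => // other apis would be dispatched the same way
  }
}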


