In the past there was some discussion about having a C client for non-JVM 
languages.  Is this still planned as well?  Being able to work with Kafka from 
other languages would be a great thing.  Where I work, we interact with Kafka 
via Java and Ruby (producer), so having an official C library that can be used 
from within Ruby would make it easier to have the same version of the client in 
Java and Ruby.

-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com] 
Sent: Friday, July 26, 2013 3:00 PM
To: d...@kafka.apache.org; users@kafka.apache.org
Subject: Client improvement discussion

I sent around a wiki a few weeks back proposing a set of client improvements 
that essentially amount to a rewrite of the producer and consumer java clients.

https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

The below discussion assumes you have read this wiki.

I started to do a little prototyping for the producer and wanted to share some 
of the ideas that came up to get early feedback.

First, a few simple but perhaps controversial things to discuss.

Rollout
Phase 1: We add the new clients. No change on the server. Old clients still 
exist. The new clients will be entirely in a new package so there will be no 
possibility of name collision.
Phase 2: We swap out all shared code on the server to use the new client stuff. 
At this point the old clients still exist but are essentially deprecated.
Phase 3: We remove the old client code.

Java
I think we should do the clients in java. Making our users deal with scala's 
non-compatability issues and crazy stack traces causes people a lot of pain. 
Furthermore we end up having to wrap everything now to get a usable java api 
anyway for non-scala people. This does mean maintaining a substantial chunk of 
java code, which is maybe less fun than scala. But basically i think we should 
optimize for the end user and produce a standalone pure-java jar with no 
dependencies.

Jars
We definitely want to separate out the client jar. There is also a fair amount 
of code shared between both (exceptions, protocol definition, utils, and the 
message set implementation). Two approaches.
Two jar approach: split kafka.jar into kafka-clients.jar and kafka-server.jar 
with the server depending on the clients. The advantage of this is that it is 
simple. The disadvantage is that things like utils and protocol definition will 
be in the client jar though technical they belong equally to the server.
Many jar approach: split kafka.jar into kafka-common.jar, kafka-producer.jar, 
kafka-consumer.jar, kafka-admin.jar, and kafka-server.jar. The disadvantage of 
this is that the user needs two jars (common + something) which is for sure 
going to confuse people. I also think this will tend to spawn more jars over 
time.

Background threads
I am thinking of moving both serialization and compression out of the 
background send thread. I will explain a little about this idea below.

Serialization
I am not sure if we should handle serialization in the client at all.
Basically I wonder if our own API wouldn't just be a lot simpler if we took a 
byte[] key and byte[] value and let people serialize stuff themselves.
Injecting a class name for us to create the serializer is more roundabout and 
has a lot of problems if the serializer itself requires a lot of configuration 
or other objects to be instantiated.

Partitioning
The real question with serialization is whether the partitioning should happen 
on the java object or on the byte array key. The argument for doing it on the 
java object is that it is easier to do something like a range partition on the 
object. The problem with doing it on the object is that the consumer may not be 
in java and so may not be able to reproduce the partitioning. For example we 
currently use Object.hashCode which is a little sketchy. We would be better off 
doing a standardized hash function on the key bytes. If we want to give the 
partitioner access to the original java object then obviously we need to handle 
serialization behind our api.

Names
I think good names are important. I would like to rename the following classes 
in the new client:
  Message=>Record: Now that the message has both a message and a key it is more 
of a KeyedMessage. Another name for a KeyedMessage is a Record.
  MessageSet=>Records: This isn't too important but nit pickers complain that 
it is not technically a Set but rather a List or Sequence but MessageList 
sounds funny to me.

The actual clients will not interact with these classes. They will interact 
with a ProducerRecord and ConsumerRecord. The reason for having different 
fields is because the different clients Proposed producer API:
SendResponse r = producer.send(new ProducerRecord(topic, key, value))

Protocol Definition

Here is what I am thinking about protocol definition. I see a couple of 
problems with what we are doing currently. First the protocol definition is 
spread throughout a bunch of custom java objects. The error reporting in these 
object is really terrible because they don't record the field names.
Furthermore people keep adding business logic into the protocol objects which 
is pretty nasty.

I would like to move to having a single Protocol.java file that defines the 
protocol in a readable DSL. Here is what I am thinking:

  public static Schema REQUEST_HEADER =

    new Schema(new Field("api_key", INT16, "The id of the request type."),

               new Field("api_version", INT16, "The version of the API."),

                 new Field("correlation_id", INT32, "A user-supplied integer 
value that will be passed back with the response"),

                 new Field("client_id", STRING, "A user specified identifier 
for the client making the request."));

To parse one of these requests you would do
   Struct struct = REQUEST_HEADER.parse(bytebuffer);
   short apiKey = struct.get("api_key");

Internally Struct is just an Object[] with one entry per field which is 
populated from the schema. The mapping of name to array index is a hash table 
lookup. We can optimize access for performance critical areas by
allowing:
   static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do this 
once to lookup the index of the field
   ...
   Struct struct = REQUEST_HEADER.parse(bytebuffer);
   short apiKey = struct.get(apiKeyField); // now this is just an array access

One advantage of this is this level of indirection will make it really easy for 
us to handle backwards compatability in a more principled way. The protocol 
file will actually contain ALL versions of the schema and we will always use 
the appropriate version to read the request (as specified in the header).

NIO layer

The plan is to add a non-blocking multi-connection abstraction that would be 
used by both clients.

class Selector {
  /* create a new connection and associate it with the given id */
  public void connect(int id, InetSocketAddress address, intsendBufferSize, int 
receiveBufferSize)
  /* wakeup this selector if it is currently awaiting data */
  public void wakeup()
  /* user provides sends, recieves, and a timeout. this method will populate 
"completed" and "disconnects" lists. Method blocks for up to the timeout 
waiting for data to read. */
  public void poll(long timeout, List<Send> sends, List<Send> completed, 
List<Receive> receives, List<Integer> disconnects) }

The consumer and producer would then each define their own logic to manage 
their set of in-flight requests.

Producer Implementation

There are a couple of interesting changes I think we can make to the producer 
implementation.

We retain the single background "sender" thread.

But we can remove the definition of sync vs async clients. We always return a 
"future" response immediately. Both sync and async sends would go through the 
buffering that we currently do for the async layer. This would mean that even 
in sync mode while the event loop is doing network IO if many requests 
accumulate they will be sent in a single batch. This effectively acts as a kind 
of "group commit". So instead of having an "async" mode that acts differently 
in some way you just have a max.delay time that controls how long the client 
will linger waiting for more data to accumulate.
max.delay=0 is equivalent to the current sync producer.

I would also propose changing our buffering strategy. Currently we queue 
unserialized requests in a BlockingQueue. This is not ideal as it is very 
difficult to reason about the memory usage of this queue. One 5MB message may 
be bigger than 10k small messages. I propose that (1) we change our queuing 
strategy to queue per-partition and (2) we directly write the messages to the 
ByteBuffer which will eventually be sent and use that as the "queue". The batch 
size should likewise be in bytes not in number of messages.

If you think about it our current queuing strategy doesn't really make sense 
any more now that we always load balance over brokers. You set a batch size of 
N and we do a request when we have N messages in queue but this says nothing 
about the size of the requests that will be sent. You might end up sending all 
N messages to one server or you might end up sending 1 message to N different 
servers (totally defeating the purpose of batching).

There are two granularities of batching that could make sense: the broker level 
or the partition level. We do the send requests at the broker level but we do 
the disk IO at the partition level. I propose making the queues per-partition 
rather than per broker to avoid having to reshuffle the contents of queues when 
leadership changes. This could be debated, though.

If you actually look at the byte path of the producer this approach allows 
cleaning a ton of stuff up. We can do in-pace writes to the destination buffer 
that we will eventually send. This does mean moving serialization and 
compression to the user thread. But I think this is good as these may be slow 
but aren't unpredictably slow.

The per-partition queues are thus implemented with a bunch of pre-allocated 
ByteBuffers sized to max.batch.size, when the buffer is full or the delay time 
elapses that buffer is sent.

By doing this we could actually just reuse the buffers when the send is 
complete. This would be nice because since the buffers are used for allocation 
they will likely fall out of young gen.

A question to think about is how we want to bound memory usage. I think what we 
want is the max.batch.size which controls the size of the individual buffers 
and total.buffer.memory which controls the total memory used by all buffers. 
One problem with this is that there is the possibility of some fragmentation. 
I.e. image a situation with 5k partitions being produced to, each getting a low 
but steady message rate. Giving each of these a 1MB buffer would require 5GB of 
buffer space to have a buffer for each partition. I'm not sure how bad this is 
since at least the memory usage is predictable and the reality is that holding 
thousands of java objects has huge overhead compared to contiguous byte arrays.

-Jay

Reply via email to