Jay,

This seems like a great direction.  Simplifying the consumer client would
be a big win, and +1 for more native java client integration.

On the last point, regarding memory usage for buffering per partition: I
would think it possible to devise a dynamic queuing system that allows
higher-volume partitions to have larger effective buffers than
low-volume partitions.  Thus, if you reserve a fixed
total.buffer.memory, you could allocate units of buffer space which could
then be composed to make larger buffers (perhaps not necessarily
contiguous).  The long-tail of low-volume partitions could also be moved to
some sort of auxiliary, non-collated buffer space, as they are less likely
to benefit from contiguous buffering anyway.
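
Just to sketch the idea (all names made up, and hand-waving the sizing and
concurrency details): carve total.buffer.memory into fixed-size chunks and
let a partition hold more than one chunk while it is busy, returning them to
the pool once the batch is sent.

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    /* A fixed pool of equal-sized chunks carved out of total.buffer.memory.
       A hot partition can hold several chunks (a larger, possibly
       non-contiguous buffer); a low-volume partition holds at most one. */
    public class ChunkPool {
        private final Deque<ByteBuffer> free = new ArrayDeque<ByteBuffer>();

        public ChunkPool(long totalMemory, int chunkSize) {
            for (long used = 0; used + chunkSize <= totalMemory; used += chunkSize)
                free.push(ByteBuffer.allocate(chunkSize));
        }

        /* Grab up to maxChunks chunks for one partition; returns fewer (possibly
           none) if the pool is exhausted, so busy partitions only grow while
           memory is actually available. */
        public synchronized List<ByteBuffer> acquire(int maxChunks) {
            List<ByteBuffer> chunks = new ArrayList<ByteBuffer>();
            while (chunks.size() < maxChunks && !free.isEmpty())
                chunks.add(free.pop());
            return chunks;
        }

        /* Return chunks to the pool once the batch they backed has been sent. */
        public synchronized void release(List<ByteBuffer> chunks) {
            for (ByteBuffer b : chunks) {
                b.clear();
                free.push(b);
            }
        }
    }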

Fun stuff.

Jason


On Fri, Jul 26, 2013 at 3:00 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> I sent around a wiki a few weeks back proposing a set of client
> improvements that essentially amount to a rewrite of the producer and
> consumer java clients.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>
> The below discussion assumes you have read this wiki.
>
> I started to do a little prototyping for the producer and wanted to share
> some of the ideas that came up to get early feedback.
>
> First, a few simple but perhaps controversial things to discuss.
>
> Rollout
> Phase 1: We add the new clients. No change on the server. Old clients still
> exist. The new clients will be entirely in a new package so there will be
> no possibility of name collision.
> Phase 2: We swap out all shared code on the server to use the new client
> stuff. At this point the old clients still exist but are essentially
> deprecated.
> Phase 3: We remove the old client code.
>
> Java
> I think we should do the clients in java. Making our users deal with
> scala's incompatibility issues and crazy stack traces causes people a lot
> of pain. Furthermore we end up having to wrap everything now to get a
> usable java api anyway for non-scala people. This does mean maintaining a
> substantial chunk of java code, which is maybe less fun than scala. But
> basically I think we should optimize for the end user and produce a
> standalone pure-java jar with no dependencies.
>
> Jars
> We definitely want to separate out the client jar. There is also a fair
> amount of code shared between the clients and the server (exceptions,
> protocol definition, utils, and the message set implementation). There are
> two approaches.
> Two jar approach: split kafka.jar into kafka-clients.jar and
> kafka-server.jar with the server depending on the clients. The advantage of
> this is that it is simple. The disadvantage is that things like utils and
> protocol definition will be in the client jar though technically they belong
> equally to the server.
> Many jar approach: split kafka.jar into kafka-common.jar,
> kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> kafka-server.jar. The disadvantage of this is that the user needs two jars
> (common + something) which is for sure going to confuse people. I also
> think this will tend to spawn more jars over time.
>
> Background threads
> I am thinking of moving both serialization and compression out of the
> background send thread. I will explain a little about this idea below.
>
> Serialization
> I am not sure if we should handle serialization in the client at all.
> Basically I wonder if our own API wouldn't just be a lot simpler if we took
> a byte[] key and byte[] value and let people serialize stuff themselves.
> Injecting a class name for us to create the serializer is more roundabout
> and has a lot of problems if the serializer itself requires a lot of
> configuration or other objects to be instantiated.
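>
> To make that concrete, the API would take bytes directly and the user would
> serialize up front however they like (purely illustrative; eventToJsonBytes
> is just a stand-in for whatever serializer the user owns):
>
>    byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
>    byte[] value = eventToJsonBytes(event);   // user-side serialization, outside the client
>    producer.send(new ProducerRecord(topic, key, value));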
>
> Partitioning
> The real question with serialization is whether the partitioning should
> happen on the java object or on the byte array key. The argument for doing
> it on the java object is that it is easier to do something like a range
> partition on the object. The problem with doing it on the object is that
> the consumer may not be in java and so may not be able to reproduce the
> partitioning. For example we currently use Object.hashCode which is a
> little sketchy. We would be better off doing a standardized hash function
> on the key bytes. If we want to give the partitioner access to the original
> java object then obviously we need to handle serialization behind our api.
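>
> For example (just a sketch; the particular hash is totally up for debate,
> FNV-1a here only because it is trivial to reimplement in any language):
>
>    // hash the serialized key bytes so a consumer in any language can
>    // reproduce the partitioning
>    public static int partition(byte[] keyBytes, int numPartitions) {
>        int hash = 0x811c9dc5;                       // FNV-1a offset basis
>        for (byte b : keyBytes) {
>            hash ^= (b & 0xff);
>            hash *= 0x01000193;                      // FNV-1a prime
>        }
>        return (hash & 0x7fffffff) % numPartitions;  // strip the sign bit before the mod
>    }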
>
> Names
> I think good names are important. I would like to rename the following
> classes in the new client:
>   Message=>Record: Now that the message has both a value and a key it is
> more of a KeyedMessage. Another name for a KeyedMessage is a Record.
>   MessageSet=>Records: This isn't too important, but nitpickers complain
> that it is not technically a Set but rather a List or Sequence, and
> MessageList sounds funny to me.
>
> The actual clients will not interact with these classes. They will interact
> with a ProducerRecord and ConsumerRecord. The reason for having separate
> classes is that the different clients need different fields.
> Proposed producer API:
> SendResponse r = producer.send(new ProducerRecord(topic, key, value))
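>
> Roughly, the two record classes would be shaped like this (the exact fields
> are not fleshed out; this is just to show why they differ):
>
>    class ProducerRecord {
>        String topic;
>        byte[] key;      // may be null for unkeyed records
>        byte[] value;
>    }
>
>    class ConsumerRecord {
>        String topic;
>        int partition;   // a consumed record also knows where it came from
>        long offset;
>        byte[] key;
>        byte[] value;
>    }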
>
> Protocol Definition
>
> Here is what I am thinking about protocol definition. I see a couple of
> problems with what we are doing currently. First the protocol definition is
> spread throughout a bunch of custom java objects. The error reporting in
> these objects is really terrible because they don't record the field names.
> Furthermore people keep adding business logic into the protocol objects
> which is pretty nasty.
>
> I would like to move to having a single Protocol.java file that defines the
> protocol in a readable DSL. Here is what I am thinking:
>
>   public static Schema REQUEST_HEADER =
>     new Schema(new Field("api_key", INT16, "The id of the request type."),
>                new Field("api_version", INT16, "The version of the API."),
>                new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
>                new Field("client_id", STRING, "A user specified identifier for the client making the request."));
>
> To parse one of these requests you would do
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get("api_key");
>
> Internally Struct is just an Object[] with one entry per field which is
> populated from the schema. The mapping of name to array index is a hash
> table lookup. We can optimize access for performance critical areas by
> allowing:
>    // do this once to look up the index of the field
>    static Field apiKeyField = REQUEST_HEADER.getField("api_key");
>    ...
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    // now this is just an array access
>    short apiKey = struct.get(apiKeyField);
>
> One advantage of this is that this level of indirection will make it really
> easy for us to handle backwards compatibility in a more principled way. The
> protocol file will actually contain ALL versions of the schema and we will
> always use the appropriate version to read the request (as specified in the
> header).
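>
> To make the internals concrete, roughly something like this (heavily
> abbreviated; typed getters and the primitive type definitions like INT16,
> INT32 and STRING are stubbed out):
>
>    import java.nio.ByteBuffer;
>    import java.util.HashMap;
>    import java.util.Map;
>
>    class Field {
>        final String name;
>        final Type type;
>        final String doc;
>        int index;   // position in the schema, filled in when the schema is built
>        Field(String name, Type type, String doc) {
>            this.name = name;
>            this.type = type;
>            this.doc = doc;
>        }
>    }
>
>    class Schema {
>        private final Field[] fields;
>        private final Map<String, Field> byName = new HashMap<String, Field>();
>        Schema(Field... fields) {
>            this.fields = fields;
>            for (int i = 0; i < fields.length; i++) {
>                fields[i].index = i;
>                byName.put(fields[i].name, fields[i]);
>            }
>        }
>        Field getField(String name) { return byName.get(name); }
>        Struct parse(ByteBuffer buffer) {
>            Object[] values = new Object[fields.length];
>            for (int i = 0; i < fields.length; i++)
>                values[i] = fields[i].type.read(buffer);   // each wire type decodes itself
>            return new Struct(this, values);
>        }
>    }
>
>    class Struct {
>        private final Schema schema;
>        private final Object[] values;
>        Struct(Schema schema, Object[] values) { this.schema = schema; this.values = values; }
>        Object get(String name) { return values[schema.getField(name).index]; }   // hash lookup
>        Object get(Field field) { return values[field.index]; }                   // plain array access
>    }
>
>    /* stand-in for the primitive wire types */
>    interface Type {
>        Object read(ByteBuffer buffer);
>    }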
>
> NIO layer
>
> The plan is to add a non-blocking multi-connection abstraction that would
> be used by both clients.
>
> class Selector {
>   /* create a new connection and associate it with the given id */
>   public void connect(int id, InetSocketAddress address, int sendBufferSize,
>                       int receiveBufferSize)
>   /* wakeup this selector if it is currently awaiting data */
>   public void wakeup()
>   /* user provides sends, receives, and a timeout. this method will populate
>      the "completed" and "disconnects" lists. Method blocks for up to the
>      timeout waiting for data to read. */
>   public void poll(long timeout, List<Send> sends, List<Send> completed,
>                    List<Receive> receives, List<Integer> disconnects)
> }
>
> The consumer and producer would then each define their own logic to manage
> their set of in-flight requests.
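>
> So the producer's sender loop would end up looking something like this
> (collectReadyBatches, markInFlight, completeRequest, and
> failInFlightRequests are made-up names for the client-specific
> bookkeeping):
>
>    List<Send> sends = new ArrayList<Send>();            // requests ready to go out this round
>    List<Send> completed = new ArrayList<Send>();        // filled in by poll()
>    List<Receive> receives = new ArrayList<Receive>();   // responses read off the wire
>    List<Integer> disconnects = new ArrayList<Integer>();
>
>    while (running) {
>        sends.clear(); completed.clear(); receives.clear(); disconnects.clear();
>        sends.addAll(collectReadyBatches());   // batches that are full or have hit their delay
>        selector.poll(timeout, sends, completed, receives, disconnects);
>        for (Send s : completed)
>            markInFlight(s);                   // request is on the wire, awaiting a response
>        for (Receive r : receives)
>            completeRequest(r);                // match the response to its request, fire the future
>        for (int id : disconnects)
>            failInFlightRequests(id);          // error out anything outstanding to that node
>    }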
>
> Producer Implementation
>
> There are a couple of interesting changes I think we can make to the
> producer implementation.
>
> We retain the single background "sender" thread.
>
> But we can remove the distinction between sync and async clients. We always return
> a "future" response immediately. Both sync and async sends would go through
> the buffering that we currently do for the async layer. This would mean
> that even in sync mode, while the event loop is doing network IO, if many
> requests accumulate they will be sent in a single batch. This effectively
> acts as a kind of "group commit". So instead of having an "async" mode that
> acts differently in some way you just have a max.delay time that controls
> how long the client will linger waiting for more data to accumulate.
> max.delay=0 is equivalent to the current sync producer.
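>
> In other words usage would look roughly like this (the config and method
> names are placeholders, e.g. the await() on the response is not a settled
> API):
>
>    // max.delay=0 behaves like today's sync producer; a larger value lets
>    // requests linger and batch up ("group commit")
>    props.put("max.delay", "20");
>
>    SendResponse r = producer.send(new ProducerRecord(topic, key, value));
>    r.await();   // "sync" usage: block until the broker has acked this record
>    // ...or just don't block, and the record goes out with the next batch ("async")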
>
> I would also propose changing our buffering strategy. Currently we queue
> unserialized requests in a BlockingQueue. This is not ideal as it is very
> difficult to reason about the memory usage of this queue. One 5MB message
> may be bigger than 10k small messages. I propose that (1) we change our
> queuing strategy to queue per-partition and (2) we directly write the
> messages to the ByteBuffer which will eventually be sent and use that as
> the "queue". The batch size should likewise be in bytes not in number of
> messages.
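>
> A sketch of what one of these per-partition byte "queues" could look like
> (the record framing here is made up; the point is just appending straight
> into the destination buffer and bounding the batch in bytes):
>
>    import java.nio.ByteBuffer;
>
>    class RecordBatch {
>        private final ByteBuffer buffer;   // the bytes we will eventually write to the socket
>        private final long createdMs;
>
>        RecordBatch(int maxBatchSizeBytes) {
>            this.buffer = ByteBuffer.allocate(maxBatchSizeBytes);
>            this.createdMs = System.currentTimeMillis();
>        }
>
>        /* Append one record directly into the send buffer; false means it does
>           not fit, so the sender should ship this batch and start a new one. */
>        boolean tryAppend(byte[] key, byte[] value) {
>            int needed = 4 + (key == null ? 0 : key.length)
>                       + 4 + (value == null ? 0 : value.length);
>            if (buffer.remaining() < needed)
>                return false;
>            buffer.putInt(key == null ? -1 : key.length);
>            if (key != null) buffer.put(key);
>            buffer.putInt(value == null ? -1 : value.length);
>            if (value != null) buffer.put(value);
>            return true;
>        }
>
>        /* Ready to send when the buffer is (nearly) full or older than the max delay. */
>        boolean ready(long nowMs, long maxDelayMs) {
>            return buffer.remaining() < 4 || nowMs - createdMs >= maxDelayMs;
>        }
>    }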
>
> If you think about it our current queuing strategy doesn't really make
> sense any more now that we always load balance over brokers. You set a
> batch size of N and we do a request when we have N messages in queue but
> this says nothing about the size of the requests that will be sent. You
> might end up sending all N messages to one server or you might end up
> sending 1 message to N different servers (totally defeating the purpose of
> batching).
>
> There are two granularities of batching that could make sense: the broker
> level or the partition level. We do the send requests at the broker level
> but we do the disk IO at the partition level. I propose making the queues
> per-partition rather than per broker to avoid having to reshuffle the
> contents of queues when leadership changes. This could be debated, though.
>
> If you actually look at the byte path of the producer this approach allows
> cleaning a ton of stuff up. We can do in-place writes to the destination
> buffer that we will eventually send. This does mean moving serialization
> and compression to the user thread. But I think this is good as these may
> be slow but aren't unpredictably slow.
>
> The per-partition queues are thus implemented with a bunch of pre-allocated
> ByteBuffers sized to max.batch.size; when the buffer is full or the delay
> time elapses, that buffer is sent.
>
> By doing this we could actually just reuse the buffers when the send is
> complete. This would be nice because, since the buffers are used for
> allocation, they will likely fall out of young gen.
>
> A question to think about is how we want to bound memory usage. I think
> what we want is the max.batch.size which controls the size of the
> individual buffers and total.buffer.memory which controls the total memory
> used by all buffers. One problem with this is that there is the possibility
> of some fragmentation. I.e. imagine a situation with 5k partitions being
> produced to, each getting a low but steady message rate. Giving each of
> these a 1MB buffer would require 5GB of buffer space to have a buffer for
> each partition. I'm not sure how bad this is since at least the memory
> usage is predictable and the reality is that holding thousands of java
> objects has huge overhead compared to contiguous byte arrays.
>
> -Jay
>
