+1 to making the API use bytes and push serialization into the client. This
is effectively what I am doing currently anyway. I implemented a generic
Encoder<ByteString> which just passes the bytes through.
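
For illustration, a pass-through encoder can be as small as the sketch below. The Encoder interface here is a hypothetical stand-in declared locally so the example compiles on its own (the real client interface may differ), and byte[] is used in place of ByteString because the thread does not say which ByteString type is meant.

```java
// Hypothetical stand-in for the client's encoder interface (an assumption,
// declared here only so the sketch is self-contained).
interface Encoder<T> {
    byte[] toBytes(T value);
}

// Pass-through encoder: the application has already serialized the payload,
// so the encoder hands the same bytes straight to the client.
final class PassThroughEncoder implements Encoder<byte[]> {
    @Override
    public byte[] toBytes(byte[] value) {
        return value; // no copy, no transformation
    }
}
```

With an API that takes byte[] directly, this class would simply go away.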

I also like the idea of the client being written in pure Java. Interacting
with Scala code from Java isn't nearly as nice as the other way around.

Just my 2 cents.

-Xavier



On Fri, Jul 26, 2013 at 2:46 PM, Jason Rosenberg <j...@squareup.com> wrote:

> Jay,
>
> This seems like a great direction.  Simplifying the consumer client would
> be a big win, and +1 for more native java client integration.
>
> On the last point, regarding memory usage for buffering per partition: I
> would think it could be possible to devise a dynamic queuing system, to
> allow higher volume partitions to have larger effective buffers than
> smaller, low-volume partitions.  Thus, if you reserve a fixed
> total.buffer.memory, you could allocate units of buffer space which could
> then be composed to make larger buffers (perhaps not necessarily
> contiguous).  The long-tail of low-volume partitions could also be moved to
> some sort of auxiliary, non-collated buffer space, as they are less likely
> to benefit from contiguous buffering anyway.
>
> Fun stuff.
>
> Jason
>
>
> On Fri, Jul 26, 2013 at 3:00 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > I sent around a wiki a few weeks back proposing a set of client
> > improvements that essentially amount to a rewrite of the producer and
> > consumer java clients.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> >
> > The below discussion assumes you have read this wiki.
> >
> > I started to do a little prototyping for the producer and wanted to share
> > some of the ideas that came up to get early feedback.
> >
> > First, a few simple but perhaps controversial things to discuss.
> >
> > Rollout
> > Phase 1: We add the new clients. No change on the server. Old clients still
> > exist. The new clients will be entirely in a new package so there will be
> > no possibility of name collision.
> > Phase 2: We swap out all shared code on the server to use the new client
> > stuff. At this point the old clients still exist but are essentially
> > deprecated.
> > Phase 3: We remove the old client code.
> >
> > Java
> > I think we should do the clients in java. Making our users deal with
> > scala's incompatibility issues and crazy stack traces causes people a lot
> > of pain. Furthermore we end up having to wrap everything now to get a
> > usable java api anyway for non-scala people. This does mean maintaining a
> > substantial chunk of java code, which is maybe less fun than scala. But
> > basically I think we should optimize for the end user and produce a
> > standalone pure-java jar with no dependencies.
> >
> > Jars
> > We definitely want to separate out the client jar. There is also a fair
> > amount of code shared between the clients and the server (exceptions,
> > protocol definition, utils, and the message set implementation). Two
> > approaches.
> > Two jar approach: split kafka.jar into kafka-clients.jar and
> > kafka-server.jar with the server depending on the clients. The advantage of
> > this is that it is simple. The disadvantage is that things like utils and
> > protocol definition will be in the client jar though technically they
> > belong equally to the server.
> > Many jar approach: split kafka.jar into kafka-common.jar,
> > kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> > kafka-server.jar. The disadvantage of this is that the user needs two jars
> > (common + something) which is for sure going to confuse people. I also
> > think this will tend to spawn more jars over time.
> >
> > Background threads
> > I am thinking of moving both serialization and compression out of the
> > background send thread. I will explain a little about this idea below.
> >
> > Serialization
> > I am not sure if we should handle serialization in the client at all.
> > Basically I wonder if our own API wouldn't just be a lot simpler if we took
> > a byte[] key and byte[] value and let people serialize stuff themselves.
> > Injecting a class name for us to create the serializer is more roundabout
> > and has a lot of problems if the serializer itself requires a lot of
> > configuration or other objects to be instantiated.
> >
> > Partitioning
> > The real question with serialization is whether the partitioning should
> > happen on the java object or on the byte array key. The argument for doing
> > it on the java object is that it is easier to do something like a range
> > partition on the object. The problem with doing it on the object is that
> > the consumer may not be in java and so may not be able to reproduce the
> > partitioning. For example we currently use Object.hashCode which is a
> > little sketchy. We would be better off doing a standardized hash function
> > on the key bytes. If we want to give the partitioner access to the original
> > java object then obviously we need to handle serialization behind our api.
> >
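
To make the "standardized hash on the key bytes" idea concrete, here is a minimal sketch. CRC32 is only one possible choice (an assumption on my part, picked because it is in the JDK and has implementations in most languages); the class and method names are hypothetical.

```java
import java.util.zip.CRC32;

// Hypothetical helper: picks a partition from the serialized key alone, so a
// non-Java client can reproduce the result by computing the same CRC32.
final class KeyBytesPartitioner {
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        CRC32 crc = new CRC32();
        crc.update(keyBytes);
        // getValue() is an unsigned 32-bit value held in a long, so the
        // remainder is never negative.
        return (int) (crc.getValue() % numPartitions);
    }
}
```

Unlike Object.hashCode, the result depends only on the bytes on the wire, not on how a particular class or runtime implements hashing.
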
> > Names
> > I think good names are important. I would like to rename the following
> > classes in the new client:
> >   Message=>Record: Now that the message has both a message and a key it is
> > more of a KeyedMessage. Another name for a KeyedMessage is a Record.
> >   MessageSet=>Records: This isn't too important but nitpickers complain
> > that it is not technically a Set but rather a List or Sequence, and
> > MessageList sounds funny to me.
> >
> > The actual clients will not interact with these classes. They will interact
> > with a ProducerRecord and ConsumerRecord. The reason for having different
> > fields is because the different clients
> > Proposed producer API:
> > SendResponse r = producer.send(new ProducerRecord(topic, key, value))
> >
> > Protocol Definition
> >
> > Here is what I am thinking about protocol definition. I see a couple of
> > problems with what we are doing currently. First the protocol definition is
> > spread throughout a bunch of custom java objects. The error reporting in
> > these objects is really terrible because they don't record the field names.
> > Furthermore people keep adding business logic into the protocol objects
> > which is pretty nasty.
> >
> > I would like to move to having a single Protocol.java file that defines the
> > protocol in a readable DSL. Here is what I am thinking:
> >
> >   public static Schema REQUEST_HEADER =
> >     new Schema(new Field("api_key", INT16, "The id of the request type."),
> >                new Field("api_version", INT16, "The version of the API."),
> >                new Field("correlation_id", INT32,
> >                          "A user-supplied integer value that will be passed " +
> >                          "back with the response"),
> >                new Field("client_id", STRING,
> >                          "A user specified identifier for the client making " +
> >                          "the request."));
> >
> > To parse one of these requests you would do
> >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> >    short apiKey = struct.get("api_key");
> >
> > Internally Struct is just an Object[] with one entry per field which is
> > populated from the schema. The mapping of name to array index is a hash
> > table lookup. We can optimize access for performance critical areas by
> > allowing:
> >    // do this once to look up the index of the field
> >    static Field apiKeyField = REQUEST_HEADER.getField("api_key");
> >    ...
> >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> >    short apiKey = struct.get(apiKeyField); // now this is just an array access
> >
> > One advantage of this is that this level of indirection will make it really
> > easy for us to handle backwards compatibility in a more principled way. The
> > protocol file will actually contain ALL versions of the schema and we will
> > always use the appropriate version to read the request (as specified in the
> > header).
> >
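
A rough sketch of how the Schema/Field/Struct pieces could fit together is below. This is not the actual implementation; the names follow the proposal, but the wire encoding (big-endian integers, strings as an int16 length plus UTF-8 bytes) and the error wrapping are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Wire types. Assumption: STRING is an int16 length followed by UTF-8 bytes.
enum Type {
    INT16 { Object read(ByteBuffer b) { return b.getShort(); } },
    INT32 { Object read(ByteBuffer b) { return b.getInt(); } },
    STRING {
        Object read(ByteBuffer b) {
            byte[] bytes = new byte[b.getShort()];
            b.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };
    abstract Object read(ByteBuffer buffer);
}

final class Field {
    final String name;
    final Type type;
    final String doc;
    int index = -1; // slot in the Struct's array; assigned by the owning Schema

    Field(String name, Type type, String doc) {
        this.name = name;
        this.type = type;
        this.doc = doc;
    }
}

final class Schema {
    private final Field[] fields;
    private final Map<String, Field> byName = new HashMap<>();

    // Simplification: a Field instance belongs to exactly one Schema.
    Schema(Field... fields) {
        this.fields = fields;
        for (int i = 0; i < fields.length; i++) {
            fields[i].index = i;
            byName.put(fields[i].name, fields[i]);
        }
    }

    Field getField(String name) {
        Field field = byName.get(name);
        if (field == null) throw new IllegalArgumentException("Unknown field: " + name);
        return field;
    }

    Struct parse(ByteBuffer buffer) {
        Object[] values = new Object[fields.length];
        for (Field field : fields) {
            try {
                values[field.index] = field.type.read(buffer);
            } catch (RuntimeException e) {
                // The schema knows field names, so errors can say what failed.
                throw new IllegalStateException("Error reading field '" + field.name + "'", e);
            }
        }
        return new Struct(this, values);
    }
}

final class Struct {
    private final Schema schema;
    private final Object[] values;

    Struct(Schema schema, Object[] values) {
        this.schema = schema;
        this.values = values;
    }

    // Hash lookup by name, then array access.
    Object get(String name) { return values[schema.getField(name).index]; }

    // Array access only, for hot paths that resolved the Field once up front.
    Object get(Field field) { return values[field.index]; }
}
```

In this sketch get returns Object, so callers cast, e.g. (Short) struct.get("api_key"); a typed accessor per wire type would avoid the cast.
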
> > NIO layer
> >
> > The plan is to add a non-blocking multi-connection abstraction that would
> > be used by both clients.
> >
> > class Selector {
> >   /* create a new connection and associate it with the given id */
> >   public void connect(int id, InetSocketAddress address,
> >                       int sendBufferSize, int receiveBufferSize)
> >   /* wakeup this selector if it is currently awaiting data */
> >   public void wakeup()
> >   /* user provides sends, receives, and a timeout. this method will
> >      populate "completed" and "disconnects" lists. Method blocks for up to
> >      the timeout waiting for data to read. */
> >   public void poll(long timeout, List<Send> sends, List<Send> completed,
> >                    List<Receive> receives, List<Integer> disconnects)
> > }
> >
> > The consumer and producer would then each define their own logic to manage
> > their set of in-flight requests.
> >
> > Producer Implementation
> >
> > There are a couple of interesting changes I think we can make to the
> > producer implementation.
> >
> > We retain the single background "sender" thread.
> >
> > But we can remove the definition of sync vs async clients. We always return
> > a "future" response immediately. Both sync and async sends would go through
> > the buffering that we currently do for the async layer. This would mean
> > that even in sync mode while the event loop is doing network IO if many
> > requests accumulate they will be sent in a single batch. This effectively
> > acts as a kind of "group commit". So instead of having an "async" mode that
> > acts differently in some way you just have a max.delay time that controls
> > how long the client will linger waiting for more data to accumulate.
> > max.delay=0 is equivalent to the current sync producer.
> >
> > I would also propose changing our buffering strategy. Currently we queue
> > unserialized requests in a BlockingQueue. This is not ideal as it is very
> > difficult to reason about the memory usage of this queue. One 5MB message
> > may be bigger than 10k small messages. I propose that (1) we change our
> > queuing strategy to queue per-partition and (2) we directly write the
> > messages to the ByteBuffer which will eventually be sent and use that as
> > the "queue". The batch size should likewise be in bytes not in number of
> > messages.
> >
> > If you think about it our current queuing strategy doesn't really make
> > sense any more now that we always load balance over brokers. You set a
> > batch size of N and we do a request when we have N messages in queue but
> > this says nothing about the size of the requests that will be sent. You
> > might end up sending all N messages to one server or you might end up
> > sending 1 message to N different servers (totally defeating the purpose of
> > batching).
> >
> > There are two granularities of batching that could make sense: the broker
> > level or the partition level. We do the send requests at the broker level
> > but we do the disk IO at the partition level. I propose making the queues
> > per-partition rather than per broker to avoid having to reshuffle the
> > contents of queues when leadership changes. This could be debated, though.
> >
> > If you actually look at the byte path of the producer this approach allows
> > cleaning a ton of stuff up. We can do in-place writes to the destination
> > buffer that we will eventually send. This does mean moving serialization
> > and compression to the user thread. But I think this is good as these may
> > be slow but aren't unpredictably slow.
> >
> > The per-partition queues are thus implemented with a bunch of pre-allocated
> > ByteBuffers sized to max.batch.size. When the buffer is full or the delay
> > time elapses, that buffer is sent.
> >
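
A bare-bones sketch of the "ByteBuffer as the queue" idea follows. It leaves out the delay timer, buffer reuse and thread safety, and the 4-byte length prefix is just a placeholder framing I made up for the example, not the real message format. The class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical accumulator: one buffer of maxBatchSize bytes per partition.
final class PartitionBuffers {
    private static final int LENGTH_PREFIX = 4; // assumed framing: int32 length
    private final int maxBatchSize;
    private final Map<Integer, ByteBuffer> open = new HashMap<>();

    PartitionBuffers(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Writes an already-serialized record into the partition's buffer.
     * Returns the previous buffer, flipped and ready to send, if the record
     * did not fit; otherwise returns null.
     */
    ByteBuffer append(int partition, byte[] record) {
        int needed = LENGTH_PREFIX + record.length;
        if (needed > maxBatchSize) {
            throw new IllegalArgumentException("record larger than max batch size");
        }
        ByteBuffer ready = null;
        ByteBuffer buffer = open.get(partition);
        if (buffer != null && buffer.remaining() < needed) {
            buffer.flip(); // switch the full buffer from writing to reading
            ready = buffer;
            buffer = null;
        }
        if (buffer == null) {
            // A real client would take this from a pool of reusable buffers.
            buffer = ByteBuffer.allocate(maxBatchSize);
            open.put(partition, buffer);
        }
        buffer.putInt(record.length).put(record);
        return ready;
    }
}
```

Because the batch limit is expressed in bytes, memory use per partition is bounded by maxBatchSize regardless of how large or small individual messages are.
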
> > By doing this we could actually just reuse the buffers when the send is
> > complete. This would be nice because since the buffers are used for
> > allocation they will likely fall out of young gen.
> >
> > A question to think about is how we want to bound memory usage. I think
> > what we want is the max.batch.size which controls the size of the
> > individual buffers and total.buffer.memory which controls the total memory
> > used by all buffers. One problem with this is that there is the possibility
> > of some fragmentation. I.e. imagine a situation with 5k partitions being
> > produced to, each getting a low but steady message rate. Giving each of
> > these a 1MB buffer would require 5GB of buffer space to have a buffer for
> > each partition. I'm not sure how bad this is since at least the memory
> > usage is predictable and the reality is that holding thousands of java
> > objects has huge overhead compared to contiguous byte arrays.
> >
> > -Jay
> >
>
