+1 to making the API use bytes and push serialization into the client. This is effectively what I am doing currently anyway. I implemented a generic Encoder<ByteString> which just passes the bytes through.
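For reference, such a pass-through encoder is only a few lines. Below is a rough sketch against the 0.8-era kafka.serializer.Encoder interface, using byte[] rather than ByteString; the VerifiableProperties constructor argument is, as far as I know, what the old producer's reflective instantiation passes in:

    import kafka.serializer.Encoder;
    import kafka.utils.VerifiableProperties;

    // Pass-through encoder: the application serializes to byte[] itself and this
    // class hands the bytes to the producer unchanged.
    public class PassThroughEncoder implements Encoder<byte[]> {

        // No configuration is needed; the constructor only exists because the old
        // producer instantiates encoder classes reflectively with a VerifiableProperties.
        public PassThroughEncoder(VerifiableProperties props) {
        }

        @Override
        public byte[] toBytes(byte[] bytes) {
            return bytes;
        }
    }

Pointing serializer.class (and key.serializer.class) at a class like this keeps all real serialization in application code, which is what the proposed byte[] API would make the default behavior.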
I also like the idea of the client being written in pure Java. Interacting with Scala code from Java isn't nearly as nice as the other way around. Just my 2 cents.

-Xavier

On Fri, Jul 26, 2013 at 2:46 PM, Jason Rosenberg <j...@squareup.com> wrote:

> Jay,
>
> This seems like a great direction. Simplifying the consumer client would be a big win, and +1 for more native Java client integration.
>
> On the last point, regarding memory usage for buffering per partition: I would think it could be possible to devise a dynamic queuing system that allows higher-volume partitions to have larger effective buffers than smaller, low-volume partitions. Thus, if you reserve a fixed total.buffer.memory, you could allocate units of buffer space which could then be composed to make larger buffers (perhaps not necessarily contiguous). The long tail of low-volume partitions could also be moved to some sort of auxiliary, non-collated buffer space, as they are less likely to benefit from contiguous buffering anyway.
>
> Fun stuff.
>
> Jason
>
> On Fri, Jul 26, 2013 at 3:00 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > I sent around a wiki a few weeks back proposing a set of client improvements that essentially amount to a rewrite of the producer and consumer Java clients.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> >
> > The discussion below assumes you have read this wiki.
> >
> > I started to do a little prototyping for the producer and wanted to share some of the ideas that came up to get early feedback.
> >
> > First, a few simple but perhaps controversial things to discuss.
> >
> > Rollout
> > Phase 1: We add the new clients. No change on the server. The old clients still exist. The new clients will be entirely in a new package so there will be no possibility of name collision.
> > Phase 2: We swap out all shared code on the server to use the new client code. At this point the old clients still exist but are essentially deprecated.
> > Phase 3: We remove the old client code.
> >
> > Java
> > I think we should do the clients in Java. Making our users deal with Scala's incompatibility issues and crazy stack traces causes people a lot of pain. Furthermore, we end up having to wrap everything now to get a usable Java API for non-Scala people anyway. This does mean maintaining a substantial chunk of Java code, which is maybe less fun than Scala. But basically I think we should optimize for the end user and produce a standalone pure-Java jar with no dependencies.
> >
> > Jars
> > We definitely want to separate out the client jar. There is also a fair amount of code shared between the two (exceptions, protocol definition, utils, and the message set implementation). There are two approaches.
> > Two-jar approach: split kafka.jar into kafka-clients.jar and kafka-server.jar, with the server depending on the clients. The advantage of this is that it is simple. The disadvantage is that things like utils and the protocol definition will be in the client jar even though technically they belong equally to the server.
> > Many-jar approach: split kafka.jar into kafka-common.jar, kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and kafka-server.jar. The disadvantage of this is that the user needs two jars (common + something), which is sure to confuse people. I also think this will tend to spawn more jars over time.
> > Background threads
> > I am thinking of moving both serialization and compression out of the background send thread. I will explain a little about this idea below.
> >
> > Serialization
> > I am not sure if we should handle serialization in the client at all. Basically I wonder if our own API wouldn't just be a lot simpler if we took a byte[] key and byte[] value and let people serialize stuff themselves. Injecting a class name for us to create the serializer is more roundabout and has a lot of problems if the serializer itself requires a lot of configuration or other objects to be instantiated.
> >
> > Partitioning
> > The real question with serialization is whether the partitioning should happen on the Java object or on the byte array key. The argument for doing it on the Java object is that it is easier to do something like a range partition on the object. The problem with doing it on the object is that the consumer may not be in Java and so may not be able to reproduce the partitioning. For example, we currently use Object.hashCode, which is a little sketchy. We would be better off doing a standardized hash function on the key bytes. If we want to give the partitioner access to the original Java object then obviously we need to handle serialization behind our API.
> >
> > Names
> > I think good names are important. I would like to rename the following classes in the new client:
> > Message => Record: Now that the message has both a key and a value it is more of a KeyedMessage. Another name for a KeyedMessage is a Record.
> > MessageSet => Records: This isn't too important, but nitpickers complain that it is not technically a Set but rather a List or Sequence; MessageList sounds funny to me, though.
> >
> > The actual clients will not interact with these classes. They will interact with a ProducerRecord and a ConsumerRecord. The reason for having different classes is that the different clients need different fields.
> >
> > Proposed producer API:
> >
> > SendResponse r = producer.send(new ProducerRecord(topic, key, value))
> >
> > Protocol Definition
> >
> > Here is what I am thinking about protocol definition. I see a couple of problems with what we are doing currently. First, the protocol definition is spread throughout a bunch of custom Java objects. The error reporting in these objects is really terrible because they don't record the field names. Furthermore, people keep adding business logic into the protocol objects, which is pretty nasty.
> >
> > I would like to move to having a single Protocol.java file that defines the protocol in a readable DSL. Here is what I am thinking:
> >
> > public static Schema REQUEST_HEADER =
> >     new Schema(new Field("api_key", INT16, "The id of the request type."),
> >                new Field("api_version", INT16, "The version of the API."),
> >                new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
> >                new Field("client_id", STRING, "A user-specified identifier for the client making the request."));
> >
> > To parse one of these requests you would do:
> >
> > Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > short apiKey = struct.get("api_key");
> >
> > Internally, Struct is just an Object[] with one entry per field which is populated from the schema. The mapping of name to array index is a hash table lookup.
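To make the quoted definitions a bit more concrete, here is a rough sketch of how Schema, Field, and Struct could hang together. The class and method names follow the snippets above, but the Type interface and the parse loop are assumptions for illustration only, not what the wiki or the prototype actually specify:

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only: Type and the read logic are assumed, not taken from the proposal.
    interface Type {
        Object read(ByteBuffer buffer);                  // e.g. INT16 would read a short, STRING a length-prefixed string
    }

    class Field {
        final String name;
        final Type type;
        final String doc;
        int index;                                       // assigned by the owning Schema

        Field(String name, Type type, String doc) {
            this.name = name;
            this.type = type;
            this.doc = doc;
        }
    }

    class Schema {
        private final Field[] fields;
        private final Map<String, Field> fieldsByName = new HashMap<>();

        Schema(Field... fields) {
            this.fields = fields;
            for (int i = 0; i < fields.length; i++) {
                fields[i].index = i;
                fieldsByName.put(fields[i].name, fields[i]);   // the name -> index mapping is the hash lookup
            }
        }

        Field getField(String name) {
            return fieldsByName.get(name);
        }

        Struct parse(ByteBuffer buffer) {
            Object[] values = new Object[fields.length];
            for (int i = 0; i < fields.length; i++)
                values[i] = fields[i].type.read(buffer);       // fields are read in declaration order
            return new Struct(this, values);
        }
    }

    class Struct {
        private final Schema schema;
        private final Object[] values;                         // one slot per field

        Struct(Schema schema, Object[] values) {
            this.schema = schema;
            this.values = values;
        }

        @SuppressWarnings("unchecked")
        public <T> T get(String name) {                        // hash lookup, then array access
            return (T) values[schema.getField(name).index];
        }

        @SuppressWarnings("unchecked")
        public <T> T get(Field field) {                        // pre-resolved field: just an array access
            return (T) values[field.index];
        }
    }

The Field-based get overload is the array-only fast path that the quoted text describes next.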
> > We can optimize access for performance-critical areas by allowing:
> >
> > static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do this once to look up the index of the field
> > ...
> > Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > short apiKey = struct.get(apiKeyField); // now this is just an array access
> >
> > One advantage of this level of indirection is that it will make it really easy for us to handle backwards compatibility in a more principled way. The protocol file will actually contain ALL versions of the schema and we will always use the appropriate version to read the request (as specified in the header).
> >
> > NIO layer
> >
> > The plan is to add a non-blocking multi-connection abstraction that would be used by both clients.
> >
> > class Selector {
> >     /* create a new connection and associate it with the given id */
> >     public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
> >     /* wake up this selector if it is currently awaiting data */
> >     public void wakeup()
> >     /* user provides sends, receives, and a timeout. this method will populate the "completed" and "disconnects" lists. Method blocks for up to the timeout waiting for data to read. */
> >     public void poll(long timeout, List<Send> sends, List<Send> completed, List<Receive> receives, List<Integer> disconnects)
> > }
> >
> > The consumer and producer would then each define their own logic to manage their set of in-flight requests.
> >
> > Producer Implementation
> >
> > There are a couple of interesting changes I think we can make to the producer implementation.
> >
> > We retain the single background "sender" thread.
> >
> > But we can remove the distinction between sync and async clients. We always return a "future" response immediately. Both sync and async sends would go through the buffering that we currently do for the async layer. This would mean that even in sync mode, if many requests accumulate while the event loop is doing network IO, they will be sent in a single batch. This effectively acts as a kind of "group commit". So instead of having an "async" mode that acts differently in some way, you just have a max.delay time that controls how long the client will linger waiting for more data to accumulate. max.delay=0 is equivalent to the current sync producer.
> >
> > I would also propose changing our buffering strategy. Currently we queue unserialized requests in a BlockingQueue. This is not ideal, as it is very difficult to reason about the memory usage of this queue: one 5MB message may be bigger than 10k small messages. I propose that (1) we change our queuing strategy to queue per-partition and (2) we directly write the messages to the ByteBuffer which will eventually be sent and use that as the "queue". The batch size should likewise be in bytes, not in number of messages.
> >
> > If you think about it, our current queuing strategy doesn't really make sense any more now that we always load balance over brokers. You set a batch size of N and we do a request when we have N messages in the queue, but this says nothing about the size of the requests that will be sent. You might end up sending all N messages to one server or you might end up sending 1 message to N different servers (totally defeating the purpose of batching).
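As a concrete reading of the per-partition ByteBuffer queue proposed above, here is a minimal accumulator sketch. RecordAccumulator, TopicPartition, and the record framing are all hypothetical and simplified (no compression, no futures, no handling of records larger than the batch size), and the linger behavior is only indicated in comments:

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical per-partition accumulator: serialized records are written directly into a
    // pre-allocated ByteBuffer per partition, and a buffer becomes sendable when it fills up
    // (or, in a real implementation, when it has lingered past max.delay).
    class RecordAccumulator {

        static final class TopicPartition {
            final String topic;
            final int partition;

            TopicPartition(String topic, int partition) {
                this.topic = topic;
                this.partition = partition;
            }

            @Override
            public boolean equals(Object o) {
                if (!(o instanceof TopicPartition))
                    return false;
                TopicPartition tp = (TopicPartition) o;
                return partition == tp.partition && topic.equals(tp.topic);
            }

            @Override
            public int hashCode() {
                return 31 * topic.hashCode() + partition;
            }
        }

        private final int maxBatchSize;
        private final Map<TopicPartition, ByteBuffer> batches = new HashMap<>();

        RecordAccumulator(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
        }

        /**
         * Append an already-serialized record. Returns the previous buffer for this partition
         * if it is now full and ready to send, otherwise null. A real implementation would also
         * track batch age so the sender thread can drain buffers older than max.delay.
         */
        synchronized ByteBuffer append(TopicPartition tp, byte[] key, byte[] value) {
            int recordSize = 8 + (key == null ? 0 : key.length) + (value == null ? 0 : value.length);
            ByteBuffer buffer = batches.get(tp);
            ByteBuffer readyToSend = null;
            if (buffer == null || buffer.remaining() < recordSize) {
                if (buffer != null) {
                    buffer.flip();                             // previous buffer is full; hand it to the sender
                    readyToSend = buffer;
                }
                buffer = ByteBuffer.allocate(maxBatchSize);
                batches.put(tp, buffer);
            }
            buffer.putInt(key == null ? -1 : key.length);      // length-prefixed key and value; framing simplified
            if (key != null) buffer.put(key);
            buffer.putInt(value == null ? -1 : value.length);
            if (value != null) buffer.put(value);
            return readyToSend;
        }
    }

The ByteBuffer.allocate call is where a pooled or bounded allocation scheme, as discussed further down, would plug in.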
> > There are two granularities of batching that could make sense: the broker level or the partition level. We do the send requests at the broker level but we do the disk IO at the partition level. I propose making the queues per-partition rather than per-broker to avoid having to reshuffle the contents of queues when leadership changes. This could be debated, though.
> >
> > If you actually look at the byte path of the producer, this approach allows cleaning a ton of stuff up. We can do in-place writes to the destination buffer that we will eventually send. This does mean moving serialization and compression to the user thread. But I think this is good, as these may be slow but aren't unpredictably slow.
> >
> > The per-partition queues are thus implemented with a bunch of pre-allocated ByteBuffers sized to max.batch.size; when a buffer is full or the delay time elapses, that buffer is sent.
> >
> > By doing this we could actually just reuse the buffers when the send is complete. This would be nice because, since the buffers are reused, they will likely fall out of young gen.
> >
> > A question to think about is how we want to bound memory usage. I think what we want is a max.batch.size, which controls the size of the individual buffers, and a total.buffer.memory, which controls the total memory used by all buffers. One problem with this is the possibility of some fragmentation. I.e., imagine a situation with 5k partitions being produced to, each getting a low but steady message rate. Giving each of these a 1MB buffer would require 5GB of buffer space to have a buffer for each partition. I'm not sure how bad this is, since at least the memory usage is predictable, and the reality is that holding thousands of Java objects has huge overhead compared to contiguous byte arrays.
> >
> > -Jay
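On the memory-bounding question in the last paragraph, and Jason's suggestion upthread of composing fixed units of buffer space, one possible shape is a pool that carves total.buffer.memory into max.batch.size chunks and blocks producers when it runs dry. A minimal sketch, with the BufferPool name and API purely hypothetical:

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Hypothetical bounded pool: total memory is fixed up front and handed out in
    // batch-sized chunks, so per-partition buffers can never exceed the overall budget.
    class BufferPool {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        BufferPool(long totalMemory, int batchSize) {
            for (long allocated = 0; allocated + batchSize <= totalMemory; allocated += batchSize)
                free.add(ByteBuffer.allocate(batchSize));   // pre-allocate so buffers are reused rather than churned through the GC
        }

        /** Borrow a buffer, blocking until one is released if the pool is exhausted. */
        synchronized ByteBuffer allocate() throws InterruptedException {
            while (free.isEmpty())
                wait();                                     // backpressure instead of unbounded memory growth
            return free.poll();
        }

        /** Return a buffer once its batch has been sent. */
        synchronized void release(ByteBuffer buffer) {
            buffer.clear();
            free.add(buffer);
            notify();                                       // wake one waiting producer thread
        }
    }

Whether the long tail of low-volume partitions should then share smaller composed units or spill into an auxiliary region, as Jason suggests, could be layered on top of a pool like this without changing the accounting.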