I believe there are some open source C++ producer implementations. At LinkedIn we have a C++ implementation, which we would like to open source if there is interest. We would like to eventually include a C++ consumer as well.
-Jay

On Mon, Jul 29, 2013 at 6:03 AM, Sybrandy, Casey <casey.sybra...@six3systems.com> wrote:

> In the past there was some discussion about having a C client for non-JVM languages. Is this still planned as well? Being able to work with Kafka from other languages would be a great thing. Where I work, we interact with Kafka via Java and Ruby (producer), so having an official C library that can be used from within Ruby would make it easier to have the same version of the client in Java and Ruby.
>
> -----Original Message-----
> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> Sent: Friday, July 26, 2013 3:00 PM
> To: d...@kafka.apache.org; users@kafka.apache.org
> Subject: Client improvement discussion
>
> I sent around a wiki a few weeks back proposing a set of client improvements that essentially amount to a rewrite of the producer and consumer Java clients.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>
> The discussion below assumes you have read this wiki.
>
> I started to do a little prototyping for the producer and wanted to share some of the ideas that came up to get early feedback.
>
> First, a few simple but perhaps controversial things to discuss.
>
> Rollout
> Phase 1: We add the new clients. No change on the server. Old clients still exist. The new clients will be entirely in a new package so there will be no possibility of name collision.
> Phase 2: We swap out all shared code on the server to use the new client stuff. At this point the old clients still exist but are essentially deprecated.
> Phase 3: We remove the old client code.
>
> Java
> I think we should do the clients in Java. Making our users deal with Scala's incompatibility issues and crazy stack traces causes people a lot of pain. Furthermore, we end up having to wrap everything now to get a usable Java API anyway for non-Scala people. This does mean maintaining a substantial chunk of Java code, which is maybe less fun than Scala. But basically I think we should optimize for the end user and produce a standalone pure-Java jar with no dependencies.
>
> Jars
> We definitely want to separate out the client jar. There is also a fair amount of code shared between both (exceptions, protocol definition, utils, and the message set implementation). Two approaches:
> Two-jar approach: split kafka.jar into kafka-clients.jar and kafka-server.jar, with the server depending on the clients. The advantage of this is that it is simple. The disadvantage is that things like utils and protocol definition will be in the client jar even though technically they belong equally to the server.
> Many-jar approach: split kafka.jar into kafka-common.jar, kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and kafka-server.jar. The disadvantage of this is that the user needs two jars (common + something), which is for sure going to confuse people. I also think this will tend to spawn more jars over time.
>
> Background threads
> I am thinking of moving both serialization and compression out of the background send thread. I will explain a little about this idea below.
>
> Serialization
> I am not sure if we should handle serialization in the client at all. Basically I wonder if our own API wouldn't just be a lot simpler if we took a byte[] key and byte[] value and let people serialize stuff themselves. Injecting a class name for us to create the serializer is more roundabout and has a lot of problems if the serializer itself requires a lot of configuration or other objects to be instantiated.
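As a rough sketch of what a byte[]-only API could look like (all names below are illustrative, not taken from the wiki), serialization would live entirely in application code:

    import java.nio.charset.StandardCharsets;

    public class ByteApiSketch {

        // Hypothetical client-facing interface: the client only ever sees raw bytes,
        // so any serialization format (or non-JVM language) can sit on top of it.
        interface Producer {
            void send(String topic, byte[] key, byte[] value);
        }

        // The application does its own serialization, here just UTF-8 strings;
        // it could equally be Avro, protobuf, etc.
        static void example(Producer producer) {
            byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
            byte[] value = "page-view".getBytes(StandardCharsets.UTF_8);
            producer.send("events", key, value);
        }
    }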
> Partitioning
> The real question with serialization is whether the partitioning should happen on the Java object or on the byte array key. The argument for doing it on the Java object is that it is easier to do something like a range partition on the object. The problem with doing it on the object is that the consumer may not be in Java and so may not be able to reproduce the partitioning. For example, we currently use Object.hashCode, which is a little sketchy. We would be better off doing a standardized hash function on the key bytes. If we want to give the partitioner access to the original Java object then obviously we need to handle serialization behind our API.
>
> Names
> I think good names are important. I would like to rename the following classes in the new client:
> Message=>Record: Now that the message has both a key and a value it is more of a KeyedMessage. Another name for a KeyedMessage is a Record.
> MessageSet=>Records: This isn't too important, but nitpickers complain that it is not technically a Set but rather a List or Sequence, and MessageList sounds funny to me.
>
> The actual clients will not interact with these classes. They will interact with a ProducerRecord and ConsumerRecord. The reason for having different record classes is that the different clients need different fields.
>
> Proposed producer API:
> SendResponse r = producer.send(new ProducerRecord(topic, key, value))
>
> Protocol Definition
>
> Here is what I am thinking about protocol definition. I see a couple of problems with what we are doing currently. First, the protocol definition is spread throughout a bunch of custom Java objects. The error reporting in these objects is really terrible because they don't record the field names. Furthermore, people keep adding business logic into the protocol objects, which is pretty nasty.
>
> I would like to move to having a single Protocol.java file that defines the protocol in a readable DSL. Here is what I am thinking:
>
> public static Schema REQUEST_HEADER =
>     new Schema(new Field("api_key", INT16, "The id of the request type."),
>                new Field("api_version", INT16, "The version of the API."),
>                new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
>                new Field("client_id", STRING, "A user specified identifier for the client making the request."));
>
> To parse one of these requests you would do:
>
> Struct struct = REQUEST_HEADER.parse(bytebuffer);
> short apiKey = struct.get("api_key");
>
> Internally Struct is just an Object[] with one entry per field which is populated from the schema. The mapping of name to array index is a hash table lookup. We can optimize access for performance critical areas by allowing:
>
> static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do this once to look up the index of the field
> ...
> Struct struct = REQUEST_HEADER.parse(bytebuffer);
> short apiKey = struct.get(apiKeyField); // now this is just an array access
>
> One advantage of this is that this level of indirection will make it really easy for us to handle backwards compatibility in a more principled way. The protocol file will actually contain ALL versions of the schema and we will always use the appropriate version to read the request (as specified in the header).
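To make the versioning idea concrete, here is a minimal sketch of how request parsing could select the schema version named in the header. It assumes the Schema/Struct/Field classes sketched in the quoted email; the two-dimensional lookup and the class name are illustrations, not part of the proposal:

    import java.nio.ByteBuffer;

    public class VersionedParsingSketch {

        // One schema per (api_key, api_version) pair; every historical version of a
        // request stays in the protocol definition so older clients can still be read.
        private final Schema[][] requestSchemas;

        public VersionedParsingSketch(Schema[][] requestSchemas) {
            this.requestSchemas = requestSchemas;
        }

        public Struct parseRequestBody(ByteBuffer buffer) {
            // The header format itself is fixed, so it can always be parsed first.
            Struct header = Protocol.REQUEST_HEADER.parse(buffer);
            short apiKey = header.get("api_key");
            short apiVersion = header.get("api_version");
            // Read the body with exactly the version the client said it is speaking.
            return requestSchemas[apiKey][apiVersion].parse(buffer);
        }
    }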
>
> NIO layer
>
> The plan is to add a non-blocking multi-connection abstraction that would be used by both clients.
>
> class Selector {
>     /* create a new connection and associate it with the given id */
>     public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
>     /* wake up this selector if it is currently awaiting data */
>     public void wakeup()
>     /* The user provides sends, receives, and a timeout. This method will populate the "completed" and "disconnects" lists. It blocks for up to the timeout waiting for data to read. */
>     public void poll(long timeout, List<Send> sends, List<Send> completed, List<Receive> receives, List<Integer> disconnects)
> }
>
> The consumer and producer would then each define their own logic to manage their set of in-flight requests.
>
> Producer Implementation
>
> There are a couple of interesting changes I think we can make to the producer implementation.
>
> We retain the single background "sender" thread.
>
> But we can remove the distinction between sync and async clients. We always return a "future" response immediately. Both sync and async sends would go through the buffering that we currently do for the async layer. This would mean that even in sync mode, while the event loop is doing network IO, if many requests accumulate they will be sent in a single batch. This effectively acts as a kind of "group commit". So instead of having an "async" mode that acts differently in some way, you just have a max.delay time that controls how long the client will linger waiting for more data to accumulate. max.delay=0 is equivalent to the current sync producer.
>
> I would also propose changing our buffering strategy. Currently we queue unserialized requests in a BlockingQueue. This is not ideal as it is very difficult to reason about the memory usage of this queue. One 5MB message may be bigger than 10k small messages. I propose that (1) we change our queuing strategy to queue per-partition and (2) we directly write the messages to the ByteBuffer which will eventually be sent and use that as the "queue". The batch size should likewise be in bytes, not in number of messages.
>
> If you think about it, our current queuing strategy doesn't really make sense any more now that we always load balance over brokers. You set a batch size of N and we do a request when we have N messages in queue, but this says nothing about the size of the requests that will be sent. You might end up sending all N messages to one server or you might end up sending 1 message to N different servers (totally defeating the purpose of batching).
>
> There are two granularities of batching that could make sense: the broker level or the partition level. We do the send requests at the broker level but we do the disk IO at the partition level. I propose making the queues per-partition rather than per-broker to avoid having to reshuffle the contents of queues when leadership changes. This could be debated, though.
>
> If you actually look at the byte path of the producer, this approach allows cleaning a ton of stuff up. We can do in-place writes to the destination buffer that we will eventually send. This does mean moving serialization and compression to the user thread. But I think this is good as these may be slow but aren't unpredictably slow.
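Going back to the point above about collapsing sync and async into one API, the caller-side difference might look roughly like this (hypothetical names; the email only specifies that send returns a future-like response and that max.delay controls lingering):

    import java.util.concurrent.Future;

    public class SendModesSketch {

        // Placeholder types standing in for the classes named in the email.
        interface SendResponse {}
        static class ProducerRecord {
            final String topic;
            final byte[] key;
            final byte[] value;
            ProducerRecord(String topic, byte[] key, byte[] value) {
                this.topic = topic;
                this.key = key;
                this.value = value;
            }
        }

        // Hypothetical producer whose send() always returns immediately with a future;
        // a max.delay of 0 would make it behave like today's sync producer.
        interface Producer {
            Future<SendResponse> send(ProducerRecord record);
        }

        static void example(Producer producer, ProducerRecord record) throws Exception {
            // "Async" usage: enqueue and move on; the background sender thread batches
            // whatever has accumulated within max.delay into one request ("group commit").
            Future<SendResponse> pending = producer.send(record);

            // "Sync" usage: exactly the same call, just block on the future.
            SendResponse response = producer.send(record).get();
        }
    }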
>
> The per-partition queues are thus implemented with a bunch of pre-allocated ByteBuffers sized to max.batch.size; when a buffer is full or the delay time elapses, that buffer is sent.
>
> By doing this we could actually just reuse the buffers when the send is complete. This would be nice because, since the buffers are reused for allocation, they will likely fall out of the young gen.
>
> A question to think about is how we want to bound memory usage. I think what we want is max.batch.size, which controls the size of the individual buffers, and total.buffer.memory, which controls the total memory used by all buffers. One problem with this is that there is the possibility of some fragmentation. I.e., imagine a situation with 5k partitions being produced to, each getting a low but steady message rate. Giving each of these a 1MB buffer would require 5GB of buffer space to have a buffer for each partition. I'm not sure how bad this is since at least the memory usage is predictable, and the reality is that holding thousands of Java objects has huge overhead compared to contiguous byte arrays.
>
> -Jay
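To make the buffer reuse and memory bounding described above concrete, here is a very rough sketch of the kind of bounded, recycling buffer pool it implies. The class name and structure are purely illustrative; only the two settings (max.batch.size and total.buffer.memory) come from the proposal:

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Hands out fixed-size ByteBuffers for per-partition batches, never exceeding a
    // total memory bound, and recycles buffers after a send completes so the same
    // memory is reused rather than reallocated (and so tends to leave the young gen).
    public class BufferPoolSketch {
        private final int batchSize;     // max.batch.size: size of each buffer
        private final long totalMemory;  // total.buffer.memory: bound on all buffers
        private long allocated = 0;
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        public BufferPoolSketch(int batchSize, long totalMemory) {
            this.batchSize = batchSize;
            this.totalMemory = totalMemory;
        }

        // Blocks until a buffer is available, which is what keeps total usage bounded.
        public synchronized ByteBuffer allocate() throws InterruptedException {
            while (free.isEmpty() && allocated + batchSize > totalMemory) {
                wait(); // an in-flight batch must finish and release its buffer first
            }
            if (!free.isEmpty()) {
                return free.pollFirst();
            }
            allocated += batchSize;
            return ByteBuffer.allocate(batchSize);
        }

        // Called once a batch has been sent; the buffer goes straight back into the pool.
        public synchronized void release(ByteBuffer buffer) {
            buffer.clear();
            free.addFirst(buffer);
            notifyAll();
        }
    }

In the 5k-partition scenario described in the email, a pool like this would indeed need the 5GB worst case unless total.buffer.memory were set lower, in which case allocate() simply blocks until an earlier batch completes.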