Thanks!

On Fri, Oct 24, 2014 at 9:03 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> I think 0.8.2 already uses the new producer as the standard client.
>
> Guozhang
>
> On Fri, Oct 24, 2014 at 8:51 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>
> > Thanks, I'll take a look at both. Just to be sure, we are talking about
> > client version 0.8.2, right?
> >
> > On Fri, Oct 24, 2014 at 8:39 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Rajiv,
> > >
> > > The new producer does maintain a buffer per partition, but you need to
> > > consider synchronizing access to that buffer since it can take data
> > > from multiple caller threads. I think Jay's suggestion 1) does the same
> > > thing for your purpose if you already have a data buffer holding your
> > > data: creating a ProducerRecord would not copy the data into a
> > > temporary buffer, but instead record the offset/length within the byte
> > > buffer that the user specifies; then, when producer.send() is called,
> > > the underlying buffer is copied into the producer buffer. If you do not
> > > have the data buffer but just want to write directly into the producer
> > > buffer, that is one step further.
> > >
> > > As for the consumer, you can take a look at the MemoryRecords iterator
> > > implementation, which I think already implements what you want.
> > >
> > > Guozhang
> > >
> > > On Thu, Oct 23, 2014 at 6:13 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > >
> > > > I want to avoid allocations since I am using Java in a C style. Even
> > > > though creating objects is a mere thread-local pointer bump in Java,
> > > > freeing them is not so cheap and causes uncontrollable jitter. The
> > > > second motivation is to avoid copying of data. Since I have objects
> > > > which really look like C structs that can be sent over the wire, it's
> > > > most efficient for me to write them out into the exact buffer that
> > > > will be sent over the wire.
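The zero-copy handoff Guozhang describes for Jay's suggestion (1) can be sketched roughly as follows. This is a hypothetical illustration, not the actual 0.8.2 `ProducerRecord` API: the `SlicedRecord` class and its methods are made up to show the idea of a record that carries an (offset, length) view of a caller-owned array, so the only copy happens when the producer appends the slice into its per-partition accumulator.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch (not the real 0.8.2 API): a record that references
// an (offset, length) slice of a caller-owned byte array instead of
// requiring a standalone byte[] copy of the key/value.
final class SlicedRecord {
    private final byte[] src;
    private final int offset;
    private final int length;

    SlicedRecord(byte[] src, int offset, int length) {
        this.src = src;
        this.offset = offset;
        this.length = length;
    }

    // The single copy: straight from the user's buffer into the
    // accumulator that will eventually go over the wire.
    void appendTo(ByteBuffer accumulator) {
        accumulator.put(src, offset, length);
    }
}

public class SlicedRecordDemo {
    public static void main(String[] args) {
        byte[] wire = new byte[64];
        wire[10] = 42; // pretend a flyweight message lives at [10, 14)
        ByteBuffer accumulator = ByteBuffer.allocate(1024);
        new SlicedRecord(wire, 10, 4).appendTo(accumulator);
        System.out.println(accumulator.position()); // 4
        System.out.println(accumulator.get(0));     // 42
    }
}
```

Note that a small `SlicedRecord` object is still allocated per send; what is avoided is copying the payload into a temporary array before `send()`.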
> > > > As for the bad API, I completely agree - it is a very C-style API
> > > > and definitely not usable in a productive way by most developers. My
> > > > point was that this work is done by the protocol-handling layer in
> > > > any case; maybe it can be extended to give a user access to its
> > > > internals in a safe way, both during writing and reading. The present
> > > > API could then be written as a layer over this "ugly" non-allocating
> > > > API.
> > > >
> > > > Re (1) and (2): instead of giving out keys and values as byte
> > > > arrays, which implies copies, I'd ideally like to scribble them
> > > > straight into the buffer that you are accumulating data onto before
> > > > sending it. I am guessing you already have either a single buffer per
> > > > partition or a single buffer per broker. All of this probably implies
> > > > a single-threaded producer where I can be in charge of the event
> > > > loop.
> > > >
> > > > Right now my data lives in ByteBuffer/Unsafe-buffer-based data
> > > > structures. It could be put on the wire without any serialization
> > > > step if I were using Java NIO, and similarly consumed on the other
> > > > side without any deserialization step. But with the current Kafka API
> > > > I have to:
> > > > i) Copy data from my ByteBuffers into new byte arrays.
> > > > ii) Wrap the byte arrays from (i) in a new object. I can't even
> > > > reuse this object since I don't know when Kafka's send/serialization
> > > > thread is really done with it.
> > > > iii) Write an encoder that just takes the byte array from this
> > > > wrapper object and hands it to Kafka.
> > > >
> > > > Similarly, on the consumer Kafka will:
> > > > i) Make copies of slices (representing user values) of the
> > > > ByteBuffer that was transferred from a broker into byte arrays.
> > > > ii) Allocate an object (using the decoder) that wraps these byte
> > > > arrays and hand it to me.
> > > >
> > > > My imaginary (admittedly non-Java-esque, manual-allocation-style)
> > > > API would give me a pointer into the ByteBuffer that Kafka is
> > > > accumulating protocol messages on, for either writing (on the
> > > > producer) or reading (on the consumer). I know it's a long shot, but
> > > > I still wanted to get the team's thoughts on it. I'd be happy to
> > > > contribute if we can come to an agreement on the API design. My
> > > > hypothesis is that if the internal protocol parsing and buffer
> > > > creation logic is written like this, it wouldn't be too tough to
> > > > expose its innards and have the current encoding/decoding APIs just
> > > > use this low-level API.
> > > >
> > > > Thanks for listening to my rant.
> > > >
> > > > On Thu, Oct 23, 2014 at 5:19 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > It sounds like you are primarily interested in optimizing the
> > > > > producer?
> > > > >
> > > > > There is no way to produce data without any allocation being done,
> > > > > and I think getting to that would be pretty hard and lead to bad
> > > > > APIs, but avoiding memory allocation entirely shouldn't be
> > > > > necessary. Small transient objects in Java are pretty cheap to
> > > > > allocate and deallocate. The new Kafka producer API that is on
> > > > > trunk and will be in 0.8.2 is much more disciplined in its usage of
> > > > > memory, though there is still some allocation. The goal is to avoid
> > > > > copying the *data* multiple times, even if we do end up creating
> > > > > some small helper objects along the way (the idea is that the data
> > > > > may be rather large).
> > > > >
> > > > > If you wanted to further optimize the new producer there are two
> > > > > things that could be done that would help:
> > > > > 1. Avoid the copy when creating the ProducerRecord instance. This
> > > > > could be done by accepting a length/offset along with the key and
> > > > > value and making use of this when writing to the records instance.
> > > > > As it is, your key and value need to be complete byte arrays.
> > > > > 2. Avoid the copy during request serialization. This is a little
> > > > > trickier. During request serialization we need to take the records
> > > > > for each partition and create a request that contains all of them.
> > > > > It is possible to do this with no further recopying of data, but
> > > > > it is somewhat tricky.
> > > > >
> > > > > My recommendation would be to try the new producer API and see how
> > > > > that goes. If you need to optimize further we would definitely
> > > > > take patches for (1) and (2).
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Thu, Oct 23, 2014 at 4:03 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > > > >
> > > > > > I have a flyweight-style protocol that I use for my messages, so
> > > > > > they require no serialization/deserialization to be processed.
> > > > > > The messages are just offset/length pairs within a ByteBuffer.
> > > > > >
> > > > > > Is there a producer and consumer API that forgoes allocation? I
> > > > > > just want to give the Kafka producer offsets from a ByteBuffer.
> > > > > > Similarly, it would be ideal if I could get a ByteBuffer and
> > > > > > offsets into it from the consumer. Even if I could get byte
> > > > > > arrays (implying a copy but no decoding phase) on the consumer,
> > > > > > that would be great. Right now it seems to me that the only way
> > > > > > to get messages from Kafka is through a message object, which
> > > > > > implies Kafka allocates these messages all the time. I am
> > > > > > willing to use the upcoming 0.9 API too.
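The flyweight style mentioned in the thread can be sketched as below. The field layout and offsets are invented for illustration; the point is that every field is read with an absolute get at a fixed offset into a shared ByteBuffer, so "decoding" a message copies nothing and creates no message object.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Minimal flyweight sketch: the "message" is just a base offset into a
// ByteBuffer, and fields are fixed offsets from that base. Layout here
// (int32 type, int64 value) is made up for illustration.
public class FlyweightDemo {
    static final int TYPE_OFFSET = 0;  // int32 message type
    static final int VALUE_OFFSET = 4; // int64 payload

    // Absolute read: no deserialization step, no allocation.
    static long readValue(ByteBuffer buf, int msgStart) {
        return buf.getLong(msgStart + VALUE_OFFSET);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(TYPE_OFFSET, 7);            // "encode" by writing in place
        buf.putLong(VALUE_OFFSET, 123456789L);
        System.out.println(buf.getInt(TYPE_OFFSET)); // 7
        System.out.println(readValue(buf, 0));       // 123456789
    }
}
```

With this layout, a buffer of such structs could in principle be handed to a socket (or a producer that accepted offset/length pairs) with no per-message serialization work at all.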
> > > > > > Thanks.
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang