Hi Divij Vaidya, 3. Sounds good, but `ByteBuffer#asReadOnlyBuffer()` returns a read-only `ByteBuffer` which `ByteBuffer#hasArray()` returns false, then it will make `Utils#writeTo(DataOutput, ByteBuffer, int)` perform efficiently Lower (called in `DefaultRecord#writeTo(DataOutputStream, int, long, ByteBuffer, ByteBuffer, Header[])`). By the way, `ByteBufferSerializer#serialize(String, ByteBuffer)` has called `ByteBuffer#flip()` which will modify the offset of the input buffer.
In my opinion, it is acceptable to modify the offset of the input buffer. After all, serialization means reading data and `ByteBuffer` needs to modify the position and limit before reading the data. We just need to assure the user that the input data will not be modified by the Kafka library. Divij Vaidya <divijvaidy...@gmail.com> 于2022年9月29日周四 19:07写道: > 1. You are right. We append the message to the `DefaultRecord` and append > is a copy operation. Hence, the ByteBuffer would be released at the end of > the KafkaProducer#doSend() method. This comment is resolved. > 2. I don't foresee any compatibility issues since #1 is not a problem > anymore. This comment is resolved. > > New comments: > > 3. In the ByteBufferSerializer#serializeToByteBuffer, could we take the > input ByteBuffer from the user application and return a > `data.asReadOnlyBuffer()`? As I understand, it does not involve any data > copy, hence no extra memory cost. On the upside, it would help provide the > guarantee to the user that the data (and the points such as position, cap > etc.) in the input ByteBuffer is not modified by the Kafka library. > > 4. Please change the documentation of the ByteBufferSerializer to clarify > that Kafka code will not modify the buffer (neither the data of the > provided input buffer nor the pointers). > > -- > Divij Vaidya > > > > On Wed, Sep 28, 2022 at 5:35 PM ShunKang Lin <linshunkang....@gmail.com> > wrote: > > > Hi Divij Vaidya, > > > > Thanks for your comments. > > > > 1. I checked the code of KafkaProducer#doSend() > > and RecordAccumulator#append(), if KafkaProducer#doSend() returns it > means > > serializedKey and serializedValue have been appended to > > ProducerBatch#recordsBuilder and we don't keep reference of serializedKey > > and serializedValue. > > > > 2. According to 1, the user application can reuse the ByteBuffer to send > > consecutive KafkaProducer#send() requests without breaking the user > > application. If we are concerned about compatibility, we can provide > > another Serializer, such as ZeroCopyByteBufferSerializer, and keep the > > original ByteBufferSerializer unchanged. > > > > In my opinion, kafka-clients should provide some way for users who want > to > > improve application performance, if users want to improve application > > performance, they should use lower level code and understand the > underlying > > implementation of these codes. > > > > Best, > > ShunKang > > > > Divij Vaidya <divijvaidy...@gmail.com> 于2022年9月28日周三 19:58写道: > > > > > Hello > > > > > > I believe that the current behaviour of creating a copy of the user > > > provided input is the correct approach because of the following > reasons: > > > > > > 1. In the existing implementation (considering cases when T is > ByteBuffer > > > in Serializer#serialize(String,Headers,T)) we copy the data (T) into a > > new > > > byte[]. In the new approach, we would continue to re-use the ByteBuffer > > > even after doSend() which means the `ProducerRecord` object cannot go > out > > > of scope from a GC perspective at the end of doSend(). Hence, the new > > > approach may lead to increased heap memory usage for a greater period > of > > > time. > > > > > > 2. The new approach may break certain user applications e.g. consider > an > > > user application which re-uses the ByteBuffer (maybe it's a memory > mapped > > > byte buffer) to send consecutive Producer.send() requests. Prior to > this > > > change, they could do that because we copy the data from user provided > > > input before storing it in the accumulator but after this change, they > > will > > > have to allocate a new ByteBuffer for every ProduceRecord. > > > > > > In general, I am of the opinion that any user provided data should be > > > copied to internal data structures at the interface of an opaque > library > > > (like client) so that the user doesn't have to guess about memory > > lifetime > > > of the objects they provided to the opaque library. > > > > > > What do you think? > > > > > > -- > > > Divij Vaidya > > > > > > > > > > > > On Sun, Sep 25, 2022 at 5:59 PM ShunKang Lin < > linshunkang....@gmail.com> > > > wrote: > > > > > > > Hi all, I'd like to start a new discussion thread on KIP-872 (Kafka > > > Client) > > > > which proposes that add Serializer#serializeToByteBuffer() to reduce > > > memory > > > > copying. > > > > > > > > KIP: > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828 > > > > Thanks, ShunKang > > > > > > > > > >