Bump this thread to see if there are any comments/thoughts.

Best,
ShunKang
On Fri, Sep 30, 2022 at 11:58 PM ShunKang Lin <linshunkang....@gmail.com> wrote:

> Hi Divij Vaidya,
>
> 3. Sounds good, but `ByteBuffer#asReadOnlyBuffer()` returns a read-only
> `ByteBuffer` for which `ByteBuffer#hasArray()` returns false, so it will make
> `Utils#writeTo(DataOutput, ByteBuffer, int)` perform less efficiently
> (it is called in `DefaultRecord#writeTo(DataOutputStream, int, long,
> ByteBuffer, ByteBuffer, Header[])`). By the way,
> `ByteBufferSerializer#serialize(String, ByteBuffer)` already calls
> `ByteBuffer#flip()`, which modifies the position of the input buffer.
>
> In my opinion, it is acceptable to modify the position of the input buffer.
> After all, serialization means reading data, and a `ByteBuffer` needs to
> have its position and limit set before the data can be read. We just need to
> assure the user that the input data will not be modified by the Kafka
> library.
>
> On Thu, Sep 29, 2022 at 7:07 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
>
>> 1. You are right. We append the message to the `DefaultRecord`, and append
>> is a copy operation. Hence, the ByteBuffer would be released at the end of
>> the KafkaProducer#doSend() method. This comment is resolved.
>>
>> 2. I don't foresee any compatibility issues since #1 is not a problem
>> anymore. This comment is resolved.
>>
>> New comments:
>>
>> 3. In ByteBufferSerializer#serializeToByteBuffer, could we take the
>> input ByteBuffer from the user application and return
>> `data.asReadOnlyBuffer()`? As I understand it, this does not involve any
>> data copy, hence no extra memory cost. On the upside, it would help provide
>> the guarantee to the user that the data (and the pointers such as position,
>> capacity, etc.) in the input ByteBuffer are not modified by the Kafka
>> library.
>>
>> 4. Please change the documentation of the ByteBufferSerializer to clarify
>> that Kafka code will not modify the buffer (neither the data of the
>> provided input buffer nor the pointers).
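[Editor's note: the two `ByteBuffer` behaviors debated above can be demonstrated directly. This is a minimal standalone sketch, not Kafka code; it only shows that `asReadOnlyBuffer()` yields a view whose `hasArray()` is false (so array-based fast paths cannot be used), and that `flip()` mutates the buffer's own position and limit.]

```java
import java.nio.ByteBuffer;

public class ReadOnlyBufferDemo {
    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.allocate(16);
        original.put(new byte[]{1, 2, 3, 4});

        // A read-only view shares the underlying data but hides the backing
        // array: hasArray() returns false, so code guarded by hasArray()
        // must fall back to a slower per-byte path.
        ByteBuffer readOnly = original.asReadOnlyBuffer();
        System.out.println(original.hasArray()); // true
        System.out.println(readOnly.hasArray()); // false

        // flip() mutates the buffer's own pointers: limit = position,
        // position = 0. This is the side effect on the caller's input
        // buffer being discussed in the thread.
        original.flip();
        System.out.println(original.position()); // 0
        System.out.println(original.limit());    // 4
    }
}
```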
>>
>> --
>> Divij Vaidya
>>
>> On Wed, Sep 28, 2022 at 5:35 PM ShunKang Lin <linshunkang....@gmail.com>
>> wrote:
>>
>> > Hi Divij Vaidya,
>> >
>> > Thanks for your comments.
>> >
>> > 1. I checked the code of KafkaProducer#doSend()
>> > and RecordAccumulator#append(). If KafkaProducer#doSend() returns, it
>> > means serializedKey and serializedValue have been appended to
>> > ProducerBatch#recordsBuilder, and we do not keep references to
>> > serializedKey and serializedValue.
>> >
>> > 2. Following from 1, the user application can reuse the ByteBuffer to
>> > send consecutive KafkaProducer#send() requests without breaking the user
>> > application. If we are concerned about compatibility, we can provide
>> > another Serializer, such as ZeroCopyByteBufferSerializer, and keep the
>> > original ByteBufferSerializer unchanged.
>> >
>> > In my opinion, kafka-clients should provide some way for users who want
>> > to improve application performance; such users should be prepared to use
>> > lower-level code and understand its underlying implementation.
>> >
>> > Best,
>> > ShunKang
>> >
>> > On Wed, Sep 28, 2022 at 7:58 PM Divij Vaidya <divijvaidy...@gmail.com>
>> > wrote:
>> >
>> > > Hello
>> > >
>> > > I believe that the current behaviour of creating a copy of the
>> > > user-provided input is the correct approach, for the following reasons:
>> > >
>> > > 1. In the existing implementation (considering cases when T is
>> > > ByteBuffer in Serializer#serialize(String, Headers, T)) we copy the
>> > > data (T) into a new byte[]. In the new approach, we would continue to
>> > > re-use the ByteBuffer even after doSend(), which means the
>> > > `ProducerRecord` object cannot go out of scope, from a GC perspective,
>> > > at the end of doSend(). Hence, the new approach may lead to increased
>> > > heap memory usage for a longer period of time.
>> > >
>> > > 2. The new approach may break certain user applications. For example,
>> > > consider a user application that re-uses the ByteBuffer (maybe it's a
>> > > memory-mapped byte buffer) to send consecutive Producer.send()
>> > > requests. Prior to this change, they could do that because we copy the
>> > > data from the user-provided input before storing it in the
>> > > accumulator, but after this change, they will have to allocate a new
>> > > ByteBuffer for every ProducerRecord.
>> > >
>> > > In general, I am of the opinion that any user-provided data should be
>> > > copied to internal data structures at the interface of an opaque
>> > > library (like the client), so that the user doesn't have to guess
>> > > about the memory lifetime of the objects they provided to the opaque
>> > > library.
>> > >
>> > > What do you think?
>> > >
>> > > --
>> > > Divij Vaidya
>> > >
>> > > On Sun, Sep 25, 2022 at 5:59 PM ShunKang Lin
>> > > <linshunkang....@gmail.com> wrote:
>> > >
>> > > > Hi all, I'd like to start a new discussion thread on KIP-872 (Kafka
>> > > > Client), which proposes adding Serializer#serializeToByteBuffer()
>> > > > to reduce memory copying.
>> > > >
>> > > > KIP:
>> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828
>> > > >
>> > > > Thanks,
>> > > > ShunKang
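[Editor's note: the copy-vs-zero-copy trade-off in point 2 above can be sketched as follows. This is a hypothetical illustration, not the actual Kafka serializer: `serializeWithCopy` and `serializeZeroCopy` are made-up names, and unlike the real `ByteBufferSerializer#serialize`, the copy variant here flips a `duplicate()` so the caller's pointers stay untouched. The point is that after a copy the caller may freely reuse the buffer, while a zero-copy view keeps aliasing the caller's memory.]

```java
import java.nio.ByteBuffer;

public class CopyVsZeroCopyDemo {

    // Copy-at-interface: the library takes its own byte[] snapshot, so the
    // caller can reuse the input buffer immediately afterwards.
    static byte[] serializeWithCopy(ByteBuffer data) {
        ByteBuffer view = data.duplicate(); // independent pointers, shared data
        view.flip();                        // limit = position, position = 0
        byte[] out = new byte[view.remaining()];
        view.get(out);                      // copy the written bytes out
        return out;
    }

    // Zero-copy: no data copy, but the library now holds a view into the
    // caller's memory; the caller must not reuse the buffer until the send
    // has fully completed.
    static ByteBuffer serializeZeroCopy(ByteBuffer data) {
        ByteBuffer view = data.asReadOnlyBuffer();
        view.flip();
        return view;
    }

    public static void main(String[] args) {
        ByteBuffer reused = ByteBuffer.allocate(8);
        reused.put(new byte[]{10, 20, 30});

        byte[] copy = serializeWithCopy(reused);
        ByteBuffer view = serializeZeroCopy(reused);

        // The caller overwrites the buffer for the "next send" ...
        reused.clear();
        reused.put(new byte[]{99, 99, 99});

        // ... the copy is unaffected, but the zero-copy view aliases the
        // caller's memory and now observes the new bytes.
        System.out.println(copy[0]);     // 10
        System.out.println(view.get(0)); // 99
    }
}
```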