Hi Chain Head,

Are you seeing any errors, or just getting empty strings for k and v?

If you are seeing empty strings, it could well be that the ByteBuffers returned by Record.key() and Record.value() have their position set to the end of the buffer (i.e. position() equals limit()), so there is nothing left to read from that point.

Have you checked the value of position() coming back from those ByteBuffers, and also have you tried creating the string from position 0 rather than using the position() result?

(Disclaimer, I haven't tried writing any code to test the values being returned by the key/value position() methods.)
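
For what it's worth, here is a minimal, Kafka-independent sketch of the kind of check I mean. It only uses java.nio, and it assumes the key/value bytes are UTF-8 text. The class and method names (ByteBufferText, toUtf8String, toUtf8StringViaArray) are made up for illustration. One thing it demonstrates: if a ByteBuffer is a slice of a larger backing array, array() returns the whole backing array, so an index into it needs arrayOffset() added to position(). Whether the buffers coming back from Record.key() and Record.value() are slices like this is an assumption on my part, but it would be worth printing position(), limit() and arrayOffset() to find out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ByteBufferText {

    // Decode the readable bytes (position..limit) as UTF-8.
    // duplicate() gives an independent position/limit, so the caller's
    // buffer is left untouched. The null guard is for an absent key/value.
    static String toUtf8String(ByteBuffer buf) {
        if (buf == null) {
            return "";
        }
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    // Array-based variant: array() is the WHOLE backing array, so the
    // start index must be arrayOffset() + position(), not position() alone.
    static String toUtf8StringViaArray(ByteBuffer buf) {
        if (buf == null || !buf.hasArray()) {
            return toUtf8String(buf); // e.g. direct or read-only buffers
        }
        return new String(buf.array(),
                          buf.arrayOffset() + buf.position(),
                          buf.remaining(),
                          StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulate a key that lives in the middle of a larger buffer.
        ByteBuffer whole = ByteBuffer.wrap("hdrkeyvalue".getBytes(StandardCharsets.UTF_8));
        whole.position(3);
        whole.limit(6);
        ByteBuffer key = whole.slice(); // position 0, arrayOffset 3, 3 bytes readable

        // Naive approach: ignores arrayOffset(), so it reads the wrong bytes.
        String naive = new String(key.array(), key.position(), key.remaining(),
                                  StandardCharsets.UTF_8);

        System.out.println("position=" + key.position()
                + " limit=" + key.limit()
                + " arrayOffset=" + key.arrayOffset());
        System.out.println("naive   = " + naive);                      // hdr
        System.out.println("decoded = " + toUtf8String(key));          // key
        System.out.println("array   = " + toUtf8StringViaArray(key));  // key
    }
}
```

The duplicate()-and-decode version is probably the safer of the two, since it never touches the backing array directly and works for buffers that have no accessible array.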

Regards,

David Finnie
Infrasoft

On 28/12/2024 2:17, Chain Head wrote:
Hi,
I am struggling to get the key-value pair from the Produce Request API. I
want to write it to a Buffer for further processing. I can't seem to get
the `k` and `v` values whereas the `.keySize` and `.valueSize` are reported
correctly. Please advise how to extract the key value pairs from the
Produce request API payload.

For better format, see https://pastebin.com/ZKad1ET6

         MemoryRecords partitionRecords = (MemoryRecords) partitionData.records();
         for (RecordBatch batch : partitionRecords.batches()) {
           // Iterate through records of a batch
           Buffer batchBuffer = Buffer.buffer();
           Iterator<org.apache.kafka.common.record.Record> it = batch.iterator();
           while (it.hasNext()) {
             org.apache.kafka.common.record.Record record = it.next();

             String k = "";
             String v = "";

             for (Header header : record.headers()) {
               v = new String(header.value());
               // Some logic with k and v to write to a Buffer
             }

             if (record.hasKey()) {
               ByteBuffer keyBuffer = record.key();
               ByteBuffer valueBuffer = record.value();

               if (record.hasValue()) {
                 k = new String(keyBuffer.array(), keyBuffer.position(), record.keySize());
                 v = new String(valueBuffer.array(), valueBuffer.position(), record.valueSize());
                 // Some logic with k and v to write to a Buffer
               } else {
                 k = new String(keyBuffer.array(), keyBuffer.position(), record.keySize());
                 // Some logic with k and v to write to a Buffer
               }
             } else {
               // Empty buffer
             }
           }
         }
