Apologies for disappearing. This is how I got it working. In hindsight, I
should have known better and tried this first.

byte[] keyBytes = new byte[record.keySize()];
record.key().get(keyBytes);
byte[] valueBytes = new byte[record.valueSize()];
record.value().get(valueBytes);
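
A possible explanation for why the array()-based version failed, offered as
a guess rather than something verified against the Kafka source: if the key
and value buffers are slices of a larger batch buffer, then array() returns
the whole backing array while position() is relative to the slice, so the
read starts at the wrong index unless arrayOffset() is added. The sketch
below reproduces that with plain java.nio only. The class name
ByteBufferCopy, the helper toBytes, and the "key1value1" sample data are
made up for illustration and are not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferCopy {

    // Hypothetical helper (not a Kafka API): copies the remaining bytes of a
    // buffer without moving the caller's position. Returns null for a null
    // buffer, so it can be fed a missing key or value directly.
    static byte[] toBytes(ByteBuffer buf) {
        if (buf == null) {
            return null;
        }
        byte[] out = new byte[buf.remaining()];
        // duplicate() shares the content but has its own position, so the
        // relative get() below leaves buf itself untouched.
        buf.duplicate().get(out);
        return out;
    }

    public static void main(String[] args) {
        // Stand-in for a batch buffer holding a key and a value back to back.
        ByteBuffer batch =
            ByteBuffer.wrap("key1value1".getBytes(StandardCharsets.UTF_8));

        // Slice out the "value1" part (assumption: a record views part of a batch).
        batch.position(4);
        ByteBuffer value = batch.slice();

        // array() is the whole backing array and position() is relative to
        // the slice, so this reads from the start of the batch instead.
        String wrong = new String(value.array(), value.position(),
            value.remaining(), StandardCharsets.UTF_8);

        // Adding arrayOffset() makes the array-based read line up.
        String viaOffset = new String(value.array(),
            value.arrayOffset() + value.position(),
            value.remaining(), StandardCharsets.UTF_8);

        // A relative get() into a fresh array avoids the question entirely.
        String viaCopy = new String(toBytes(value), StandardCharsets.UTF_8);

        System.out.println(wrong);     // key1va
        System.out.println(viaOffset); // value1
        System.out.println(viaCopy);   // value1
    }
}
```

Two caveats on the snippet at the top of this message: get() advances the
buffer's position, so a second read of the same ByteBuffer instance would
come back empty, and keySize() is documented to return -1 when there is no
key, so sizing the array from it needs a hasKey() check first. Copying via
duplicate() and remaining(), as in toBytes above, sidesteps both.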

On Sun, Dec 29, 2024 at 7:01 AM David Finnie <david.fin...@infrasoft.com.au>
wrote:

> Hi Chain Head,
>
> Are you seeing any errors, or just getting empty strings for k and v?
>
> If you are seeing empty strings, it could well be that the ByteBuffers
> returned by Record.key() and Record.value() have their position set to
> the ByteBuffer's length, i.e. at the end of the ByteBuffer.
>
> Have you checked the value of position() coming back from those
> ByteBuffers, and also have you tried creating the string from position 0
> rather than using the position() result?
>
> (Disclaimer, I haven't tried writing any code to test the values being
> returned by the key/value position() methods.)
>
> Regards,
>
> David Finnie
> Infrasoft
>
> On 28/12/2024 2:17, Chain Head wrote:
> > Hi,
> > I am struggling to get the key-value pair from the Produce Request API. I
> > want to write it to a Buffer for further processing. I can't seem to get
> > the `k` and `v` values, whereas the `.keySize` and `.valueSize` are
> > reported correctly. Please advise how to extract the key-value pairs
> > from the Produce request API payload.
> >
> > For better format, see https://pastebin.com/ZKad1ET6
> >
> >          MemoryRecords partitionRecords = (MemoryRecords) partitionData.records();
> >          for (RecordBatch batch : partitionRecords.batches()) {
> >            // Iterate through records of a batch
> >            Buffer batchBuffer = Buffer.buffer();
> >            Iterator<org.apache.kafka.common.record.Record> it = batch.iterator();
> >            while (it.hasNext()) {
> >              org.apache.kafka.common.record.Record record = it.next();
> >
> >              String k = "";
> >              String v = "";
> >
> >              for (Header header : record.headers()) {
> >                v = new String(header.value());
> >                // Some logic with k and v to write to a Buffer
> >              }
> >
> >              if (record.hasKey()) {
> >                ByteBuffer keyBuffer = record.key();
> >                ByteBuffer valueBuffer = record.value();
> >
> >                if (record.hasValue()) {
> >                  k = new String(keyBuffer.array(), keyBuffer.position(), record.keySize());
> >                  v = new String(valueBuffer.array(), valueBuffer.position(), record.valueSize());
> >                  // Some logic with k and v to write to a Buffer
> >                } else {
> >                  k = new String(keyBuffer.array(), keyBuffer.position(), record.keySize());
> >                  // Some logic with k and v to write to a Buffer
> >                }
> >              } else {
> >                // Empty buffer
> >              }
> >            }
> >          }
> >
