Hi, Thanks for your question.
It appears you're using the legacy consumer API, which was removed in 2.0.0 and is no longer supported. I would strongly suggest building on top of the modern Java Consumer API at this time. The modern API exposes the deserialized headers via the ConsumerRecord#headers method: https://kafka.apache.org/39/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html Hope this helps, Greg On Fri, Dec 27, 2024, 10:19 AM Chain Head <mrchainh...@gmail.com> wrote: > Hi, > I am struggling to get the key-value pair from the Produce Request API. I > want to write it to a Buffer for further processing. I can't seem to get > the `k` and `v` values whereas the `.keySize` and `.valueSize` are reported > correctly. Please advise how to extract the key value pairs from the > Produce request API payload. > > For better format, see https://pastebin.com/ZKad1ET6 > > MemoryRecords partitionRecords = (MemoryRecords) > partitionData.records(); > for (RecordBatch batch : partitionRecords.batches()) { > // Iterate through reords of a batch > Buffer batchBuffer = Buffer.buffer(); > Iterator<org.apache.kafka.common.record.Record> it = > batch.iterator(); > while (it.hasNext()) { > org.apache.kafka.common.record.Record record = it.next(); > > String k = ""; > String v = ""; > > for (Header header : record.headers()) { > v = new String(header.value()); > // Some logic with k and v to write to a Buffer > } > > if (record.hasKey()) { > ByteBuffer keyBuffer = record.key(); > ByteBuffer valueBuffer = record.value(); > > if (record.hasValue()) { > k = new String(keyBuffer.array(), keyBuffer.position(), > record.keySize()); > v = new String(valueBuffer.array(), valueBuffer.position(), > record.valueSize()); > // Some logic with k and v to write to a Buffer > } else { > k = new String(keyBuffer.array(), keyBuffer.position(), > record.keySize()); > // Some logic with k and v to write to a Buffer > } > } else { > // Empty buffer > } > } > } >