Hi, I am looking at parsing Produce request API on broker side. This is for simulating a broker. No consumer is involved. Also, I am using 3.8.0.
On Sat, 28 Dec, 2024, 04:47 Greg Harris, <greg.har...@aiven.io.invalid> wrote: > Hi, > > Thanks for your question. > > It appears you're using the legacy consumer API, which was removed in 2.0.0 > and is no longer supported. > I would strongly suggest building on top of the modern Java Consumer API at > this time. > > The modern API exposes the deserialized headers via the > ConsumerRecord#headers method: > > https://kafka.apache.org/39/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html > > Hope this helps, > Greg > > On Fri, Dec 27, 2024, 10:19 AM Chain Head <mrchainh...@gmail.com> wrote: > > > Hi, > > I am struggling to get the key-value pair from the Produce Request API. I > > want to write it to a Buffer for further processing. I can't seem to get > > the `k` and `v` values whereas the `.keySize` and `.valueSize` are > reported > > correctly. Please advise how to extract the key value pairs from the > > Produce request API payload. > > > > For better format, see https://pastebin.com/ZKad1ET6 > > > > MemoryRecords partitionRecords = (MemoryRecords) > > partitionData.records(); > > for (RecordBatch batch : partitionRecords.batches()) { > > // Iterate through reords of a batch > > Buffer batchBuffer = Buffer.buffer(); > > Iterator<org.apache.kafka.common.record.Record> it = > > batch.iterator(); > > while (it.hasNext()) { > > org.apache.kafka.common.record.Record record = it.next(); > > > > String k = ""; > > String v = ""; > > > > for (Header header : record.headers()) { > > v = new String(header.value()); > > // Some logic with k and v to write to a Buffer > > } > > > > if (record.hasKey()) { > > ByteBuffer keyBuffer = record.key(); > > ByteBuffer valueBuffer = record.value(); > > > > if (record.hasValue()) { > > k = new String(keyBuffer.array(), keyBuffer.position(), > > record.keySize()); > > v = new String(valueBuffer.array(), > valueBuffer.position(), > > record.valueSize()); > > // Some logic with k and v to write to a Buffer > > } else { > > k = new String(keyBuffer.array(), keyBuffer.position(), > > record.keySize()); > > // Some logic with k and v to write to a Buffer > > } > > } else { > > // Empty buffer > > } > > } > > } > > >