Hi, I apologize for misunderstanding your initial email. Unfortunately I still don't understand your question. Could you clarify what result you expect from your code, and what the actual behavior is?
Maybe also try and simplify the reproduction case. I see confusing use of a String constructor that could be causing your problem or masking it. Thanks, Greg On Fri, Dec 27, 2024, 5:47 PM Chain Head <mrchainh...@gmail.com> wrote: > Hi, > I am looking at parsing Produce request API on broker side. This is for > simulating a broker. No consumer is involved. Also, I am using 3.8.0. > > On Sat, 28 Dec, 2024, 04:47 Greg Harris, <greg.har...@aiven.io.invalid> > wrote: > > > Hi, > > > > Thanks for your question. > > > > It appears you're using the legacy consumer API, which was removed in > 2.0.0 > > and is no longer supported. > > I would strongly suggest building on top of the modern Java Consumer API > at > > this time. > > > > The modern API exposes the deserialized headers via the > > ConsumerRecord#headers method: > > > > > https://kafka.apache.org/39/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html > > > > Hope this helps, > > Greg > > > > On Fri, Dec 27, 2024, 10:19 AM Chain Head <mrchainh...@gmail.com> wrote: > > > > > Hi, > > > I am struggling to get the key-value pair from the Produce Request > API. I > > > want to write it to a Buffer for further processing. I can't seem to > get > > > the `k` and `v` values whereas the `.keySize` and `.valueSize` are > > reported > > > correctly. Please advise how to extract the key value pairs from the > > > Produce request API payload. > > > > > > For better format, see https://pastebin.com/ZKad1ET6 > > > > > > MemoryRecords partitionRecords = (MemoryRecords) > > > partitionData.records(); > > > for (RecordBatch batch : partitionRecords.batches()) { > > > // Iterate through reords of a batch > > > Buffer batchBuffer = Buffer.buffer(); > > > Iterator<org.apache.kafka.common.record.Record> it = > > > batch.iterator(); > > > while (it.hasNext()) { > > > org.apache.kafka.common.record.Record record = it.next(); > > > > > > String k = ""; > > > String v = ""; > > > > > > for (Header header : record.headers()) { > > > v = new String(header.value()); > > > // Some logic with k and v to write to a Buffer > > > } > > > > > > if (record.hasKey()) { > > > ByteBuffer keyBuffer = record.key(); > > > ByteBuffer valueBuffer = record.value(); > > > > > > if (record.hasValue()) { > > > k = new String(keyBuffer.array(), keyBuffer.position(), > > > record.keySize()); > > > v = new String(valueBuffer.array(), > > valueBuffer.position(), > > > record.valueSize()); > > > // Some logic with k and v to write to a Buffer > > > } else { > > > k = new String(keyBuffer.array(), keyBuffer.position(), > > > record.keySize()); > > > // Some logic with k and v to write to a Buffer > > > } > > > } else { > > > // Empty buffer > > > } > > > } > > > } > > > > > >