Hi,

I apologize for misunderstanding your initial email. Unfortunately I still
don't understand your question. Could you clarify what result you expect
from your code, and what the actual behavior is?

It may also help to simplify the reproduction case. I see a confusing use
of a String constructor that could be causing your problem or masking it.
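
For example, here is a rough, untested sketch of how I would read the key
and value bytes without going through the ByteBuffer's backing array (the
helper name readBytes is just a placeholder of mine, and it assumes the
keys and values are UTF-8 encoded text):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // readBytes is an illustrative helper, not a Kafka API. It copies the
    // bytes out of the buffer rather than calling array() and position(),
    // which only works for array-backed buffers and ignores arrayOffset().
    static String readBytes(ByteBuffer buffer) {
      if (buffer == null) {
        return null;
      }
      byte[] bytes = new byte[buffer.remaining()];
      buffer.duplicate().get(bytes); // duplicate() leaves the record's buffer position untouched
      return new String(bytes, StandardCharsets.UTF_8);
    }

You could then do something like `String k = readBytes(record.key());` and
the same for `record.value()`.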

Thanks,
Greg


On Fri, Dec 27, 2024, 5:47 PM Chain Head <mrchainh...@gmail.com> wrote:

> Hi,
> I am looking at parsing the Produce request API on the broker side. This
> is for simulating a broker. No consumer is involved. Also, I am using
> 3.8.0.
>
> On Sat, 28 Dec, 2024, 04:47 Greg Harris, <greg.har...@aiven.io.invalid>
> wrote:
>
> > Hi,
> >
> > Thanks for your question.
> >
> > It appears you're using the legacy consumer API, which was removed in
> > 2.0.0 and is no longer supported.
> > I would strongly suggest building on top of the modern Java Consumer
> > API at this time.
> >
> > The modern API exposes the deserialized headers via the
> > ConsumerRecord#headers method:
> >
> > https://kafka.apache.org/39/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
> >
> > Hope this helps,
> > Greg
> >
> > On Fri, Dec 27, 2024, 10:19 AM Chain Head <mrchainh...@gmail.com> wrote:
> >
> > > Hi,
> > > I am struggling to get the key-value pairs from the Produce Request
> > > API. I want to write them to a Buffer for further processing. I can't
> > > seem to get the `k` and `v` values whereas the `.keySize` and
> > > `.valueSize` are reported correctly. Please advise how to extract the
> > > key-value pairs from the Produce request API payload.
> > >
> > > For better format, see https://pastebin.com/ZKad1ET6
> > >
> > >         MemoryRecords partitionRecords = (MemoryRecords) partitionData.records();
> > >         for (RecordBatch batch : partitionRecords.batches()) {
> > >           // Iterate through records of a batch
> > >           Buffer batchBuffer = Buffer.buffer();
> > >           Iterator<org.apache.kafka.common.record.Record> it = batch.iterator();
> > >           while (it.hasNext()) {
> > >             org.apache.kafka.common.record.Record record = it.next();
> > >
> > >             String k = "";
> > >             String v = "";
> > >
> > >             for (Header header : record.headers()) {
> > >               v = new String(header.value());
> > >               // Some logic with k and v to write to a Buffer
> > >             }
> > >
> > >             if (record.hasKey()) {
> > >               ByteBuffer keyBuffer = record.key();
> > >               ByteBuffer valueBuffer = record.value();
> > >
> > >               if (record.hasValue()) {
> > >                 k = new String(keyBuffer.array(), keyBuffer.position(), record.keySize());
> > >                 v = new String(valueBuffer.array(), valueBuffer.position(), record.valueSize());
> > >                 // Some logic with k and v to write to a Buffer
> > >               } else {
> > >                 k = new String(keyBuffer.array(), keyBuffer.position(), record.keySize());
> > >                 // Some logic with k and v to write to a Buffer
> > >               }
> > >             } else {
> > >               // Empty buffer
> > >             }
> > >           }
> > >         }
> > >
> >
>
