Hi,

Can somebody provide me with an example of how to formulate an
OffsetCommitRequest for a single stream/partition using SimpleConsumer from
java?

Both ends, really ... periodically issuing commits, but also how to get the
current offset when starting up.


I can show what I'm attempting ... but failing to connect the objects and
constructors:


TopicAndPartition key = new TopicAndPartition(topic, shardNum);
OffsetMetadataAndError value = new OffsetMetadataAndError(offset); /* ??? */

Map<TopicAndPartition, OffsetMetadataAndError> map =
Collections.singletonMap(key, value);

OffsetCommitRequest request = new OffsetCommitRequest(
        groupId,
        map,
        kafka.api.OffsetCommitRequest.CurrentVersion(),
        0, /* what do I do with this - correlation id? */
        clientName);

Reply via email to