Hi, can somebody provide me with an example of how to formulate an OffsetCommitRequest for a single stream/partition using SimpleConsumer from Java?
I need both ends, really: periodically issuing commits, and also fetching the current offset when starting up. I can show what I'm attempting, but I'm failing to connect the objects and constructors:

    TopicAndPartition key = new TopicAndPartition(topic, shardNum);
    OffsetMetadataAndError value = new OffsetMetadataAndError(offset); /* ??? */
    Map<TopicAndPartition, OffsetMetadataAndError> map = Collections.singletonMap(key, value);
    OffsetCommitRequest request = new OffsetCommitRequest(
        groupId,
        map,
        kafka.api.OffsetCommitRequest.CurrentVersion(),
        0, /* what do I do with this - correlation id? */
        clientName);
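
To make the intended flow concrete, here is a minimal sketch of what I'm after, written without any Kafka classes so it runs on its own. Everything in it is hypothetical: OffsetStore, InMemoryOffsetStore and OffsetTracker are names I made up to stand in for the SimpleConsumer commit/fetch calls, not the real API. My understanding is that the correlation id is just a client-chosen integer that the broker echoes back in the response so replies can be matched to requests, so the sketch uses a simple incrementing counter for it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the broker-side offset API; NOT a Kafka class.
interface OffsetStore {
    void commit(int correlationId, String groupId, String topic, int partition, long offset);

    // Returns -1 when the group has no committed offset for this partition.
    long fetch(int correlationId, String groupId, String topic, int partition);
}

// In-memory implementation so the sketch runs without a broker.
class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Long> offsets = new HashMap<>();

    private static String key(String groupId, String topic, int partition) {
        return groupId + "/" + topic + "/" + partition;
    }

    @Override
    public void commit(int correlationId, String groupId, String topic, int partition, long offset) {
        offsets.put(key(groupId, topic, partition), offset);
    }

    @Override
    public long fetch(int correlationId, String groupId, String topic, int partition) {
        Long stored = offsets.get(key(groupId, topic, partition));
        return stored == null ? -1L : stored;
    }
}

// Tracks one topic/partition: reads the committed offset at startup,
// then commits at most once per commitIntervalMs as consumption advances.
class OffsetTracker {
    private final OffsetStore store;
    private final String groupId;
    private final String topic;
    private final int partition;
    private final long commitIntervalMs;
    // One fresh correlation id per request.
    private final AtomicInteger correlationIds = new AtomicInteger(0);
    private long lastCommitMs;
    private long nextOffset;

    OffsetTracker(OffsetStore store, String groupId, String topic, int partition,
                  long commitIntervalMs, long nowMs) {
        this.store = store;
        this.groupId = groupId;
        this.topic = topic;
        this.partition = partition;
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitMs = nowMs;
        // Startup: resume from the committed offset, or from 0 if none exists.
        long committed = store.fetch(correlationIds.getAndIncrement(), groupId, topic, partition);
        this.nextOffset = committed < 0 ? 0L : committed;
    }

    long nextOffset() {
        return nextOffset;
    }

    // Records progress; commits only if the interval has elapsed.
    // Returns true when a commit was issued.
    boolean advance(long newNextOffset, long nowMs) {
        nextOffset = newNextOffset;
        if (nowMs - lastCommitMs < commitIntervalMs) {
            return false;
        }
        store.commit(correlationIds.getAndIncrement(), groupId, topic, partition, nextOffset);
        lastCommitMs = nowMs;
        return true;
    }
}
```

In the real thing, I assume commit and fetch would each build the corresponding request object and send it through SimpleConsumer; wiring up those constructors is the part I'm stuck on.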