lianetm commented on code in PR #18336:
URL: https://github.com/apache/kafka/pull/18336#discussion_r1900988248

##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -70,8 +70,12 @@
  * <h3>Offsets and Consumer Position</h3>
  * Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of
  * a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer
- * which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
- * are actually two notions of position relevant to the user of the consumer:
+ * which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5.
+ * Note that offsets are not guaranteed to be consecutive (eg., for compacted topic, or—independent of "read_committed"
+ * mode— transactional topics). For example, if the consumer did read a record with offset 4, but 5 is not an offset
+ * with a record, it's position might advance to 6 (or higher) directly. Similarly, if the consumer's position is 5,
+ * but there is no record with offset 5, the consumer will return the record with the next higher offset.
+ * There are actually two notions of position relevant to the user of the consumer:

Review Comment:
Related to this, but on line 84 right below (sorry, I couldn't add a comment there):
```
The {@link #commitSync() committed position} is the last offset that has been stored securely
```
Shouldn't that refer to `committed(..)` instead?
```
The {@link #committed(Set) committed position} is the last offset that has been stored securely
```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -266,8 +270,7 @@
  * for (ConsumerRecord<String, String> record : partitionRecords) {
  *     System.out.println(record.offset() + ": " + record.value());
  * }
- * long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
- * consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ * consumer.commitSync(Collections.singletonMap(partition, partitionRecords.nextOffsets().get(partition));

Review Comment:
Also, a closing parenthesis is missing here. I guess this should end with:
```suggestion
 * consumer.commitSync(Collections.singletonMap(partition, records.nextOffsets().get(partition)));
```
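
For illustration only (not part of the PR): a minimal sketch of the gap behaviour the new Javadoc text describes, using a plain `TreeMap` as a stand-in for a partition log. The class `OffsetGapSketch` and its methods are hypothetical names and do not use the Kafka client API. The sketch only shows how a position that points at a missing offset resolves to the next higher offset that has a record.

```java
import java.util.Map;
import java.util.TreeMap;

public class OffsetGapSketch {

    // Hypothetical stand-in for a partition log: offset -> record value.
    // Offsets 2 and 5 are deliberately absent, as can happen on a compacted
    // or transactional topic.
    static TreeMap<Long, String> sampleLog() {
        TreeMap<Long, String> log = new TreeMap<>();
        log.put(0L, "a");
        log.put(1L, "b");
        log.put(3L, "c");
        log.put(4L, "d");
        log.put(6L, "e");
        return log;
    }

    // Offset of the record a consumer at `position` would receive next:
    // the smallest offset >= position that has a record, or -1 if none exists.
    static long nextRecordOffset(TreeMap<Long, String> log, long position) {
        Map.Entry<Long, String> entry = log.ceilingEntry(position);
        return entry == null ? -1L : entry.getKey();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> log = sampleLog();
        System.out.println(nextRecordOffset(log, 4L)); // prints 4 (record exists)
        System.out.println(nextRecordOffset(log, 5L)); // prints 6 (offset 5 has no record)
        System.out.println(nextRecordOffset(log, 7L)); // prints -1 (nothing at or after 7)
    }
}
```

In this toy model, a consumer whose position is 5 is handed the record at offset 6, which mirrors the "next higher offset" wording in the Javadoc change.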