lianetm commented on code in PR #18336:
URL: https://github.com/apache/kafka/pull/18336#discussion_r1900988248


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -70,8 +70,12 @@
  * <h3>Offsets and Consumer Position</h3>
  * Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of
  * a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer
- * which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
- * are actually two notions of position relevant to the user of the consumer:
+ * which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5.
+ * Note that offsets are not guaranteed to be consecutive (eg., for compacted topic, or&mdash;independent of "read_committed"
+ * mode&mdash; transactional topics). For example, if the consumer did read a record with offset 4, but 5 is not an offset
+ * with a record, it's position might advance to 6 (or higher) directly. Similarly, if the consumer's position is 5,
+ * but there is no record with offset 5, the consumer will return the record with the next higher offset.
+ * There are actually two notions of position relevant to the user of the consumer:

Review Comment:
   Related to this, but on line 84 just below (sorry, I couldn't add a comment there):
   ```
   The {@link #commitSync() committed position} is the last offset that has been stored securely
   ```
   
   Shouldn't that link to `committed(..)` instead?
   ```
   The {@link #committed(Set) committed position} is the last offset that has been stored securely
   ```
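   
   To illustrate the gap behavior the new Javadoc text describes, here is a minimal standalone sketch. It does not use the Kafka client at all: `OffsetGapSketch`, `sampleLog`, and `nextRecordOffset` are hypothetical names, and a `TreeMap` merely stands in for a partition log with missing offsets.
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;

   public class OffsetGapSketch {
       // Hypothetical stand-in for a partition log: offset -> record value.
       // Offsets 2 and 5 are absent, as could happen after compaction or
       // where a transaction marker occupies an offset.
       static TreeMap<Long, String> sampleLog() {
           TreeMap<Long, String> log = new TreeMap<>();
           log.put(0L, "a");
           log.put(1L, "b");
           log.put(3L, "c");
           log.put(4L, "d");
           log.put(6L, "e");
           return log;
       }

       // Returns the offset of the record a consumer at `position` would
       // receive next: the smallest existing offset >= position, or -1 if
       // no such record exists.
       static long nextRecordOffset(TreeMap<Long, String> log, long position) {
           Map.Entry<Long, String> entry = log.ceilingEntry(position);
           return entry == null ? -1L : entry.getKey();
       }

       public static void main(String[] args) {
           TreeMap<Long, String> log = sampleLog();
           // Position 5 has no record, so the next record returned is at offset 6.
           System.out.println(nextRecordOffset(log, 5L));
           // Position 2 has no record either, so the next record is at offset 3.
           System.out.println(nextRecordOffset(log, 2L));
       }
   }
   ```
   
   The point of the sketch is only that a position is a lower bound on the next record's offset, not a guarantee that a record exists at exactly that offset.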
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -266,8 +270,7 @@
  *                 for (ConsumerRecord&lt;String, String&gt; record : partitionRecords) {
  *                     System.out.println(record.offset() + &quot;: &quot; + record.value());
  *                 }
- *                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
- *                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ *                 consumer.commitSync(Collections.singletonMap(partition, partitionRecords.nextOffsets().get(partition));

Review Comment:
   There's also a missing closing parenthesis. I guess the line should end up as:
   ```suggestion
    *                 consumer.commitSync(Collections.singletonMap(partition, records.nextOffsets().get(partition)));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
