AndrewJSchofield commented on code in PR #18336:
URL: https://github.com/apache/kafka/pull/18336#discussion_r1898872562


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -70,8 +70,12 @@
  * <h3>Offsets and Consumer Position</h3>
  * Kafka maintains a numerical offset for each record in a partition. This 
offset acts as a unique identifier of
  * a record within that partition, and also denotes the position of the 
consumer in the partition. For example, a consumer
- * which is at position 5 has consumed records with offsets 0 through 4 and 
will next receive the record with offset 5. There
- * are actually two notions of position relevant to the user of the consumer:
+ * which is at position 5 has consumed records with offsets 0 through 4 and 
will next receive the record with offset 5.
+ * Note that offsets are not guaranteed to be consecutive (eg., for compacted 
topic, or&mdash;independent of "read_committed"

Review Comment:
   I suggest the following the parentheses "(such as compacted topic or when 
records have been produced using transactions)".



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -677,7 +679,9 @@ public void beginTransaction() throws 
ProducerFencedException {
      * Sends a list of specified offsets to the consumer group coordinator, 
and also marks
      * those offsets as part of the current transaction. These offsets will be 
considered
      * committed only if the transaction is committed successfully. The 
committed offset should
-     * be the next message your application will consume, i.e. 
lastProcessedMessageOffset + 1.
+     * be the next message your application will consume, i.e. {@code 
nextRecordToBeProcessed.offset()}
+     * (or {@link ConsumerRecords#nextOffsets()}). You should also add the 
{@link ConsumerRecord#leaderEpoch()}
+     * (or {@code nextOffsets().get(...).leaderEpoch()}) as commit metadata.

Review Comment:
   Ditto



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1117,7 +1127,9 @@ public void commitAsync(OffsetCommitCallback callback) {
      * This commits offsets to Kafka. The offsets committed using this API 
will be used on the first fetch after every
      * rebalance and also on startup. As such, if you need to store offsets in 
anything other than Kafka, this API
      * should not be used. The committed offset should be the next message 
your application will consume,
-     * i.e. lastProcessedMessageOffset + 1. If automatic group management with 
{@link #subscribe(Collection)} is used,
+     * i.e. {@code nextRecordToBeProcessed.offset()} (or {@link 
ConsumerRecords#nextOffsets()}).
+     * You should also add the {@link ConsumerRecord#leaderEpoch()} (or {@code 
nextOffsets().get(...).leaderEpoch()})

Review Comment:
   Ditto.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1033,7 +1041,9 @@ public void commitSync(final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
      * This commits offsets to Kafka. The offsets committed using this API 
will be used on the first fetch after every
      * rebalance and also on startup. As such, if you need to store offsets in 
anything other than Kafka, this API
      * should not be used. The committed offset should be the next message 
your application will consume,
-     * i.e. lastProcessedMessageOffset + 1. If automatic group management with 
{@link #subscribe(Collection)} is used,
+     * i.e. {@code nextRecordToBeProcessed.offset()} (or {@link 
ConsumerRecords#nextOffsets()}).
+     * You should also add the {@link ConsumerRecord#leaderEpoch()} (or {@code 
nextOffsets().get(...).leaderEpoch()})

Review Comment:
   Same point as above about `nextOffsets().get(...).leaderEpoch()`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -70,8 +70,12 @@
  * <h3>Offsets and Consumer Position</h3>
  * Kafka maintains a numerical offset for each record in a partition. This 
offset acts as a unique identifier of
  * a record within that partition, and also denotes the position of the 
consumer in the partition. For example, a consumer
- * which is at position 5 has consumed records with offsets 0 through 4 and 
will next receive the record with offset 5. There
- * are actually two notions of position relevant to the user of the consumer:
+ * which is at position 5 has consumed records with offsets 0 through 4 and 
will next receive the record with offset 5.
+ * Note that offsets are not guaranteed to be consecutive (eg., for compacted 
topic, or&mdash;independent of "read_committed"
+ * mode&mdash; transactional topics). For example, if the consumer did read a 
record with offset 4, but 5 is not an offset
+ * with a record, it's position might advance to 6 (or higher) directly. 
Similarly, if the consumer's position is 5,

Review Comment:
   nit: its not it's



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -984,7 +990,9 @@ public void commitSync(Duration timeout) {
      * This commits offsets to Kafka. The offsets committed using this API 
will be used on the first fetch after every
      * rebalance and also on startup. As such, if you need to store offsets in 
anything other than Kafka, this API
      * should not be used. The committed offset should be the next message 
your application will consume,
-     * i.e. lastProcessedMessageOffset + 1. If automatic group management with 
{@link #subscribe(Collection)} is used,
+     * i.e. {@code nextRecordToBeProcessed.offset()} (or {@link 
ConsumerRecords#nextOffsets()}).
+     * You should also add the {@link ConsumerRecord#leaderEpoch()} (or {@code 
nextOffsets().get(...).leaderEpoch()})

Review Comment:
   Maybe "You should also add the leader epoch as commit metadata, which can be 
obtained from {@link ConsumerRecord#leaderEpoch()} or {@link 
ConsumerRecords#nextOffsets}." I didn't find the 
`nextOffsets().get(...).leaderEpoch()` that easy to follow and the hyperlink to 
the ConsumerRecords seems nicer to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to