showuon commented on code in PR #12753:
URL: https://github.com/apache/kafka/pull/12753#discussion_r1047936121


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,28 @@ public void commitAsync(final Map<TopicPartition, 
OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next 
{@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset 
will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of 
consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked 
will have the offset specified, given that
+     * a record with that offset exists (i.e. it is a valid offset).
+     * <p>
+     * {@link #seekToBeginning(Collection)} will go to the first offset in the 
topic.
+     * seek(0) is equivalent to seekToBeginning for a TopicPartition with 
beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link #seekToEnd(Collection)} is equivalent to seeking to the last 
offset of the partition, but behavior depends on
+     * {@code isolation.level}, so see {@link #seekToBeginning(Collection)} 
documentation for more details.

Review Comment:
   I think you want to refer to `{@link #seekToEnd(Collection)}`, not `{@link #seekToBeginning(Collection)}`, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,28 @@ public void commitAsync(final Map<TopicPartition, 
OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next 
{@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset 
will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of 
consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked 
will have the offset specified, given that
+     * a record with that offset exists (i.e. it is a valid offset).
+     * <p>
+     * {@link #seekToBeginning(Collection)} will go to the first offset in the 
topic.
+     * seek(0) is equivalent to seekToBeginning for a TopicPartition with 
beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link #seekToEnd(Collection)} is equivalent to seeking to the last 
offset of the partition, but behavior depends on
+     * {@code isolation.level}, so see {@link #seekToBeginning(Collection)} 
documentation for more details.
+     * <p>
+     * Seeking to the offset smaller than the log start offset or larger than 
the log end offset
+     * or high watermark means an invalid offset is reached.

Review Comment:
   The term needs to be updated. How about:
        * Seeking to an offset smaller than the log start offset or larger than the last offset means an invalid offset is reached.
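   The suggested wording defines a valid seek target as an offset between the log start offset and the last offset. A minimal sketch of that validity check (the class and method names are illustrative assumptions, not Kafka client API):

   ```java
   // Sketch: an offset is a valid seek target only if it lies within the
   // partition's current log range. Names here are illustrative and not
   // part of the Kafka client API.
   public class OffsetRangeSketch {
       /**
        * Returns true if {@code offset} falls between the log start offset
        * and the last offset, both inclusive.
        */
       public static boolean isValidOffset(long offset, long logStartOffset, long lastOffset) {
           return offset >= logStartOffset && offset <= lastOffset;
       }
   }
   ```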
   



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,53 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchOutOfRangeOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
+    // ensure no records arrive before poll is called so that the offset 
actually gets reset
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
"0")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 
totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0)
+  }
+
+
+  @Test
+  def testFetchOutOfRangeOffsetResetConfigLatest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"latest")
+    // ensure no records arrive before poll is called so that the offset 
actually gets reset
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
"0")

Review Comment:
   ditto



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,28 @@ public void commitAsync(final Map<TopicPartition, 
OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next 
{@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset 
will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of 
consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked 
will have the offset specified, given that
+     * a record with that offset exists (i.e. it is a valid offset).
+     * <p>
+     * {@link #seekToBeginning(Collection)} will go to the first offset in the 
topic.
+     * seek(0) is equivalent to seekToBeginning for a TopicPartition with 
beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link #seekToEnd(Collection)} is equivalent to seeking to the last 
offset of the partition, but behavior depends on
+     * {@code isolation.level}, so see {@link #seekToBeginning(Collection)} 
documentation for more details.
+     * <p>
+     * Seeking to the offset smaller than the log start offset or larger than 
the log end offset
+     * or high watermark means an invalid offset is reached.
+     * Invalid offset behaviour is controlled by the {@link ConsumerConfig 
AUTO_RESET_CONFIG} property.

Review Comment:
   `{@link ConsumerConfig AUTO_RESET_CONFIG}` should be updated to `{@code auto.offset.reset}`, because the link won't work with this format.
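   The Javadoc under review describes the three `auto.offset.reset` values. A small sketch of that policy as plain logic (this only models the documented behaviour; it is not the Kafka client implementation, and the exception type is swapped for a stand-in):

   ```java
   // Sketch of how the three auto.offset.reset values map to reset
   // behaviour when the consumer's position is out of range.
   // Illustrative model only, not the Kafka client implementation.
   public class AutoOffsetResetSketch {
       public static long resolve(String resetPolicy, long logStartOffset, long logEndOffset) {
           switch (resetPolicy) {
               case "earliest":
                   return logStartOffset;   // next poll returns records from the starting offset
               case "latest":
                   return logEndOffset;     // similar to seekToEnd()
               case "none":
                   // the real client throws OffsetOutOfRangeException here
                   throw new IllegalStateException("offset out of range and no reset policy");
               default:
                   throw new IllegalArgumentException("unknown policy: " + resetPolicy);
           }
       }
   }
   ```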



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,28 @@ public void commitAsync(final Map<TopicPartition, 
OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next 
{@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset 
will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of 
consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked 
will have the offset specified, given that
+     * a record with that offset exists (i.e. it is a valid offset).
+     * <p>
+     * {@link #seekToBeginning(Collection)} will go to the first offset in the 
topic.
+     * seek(0) is equivalent to seekToBeginning for a TopicPartition with 
beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link #seekToEnd(Collection)} is equivalent to seeking to the last 
offset of the partition, but behavior depends on
+     * {@code isolation.level}, so see {@link #seekToBeginning(Collection)} 
documentation for more details.
+     * <p>
+     * Seeking to the offset smaller than the log start offset or larger than 
the log end offset
+     * or high watermark means an invalid offset is reached.
+     * Invalid offset behaviour is controlled by the {@link ConsumerConfig 
AUTO_RESET_CONFIG} property.
+     * If this is set to "earliest", the next poll will return records from 
the starting offset.
+     * If it is set to "latest", it will seek to the last offset (similar to 
seekToEnd()).
+     * If it is set to "none", an {@code OffsetOutOfRangeException} will be 
thrown.
+     * <p>
+     * Note that, the seek offset won't change to the in-flight fetch request, 
it will take effect in next fetch request.
+     * So, the consumer might wait for {@code fetch.max.wait.ms} before 
starting to fetch the records from desired offset.
      *
+     * @param partition The TopicPartition on which the seek will be performed.
+     * @param offset the next offset returned by poll() will be either this or 
greater.

Review Comment:
   About the `greater`, I'm not sure what you are referring to. Could you elaborate?
   I think it's possible we return a greater offset if this is a compacted topic, but I don't think we need to make this line that complicated.
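   The compacted-topic case mentioned here can be sketched as follows: compaction removes records, so a fetch serves the first *retained* offset at or after the seek target, which may be greater than the target. The helper below is purely illustrative, not Kafka client API:

   ```java
   import java.util.List;

   // Sketch of why poll() can return an offset greater than the one
   // passed to seek() on a compacted topic: some offsets no longer have
   // records, so the fetch starts at the first retained offset at or
   // after the seek position. Illustrative only.
   public class CompactedSeekSketch {
       /** Returns the first retained offset >= target, or -1 if none remain. */
       public static long firstRetainedAtOrAfter(List<Long> retainedOffsets, long target) {
           for (long offset : retainedOffsets) {
               if (offset >= target) {
                   return offset;
               }
           }
           return -1;
       }
   }
   ```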



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,53 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchOutOfRangeOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
+    // ensure no records arrive before poll is called so that the offset 
actually gets reset
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
"0")

Review Comment:
   I think this comment is not clear. Maybe:
   // ensure no in-flight fetch request so that the offset can be reset immediately
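   The timing concern behind this test comment can be sketched as a tiny model: a seek does not redirect a fetch that is already in flight, it only changes the position used by the next fetch request (which is why the test sets `fetch.max.wait.ms` to 0). The class below is an illustrative model, not Kafka client code:

   ```java
   // Sketch: a seek leaves an already in-flight fetch untouched and only
   // affects the position of the next fetch request. Illustrative model
   // only, not the Kafka consumer implementation.
   public class InFlightFetchSketch {
       private long position;
       private long inFlightFetchPosition = -1; // -1 means no fetch in flight

       public InFlightFetchSketch(long startPosition) {
           this.position = startPosition;
       }

       public void sendFetch() {
           inFlightFetchPosition = position; // snapshot taken when the request is sent
       }

       public void seek(long offset) {
           position = offset; // the in-flight fetch is not updated
       }

       /** Position the outstanding fetch was sent with, or -1 if none. */
       public long inFlightFetchPosition() {
           return inFlightFetchPosition;
       }

       /** Position the next fetch request would use. */
       public long nextFetchPosition() {
           return position;
       }
   }
   ```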



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
