AndrewJSchofield commented on code in PR #19417:
URL: https://github.com/apache/kafka/pull/19417#discussion_r2039155579


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -381,18 +381,17 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String SECURITY_PROVIDERS_DOC = 
SecurityConfig.SECURITY_PROVIDERS_DOC;
 
     /**
-     * <code>share.acknowledgement.mode</code> is being evaluated as a new 
configuration to control the acknowledgement mode
-     * for share consumers. It will be removed or converted to a proper 
configuration before release.
-     * An alternative being considered is 
<code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
+     * <code>share.acknowledgement.mode</code> is a config to control the 
acknowledgement mode

Review Comment:
   Just `<code>share.acknowledgement.mode</code>` is sufficient. Because this 
config is no longer internal, it will appear in the documentation for consumer 
configs for free.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -381,18 +381,17 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String SECURITY_PROVIDERS_DOC = 
SecurityConfig.SECURITY_PROVIDERS_DOC;
 
     /**
-     * <code>share.acknowledgement.mode</code> is being evaluated as a new 
configuration to control the acknowledgement mode
-     * for share consumers. It will be removed or converted to a proper 
configuration before release.
-     * An alternative being considered is 
<code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
+     * <code>share.acknowledgement.mode</code> is a config to control the 
acknowledgement mode
+     * for share consumers. It can be set to implicit or explicit.
      */
-    public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = 
"internal.share.acknowledgement.mode";
-    private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC = 
"Controls the acknowledgement mode for a share consumer." +
-            " If unset, the acknowledgement mode of the consumer is decided by 
the method calls it uses to fetch and commit." +
+    public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = 
"share.acknowledgement.mode";
+    private static final String SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the 
acknowledgement mode for a share consumer." +
             " If set to <code>implicit</code>, the acknowledgement mode of the 
consumer is implicit and it must not" +
             " use 
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to 
acknowledge delivery of records. Instead," +
             " delivery is acknowledged implicitly on the next call to poll or 
commit." +
             " If set to <code>explicit</code>, the acknowledgement mode of the 
consumer is explicit and it must use" +
-            " 
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to 
acknowledge delivery of records.";
+            " 
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to 
acknowledge delivery of records." +
+            " If unset, the acknowledgement mode of the consumer is set to 
'implicit' by default";

Review Comment:
   Maybe the final sentence would be better as `Otherwise, the acknowledgement 
mode is implicit.`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -695,12 +694,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.LOW,
                                         
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
-                                
.define(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
+                                
.define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
                                         Type.STRING,
                                         null,
                                         in(null, "implicit", "explicit"),

Review Comment:
   I think you should be following the style of `AutoOffsetResetStrategy` or 
`GroupProtocol` for this configuration. In summary:
   * define an enum/class called `ShareAcknowledgementMode` in 
`o.a.k.clients.consumer.internals`
   * validate using the names of the enum constants, uppercasing and then 
matching so the application can use `Explicit` or `eXplicit` or whatever
   * Do not permit a null value. The default would be 
`ShareAcknowledgementMode.IMPLICIT.name().toLowerCase(Locale.ROOT)`
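
A minimal sketch of the three points above, using only the JDK. The `of` helper, the `DEFAULT` constant, and the demo class are hypothetical names chosen for illustration; this is not the code that `AutoOffsetResetStrategy` or `GroupProtocol` use, and it leaves out the wiring into `ConfigDef` validation.

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical sketch of the suggested enum; not the class from the PR.
enum ShareAcknowledgementMode {
    IMPLICIT,
    EXPLICIT;

    // Default config value, lowercased as the review suggests.
    static final String DEFAULT = IMPLICIT.name().toLowerCase(Locale.ROOT);

    // Hypothetical helper: uppercases the input and matches it against the
    // enum constant names, so "Explicit" and "eXplicit" are both accepted.
    static ShareAcknowledgementMode of(String value) {
        if (value == null) {
            // Null is rejected; callers fall back to DEFAULT instead.
            throw new IllegalArgumentException("share.acknowledgement.mode must not be null");
        }
        try {
            return valueOf(value.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Invalid value '" + value + "'; expected one of " + Arrays.toString(values()), e);
        }
    }
}

class ShareAcknowledgementModeDemo {
    public static void main(String[] args) {
        System.out.println(ShareAcknowledgementMode.of("eXplicit")); // prints EXPLICIT
        System.out.println(ShareAcknowledgementMode.DEFAULT);        // prints implicit
    }
}
```

Using `Locale.ROOT` for both the uppercasing and the lowercasing keeps the matching independent of the JVM's default locale.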



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -115,23 +115,26 @@
  * as naturally happens when the locks time out. This limit is controlled by 
the broker configuration property
  * {@code group.share.record.lock.partition.limit}. By limiting the duration 
of the acquisition lock and automatically
  * releasing the locks, the broker ensures delivery progresses even in the 
presence of consumer failures.
+ *
  * <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord, 
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes by configuring the
+ * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the 
property is not set, the default mode is <code>"implicit"</code>.
+ *
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit 
acknowledgement</em>. In this case:
  * <ul>
+ *     <li>The application is expected to acknowledge all the records it 
received in the batch before the next call to ({@link #poll(Duration)}</li>
+ *     <li> If the application has some unacknowledged records before the next 
call to ({@link #poll(Duration)}, then the poll()

Review Comment:
   nit: Extra space after `<li>`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -115,23 +115,26 @@
  * as naturally happens when the locks time out. This limit is controlled by 
the broker configuration property
  * {@code group.share.record.lock.partition.limit}. By limiting the duration 
of the acquisition lock and automatically
  * releasing the locks, the broker ensures delivery progresses even in the 
presence of consumer failures.
+ *

Review Comment:
   nit: The style of the rest of this javadoc comment is not to have blank 
lines before the `<p>` and to have the `<p>` on a line by itself.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -115,23 +115,26 @@
  * as naturally happens when the locks time out. This limit is controlled by 
the broker configuration property
  * {@code group.share.record.lock.partition.limit}. By limiting the duration 
of the acquisition lock and automatically
  * releasing the locks, the broker ensures delivery progresses even in the 
presence of consumer failures.
+ *
  * <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord, 
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes by configuring the
+ * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the 
property is not set, the default mode is <code>"implicit"</code>.
+ *
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit 
acknowledgement</em>. In this case:
  * <ul>
+ *     <li>The application is expected to acknowledge all the records it 
received in the batch before the next call to ({@link #poll(Duration)}</li>
+ *     <li> If the application has some unacknowledged records before the next 
call to ({@link #poll(Duration)}, then the poll()
+ *     throws an {@link IllegalStateException}. This is because in explicit 
mode, all the records in the batch should be acknowledged before the next call 
to poll()</li>
  *     <li>The application calls {@link #commitSync()} or {@link 
#commitAsync()} which commits the acknowledgements to Kafka.
- *     If any records in the batch were not acknowledged, they remain acquired 
and will be presented to the application
- *     in response to a future poll.</li>
+ *     If any records in the batch were not acknowledged until the next 
poll(), an {@link IllegalStateException} is thrown.</li>
  *     <li>The application calls {@link #poll(Duration)} without committing 
first, which commits the acknowledgements to
  *     Kafka asynchronously. In this case, no exception is thrown by a failure 
to commit the acknowledgement.
- *     If any records in the batch were not acknowledged, they remain acquired 
and will be presented to the application
- *     in response to a future poll.</li>
+ *     If any records in the batch were not acknowledged, an {@link 
IllegalStateException} is thrown.</li>
  *     <li>The application calls {@link #close()} which attempts to commit any 
pending acknowledgements and
  *     releases any remaining acquired records.</li>
  * </ul>
- * If the application does not call {@link #acknowledge(ConsumerRecord, 
AcknowledgeType)} for any record in the batch,
- * it is using <em>implicit acknowledgement</em>. In this case:
+ * If the application sets the {@code share.acknowledgement.mode} property to 
"implicit" or does not configure the mode, then

Review Comment:
   Maybe `does not set the {@code share.acknowledgement.mode} property to 
"explicit", then`...



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -748,28 +713,33 @@ public void 
testExplicitAcknowledgementCommitAsyncPartialBatch() {
             // Acknowledging 2 out of the 3 records received via commitAsync.
             ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
             ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
             assertEquals(0L, firstRecord.offset());
             assertEquals(1L, secondRecord.offset());
 
             shareConsumer1.acknowledge(firstRecord);
             shareConsumer1.acknowledge(secondRecord);
             shareConsumer1.commitAsync();
 
-            // The 3rd record should be re-presented to the consumer when it 
polls again.
-            records = shareConsumer1.poll(Duration.ofMillis(5000));
-            assertEquals(1, records.count());
-            iterator = records.iterator();
-            firstRecord = iterator.next();
-            assertEquals(2L, firstRecord.offset());
+            producer.send(record4);
+            producer.flush();
+
+            // The next poll() should throw an IllegalStateException as there 
is still 1 unacknowledged record.
+            // In EXPLICIT acknowledgement mode, we are not allowed to have 
unacknowledged records from a batch.
+            assertThrows(IllegalStateException.class, () -> 
shareConsumer1.poll(Duration.ofMillis(5000)));
+
+            // Acknowledging the 3rd record

Review Comment:
   Hmm. Interesting. So, can I still use the `records` collection from the last 
successful `poll(Duration)`? What if I reset the iterator and start running 
through it again?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -156,6 +157,7 @@
  * This example demonstrates implicit acknowledgement using {@link 
#poll(Duration)} to acknowledge the records which
  * were delivered in the previous poll. All the records delivered are 
implicitly marked as successfully consumed and
  * acknowledged synchronously with Kafka as the consumer fetches more records.
+ * The <code>share.acknowledgement.mode</code> property is not configured, so 
it is set to "implicit" by default.

Review Comment:
   I'd omit this line.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -140,9 +143,7 @@
  *     thrown by a failure to commit the acknowledgements.</li>
  *     <li>The application calls {@link #close()}  which releases any acquired 
records without acknowledgement.</li>
  * </ul>
- * <p>The consumer can optionally use the {@code 
internal.share.acknowledgement.mode} configuration property to choose
- * between implicit and explicit acknowledgement, specifying 
<code>"implicit"</code> or <code>"explicit"</code> as required.
- * <p>
+ *

Review Comment:
   I think you should have a `<p>` and no blank line.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -115,23 +115,26 @@
  * as naturally happens when the locks time out. This limit is controlled by 
the broker configuration property
  * {@code group.share.record.lock.partition.limit}. By limiting the duration 
of the acquisition lock and automatically
  * releasing the locks, the broker ensures delivery progresses even in the 
presence of consumer failures.
+ *
  * <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord, 
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes by configuring the
+ * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the 
property is not set, the default mode is <code>"implicit"</code>.
+ *
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit 
acknowledgement</em>. In this case:
  * <ul>
+ *     <li>The application is expected to acknowledge all the records it 
received in the batch before the next call to ({@link #poll(Duration)}</li>
+ *     <li> If the application has some unacknowledged records before the next 
call to ({@link #poll(Duration)}, then the poll()

Review Comment:
   Actually, I would remove this sentence. We can say it more clearly with 
fewer words.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -588,7 +588,7 @@ public void testControlRecordsSkipped() throws Exception {
     public void testExplicitAcknowledgeSuccess() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) {

Review Comment:
   I'd like to see constants for "explicit" and "implicit" for tests, apart 
from situations where invalid values are being tested intentionally.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -181,6 +183,7 @@
  *     props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
  *     props.setProperty(&quot;key.deserializer&quot;, 
&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     props.setProperty(&quot;value.deserializer&quot;, 
&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
+ *     props.setProperty(&quot;share.acknowledgement.mode&quot;, 
&quot;implicit&quot;);

Review Comment:
   I'd omit this line too.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -115,23 +115,26 @@
  * as naturally happens when the locks time out. This limit is controlled by 
the broker configuration property
  * {@code group.share.record.lock.partition.limit}. By limiting the duration 
of the acquisition lock and automatically
  * releasing the locks, the broker ensures delivery progresses even in the 
presence of consumer failures.
+ *
  * <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord, 
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes by configuring the
+ * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the 
property is not set, the default mode is <code>"implicit"</code>.
+ *
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit 
acknowledgement</em>. In this case:
  * <ul>
+ *     <li>The application is expected to acknowledge all the records it 
received in the batch before the next call to ({@link #poll(Duration)}</li>

Review Comment:
   Let's make it stronger. `The application must acknowledge all the records it 
received in the batch before`... Also, the line lengths are intentionally quite 
short here for readability so please split this line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
