AndrewJSchofield commented on code in PR #19417:
URL: https://github.com/apache/kafka/pull/19417#discussion_r2041981277


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -195,12 +194,14 @@
  *
  * <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
  * This example demonstrates using different acknowledgement types depending 
on the outcome of processing the records.
+ * Here the share.acknowledgement.mode property is set to "explicit" so the 
consumer must explicitly acknowledge each record.

Review Comment:
   "{@code share.acknowledgement.mode}"



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java:
##########
@@ -26,7 +26,7 @@
 /**
  * The consumer configuration behavior specific to share groups.
  */
-class ShareConsumerConfig extends ConsumerConfig {
+public class ShareConsumerConfig extends ConsumerConfig {

Review Comment:
   This should not be a public class. It's not part of the public interface. I 
know that it's currently referenced in `ShareConsumerImplTest` which is in the 
internals package, but I don't think that reference is necessary.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -116,22 +116,23 @@
  * {@code group.share.record.lock.partition.limit}. By limiting the duration 
of the acquisition lock and automatically
  * releasing the locks, the broker ensures delivery progresses even in the 
presence of consumer failures.
  * <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord, 
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes by configuring the

Review Comment:
   Having read it in a browser as generated javadoc, I suggest flipping this 
and explaining implicit mode first, and then explicit. Since implicit is the 
default, that's what people will get unless they set a config.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -456,7 +443,8 @@ private enum AcknowledgementMode {
                       final ConsumerMetadata metadata,
                       final int requestTimeoutMs,
                       final int defaultApiTimeoutMs,
-                      final String groupId) {
+                      final String groupId,
+                      final String acknowledgementMode) {

Review Comment:
   I'd probably give this a different variable name, such as 
`acknowledgementModeConfig`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -116,22 +116,23 @@
  * {@code group.share.record.lock.partition.limit}. By limiting the duration 
of the acquisition lock and automatically
  * releasing the locks, the broker ensures delivery progresses even in the 
presence of consumer failures.
  * <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord, 
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes by configuring the
+ * {@code share.acknowledgement.mode} property. If the property is not set, 
the default mode is <code>"implicit"</code>.
+ *
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit 
acknowledgement</em>. In this case:
  * <ul>
+ *     <li>The application must acknowledge all the records it received in the 
batch before the next call to ({@link #poll(Duration)}</li>

Review Comment:
   nit: Extra "(" before `@link`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -381,18 +382,16 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String SECURITY_PROVIDERS_DOC = 
SecurityConfig.SECURITY_PROVIDERS_DOC;
 
     /**
-     * <code>share.acknowledgement.mode</code> is being evaluated as a new 
configuration to control the acknowledgement mode
-     * for share consumers. It will be removed or converted to a proper 
configuration before release.
-     * An alternative being considered is 
<code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
+     * <code>share.acknowledgement.mode</code>
      */
-    public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = 
"internal.share.acknowledgement.mode";
-    private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC = 
"Controls the acknowledgement mode for a share consumer." +
-            " If unset, the acknowledgement mode of the consumer is decided by 
the method calls it uses to fetch and commit." +
+    public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = 
"share.acknowledgement.mode";
+    private static final String SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the 
acknowledgement mode for a share consumer." +
             " If set to <code>implicit</code>, the acknowledgement mode of the 
consumer is implicit and it must not" +
             " use 
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to 
acknowledge delivery of records. Instead," +
             " delivery is acknowledged implicitly on the next call to poll or 
commit." +
             " If set to <code>explicit</code>, the acknowledgement mode of the 
consumer is explicit and it must use" +
-            " 
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to 
acknowledge delivery of records.";
+            " 
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to 
acknowledge delivery of records." +
+            " Otherwise, the acknowledgement mode of the consumer is set to 
'implicit' by default";

Review Comment:
   I would remove this sentence. It's already clear when you look at the 
generated information without this sentence.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class ShareAcknowledgementMode {
+    public enum AcknowledgementMode {
+        IMPLICIT, EXPLICIT;
+
+        @Override
+        public String toString() {
+            return super.toString().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    private final AcknowledgementMode acknowledgementMode;
+
+    public static final ShareAcknowledgementMode IMPLICIT = new 
ShareAcknowledgementMode(AcknowledgementMode.IMPLICIT);
+    public static final ShareAcknowledgementMode EXPLICIT = new 
ShareAcknowledgementMode(AcknowledgementMode.EXPLICIT);
+
+    private ShareAcknowledgementMode(AcknowledgementMode acknowledgementMode) {
+        this.acknowledgementMode = acknowledgementMode;
+    }
+
+    /**
+     * Returns the ShareAcknowledgementMode from the given string.
+     */
+    public static ShareAcknowledgementMode fromString(String 
acknowledgementMode) {
+        if (acknowledgementMode == null) {
+            throw new IllegalArgumentException("Acknowledgement mode is null");
+        }
+
+        if 
(Arrays.asList(Utils.enumOptions(AcknowledgementMode.class)).contains(acknowledgementMode))
 {
+            AcknowledgementMode mode = 
AcknowledgementMode.valueOf(acknowledgementMode.toUpperCase(Locale.ROOT));
+            switch (mode) {
+                case IMPLICIT:
+                    return IMPLICIT;
+                case EXPLICIT:
+                    return EXPLICIT;
+                default:
+                    throw new IllegalArgumentException("Invalid 
acknowledgement mode: " + acknowledgementMode);
+            }
+        } else {
+            throw new IllegalArgumentException("Invalid acknowledgement mode: 
" + acknowledgementMode);
+        }
+    }
+
+    /**
+     * Returns the acknowledgement mode enum.
+     */
+    public AcknowledgementMode getAcknowledgementMode() {

Review Comment:
   This method is unused, so I suggest removing it. And if it remains, Kafka 
getters don't start with `get`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to