AndrewJSchofield commented on code in PR #19417: URL: https://github.com/apache/kafka/pull/19417#discussion_r2041981277
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -195,12 +194,14 @@
  *
  * <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
  * This example demonstrates using different acknowledgement types depending on the outcome of processing the records.
+ * Here the share.acknowledgement.mode property is set to "explicit" so the consumer must explicitly acknowledge each record.

Review Comment:
   "{@code share.acknowledgement.mode}"

##########
clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java:
##########
@@ -26,7 +26,7 @@
 /**
  * The consumer configuration behavior specific to share groups.
  */
-class ShareConsumerConfig extends ConsumerConfig {
+public class ShareConsumerConfig extends ConsumerConfig {

Review Comment:
   This should not be a public class. It's not part of the public interface. I know that it's currently referenced in `ShareConsumerImplTest` which is in the internals package, but I don't think that reference is necessary.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -116,22 +116,23 @@
  * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically
  * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
  * <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the

Review Comment:
   Having read it in a browser as generated javadoc, I suggest flipping this and explaining implicit mode first, and then explicit. Since implicit is the default, that's what people will get unless they set a config.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -456,7 +443,8 @@ private enum AcknowledgementMode {
             final ConsumerMetadata metadata,
             final int requestTimeoutMs,
             final int defaultApiTimeoutMs,
-            final String groupId) {
+            final String groupId,
+            final String acknowledgementMode) {

Review Comment:
   I'd probably give this a different variable name, such as `acknowledgementModeConfig`.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -116,22 +116,23 @@
  * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically
  * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
  * <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the
+ * {@code share.acknowledgement.mode} property. If the property is not set, the default mode is <code>"implicit"</code>.
+ *
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit acknowledgement</em>. In this case:
  * <ul>
+ * <li>The application must acknowledge all the records it received in the batch before the next call to ({@link #poll(Duration)}</li>

Review Comment:
   nit: Extra "(" before `@link`.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -381,18 +382,16 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
     /**
-     * <code>share.acknowledgement.mode</code> is being evaluated as a new configuration to control the acknowledgement mode
-     * for share consumers. It will be removed or converted to a proper configuration before release.
-     * An alternative being considered is <code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
+     * <code>share.acknowledgement.mode</code>
      */
-    public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "internal.share.acknowledgement.mode";
-    private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." +
-            " If unset, the acknowledgement mode of the consumer is decided by the method calls it uses to fetch and commit." +
+    public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "share.acknowledgement.mode";
+    private static final String SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." +
             " If set to <code>implicit</code>, the acknowledgement mode of the consumer is implicit and it must not" +
             " use <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records. Instead," +
             " delivery is acknowledged implicitly on the next call to poll or commit." +
             " If set to <code>explicit</code>, the acknowledgement mode of the consumer is explicit and it must use" +
-            " <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records.";
+            " <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records."
+            " Otherwise, the acknowledgement mode of the consumer is set to 'implicit' by default";

Review Comment:
   I would remove this sentence. It's already clear when you look at the generated information without this sentence.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class ShareAcknowledgementMode {
+    public enum AcknowledgementMode {
+        IMPLICIT, EXPLICIT;
+
+        @Override
+        public String toString() {
+            return super.toString().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    private final AcknowledgementMode acknowledgementMode;
+
+    public static final ShareAcknowledgementMode IMPLICIT = new ShareAcknowledgementMode(AcknowledgementMode.IMPLICIT);
+    public static final ShareAcknowledgementMode EXPLICIT = new ShareAcknowledgementMode(AcknowledgementMode.EXPLICIT);
+
+    private ShareAcknowledgementMode(AcknowledgementMode acknowledgementMode) {
+        this.acknowledgementMode = acknowledgementMode;
+    }
+
+    /**
+     * Returns the ShareAcknowledgementMode from the given string.
+     */
+    public static ShareAcknowledgementMode fromString(String acknowledgementMode) {
+        if (acknowledgementMode == null) {
+            throw new IllegalArgumentException("Acknowledgement mode is null");
+        }
+
+        if (Arrays.asList(Utils.enumOptions(AcknowledgementMode.class)).contains(acknowledgementMode)) {
+            AcknowledgementMode mode = AcknowledgementMode.valueOf(acknowledgementMode.toUpperCase(Locale.ROOT));
+            switch (mode) {
+                case IMPLICIT:
+                    return IMPLICIT;
+                case EXPLICIT:
+                    return EXPLICIT;
+                default:
+                    throw new IllegalArgumentException("Invalid acknowledgement mode: " + acknowledgementMode);
+            }
+        } else {
+            throw new IllegalArgumentException("Invalid acknowledgement mode: " + acknowledgementMode);
+        }
+    }
+
+    /**
+     * Returns the acknowledgement mode enum.
+     */
+    public AcknowledgementMode getAcknowledgementMode() {

Review Comment:
   This method is unused, so I suggest removing it. And if it remains, Kafka getters don't start with `get`.
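
   For illustration only, here is a minimal sketch of how the parsing in the quoted `fromString` could look once the unused getter is dropped. It is not the code in the PR: the name `AckModeSketch` is hypothetical, a plain enum stands in for the wrapper class, and it assumes that only the exact lowercase names should be accepted, which is what the quoted `contains` check appears to do.

```java
import java.util.Locale;

// Hypothetical simplification, not the PR's code: a plain enum replaces the
// wrapper class, so no `get`-prefixed accessor is needed at all.
enum AckModeSketch {
    IMPLICIT, EXPLICIT;

    // Lowercase form, matching the values used for the config ("implicit", "explicit").
    @Override
    public String toString() {
        return name().toLowerCase(Locale.ROOT);
    }

    // Accepts only the exact lowercase names (an assumption about the intended behaviour).
    static AckModeSketch fromString(String value) {
        if (value == null) {
            throw new IllegalArgumentException("Acknowledgement mode is null");
        }
        for (AckModeSketch mode : values()) {
            if (mode.toString().equals(value)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Invalid acknowledgement mode: " + value);
    }

    public static void main(String[] args) {
        System.out.println(fromString("explicit"));
    }
}
```

   With a single loop over `values()`, the two "invalid mode" branches in the quoted code collapse into one `throw`.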