AndrewJSchofield commented on code in PR #19417: URL: https://github.com/apache/kafka/pull/19417#discussion_r2039155579
########## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ########## @@ -381,18 +381,17 @@ public class ConsumerConfig extends AbstractConfig { private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC; /** - * <code>share.acknowledgement.mode</code> is being evaluated as a new configuration to control the acknowledgement mode - * for share consumers. It will be removed or converted to a proper configuration before release. - * An alternative being considered is <code>enable.explicit.share.acknowledgement</code> as a boolean configuration. + * <code>share.acknowledgement.mode</code> is a config to control the acknowledgement mode Review Comment: Just `<code>share.acknowledgement.mode</code>` is sufficient. Because this config is no longer internal, it will appear in the documentation for consumer configs for free. ########## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ########## @@ -381,18 +381,17 @@ public class ConsumerConfig extends AbstractConfig { private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC; /** - * <code>share.acknowledgement.mode</code> is being evaluated as a new configuration to control the acknowledgement mode - * for share consumers. It will be removed or converted to a proper configuration before release. - * An alternative being considered is <code>enable.explicit.share.acknowledgement</code> as a boolean configuration. + * <code>share.acknowledgement.mode</code> is a config to control the acknowledgement mode + * for share consumers. It can be set to implicit or explicit. */ - public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "internal.share.acknowledgement.mode"; - private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." + - " If unset, the acknowledgement mode of the consumer is decided by the method calls it uses to fetch and commit." + + public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "share.acknowledgement.mode"; + private static final String SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." + " If set to <code>implicit</code>, the acknowledgement mode of the consumer is implicit and it must not" + " use <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records. Instead," + " delivery is acknowledged implicitly on the next call to poll or commit." + " If set to <code>explicit</code>, the acknowledgement mode of the consumer is explicit and it must use" + - " <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records."; + " <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records." + + " If unset, the acknowledgement mode of the consumer is set to 'implicit' by default"; Review Comment: Maybe the final sentence would be better as `Otherwise, the acknowledgement mode is implicit.` ########## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ########## @@ -695,12 +694,12 @@ public class ConsumerConfig extends AbstractConfig { atLeast(0), Importance.LOW, CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC) - .define(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, + .define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, Type.STRING, null, in(null, "implicit", "explicit"), Review Comment: I think you should be following the style of `AutoOffsetResetStrategy` or `GroupProtocol` for this configuration. In summary: * define an enum/class called `ShareAcknowledgementMode` in `o.a.k.clients.consumer.internals` * validate using the names of the enum constants, uppercasing and then matching so the application can use `Explicit` or `eXplicit` or whatever * Do not permit a null value. The default would be `ShareAcknowledgementMode.IMPLICIT.name().toLowerCase(Locale.ROOT)` ########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java: ########## @@ -115,23 +115,26 @@ * as naturally happens when the locks time out. This limit is controlled by the broker configuration property * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures. + * * <p> - * The consumer can choose to use implicit or explicit acknowledgement of the records it processes. - * <p>If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch, - * it is using <em>explicit acknowledgement</em>. In this case: + * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the + * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the property is not set, the default mode is <code>"implicit"</code>. + * + * <p>If the config is set to "explicit", the consumer is using <em>explicit acknowledgement</em>. In this case: * <ul> + * <li>The application is expected to acknowledge all the records it received in the batch before the next call to ({@link #poll(Duration)}</li> + * <li> If the application has some unacknowledged records before the next call to ({@link #poll(Duration)}, then the poll() Review Comment: nit: Extra space after `<li>` ########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java: ########## @@ -115,23 +115,26 @@ * as naturally happens when the locks time out. This limit is controlled by the broker configuration property * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures. + * Review Comment: nit: The style of the rest of this javadoc comment is not to have blank lines before the `<p>` and to have the `<p>` on a line by itself. ########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java: ########## @@ -115,23 +115,26 @@ * as naturally happens when the locks time out. This limit is controlled by the broker configuration property * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures. + * * <p> - * The consumer can choose to use implicit or explicit acknowledgement of the records it processes. - * <p>If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch, - * it is using <em>explicit acknowledgement</em>. In this case: + * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the + * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the property is not set, the default mode is <code>"implicit"</code>. + * + * <p>If the config is set to "explicit", the consumer is using <em>explicit acknowledgement</em>. In this case: * <ul> + * <li>The application is expected to acknowledge all the records it received in the batch before the next call to ({@link #poll(Duration)}</li> + * <li> If the application has some unacknowledged records before the next call to ({@link #poll(Duration)}, then the poll() + * throws an {@link IllegalStateException}. This is because in explicit mode, all the records in the batch should be acknowledged before the next call to poll()</li> * <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which commits the acknowledgements to Kafka. - * If any records in the batch were not acknowledged, they remain acquired and will be presented to the application - * in response to a future poll.</li> + * If any records in the batch were not acknowledged until the next poll(), an {@link IllegalStateException} is thrown.</li> * <li>The application calls {@link #poll(Duration)} without committing first, which commits the acknowledgements to * Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. - * If any records in the batch were not acknowledged, they remain acquired and will be presented to the application - * in response to a future poll.</li> + * If any records in the batch were not acknowledged, an {@link IllegalStateException} is thrown.</li> * <li>The application calls {@link #close()} which attempts to commit any pending acknowledgements and * releases any remaining acquired records.</li> * </ul> - * If the application does not call {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch, - * it is using <em>implicit acknowledgement</em>. In this case: + * If the application sets the {@code share.acknowledgement.mode} property to "implicit" or does not configure the mode, then Review Comment: Maybe `does not set the {@code share.acknowledgement.mode} property to "explicit", then`.... ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: ########## @@ -748,28 +713,33 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch() { // Acknowledging 2 out of the 3 records received via commitAsync. ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); ConsumerRecord<byte[], byte[]> secondRecord = iterator.next(); + ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next(); assertEquals(0L, firstRecord.offset()); assertEquals(1L, secondRecord.offset()); shareConsumer1.acknowledge(firstRecord); shareConsumer1.acknowledge(secondRecord); shareConsumer1.commitAsync(); - // The 3rd record should be re-presented to the consumer when it polls again. - records = shareConsumer1.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - iterator = records.iterator(); - firstRecord = iterator.next(); - assertEquals(2L, firstRecord.offset()); + producer.send(record4); + producer.flush(); + + // The next poll() should throw an IllegalStateException as there is still 1 unacknowledged record. + // In EXPLICIT acknowledgement mode, we are not allowed to have unacknowledged records from a batch. + assertThrows(IllegalStateException.class, () -> shareConsumer1.poll(Duration.ofMillis(5000))); + + // Acknowledging the 3rd record Review Comment: Hmm. Interesting. So, can I still use the `records` collection from the last successful `poll(Duration)`? What if I reset the iterator and start running through it again? ########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java: ########## @@ -156,6 +157,7 @@ * This example demonstrates implicit acknowledgement using {@link #poll(Duration)} to acknowledge the records which * were delivered in the previous poll. All the records delivered are implicitly marked as successfully consumed and * acknowledged synchronously with Kafka as the consumer fetches more records. + * The <code>share.acknowledgement.mode</code> property is not configured, so it is set to "implicit" by default. Review Comment: I'd omit this line. ########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java: ########## @@ -140,9 +143,7 @@ * thrown by a failure to commit the acknowledgements.</li> * <li>The application calls {@link #close()} which releases any acquired records without acknowledgement.</li> * </ul> - * <p>The consumer can optionally use the {@code internal.share.acknowledgement.mode} configuration property to choose - * between implicit and explicit acknowledgement, specifying <code>"implicit"</code> or <code>"explicit"</code> as required. - * <p> + * Review Comment: I think you should have a `<p>` and no blank line. ########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java: ########## @@ -115,23 +115,26 @@ * as naturally happens when the locks time out. This limit is controlled by the broker configuration property * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures. + * * <p> - * The consumer can choose to use implicit or explicit acknowledgement of the records it processes. - * <p>If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch, - * it is using <em>explicit acknowledgement</em>. In this case: + * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the + * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the property is not set, the default mode is <code>"implicit"</code>. + * + * <p>If the config is set to "explicit", the consumer is using <em>explicit acknowledgement</em>. In this case: * <ul> + * <li>The application is expected to acknowledge all the records it received in the batch before the next call to ({@link #poll(Duration)}</li> + * <li> If the application has some unacknowledged records before the next call to ({@link #poll(Duration)}, then the poll() Review Comment: Actually, I would remove this sentence. We can say it more clearly with fewer words. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: ########## @@ -588,7 +588,7 @@ public void testControlRecordsSkipped() throws Exception { public void testExplicitAcknowledgeSuccess() { alterShareAutoOffsetReset("group1", "earliest"); try (Producer<byte[], byte[]> producer = createProducer(); - ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) { Review Comment: I'd like to see constants for "explicit" and "implicit" for tests, apart from situations where invalid values are being tested intentionally. ########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java: ########## @@ -181,6 +183,7 @@ * props.setProperty("group.id", "test"); * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * props.setProperty("share.acknowledgement.mode", "implicit"); Review Comment: And this one. ########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java: ########## @@ -115,23 +115,26 @@ * as naturally happens when the locks time out. This limit is controlled by the broker configuration property * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures. + * * <p> - * The consumer can choose to use implicit or explicit acknowledgement of the records it processes. - * <p>If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch, - * it is using <em>explicit acknowledgement</em>. In this case: + * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the + * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the property is not set, the default mode is <code>"implicit"</code>. + * + * <p>If the config is set to "explicit", the consumer is using <em>explicit acknowledgement</em>. In this case: * <ul> + * <li>The application is expected to acknowledge all the records it received in the batch before the next call to ({@link #poll(Duration)}</li> Review Comment: Let's make it stronger. `The application must acknowledge all the records it received in the batch before`... Also, the line lengths are intentionally quite short here for readability so please split this line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org