AndrewJSchofield commented on code in PR #19417: URL: https://github.com/apache/kafka/pull/19417#discussion_r2044426375
########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java: ########## @@ -140,9 +129,17 @@ * thrown by a failure to commit the acknowledgements.</li> * <li>The application calls {@link #close()} which releases any acquired records without acknowledgement.</li> * </ul> - * <p>The consumer can optionally use the {@code internal.share.acknowledgement.mode} configuration property to choose - * between implicit and explicit acknowledgement, specifying <code>"implicit"</code> or <code>"explicit"</code> as required. - * <p> + * <p>If the config is set to "explicit", the consumer is using <em>explicit acknowledgement</em>. In this case: + * <ul> + * <li>The application must acknowledge all the records it received in the batch before the next call to {@link #poll(Duration)}</li> + * <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which commits the acknowledgements to Kafka. + * If any records in the batch were not acknowledged until the next poll(), an {@link IllegalStateException} is thrown.</li> + * <li>The application calls {@link #poll(Duration)} without committing first, which commits the acknowledgements to + * Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. + * If any records in the batch were not acknowledged, an {@link IllegalStateException} is thrown.</li> + * <li>The application calls {@link #close()} which attempts to commit any pending acknowledgements and + * releases any remaining acquired records.</li> + * </ul> Review Comment: Let's have another try at this. ``` * <p> * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the * consumer {@code share.acknowledgement.mode} property. * <p> * If the application sets the property to "implicit" or does not set it at all, then the consumer is using * <em>implicit acknowledgement</em>. In this mode, the application acknowledges delivery by: * <ul> * <li>Calling {@link #poll(Duration)} without committing, which also implicitly acknowledges all of * the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is * thrown by a failure to commit the acknowledgements.</li> * <li>Calling {@link #commitSync()} or {@link #commitAsync()} which implicitly acknowledges all of * the delivered records as processed successfully and commits the acknowledgements to Kafka.</li> * <li>Calling {@link #close()} which releases any acquired records without acknowledgement.</li> * </ul> * If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgment</em>. * The application must acknowledge all records returned from {@link #poll(Duration)} using * {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next call to {@link #poll(Duration)}. * If the application calls {@link #poll(Duration)} without having acknowledged all records, an * {@link IllegalStateException} is thrown. The remaining unacknowledged records can still be acknowledged. * In this mode, the application acknowledges delivery by: * <ul> * <li>Calling {@link #poll(Duration)} after it has acknowledged all records, which commits the acknowledgements * to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.</li> * <li>Calling {@link #commitSync()} or {@link #commitAsync()} which commits any pending * acknowledgements to Kafka.</li> * <li>Calling {@link #close()} which attempts to commit any pending acknowledgements and releases * any remaining acquired records.</li> * </ul> * The consumer guarantees that the records returned in the {@code ConsumerRecords} object for a specific topic-partition * are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records * in a batch are performed atomically. This makes error handling significantly more straightforward because there can be * one error code per partition. ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java: ########## @@ -673,6 +661,10 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack // Notify the network thread to wake up and start the next round of fetching applicationEventHandler.wakeupNetworkThread(); } + if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) { + // We cannot leave unacknowledged records in EXPLICIT acknowledgement mode, so we throw an exception to the application. + throw new IllegalStateException("There are unacknowledged records from the previous fetch : " + currentFetch.records()); Review Comment: It would be much better if the entire set of records was not appended. I suggest "All records must be acknowledged in explicit acknowledgement mode". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org