AndrewJSchofield commented on code in PR #19417:
URL: https://github.com/apache/kafka/pull/19417#discussion_r2044426375


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -140,9 +129,17 @@
  *     thrown by a failure to commit the acknowledgements.</li>
  *     <li>The application calls {@link #close()}  which releases any acquired records without acknowledgement.</li>
  * </ul>
- * <p>The consumer can optionally use the {@code internal.share.acknowledgement.mode} configuration property to choose
- * between implicit and explicit acknowledgement, specifying <code>"implicit"</code> or <code>"explicit"</code> as required.
- * <p>
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit acknowledgement</em>. In this case:
+ * <ul>
+ *     <li>The application must acknowledge all the records it received in the batch before the next call to {@link #poll(Duration)}</li>
+ *     <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which commits the acknowledgements to Kafka.
+ *     If any records in the batch were not acknowledged until the next poll(), an {@link IllegalStateException} is thrown.</li>
+ *     <li>The application calls {@link #poll(Duration)} without committing first, which commits the acknowledgements to
+ *     Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement.
+ *     If any records in the batch were not acknowledged, an {@link IllegalStateException} is thrown.</li>
+ *     <li>The application calls {@link #close()} which attempts to commit any pending acknowledgements and
+ *     releases any remaining acquired records.</li>
+ * </ul>

Review Comment:
   Let's have another try at this.
   ```
    * <p>
    * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the
    * consumer {@code share.acknowledgement.mode} property.
    * <p>
    * If the application sets the property to "implicit" or does not set it at all, then the consumer is using
    * <em>implicit acknowledgement</em>. In this mode, the application acknowledges delivery by:
    * <ul>
    *     <li>Calling {@link #poll(Duration)} without committing, which also implicitly acknowledges all of
    *     the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is
    *     thrown by a failure to commit the acknowledgements.</li>
    *     <li>Calling {@link #commitSync()} or {@link #commitAsync()} which implicitly acknowledges all of
    *     the delivered records as processed successfully and commits the acknowledgements to Kafka.</li>
    *     <li>Calling {@link #close()} which releases any acquired records without acknowledgement.</li>
    * </ul>
    * If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgement</em>.
    * The application must acknowledge all records returned from {@link #poll(Duration)} using
    * {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next call to {@link #poll(Duration)}.
    * If the application calls {@link #poll(Duration)} without having acknowledged all records, an
    * {@link IllegalStateException} is thrown. The remaining unacknowledged records can still be acknowledged.
    * In this mode, the application acknowledges delivery by:
    * <ul>
    *     <li>Calling {@link #poll(Duration)} after it has acknowledged all records, which commits the acknowledgements
    *     to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.</li>
    *     <li>Calling {@link #commitSync()} or {@link #commitAsync()} which commits any pending
    *     acknowledgements to Kafka.</li>
    *     <li>Calling {@link #close()} which attempts to commit any pending acknowledgements and releases
    *     any remaining acquired records.</li>
    * </ul>
    * The consumer guarantees that the records returned in the {@code ConsumerRecords} object for a specific topic-partition
    * are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records
    * in a batch are performed atomically. This makes error handling significantly more straightforward because there can be
    * one error code per partition.
   ```
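
To make the explicit-mode rule in the proposed Javadoc concrete, here is a minimal, self-contained sketch in Java. It is not Kafka code and does not reflect how `ShareConsumerImpl` is implemented: `ExplicitAckSketch`, its offset-only records, and its method signatures are hypothetical stand-ins that model only the bookkeeping described above (a new `poll` is refused while records from the previous batch are unacknowledged, and those records can still be acknowledged afterwards).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of explicit acknowledgement mode; records are reduced to offsets.
class ExplicitAckSketch {
    // Offsets handed out by the last poll() that have not been acknowledged yet.
    private final Set<Long> unacknowledged = new LinkedHashSet<>();
    private long nextOffset = 0;

    // Stand-in for poll(Duration): refuses to return a new batch while the
    // previous batch still has unacknowledged records.
    List<Long> poll(int batchSize) {
        if (!unacknowledged.isEmpty()) {
            throw new IllegalStateException(
                "All records must be acknowledged in explicit acknowledgement mode");
        }
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            batch.add(nextOffset);
            unacknowledged.add(nextOffset);
            nextOffset++;
        }
        return batch;
    }

    // Stand-in for acknowledge(ConsumerRecord, AcknowledgeType).
    void acknowledge(long offset) {
        if (!unacknowledged.remove(offset)) {
            throw new IllegalStateException("Offset " + offset + " is not awaiting acknowledgement");
        }
    }

    int unacknowledgedCount() {
        return unacknowledged.size();
    }

    public static void main(String[] args) {
        ExplicitAckSketch consumer = new ExplicitAckSketch();
        List<Long> batch = consumer.poll(3);
        consumer.acknowledge(batch.get(0));
        try {
            consumer.poll(3); // two records are still unacknowledged
        } catch (IllegalStateException e) {
            System.out.println("poll rejected: " + e.getMessage());
        }
        // The remaining records can still be acknowledged after the exception.
        consumer.acknowledge(batch.get(1));
        consumer.acknowledge(batch.get(2));
        System.out.println("next batch: " + consumer.poll(2));
    }
}
```

The sketch leaves out committing, acknowledgement types, and `close()`; it is only meant to show the ordering constraint between `acknowledge` and the next `poll`.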



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -673,6 +661,10 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
                 // Notify the network thread to wake up and start the next round of fetching
                 applicationEventHandler.wakeupNetworkThread();
             }
+            if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
+                // We cannot leave unacknowledged records in EXPLICIT acknowledgement mode, so we throw an exception to the application.
+                throw new IllegalStateException("There are unacknowledged records from the previous fetch : " + currentFetch.records());

Review Comment:
   It would be much better if the entire set of records were not appended to the exception message. I suggest "All records must be acknowledged in explicit acknowledgement mode".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
