jsancio commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r516172571



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1017,12 +1028,9 @@ private boolean handleFetchResponse(
                 log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
                     logger.info("Truncated to offset {} from Fetch response from leader {}",
                         truncationOffset, quorum.leaderIdOrNil());
-
-                    // Since the end offset has been updated, we should complete any delayed
-                    // reads at the end offset.
-                    fetchPurgatory.maybeComplete(
-                        new LogOffset(Long.MAX_VALUE, Isolation.UNCOMMITTED),
-                        currentTimeMs);
+                    // After truncation, we complete all pending reads in order to
+                    // ensure that fetches account for the updated log end offset
+                    fetchPurgatory.completeAll(currentTimeMs);

Review comment:
       > I had considered this previously and decided to leave the fetches in purgatory while the election was in progress to prevent unnecessary retries since that is all the client can do while waiting for the outcome. On the other hand, some of the fetches in purgatory might be from other voters. It might be better to respond more quickly so that there are not any unnecessary election delays. I'd suggest we open a separate issue to consider this.
   
   Sounds good to create a Jira for this.
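
   To make the tradeoff concrete, here is a minimal sketch (not the PR's actual purgatory API; `SimpleFetchPurgatory` and `await` are hypothetical names) contrasting the old targeted completion with `completeAll`:
   
   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Simplified purgatory: fetches wait until the log reaches their awaited offset.
   class SimpleFetchPurgatory {
       private final Map<Long, CompletableFuture<Long>> waiting = new ConcurrentHashMap<>();
   
       CompletableFuture<Long> await(long offset) {
           return waiting.computeIfAbsent(offset, k -> new CompletableFuture<>());
       }
   
       // Old behavior: only release waiters whose awaited offset has been reached.
       void maybeComplete(long reachedOffset, long currentTimeMs) {
           waiting.entrySet().removeIf(entry -> {
               if (entry.getKey() <= reachedOffset) {
                   entry.getValue().complete(currentTimeMs);
                   return true;
               }
               return false;
           });
       }
   
       // New behavior after truncation: release every waiter so it re-fetches
       // against the updated log end offset instead of waiting out its timeout.
       void completeAll(long currentTimeMs) {
           waiting.values().forEach(f -> f.complete(currentTimeMs));
           waiting.clear();
       }
   }
   ```
   
   Completing every waiter eagerly is what makes the election-delay question above relevant: voters get an answer (and can retry) immediately instead of sitting in purgatory.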

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1778,4 +1808,98 @@ public void complete() {
         }
     }
 
+    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+        private final RaftClient.Listener<T> listener;
+        private BatchReader<T> lastSent = null;
+        private long lastAckedOffset = 0;
+        private int claimedEpoch = 0;
+
+        private ListenerContext(Listener<T> listener) {
+            this.listener = listener;
+        }
+
+        /**
+         * Get the last acked offset, which is one greater than the offset of the
+         * last record which was acked by the state machine.
+         */
+        public synchronized long lastAckedOffset() {
+            return lastAckedOffset;
+        }
+
+        /**
+         * Get the next expected offset, which might be larger than the last acked
+         * offset if there are inflight batches which have not been acked yet.
+         * Note that when fetching from disk, we may not know the last offset of
+         * inflight data until it has been processed by the state machine. In this case,
+         * we delay sending additional data until the state machine has read to the
+         * end and the last offset is determined.
+         */
+        public synchronized OptionalLong nextExpectedOffset() {
+            if (lastSent != null) {
+                OptionalLong lastSentOffset = lastSent.lastOffset();
+                if (lastSentOffset.isPresent()) {
+                    return OptionalLong.of(lastSentOffset.getAsLong() + 1);
+                } else {
+                    return OptionalLong.empty();
+                }
+            } else {
+                return OptionalLong.of(lastAckedOffset);
+            }
+        }
+
+        /**
+         * This API is used for committed records that have been received through
+         * replication. In general, followers will write new data to disk before they
+         * know whether it has been committed. Rather than retaining the uncommitted
+         * data in memory, we let the state machine read the records from disk.
+         */
+        public void fireHandleCommit(long baseOffset, Records records) {
+            BufferSupplier bufferSupplier = BufferSupplier.create();
+            RecordsBatchReader<T> reader = new RecordsBatchReader<>(baseOffset, records,
+                serde, bufferSupplier, this);
+            fireHandleCommit(reader);
+        }
+
+        /**
+         * This API is used for committed records originating from {@link #scheduleAppend(int, List)}
+         * on this instance. In this case, we are able to save the original record objects,
+         * which saves the need to read them back from disk. This is a nice optimization
+         * for the leader which is typically doing more work than all of the followers.
+         */
+        public void fireHandleCommit(long baseOffset, int epoch, List<T> records) {
+            BatchReader.Batch<T> batch = new BatchReader.Batch<>(baseOffset, epoch, records);
+            MemoryBatchReader<T> reader = new MemoryBatchReader<>(Collections.singletonList(batch), this);
+            fireHandleCommit(reader);
+        }
+
+        private void fireHandleCommit(BatchReader<T> reader) {
+            synchronized (this) {
+                this.lastSent = reader;
+            }
+            listener.handleCommit(reader);

Review comment:
       >  At the moment, I am leaning toward the latter. In any case, I suggest we let the errors propagate for now and file a jira to reconsider once we are closer to integration. Does that sound fair?
   
   Sounds fair to create a Jira and revisit this later.
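
   For reference, the two options being weighed look roughly like this; `CommitDispatch`, `firePropagating`, and `fireGuarded` are hypothetical stand-ins, not the PR's code:
   
   ```java
   import java.util.function.Consumer;
   
   // Minimal sketch of the two error-handling strategies for listener callbacks.
   final class CommitDispatch {
       private CommitDispatch() {}
   
       // Option 1 (current PR behavior): a listener failure propagates to the
       // caller, i.e. up into the Raft IO thread.
       static <T> void firePropagating(Consumer<T> listener, T batch) {
           listener.accept(batch);
       }
   
       // Option 2 (possible follow-up): guard the IO thread against a faulty listener.
       static <T> void fireGuarded(Consumer<T> listener, T batch) {
           try {
               listener.accept(batch);
           } catch (RuntimeException e) {
               // a real implementation would log and possibly detach the listener
               System.err.println("listener failed: " + e.getMessage());
           }
       }
   }
   ```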

##########
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This interface is used to send committed data from the {@link RaftClient}
+ * down to registered {@link RaftClient.Listener} instances.
+ *
+ * The advantage of hiding the consumption of committed batches behind an interface
+ * is that it allows us to push blocking operations such as reads from disk outside
+ * of the Raft IO thread. This helps to ensure that a slow state machine will not
+ * affect replication.
+ *
+ * @param <T> record type (see {@link org.apache.kafka.raft.RecordSerde})
+ */
+public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, Closeable {

Review comment:
       > We need some way to communicate iteration progress back to the IO thread. It is probably still possible to do this with a layer of indirection through Iterable, but it seemed more natural if the IO thread had access to the Iterator object that was used by the listener.
   
   Okay. Specifically, you are saying that this would be difficult to implement with an `Iterable`:
   ```java
   lastReturnedOffset = res.lastOffset();
   ```
   
https://github.com/apache/kafka/pull/9482/files/867650fa2344497ac3f3505bd5058f2eae0cc0c4#diff-37728c07e52382d38a9db6f655c3921d274c4277a8909d7613fc433d6bf69636R140
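
   For illustration, a sketch of why handing the listener the `Iterator` itself makes that line easy to implement (`TrackingBatchIterator` and `offsetOf` are hypothetical, not the PR's `BatchReader` internals):
   
   ```java
   import java.util.Iterator;
   import java.util.function.ToLongFunction;
   
   // Because the IO thread constructs and retains this iterator, it can observe
   // how far the listener has consumed by reading lastReturnedOffset. With a bare
   // Iterable, the listener would obtain its own iterator and the IO thread would
   // need an extra callback to learn the consumption progress.
   class TrackingBatchIterator<T> implements Iterator<T> {
       private final Iterator<T> delegate;
       private final ToLongFunction<T> offsetOf;
       private volatile long lastReturnedOffset = -1L;
   
       TrackingBatchIterator(Iterator<T> delegate, ToLongFunction<T> offsetOf) {
           this.delegate = delegate;
           this.offsetOf = offsetOf;
       }
   
       @Override
       public boolean hasNext() {
           return delegate.hasNext();
       }
   
       @Override
       public T next() {
           T batch = delegate.next();
           lastReturnedOffset = offsetOf.applyAsLong(batch); // progress visible to IO thread
           return batch;
       }
   
       long lastReturnedOffset() {
           return lastReturnedOffset;
       }
   }
   ```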



