showuon commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r964335502


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1148,19 +1225,12 @@ private void renounce() {
             controllerMetrics.setActive(false);
             purgatory.failAll(newNotControllerException());
 
-            if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-                snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-                authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
-            } else {
-                log.info("Unable to find last committed offset {} in snapshot registry; resetting " +
-                         "to empty state.", lastCommittedOffset);
-                resetToEmptyState();
-                authorizer.ifPresent(a -> a.loadSnapshot(Collections.emptyMap()));
-                needToCompleteAuthorizerLoad = authorizer.isPresent();
-                raftClient.unregister(metaLogListener);
-                metaLogListener = new QuorumMetaLogListener();
-                raftClient.register(metaLogListener);
+            if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+                throw new RuntimeException("Unable to find last committed offset " +
+                        lastCommittedEpoch + " in snapshot registry.");
Review Comment:
   We now throw an exception directly if we can't find the snapshot, which jumps to the faultHandler and skips the rest of the procedure (e.g. `clusterControl.deactivate();`). Is that expected?
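To illustrate the concern with a hypothetical sketch (stand-in names, not the actual QuorumController code): once the early throw fires, any cleanup step that follows it in the method never runs, which is just standard Java exception semantics.

```java
import java.util.ArrayList;
import java.util.List;

public class RenounceSketch {
    // Records which cleanup steps actually ran, for illustration.
    static final List<String> steps = new ArrayList<>();

    // Hypothetical stand-in for renounce(): the early throw skips later cleanup.
    static void renounce(boolean hasSnapshot) {
        steps.add("purgatory.failAll");
        if (!hasSnapshot) {
            // Control jumps out of the method here; nothing below executes.
            throw new RuntimeException("Unable to find last committed offset in snapshot registry.");
        }
        steps.add("snapshotRegistry.revertToSnapshot");
        steps.add("clusterControl.deactivate");
    }
}
```

Running `renounce(false)` leaves `steps` without the `clusterControl.deactivate` entry, which is the skipped-procedure behavior the comment asks about.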



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
+                    if (j > records.size()) {
+                        long offset = appender.apply(records.subList(i, records.size()));
+                        if (log.isTraceEnabled()) {
+                            log.trace("Appended {} record(s) in {} batch(es), ending with offset {}.",
+                                    records.size(), numBatches, offset);
+                        }
+                        return offset;
+                    } else {
+                        appender.apply(records.subList(i, j));
+                    }
+                    i += maxRecordsPerBatch;
+                }
+            }
+        } catch (ApiException e) {
+            // If the Raft client throws a subclass of ApiException, we need to convert it into a
+            // RuntimeException so that it will be handled as the unexpected exception that it is.
+            // ApiExceptions are reserved for expected errors such as incorrect uses of controller
+            // APIs, permission errors, NotControllerException, etc. etc.
+            throw new RuntimeException(e);

Review Comment:
   Thanks for adding the comments to make it clear!



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;

Review Comment:
   nit: rename variables for readability:
   `i` => `startIndex`
   `j` => `endIndex`
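With the suggested renaming, the non-atomic splitting loop could be sketched as a standalone helper (an illustrative sketch only; `BatchSplitter` and `split` are made-up names, not the actual QuorumController code):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {
    /**
     * Split records into batches of at most maxRecordsPerBatch, mirroring the
     * non-atomic loop in appendRecords but with the renamed indices. Like the
     * original loop, the final batch may be shorter than maxRecordsPerBatch
     * (or empty, when records.size() is an exact multiple of the batch size).
     */
    static <T> List<List<T>> split(List<T> records, int maxRecordsPerBatch) {
        List<List<T>> batches = new ArrayList<>();
        int startIndex = 0;
        while (true) {
            int endIndex = startIndex + maxRecordsPerBatch;
            if (endIndex > records.size()) {
                // Final batch: take whatever remains and stop. The original
                // code returns the Raft offset of this last append.
                batches.add(records.subList(startIndex, records.size()));
                return batches;
            } else {
                // Full intermediate batch.
                batches.add(records.subList(startIndex, endIndex));
            }
            startIndex += maxRecordsPerBatch;
        }
    }
}
```

For example, 5 records with a maximum batch size of 2 yield three batches of sizes 2, 2, and 1.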



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
