jsancio commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r965048542
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
        }
    }
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                        records.size() + " records, but maxRecordsPerBatch is " +
+                        maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                        records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
+                    if (j > records.size()) {
+                        long offset = appender.apply(records.subList(i, records.size()));
Review Comment:
Hmm. What is the time complexity of `subList`? This code assumes that we have a `List`, but I think `subList` can have very different time complexity depending on the implementation of `List`. What do you think?
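
For context on that question, here is a minimal standalone sketch (not part of the PR; the class name and the toy data are illustrative) of why the cost depends on the backing implementation: `subList` returns a view rather than a copy, so creating it is cheap, but indexed access through the view delegates to the backing list.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class SubListCostSketch {
    public static void main(String[] args) {
        // Two backing lists with the same contents. In QuorumController the
        // records come from ControllerResult.records(), whose concrete type
        // is not visible at this call site.
        List<Integer> arrayBacked = new ArrayList<>();
        List<Integer> linkedBacked = new LinkedList<>();
        for (int k = 0; k < 10_000; k++) {
            arrayBacked.add(k);
            linkedBacked.add(k);
        }

        // subList returns a view over the backing list, not a copy, so this
        // step is cheap for the standard JDK implementations.
        List<Integer> arrayView = arrayBacked.subList(1_000, 2_000);
        List<Integer> linkedView = linkedBacked.subList(1_000, 2_000);

        // Indexed access through the view delegates to the backing list:
        // roughly O(1) per call on an ArrayList, but O(n) per call on a
        // LinkedList, so the same loop has very different costs.
        long sum = 0;
        for (int k = 0; k < arrayView.size(); k++) {
            sum += arrayView.get(k);   // constant time on the ArrayList view
        }
        for (int k = 0; k < linkedView.size(); k++) {
            sum += linkedView.get(k);  // linear time per call on the LinkedList view
        }
        System.out.println(sum);
    }
}
```

Iterating the view with an iterator instead of `get(int)` avoids the per-call positioning cost on a `LinkedList`, which is one reason the answer also depends on how the batches are consumed downstream.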
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
        }
    }
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                        records.size() + " records, but maxRecordsPerBatch is " +
+                        maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                        records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
Review Comment:
I agree.
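
To make the loop under discussion easier to follow, here is a minimal standalone sketch of the splitting logic (the class and method names are illustrative, not from the PR). Note the boundary check here uses `>=` rather than the `>` in the quoted diff, so that a record count that is an exact multiple of `maxRecordsPerBatch` does not produce an empty trailing batch.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {
    // Splits records into batches of at most maxRecordsPerBatch, mirroring the
    // index arithmetic in the non-atomic branch: j marks the exclusive end of
    // the current batch, and the final batch may be shorter than the rest.
    static <T> List<List<T>> split(List<T> records, int maxRecordsPerBatch) {
        List<List<T>> batches = new ArrayList<>();
        int i = 0;
        while (true) {
            int j = i + maxRecordsPerBatch;
            if (j >= records.size()) {
                // Last (possibly short) batch; copy it out of the view and stop.
                batches.add(new ArrayList<>(records.subList(i, records.size())));
                return batches;
            }
            batches.add(new ArrayList<>(records.subList(i, j)));
            i = j;
        }
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5, 6, 7);
        // Prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(split(records, 3));
    }
}
```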