hachikuji commented on code in PR #12761:
URL: https://github.com/apache/kafka/pull/12761#discussion_r1028360492


##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -725,7 +727,15 @@ object KafkaConfig {
     "This is required configuration when running in KRaft mode."
   val MetadataLogDirDoc = "This configuration determines where we put the 
metadata log for clusters in KRaft mode. " +
     "If it is not set, the metadata log is placed in the first log directory 
from log.dirs."
-  val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of 
bytes in the log between the latest snapshot and the high-watermark needed 
before generating a new snapshot."
+  val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of 
bytes in the log between the latest " +
+    "snapshot and the high-watermark needed before generating a new snapshot. 
The default value is " +
+    s"${Defaults.MetadataSnapshotMaxNewRecordBytes}. To geneate snapshots 
based on the time elapsed, see " +
+    s"the <code>$MetadataSnapshotMaxIntervalMsProp</code> configuration."
+  val MetadataSnapshotMaxIntervalMsDoc = "This is the maximum number of 
milliseconds to wait to generate a snapshot " +
+    "if there are committed records in the log that are not included in the 
latest snapshot. A value of zero disables " +
+    s"time based snapshot generation. The default value is 
${Defaults.MetadataSnapshotMaxIntervalMs}. To geneate " +

Review Comment:
   typo: "geneate" (used a couple times above)



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1455,30 +1524,56 @@ private void replay(ApiMessage message, 
Optional<OffsetAndEpoch> snapshotId, lon
     }
 
     private void maybeGenerateSnapshot() {
-        if (committedBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes &&
-            snapshotGeneratorManager.generator == null
-        ) {
-            if (!isActiveController()) {
-                // The active controller creates in-memory snapshot every time 
an uncommitted
-                // batch gets appended. The in-active controller can be more 
efficient and only
-                // create an in-memory snapshot when needed.
-                snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+        if (!snapshotGeneratorManager.snapshotInProgress()) {
+            Set<SnapshotReason> snapshotReasons = new HashSet<>();
+            // Check if a snapshot should be generated because of committed 
bytes
+            if (committedBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes) {
+                snapshotReasons.add(
+                    
SnapshotReason.maxBytesExceeded(committedBytesSinceLastSnapshot, 
snapshotMaxNewRecordBytes)
+                );
             }
 
-            log.info(
-                "Generating a snapshot that includes (epoch={}, offset={}) 
after {} committed bytes since the last snapshot because, {}.",
-                lastCommittedEpoch,
-                lastCommittedOffset,
-                committedBytesSinceLastSnapshot,
-                SnapshotReason.MaxBytesExceeded
-            );
+            // Check if a snapshot should be generated because of committed 
append times
+            if (snapshotMaxIntervalMs > 0) {
+                // Time base snasphots are enabled
+                long snapshotIntervalMs = time.milliseconds() - 
oldestCommittedLogOnlyAppendTimestamp;
+                if (snapshotIntervalMs >= snapshotMaxIntervalMs) {
+                    
snapshotReasons.add(SnapshotReason.maxIntervalExceeded(snapshotIntervalMs, 
snapshotMaxIntervalMs));
+                } else {
+                    maybeScheduleNextGenerateSnapshot();
+                }
+            }
 
-            snapshotGeneratorManager.createSnapshotGenerator(
-                lastCommittedOffset,
-                lastCommittedEpoch,
-                lastCommittedTimestamp
-            );
-            committedBytesSinceLastSnapshot = 0;
+            if (!snapshotReasons.isEmpty()) {
+                if (!isActiveController()) {
+                    // The inactive controllers only create an in-memory 
snapshot when generating a snapshot. This is
+                    // unlike the active controller which creates in-memory 
snapshots every time an uncommitted batch
+                    // gets replayed.
+                    snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+                }
+
+                log.info(
+                    "Generating a snapshot that includes (epoch={}, offset={}) 
because: {}",
+                    lastCommittedEpoch,
+                    lastCommittedOffset,
+                    SnapshotReason.stringFromReasons(snapshotReasons)
+                );
+
+                
snapshotGeneratorManager.createSnapshotGenerator(lastCommittedOffset, 
lastCommittedEpoch, lastCommittedTimestamp);
+
+                // Reset all of the snapshot counters
+                committedBytesSinceLastSnapshot = 0;
+                oldestCommittedLogOnlyAppendTimestamp = Long.MAX_VALUE;
+
+                // Starting a snapshot invalidates any scheduled snapshot 
generation
+                cancelNextGenerateSnapshot();
+            }
+        } else {
+            /* Skip snapshot generation if there is a snaphshot in progress.

Review Comment:
   nit: I think this would be a little clearer if the `if` check is inverted:
   ```java
   if (snapshotGeneratorManager.snapshotInProgress()) {
     /* Skip snapshot generation if there is a snaphshot in progress.
   ...
   } else {
   ```



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -568,45 +576,62 @@ void cancel() {
         }
 
         void reschedule(long delayNs) {
-            ControlEvent event = new ControlEvent(GENERATE_SNAPSHOT, this);
+            ControllerEvent event = new ControllerEvent(GENERATE_SNAPSHOT, 
this);
             queue.scheduleDeferred(event.name,
                 new EarliestDeadlineFunction(time.nanoseconds() + delayNs), 
event);
         }
 
+        void handleSnapshotFinished(Optional<Exception> exception) {
+            if (exception.isPresent()) {
+                log.error("Error while generating snapshot {}", 
generator.lastContainedLogOffset(), exception.get());
+            } else {
+                log.info("Finished generating snapshot {}.", 
generator.lastContainedLogOffset());
+            }
+
+            generator.writer().close();
+            generator = null;
+
+            // Delete every in-memory snapshot up to the committed offset. 
They are not needed since this
+            // snapshot generation finished.
+            snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset);
+
+            // The snapshot counters for size-based and time-based snapshots 
could have changed to cause a new
+            // snapshot to get generated.
+            maybeGenerateSnapshot();

Review Comment:
   In case there was a failure, does it make sense to back off before retrying?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -986,6 +1012,13 @@ public void 
handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                             batch.appendTimestamp(),
                             committedBytesSinceLastSnapshot + 
batch.sizeInBytes()
                         );
+
+                        if (offset >= 
raftClient.latestSnapshotId().map(OffsetAndEpoch::offset).orElse(0L)) {
+                            oldestCommittedLogOnlyAppendTimestamp = Math.min(

Review Comment:
   I do think `oldestNonSnapshottedLogAppendTimestamp` is a little clearer. 
Perhaps the fact that it is an append timestamp is already clear from context 
and we could use `oldestNonSnapshottedTimestamp`? I also don't feel strongly 
though.



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -725,7 +727,15 @@ object KafkaConfig {
     "This is required configuration when running in KRaft mode."
   val MetadataLogDirDoc = "This configuration determines where we put the 
metadata log for clusters in KRaft mode. " +
     "If it is not set, the metadata log is placed in the first log directory 
from log.dirs."
-  val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of 
bytes in the log between the latest snapshot and the high-watermark needed 
before generating a new snapshot."
+  val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of 
bytes in the log between the latest " +
+    "snapshot and the high-watermark needed before generating a new snapshot. 
The default value is " +
+    s"${Defaults.MetadataSnapshotMaxNewRecordBytes}. To geneate snapshots 
based on the time elapsed, see " +
+    s"the <code>$MetadataSnapshotMaxIntervalMsProp</code> configuration."
+  val MetadataSnapshotMaxIntervalMsDoc = "This is the maximum number of 
milliseconds to wait to generate a snapshot " +
+    "if there are committed records in the log that are not included in the 
latest snapshot. A value of zero disables " +
+    s"time based snapshot generation. The default value is 
${Defaults.MetadataSnapshotMaxIntervalMs}. To geneate " +
+    s"snapshots based on the number of metadata bytes, see the 
<code>$MetadataSnapshotMaxNewRecordBytesProp</code> " +

Review Comment:
   Instead of just referring to the other configuration, I was thinking we 
could mention that snapshots will be taken when either the interval is reached 
or the max bytes limit is reached.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to