hachikuji commented on code in PR #12761:
URL: https://github.com/apache/kafka/pull/12761#discussion_r1028360492

##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -725,7 +727,15 @@ object KafkaConfig {
     "This is required configuration when running in KRaft mode."
   val MetadataLogDirDoc = "This configuration determines where we put the metadata log for clusters in KRaft mode. " +
     "If it is not set, the metadata log is placed in the first log directory from log.dirs."
-  val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of bytes in the log between the latest snapshot and the high-watermark needed before generating a new snapshot."
+  val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of bytes in the log between the latest " +
+    "snapshot and the high-watermark needed before generating a new snapshot. The default value is " +
+    s"${Defaults.MetadataSnapshotMaxNewRecordBytes}. To geneate snapshots based on the time elapsed, see " +
+    s"the <code>$MetadataSnapshotMaxIntervalMsProp</code> configuration."
+  val MetadataSnapshotMaxIntervalMsDoc = "This is the maximum number of milliseconds to wait to generate a snapshot " +
+    "if there are committed records in the log that are not included in the latest snapshot. A value of zero disables " +
+    s"time based snapshot generation. The default value is ${Defaults.MetadataSnapshotMaxIntervalMs}. To geneate " +

Review Comment:
   typo: "geneate" (used a couple times above)

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1455,30 +1524,56 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
     }
     private void maybeGenerateSnapshot() {
-        if (committedBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes &&
-            snapshotGeneratorManager.generator == null
-        ) {
-            if (!isActiveController()) {
-                // The active controller creates in-memory snapshot every time an uncommitted
-                // batch gets appended. The in-active controller can be more efficient and only
-                // create an in-memory snapshot when needed.
-                snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+        if (!snapshotGeneratorManager.snapshotInProgress()) {
+            Set<SnapshotReason> snapshotReasons = new HashSet<>();
+            // Check if a snapshot should be generated because of committed bytes
+            if (committedBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes) {
+                snapshotReasons.add(
+                    SnapshotReason.maxBytesExceeded(committedBytesSinceLastSnapshot, snapshotMaxNewRecordBytes)
+                );
             }
-            log.info(
-                "Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
-                lastCommittedEpoch,
-                lastCommittedOffset,
-                committedBytesSinceLastSnapshot,
-                SnapshotReason.MaxBytesExceeded
-            );
+            // Check if a snapshot should be generated because of committed append times
+            if (snapshotMaxIntervalMs > 0) {
+                // Time base snasphots are enabled
+                long snapshotIntervalMs = time.milliseconds() - oldestCommittedLogOnlyAppendTimestamp;
+                if (snapshotIntervalMs >= snapshotMaxIntervalMs) {
+                    snapshotReasons.add(SnapshotReason.maxIntervalExceeded(snapshotIntervalMs, snapshotMaxIntervalMs));
+                } else {
+                    maybeScheduleNextGenerateSnapshot();
+                }
+            }
-            snapshotGeneratorManager.createSnapshotGenerator(
-                lastCommittedOffset,
-                lastCommittedEpoch,
-                lastCommittedTimestamp
-            );
-            committedBytesSinceLastSnapshot = 0;
+            if (!snapshotReasons.isEmpty()) {
+                if (!isActiveController()) {
+                    // The inactive controllers only create an in-memory snapshot when generating a snapshot. This is
+                    // unlike the active controller which creates in-memory snapshots every time an uncommitted batch
+                    // gets replayed.
+                    snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+                }
+
+                log.info(
+                    "Generating a snapshot that includes (epoch={}, offset={}) because: {}",
+                    lastCommittedEpoch,
+                    lastCommittedOffset,
+                    SnapshotReason.stringFromReasons(snapshotReasons)
+                );
+
+                snapshotGeneratorManager.createSnapshotGenerator(lastCommittedOffset, lastCommittedEpoch, lastCommittedTimestamp);
+
+                // Reset all of the snapshot counters
+                committedBytesSinceLastSnapshot = 0;
+                oldestCommittedLogOnlyAppendTimestamp = Long.MAX_VALUE;
+
+                // Starting a snapshot invalidates any scheduled snapshot generation
+                cancelNextGenerateSnapshot();
+            }
+        } else {
+            /* Skip snapshot generation if there is a snaphshot in progress.

Review Comment:
   nit: I think this would be a little clearer if the `if` check is inverted:
   ```java
   if (snapshotGeneratorManager.snapshotInProgress()) {
       /* Skip snapshot generation if there is a snaphshot in progress.
       ...
   } else {
   ```

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -568,45 +576,62 @@ void cancel() {
         }
         void reschedule(long delayNs) {
-            ControlEvent event = new ControlEvent(GENERATE_SNAPSHOT, this);
+            ControllerEvent event = new ControllerEvent(GENERATE_SNAPSHOT, this);
             queue.scheduleDeferred(event.name, new EarliestDeadlineFunction(time.nanoseconds() + delayNs), event);
         }
+        void handleSnapshotFinished(Optional<Exception> exception) {
+            if (exception.isPresent()) {
+                log.error("Error while generating snapshot {}", generator.lastContainedLogOffset(), exception.get());
+            } else {
+                log.info("Finished generating snapshot {}.", generator.lastContainedLogOffset());
+            }
+
+            generator.writer().close();
+            generator = null;
+
+            // Delete every in-memory snapshot up to the committed offset. They are not needed since this
+            // snapshot generation finished.
+            snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset);
+
+            // The snapshot counters for size-based and time-based snapshots could have changed to cause a new
+            // snapshot to get generated.
+            maybeGenerateSnapshot();

Review Comment:
   In case there was a failure, does it make sense to back off before retrying?

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -986,6 +1012,13 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                             batch.appendTimestamp(),
                             committedBytesSinceLastSnapshot + batch.sizeInBytes()
                         );
+
+                        if (offset >= raftClient.latestSnapshotId().map(OffsetAndEpoch::offset).orElse(0L)) {
+                            oldestCommittedLogOnlyAppendTimestamp = Math.min(

Review Comment:
   I do think `oldestNonSnapshottedLogAppendTimestamp` is a little clearer. Perhaps the fact that it is an append timestamp is already clear from context and we could use `oldestNonSnapshottedTimestamp`? I also don't feel strongly though.

##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -725,7 +727,15 @@ object KafkaConfig {
     "This is required configuration when running in KRaft mode."
   val MetadataLogDirDoc = "This configuration determines where we put the metadata log for clusters in KRaft mode. " +
     "If it is not set, the metadata log is placed in the first log directory from log.dirs."
-  val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of bytes in the log between the latest snapshot and the high-watermark needed before generating a new snapshot."
+  val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of bytes in the log between the latest " +
+    "snapshot and the high-watermark needed before generating a new snapshot. The default value is " +
+    s"${Defaults.MetadataSnapshotMaxNewRecordBytes}. To geneate snapshots based on the time elapsed, see " +
+    s"the <code>$MetadataSnapshotMaxIntervalMsProp</code> configuration."
+  val MetadataSnapshotMaxIntervalMsDoc = "This is the maximum number of milliseconds to wait to generate a snapshot " +
+    "if there are committed records in the log that are not included in the latest snapshot. A value of zero disables " +
+    s"time based snapshot generation. The default value is ${Defaults.MetadataSnapshotMaxIntervalMs}. To geneate " +
+    s"snapshots based on the number of metadata bytes, see the <code>$MetadataSnapshotMaxNewRecordBytesProp</code> " +

Review Comment:
   Instead of just referring to the other configuration, I was thinking we could mention that snapshots will be taken when either the interval is reached or the max bytes limit is reached.
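
For illustration only, here is a minimal Java sketch of the "either limit" behavior that the last comment suggests documenting: a snapshot is triggered when the byte limit or the time interval is reached, and an interval of zero disables the time-based check. The class name, method name, and reason strings are hypothetical and are not taken from the PR; the logic only loosely mirrors the `maybeGenerateSnapshot` hunk quoted earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: collects the reasons a snapshot would be due.
class SnapshotTriggerSketch {
    static List<String> snapshotReasons(
        long committedBytes,      // bytes committed since the last snapshot
        long maxNewRecordBytes,   // size-based limit
        long elapsedMs,           // age of the oldest committed record not yet in a snapshot
        long maxIntervalMs        // time-based limit; 0 disables time-based snapshots
    ) {
        List<String> reasons = new ArrayList<>();
        // Size-based trigger
        if (committedBytes >= maxNewRecordBytes) {
            reasons.add("max bytes exceeded");
        }
        // Time-based trigger, only evaluated when enabled
        if (maxIntervalMs > 0 && elapsedMs >= maxIntervalMs) {
            reasons.add("max interval exceeded");
        }
        // A non-empty list means a snapshot should be generated
        return reasons;
    }
}
```

Either condition alone yields a non-empty list, and both can be reported together, which matches the wording proposed for the configuration docs.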