jsancio commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r915992223
##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -117,26 +117,32 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      _publisher.foreach(publish)
-      // If we detected a change in metadata.version, generate a local snapshot
-      val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
-        featuresDelta.metadataVersionChange().isPresent
+      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+      if (shouldSnapshot()) {
+        maybeStartSnapshot()
       }
-      snapshotter.foreach { snapshotter =>
-        _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-        if (shouldSnapshot() || metadataVersionChanged) {
-          if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
-            _bytesSinceLastSnapshot = 0L
-          }
-        }
-      }
+      _publisher.foreach(publish)
     }
   }

   private def shouldSnapshot(): Boolean = {
-    _bytesSinceLastSnapshot >= maxBytesBetweenSnapshots
+    (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged()
+  }
+
+  private def metadataVersionChanged(): Boolean = {
+    _publisher.nonEmpty && Option(_delta.featuresDelta()).exists { featuresDelta =>

Review Comment:
   Do you mind writing a comment explaining why we check that the publisher is set? If I understand this correctly, it is not a correctness issue but a performance issue, right?

   If I remember correctly, @mumrah mentioned that he wanted to generate a snapshot whenever the metadata version changes. Unfortunately, I couldn't find a mention of this in KIP-778. With this change that is no longer true. What do you think, @mumrah?

##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -240,6 +239,42 @@ class BrokerMetadataListenerTest {
     }
   }

+  @Test
+  def testNotSnapshotAfterMetadataVersionChangeBeforePublishing(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+    listener.getImageRecords().get()
+    assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate snapshot on metadata version change before starting publishing")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChangeWhenStarting(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    val endOffset = 100L
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), endOffset)
+    listener.startPublishing(new MockMetadataPublisher()).get()
+    assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should try to generate snapshot when starting publishing")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChange(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+    listener.startPublishing(new MockMetadataPublisher()).get()
+
+    val endOffset = 100L
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, (MetadataVersion.latest().featureLevel() - 1).toShort, endOffset)
+    listener.getImageRecords().get()

Review Comment:
   Let's write a comment saying that this `get` waits for the metadata version update above to be processed.
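One possible wording for that comment, shown against the test lines from the hunk above, is sketched below. It assumes, as the review comment itself implies, that `getImageRecords()` is served by the same event queue that processes the committed records, so its future only completes after the feature-level update has been applied; the exact phrasing is only a suggestion:

```scala
    val endOffset = 100L
    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, (MetadataVersion.latest().featureLevel() - 1).toShort, endOffset)
    // getImageRecords() is handled by the same event queue as the committed records,
    // so waiting on its future ensures the metadata version update above has been
    // fully processed (and the snapshot decision made) before the assertions run.
    listener.getImageRecords().get()
```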
##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##########
@@ -51,20 +51,26 @@ class BrokerMetadataSnapshotter(
   val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))

   override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized {
-    if (_currentSnapshotOffset == -1L) {
+    if (_currentSnapshotOffset != -1) {
+      info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
+        s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")

Review Comment:
   Nit: the curly brackets ({}) are not needed for `_currentSnapshotOffset`. This comment applies to a few places.

##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##########
@@ -109,4 +115,9 @@ class BrokerMetadataSnapshotter(
     beginShutdown()
     eventQueue.close()
   }
+
+  // VisibleForTesting
+  def currentSnapshotOffset(): Long = {
+    _currentSnapshotOffset
+  }

Review Comment:
   It doesn't look like you use this in any of the tests. Let's remove this method if that is the case.

##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##########
@@ -45,37 +45,32 @@ class BrokerMetadataSnapshotter(
    */
   private var _currentSnapshotOffset = -1L

-  /**
-   * The offset of the newest snapshot, or -1 if there hasn't been one. Accessed only under
-   * the object lock.
-   */
-  private var _latestSnapshotOffset = -1L
-
   /**
    * The event queue which runs this listener.
    */
   val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))

   override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized {

Review Comment:
   So far we have two reasons for creating a snapshot:
   1. X bytes were applied.
   2. The metadata version changed.

   What do you think about passing the reason for creating a snapshot to this method and logging it? We would need to make a similar change to the KRaft Controller logic for creating a snapshot. If you agree, I am okay with creating a Jira for this and implementing it in a future PR.
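To make the snapshot-reason suggestion more concrete, here is a rough sketch of one way a reason could be threaded through to `maybeStartSnapshot` and logged. All names below (`SnapshotReason`, its cases, and the extra parameter) are hypothetical and not part of this PR or the existing code:

```scala
// Hypothetical sketch only: none of these names exist in the PR.
sealed trait SnapshotReason
object SnapshotReason {
  case object MaxBytesExceeded extends SnapshotReason       // X bytes were applied
  case object MetadataVersionChanged extends SnapshotReason // metadata.version changed
}

// maybeStartSnapshot could accept the reason and include it in the log line that is
// emitted when a snapshot is actually started, for example:
//
//   override def maybeStartSnapshot(lastContainedLogTime: Long,
//                                   image: MetadataImage,
//                                   reason: SnapshotReason): Boolean = synchronized {
//     ...
//     info(s"Starting a snapshot at ${image.highestOffsetAndEpoch()} ($reason)")
//     ...
//   }
```

As the comment notes, the KRaft controller's snapshot path would need a similar change, so tracking it in a separate Jira seems reasonable.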