This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 4463da8420 Port akka-core#31784: Add only-one-snapshot optimization
for retention strategies (#2767)
4463da8420 is described below
commit 4463da8420fae15e67c37c81820d50f35b6181e4
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Mar 22 14:12:58 2026 +0100
Port akka-core#31784: Add only-one-snapshot optimization for retention
strategies (#2767)
* Initial plan
* Port akka-core#31784: Add only-one-snapshot optimization for retention
strategies
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/06a3f06e-dbec-4f2d-a33f-35bf8cecdc24
* scalafmt
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../testkit/PersistenceTestKitPlugin.scala | 3 +-
...urcedBehaviorRetentionOnlyOneSnapshotSpec.scala | 184 +++++++++++++++++++++
.../EventSourcedBehaviorRetentionSpec.scala | 5 +-
.../persistence/typed/internal/BehaviorSetup.scala | 21 ++-
.../typed/internal/RetentionCriteriaImpl.scala | 8 +
.../pekko/persistence/typed/internal/Running.scala | 23 ++-
.../typed/javadsl/RetentionCriteria.scala | 12 ++
.../typed/scaladsl/RetentionCriteria.scala | 12 ++
persistence/src/main/resources/reference.conf | 5 +
9 files changed, 260 insertions(+), 13 deletions(-)
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala
index e308cb34b2..549e69cfa7 100644
---
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala
@@ -181,7 +181,8 @@ object PersistenceTestKitSnapshotPlugin {
Map(
"pekko.persistence.snapshot-store.plugin" -> PluginId,
s"$PluginId.class" -> classOf[PersistenceTestKitSnapshotPlugin].getName,
- s"$PluginId.snapshot-is-optional" -> false // fallback isn't used by the
testkit
+ s"$PluginId.snapshot-is-optional" -> false, // fallback isn't used by
the testkit
+ s"$PluginId.only-one-snapshot" -> false // fallback isn't used by the
testkit
).asJava)
}
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionOnlyOneSnapshotSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionOnlyOneSnapshotSpec.scala
new file mode 100644
index 0000000000..7d3bd63743
--- /dev/null
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionOnlyOneSnapshotSpec.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.typed.scaladsl
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.util.Success
+import scala.util.Try
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import org.apache.pekko
+import pekko.actor.testkit.typed.scaladsl._
+import pekko.actor.typed.scaladsl.Behaviors
+import pekko.persistence.testkit.PersistenceTestKitPlugin
+import pekko.persistence.testkit.PersistenceTestKitSnapshotPlugin
+import pekko.persistence.typed.DeleteEventsCompleted
+import pekko.persistence.typed.EventSourcedSignal
+import pekko.persistence.typed.PersistenceId
+
+object EventSourcedBehaviorRetentionOnlyOneSnapshotSpec {
+ private val config = ConfigFactory.parseString(s"""
+ ${PersistenceTestKitSnapshotPlugin.PluginId} {
+ only-one-snapshot = true
+ }
+
""").withFallback(PersistenceTestKitPlugin.config.withFallback(PersistenceTestKitSnapshotPlugin.config))
+}
+
+class EventSourcedBehaviorRetentionOnlyOneSnapshotSpec
+ extends
ScalaTestWithActorTestKit(EventSourcedBehaviorRetentionOnlyOneSnapshotSpec.config)
+ with AnyWordSpecLike
+ with LogCapturing {
+
+ import EventSourcedBehaviorRetentionSpec._
+
+ val pidCounter = new AtomicInteger(0)
+ private def nextPid(): PersistenceId =
PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()}")
+
+ "EventSourcedBehavior with retention and only-one-snapshot" must {
+
+ "snapshot every N sequence nrs" in {
+ val pid = nextPid()
+ val c = spawn(Behaviors.setup[Command](ctx =>
+ counter(ctx,
pid).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 2))))
+
+ val replyProbe = TestProbe[State]()
+
+ c ! Increment
+ c ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(1, Vector(0)))
+ c ! StopIt
+ val watchProbe = TestProbe()
+ watchProbe.expectTerminated(c)
+
+ // no snapshot should have happened
+ val probeC2 = TestProbe[(State, Event)]()
+ val snapshotProbe = createTestProbe[WrappedSignal]()
+
+ val c2 = spawn(
+ Behaviors.setup[Command](ctx =>
+ counter(ctx, pid, probe = Some(probeC2.ref), snapshotSignalProbe =
Some(snapshotProbe.ref))
+ .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents =
2))))
+ probeC2.expectMessage[(State, Event)]((State(0, Vector()),
Incremented(1)))
+
+ c2 ! Increment
+ snapshotProbe.expectSnapshotCompleted(2)
+ c2 ! StopIt
+ watchProbe.expectTerminated(c2)
+
+ val probeC3 = TestProbe[(State, Event)]()
+ val c3 = spawn(Behaviors.setup[Command](ctx =>
+ counter(ctx, pid,
Some(probeC3.ref)).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents
= 2))))
+ // this time it should have been snapshotted so no events to replay
+ probeC3.expectNoMessage()
+ c3 ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(2, Vector(0, 1)))
+ }
+
+ "not delete snapshots" in {
+ val pid = nextPid()
+ val snapshotSignalProbe = TestProbe[WrappedSignal]()
+ val deleteSnapshotSignalProbe = TestProbe[WrappedSignal]()
+ val replyProbe = TestProbe[State]()
+
+ val persistentActor = spawn(
+ Behaviors.setup[Command](ctx =>
+ counter(
+ ctx,
+ pid,
+ snapshotSignalProbe = Some(snapshotSignalProbe.ref),
+ deleteSnapshotSignalProbe = Some(deleteSnapshotSignalProbe.ref))
+ .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3,
keepNSnapshots = 2))))
+
+ (1 to 10).foreach(_ => persistentActor ! Increment)
+ persistentActor ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(10, (0 until 10).toVector))
+ snapshotSignalProbe.expectSnapshotCompleted(3)
+ snapshotSignalProbe.expectSnapshotCompleted(6)
+ snapshotSignalProbe.expectSnapshotCompleted(9)
+ // this is the difference compared to EventSourcedBehaviorRetentionSpec
+ deleteSnapshotSignalProbe.expectNoMessage()
+
+ (1 to 10).foreach(_ => persistentActor ! Increment)
+ persistentActor ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(20, (0 until 20).toVector))
+ snapshotSignalProbe.expectSnapshotCompleted(12)
+ snapshotSignalProbe.expectSnapshotCompleted(15)
+ snapshotSignalProbe.expectSnapshotCompleted(18)
+ // this is the difference compared to EventSourcedBehaviorRetentionSpec
+ deleteSnapshotSignalProbe.expectNoMessage()
+
+ snapshotSignalProbe.expectNoMessage()
+ }
+
+ "optionally delete old events" in {
+ val pid = nextPid()
+ val snapshotSignalProbe = TestProbe[WrappedSignal]()
+ val deleteSnapshotSignalProbe = TestProbe[WrappedSignal]()
+ val eventProbe = TestProbe[Try[EventSourcedSignal]]()
+ val replyProbe = TestProbe[State]()
+
+ val persistentActor = spawn(
+ Behaviors.setup[Command](ctx =>
+ counter(
+ ctx,
+ pid,
+ snapshotSignalProbe = Some(snapshotSignalProbe.ref),
+ deleteSnapshotSignalProbe = Some(deleteSnapshotSignalProbe.ref),
+ eventSignalProbe = Some(eventProbe.ref)).withRetention(
+ // tests the Java API as well
+ RetentionCriteria.snapshotEvery(numberOfEvents =
3).withDeleteEventsOnSnapshot)))
+
+ (1 to 10).foreach(_ => persistentActor ! Increment)
+ persistentActor ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(10, (0 until 10).toVector))
+ snapshotSignalProbe.expectSnapshotCompleted(3)
+ snapshotSignalProbe.expectSnapshotCompleted(6)
+ snapshotSignalProbe.expectSnapshotCompleted(9)
+
+
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 3
+
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 6
+
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 9
+ deleteSnapshotSignalProbe.expectNoMessage()
+
+ // one at a time since snapshotting+event-deletion switches to running
state before deleting events so ordering
+ // if sending many commands in one go is not deterministic
+ persistentActor ! Increment // 11
+ persistentActor ! Increment // 12
+ snapshotSignalProbe.expectSnapshotCompleted(12)
+
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 12
+ deleteSnapshotSignalProbe.expectNoMessage()
+
+ persistentActor ! Increment // 13
+ persistentActor ! Increment // 14
+ persistentActor ! Increment // 15
+ snapshotSignalProbe.expectSnapshotCompleted(15)
+
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 15
+ deleteSnapshotSignalProbe.expectNoMessage()
+
+ persistentActor ! Increment // 16
+ persistentActor ! Increment // 17
+ persistentActor ! Increment // 18
+ snapshotSignalProbe.expectSnapshotCompleted(18)
+
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr
shouldEqual 18
+ deleteSnapshotSignalProbe.expectNoMessage()
+
+ eventProbe.expectNoMessage()
+ snapshotSignalProbe.expectNoMessage()
+ }
+
+ }
+}
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
index 9aae31ac39..20c20f2279 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala
@@ -63,6 +63,7 @@ object EventSourcedBehaviorRetentionSpec extends Matchers {
persistenceId: PersistenceId,
probe: Option[ActorRef[(State, Event)]] = None,
snapshotSignalProbe: Option[ActorRef[WrappedSignal]] = None,
+ deleteSnapshotSignalProbe: Option[ActorRef[WrappedSignal]] = None,
eventSignalProbe: Option[ActorRef[Try[EventSourcedSignal]]] = None)
: EventSourcedBehavior[Command, Event, State] = {
EventSourcedBehavior[Command, Event, State](
@@ -96,9 +97,9 @@ object EventSourcedBehaviorRetentionSpec extends Matchers {
case (_, sf: SnapshotFailed) =>
snapshotSignalProbe.foreach(_ ! WrappedSignal(sf))
case (_, dc: DeleteSnapshotsCompleted) =>
- snapshotSignalProbe.foreach(_ ! WrappedSignal(dc))
+ deleteSnapshotSignalProbe.orElse(snapshotSignalProbe).foreach(_ !
WrappedSignal(dc))
case (_, dsf: DeleteSnapshotsFailed) =>
- snapshotSignalProbe.foreach(_ ! WrappedSignal(dsf))
+ deleteSnapshotSignalProbe.orElse(snapshotSignalProbe).foreach(_ !
WrappedSignal(dsf))
case (_, e: EventSourcedSignal) =>
eventSignalProbe.foreach(_ ! Success(e))
}
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
index 356db67709..6e80c2b75e 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
@@ -85,8 +85,10 @@ private[pekko] final class BehaviorSetup[C, E, S](
val snapshotStore: ClassicActorRef = persistence
.snapshotStoreFor(settings.snapshotPluginId,
settings.snapshotPluginConfig.getOrElse(ConfigFactory.empty))
- val isSnapshotOptional: Boolean =
-
Persistence(context.system.classicSystem).configFor(snapshotStore).getBoolean("snapshot-is-optional")
+ val (isSnapshotOptional: Boolean, isOnlyOneSnapshot: Boolean) = {
+ val snapshotStoreConfig =
Persistence(context.system.classicSystem).configFor(snapshotStore)
+ (snapshotStoreConfig.getBoolean("snapshot-is-optional"),
snapshotStoreConfig.getBoolean("only-one-snapshot"))
+ }
if (isSnapshotOptional &&
(retention match {
@@ -104,6 +106,21 @@ private[pekko] final class BehaviorSetup[C, E, S](
private var mdcPhase = PersistenceMdc.Initializing
+ if (isOnlyOneSnapshot) {
+ retention match {
+ case SnapshotCountRetentionCriteriaImpl(_, keepNSnapshots, _) if
keepNSnapshots > 1 =>
+ // not using internalLogger because it's probably not good to use mdc
from the constructor
+ internalLoggerFactory().warn(
+ "Retention has been defined with keepNSnapshots [{}] for
persistenceId [{}], " +
+ "but the snapshot store [{}] will only keep one snapshot. You can
silence this warning and benefit from " +
+ "a performance optimization by defining the retention criteria
without the keepNSnapshots parameter.",
+ keepNSnapshots,
+ persistenceId,
+ settings.snapshotPluginId)
+ case _ =>
+ }
+ }
+
def internalLogger: Logger = {
PersistenceMdc.setMdc(persistenceId, mdcPhase)
internalLoggerFactory()
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
index 6caa28b80d..0bc9a57b10 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala
@@ -20,6 +20,8 @@ import pekko.persistence.typed.scaladsl
/**
* INTERNAL API
+ *
+ * Note that `keepNSnapshots` should not be used when
`BehaviorSetup.isOnlyOneSnapshot` is true.
*/
@InternalApi private[pekko] final case class
SnapshotCountRetentionCriteriaImpl(
snapshotEveryNEvents: Int,
@@ -34,11 +36,17 @@ import pekko.persistence.typed.scaladsl
def snapshotWhen(currentSequenceNr: Long): Boolean =
currentSequenceNr % snapshotEveryNEvents == 0
+ /**
+ * Should only be used when `BehaviorSetup.isOnlyOneSnapshot` is false.
+ */
def deleteUpperSequenceNr(lastSequenceNr: Long): Long = {
// Delete old events, retain the latest
math.max(0, lastSequenceNr - (keepNSnapshots.toLong *
snapshotEveryNEvents))
}
+ /**
+ * Should only be used when `BehaviorSetup.isOnlyOneSnapshot` is false.
+ */
def deleteLowerSequenceNr(upperSequenceNr: Long): Long = {
// We could use 0 as fromSequenceNr to delete all older snapshots, but
that might be inefficient for
// large ranges depending on how it's implemented in the snapshot plugin.
Therefore we use the
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
index e5f38625e7..533b1b56c0 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
@@ -896,13 +896,18 @@ private[pekko] object Running {
case DisabledRetentionCriteria => // no
further actions
case s @ SnapshotCountRetentionCriteriaImpl(_, _, true) =>
// deleteEventsOnSnapshot == true, deletion of old events
- val deleteEventsToSeqNr =
s.deleteUpperSequenceNr(meta.sequenceNr)
+ val deleteEventsToSeqNr = {
+ if (setup.isOnlyOneSnapshot) meta.sequenceNr // delete all
events up to the snapshot
+ else s.deleteUpperSequenceNr(meta.sequenceNr) //
keepNSnapshots batches of events
+ }
// snapshot deletion then happens on event deletion success in
Running.onDeleteEventsJournalResponse
internalDeleteEvents(meta.sequenceNr, deleteEventsToSeqNr)
case s @ SnapshotCountRetentionCriteriaImpl(_, _, false) =>
// deleteEventsOnSnapshot == false, deletion of old snapshots
- val deleteSnapshotsToSeqNr =
s.deleteUpperSequenceNr(meta.sequenceNr)
-
internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr),
deleteSnapshotsToSeqNr)
+ if (!setup.isOnlyOneSnapshot) {
+ val deleteSnapshotsToSeqNr =
s.deleteUpperSequenceNr(meta.sequenceNr)
+
internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr),
deleteSnapshotsToSeqNr)
+ }
case unexpected => throw new IllegalStateException(s"Unexpected
retention criteria: $unexpected")
}
}
@@ -1012,11 +1017,13 @@ private[pekko] object Running {
setup.retention match {
case DisabledRetentionCriteria => // no further actions
case s: SnapshotCountRetentionCriteriaImpl =>
- // The reason for -1 is that a snapshot at the exact toSequenceNr
is still useful and the events
- // after that can be replayed after that snapshot, but replaying
the events after toSequenceNr without
- // starting at the snapshot at toSequenceNr would be invalid.
- val deleteSnapshotsToSeqNr = toSequenceNr - 1
-
internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr),
deleteSnapshotsToSeqNr)
+ if (!setup.isOnlyOneSnapshot) {
+ // The reason for -1 is that a snapshot at the exact
toSequenceNr is still useful and the events
+ // after that can be replayed after that snapshot, but replaying
the events after toSequenceNr without
+ // starting at the snapshot at toSequenceNr would be invalid.
+ val deleteSnapshotsToSeqNr = toSequenceNr - 1
+
internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr),
deleteSnapshotsToSeqNr)
+ }
case unexpected => throw new IllegalStateException(s"Unexpected
retention criteria: $unexpected")
}
Some(DeleteEventsCompleted(toSequenceNr))
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/RetentionCriteria.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/RetentionCriteria.scala
index c958b544ab..e18f456e98 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/RetentionCriteria.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/RetentionCriteria.scala
@@ -49,6 +49,18 @@ object RetentionCriteria {
def snapshotEvery(numberOfEvents: Int, keepNSnapshots: Int):
SnapshotCountRetentionCriteria =
SnapshotCountRetentionCriteriaImpl(numberOfEvents, keepNSnapshots,
deleteEventsOnSnapshot = false)
+ /**
+ * Save snapshots automatically every `numberOfEvents`.
+ *
+ * Use [[SnapshotCountRetentionCriteria.withDeleteEventsOnSnapshot]] to
+ * delete old events. Events are not deleted by default.
+ *
+ * If multiple events are persisted with a single Effect, the snapshot will
happen after
+ * all of the events are persisted rather than precisely every
`numberOfEvents`.
+ */
+ def snapshotEvery(numberOfEvents: Int): SnapshotCountRetentionCriteria =
+ snapshotEvery(numberOfEvents, keepNSnapshots = 1)
+
}
@DoNotInherit abstract class SnapshotCountRetentionCriteria extends
RetentionCriteria {
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/RetentionCriteria.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/RetentionCriteria.scala
index 8ce002cb50..20c61efa18 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/RetentionCriteria.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/RetentionCriteria.scala
@@ -49,6 +49,18 @@ object RetentionCriteria {
def snapshotEvery(numberOfEvents: Int, keepNSnapshots: Int):
SnapshotCountRetentionCriteria =
SnapshotCountRetentionCriteriaImpl(numberOfEvents, keepNSnapshots,
deleteEventsOnSnapshot = false)
+ /**
+ * Save snapshots automatically every `numberOfEvents`.
+ *
+ * Use [[SnapshotCountRetentionCriteria.withDeleteEventsOnSnapshot]] to
+ * delete old events. Events are not deleted by default.
+ *
+ * If multiple events are persisted with a single Effect, the snapshot will
happen after
+ * all of the events are persisted rather than precisely every
`numberOfEvents`.
+ */
+ def snapshotEvery(numberOfEvents: Int): SnapshotCountRetentionCriteria =
+ snapshotEvery(numberOfEvents, keepNSnapshots = 1)
+
}
@DoNotInherit trait SnapshotCountRetentionCriteria extends RetentionCriteria {
diff --git a/persistence/src/main/resources/reference.conf
b/persistence/src/main/resources/reference.conf
index aa4f039be6..6634dd6233 100644
--- a/persistence/src/main/resources/reference.conf
+++ b/persistence/src/main/resources/reference.conf
@@ -196,6 +196,11 @@ pekko.persistence {
# result in wrong recovered state if snapshot load fails.
snapshot-is-optional = false
+ # Some snapshot store plugins only store the latest snapshot and can set
this
+ # to true. That enables optimizations in retention strategies, because
+ # old snapshots don't have to be deleted.
+ only-one-snapshot = false
+
}
fsm {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]