This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 35a1e23b62 Enhance DurableStateStore TCK with OCC, serialization, and
soft-delete tests (#2917)
35a1e23b62 is described below
commit 35a1e23b62d96208891a459f0e5e4d95a8b288a4
Author: David Rose <[email protected]>
AuthorDate: Mon Apr 27 05:04:22 2026 -0700
Enhance DurableStateStore TCK with OCC, serialization, and soft-delete
tests (#2917)
* Add DurableStateStore TCK to persistence-tck module
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/77803e79-3631-4111-98a8-4a1cf6f7beca
Co-authored-by: pjfanning <[email protected]>
* license headers
* use non-empty tags in tests due to issues with Oracle JDBC tests
* Enhance DurableStateStore TCK with OCC, serialization, and soft-delete
tests
Builds on #2833 to address the gaps discussed in #2831:
- Optimistic concurrency check on upsert (supportsUpsertWithRevisionCheck)
- Roundtrip serialization via TestPayload (supportsSerialization, defaults
on
to match JournalSpec convention)
- Deprecated single-arg deleteObject(pid) overload (supportsSoftDelete)
- New required test: upsert again after deletion
- preparePersistenceId cleanup hook for stateful plugins
- Brief plugin-author docs subsection in persistence-journals.md mirroring
the Journal/SnapshotStore TCK pattern
The reference TCK against PersistenceTestKitDurableStateStore passes 6/6
required tests; all 4 optional capability flags correctly skip when off.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* license dates
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
Co-authored-by: PJ Fanning <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
docs/src/main/paradox/persistence-journals.md | 8 +
.../LambdaPersistencePluginDocTest.java | 25 +++
.../persistence/PersistencePluginDocSpec.scala | 21 ++
.../apache/pekko/persistence/CapabilityFlags.scala | 30 +++
.../japi/state/JavaDurableStateStoreSpec.scala | 80 ++++++++
.../persistence/state/DurableStateStoreSpec.scala | 216 +++++++++++++++++++++
...ersistenceTestKitDurableStateStoreTCKSpec.scala | 37 ++++
7 files changed, 417 insertions(+)
diff --git a/docs/src/main/paradox/persistence-journals.md
b/docs/src/main/paradox/persistence-journals.md
index ff11ff70b9..0abda75c27 100644
--- a/docs/src/main/paradox/persistence-journals.md
+++ b/docs/src/main/paradox/persistence-journals.md
@@ -134,6 +134,14 @@ Scala
Java
: @@snip
[LambdaPersistencePluginDocTest.java](/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java)
{ #snapshot-store-tck-java }
+To include the `DurableStateStore` TCK tests in your test suite simply extend
the provided @scala[`DurableStateStoreSpec`]@java[`JavaDurableStateStoreSpec`]:
+
+Scala
+: @@snip
[PersistencePluginDocSpec.scala](/docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala)
{ #durable-state-tck-scala }
+
+Java
+: @@snip
[LambdaPersistencePluginDocTest.java](/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java)
{ #durable-state-tck-java }
+
In case your plugin requires some setting up (starting a mock database,
removing temporary files etc.) you can override the
`beforeAll` and `afterAll` methods to hook into the tests lifecycle:
diff --git
a/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java
b/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java
index 3481650f82..0035109ba1 100644
--- a/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java
+++ b/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java
@@ -27,6 +27,7 @@ import org.apache.pekko.actor.*;
import org.apache.pekko.persistence.*;
import org.apache.pekko.persistence.japi.journal.JavaJournalSpec;
import org.apache.pekko.persistence.japi.snapshot.JavaSnapshotStoreSpec;
+import org.apache.pekko.persistence.japi.state.JavaDurableStateStoreSpec;
import org.apache.pekko.persistence.journal.japi.*;
import org.apache.pekko.persistence.journal.leveldb.SharedLeveldbJournal;
import org.apache.pekko.persistence.journal.leveldb.SharedLeveldbStore;
@@ -172,6 +173,30 @@ public class LambdaPersistencePluginDocTest {
// #snapshot-store-tck-java
};
+ static Object o3b =
+ new Object() {
+ // #durable-state-tck-java
+ class MyDurableStateStoreTest extends JavaDurableStateStoreSpec {
+
+ public MyDurableStateStoreTest() {
+ super(
+ ConfigFactory.parseString(
+ "pekko.persistence.state.plugin =
\"my.durable-state.plugin\""));
+ }
+
+ @Override
+ public CapabilityFlag supportsDeleteWithRevisionCheck() {
+ return CapabilityFlag.on();
+ }
+
+ @Override
+ public CapabilityFlag supportsUpsertWithRevisionCheck() {
+ return CapabilityFlag.on();
+ }
+ }
+ // #durable-state-tck-java
+ };
+
static Object o4 =
new Object() {
// https://github.com/pekko/pekko/issues/26826
diff --git
a/docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala
b/docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala
index 62ecdaffcd..79d2d8c3dc 100644
--- a/docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala
+++ b/docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala
@@ -222,6 +222,27 @@ object PersistenceTCKDoc {
}
// #snapshot-store-tck-scala
}
+ object example2b {
+ import org.apache.pekko.persistence.state.DurableStateStoreSpec
+
+ // #durable-state-tck-scala
+ class MyDurableStateStoreSpec
+ extends DurableStateStoreSpec(
+ config = ConfigFactory.parseString("""
+ pekko.persistence.state.plugin = "my.durable-state.plugin"
+ """)) {
+
+ override def supportsDeleteWithRevisionCheck: CapabilityFlag =
+ true // or CapabilityFlag.on
+
+ override def supportsUpsertWithRevisionCheck: CapabilityFlag =
+ true // or CapabilityFlag.on
+
+ override def supportsSerialization: CapabilityFlag =
+ true // or CapabilityFlag.on
+ }
+ // #durable-state-tck-scala
+ }
object example3 {
import java.io.File
diff --git
a/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala
index acac5ef412..0821608de3 100644
---
a/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala
+++
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala
@@ -86,3 +86,33 @@ trait SnapshotStoreCapabilityFlags extends CapabilityFlags {
protected def supportsMetadata: CapabilityFlag
}
//#snapshot-store-flags
+
+//#durable-state-store-flags
+trait DurableStateStoreCapabilityFlags extends CapabilityFlags {
+
+ /**
+ * When `true` enables tests which check if the durable state store properly
rejects
+ * a `deleteObject` call when the revision does not match the stored
revision.
+ */
+ protected def supportsDeleteWithRevisionCheck: CapabilityFlag
+
+ /**
+ * When `true` enables tests which check if the durable state store properly
rejects
+ * an `upsertObject` call when the revision is stale (does not match the
expected next revision).
+ * This is the optimistic concurrency check on writes.
+ */
+ protected def supportsUpsertWithRevisionCheck: CapabilityFlag
+
+ /**
+ * When `true` enables tests which check if the durable state store properly
serializes
+ * and deserializes values via the configured Pekko serializer
infrastructure.
+ */
+ protected def supportsSerialization: CapabilityFlag
+
+ /**
+ * When `true` enables tests which check the deprecated single-argument
`deleteObject(persistenceId)`
+ * overload, which deletes regardless of the current revision.
+ */
+ protected def supportsSoftDelete: CapabilityFlag
+}
+//#durable-state-store-flags
diff --git
a/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala
new file mode 100644
index 0000000000..235dbd8ae6
--- /dev/null
+++
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.japi.state
+
+import scala.collection.immutable
+
+import org.apache.pekko
+import pekko.persistence.CapabilityFlag
+import pekko.persistence.state.DurableStateStoreSpec
+
+import org.scalatest.{ Args, ConfigMap, Filter, Status, Suite, TestData }
+
+import com.typesafe.config.Config
+
+/**
+ * JAVA API
+ *
+ * This spec aims to verify custom pekko-persistence
[[pekko.persistence.state.DurableStateStore]]
+ * implementations.
+ * Plugin authors are highly encouraged to include it in their plugin's test
suites.
+ *
+ * In case your durable state store plugin needs some kind of setup or
teardown, override the
+ * `beforeAll` or `afterAll` methods (don't forget to call `super` in your
overridden methods).
+ *
+ * @see [[pekko.persistence.state.DurableStateStoreSpec]]
+ */
+class JavaDurableStateStoreSpec(config: Config) extends
DurableStateStoreSpec(config) {
+ override protected def supportsDeleteWithRevisionCheck: CapabilityFlag =
CapabilityFlag.off()
+
+ override protected def supportsUpsertWithRevisionCheck: CapabilityFlag =
CapabilityFlag.off()
+
+ override protected def supportsSoftDelete: CapabilityFlag =
CapabilityFlag.off()
+
+ override def runTests(testName: Option[String], args: Args): Status =
+ super.runTests(testName, args)
+
+ override def runTest(testName: String, args: Args): Status =
+ super.runTest(testName, args)
+
+ override def run(testName: Option[String], args: Args): Status =
+ super.run(testName, args)
+
+ override def testDataFor(testName: String, theConfigMap: ConfigMap):
TestData =
+ super.testDataFor(testName, theConfigMap)
+
+ override def testNames: Set[String] =
+ super.testNames
+
+ override def tags: Map[String, Set[String]] =
+ super.tags
+
+ override def rerunner: Option[String] =
+ super.rerunner
+
+ override def expectedTestCount(filter: Filter): Int =
+ super.expectedTestCount(filter)
+
+ override def suiteId: String =
+ super.suiteId
+
+ override def suiteName: String =
+ super.suiteName
+
+ override def runNestedSuites(args: Args): Status =
+ super.runNestedSuites(args)
+
+ override def nestedSuites: immutable.IndexedSeq[Suite] =
+ super.nestedSuites
+}
diff --git
a/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala
new file mode 100644
index 0000000000..0212e07dba
--- /dev/null
+++
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.state
+
+import scala.annotation.nowarn
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.persistence._
+import pekko.persistence.scalatest.{ MayVerb, OptionalTests }
+import pekko.persistence.state.scaladsl.DurableStateUpdateStore
+import pekko.testkit.TestProbe
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+
+object DurableStateStoreSpec {
+ val config: Config = ConfigFactory.parseString(s"""
+ pekko.actor {
+ serializers {
+ durable-state-tck-test = "${classOf[TestSerializer].getName}"
+ }
+ serialization-bindings {
+ "${classOf[TestPayload].getName}" = durable-state-tck-test
+ }
+ }
+ """)
+}
+
+/**
+ * This spec aims to verify custom pekko-persistence [[DurableStateStore]]
implementations.
+ * Plugin authors are highly encouraged to include it in their plugin's test
suites.
+ *
+ * In case your durable state store plugin needs some kind of setup or
teardown, override the `beforeAll`
+ * or `afterAll` methods (don't forget to call `super` in your overridden
methods).
+ *
+ * For a Java and JUnit consumable version of the TCK please refer to
+ * [[pekko.persistence.japi.state.JavaDurableStateStoreSpec]].
+ *
+ * @see [[pekko.persistence.japi.state.JavaDurableStateStoreSpec]]
+ */
+abstract class DurableStateStoreSpec(config: Config)
+ extends PluginSpec(config)
+ with MayVerb
+ with OptionalTests
+ with DurableStateStoreCapabilityFlags {
+
+ implicit lazy val system: ActorSystem =
+ ActorSystem("DurableStateStoreSpec",
config.withFallback(DurableStateStoreSpec.config))
+
+ override protected def supportsDeleteWithRevisionCheck: CapabilityFlag =
CapabilityFlag.off()
+
+ override protected def supportsUpsertWithRevisionCheck: CapabilityFlag =
CapabilityFlag.off()
+
+ override protected def supportsSerialization: CapabilityFlag =
CapabilityFlag.on()
+
+ override protected def supportsSoftDelete: CapabilityFlag =
CapabilityFlag.off()
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ preparePersistenceId(pid)
+ }
+
+ /**
+ * Overridable hook that is called before each test case.
+ * `pid` is the `persistenceId` that will be used in the test.
+ * This method may be needed to clean any pre-existing state from the store,
+ * for example when running against a shared external database.
+ */
+ def preparePersistenceId(@nowarn("msg=never used") pid: String): Unit = ()
+
+ /**
+ * Returns the `DurableStateUpdateStore` under test. By default, this uses
the plugin
+ * configured under `pekko.persistence.state.plugin` in the provided config.
+ */
+ def durableStateStore(): DurableStateUpdateStore[Any] =
+
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateUpdateStore[Any]]("")
+
+ protected val timeout: FiniteDuration = 5.seconds
+
+ "A durable state store" must {
+ "not find a non-existing object" in {
+ val result = Await.result(durableStateStore().getObject(pid), timeout)
+ result.value shouldBe None
+ }
+
+ "persist a state and retrieve it" in {
+ val value = s"state-${pid}"
+ Await.result(durableStateStore().upsertObject(pid, 1L, value,
"test-tag"), timeout)
+ val result = Await.result(durableStateStore().getObject(pid), timeout)
+ result.value shouldBe Some(value)
+ result.revision shouldBe 1L
+ }
+
+ "update a state" in {
+ val store = durableStateStore()
+ val value1 = s"state-1-${pid}"
+ val value2 = s"state-2-${pid}"
+ Await.result(store.upsertObject(pid, 1L, value1, "test-tag"), timeout)
+ Await.result(store.upsertObject(pid, 2L, value2, "test-tag"), timeout)
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe Some(value2)
+ result.revision shouldBe 2L
+ }
+
+ "delete a state" in {
+ val store = durableStateStore()
+ val value = s"state-${pid}"
+ Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+ Await.result(store.deleteObject(pid, 2L), timeout)
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe None
+ }
+
+ "handle different persistence IDs independently" in {
+ val store = durableStateStore()
+ val pid2 = pid + "-2"
+ val value1 = s"state-${pid}"
+ val value2 = s"state-${pid2}"
+ Await.result(store.upsertObject(pid, 1L, value1, "test-tag"), timeout)
+ Await.result(store.upsertObject(pid2, 1L, value2, "test-tag"), timeout)
+
+ val result1 = Await.result(store.getObject(pid), timeout)
+ val result2 = Await.result(store.getObject(pid2), timeout)
+
+ result1.value shouldBe Some(value1)
+ result2.value shouldBe Some(value2)
+ }
+
+ "upsert again after a deletion" in {
+ val store = durableStateStore()
+ val original = s"state-${pid}"
+ val recreated = s"state-${pid}-v2"
+ Await.result(store.upsertObject(pid, 1L, original, "test-tag"), timeout)
+ Await.result(store.deleteObject(pid, 2L), timeout)
+ Await.result(store.upsertObject(pid, 3L, recreated, "test-tag"), timeout)
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe Some(recreated)
+ result.revision shouldBe 3L
+ }
+ }
+
+ "A durable state store optionally".may {
+ optional(flag = supportsDeleteWithRevisionCheck) {
+ "fail to delete a state when the revision does not match" in {
+ val store = durableStateStore()
+ val value = s"state-${pid}"
+ Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+ val deleteResult = store.deleteObject(pid, 99L)
+ intercept[Exception] {
+ Await.result(deleteResult, timeout)
+ }
+ // The original state should still be accessible
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe Some(value)
+ result.revision shouldBe 1L
+ }
+ }
+
+ optional(flag = supportsUpsertWithRevisionCheck) {
+ "fail to upsert a state when the revision is stale" in {
+ val store = durableStateStore()
+ val original = s"state-${pid}"
+ val stale = s"state-${pid}-stale"
+ Await.result(store.upsertObject(pid, 1L, original, "test-tag"),
timeout)
+ // Re-using revision 1 should be rejected; the next valid revision is
2.
+ val staleUpsert = store.upsertObject(pid, 1L, stale, "test-tag")
+ intercept[Exception] {
+ Await.result(staleUpsert, timeout)
+ }
+ // The original state should still be accessible
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe Some(original)
+ result.revision shouldBe 1L
+ }
+ }
+
+ optional(flag = supportsSerialization) {
+ "serialize and deserialize values via the configured serializer" in {
+ val store = durableStateStore()
+ val probe = TestProbe()
+ val value = TestPayload(probe.ref)
+ Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe Some(value)
+ result.revision shouldBe 1L
+ }
+ }
+
+ optional(flag = supportsSoftDelete) {
+ "delete a state via the deprecated deleteObject overload" in {
+ val store = durableStateStore()
+ val value = s"state-${pid}"
+ Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+ @nowarn("cat=deprecation")
+ val deleteResult = store.deleteObject(pid)
+ Await.result(deleteResult, timeout)
+ val result = Await.result(store.getObject(pid), timeout)
+ result.value shouldBe None
+ }
+ }
+ }
+}
diff --git
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala
new file mode 100644
index 0000000000..4eec73e0c4
--- /dev/null
+++
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.testkit.state.scaladsl
+
+import org.apache.pekko
+import pekko.persistence.CapabilityFlag
+import pekko.persistence.state.DurableStateStoreSpec
+import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
+
+import com.typesafe.config.ConfigFactory
+
+object PersistenceTestKitDurableStateStoreTCKSpec {
+ val config =
PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString("""
+ pekko.loglevel = DEBUG
+ """))
+}
+
+class PersistenceTestKitDurableStateStoreTCKSpec
+ extends
DurableStateStoreSpec(PersistenceTestKitDurableStateStoreTCKSpec.config) {
+ override protected def supportsDeleteWithRevisionCheck: CapabilityFlag =
CapabilityFlag.off()
+ override protected def supportsSerialization: CapabilityFlag =
CapabilityFlag.off()
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]