This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 35a1e23b62 Enhance DurableStateStore TCK with OCC, serialization, and soft-delete tests (#2917)
35a1e23b62 is described below

commit 35a1e23b62d96208891a459f0e5e4d95a8b288a4
Author: David Rose <[email protected]>
AuthorDate: Mon Apr 27 05:04:22 2026 -0700

    Enhance DurableStateStore TCK with OCC, serialization, and soft-delete tests (#2917)
    
    * Add DurableStateStore TCK to persistence-tck module
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/77803e79-3631-4111-98a8-4a1cf6f7beca
    
    Co-authored-by: pjfanning <[email protected]>
    
    * license headers
    
    * use non-empty tags in tests due to issues with Oracle JDBC tests
    
    * Enhance DurableStateStore TCK with OCC, serialization, and soft-delete tests
    
    Builds on #2833 to address the gaps discussed in #2831:
    
    - Optimistic concurrency check on upsert (supportsUpsertWithRevisionCheck)
    - Roundtrip serialization via TestPayload (supportsSerialization, defaults on
      to match JournalSpec convention)
    - Deprecated single-arg deleteObject(pid) overload (supportsSoftDelete)
    - New required test: upsert again after deletion
    - preparePersistenceId cleanup hook for stateful plugins
    - Brief plugin-author docs subsection in persistence-journals.md mirroring
      the Journal/SnapshotStore TCK pattern
    
    The reference TCK against PersistenceTestKitDurableStateStore passes 6/6
    required tests; all 4 optional capability flags correctly skip when off.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * license dates
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
    Co-authored-by: PJ Fanning <[email protected]>
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 docs/src/main/paradox/persistence-journals.md      |   8 +
 .../LambdaPersistencePluginDocTest.java            |  25 +++
 .../persistence/PersistencePluginDocSpec.scala     |  21 ++
 .../apache/pekko/persistence/CapabilityFlags.scala |  30 +++
 .../japi/state/JavaDurableStateStoreSpec.scala     |  80 ++++++++
 .../persistence/state/DurableStateStoreSpec.scala  | 216 +++++++++++++++++++++
 ...ersistenceTestKitDurableStateStoreTCKSpec.scala |  37 ++++
 7 files changed, 417 insertions(+)

diff --git a/docs/src/main/paradox/persistence-journals.md 
b/docs/src/main/paradox/persistence-journals.md
index ff11ff70b9..0abda75c27 100644
--- a/docs/src/main/paradox/persistence-journals.md
+++ b/docs/src/main/paradox/persistence-journals.md
@@ -134,6 +134,14 @@ Scala
 Java
 :  @@snip 
[LambdaPersistencePluginDocTest.java](/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java)
 { #snapshot-store-tck-java }
 
+To include the `DurableStateStore` TCK tests in your test suite simply extend 
the provided @scala[`DurableStateStoreSpec`]@java[`JavaDurableStateStoreSpec`]:
+
+Scala
+:  @@snip 
[PersistencePluginDocSpec.scala](/docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala)
 { #durable-state-tck-scala }
+
+Java
+:  @@snip 
[LambdaPersistencePluginDocTest.java](/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java)
 { #durable-state-tck-java }
+
 In case your plugin requires some setting up (starting a mock database, 
removing temporary files etc.) you can override the
 `beforeAll` and `afterAll` methods to hook into the tests lifecycle:
 
diff --git 
a/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java 
b/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java
index 3481650f82..0035109ba1 100644
--- a/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java
+++ b/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java
@@ -27,6 +27,7 @@ import org.apache.pekko.actor.*;
 import org.apache.pekko.persistence.*;
 import org.apache.pekko.persistence.japi.journal.JavaJournalSpec;
 import org.apache.pekko.persistence.japi.snapshot.JavaSnapshotStoreSpec;
+import org.apache.pekko.persistence.japi.state.JavaDurableStateStoreSpec;
 import org.apache.pekko.persistence.journal.japi.*;
 import org.apache.pekko.persistence.journal.leveldb.SharedLeveldbJournal;
 import org.apache.pekko.persistence.journal.leveldb.SharedLeveldbStore;
@@ -172,6 +173,30 @@ public class LambdaPersistencePluginDocTest {
         // #snapshot-store-tck-java
       };
 
+  static Object o3b =
+      new Object() {
+        // #durable-state-tck-java
+        class MyDurableStateStoreTest extends JavaDurableStateStoreSpec {
+
+          public MyDurableStateStoreTest() {
+            super(
+                ConfigFactory.parseString(
+                    "pekko.persistence.state.plugin = 
\"my.durable-state.plugin\""));
+          }
+
+          @Override
+          public CapabilityFlag supportsDeleteWithRevisionCheck() {
+            return CapabilityFlag.on();
+          }
+
+          @Override
+          public CapabilityFlag supportsUpsertWithRevisionCheck() {
+            return CapabilityFlag.on();
+          }
+        }
+        // #durable-state-tck-java
+      };
+
   static Object o4 =
       new Object() {
         // https://github.com/pekko/pekko/issues/26826
diff --git 
a/docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala 
b/docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala
index 62ecdaffcd..79d2d8c3dc 100644
--- a/docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala
+++ b/docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala
@@ -222,6 +222,27 @@ object PersistenceTCKDoc {
     }
     // #snapshot-store-tck-scala
   }
+  object example2b {
+    import org.apache.pekko.persistence.state.DurableStateStoreSpec
+
+    // #durable-state-tck-scala
+    class MyDurableStateStoreSpec
+        extends DurableStateStoreSpec(
+          config = ConfigFactory.parseString("""
+        pekko.persistence.state.plugin = "my.durable-state.plugin"
+        """)) {
+
+      override def supportsDeleteWithRevisionCheck: CapabilityFlag =
+        true // or CapabilityFlag.on
+
+      override def supportsUpsertWithRevisionCheck: CapabilityFlag =
+        true // or CapabilityFlag.on
+
+      override def supportsSerialization: CapabilityFlag =
+        true // or CapabilityFlag.on
+    }
+    // #durable-state-tck-scala
+  }
   object example3 {
     import java.io.File
 
diff --git 
a/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala
 
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala
index acac5ef412..0821608de3 100644
--- 
a/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala
+++ 
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala
@@ -86,3 +86,33 @@ trait SnapshotStoreCapabilityFlags extends CapabilityFlags {
   protected def supportsMetadata: CapabilityFlag
 }
 //#snapshot-store-flags
+
+//#durable-state-store-flags
+trait DurableStateStoreCapabilityFlags extends CapabilityFlags {
+
+  /**
+   * When `true` enables tests which check if the durable state store properly 
rejects
+   * a `deleteObject` call when the revision does not match the stored 
revision.
+   */
+  protected def supportsDeleteWithRevisionCheck: CapabilityFlag
+
+  /**
+   * When `true` enables tests which check if the durable state store properly 
rejects
+   * an `upsertObject` call when the revision is stale (does not match the 
expected next revision).
+   * This is the optimistic concurrency check on writes.
+   */
+  protected def supportsUpsertWithRevisionCheck: CapabilityFlag
+
+  /**
+   * When `true` enables tests which check if the durable state store properly 
serializes
+   * and deserializes values via the configured Pekko serializer 
infrastructure.
+   */
+  protected def supportsSerialization: CapabilityFlag
+
+  /**
+   * When `true` enables tests which check the deprecated single-argument 
`deleteObject(persistenceId)`
+   * overload, which deletes regardless of the current revision.
+   */
+  protected def supportsSoftDelete: CapabilityFlag
+}
+//#durable-state-store-flags
diff --git 
a/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala
 
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala
new file mode 100644
index 0000000000..235dbd8ae6
--- /dev/null
+++ 
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.japi.state
+
+import scala.collection.immutable
+
+import org.apache.pekko
+import pekko.persistence.CapabilityFlag
+import pekko.persistence.state.DurableStateStoreSpec
+
+import org.scalatest.{ Args, ConfigMap, Filter, Status, Suite, TestData }
+
+import com.typesafe.config.Config
+
+/**
+ * JAVA API
+ *
+ * This spec aims to verify custom pekko-persistence 
[[pekko.persistence.state.DurableStateStore]]
+ * implementations.
+ * Plugin authors are highly encouraged to include it in their plugin's test 
suites.
+ *
+ * In case your durable state store plugin needs some kind of setup or 
teardown, override the
+ * `beforeAll` or `afterAll` methods (don't forget to call `super` in your 
overridden methods).
+ *
+ * @see [[pekko.persistence.state.DurableStateStoreSpec]]
+ */
+class JavaDurableStateStoreSpec(config: Config) extends 
DurableStateStoreSpec(config) {
+  override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = 
CapabilityFlag.off()
+
+  override protected def supportsUpsertWithRevisionCheck: CapabilityFlag = 
CapabilityFlag.off()
+
+  override protected def supportsSoftDelete: CapabilityFlag = 
CapabilityFlag.off()
+
+  override def runTests(testName: Option[String], args: Args): Status =
+    super.runTests(testName, args)
+
+  override def runTest(testName: String, args: Args): Status =
+    super.runTest(testName, args)
+
+  override def run(testName: Option[String], args: Args): Status =
+    super.run(testName, args)
+
+  override def testDataFor(testName: String, theConfigMap: ConfigMap): 
TestData =
+    super.testDataFor(testName, theConfigMap)
+
+  override def testNames: Set[String] =
+    super.testNames
+
+  override def tags: Map[String, Set[String]] =
+    super.tags
+
+  override def rerunner: Option[String] =
+    super.rerunner
+
+  override def expectedTestCount(filter: Filter): Int =
+    super.expectedTestCount(filter)
+
+  override def suiteId: String =
+    super.suiteId
+
+  override def suiteName: String =
+    super.suiteName
+
+  override def runNestedSuites(args: Args): Status =
+    super.runNestedSuites(args)
+
+  override def nestedSuites: immutable.IndexedSeq[Suite] =
+    super.nestedSuites
+}
diff --git 
a/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala
 
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala
new file mode 100644
index 0000000000..0212e07dba
--- /dev/null
+++ 
b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.state
+
+import scala.annotation.nowarn
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.persistence._
+import pekko.persistence.scalatest.{ MayVerb, OptionalTests }
+import pekko.persistence.state.scaladsl.DurableStateUpdateStore
+import pekko.testkit.TestProbe
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+
+object DurableStateStoreSpec {
+  val config: Config = ConfigFactory.parseString(s"""
+    pekko.actor {
+      serializers {
+        durable-state-tck-test = "${classOf[TestSerializer].getName}"
+      }
+      serialization-bindings {
+        "${classOf[TestPayload].getName}" = durable-state-tck-test
+      }
+    }
+    """)
+}
+
+/**
+ * This spec aims to verify custom pekko-persistence [[DurableStateStore]] 
implementations.
+ * Plugin authors are highly encouraged to include it in their plugin's test 
suites.
+ *
+ * In case your durable state store plugin needs some kind of setup or 
teardown, override the `beforeAll`
+ * or `afterAll` methods (don't forget to call `super` in your overridden 
methods).
+ *
+ * For a Java and JUnit consumable version of the TCK please refer to
+ * [[pekko.persistence.japi.state.JavaDurableStateStoreSpec]].
+ *
+ * @see [[pekko.persistence.japi.state.JavaDurableStateStoreSpec]]
+ */
+abstract class DurableStateStoreSpec(config: Config)
+    extends PluginSpec(config)
+    with MayVerb
+    with OptionalTests
+    with DurableStateStoreCapabilityFlags {
+
+  implicit lazy val system: ActorSystem =
+    ActorSystem("DurableStateStoreSpec", 
config.withFallback(DurableStateStoreSpec.config))
+
+  override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = 
CapabilityFlag.off()
+
+  override protected def supportsUpsertWithRevisionCheck: CapabilityFlag = 
CapabilityFlag.off()
+
+  override protected def supportsSerialization: CapabilityFlag = 
CapabilityFlag.on()
+
+  override protected def supportsSoftDelete: CapabilityFlag = 
CapabilityFlag.off()
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    preparePersistenceId(pid)
+  }
+
+  /**
+   * Overridable hook that is called before each test case.
+   * `pid` is the `persistenceId` that will be used in the test.
+   * This method may be needed to clean any pre-existing state from the store,
+   * for example when running against a shared external database.
+   */
+  def preparePersistenceId(@nowarn("msg=never used") pid: String): Unit = ()
+
+  /**
+   * Returns the `DurableStateUpdateStore` under test. By default, this uses 
the plugin
+   * configured under `pekko.persistence.state.plugin` in the provided config.
+   */
+  def durableStateStore(): DurableStateUpdateStore[Any] =
+    
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateUpdateStore[Any]]("")
+
+  protected val timeout: FiniteDuration = 5.seconds
+
+  "A durable state store" must {
+    "not find a non-existing object" in {
+      val result = Await.result(durableStateStore().getObject(pid), timeout)
+      result.value shouldBe None
+    }
+
+    "persist a state and retrieve it" in {
+      val value = s"state-${pid}"
+      Await.result(durableStateStore().upsertObject(pid, 1L, value, 
"test-tag"), timeout)
+      val result = Await.result(durableStateStore().getObject(pid), timeout)
+      result.value shouldBe Some(value)
+      result.revision shouldBe 1L
+    }
+
+    "update a state" in {
+      val store = durableStateStore()
+      val value1 = s"state-1-${pid}"
+      val value2 = s"state-2-${pid}"
+      Await.result(store.upsertObject(pid, 1L, value1, "test-tag"), timeout)
+      Await.result(store.upsertObject(pid, 2L, value2, "test-tag"), timeout)
+      val result = Await.result(store.getObject(pid), timeout)
+      result.value shouldBe Some(value2)
+      result.revision shouldBe 2L
+    }
+
+    "delete a state" in {
+      val store = durableStateStore()
+      val value = s"state-${pid}"
+      Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+      Await.result(store.deleteObject(pid, 2L), timeout)
+      val result = Await.result(store.getObject(pid), timeout)
+      result.value shouldBe None
+    }
+
+    "handle different persistence IDs independently" in {
+      val store = durableStateStore()
+      val pid2 = pid + "-2"
+      val value1 = s"state-${pid}"
+      val value2 = s"state-${pid2}"
+      Await.result(store.upsertObject(pid, 1L, value1, "test-tag"), timeout)
+      Await.result(store.upsertObject(pid2, 1L, value2, "test-tag"), timeout)
+
+      val result1 = Await.result(store.getObject(pid), timeout)
+      val result2 = Await.result(store.getObject(pid2), timeout)
+
+      result1.value shouldBe Some(value1)
+      result2.value shouldBe Some(value2)
+    }
+
+    "upsert again after a deletion" in {
+      val store = durableStateStore()
+      val original = s"state-${pid}"
+      val recreated = s"state-${pid}-v2"
+      Await.result(store.upsertObject(pid, 1L, original, "test-tag"), timeout)
+      Await.result(store.deleteObject(pid, 2L), timeout)
+      Await.result(store.upsertObject(pid, 3L, recreated, "test-tag"), timeout)
+      val result = Await.result(store.getObject(pid), timeout)
+      result.value shouldBe Some(recreated)
+      result.revision shouldBe 3L
+    }
+  }
+
+  "A durable state store optionally".may {
+    optional(flag = supportsDeleteWithRevisionCheck) {
+      "fail to delete a state when the revision does not match" in {
+        val store = durableStateStore()
+        val value = s"state-${pid}"
+        Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+        val deleteResult = store.deleteObject(pid, 99L)
+        intercept[Exception] {
+          Await.result(deleteResult, timeout)
+        }
+        // The original state should still be accessible
+        val result = Await.result(store.getObject(pid), timeout)
+        result.value shouldBe Some(value)
+        result.revision shouldBe 1L
+      }
+    }
+
+    optional(flag = supportsUpsertWithRevisionCheck) {
+      "fail to upsert a state when the revision is stale" in {
+        val store = durableStateStore()
+        val original = s"state-${pid}"
+        val stale = s"state-${pid}-stale"
+        Await.result(store.upsertObject(pid, 1L, original, "test-tag"), 
timeout)
+        // Re-using revision 1 should be rejected; the next valid revision is 
2.
+        val staleUpsert = store.upsertObject(pid, 1L, stale, "test-tag")
+        intercept[Exception] {
+          Await.result(staleUpsert, timeout)
+        }
+        // The original state should still be accessible
+        val result = Await.result(store.getObject(pid), timeout)
+        result.value shouldBe Some(original)
+        result.revision shouldBe 1L
+      }
+    }
+
+    optional(flag = supportsSerialization) {
+      "serialize and deserialize values via the configured serializer" in {
+        val store = durableStateStore()
+        val probe = TestProbe()
+        val value = TestPayload(probe.ref)
+        Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+        val result = Await.result(store.getObject(pid), timeout)
+        result.value shouldBe Some(value)
+        result.revision shouldBe 1L
+      }
+    }
+
+    optional(flag = supportsSoftDelete) {
+      "delete a state via the deprecated deleteObject overload" in {
+        val store = durableStateStore()
+        val value = s"state-${pid}"
+        Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout)
+        @nowarn("cat=deprecation")
+        val deleteResult = store.deleteObject(pid)
+        Await.result(deleteResult, timeout)
+        val result = Await.result(store.getObject(pid), timeout)
+        result.value shouldBe None
+      }
+    }
+  }
+}
diff --git 
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala
 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala
new file mode 100644
index 0000000000..4eec73e0c4
--- /dev/null
+++ 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.testkit.state.scaladsl
+
+import org.apache.pekko
+import pekko.persistence.CapabilityFlag
+import pekko.persistence.state.DurableStateStoreSpec
+import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
+
+import com.typesafe.config.ConfigFactory
+
+object PersistenceTestKitDurableStateStoreTCKSpec {
+  val config = 
PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString("""
+    pekko.loglevel = DEBUG
+    """))
+}
+
+class PersistenceTestKitDurableStateStoreTCKSpec
+    extends 
DurableStateStoreSpec(PersistenceTestKitDurableStateStoreTCKSpec.config) {
+  override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = 
CapabilityFlag.off()
+  override protected def supportsSerialization: CapabilityFlag = 
CapabilityFlag.off()
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to