This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new bac4192b1e Port akka-core#31753: Run DurableStateBehavior delete 
effect asynchronously like persist (#2752)
bac4192b1e is described below

commit bac4192b1ee0b9f67ee8be52d0b3f4da2c6dffd7
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Mar 19 23:47:19 2026 +0100

    Port akka-core#31753: Run DurableStateBehavior delete effect asynchronously 
like persist (#2752)
    
    * Initial plan
    
    * Port akka-core PR #31753: run DurableStateBehavior delete effect same way 
as persist
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../scaladsl/DurableStateBehaviorReplySpec.scala   |  25 ++++-
 ...lySpec.scala => DurableStateBehaviorSpec.scala} | 111 ++++++++++-----------
 .../internal/DurableStateStoreInteractions.scala   |   9 +-
 .../persistence/typed/state/internal/Running.scala |  35 ++++++-
 4 files changed, 110 insertions(+), 70 deletions(-)

diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
index e0cace3710..ecc1ceae62 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
@@ -92,7 +92,7 @@ class DurableStateBehaviorReplySpec
   val pidCounter = new AtomicInteger(0)
   private def nextPid(): PersistenceId = 
PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()}")
 
-  "A typed persistent actor with commands that are expecting replies" must {
+  "A DurableStateBehavior actor with commands that are expecting replies" must 
{
 
     "persist state thenReply" in {
       val c = spawn(counter(nextPid()))
@@ -137,5 +137,28 @@ class DurableStateBehaviorReplySpec
       c ! GetValue(queryProbe.ref)
       queryProbe.expectMessage(State(0))
     }
+
+    "handle commands sequentially" in {
+      val c = spawn(counter(nextPid()))
+      val probe = TestProbe[Any]()
+
+      c ! IncrementWithConfirmation(probe.ref)
+      c ! IncrementWithConfirmation(probe.ref)
+      c ! IncrementWithConfirmation(probe.ref)
+      c ! GetValue(probe.ref)
+      probe.expectMessage(Done)
+      probe.expectMessage(Done)
+      probe.expectMessage(Done)
+      probe.expectMessage(State(3))
+
+      c ! IncrementWithConfirmation(probe.ref)
+      c ! IncrementWithConfirmation(probe.ref)
+      c ! DeleteWithConfirmation(probe.ref)
+      c ! GetValue(probe.ref)
+      probe.expectMessage(Done)
+      probe.expectMessage(Done)
+      probe.expectMessage(Done)
+      probe.expectMessage(State(0))
+    }
   }
 }
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorSpec.scala
similarity index 60%
copy from 
persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
copy to 
persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorSpec.scala
index e0cace3710..bc33a87e0a 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorSpec.scala
@@ -17,11 +17,9 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.pekko
 import pekko.Done
+import pekko.actor.testkit.typed.TestKitSettings
 import pekko.actor.testkit.typed.scaladsl._
 import pekko.actor.typed.ActorRef
-import pekko.actor.typed.Behavior
-import pekko.actor.typed.scaladsl.ActorContext
-import pekko.actor.typed.scaladsl.Behaviors
 import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
 import pekko.persistence.typed.PersistenceId
 import pekko.serialization.jackson.CborSerializable
@@ -31,99 +29,75 @@ import org.scalatest.wordspec.AnyWordSpecLike
 import com.typesafe.config.Config
 import com.typesafe.config.ConfigFactory
 
-object DurableStateBehaviorReplySpec {
+object DurableStateBehaviorSpec {
+
   def conf: Config = 
PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString(s"""
     pekko.loglevel = INFO
     """))
 
-  sealed trait Command[ReplyMessage] extends CborSerializable
-  final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends 
Command[Done]
-  final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends 
Command[Done]
-  final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done]
-  final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
-  final case class DeleteWithConfirmation(replyTo: ActorRef[Done]) extends 
Command[Done]
-  case object Increment extends Command[Nothing]
-  case class IncrementBy(by: Int) extends Command[Nothing]
+  sealed trait Command extends CborSerializable
+  final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends 
Command
+  final case class GetValue(replyTo: ActorRef[State]) extends Command
+  final case class DeleteWithConfirmation(replyTo: ActorRef[Done]) extends 
Command
+  case class IncrementBy(by: Int) extends Command
 
   final case class State(value: Int) extends CborSerializable
 
-  def counter(persistenceId: PersistenceId): Behavior[Command[_]] =
-    Behaviors.setup(ctx => counter(ctx, persistenceId))
-
-  def counter(ctx: ActorContext[Command[_]], persistenceId: PersistenceId): 
DurableStateBehavior[Command[_], State] = {
-    DurableStateBehavior.withEnforcedReplies[Command[_], State](
+  def counter(persistenceId: PersistenceId): DurableStateBehavior[Command, 
State] = {
+    DurableStateBehavior(
       persistenceId,
       emptyState = State(0),
       commandHandler = (state, command) =>
         command match {
 
-          case IncrementWithConfirmation(replyTo) =>
-            Effect.persist(state.copy(value = state.value + 
1)).thenReply(replyTo)(_ => Done)
-
-          case IncrementReplyLater(replyTo) =>
-            Effect
-              .persist(state.copy(value = state.value + 1))
-              .thenRun((_: State) => ctx.self ! ReplyNow(replyTo))
-              .thenNoReply()
+          case IncrementBy(by) =>
+            Effect.persist(state.copy(value = state.value + by))
 
-          case ReplyNow(replyTo) =>
-            Effect.reply(replyTo)(Done)
+          case IncrementWithConfirmation(replyTo) =>
+            Effect.persist(state.copy(value = state.value + 1)).thenRun(_ => 
replyTo ! Done)
 
           case GetValue(replyTo) =>
-            Effect.reply(replyTo)(state)
+            replyTo ! state
+            Effect.none
 
           case DeleteWithConfirmation(replyTo) =>
-            Effect.delete[State]().thenReply(replyTo)(_ => Done)
-
-          case _ => ???
-
+            Effect.delete[State]().thenRun(_ => replyTo ! Done)
         })
   }
-
 }
 
-class DurableStateBehaviorReplySpec
-    extends ScalaTestWithActorTestKit(DurableStateBehaviorReplySpec.conf)
+class DurableStateBehaviorSpec
+    extends ScalaTestWithActorTestKit(DurableStateBehaviorSpec.conf)
     with AnyWordSpecLike
     with LogCapturing {
+  import DurableStateBehaviorSpec._
 
-  import DurableStateBehaviorReplySpec._
+  implicit val testSettings: TestKitSettings = TestKitSettings(system)
 
   val pidCounter = new AtomicInteger(0)
   private def nextPid(): PersistenceId = 
PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()}")
 
-  "A typed persistent actor with commands that are expecting replies" must {
-
-    "persist state thenReply" in {
+  "A DurableStateBehavior actor" must {
+    "persist and update state" in {
       val c = spawn(counter(nextPid()))
-      val probe = TestProbe[Done]()
-      c ! IncrementWithConfirmation(probe.ref)
-      probe.expectMessage(Done)
+      val updateProbe = TestProbe[Done]()
+      val queryProbe = TestProbe[State]()
 
-      c ! IncrementWithConfirmation(probe.ref)
-      c ! IncrementWithConfirmation(probe.ref)
-      probe.expectMessage(Done)
-      probe.expectMessage(Done)
-    }
+      c ! IncrementWithConfirmation(updateProbe.ref)
+      updateProbe.expectMessage(Done)
 
-    "persist state thenReply later" in {
-      val c = spawn(counter(nextPid()))
-      val probe = TestProbe[Done]()
-      c ! IncrementReplyLater(probe.ref)
-      probe.expectMessage(Done)
-    }
+      c ! GetValue(queryProbe.ref)
+      queryProbe.expectMessage(State(1))
 
-    "reply to query command" in {
-      val c = spawn(counter(nextPid()))
-      val updateProbe = TestProbe[Done]()
+      c ! IncrementBy(5)
       c ! IncrementWithConfirmation(updateProbe.ref)
+      updateProbe.expectMessage(Done)
 
-      val queryProbe = TestProbe[State]()
       c ! GetValue(queryProbe.ref)
-      queryProbe.expectMessage(State(1))
+      queryProbe.expectMessage(State(7))
     }
 
-    "delete state thenReply" in {
+    "delete state" in {
       val c = spawn(counter(nextPid()))
       val updateProbe = TestProbe[Done]()
       c ! IncrementWithConfirmation(updateProbe.ref)
@@ -137,5 +111,24 @@ class DurableStateBehaviorReplySpec
       c ! GetValue(queryProbe.ref)
       queryProbe.expectMessage(State(0))
     }
+
+    "handle commands sequentially" in {
+      val c = spawn(counter(nextPid()))
+      val probe = TestProbe[Any]()
+
+      c ! IncrementWithConfirmation(probe.ref)
+      c ! IncrementWithConfirmation(probe.ref)
+      c ! IncrementWithConfirmation(probe.ref)
+      c ! GetValue(probe.ref)
+      probe.expectMessage(Done)
+      probe.expectMessage(Done)
+      probe.expectMessage(Done)
+      probe.expectMessage(State(3))
+
+      c ! DeleteWithConfirmation(probe.ref)
+      c ! GetValue(probe.ref)
+      probe.expectMessage(Done)
+      probe.expectMessage(State(0))
+    }
   }
 }
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
index 56e8fe8627..dcd52ff68a 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
@@ -68,13 +68,13 @@ private[pekko] trait DurableStateStoreInteractions[C, S] {
 
   protected def internalDelete(
       ctx: ActorContext[InternalProtocol],
-      cmd: Any,
+      @nowarn("msg=never used") cmd: Any,
       state: Running.RunningState[S, C]): Running.RunningState[S, C] = {
 
     val newRunningState: Running.RunningState[S, C] = 
state.nextRevision().copy(state = setup.emptyState)
     val persistenceId = setup.persistenceId.id
 
-    onDeleteInitiated(ctx, cmd)
+    // TODO Might need to call hook method for Telemetry
 
     ctx.pipeToSelf[Done](setup.durableStateStore.deleteObject(persistenceId, 
newRunningState.revision)) {
       case Success(_)     => InternalProtocol.DeleteSuccess
@@ -84,14 +84,11 @@ private[pekko] trait DurableStateStoreInteractions[C, S] {
     newRunningState
   }
 
-  // FIXME These hook methods are for Telemetry. What more parameters are 
needed? persistenceId?
+  // TODO These hook methods are for Telemetry. What more parameters are 
needed? persistenceId?
   @InternalStableApi
   private[pekko] def onWriteInitiated(@nowarn("msg=never used") ctx: 
ActorContext[_],
       @nowarn("msg=never used") cmd: Any): Unit = ()
 
-  private[pekko] def onDeleteInitiated(@nowarn("msg=never used") ctx: 
ActorContext[_],
-      @nowarn("msg=never used") cmd: Any): Unit = ()
-
   protected def requestRecoveryPermit(): Unit = {
     
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit,
 setup.selfClassic)
   }
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
index 91d39fdad4..4a3f73bad8 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
@@ -195,6 +195,16 @@ private[pekko] object Running {
       (persistingState(newState2, state, sideEffects), false)
     }
 
+    private def handleDelete(
+        cmd: Any,
+        sideEffects: immutable.Seq[SideEffect[S]]): 
(Behavior[InternalProtocol], Boolean) = {
+      _currentRevision = state.revision + 1
+
+      val nextState = internalDelete(setup.context, cmd, state)
+
+      (persistingState(nextState, state, sideEffects), false)
+    }
+
     @tailrec def applyEffects(
         msg: Any,
         state: RunningState[S, C],
@@ -219,8 +229,7 @@ private[pekko] object Running {
           (applySideEffects(sideEffects, state), true)
 
         case _: Delete[_] =>
-          val nextState = internalDelete(setup.context, msg, state)
-          (applySideEffects(sideEffects, nextState), true)
+          handleDelete(msg, sideEffects)
 
         case _: Unhandled.type =>
           import pekko.actor.typed.scaladsl.adapter._
@@ -271,12 +280,12 @@ private[pekko] object Running {
         case UpsertFailure(exc)                => onUpsertFailed(exc)
         case in: IncomingCommand[C @unchecked] => onCommand(in)
         case get: GetState[S @unchecked]       => stashInternal(get)
+        case DeleteSuccess                     => onDeleteSuccess()
+        case DeleteFailure(exc)                => onDeleteFailed(exc)
         case RecoveryTimeout                   => Behaviors.unhandled
         case RecoveryPermitGranted             => Behaviors.unhandled
         case _: GetSuccess[_]                  => Behaviors.unhandled
         case _: GetFailure                     => Behaviors.unhandled
-        case DeleteSuccess                     => Behaviors.unhandled
-        case DeleteFailure(_)                  => Behaviors.unhandled
       }
     }
 
@@ -308,6 +317,24 @@ private[pekko] object Running {
       throw new DurableStateStoreException(setup.persistenceId, 
currentRevision, cause)
     }
 
+    final def onDeleteSuccess(): Behavior[InternalProtocol] = {
+      if (setup.internalLogger.isDebugEnabled) {
+        setup.internalLogger
+          .debug("Received DeleteSuccess response after: {} nanos", 
System.nanoTime() - persistStartTime)
+      }
+
+      // TODO Might need to call hook method for Telemetry
+
+      visibleState = state
+      val newState = applySideEffects(sideEffects, state)
+      tryUnstashOne(newState)
+    }
+
+    final def onDeleteFailed(cause: Throwable): Behavior[InternalProtocol] = {
+      // TODO Might need to call hook method for Telemetry
+      throw new DurableStateStoreException(setup.persistenceId, 
currentRevision, cause)
+    }
+
     override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] 
= {
       case PoisonPill =>
         // wait for store responses before stopping


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to