This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new bac4192b1e Port akka-core#31753: Run DurableStateBehavior delete
effect asynchronously like persist (#2752)
bac4192b1e is described below
commit bac4192b1ee0b9f67ee8be52d0b3f4da2c6dffd7
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Mar 19 23:47:19 2026 +0100
Port akka-core#31753: Run DurableStateBehavior delete effect asynchronously
like persist (#2752)
* Initial plan
* Port akka-core PR #31753: run DurableStateBehavior delete effect same way
as persist
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../scaladsl/DurableStateBehaviorReplySpec.scala | 25 ++++-
...lySpec.scala => DurableStateBehaviorSpec.scala} | 111 ++++++++++-----------
.../internal/DurableStateStoreInteractions.scala | 9 +-
.../persistence/typed/state/internal/Running.scala | 35 ++++++-
4 files changed, 110 insertions(+), 70 deletions(-)
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
index e0cace3710..ecc1ceae62 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
@@ -92,7 +92,7 @@ class DurableStateBehaviorReplySpec
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId =
PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()}")
- "A typed persistent actor with commands that are expecting replies" must {
+ "A DurableStateBehavior actor with commands that are expecting replies" must
{
"persist state thenReply" in {
val c = spawn(counter(nextPid()))
@@ -137,5 +137,28 @@ class DurableStateBehaviorReplySpec
c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(0))
}
+
+ "handle commands sequentially" in {
+ val c = spawn(counter(nextPid()))
+ val probe = TestProbe[Any]()
+
+ c ! IncrementWithConfirmation(probe.ref)
+ c ! IncrementWithConfirmation(probe.ref)
+ c ! IncrementWithConfirmation(probe.ref)
+ c ! GetValue(probe.ref)
+ probe.expectMessage(Done)
+ probe.expectMessage(Done)
+ probe.expectMessage(Done)
+ probe.expectMessage(State(3))
+
+ c ! IncrementWithConfirmation(probe.ref)
+ c ! IncrementWithConfirmation(probe.ref)
+ c ! DeleteWithConfirmation(probe.ref)
+ c ! GetValue(probe.ref)
+ probe.expectMessage(Done)
+ probe.expectMessage(Done)
+ probe.expectMessage(Done)
+ probe.expectMessage(State(0))
+ }
}
}
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorSpec.scala
similarity index 60%
copy from
persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
copy to
persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorSpec.scala
index e0cace3710..bc33a87e0a 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorSpec.scala
@@ -17,11 +17,9 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.pekko
import pekko.Done
+import pekko.actor.testkit.typed.TestKitSettings
import pekko.actor.testkit.typed.scaladsl._
import pekko.actor.typed.ActorRef
-import pekko.actor.typed.Behavior
-import pekko.actor.typed.scaladsl.ActorContext
-import pekko.actor.typed.scaladsl.Behaviors
import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
import pekko.persistence.typed.PersistenceId
import pekko.serialization.jackson.CborSerializable
@@ -31,99 +29,75 @@ import org.scalatest.wordspec.AnyWordSpecLike
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
-object DurableStateBehaviorReplySpec {
+object DurableStateBehaviorSpec {
+
def conf: Config =
PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString(s"""
pekko.loglevel = INFO
"""))
- sealed trait Command[ReplyMessage] extends CborSerializable
- final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends
Command[Done]
- final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends
Command[Done]
- final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done]
- final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
- final case class DeleteWithConfirmation(replyTo: ActorRef[Done]) extends
Command[Done]
- case object Increment extends Command[Nothing]
- case class IncrementBy(by: Int) extends Command[Nothing]
+ sealed trait Command extends CborSerializable
+ final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends
Command
+ final case class GetValue(replyTo: ActorRef[State]) extends Command
+ final case class DeleteWithConfirmation(replyTo: ActorRef[Done]) extends
Command
+ case class IncrementBy(by: Int) extends Command
final case class State(value: Int) extends CborSerializable
- def counter(persistenceId: PersistenceId): Behavior[Command[_]] =
- Behaviors.setup(ctx => counter(ctx, persistenceId))
-
- def counter(ctx: ActorContext[Command[_]], persistenceId: PersistenceId):
DurableStateBehavior[Command[_], State] = {
- DurableStateBehavior.withEnforcedReplies[Command[_], State](
+ def counter(persistenceId: PersistenceId): DurableStateBehavior[Command,
State] = {
+ DurableStateBehavior(
persistenceId,
emptyState = State(0),
commandHandler = (state, command) =>
command match {
- case IncrementWithConfirmation(replyTo) =>
- Effect.persist(state.copy(value = state.value +
1)).thenReply(replyTo)(_ => Done)
-
- case IncrementReplyLater(replyTo) =>
- Effect
- .persist(state.copy(value = state.value + 1))
- .thenRun((_: State) => ctx.self ! ReplyNow(replyTo))
- .thenNoReply()
+ case IncrementBy(by) =>
+ Effect.persist(state.copy(value = state.value + by))
- case ReplyNow(replyTo) =>
- Effect.reply(replyTo)(Done)
+ case IncrementWithConfirmation(replyTo) =>
+ Effect.persist(state.copy(value = state.value + 1)).thenRun(_ =>
replyTo ! Done)
case GetValue(replyTo) =>
- Effect.reply(replyTo)(state)
+ replyTo ! state
+ Effect.none
case DeleteWithConfirmation(replyTo) =>
- Effect.delete[State]().thenReply(replyTo)(_ => Done)
-
- case _ => ???
-
+ Effect.delete[State]().thenRun(_ => replyTo ! Done)
})
}
-
}
-class DurableStateBehaviorReplySpec
- extends ScalaTestWithActorTestKit(DurableStateBehaviorReplySpec.conf)
+class DurableStateBehaviorSpec
+ extends ScalaTestWithActorTestKit(DurableStateBehaviorSpec.conf)
with AnyWordSpecLike
with LogCapturing {
+ import DurableStateBehaviorSpec._
- import DurableStateBehaviorReplySpec._
+ implicit val testSettings: TestKitSettings = TestKitSettings(system)
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId =
PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()}")
- "A typed persistent actor with commands that are expecting replies" must {
-
- "persist state thenReply" in {
+ "A DurableStateBehavior actor" must {
+ "persist and update state" in {
val c = spawn(counter(nextPid()))
- val probe = TestProbe[Done]()
- c ! IncrementWithConfirmation(probe.ref)
- probe.expectMessage(Done)
+ val updateProbe = TestProbe[Done]()
+ val queryProbe = TestProbe[State]()
- c ! IncrementWithConfirmation(probe.ref)
- c ! IncrementWithConfirmation(probe.ref)
- probe.expectMessage(Done)
- probe.expectMessage(Done)
- }
+ c ! IncrementWithConfirmation(updateProbe.ref)
+ updateProbe.expectMessage(Done)
- "persist state thenReply later" in {
- val c = spawn(counter(nextPid()))
- val probe = TestProbe[Done]()
- c ! IncrementReplyLater(probe.ref)
- probe.expectMessage(Done)
- }
+ c ! GetValue(queryProbe.ref)
+ queryProbe.expectMessage(State(1))
- "reply to query command" in {
- val c = spawn(counter(nextPid()))
- val updateProbe = TestProbe[Done]()
+ c ! IncrementBy(5)
c ! IncrementWithConfirmation(updateProbe.ref)
+ updateProbe.expectMessage(Done)
- val queryProbe = TestProbe[State]()
c ! GetValue(queryProbe.ref)
- queryProbe.expectMessage(State(1))
+ queryProbe.expectMessage(State(7))
}
- "delete state thenReply" in {
+ "delete state" in {
val c = spawn(counter(nextPid()))
val updateProbe = TestProbe[Done]()
c ! IncrementWithConfirmation(updateProbe.ref)
@@ -137,5 +111,24 @@ class DurableStateBehaviorReplySpec
c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(0))
}
+
+ "handle commands sequentially" in {
+ val c = spawn(counter(nextPid()))
+ val probe = TestProbe[Any]()
+
+ c ! IncrementWithConfirmation(probe.ref)
+ c ! IncrementWithConfirmation(probe.ref)
+ c ! IncrementWithConfirmation(probe.ref)
+ c ! GetValue(probe.ref)
+ probe.expectMessage(Done)
+ probe.expectMessage(Done)
+ probe.expectMessage(Done)
+ probe.expectMessage(State(3))
+
+ c ! DeleteWithConfirmation(probe.ref)
+ c ! GetValue(probe.ref)
+ probe.expectMessage(Done)
+ probe.expectMessage(State(0))
+ }
}
}
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
index 56e8fe8627..dcd52ff68a 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreInteractions.scala
@@ -68,13 +68,13 @@ private[pekko] trait DurableStateStoreInteractions[C, S] {
protected def internalDelete(
ctx: ActorContext[InternalProtocol],
- cmd: Any,
+ @nowarn("msg=never used") cmd: Any,
state: Running.RunningState[S, C]): Running.RunningState[S, C] = {
val newRunningState: Running.RunningState[S, C] =
state.nextRevision().copy(state = setup.emptyState)
val persistenceId = setup.persistenceId.id
- onDeleteInitiated(ctx, cmd)
+ // TODO Might need to call hook method for Telemetry
ctx.pipeToSelf[Done](setup.durableStateStore.deleteObject(persistenceId,
newRunningState.revision)) {
case Success(_) => InternalProtocol.DeleteSuccess
@@ -84,14 +84,11 @@ private[pekko] trait DurableStateStoreInteractions[C, S] {
newRunningState
}
- // FIXME These hook methods are for Telemetry. What more parameters are
needed? persistenceId?
+ // TODO These hook methods are for Telemetry. What more parameters are
needed? persistenceId?
@InternalStableApi
private[pekko] def onWriteInitiated(@nowarn("msg=never used") ctx:
ActorContext[_],
@nowarn("msg=never used") cmd: Any): Unit = ()
- private[pekko] def onDeleteInitiated(@nowarn("msg=never used") ctx:
ActorContext[_],
- @nowarn("msg=never used") cmd: Any): Unit = ()
-
protected def requestRecoveryPermit(): Unit = {
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit,
setup.selfClassic)
}
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
index 91d39fdad4..4a3f73bad8 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/Running.scala
@@ -195,6 +195,16 @@ private[pekko] object Running {
(persistingState(newState2, state, sideEffects), false)
}
+ private def handleDelete(
+ cmd: Any,
+ sideEffects: immutable.Seq[SideEffect[S]]):
(Behavior[InternalProtocol], Boolean) = {
+ _currentRevision = state.revision + 1
+
+ val nextState = internalDelete(setup.context, cmd, state)
+
+ (persistingState(nextState, state, sideEffects), false)
+ }
+
@tailrec def applyEffects(
msg: Any,
state: RunningState[S, C],
@@ -219,8 +229,7 @@ private[pekko] object Running {
(applySideEffects(sideEffects, state), true)
case _: Delete[_] =>
- val nextState = internalDelete(setup.context, msg, state)
- (applySideEffects(sideEffects, nextState), true)
+ handleDelete(msg, sideEffects)
case _: Unhandled.type =>
import pekko.actor.typed.scaladsl.adapter._
@@ -271,12 +280,12 @@ private[pekko] object Running {
case UpsertFailure(exc) => onUpsertFailed(exc)
case in: IncomingCommand[C @unchecked] => onCommand(in)
case get: GetState[S @unchecked] => stashInternal(get)
+ case DeleteSuccess => onDeleteSuccess()
+ case DeleteFailure(exc) => onDeleteFailed(exc)
case RecoveryTimeout => Behaviors.unhandled
case RecoveryPermitGranted => Behaviors.unhandled
case _: GetSuccess[_] => Behaviors.unhandled
case _: GetFailure => Behaviors.unhandled
- case DeleteSuccess => Behaviors.unhandled
- case DeleteFailure(_) => Behaviors.unhandled
}
}
@@ -308,6 +317,24 @@ private[pekko] object Running {
throw new DurableStateStoreException(setup.persistenceId,
currentRevision, cause)
}
+ final def onDeleteSuccess(): Behavior[InternalProtocol] = {
+ if (setup.internalLogger.isDebugEnabled) {
+ setup.internalLogger
+ .debug("Received DeleteSuccess response after: {} nanos",
System.nanoTime() - persistStartTime)
+ }
+
+ // TODO Might need to call hook method for Telemetry
+
+ visibleState = state
+ val newState = applySideEffects(sideEffects, state)
+ tryUnstashOne(newState)
+ }
+
+ final def onDeleteFailed(cause: Throwable): Behavior[InternalProtocol] = {
+ // TODO Might need to call hook method for Telemetry
+ throw new DurableStateStoreException(setup.persistenceId,
currentRevision, cause)
+ }
+
override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]]
= {
case PoisonPill =>
// wait for store responses before stopping
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org