This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new e0abd889ab fix: stabilise PersistentActorRecoveryTimeoutSpec (#2906)
e0abd889ab is described below
commit e0abd889abf39f7fa7d543273e136ba08dce972f
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Apr 26 19:14:54 2026 +0800
fix: stabilise PersistentActorRecoveryTimeoutSpec (#2906)
Motivation:
PersistentActorRecoveryTimeoutSpec can flake under CI load because the
receive-timeout test shares the same 3s recovery-event-timeout setting with the
test that intentionally times out recovery. The second stepped replay operation
can then race the recovery timeout, leaving SteppingInmemJournal.step blocked
until its ask times out.
Modification:
Allow SteppingInmemJournal to read instance-id from the plugin config
passed to the test journal actor, while keeping a no-arg constructor (which
falls back to the global stepping-inmem instance-id) for existing tests. Use a
separate stepping journal instance with a wider (30s) recovery timeout for the
receive-timeout scenario, consume the probe message sent after the first
recovery completes, and release both replay tokens up front.
Result:
The recovery-timeout failure test remains a 3s timeout check, while the
receive-timeout test verifies successful recovery without racing that timeout.
---
.../PersistentActorRecoveryTimeoutSpec.scala | 32 +++++++++++++---------
.../persistence/journal/SteppingInmemJournal.scala | 8 ++++--
2 files changed, 25 insertions(+), 15 deletions(-)
diff --git
a/persistence/src/test/scala/org/apache/pekko/persistence/PersistentActorRecoveryTimeoutSpec.scala
b/persistence/src/test/scala/org/apache/pekko/persistence/PersistentActorRecoveryTimeoutSpec.scala
index 2573ccd6b1..bc466701b7 100644
---
a/persistence/src/test/scala/org/apache/pekko/persistence/PersistentActorRecoveryTimeoutSpec.scala
+++
b/persistence/src/test/scala/org/apache/pekko/persistence/PersistentActorRecoveryTimeoutSpec.scala
@@ -19,18 +19,23 @@ import org.apache.pekko
import pekko.actor.{ Actor, ActorLogging, ActorRef, Props }
import pekko.actor.Status.Failure
import pekko.persistence.journal.SteppingInmemJournal
-import pekko.testkit.{ ImplicitSender, PekkoSpec, TestProbe }
+import pekko.testkit.{ ImplicitSender, PekkoSpec, TestDuration, TestProbe }
import com.typesafe.config.ConfigFactory
object PersistentActorRecoveryTimeoutSpec {
val journalId = "persistent-actor-recovery-timeout-spec"
+ val receiveTimeoutJournalId =
"persistent-actor-recovery-timeout-spec-receive-timeout"
+ val receiveTimeoutJournalPluginId =
"pekko.persistence.journal.stepping-inmem-receive-timeout"
def config =
SteppingInmemJournal
.config(PersistentActorRecoveryTimeoutSpec.journalId)
- .withFallback(ConfigFactory.parseString("""
+ .withFallback(ConfigFactory.parseString(s"""
|pekko.persistence.journal.stepping-inmem.recovery-event-timeout=3s
+
|$receiveTimeoutJournalPluginId.class=${classOf[SteppingInmemJournal].getName}
+
|$receiveTimeoutJournalPluginId.instance-id="$receiveTimeoutJournalId"
+ |$receiveTimeoutJournalPluginId.recovery-event-timeout=30s
""".stripMargin))
.withFallback(PersistenceSpec.config("stepping-inmem",
"PersistentActorRecoveryTimeoutSpec"))
@@ -53,6 +58,8 @@ object PersistentActorRecoveryTimeoutSpec {
extends NamedPersistentActor("recovery-timeout-actor-2")
with ActorLogging {
+ override def journalPluginId: String = receiveTimeoutJournalPluginId
+
override def preStart(): Unit = {
context.setReceiveTimeout(receiveTimeout)
}
@@ -81,7 +88,7 @@ class PersistentActorRecoveryTimeoutSpec
extends PekkoSpec(PersistentActorRecoveryTimeoutSpec.config)
with ImplicitSender {
- import PersistentActorRecoveryTimeoutSpec.journalId
+ import PersistentActorRecoveryTimeoutSpec.{ journalId,
receiveTimeoutJournalId }
"The recovery timeout" should {
@@ -127,11 +134,12 @@ class PersistentActorRecoveryTimeoutSpec
val persisting =
system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestReceiveTimeoutActor],
timeout, probe.ref))
- awaitAssert(SteppingInmemJournal.getRef(journalId), 3.seconds)
- val journal = SteppingInmemJournal.getRef(journalId)
+ awaitAssert(SteppingInmemJournal.getRef(receiveTimeoutJournalId),
3.seconds)
+ val journal = SteppingInmemJournal.getRef(receiveTimeoutJournalId)
// initial read highest
SteppingInmemJournal.step(journal)
+ probe.expectMsg(timeout)
persisting ! "A"
SteppingInmemJournal.step(journal)
@@ -141,18 +149,16 @@ class PersistentActorRecoveryTimeoutSpec
system.stop(persisting)
expectTerminated(persisting)
- // now replay, but don't give the journal any tokens to replay events
- // so that we cause the timeout to trigger
+ // now replay and verify that recovery keeps the actor's receive timeout
system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestReceiveTimeoutActor],
timeout, probe.ref))
- // initial read highest
- SteppingInmemJournal.step(journal)
-
- // read journal
- SteppingInmemJournal.step(journal)
+ // Release both recovery journal operations up front. Waiting for the
second stepped
+ // operation can race with the recovery timeout under heavy CI load.
+ journal ! SteppingInmemJournal.Token
+ journal ! SteppingInmemJournal.Token
// we should get initial receive timeout back from actor when replay
completes
- probe.expectMsg(timeout)
+ probe.expectMsg(30.seconds.dilated, timeout)
}
diff --git
a/persistence/src/test/scala/org/apache/pekko/persistence/journal/SteppingInmemJournal.scala
b/persistence/src/test/scala/org/apache/pekko/persistence/journal/SteppingInmemJournal.scala
index 295b8cf8fa..9ff0473f8e 100644
---
a/persistence/src/test/scala/org/apache/pekko/persistence/journal/SteppingInmemJournal.scala
+++
b/persistence/src/test/scala/org/apache/pekko/persistence/journal/SteppingInmemJournal.scala
@@ -72,12 +72,16 @@ object SteppingInmemJournal {
* it using {{{SteppingInmemJournal.getRef(String)}}}, send it
{{{SteppingInmemJournal.Token}}}s to
* allow one journal operation to complete.
*/
-final class SteppingInmemJournal extends InmemJournal {
+final class SteppingInmemJournal(config: Config) extends InmemJournal {
+
+ def this() = this(ConfigFactory.empty)
import SteppingInmemJournal._
import context.dispatcher
- val instanceId =
context.system.settings.config.getString("pekko.persistence.journal.stepping-inmem.instance-id")
+ val instanceId =
+ if (config.hasPath("instance-id")) config.getString("instance-id")
+ else
context.system.settings.config.getString("pekko.persistence.journal.stepping-inmem.instance-id")
var queuedOps: Seq[() => Future[Unit]] = Seq.empty
var queuedTokenRecipients = List.empty[ActorRef]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]