This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch preRestart in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 1695e8e24d77e5ced87a66b60bc4bcb4dcb1e5fb Author: He-Pin <[email protected]> AuthorDate: Sun Mar 22 17:39:54 2026 +0800 refactor: finalize preRestart method in AbstractActor and add stash support trait --- .../pekko/actor/StashJavaAPITestUntypedActors.java | 2 +- .../actor/AbstractActorPreRestartFinalTest.java | 67 ++++++++++ .../apache/pekko/actor/StashJavaAPITestActors.java | 2 +- .../actor/AbstractActorPreRestartFinalSpec.scala | 147 +++++++++++++++++++++ .../org/apache/pekko/actor/AbstractActor.scala | 46 +++++-- .../org/apache/pekko/persistence/Persistence.scala | 4 +- 6 files changed, 254 insertions(+), 14 deletions(-) diff --git a/actor-tests/src/test/java-jdk21-only/org/apache/pekko/actor/StashJavaAPITestUntypedActors.java b/actor-tests/src/test/java-jdk21-only/org/apache/pekko/actor/StashJavaAPITestUntypedActors.java index 8a2f47ab79..31ca292779 100644 --- a/actor-tests/src/test/java-jdk21-only/org/apache/pekko/actor/StashJavaAPITestUntypedActors.java +++ b/actor-tests/src/test/java-jdk21-only/org/apache/pekko/actor/StashJavaAPITestUntypedActors.java @@ -16,7 +16,7 @@ package org.apache.pekko.actor; import static org.junit.jupiter.api.Assertions.assertEquals; public class StashJavaAPITestUntypedActors { - private static int testReceive(Object msg, int count, ActorRef sender, ActorRef self, UnrestrictedStash stash) { + private static int testReceive(Object msg, int count, ActorRef sender, ActorRef self, StashSupport stash) { switch (msg) { case String s when count < 0 -> { sender.tell(s.length(), self); diff --git a/actor-tests/src/test/java/org/apache/pekko/actor/AbstractActorPreRestartFinalTest.java b/actor-tests/src/test/java/org/apache/pekko/actor/AbstractActorPreRestartFinalTest.java new file mode 100644 index 0000000000..658e2bde01 --- /dev/null +++ b/actor-tests/src/test/java/org/apache/pekko/actor/AbstractActorPreRestartFinalTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.actor; + +import static org.junit.jupiter.api.Assertions.*; + +import java.lang.reflect.Modifier; +import java.util.Optional; +import org.junit.jupiter.api.Test; + +public class AbstractActorPreRestartFinalTest { + + @Test + public void preRestartWithScalaOptionShouldBeFinal() throws NoSuchMethodException { + var method = + AbstractActor.class.getDeclaredMethod("preRestart", Throwable.class, scala.Option.class); + assertTrue( + Modifier.isFinal(method.getModifiers()), + "preRestart(Throwable, Option) should be final in AbstractActor"); + } + + @Test + public void preRestartWithJavaOptionalShouldNotBeFinal() throws NoSuchMethodException { + var method = + AbstractActor.class.getDeclaredMethod("preRestart", Throwable.class, Optional.class); + assertFalse( + Modifier.isFinal(method.getModifiers()), + "preRestart(Throwable, Optional) should NOT be final in AbstractActor"); + } + + @Test + public void untypedAbstractActorShouldInheritFinalPreRestart() throws NoSuchMethodException { + var method = + UntypedAbstractActor.class.getMethod("preRestart", Throwable.class, scala.Option.class); + assertTrue( + Modifier.isFinal(method.getModifiers()), + "preRestart(Throwable, Option) should be final in UntypedAbstractActor"); + } + + @Test + public void abstractActorWithStashShouldExtendStashSupport() { + assertTrue( + StashSupport.class.isAssignableFrom(AbstractActorWithStash.class), + "AbstractActorWithStash should implement StashSupport"); + } + + @Test + public void abstractActorWithUnboundedStashShouldExtendStashSupport() { + assertTrue( + StashSupport.class.isAssignableFrom(AbstractActorWithUnboundedStash.class), + "AbstractActorWithUnboundedStash should implement StashSupport"); + } + + @Test + public void abstractActorWithUnrestrictedStashShouldExtendStashSupport() { + assertTrue( + StashSupport.class.isAssignableFrom(AbstractActorWithUnrestrictedStash.class), + "AbstractActorWithUnrestrictedStash should implement StashSupport"); + } +} diff --git a/actor-tests/src/test/java/org/apache/pekko/actor/StashJavaAPITestActors.java b/actor-tests/src/test/java/org/apache/pekko/actor/StashJavaAPITestActors.java index 18a775f43a..3fc9df6f08 100644 --- a/actor-tests/src/test/java/org/apache/pekko/actor/StashJavaAPITestActors.java +++ b/actor-tests/src/test/java/org/apache/pekko/actor/StashJavaAPITestActors.java @@ -22,7 +22,7 @@ public class StashJavaAPITestActors { * AbstractActorWithUnrestrictedStash more DRY since mixin is not possible. */ private static int testReceive( - Object msg, int count, ActorRef sender, ActorRef self, UnrestrictedStash stash) { + Object msg, int count, ActorRef sender, ActorRef self, StashSupport stash) { if (msg instanceof String s) { if (count < 0) { sender.tell(s.length(), self); diff --git a/actor-tests/src/test/scala/org/apache/pekko/actor/AbstractActorPreRestartFinalSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/actor/AbstractActorPreRestartFinalSpec.scala new file mode 100644 index 0000000000..aa87c45813 --- /dev/null +++ b/actor-tests/src/test/scala/org/apache/pekko/actor/AbstractActorPreRestartFinalSpec.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.actor + +import java.util.Optional + +import scala.concurrent.duration._ +import scala.runtime.BoxedUnit + +import org.apache.pekko.testkit.{ ImplicitSender, PekkoSpec, TestProbe } + +object AbstractActorPreRestartFinalSpec { + + class PreRestartException extends Exception("test-prerestart") with scala.util.control.NoStackTrace + + /** + * An AbstractActor subclass that overrides preRestart(Throwable, Optional[Any]) + * to verify it is called correctly via the final bridge method. + */ + class JavaStyleActor(probe: ActorRef) extends AbstractActor { + override def createReceive(): AbstractActor.Receive = { + val pf: PartialFunction[Any, BoxedUnit] = { + case "crash" => throw new PreRestartException + case msg => probe ! msg; BoxedUnit.UNIT + } + new AbstractActor.Receive(pf) + } + + override def preRestart(reason: Throwable, message: Optional[Any]): Unit = { + probe ! ("preRestart-Optional", reason.getMessage, message.isPresent) + super.preRestart(reason, message) + } + } + + /** + * An AbstractActorWithStash subclass to verify stash unstashing still works on preRestart. + */ + class StashActorWithPreRestart(probe: ActorRef) extends AbstractActorWithStash { + override def createReceive(): AbstractActor.Receive = { + val pf: PartialFunction[Any, BoxedUnit] = { + case "stash" => stash(); BoxedUnit.UNIT + case "unstash" => unstashAll(); BoxedUnit.UNIT + case "crash" => throw new PreRestartException + case msg => probe ! msg; BoxedUnit.UNIT + } + new AbstractActor.Receive(pf) + } + + override def preRestart(reason: Throwable, message: Optional[Any]): Unit = { + probe ! "preRestart-stash" + super.preRestart(reason, message) + } + } +} + +class AbstractActorPreRestartFinalSpec extends PekkoSpec with ImplicitSender { + import AbstractActorPreRestartFinalSpec._ + + "AbstractActor.preRestart(Throwable, Option[Any])" must { + + "be declared final via reflection" in { + val method = classOf[AbstractActor].getDeclaredMethod( + "preRestart", classOf[Throwable], classOf[Option[_]]) + assert( + java.lang.reflect.Modifier.isFinal(method.getModifiers), + "preRestart(Throwable, Option[Any]) should be final in AbstractActor") + } + + "delegate to preRestart(Throwable, Optional[Any]) which is overridable" in { + val probe = TestProbe() + val supervisor = system.actorOf(Props(new Actor { + val child = context.actorOf(Props(new JavaStyleActor(probe.ref))) + override val supervisorStrategy = OneForOneStrategy() { + case _: PreRestartException => SupervisorStrategy.Restart + } + def receive = { + case msg => child.forward(msg) + } + })) + + supervisor ! "crash" + val (label, msg, hasMsg) = probe.expectMsgType[(String, String, Boolean)](3.seconds) + label shouldBe "preRestart-Optional" + msg shouldBe "test-prerestart" + hasMsg shouldBe true + } + + "not be overridable in AbstractActor subclass (preRestart with Optional is the extension point)" in { + // Verify that preRestart(Throwable, Optional[Any]) is NOT final + val optionalMethod = classOf[AbstractActor].getDeclaredMethod( + "preRestart", classOf[Throwable], classOf[Optional[_]]) + assert( + !java.lang.reflect.Modifier.isFinal(optionalMethod.getModifiers), + "preRestart(Throwable, Optional[Any]) should NOT be final in AbstractActor") + } + } + + "AbstractActorWithStash" must { + + "call overridden preRestart(Throwable, Optional[Any]) and unstash on restart" in { + val probe = TestProbe() + val supervisor = system.actorOf(Props(new Actor { + val child = context.actorOf(Props(new StashActorWithPreRestart(probe.ref))) + override val supervisorStrategy = OneForOneStrategy() { + case _: PreRestartException => SupervisorStrategy.Restart + } + def receive = { + case msg => child.forward(msg) + } + })) + + supervisor ! "stash" + supervisor ! "crash" + probe.expectMsg(3.seconds, "preRestart-stash") + } + + "still extend StashSupport (stash/unstash available)" in { + assert( + classOf[StashSupport].isAssignableFrom(classOf[AbstractActorWithStash]), + "AbstractActorWithStash should extend StashSupport") + assert( + classOf[StashSupport].isAssignableFrom(classOf[AbstractActorWithUnboundedStash]), + "AbstractActorWithUnboundedStash should extend StashSupport") + assert( + classOf[StashSupport].isAssignableFrom(classOf[AbstractActorWithUnrestrictedStash]), + "AbstractActorWithUnrestrictedStash should extend StashSupport") + } + } + + "UntypedAbstractActor" must { + + "also have final preRestart(Throwable, Option[Any]) (inherited from AbstractActor)" in { + val method = classOf[UntypedAbstractActor].getMethod( + "preRestart", classOf[Throwable], classOf[Option[_]]) + assert( + java.lang.reflect.Modifier.isFinal(method.getModifiers), + "preRestart(Throwable, Option[Any]) should be final in UntypedAbstractActor") + } + } +} diff --git a/actor/src/main/scala/org/apache/pekko/actor/AbstractActor.scala b/actor/src/main/scala/org/apache/pekko/actor/AbstractActor.scala index 705ee24f1e..b5f0ad1ec4 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/AbstractActor.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/AbstractActor.scala @@ -15,13 +15,17 @@ package org.apache.pekko.actor import java.util.Optional -import scala.annotation.nowarn import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.runtime.BoxedUnit import org.apache.pekko import pekko.annotation.DoNotInherit +import pekko.dispatch.{ + DequeBasedMessageQueueSemantics, + RequiresMessageQueue, + UnboundedDequeBasedMessageQueueSemantics +} import pekko.japi.pf.ReceiveBuilder /** @@ -281,11 +285,8 @@ abstract class AbstractActor extends Actor { @throws(classOf[Exception]) override def postStop(): Unit = super.postStop() - // TODO In Pekko 1.1.0, we can remove deprecation and make the method final - @deprecated("Override preRestart with message parameter with Optional type instead", "Akka 2.5.0") @throws(classOf[Exception]) - @nowarn("msg=deprecated") - override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + final override def preRestart(reason: Throwable, message: Option[Any]): Unit = { import scala.jdk.OptionConverters._ preRestart(reason, message.toJava) } @@ -375,6 +376,25 @@ abstract class AbstractLoggingActor extends AbstractActor with ActorLogging */ abstract class UntypedAbstractLoggingActor extends UntypedAbstractActor with ActorLogging +/** + * INTERNAL API + * + * Common trait for AbstractActor variants with stash support. + * Handles unstashing messages on preRestart and postStop. + */ +private[pekko] trait AbstractActorStashSupport extends AbstractActor with StashSupport { + @throws(classOf[Exception]) + override def preRestart(reason: Throwable, message: Optional[Any]): Unit = { + try unstashAll() + finally super.preRestart(reason, message) + } + @throws(classOf[Exception]) + override def postStop(): Unit = { + try unstashAll() + finally super.postStop() + } +} + /** * Java API: compatible with lambda expressions * @@ -418,7 +438,8 @@ abstract class UntypedAbstractLoggingActor extends UntypedAbstractActor with Act * There is also an unrestricted version [[pekko.actor.AbstractActorWithUnrestrictedStash]] that does not * enforce the mailbox type. */ -abstract class AbstractActorWithStash extends AbstractActor with Stash +abstract class AbstractActorWithStash extends AbstractActor with AbstractActorStashSupport + with RequiresMessageQueue[DequeBasedMessageQueueSemantics] /** * Java API: compatible with lambda expressions @@ -429,7 +450,8 @@ abstract class AbstractActorWithStash extends AbstractActor with Stash * * @since 2.0.0 */ -abstract class UntypedAbstractActorWithStash extends UntypedAbstractActor with Stash +abstract class UntypedAbstractActorWithStash extends UntypedAbstractActor with AbstractActorStashSupport + with RequiresMessageQueue[DequeBasedMessageQueueSemantics] /** * Java API: compatible with lambda expressions @@ -438,7 +460,8 @@ abstract class UntypedAbstractActorWithStash extends UntypedAbstractActor with S * manually, and the mailbox should extend the [[pekko.dispatch.DequeBasedMessageQueueSemantics]] marker trait. * See [[pekko.actor.AbstractActorWithStash]] for details on how `Stash` works. */ -abstract class AbstractActorWithUnboundedStash extends AbstractActor with UnboundedStash +abstract class AbstractActorWithUnboundedStash extends AbstractActor with AbstractActorStashSupport + with RequiresMessageQueue[UnboundedDequeBasedMessageQueueSemantics] /** * Java API: compatible with lambda expressions @@ -449,7 +472,8 @@ abstract class AbstractActorWithUnboundedStash extends AbstractActor with Unboun * * @since 2.0.0 */ -abstract class UntypedAbstractActorWithUnboundedStash extends UntypedAbstractActor with UnboundedStash +abstract class UntypedAbstractActorWithUnboundedStash extends UntypedAbstractActor with AbstractActorStashSupport + with RequiresMessageQueue[UnboundedDequeBasedMessageQueueSemantics] /** * Java API: compatible with lambda expressions @@ -457,7 +481,7 @@ abstract class UntypedAbstractActorWithUnboundedStash extends UntypedAbstractAct * Actor base class with `Stash` that does not enforce any mailbox type. The mailbox of the actor has to be configured * manually. See [[pekko.actor.AbstractActorWithStash]] for details on how `Stash` works. */ -abstract class AbstractActorWithUnrestrictedStash extends AbstractActor with UnrestrictedStash +abstract class AbstractActorWithUnrestrictedStash extends AbstractActor with AbstractActorStashSupport /** * Java API: compatible with lambda expressions @@ -467,4 +491,4 @@ abstract class AbstractActorWithUnrestrictedStash extends AbstractActor with Unr * * @since 2.0.0 */ -abstract class UntypedAbstractActorWithUnrestrictedStash extends UntypedAbstractActor with UnrestrictedStash +abstract class UntypedAbstractActorWithUnrestrictedStash extends UntypedAbstractActor with AbstractActorStashSupport diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/Persistence.scala b/persistence/src/main/scala/org/apache/pekko/persistence/Persistence.scala index 4f2bd2d57a..4e4b608d07 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/Persistence.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/Persistence.scala @@ -25,6 +25,7 @@ import org.apache.pekko import pekko.actor._ import pekko.annotation.InternalApi import pekko.annotation.InternalStableApi +import pekko.dispatch.{ DequeBasedMessageQueueSemantics, RequiresMessageQueue } import pekko.event.{ Logging, LoggingAdapter } import pekko.japi.Pair import pekko.persistence.journal.{ EventAdapters, IdentityEventAdapters } @@ -114,7 +115,8 @@ trait PersistenceRecovery { // #persistence-recovery } -trait PersistenceStash extends Stash with StashFactory { +trait PersistenceStash extends Actor with StashSupport with RequiresMessageQueue[DequeBasedMessageQueueSemantics] + with StashFactory { /** * The returned [[pekko.persistence.StashOverflowStrategy]] object determines how to handle the message failed to stash --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
