This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e2c1fe78d fix(stream): preserve abrupt fanout publisher shutdown signal (#2910)
4e2c1fe78d is described below

commit 4e2c1fe78d68d696d2110461d56805360643805f
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Apr 26 21:34:20 2026 +0800

    fix(stream): preserve abrupt fanout publisher shutdown signal (#2910)
    
    Motivation:
    FanoutPublisherBridgeStage.postStop always completed the exposed publisher with ActorPublisher.NormalShutdownReason, even when the stage was stopped abruptly without an upstream or subscriber terminal signal. That made active and late subscribers observe a normal shutdown instead of the GraphStage abrupt termination failure.
    
    Modification:
    Track whether the bridge is stopping from an intentional terminal path. Only synthesize AbruptStageTerminationException from postStop when no terminal path was already signalled, and add a bridge-level regression test for active and late subscribers.
    
    Result:
    Controlled completion, failure, timeout, and last-subscriber cancellation keep their existing reasons, while abrupt bridge shutdown is now reported as AbruptStageTerminationException.
    
    References:
    apache/pekko#2874
---
 .../stream/impl/FanoutPublisherBehaviorSpec.scala  | 30 +++++++++++++++++++---
 .../stream/impl/FanoutPublisherBridgeStage.scala   | 11 ++++++--
 2 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
index d7e17ba530..884db05a05 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
@@ -22,14 +22,19 @@ import scala.concurrent.duration._
 
 import org.apache.pekko
 import pekko.testkit.EventFilter
-import pekko.stream.Attributes
-import pekko.stream.ActorAttributes
-import pekko.stream.StreamSubscriptionTimeoutTerminationMode
+import pekko.stream.{
+  AbruptStageTerminationException,
+  ActorAttributes,
+  Attributes,
+  Materializer,
+  StreamSubscriptionTimeoutTerminationMode
+}
 import pekko.stream.scaladsl.Keep
 import pekko.stream.scaladsl.Sink
 import pekko.stream.scaladsl.Source
 import pekko.stream.testkit.TestPublisher
 import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.stream.testkit.Utils.TE
 
@@ -226,4 +231,23 @@ class FanoutPublisherBehaviorSpec extends StreamSpec {
 
   }
 
+  "FanoutPublisherBridgeStage" must {
+
+    "fail active and late subscribers on abrupt materializer shutdown" in {
+      val mat = Materializer(system)
+      val publisher = Source.maybe[Int].runWith(Sink.fromGraph(new FanoutPublisherBridgeStage[Int]))(mat)
+      val subscriber = TestSubscriber.manualProbe[Int]()
+
+      publisher.subscribe(subscriber)
+      subscriber.expectSubscription()
+      mat.shutdown()
+
+      subscriber.expectError() shouldBe an[AbruptStageTerminationException]
+      val lateSubscriber = TestSubscriber.manualProbe[Int]()
+      publisher.subscribe(lateSubscriber)
+      lateSubscriber.expectSubscriptionAndError() shouldBe an[AbruptStageTerminationException]
+    }
+
+  }
+
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
index 06eae4ed19..187dc2bb43 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
@@ -68,6 +68,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
 
       private var everSubscribed = false
       private var requestedFromUpstream = 0L
+      private var terminationSignalled = false
 
       private val requestCallback = getAsyncCallback[(FanoutPublisherBridgeSubscription[T], Long)] {
         case (subscription, elements) =>
@@ -103,6 +104,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
               materializer.logger.warning(
                 "Subscription timeout expired for [{}], no subscriber attached in time",
                 materializedPublisher)
+              terminationSignalled = true
               materializedPublisher.shutdown(Some(ex))
               failStage(ex)
             case WarnTermination =>
@@ -120,6 +122,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
         if (!isClosed(in)) cancel(in)
 
       override protected def shutdown(completed: Boolean): Unit = {
+        terminationSignalled = true
         materializedPublisher.shutdown(if (completed) None else ActorPublisher.SomeNormalShutdownReason)
         completeStage()
       }
@@ -142,14 +145,18 @@ import org.reactivestreams.{ Publisher, Subscriber }
         completeDownstream()
 
       override def onUpstreamFailure(ex: Throwable): Unit = {
+        terminationSignalled = true
         abortDownstream(ex)
         materializedPublisher.shutdown(Some(ex))
         failStage(ex)
       }
 
       override def postStop(): Unit =
-        try abortDownstream(ActorPublisher.NormalShutdownReason)
-        finally materializedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
+        if (!terminationSignalled) {
+          val ex = new AbruptStageTerminationException(this)
+          try abortDownstream(ex)
+          finally materializedPublisher.shutdown(Some(ex))
+        }
 
       private def tryPullIfNeeded(): Unit =
         if (requestedFromUpstream > 0 && !hasBeenPulled(in) && !isClosed(in)) pull(in)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to