This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 4e2c1fe78d fix(stream): preserve abrupt fanout publisher shutdown
signal (#2910)
4e2c1fe78d is described below
commit 4e2c1fe78d68d696d2110461d56805360643805f
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Apr 26 21:34:20 2026 +0800
fix(stream): preserve abrupt fanout publisher shutdown signal (#2910)
Motivation:
FanoutPublisherBridgeStage.postStop always completed the exposed publisher
with ActorPublisher.NormalShutdownReason, even when the stage was stopped
abruptly without an upstream or subscriber terminal signal. That made active
and late subscribers observe a normal shutdown instead of the GraphStage abrupt
termination failure.
Modification:
Track whether the bridge is stopping from an intentional terminal path.
Only synthesize AbruptStageTerminationException from postStop when no terminal
path was already signalled, and add a bridge-level regression test for active
and late subscribers.
Result:
Controlled completion, failure, timeout, and last-subscriber cancellation
keep their existing reasons, while abrupt bridge shutdown is now reported as
AbruptStageTerminationException.
References:
apache/pekko#2874
---
.../stream/impl/FanoutPublisherBehaviorSpec.scala | 30 +++++++++++++++++++---
.../stream/impl/FanoutPublisherBridgeStage.scala | 11 ++++++--
2 files changed, 36 insertions(+), 5 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
index d7e17ba530..884db05a05 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
@@ -22,14 +22,19 @@ import scala.concurrent.duration._
import org.apache.pekko
import pekko.testkit.EventFilter
-import pekko.stream.Attributes
-import pekko.stream.ActorAttributes
-import pekko.stream.StreamSubscriptionTimeoutTerminationMode
+import pekko.stream.{
+ AbruptStageTerminationException,
+ ActorAttributes,
+ Attributes,
+ Materializer,
+ StreamSubscriptionTimeoutTerminationMode
+}
import pekko.stream.scaladsl.Keep
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
import pekko.stream.testkit.TestPublisher
import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.Utils.TE
@@ -226,4 +231,23 @@ class FanoutPublisherBehaviorSpec extends StreamSpec {
}
+ "FanoutPublisherBridgeStage" must {
+
+ "fail active and late subscribers on abrupt materializer shutdown" in {
+ val mat = Materializer(system)
+ val publisher = Source.maybe[Int].runWith(Sink.fromGraph(new
FanoutPublisherBridgeStage[Int]))(mat)
+ val subscriber = TestSubscriber.manualProbe[Int]()
+
+ publisher.subscribe(subscriber)
+ subscriber.expectSubscription()
+ mat.shutdown()
+
+ subscriber.expectError() shouldBe an[AbruptStageTerminationException]
+ val lateSubscriber = TestSubscriber.manualProbe[Int]()
+ publisher.subscribe(lateSubscriber)
+ lateSubscriber.expectSubscriptionAndError() shouldBe
an[AbruptStageTerminationException]
+ }
+
+ }
+
}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
index 06eae4ed19..187dc2bb43 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
@@ -68,6 +68,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
private var everSubscribed = false
private var requestedFromUpstream = 0L
+ private var terminationSignalled = false
private val requestCallback =
getAsyncCallback[(FanoutPublisherBridgeSubscription[T], Long)] {
case (subscription, elements) =>
@@ -103,6 +104,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
materializer.logger.warning(
"Subscription timeout expired for [{}], no subscriber attached
in time",
materializedPublisher)
+ terminationSignalled = true
materializedPublisher.shutdown(Some(ex))
failStage(ex)
case WarnTermination =>
@@ -120,6 +122,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
if (!isClosed(in)) cancel(in)
override protected def shutdown(completed: Boolean): Unit = {
+ terminationSignalled = true
materializedPublisher.shutdown(if (completed) None else
ActorPublisher.SomeNormalShutdownReason)
completeStage()
}
@@ -142,14 +145,18 @@ import org.reactivestreams.{ Publisher, Subscriber }
completeDownstream()
override def onUpstreamFailure(ex: Throwable): Unit = {
+ terminationSignalled = true
abortDownstream(ex)
materializedPublisher.shutdown(Some(ex))
failStage(ex)
}
override def postStop(): Unit =
- try abortDownstream(ActorPublisher.NormalShutdownReason)
- finally
materializedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
+ if (!terminationSignalled) {
+ val ex = new AbruptStageTerminationException(this)
+ try abortDownstream(ex)
+ finally materializedPublisher.shutdown(Some(ex))
+ }
private def tryPullIfNeeded(): Unit =
if (requestedFromUpstream > 0 && !hasBeenPulled(in) && !isClosed(in))
pull(in)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]