This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new f5c37ba255 feat(stream): replace fanout publisher runtime with 
GraphStage bridge (#2874)
f5c37ba255 is described below

commit f5c37ba2552fd637c09a3cf6498c92e113e705b1
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Apr 20 20:41:50 2026 +0800

    feat(stream): replace fanout publisher runtime with GraphStage bridge 
(#2874)
    
    * feat(stream): replace fanout publisher runtime with GraphStage bridge
    
    Motivation:
    Sink.asPublisher(fanout = true) still depended on the legacy actor-backed 
FanoutProcessorImpl runtime, which kept issue #2860 blocked on old processor 
infrastructure and implementation-bound tests.
    
    Modification:
    Route FanoutPublisherSink through a new FanoutPublisherBridgeStage, delete 
the legacy FanoutProcessor implementation, replace the old actor-bound spec 
with FanoutPublisherBehaviorSpec, and add the matching 2.0.x MiMa excludes for 
the removed binary-visible classes.
    
    Result:
    The fanout publisher path now runs on a dedicated GraphStage bridge with 
the existing terminal-signal contract preserved, broader behavior coverage 
added, and compile/MiMa/TCK validation passing.
    
    References:
    apache/pekko#2860
    
    Co-authored-by: Copilot <[email protected]>
    
    * chore(stream): format fanout publisher bridge for CI
    
    Motivation:
    GitHub's scalafmt diff-ref job flagged FanoutPublisherBridgeStage.scala on 
PR #2874.
    
    Modification:
    Applied scalafmt formatting to FanoutPublisherBridgeStage.scala without 
changing behavior.
    
    Result:
    The fanout bridge file now matches the repository's CI formatting 
expectations.
    
    References:
    apache/pekko#2860
    apache/pekko#2874
---
 .../pekko/stream/impl/FanoutProcessorSpec.scala    | 144 -------------
 .../stream/impl/FanoutPublisherBehaviorSpec.scala  | 229 ++++++++++++++++++++
 .../remove-fanout-processor.excludes               |  21 ++
 .../apache/pekko/stream/impl/FanoutProcessor.scala | 188 ----------------
 .../stream/impl/FanoutPublisherBridgeStage.scala   | 240 +++++++++++++++++++++
 .../scala/org/apache/pekko/stream/impl/Sinks.scala |  12 +-
 6 files changed, 497 insertions(+), 337 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
deleted file mode 100644
index 6880fcc4e1..0000000000
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.stream.impl
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import org.apache.pekko
-import pekko.stream.ActorAttributes
-import pekko.stream.StreamSubscriptionTimeoutTerminationMode
-import pekko.stream.scaladsl.Keep
-import pekko.stream.scaladsl.Sink
-import pekko.stream.scaladsl.Source
-import pekko.stream.testkit.StreamSpec
-import pekko.stream.testkit.Utils.TE
-import pekko.testkit.TestProbe
-
-class FanoutProcessorSpec extends StreamSpec {
-
-  "The FanoutProcessor" must {
-
-    // #25634
-    "not leak running actors on failed upstream without subscription" in {
-      val probe = TestProbe()
-      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
-      val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
-      probe.watch(publisherRef)
-      promise.failure(TE("boom"))
-      probe.expectTerminated(publisherRef)
-    }
-
-    // #25634
-    "not leak running actors on failed upstream with one subscription" in {
-      val probe = TestProbe()
-      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
-      val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
-      Source.fromPublisher(publisher).runWith(Sink.ignore)
-      probe.watch(publisherRef)
-      val boom = TE("boom")
-      promise.failure(boom)
-      probe.expectTerminated(publisherRef)
-    }
-
-    // #25634
-    "not leak running actors on failed upstream with multiple subscriptions" 
in {
-      val probe = TestProbe()
-      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
-      val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
-      probe.watch(publisherRef)
-      Source.fromPublisher(publisher).runWith(Sink.ignore)
-      Source.fromPublisher(publisher).runWith(Sink.ignore)
-      val boom = TE("boom")
-      promise.failure(boom)
-      probe.expectTerminated(publisherRef)
-    }
-
-    "not leak running actors on completed upstream no subscriptions" in {
-      val probe = TestProbe()
-      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
-      val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
-      probe.watch(publisherRef)
-      promise.success(None)
-
-      probe.expectTerminated(publisherRef)
-    }
-
-    "not leak running actors on completed upstream with one subscription" in {
-      val probe = TestProbe()
-      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
-      val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
-      val completed = Source.fromPublisher(publisher).runWith(Sink.ignore)
-      probe.watch(publisherRef)
-
-      promise.success(None)
-
-      probe.expectTerminated(publisherRef)
-      // would throw if not completed
-      completed.futureValue
-    }
-
-    "not leak running actors on completed upstream with multiple 
subscriptions" in {
-      val probe = TestProbe()
-      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
-      val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
-      val completed1 = Source.fromPublisher(publisher).runWith(Sink.ignore)
-      val completed2 = Source.fromPublisher(publisher).runWith(Sink.ignore)
-      probe.watch(publisherRef)
-      promise.success(None)
-
-      probe.expectTerminated(publisherRef)
-      // would throw if not completed
-      completed1.futureValue
-      completed2.futureValue
-    }
-
-    "not leak running actors on failed downstream" in {
-      val probe = TestProbe()
-      val (_, publisher) = 
Source.repeat(1).toMat(Sink.asPublisher(true))(Keep.both).run()
-      val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
-      probe.watch(publisherRef)
-      Source.fromPublisher(publisher).map(_ => throw 
TE("boom")).runWith(Sink.ignore)
-      probe.expectTerminated(publisherRef)
-    }
-
-    // #2645
-    "fail with SubscriptionTimeoutException instead of 
AbruptTerminationException on subscriber timeout" in {
-      val shortTimeout = 300.millis
-      val timeoutAttributes = ActorAttributes.streamSubscriptionTimeout(
-        shortTimeout, 
StreamSubscriptionTimeoutTerminationMode.CancelTermination)
-
-      val (_, publisher) = Source.maybe[Int]
-        
.toMat(Sink.asPublisher(true).addAttributes(timeoutAttributes))(Keep.both)
-        .run()
-
-      // Do NOT subscribe — let the timeout fire
-      val probe = TestProbe()
-      val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
-      probe.watch(publisherRef)
-
-      // The actor should terminate after the subscription timeout
-      probe.expectTerminated(publisherRef, shortTimeout + 3.seconds)
-
-      // Now try to subscribe after timeout and verify the error type
-      val result = Source.fromPublisher(publisher).runWith(Sink.head)
-      val ex = intercept[SubscriptionTimeoutException] {
-        Await.result(result, 3.seconds)
-      }
-      ex.getMessage should include("Subscription timeout expired")
-    }
-
-  }
-
-}
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
new file mode 100644
index 0000000000..d7e17ba530
--- /dev/null
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.testkit.EventFilter
+import pekko.stream.Attributes
+import pekko.stream.ActorAttributes
+import pekko.stream.StreamSubscriptionTimeoutTerminationMode
+import pekko.stream.scaladsl.Keep
+import pekko.stream.scaladsl.Sink
+import pekko.stream.scaladsl.Source
+import pekko.stream.testkit.TestPublisher
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.testkit.Utils.TE
+
+class FanoutPublisherBehaviorSpec extends StreamSpec {
+
+  "Sink.asPublisher(fanout = true)" must {
+
+    "surface upstream failure to late subscribers without subscriptions" in {
+      val boom = TE("boom")
+      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+      promise.failure(boom)
+
+      Source.fromPublisher(publisher).runWith(Sink.ignore).failed.futureValue 
shouldBe boom
+    }
+
+    "propagate upstream failure to active and late subscribers" in {
+      val boom = TE("boom")
+      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+      val running = Source.fromPublisher(publisher).runWith(Sink.ignore)
+      promise.failure(boom)
+
+      running.failed.futureValue shouldBe boom
+      Source.fromPublisher(publisher).runWith(Sink.ignore).failed.futureValue 
shouldBe boom
+    }
+
+    "propagate upstream failure to all active subscribers" in {
+      val boom = TE("boom")
+      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+      val running1 = Source.fromPublisher(publisher).runWith(Sink.ignore)
+      val running2 = Source.fromPublisher(publisher).runWith(Sink.ignore)
+      promise.failure(boom)
+
+      running1.failed.futureValue shouldBe boom
+      running2.failed.futureValue shouldBe boom
+      Source.fromPublisher(publisher).runWith(Sink.ignore).failed.futureValue 
shouldBe boom
+    }
+
+    "complete late subscribers after completed upstream without subscriptions" 
in {
+      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+      promise.success(None)
+
+      Source.fromPublisher(publisher).runWith(Sink.seq).futureValue should 
===(Nil)
+    }
+
+    "complete active and late subscribers after completed upstream" in {
+      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+      val completed = Source.fromPublisher(publisher).runWith(Sink.ignore)
+
+      promise.success(None)
+
+      completed.futureValue
+      Source.fromPublisher(publisher).runWith(Sink.seq).futureValue should 
===(Nil)
+    }
+
+    "complete multiple active subscribers after completed upstream" in {
+      val (promise, publisher) = 
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+      val completed1 = Source.fromPublisher(publisher).runWith(Sink.ignore)
+      val completed2 = Source.fromPublisher(publisher).runWith(Sink.ignore)
+      promise.success(None)
+
+      completed1.futureValue
+      completed2.futureValue
+      Source.fromPublisher(publisher).runWith(Sink.seq).futureValue should 
===(Nil)
+    }
+
+    "keep remaining subscribers running when one of multiple subscriptions 
cancels before upstream completes" in {
+      val upstream = TestPublisher.probe[Int]()
+      val publisher = Source
+        .fromPublisher(upstream)
+        
.runWith(Sink.asPublisher(true).withAttributes(Attributes.inputBuffer(1, 1)))
+      val cancellingSubscriber = 
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+      val remainingSubscriber = 
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+
+      val cancellingSubscription = cancellingSubscriber.ensureSubscription()
+      val remainingSubscription = remainingSubscriber.ensureSubscription()
+
+      upstream.expectRequest()
+      remainingSubscription.request(2)
+      cancellingSubscription.cancel()
+
+      upstream.sendNext(1)
+      remainingSubscriber.expectNext(1)
+      upstream.expectRequest()
+      upstream.sendNext(2)
+      remainingSubscriber.expectNext(2)
+      upstream.sendComplete()
+      remainingSubscriber.expectComplete()
+      Source.fromPublisher(publisher).runWith(Sink.seq).futureValue should 
===(Nil)
+    }
+
+    "deliver the same elements to concurrent active subscribers" in {
+      val upstream = TestPublisher.probe[Int]()
+      val publisher = Source
+        .fromPublisher(upstream)
+        
.runWith(Sink.asPublisher(true).withAttributes(Attributes.inputBuffer(4, 4)))
+      val subscriber1 = 
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+      val subscriber2 = 
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+
+      subscriber1.ensureSubscription().request(3)
+      subscriber2.ensureSubscription().request(3)
+
+      upstream.expectRequest()
+      upstream.sendNext(1)
+      upstream.sendNext(2)
+      upstream.sendNext(3)
+
+      subscriber1.expectNext(1)
+      subscriber1.expectNext(2)
+      subscriber1.expectNext(3)
+      subscriber2.expectNext(1)
+      subscriber2.expectNext(2)
+      subscriber2.expectNext(3)
+
+      upstream.sendComplete()
+      subscriber1.expectComplete()
+      subscriber2.expectComplete()
+    }
+
+    "shut down late subscribers normally after buffered active subscribers 
drain a completed upstream" in {
+      val upstream = TestPublisher.probe[Int]()
+      val publisher = Source
+        .fromPublisher(upstream)
+        
.runWith(Sink.asPublisher(true).withAttributes(Attributes.inputBuffer(4, 4)))
+      val fastSubscriber = 
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+      val slowSubscriber = 
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+
+      fastSubscriber.ensureSubscription().request(2)
+      val slowSubscription = slowSubscriber.ensureSubscription()
+
+      upstream.expectRequest()
+      upstream.sendNext(1)
+      upstream.sendNext(2)
+      upstream.sendComplete()
+
+      fastSubscriber.expectNext(1)
+      fastSubscriber.expectNext(2)
+      fastSubscriber.expectComplete()
+
+      slowSubscription.request(2)
+      slowSubscriber.expectNext(1)
+      slowSubscriber.expectNext(2)
+      slowSubscriber.expectComplete()
+
+      Source.fromPublisher(publisher).runWith(Sink.head).failed.futureValue 
should
+      ===(ActorPublisher.NormalShutdownReason)
+    }
+
+    "shut down normally when the last subscriber fails downstream" in {
+      val upstream = TestPublisher.probe[Int]()
+      val publisher = 
Source.fromPublisher(upstream).runWith(Sink.asPublisher(true))
+      val boom = TE("boom")
+
+      val failed = Source.fromPublisher(publisher).map(_ => throw 
boom).runWith(Sink.ignore)
+      upstream.expectRequest()
+      upstream.sendNext(1)
+      failed.failed.futureValue shouldBe boom
+      upstream.expectCancellation()
+      Source.fromPublisher(publisher).runWith(Sink.head).failed.futureValue 
should
+      ===(ActorPublisher.NormalShutdownReason)
+    }
+
+    "fail with SubscriptionTimeoutException on subscriber timeout" in {
+      val shortTimeout = 300.millis
+      val timeoutAttributes = ActorAttributes.streamSubscriptionTimeout(
+        shortTimeout, 
StreamSubscriptionTimeoutTerminationMode.CancelTermination)
+      val upstream = TestPublisher.probe[Int]()
+      val publisher = 
Source.fromPublisher(upstream).runWith(Sink.asPublisher(true).addAttributes(timeoutAttributes))
+      upstream.expectCancellation()
+
+      val result = Source.fromPublisher(publisher).runWith(Sink.head)
+      val ex = intercept[SubscriptionTimeoutException] {
+        Await.result(result, 3.seconds)
+      }
+      ex.getMessage should include("Subscription timeout expired")
+    }
+
+    "continue accepting subscribers after WarnTermination timeout" in {
+      val shortTimeout = 300.millis
+      val timeoutAttributes = ActorAttributes.streamSubscriptionTimeout(
+        shortTimeout, StreamSubscriptionTimeoutTerminationMode.WarnTermination)
+      val upstream = TestPublisher.probe[Int]()
+      val publisher = EventFilter.warning(start = "Subscription timeout for", 
occurrences = 1).intercept {
+        
Source.fromPublisher(upstream).runWith(Sink.asPublisher(true).addAttributes(timeoutAttributes))
+      }
+
+      val subscriber = Source.fromPublisher(publisher).runWith(TestSink[Int]())
+      subscriber.ensureSubscription().request(1)
+      upstream.expectRequest()
+      upstream.sendNext(42)
+      subscriber.expectNext(42)
+      upstream.sendComplete()
+      subscriber.expectComplete()
+    }
+
+  }
+
+}
diff --git 
a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-processor.excludes
 
b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-processor.excludes
new file mode 100644
index 0000000000..908bde4284
--- /dev/null
+++ 
b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-processor.excludes
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Remove legacy fanout processor implementation
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.FanoutOutputs")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.FanoutProcessorImpl")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.FanoutProcessorImpl$")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
deleted file mode 100644
index 36c36c8ece..0000000000
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.stream.impl
-
-import scala.util.control.NoStackTrace
-
-import org.apache.pekko
-import pekko.actor.Actor
-import pekko.actor.ActorRef
-import pekko.actor.Deploy
-import pekko.actor.Props
-import pekko.annotation.InternalApi
-import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
-import pekko.stream.Attributes
-import pekko.stream.StreamSubscriptionTimeoutTerminationMode
-import pekko.util.OptionVal
-
-import org.reactivestreams.Subscriber
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] abstract class FanoutOutputs(
-    val maxBufferSize: Int,
-    val initialBufferSize: Int,
-    self: ActorRef,
-    val pump: Pump)
-    extends DefaultOutputTransferStates
-    with SubscriberManagement[Any] {
-
-  private var _subscribed = false
-  def subscribed: Boolean = _subscribed
-
-  override type S = ActorSubscriptionWithCursor[_ >: Any]
-  override def createSubscription(subscriber: Subscriber[_ >: Any]): S = {
-    _subscribed = true
-    new ActorSubscriptionWithCursor(self, subscriber)
-  }
-
-  protected var exposedPublisher: ActorPublisher[Any] = _
-
-  private var downstreamBufferSpace: Long = 0L
-  private var downstreamCompleted = false
-  override def demandAvailable = downstreamBufferSpace > 0
-  override def demandCount: Long = downstreamBufferSpace
-
-  override val subreceive = new SubReceive(waitingExposedPublisher)
-
-  def enqueueOutputElement(elem: Any): Unit = {
-    ReactiveStreamsCompliance.requireNonNullElement(elem)
-    downstreamBufferSpace -= 1
-    pushToDownstream(elem)
-  }
-
-  override def complete(): Unit =
-    if (!downstreamCompleted) {
-      downstreamCompleted = true
-      completeDownstream()
-    }
-
-  override def cancel(): Unit = complete()
-
-  override def error(e: Throwable): Unit = {
-    if (!downstreamCompleted) {
-      downstreamCompleted = true
-      abortDownstream(e)
-      if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
-    }
-  }
-
-  def isClosed: Boolean = downstreamCompleted
-
-  def afterShutdown(): Unit
-
-  override protected def requestFromUpstream(elements: Long): Unit = 
downstreamBufferSpace += elements
-
-  private def subscribePending(): Unit =
-    exposedPublisher.takePendingSubscribers().foreach(registerSubscriber)
-
-  override protected def shutdown(completed: Boolean): Unit = {
-    if (exposedPublisher ne null) {
-      if (completed) exposedPublisher.shutdown(None)
-      else exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
-    }
-    afterShutdown()
-  }
-
-  override protected def cancelUpstream(): Unit = {
-    downstreamCompleted = true
-  }
-
-  protected def waitingExposedPublisher: Actor.Receive = {
-    case ExposedPublisher(publisher) =>
-      exposedPublisher = publisher
-      subreceive.become(downstreamRunning)
-    case other =>
-      throw new IllegalStateException(s"The first message must be 
ExposedPublisher but was [$other]")
-  }
-
-  protected def downstreamRunning: Actor.Receive = {
-    case SubscribePending =>
-      subscribePending()
-    case RequestMore(subscription, elements) =>
-      
moreRequested(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]], 
elements)
-      pump.pump()
-    case Cancel(subscription) =>
-      
unregisterSubscription(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]])
-      pump.pump()
-  }
-
-}
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] object FanoutProcessorImpl {
-  def props(attributes: Attributes): Props =
-    Props(new FanoutProcessorImpl(attributes)).withDeploy(Deploy.local)
-}
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] class FanoutProcessorImpl(attributes: Attributes) 
extends ActorProcessorImpl(attributes) {
-
-  val StreamSubscriptionTimeout(timeout, timeoutMode) = 
attributes.mandatoryAttribute[StreamSubscriptionTimeout]
-  val timeoutTimer = if (timeoutMode != 
StreamSubscriptionTimeoutTerminationMode.noop) {
-    import context.dispatcher
-    OptionVal.Some(context.system.scheduler.scheduleOnce(timeout, self, 
ActorProcessorImpl.SubscriptionTimeout))
-  } else OptionVal.None
-
-  override val primaryOutputs: FanoutOutputs = {
-    val inputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer]
-    new FanoutOutputs(inputBuffer.max, inputBuffer.initial, self, this) {
-      override def afterShutdown(): Unit = afterFlush()
-    }
-  }
-
-  val running: TransferPhase = TransferPhase(primaryInputs.NeedsInput && 
primaryOutputs.NeedsDemand) { () =>
-    primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement())
-  }
-
-  override def pumpFinished(): Unit = {
-    primaryInputs.cancel()
-    primaryOutputs.complete()
-  }
-
-  override def postStop(): Unit = {
-    super.postStop()
-    timeoutTimer match {
-      case OptionVal.Some(timer) => timer.cancel()
-      case _                     =>
-    }
-  }
-
-  def afterFlush(): Unit = context.stop(self)
-
-  initialPhase(1, running)
-
-  def subTimeoutHandling: Receive = {
-    case ActorProcessorImpl.SubscriptionTimeout =>
-      import StreamSubscriptionTimeoutTerminationMode._
-      if (!primaryOutputs.subscribed) {
-        timeoutMode match {
-          case CancelTermination =>
-            // Use fail() to propagate a SubscriptionTimeoutException 
downstream instead of
-            // stopping abruptly, which would only produce a non-informative 
AbruptTerminationException
-            log.warning("Subscription timeout expired for [{}], no subscriber 
attached in time", self)
-            fail(new SubscriptionTimeoutException(
-              s"Subscription timeout expired, no subscriber attached to 
[$self]") with NoStackTrace)
-          case WarnTermination =>
-            log.warning("Subscription timeout for {}", this)
-          case NoopTermination => // won't happen
-        }
-      }
-  }
-}
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
new file mode 100644
index 0000000000..06eae4ed19
--- /dev/null
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
+
+import scala.annotation.tailrec
+import scala.collection.immutable
+import scala.util.control.NoStackTrace
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.Attributes.InputBuffer
+import pekko.stream.StreamSubscriptionTimeoutTerminationMode
+import pekko.stream._
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.stage.{
+  AsyncCallback,
+  GraphStageLogic,
+  GraphStageWithMaterializedValue,
+  InHandler,
+  TimerGraphStageLogic
+}
+
+import org.reactivestreams.{ Publisher, Subscriber }
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] final class FanoutPublisherBridgeStage[T]
+    extends GraphStageWithMaterializedValue[SinkShape[T], Publisher[T]] {
+  import StreamSubscriptionTimeoutTerminationMode.{ CancelTermination, 
NoopTermination, WarnTermination }
+
+  private val SubscriptionTimerKey = 
"FanoutPublisherBridgeStage.subscription-timeout"
+
+  val in: Inlet[T] = Inlet("FanoutPublisherBridgeStage.in")
+  override val shape: SinkShape[T] = SinkShape(in)
+  override protected def initialAttributes: Attributes = 
DefaultAttributes.fanoutPublisherSink
+
+  override def toString: String = "FanoutPublisherBridgeStage"
+
+  override def createLogicAndMaterializedValue(
+      inheritedAttributes: Attributes): (GraphStageLogic, Publisher[T]) = {
+    object logic extends TimerGraphStageLogic(shape) with InHandler with 
SubscriberManagement[T] {
+      override type S = FanoutPublisherBridgeSubscription[T]
+
+      private val StreamSubscriptionTimeout(timeout, timeoutMode) =
+        inheritedAttributes.mandatoryAttribute[StreamSubscriptionTimeout]
+
+      override def initialBufferSize: Int = 
inheritedAttributes.mandatoryAttribute[InputBuffer].initial
+      override def maxBufferSize: Int = 
inheritedAttributes.mandatoryAttribute[InputBuffer].max
+
+      private var everSubscribed = false
+      private var requestedFromUpstream = 0L
+
+      private val requestCallback = 
getAsyncCallback[(FanoutPublisherBridgeSubscription[T], Long)] {
+        case (subscription, elements) =>
+          moreRequested(subscription, elements)
+          tryPullIfNeeded()
+      }
+
+      private val cancelCallback = 
getAsyncCallback[FanoutPublisherBridgeSubscription[T]] { subscription =>
+        unregisterSubscription(subscription)
+        tryPullIfNeeded()
+      }
+
+      var materializedPublisher: FanoutPublisherBridgePublisher[T] = null
+
+      private val registerPendingSubscribers = getAsyncCallback[Unit] { _ =>
+        
materializedPublisher.takePendingSubscribers().foreach(registerSubscriber)
+        tryPullIfNeeded()
+      }
+
+      materializedPublisher = new 
FanoutPublisherBridgePublisher[T](registerPendingSubscribers)
+
+      override def preStart(): Unit = {
+        setKeepGoing(true)
+        if (timeoutMode != NoopTermination) scheduleOnce(SubscriptionTimerKey, 
timeout)
+      }
+
+      override protected def onTimer(timerKey: Any): Unit =
+        if (timerKey == SubscriptionTimerKey && !everSubscribed) {
+          timeoutMode match {
+            case CancelTermination =>
+              val ex = new SubscriptionTimeoutException(
+                s"Subscription timeout expired, no subscriber attached to 
[$materializedPublisher]") with NoStackTrace
+              materializer.logger.warning(
+                "Subscription timeout expired for [{}], no subscriber attached 
in time",
+                materializedPublisher)
+              materializedPublisher.shutdown(Some(ex))
+              failStage(ex)
+            case WarnTermination =>
+              materializer.logger.warning("Subscription timeout for {}", this)
+            case NoopTermination => // won't happen
+          }
+        }
+
+      override protected def requestFromUpstream(elements: Long): Unit = {
+        requestedFromUpstream += elements
+        tryPullIfNeeded()
+      }
+
+      override protected def cancelUpstream(): Unit =
+        if (!isClosed(in)) cancel(in)
+
+      override protected def shutdown(completed: Boolean): Unit = {
+        materializedPublisher.shutdown(if (completed) None else 
ActorPublisher.SomeNormalShutdownReason)
+        completeStage()
+      }
+
+      override protected def createSubscription(subscriber: Subscriber[_ >: 
T]): S = {
+        everSubscribed = true
+        cancelTimer(SubscriptionTimerKey)
+        new FanoutPublisherBridgeSubscription[T](subscriber, requestCallback, 
cancelCallback)
+      }
+
+      override def onPush(): Unit = {
+        if (requestedFromUpstream <= 0)
+          throw new IllegalStateException(s"onPush without outstanding 
upstream demand: $requestedFromUpstream")
+        requestedFromUpstream -= 1
+        pushToDownstream(grab(in))
+        tryPullIfNeeded()
+      }
+
+      override def onUpstreamFinish(): Unit =
+        completeDownstream()
+
+      override def onUpstreamFailure(ex: Throwable): Unit = {
+        abortDownstream(ex)
+        materializedPublisher.shutdown(Some(ex))
+        failStage(ex)
+      }
+
+      override def postStop(): Unit =
+        try abortDownstream(ActorPublisher.NormalShutdownReason)
+        finally 
materializedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
+
+      private def tryPullIfNeeded(): Unit =
+        if (requestedFromUpstream > 0 && !hasBeenPulled(in) && !isClosed(in)) 
pull(in)
+
+      setHandler(in, this)
+    }
+
+    (logic, logic.materializedPublisher)
+  }
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] final class FanoutPublisherBridgeSubscription[T](
+    override val subscriber: Subscriber[_ >: T],
+    requestCallback: AsyncCallback[(FanoutPublisherBridgeSubscription[T], 
Long)],
+    cancelCallback: AsyncCallback[FanoutPublisherBridgeSubscription[T]])
+    extends SubscriptionWithCursor[T] {
+
+  override def request(elements: Long): Unit =
+    requestCallback.invoke((this, elements))
+
+  override def cancel(): Unit =
+    cancelCallback.invoke(this)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] final class FanoutPublisherBridgePublisher[T](
+    registerPendingSubscribers: AsyncCallback[Unit])
+    extends Publisher[T] {
+  import ReactiveStreamsCompliance._
+
+  private val pendingSubscribers = new 
AtomicReference[immutable.Seq[Subscriber[_ >: T]]](Nil)
+  private val shutdownStarted = new AtomicBoolean(false)
+
+  @volatile private var shutdownReason: Option[Throwable] = None
+
+  override def subscribe(subscriber: Subscriber[_ >: T]): Unit = {
+    requireNonNullSubscriber(subscriber)
+
+    @tailrec def doSubscribe(): Unit = {
+      val current = pendingSubscribers.get()
+      if (current eq null) reportSubscribeFailure(subscriber)
+      else if (pendingSubscribers.compareAndSet(current, subscriber +: 
current)) registerPendingSubscribers.invoke(())
+      else doSubscribe()
+    }
+
+    doSubscribe()
+  }
+
+  def takePendingSubscribers(): immutable.Seq[Subscriber[_ >: T]] = {
+    @tailrec def swapPendingSubscribers(): immutable.Seq[Subscriber[_ >: T]] = 
{
+      val current = pendingSubscribers.get()
+      if (current eq null) Nil
+      else if (pendingSubscribers.compareAndSet(current, Nil)) current.reverse
+      else swapPendingSubscribers()
+    }
+
+    swapPendingSubscribers()
+  }
+
+  def shutdown(reason: Option[Throwable]): Unit =
+    if (shutdownStarted.compareAndSet(false, true)) {
+      shutdownReason = reason
+      pendingSubscribers.getAndSet(null) match {
+        case null    => // already shut down
+        case pending =>
+          pending.foreach(reportSubscribeFailure)
+      }
+    }
+
+  private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit =
+    try shutdownReason match {
+        case Some(_: ReactiveStreamsCompliance.SpecViolation) => // ok, not 
allowed to call onError
+        case Some(e)                                          =>
+          tryOnSubscribe(subscriber, CancelledSubscription)
+          tryOnError(subscriber, e)
+        case None =>
+          tryOnSubscribe(subscriber, CancelledSubscription)
+          tryOnComplete(subscriber)
+      }
+    catch {
+      case _: ReactiveStreamsCompliance.SpecViolation => // nothing to do
+    }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
index 08eb2942f6..f1c2915cef 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
@@ -115,11 +115,13 @@ import org.reactivestreams.Subscriber
     extends SinkModule[In, Publisher[In]](shape) {
 
   override def create(context: MaterializationContext): (Subscriber[In], 
Publisher[In]) = {
-    val impl = context.materializer.actorOf(context, 
FanoutProcessorImpl.props(context.effectiveAttributes))
-    val fanoutProcessor = new ActorProcessor[In, In](impl)
-    // Resolve cyclic dependency with actor. This MUST be the first message no 
matter what.
-    impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]])
-    (fanoutProcessor, fanoutProcessor)
+    context.materializer.materialize(
+      Source
+        // Keep the SinkModule ABI but route the runtime through the new 
GraphStage bridge instead
+        // of reviving the legacy FanoutProcessorImpl / ActorPublisher path.
+        .asSubscriber[In]
+        .toMat(Sink.fromGraph(new FanoutPublisherBridgeStage[In]))(Keep.both),
+      context.effectiveAttributes)
   }
 
   override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] 
=


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to