This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new f5c37ba255 feat(stream): replace fanout publisher runtime with GraphStage bridge (#2874)
f5c37ba255 is described below
commit f5c37ba2552fd637c09a3cf6498c92e113e705b1
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Apr 20 20:41:50 2026 +0800
feat(stream): replace fanout publisher runtime with GraphStage bridge (#2874)
* feat(stream): replace fanout publisher runtime with GraphStage bridge
Motivation:
Sink.asPublisher(fanout = true) still depended on the legacy actor-backed
FanoutProcessorImpl runtime, which kept issue #2860 blocked on old processor
infrastructure and implementation-bound tests.
Modification:
Route FanoutPublisherSink through a new FanoutPublisherBridgeStage, delete
the legacy FanoutProcessor implementation, replace the old actor-bound spec
with FanoutPublisherBehaviorSpec, and add the matching 2.0.x MiMa excludes for
the removed binary-visible classes.
Result:
The fanout publisher path now runs on a dedicated GraphStage bridge with
the existing terminal-signal contract preserved, broader behavior coverage
added, and compile/MiMa/TCK validation passing.
References:
apache/pekko#2860
Co-authored-by: Copilot <[email protected]>
* chore(stream): format fanout publisher bridge for CI
Motivation:
GitHub's scalafmt diff-ref job flagged FanoutPublisherBridgeStage.scala on
PR #2874.
Modification:
Applied scalafmt formatting to FanoutPublisherBridgeStage.scala without
changing behavior.
Result:
The fanout bridge file now matches the repository's CI formatting
expectations.
References:
apache/pekko#2860
apache/pekko#2874
---
.../pekko/stream/impl/FanoutProcessorSpec.scala | 144 -------------
.../stream/impl/FanoutPublisherBehaviorSpec.scala | 229 ++++++++++++++++++++
.../remove-fanout-processor.excludes | 21 ++
.../apache/pekko/stream/impl/FanoutProcessor.scala | 188 ----------------
.../stream/impl/FanoutPublisherBridgeStage.scala | 240 +++++++++++++++++++++
.../scala/org/apache/pekko/stream/impl/Sinks.scala | 12 +-
6 files changed, 497 insertions(+), 337 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
deleted file mode 100644
index 6880fcc4e1..0000000000
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.stream.impl
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import org.apache.pekko
-import pekko.stream.ActorAttributes
-import pekko.stream.StreamSubscriptionTimeoutTerminationMode
-import pekko.stream.scaladsl.Keep
-import pekko.stream.scaladsl.Sink
-import pekko.stream.scaladsl.Source
-import pekko.stream.testkit.StreamSpec
-import pekko.stream.testkit.Utils.TE
-import pekko.testkit.TestProbe
-
-class FanoutProcessorSpec extends StreamSpec {
-
- "The FanoutProcessor" must {
-
- // #25634
- "not leak running actors on failed upstream without subscription" in {
- val probe = TestProbe()
- val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
- val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
- probe.watch(publisherRef)
- promise.failure(TE("boom"))
- probe.expectTerminated(publisherRef)
- }
-
- // #25634
- "not leak running actors on failed upstream with one subscription" in {
- val probe = TestProbe()
- val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
- val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
- Source.fromPublisher(publisher).runWith(Sink.ignore)
- probe.watch(publisherRef)
- val boom = TE("boom")
- promise.failure(boom)
- probe.expectTerminated(publisherRef)
- }
-
- // #25634
- "not leak running actors on failed upstream with multiple subscriptions"
in {
- val probe = TestProbe()
- val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
- val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
- probe.watch(publisherRef)
- Source.fromPublisher(publisher).runWith(Sink.ignore)
- Source.fromPublisher(publisher).runWith(Sink.ignore)
- val boom = TE("boom")
- promise.failure(boom)
- probe.expectTerminated(publisherRef)
- }
-
- "not leak running actors on completed upstream no subscriptions" in {
- val probe = TestProbe()
- val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
- val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
- probe.watch(publisherRef)
- promise.success(None)
-
- probe.expectTerminated(publisherRef)
- }
-
- "not leak running actors on completed upstream with one subscription" in {
- val probe = TestProbe()
- val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
- val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
- val completed = Source.fromPublisher(publisher).runWith(Sink.ignore)
- probe.watch(publisherRef)
-
- promise.success(None)
-
- probe.expectTerminated(publisherRef)
- // would throw if not completed
- completed.futureValue
- }
-
- "not leak running actors on completed upstream with multiple
subscriptions" in {
- val probe = TestProbe()
- val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
- val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
- val completed1 = Source.fromPublisher(publisher).runWith(Sink.ignore)
- val completed2 = Source.fromPublisher(publisher).runWith(Sink.ignore)
- probe.watch(publisherRef)
- promise.success(None)
-
- probe.expectTerminated(publisherRef)
- // would throw if not completed
- completed1.futureValue
- completed2.futureValue
- }
-
- "not leak running actors on failed downstream" in {
- val probe = TestProbe()
- val (_, publisher) =
Source.repeat(1).toMat(Sink.asPublisher(true))(Keep.both).run()
- val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
- probe.watch(publisherRef)
- Source.fromPublisher(publisher).map(_ => throw
TE("boom")).runWith(Sink.ignore)
- probe.expectTerminated(publisherRef)
- }
-
- // #2645
- "fail with SubscriptionTimeoutException instead of
AbruptTerminationException on subscriber timeout" in {
- val shortTimeout = 300.millis
- val timeoutAttributes = ActorAttributes.streamSubscriptionTimeout(
- shortTimeout,
StreamSubscriptionTimeoutTerminationMode.CancelTermination)
-
- val (_, publisher) = Source.maybe[Int]
-
.toMat(Sink.asPublisher(true).addAttributes(timeoutAttributes))(Keep.both)
- .run()
-
- // Do NOT subscribe — let the timeout fire
- val probe = TestProbe()
- val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
- probe.watch(publisherRef)
-
- // The actor should terminate after the subscription timeout
- probe.expectTerminated(publisherRef, shortTimeout + 3.seconds)
-
- // Now try to subscribe after timeout and verify the error type
- val result = Source.fromPublisher(publisher).runWith(Sink.head)
- val ex = intercept[SubscriptionTimeoutException] {
- Await.result(result, 3.seconds)
- }
- ex.getMessage should include("Subscription timeout expired")
- }
-
- }
-
-}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
new file mode 100644
index 0000000000..d7e17ba530
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.testkit.EventFilter
+import pekko.stream.Attributes
+import pekko.stream.ActorAttributes
+import pekko.stream.StreamSubscriptionTimeoutTerminationMode
+import pekko.stream.scaladsl.Keep
+import pekko.stream.scaladsl.Sink
+import pekko.stream.scaladsl.Source
+import pekko.stream.testkit.TestPublisher
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.testkit.Utils.TE
+
+class FanoutPublisherBehaviorSpec extends StreamSpec {
+
+ "Sink.asPublisher(fanout = true)" must {
+
+ "surface upstream failure to late subscribers without subscriptions" in {
+ val boom = TE("boom")
+ val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+ promise.failure(boom)
+
+ Source.fromPublisher(publisher).runWith(Sink.ignore).failed.futureValue
shouldBe boom
+ }
+
+ "propagate upstream failure to active and late subscribers" in {
+ val boom = TE("boom")
+ val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+ val running = Source.fromPublisher(publisher).runWith(Sink.ignore)
+ promise.failure(boom)
+
+ running.failed.futureValue shouldBe boom
+ Source.fromPublisher(publisher).runWith(Sink.ignore).failed.futureValue
shouldBe boom
+ }
+
+ "propagate upstream failure to all active subscribers" in {
+ val boom = TE("boom")
+ val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+ val running1 = Source.fromPublisher(publisher).runWith(Sink.ignore)
+ val running2 = Source.fromPublisher(publisher).runWith(Sink.ignore)
+ promise.failure(boom)
+
+ running1.failed.futureValue shouldBe boom
+ running2.failed.futureValue shouldBe boom
+ Source.fromPublisher(publisher).runWith(Sink.ignore).failed.futureValue
shouldBe boom
+ }
+
+ "complete late subscribers after completed upstream without subscriptions"
in {
+ val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+ promise.success(None)
+
+ Source.fromPublisher(publisher).runWith(Sink.seq).futureValue should
===(Nil)
+ }
+
+ "complete active and late subscribers after completed upstream" in {
+ val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+ val completed = Source.fromPublisher(publisher).runWith(Sink.ignore)
+
+ promise.success(None)
+
+ completed.futureValue
+ Source.fromPublisher(publisher).runWith(Sink.seq).futureValue should
===(Nil)
+ }
+
+ "complete multiple active subscribers after completed upstream" in {
+ val (promise, publisher) =
Source.maybe[Int].toMat(Sink.asPublisher(true))(Keep.both).run()
+ val completed1 = Source.fromPublisher(publisher).runWith(Sink.ignore)
+ val completed2 = Source.fromPublisher(publisher).runWith(Sink.ignore)
+ promise.success(None)
+
+ completed1.futureValue
+ completed2.futureValue
+ Source.fromPublisher(publisher).runWith(Sink.seq).futureValue should
===(Nil)
+ }
+
+ "keep remaining subscribers running when one of multiple subscriptions
cancels before upstream completes" in {
+ val upstream = TestPublisher.probe[Int]()
+ val publisher = Source
+ .fromPublisher(upstream)
+
.runWith(Sink.asPublisher(true).withAttributes(Attributes.inputBuffer(1, 1)))
+ val cancellingSubscriber =
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+ val remainingSubscriber =
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+
+ val cancellingSubscription = cancellingSubscriber.ensureSubscription()
+ val remainingSubscription = remainingSubscriber.ensureSubscription()
+
+ upstream.expectRequest()
+ remainingSubscription.request(2)
+ cancellingSubscription.cancel()
+
+ upstream.sendNext(1)
+ remainingSubscriber.expectNext(1)
+ upstream.expectRequest()
+ upstream.sendNext(2)
+ remainingSubscriber.expectNext(2)
+ upstream.sendComplete()
+ remainingSubscriber.expectComplete()
+ Source.fromPublisher(publisher).runWith(Sink.seq).futureValue should
===(Nil)
+ }
+
+ "deliver the same elements to concurrent active subscribers" in {
+ val upstream = TestPublisher.probe[Int]()
+ val publisher = Source
+ .fromPublisher(upstream)
+
.runWith(Sink.asPublisher(true).withAttributes(Attributes.inputBuffer(4, 4)))
+ val subscriber1 =
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+ val subscriber2 =
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+
+ subscriber1.ensureSubscription().request(3)
+ subscriber2.ensureSubscription().request(3)
+
+ upstream.expectRequest()
+ upstream.sendNext(1)
+ upstream.sendNext(2)
+ upstream.sendNext(3)
+
+ subscriber1.expectNext(1)
+ subscriber1.expectNext(2)
+ subscriber1.expectNext(3)
+ subscriber2.expectNext(1)
+ subscriber2.expectNext(2)
+ subscriber2.expectNext(3)
+
+ upstream.sendComplete()
+ subscriber1.expectComplete()
+ subscriber2.expectComplete()
+ }
+
+ "shut down late subscribers normally after buffered active subscribers
drain a completed upstream" in {
+ val upstream = TestPublisher.probe[Int]()
+ val publisher = Source
+ .fromPublisher(upstream)
+
.runWith(Sink.asPublisher(true).withAttributes(Attributes.inputBuffer(4, 4)))
+ val fastSubscriber =
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+ val slowSubscriber =
Source.fromPublisher(publisher).runWith(TestSink[Int]())
+
+ fastSubscriber.ensureSubscription().request(2)
+ val slowSubscription = slowSubscriber.ensureSubscription()
+
+ upstream.expectRequest()
+ upstream.sendNext(1)
+ upstream.sendNext(2)
+ upstream.sendComplete()
+
+ fastSubscriber.expectNext(1)
+ fastSubscriber.expectNext(2)
+ fastSubscriber.expectComplete()
+
+ slowSubscription.request(2)
+ slowSubscriber.expectNext(1)
+ slowSubscriber.expectNext(2)
+ slowSubscriber.expectComplete()
+
+ Source.fromPublisher(publisher).runWith(Sink.head).failed.futureValue
should
+ ===(ActorPublisher.NormalShutdownReason)
+ }
+
+ "shut down normally when the last subscriber fails downstream" in {
+ val upstream = TestPublisher.probe[Int]()
+ val publisher =
Source.fromPublisher(upstream).runWith(Sink.asPublisher(true))
+ val boom = TE("boom")
+
+ val failed = Source.fromPublisher(publisher).map(_ => throw
boom).runWith(Sink.ignore)
+ upstream.expectRequest()
+ upstream.sendNext(1)
+ failed.failed.futureValue shouldBe boom
+ upstream.expectCancellation()
+ Source.fromPublisher(publisher).runWith(Sink.head).failed.futureValue
should
+ ===(ActorPublisher.NormalShutdownReason)
+ }
+
+ "fail with SubscriptionTimeoutException on subscriber timeout" in {
+ val shortTimeout = 300.millis
+ val timeoutAttributes = ActorAttributes.streamSubscriptionTimeout(
+ shortTimeout,
StreamSubscriptionTimeoutTerminationMode.CancelTermination)
+ val upstream = TestPublisher.probe[Int]()
+ val publisher =
Source.fromPublisher(upstream).runWith(Sink.asPublisher(true).addAttributes(timeoutAttributes))
+ upstream.expectCancellation()
+
+ val result = Source.fromPublisher(publisher).runWith(Sink.head)
+ val ex = intercept[SubscriptionTimeoutException] {
+ Await.result(result, 3.seconds)
+ }
+ ex.getMessage should include("Subscription timeout expired")
+ }
+
+ "continue accepting subscribers after WarnTermination timeout" in {
+ val shortTimeout = 300.millis
+ val timeoutAttributes = ActorAttributes.streamSubscriptionTimeout(
+ shortTimeout, StreamSubscriptionTimeoutTerminationMode.WarnTermination)
+ val upstream = TestPublisher.probe[Int]()
+ val publisher = EventFilter.warning(start = "Subscription timeout for",
occurrences = 1).intercept {
+
Source.fromPublisher(upstream).runWith(Sink.asPublisher(true).addAttributes(timeoutAttributes))
+ }
+
+ val subscriber = Source.fromPublisher(publisher).runWith(TestSink[Int]())
+ subscriber.ensureSubscription().request(1)
+ upstream.expectRequest()
+ upstream.sendNext(42)
+ subscriber.expectNext(42)
+ upstream.sendComplete()
+ subscriber.expectComplete()
+ }
+
+ }
+
+}
diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-processor.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-processor.excludes
new file mode 100644
index 0000000000..908bde4284
--- /dev/null
+++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-processor.excludes
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Remove legacy fanout processor implementation
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.FanoutOutputs")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.FanoutProcessorImpl")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.FanoutProcessorImpl$")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
deleted file mode 100644
index 36c36c8ece..0000000000
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.stream.impl
-
-import scala.util.control.NoStackTrace
-
-import org.apache.pekko
-import pekko.actor.Actor
-import pekko.actor.ActorRef
-import pekko.actor.Deploy
-import pekko.actor.Props
-import pekko.annotation.InternalApi
-import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
-import pekko.stream.Attributes
-import pekko.stream.StreamSubscriptionTimeoutTerminationMode
-import pekko.util.OptionVal
-
-import org.reactivestreams.Subscriber
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] abstract class FanoutOutputs(
- val maxBufferSize: Int,
- val initialBufferSize: Int,
- self: ActorRef,
- val pump: Pump)
- extends DefaultOutputTransferStates
- with SubscriberManagement[Any] {
-
- private var _subscribed = false
- def subscribed: Boolean = _subscribed
-
- override type S = ActorSubscriptionWithCursor[_ >: Any]
- override def createSubscription(subscriber: Subscriber[_ >: Any]): S = {
- _subscribed = true
- new ActorSubscriptionWithCursor(self, subscriber)
- }
-
- protected var exposedPublisher: ActorPublisher[Any] = _
-
- private var downstreamBufferSpace: Long = 0L
- private var downstreamCompleted = false
- override def demandAvailable = downstreamBufferSpace > 0
- override def demandCount: Long = downstreamBufferSpace
-
- override val subreceive = new SubReceive(waitingExposedPublisher)
-
- def enqueueOutputElement(elem: Any): Unit = {
- ReactiveStreamsCompliance.requireNonNullElement(elem)
- downstreamBufferSpace -= 1
- pushToDownstream(elem)
- }
-
- override def complete(): Unit =
- if (!downstreamCompleted) {
- downstreamCompleted = true
- completeDownstream()
- }
-
- override def cancel(): Unit = complete()
-
- override def error(e: Throwable): Unit = {
- if (!downstreamCompleted) {
- downstreamCompleted = true
- abortDownstream(e)
- if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
- }
- }
-
- def isClosed: Boolean = downstreamCompleted
-
- def afterShutdown(): Unit
-
- override protected def requestFromUpstream(elements: Long): Unit =
downstreamBufferSpace += elements
-
- private def subscribePending(): Unit =
- exposedPublisher.takePendingSubscribers().foreach(registerSubscriber)
-
- override protected def shutdown(completed: Boolean): Unit = {
- if (exposedPublisher ne null) {
- if (completed) exposedPublisher.shutdown(None)
- else exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
- }
- afterShutdown()
- }
-
- override protected def cancelUpstream(): Unit = {
- downstreamCompleted = true
- }
-
- protected def waitingExposedPublisher: Actor.Receive = {
- case ExposedPublisher(publisher) =>
- exposedPublisher = publisher
- subreceive.become(downstreamRunning)
- case other =>
- throw new IllegalStateException(s"The first message must be
ExposedPublisher but was [$other]")
- }
-
- protected def downstreamRunning: Actor.Receive = {
- case SubscribePending =>
- subscribePending()
- case RequestMore(subscription, elements) =>
-
moreRequested(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]],
elements)
- pump.pump()
- case Cancel(subscription) =>
-
unregisterSubscription(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]])
- pump.pump()
- }
-
-}
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] object FanoutProcessorImpl {
- def props(attributes: Attributes): Props =
- Props(new FanoutProcessorImpl(attributes)).withDeploy(Deploy.local)
-}
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] class FanoutProcessorImpl(attributes: Attributes)
extends ActorProcessorImpl(attributes) {
-
- val StreamSubscriptionTimeout(timeout, timeoutMode) =
attributes.mandatoryAttribute[StreamSubscriptionTimeout]
- val timeoutTimer = if (timeoutMode !=
StreamSubscriptionTimeoutTerminationMode.noop) {
- import context.dispatcher
- OptionVal.Some(context.system.scheduler.scheduleOnce(timeout, self,
ActorProcessorImpl.SubscriptionTimeout))
- } else OptionVal.None
-
- override val primaryOutputs: FanoutOutputs = {
- val inputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer]
- new FanoutOutputs(inputBuffer.max, inputBuffer.initial, self, this) {
- override def afterShutdown(): Unit = afterFlush()
- }
- }
-
- val running: TransferPhase = TransferPhase(primaryInputs.NeedsInput &&
primaryOutputs.NeedsDemand) { () =>
- primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement())
- }
-
- override def pumpFinished(): Unit = {
- primaryInputs.cancel()
- primaryOutputs.complete()
- }
-
- override def postStop(): Unit = {
- super.postStop()
- timeoutTimer match {
- case OptionVal.Some(timer) => timer.cancel()
- case _ =>
- }
- }
-
- def afterFlush(): Unit = context.stop(self)
-
- initialPhase(1, running)
-
- def subTimeoutHandling: Receive = {
- case ActorProcessorImpl.SubscriptionTimeout =>
- import StreamSubscriptionTimeoutTerminationMode._
- if (!primaryOutputs.subscribed) {
- timeoutMode match {
- case CancelTermination =>
- // Use fail() to propagate a SubscriptionTimeoutException
downstream instead of
- // stopping abruptly, which would only produce a non-informative
AbruptTerminationException
- log.warning("Subscription timeout expired for [{}], no subscriber
attached in time", self)
- fail(new SubscriptionTimeoutException(
- s"Subscription timeout expired, no subscriber attached to
[$self]") with NoStackTrace)
- case WarnTermination =>
- log.warning("Subscription timeout for {}", this)
- case NoopTermination => // won't happen
- }
- }
- }
-}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
new file mode 100644
index 0000000000..06eae4ed19
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
+
+import scala.annotation.tailrec
+import scala.collection.immutable
+import scala.util.control.NoStackTrace
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.Attributes.InputBuffer
+import pekko.stream.StreamSubscriptionTimeoutTerminationMode
+import pekko.stream._
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.stage.{
+ AsyncCallback,
+ GraphStageLogic,
+ GraphStageWithMaterializedValue,
+ InHandler,
+ TimerGraphStageLogic
+}
+
+import org.reactivestreams.{ Publisher, Subscriber }
+
+/**
+ * INTERNAL API
+ *
+ * Sink stage that materializes a Reactive Streams `Publisher` (a
+ * [[FanoutPublisherBridgePublisher]]) and hands every element grabbed from `in`
+ * to `pushToDownstream` of the mixed-in `SubscriberManagement`.
+ *
+ * Wiring visible in this class:
+ *  - `Publisher.subscribe` parks the subscriber in the materialized publisher and
+ *    invokes an async callback; inside the stage that callback drains the parked
+ *    subscribers into `registerSubscriber`.
+ *  - `request` / `cancel` on a [[FanoutPublisherBridgeSubscription]] are likewise
+ *    routed through async callbacks into `moreRequested` / `unregisterSubscription`.
+ *  - Upstream is pulled only while `requestedFromUpstream` is positive; `onPush`
+ *    throws `IllegalStateException` if an element arrives without such demand.
+ *  - Unless the `StreamSubscriptionTimeout` mode is `NoopTermination`, a one-shot
+ *    timer is scheduled in `preStart`. If it fires before any subscription was
+ *    created, `CancelTermination` shuts the publisher down with a
+ *    `SubscriptionTimeoutException` and fails the stage, while `WarnTermination`
+ *    only logs a warning.
+ *  - `initialBufferSize` / `maxBufferSize` are read from the `InputBuffer` attribute.
+ *  - `postStop` always aborts remaining subscribers and shuts the publisher down
+ *    with `ActorPublisher.NormalShutdownReason`.
+ *
+ * NOTE(review): buffering and fan-out policy live in `SubscriberManagement`, which
+ * is not part of this file - confirm the details there.
+ */
+@InternalApi private[pekko] final class FanoutPublisherBridgeStage[T]
+ extends GraphStageWithMaterializedValue[SinkShape[T], Publisher[T]] {
+ import StreamSubscriptionTimeoutTerminationMode.{ CancelTermination,
NoopTermination, WarnTermination }
+
+ private val SubscriptionTimerKey =
"FanoutPublisherBridgeStage.subscription-timeout"
+
+ val in: Inlet[T] = Inlet("FanoutPublisherBridgeStage.in")
+ override val shape: SinkShape[T] = SinkShape(in)
+ override protected def initialAttributes: Attributes =
DefaultAttributes.fanoutPublisherSink
+
+ override def toString: String = "FanoutPublisherBridgeStage"
+
+ override def createLogicAndMaterializedValue(
+ inheritedAttributes: Attributes): (GraphStageLogic, Publisher[T]) = {
+ object logic extends TimerGraphStageLogic(shape) with InHandler with
SubscriberManagement[T] {
+ override type S = FanoutPublisherBridgeSubscription[T]
+
+ private val StreamSubscriptionTimeout(timeout, timeoutMode) =
+ inheritedAttributes.mandatoryAttribute[StreamSubscriptionTimeout]
+
+ override def initialBufferSize: Int =
inheritedAttributes.mandatoryAttribute[InputBuffer].initial
+ override def maxBufferSize: Int =
inheritedAttributes.mandatoryAttribute[InputBuffer].max
+
+ private var everSubscribed = false // flipped by createSubscription; suppresses the timeout handling in onTimer
+ private var requestedFromUpstream = 0L // demand granted by requestFromUpstream and not yet consumed by onPush
+
+ private val requestCallback =
getAsyncCallback[(FanoutPublisherBridgeSubscription[T], Long)] {
+ case (subscription, elements) =>
+ moreRequested(subscription, elements)
+ tryPullIfNeeded()
+ }
+
+ private val cancelCallback =
getAsyncCallback[FanoutPublisherBridgeSubscription[T]] { subscription =>
+ unregisterSubscription(subscription)
+ tryPullIfNeeded()
+ }
+
+ var materializedPublisher: FanoutPublisherBridgePublisher[T] = null // assigned further down, once registerPendingSubscribers exists
+
+ private val registerPendingSubscribers = getAsyncCallback[Unit] { _ =>
+
materializedPublisher.takePendingSubscribers().foreach(registerSubscriber)
+ tryPullIfNeeded()
+ }
+
+ materializedPublisher = new
FanoutPublisherBridgePublisher[T](registerPendingSubscribers)
+
+ override def preStart(): Unit = {
+ setKeepGoing(true) // NOTE(review): presumably keeps the stage alive after upstream finishes so subscribers can drain - confirm
+ if (timeoutMode != NoopTermination) scheduleOnce(SubscriptionTimerKey,
timeout)
+ }
+
+ override protected def onTimer(timerKey: Any): Unit =
+ if (timerKey == SubscriptionTimerKey && !everSubscribed) {
+ timeoutMode match {
+ case CancelTermination =>
+ val ex = new SubscriptionTimeoutException(
+ s"Subscription timeout expired, no subscriber attached to
[$materializedPublisher]") with NoStackTrace
+ materializer.logger.warning(
+ "Subscription timeout expired for [{}], no subscriber attached
in time",
+ materializedPublisher)
+ materializedPublisher.shutdown(Some(ex))
+ failStage(ex)
+ case WarnTermination =>
+ materializer.logger.warning("Subscription timeout for {}", this)
+ case NoopTermination => // won't happen
+ }
+ }
+
+ override protected def requestFromUpstream(elements: Long): Unit = {
+ requestedFromUpstream += elements
+ tryPullIfNeeded()
+ }
+
+ override protected def cancelUpstream(): Unit =
+ if (!isClosed(in)) cancel(in)
+
+ override protected def shutdown(completed: Boolean): Unit = {
+ materializedPublisher.shutdown(if (completed) None else
ActorPublisher.SomeNormalShutdownReason)
+ completeStage()
+ }
+
+ override protected def createSubscription(subscriber: Subscriber[_ >:
T]): S = {
+ everSubscribed = true
+ cancelTimer(SubscriptionTimerKey)
+ new FanoutPublisherBridgeSubscription[T](subscriber, requestCallback,
cancelCallback)
+ }
+
+ override def onPush(): Unit = {
+ if (requestedFromUpstream <= 0)
+ throw new IllegalStateException(s"onPush without outstanding
upstream demand: $requestedFromUpstream")
+ requestedFromUpstream -= 1
+ pushToDownstream(grab(in))
+ tryPullIfNeeded()
+ }
+
+ override def onUpstreamFinish(): Unit =
+ completeDownstream() // does not complete the stage itself; completeStage is only called from shutdown
+
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ abortDownstream(ex)
+ materializedPublisher.shutdown(Some(ex))
+ failStage(ex)
+ }
+
+ override def postStop(): Unit =
+ try abortDownstream(ActorPublisher.NormalShutdownReason)
+ finally
materializedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
+
+ private def tryPullIfNeeded(): Unit =
+ if (requestedFromUpstream > 0 && !hasBeenPulled(in) && !isClosed(in))
pull(in)
+
+ setHandler(in, this) // this logic object doubles as the InHandler for in
+ }
+
+ (logic, logic.materializedPublisher)
+ }
+}
+
+/**
+ * INTERNAL API
+ *
+ * Subscription handed to each subscriber of the fanout publisher. It holds no
+ * demand state of its own in this file: `request` and `cancel` simply forward to
+ * the two async callbacks passed in at construction, pairing the call with this
+ * subscription instance so the receiver can tell which subscriber it came from.
+ *
+ * @param subscriber      the Reactive Streams subscriber this subscription belongs to
+ * @param requestCallback invoked with `(this, elements)` on every `request`
+ * @param cancelCallback  invoked with `this` on every `cancel`
+ *
+ * NOTE(review): validation of `elements` is not done here; presumably the
+ * receiver of `requestCallback` handles non-positive demand - confirm.
+ */
+@InternalApi private[pekko] final class FanoutPublisherBridgeSubscription[T](
+ override val subscriber: Subscriber[_ >: T],
+ requestCallback: AsyncCallback[(FanoutPublisherBridgeSubscription[T],
Long)],
+ cancelCallback: AsyncCallback[FanoutPublisherBridgeSubscription[T]])
+ extends SubscriptionWithCursor[T] {
+
+ override def request(elements: Long): Unit =
+ requestCallback.invoke((this, elements))
+
+ override def cancel(): Unit =
+ cancelCallback.invoke(this)
+}
+
+/**
+ * INTERNAL API
+ *
+ * The `Publisher` handed out as materialized value. Subscribers are parked in an
+ * `AtomicReference` and the supplied `registerPendingSubscribers` callback is
+ * invoked so that the owner can collect them via `takePendingSubscribers`.
+ *
+ * State encoding used below:
+ *  - `pendingSubscribers` holds `Nil` or a list while the publisher is open and
+ *    `null` once `shutdown` has run.
+ *  - `shutdownStarted` makes `shutdown` effective only on its first call.
+ *  - `shutdownReason` decides how a rejected subscriber is signalled: `None`
+ *    gives `onSubscribe` followed by `onComplete`, `Some(e)` gives `onSubscribe`
+ *    followed by `onError(e)`, and a `SpecViolation` reason gives no signal.
+ *
+ * `subscribe` rejects a null subscriber through `requireNonNullSubscriber`; a
+ * subscriber arriving after shutdown is signalled immediately with a
+ * `CancelledSubscription` according to `shutdownReason`.
+ *
+ * @param registerPendingSubscribers invoked after each successful enqueue in `subscribe`
+ */
+@InternalApi private[pekko] final class FanoutPublisherBridgePublisher[T](
+ registerPendingSubscribers: AsyncCallback[Unit])
+ extends Publisher[T] {
+ import ReactiveStreamsCompliance._
+
+ private val pendingSubscribers = new
AtomicReference[immutable.Seq[Subscriber[_ >: T]]](Nil)
+ private val shutdownStarted = new AtomicBoolean(false)
+
+ @volatile private var shutdownReason: Option[Throwable] = None
+
+ override def subscribe(subscriber: Subscriber[_ >: T]): Unit = {
+ requireNonNullSubscriber(subscriber)
+
+ @tailrec def doSubscribe(): Unit = {
+ val current = pendingSubscribers.get()
+ if (current eq null) reportSubscribeFailure(subscriber) // null means shutdown already ran
+ else if (pendingSubscribers.compareAndSet(current, subscriber +:
current)) registerPendingSubscribers.invoke(())
+ else doSubscribe()
+ }
+
+ doSubscribe()
+ }
+
+ def takePendingSubscribers(): immutable.Seq[Subscriber[_ >: T]] = {
+ @tailrec def swapPendingSubscribers(): immutable.Seq[Subscriber[_ >: T]] =
{
+ val current = pendingSubscribers.get()
+ if (current eq null) Nil
+ else if (pendingSubscribers.compareAndSet(current, Nil)) current.reverse // prepended on subscribe, so reverse yields subscription order
+ else swapPendingSubscribers()
+ }
+
+ swapPendingSubscribers()
+ }
+
+ def shutdown(reason: Option[Throwable]): Unit =
+ if (shutdownStarted.compareAndSet(false, true)) {
+ shutdownReason = reason // recorded before pendingSubscribers is swapped to null
+ pendingSubscribers.getAndSet(null) match {
+ case null => // already shut down
+ case pending =>
+ pending.foreach(reportSubscribeFailure)
+ }
+ }
+
+ private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit =
+ try shutdownReason match {
+ case Some(_: ReactiveStreamsCompliance.SpecViolation) => // ok, not
allowed to call onError
+ case Some(e) =>
+ tryOnSubscribe(subscriber, CancelledSubscription)
+ tryOnError(subscriber, e)
+ case None =>
+ tryOnSubscribe(subscriber, CancelledSubscription)
+ tryOnComplete(subscriber)
+ }
+ catch {
+ case _: ReactiveStreamsCompliance.SpecViolation => // nothing to do
+ }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
index 08eb2942f6..f1c2915cef 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
@@ -115,11 +115,13 @@ import org.reactivestreams.Subscriber
extends SinkModule[In, Publisher[In]](shape) {
override def create(context: MaterializationContext): (Subscriber[In],
Publisher[In]) = {
- val impl = context.materializer.actorOf(context,
FanoutProcessorImpl.props(context.effectiveAttributes))
- val fanoutProcessor = new ActorProcessor[In, In](impl)
- // Resolve cyclic dependency with actor. This MUST be the first message no
matter what.
- impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]])
- (fanoutProcessor, fanoutProcessor)
+ context.materializer.materialize(
+ Source
+ // Keep the SinkModule ABI but route the runtime through the new
GraphStage bridge instead
+ // of reviving the legacy FanoutProcessorImpl / ActorPublisher path.
+ .asSubscriber[In]
+ .toMat(Sink.fromGraph(new FanoutPublisherBridgeStage[In]))(Keep.both),
+ context.effectiveAttributes)
}
override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]]
=
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]