This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch optimize-stage-actor-ref-lazy-dispatch in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 3619719aa619a4355c0deda3856f22352b76626e
Author: He-Pin <[email protected]>
AuthorDate: Mon Jun 1 13:11:36 2026 +0800

fix: optimize lazy stage actor dispatch via MPSC drain coalescing

Motivation:

Lazy `getStageActor` refs paid one actor mailbox enqueue per external tell: sender -> FunctionRef -> ConcurrentAsyncCallback.invokeWithPromise -> interpreter self ! AsyncInput. Under a high tell rate to a single stage actor, the bottleneck is mailbox traffic (envelope allocation, cross-thread wakeup, dequeue), not the dispatch lambda. Each tell also allocated a Tuple2, an AsyncInput, and a mailbox Envelope.

Modification:

Lazy `getStageActor` now installs an MPSC dispatch (`LazyDispatch`) that:

- enqueues (sender, msg) into a Vyukov MPSC queue (`AbstractNodeQueue`)
- elects a single drain via an IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox enqueue
- drains on the interpreter thread in a tight loop bounded by `stage-actor-drain-batch` (default 16), then either publishes IDLE (with the canonical recheck race fix) or re-schedules another envelope so that other BoundaryEvents interleave naturally via the actor mailbox
- preserves `isStageCompleted` semantics: items added after completion are dropped, exactly as the old per-tell path silently skipped them.

The eager construction path (used before stream demand) is unchanged and still routes through the materializer supervisor + `AsyncCallback`.

JIT/GC notes:

- `LazyDispatch` is a `final class` and extends `AbstractNodeQueue` directly, so it is its own queue (one fewer allocation and field deref).
- `scheduledState` is a plain `@volatile var Int` driven by a static `VarHandle` (created via `MethodHandles.privateLookupIn`), avoiding a per-instance `AtomicBoolean` wrapper. This is the same pattern `AbstractNodeQueue` itself uses.
- The dispatch `apply` is monomorphic per StageActor instance; the drain callback is allocated once and reused. The FunctionRef lambda is rewritten as `(sender, msg) =>` to skip the Tuple2 allocation on the PoisonPill / Kill warning path.
- Per-tell allocation is now 1 Node + 1 Tuple2 (the Tuple2 is forced by the public `StageActorRef.Receive` type); AsyncInput and Envelope are amortized across the batch.

Configuration:

`pekko.stream.materializer.stage-actor-drain-batch` (default 16) bounds the per-envelope drain. The default aligns with `InputBuffer.max` and keeps the per-wakeup work in the same order of magnitude as the dispatcher throughput. Smaller values trade tell throughput for tighter interleaving with upstream/downstream events; larger values do the opposite.

Binary compatibility:

The original 4-arg `private[pekko] StageActor` constructor (`materializer, getAsyncCallback, initialReceive, name`) is preserved as an auxiliary constructor and continues to use the eager `AsyncCallback` path. A new 5-arg `private[pekko]` constructor (`materializer, interpreter, logic, initialReceive, name`) is added for the lazy path. `sbt stream/mimaReportBinaryIssues` passes clean.

Result:

`StageActorRefBenchmark.lazy_stage_actor_ref_tell_10k` (JMH, 2 forks x 10 iterations x 2s, macOS). Throughput is now bounded by the Vyukov enqueue + drain loop rather than by per-tell mailbox traffic:

| Variant                          | Throughput (ops/s)   | vs main |
|----------------------------------|----------------------|---------|
| main                             | 6,587,561 +- 616,243 | 1.00x   |
| MPSC + drain coalescing (cap=16) | 13,044,829 +- 1,525K | 1.98x   |
| MPSC + drain coalescing (cap=8)  | 13,589,612 +- 2,114K | 2.06x   |

BroadcastHubBenchmark is unchanged in this measurement (its bottleneck is fan-out broadcasting, not stage-actor tell traffic).
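The Modification section describes the enqueue / elect / bounded-drain / recheck protocol. A minimal, self-contained sketch of that protocol follows, under several assumptions. `FakeMailbox` and `CoalescingDispatch` are hypothetical names, not Pekko APIs. `java.util.concurrent.ConcurrentLinkedQueue` stands in for the Vyukov `AbstractNodeQueue`, and an `AtomicInteger` stands in for the VarHandle-driven `scheduledState` field. Each `Runnable` handed to the fake mailbox models one AsyncInput envelope.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the interpreter's mailbox: every Runnable passed to `tell`
// models one AsyncInput envelope. This is NOT a Pekko API.
final class FakeMailbox {
  private val envelopes = new ConcurrentLinkedQueue[Runnable]()
  val enqueued = new AtomicInteger(0)

  def tell(r: Runnable): Unit = {
    enqueued.incrementAndGet()
    envelopes.add(r)
  }

  // Runs queued envelopes one at a time on the calling thread (the "interpreter thread").
  def runAll(): Unit = {
    var r = envelopes.poll()
    while (r ne null) {
      r.run()
      r = envelopes.poll()
    }
  }
}

// Hypothetical simplified drain-coalescing dispatch. ConcurrentLinkedQueue and AtomicInteger
// are assumed substitutes for AbstractNodeQueue and the VarHandle-based state field.
final class CoalescingDispatch[A <: AnyRef](mailbox: FakeMailbox, batch: Int, handler: A => Unit) {
  private val Idle = 0
  private val Scheduled = 1
  private val queue = new ConcurrentLinkedQueue[A]()
  private val state = new AtomicInteger(Idle)
  @volatile private var completed = false
  private val drainTask: Runnable = () => drain() // allocated once, reused for every envelope

  // Producer side: enqueue first; only the IDLE -> SCHEDULED winner sends an envelope.
  def tell(msg: A): Unit = {
    queue.add(msg)
    if (state.get() == Idle && state.compareAndSet(Idle, Scheduled))
      mailbox.tell(drainTask)
  }

  // Models the stage completing: subsequent drains drop whatever is still queued.
  def complete(): Unit = completed = true

  // Consumer side: only ever runs on the single "interpreter" thread.
  private def drain(): Unit = {
    var processed = 0
    while (processed < batch) {
      if (completed) {
        while (queue.poll() ne null) ()
        state.set(Idle)
        return
      }
      val item = queue.poll()
      if (item eq null) {
        state.set(Idle)
        // Recheck: a producer may have enqueued after poll() but before the IDLE publish,
        // and it skipped the mailbox send because it still saw SCHEDULED.
        if (!queue.isEmpty && state.compareAndSet(Idle, Scheduled))
          mailbox.tell(drainTask)
        return
      }
      handler(item)
      processed += 1
    }
    // Batch cap reached: stay SCHEDULED and yield by sending one more envelope.
    mailbox.tell(drainTask)
  }
}
```

With a cap of 16, 100 tells from one thread enqueue 7 envelopes instead of 100. One comes from the initial CAS winner, and six come from re-schedules after full batches. The last envelope drains the remaining 4 messages and publishes IDLE.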
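For the setting described under Configuration, a minimal `application.conf` override might look like the fragment below. The value 8 is only an example, chosen to match the cap=8 row of the Result table; it is not a recommendation. Values below 1 are clamped to 1 when the lazy StageActor reads the setting.

```hocon
# Example only: a smaller per-envelope drain cap, intended to favor interleaving of
# stage-actor messages with upstream/downstream events. Must be >= 1.
pekko.stream.materializer.stage-actor-drain-batch = 8
```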
Tests:

- sbt "stream / compile" "stream / mimaReportBinaryIssues"
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.StageActorRefSpec" (11/11)
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.ActorRefSinkSpec org.apache.pekko.stream.scaladsl.ActorRefSourceSpec org.apache.pekko.stream.scaladsl.ActorRefBackpressureSinkSpec org.apache.pekko.stream.scaladsl.ActorRefBackpressureSourceSpec" (42/42)
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.QueueSinkSpec org.apache.pekko.stream.scaladsl.QueueSourceSpec org.apache.pekko.stream.scaladsl.HubSpec" (94/94)
- sbt scalafmt headerCheck
- sbt "bench-jmh / Jmh / run -i 10 -wi 5 -f 2 -r 2s -w 2s .*StageActorRefBenchmark.*"

References:

Refs https://github.com/akka/akka-core/issues/26857 (public issue only; clean-room implementation)
---
 .../pekko/stream/StageActorRefBenchmark.scala      | 136 +++++++++++++
 .../pekko/stream/scaladsl/StageActorRefSpec.scala  | 114 ++++++++++-
 stream/src/main/resources/reference.conf           |   7 +
 .../org/apache/pekko/stream/stage/GraphStage.scala | 211 ++++++++++++++++++---
 4 files changed, 441 insertions(+), 27 deletions(-)

diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala
new file mode 100644
index 0000000000..007253e275
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.Await
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import org.openjdk.jmh.annotations._
+
+import org.apache.pekko
+import pekko.actor.ActorRef
+import pekko.actor.ActorSystem
+import pekko.actor.NoSerializationVerificationNeeded
+import pekko.stream.scaladsl.Keep
+import pekko.stream.scaladsl.Sink
+import pekko.stream.scaladsl.Source
+import pekko.stream.stage.GraphStageLogic
+import pekko.stream.stage.GraphStageWithMaterializedValue
+import pekko.stream.stage.InHandler
+
+object StageActorRefBenchmark {
+  final val OperationsPerInvocation = 10000
+  private case object CountDown extends NoSerializationVerificationNeeded
+
+  private final class Control {
+    private val ready = new CountDownLatch(1)
+    @volatile private var ref: ActorRef = _
+    @volatile private var latch: CountDownLatch = _
+
+    def init(ref: ActorRef): Unit = {
+      this.ref = ref
+      ready.countDown()
+    }
+
+    def stageActorRef: ActorRef = {
+      if (!ready.await(10, TimeUnit.SECONDS))
+        throw new RuntimeException("Stage actor ref was not initialized")
+      ref
+    }
+
+    def reset(expectedMessages: Int): Unit =
+      latch = new CountDownLatch(expectedMessages)
+
+    def countDown(): Unit =
+      latch.countDown()
+
+    def awaitDone(): Unit =
+      if (!latch.await(10, TimeUnit.SECONDS))
+        throw new RuntimeException("Stage actor ref benchmark messages timed out")
+  }
+
+  private final class StageActorSink extends GraphStageWithMaterializedValue[SinkShape[Any], Control] {
+    val in: Inlet[Any] = Inlet("StageActorSink.in")
+    override val shape: SinkShape[Any] = SinkShape(in)
+
+    override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Control) = {
+      val control = new Control
+
+      val logic = new GraphStageLogic(shape) {
+        override def preStart(): Unit = {
+          control.init(getStageActor {
+            case (_, CountDown) => control.countDown()
+          }.ref)
+          pull(in)
+        }
+
+        setHandler(
+          in,
+          new InHandler {
+            override def onPush(): Unit = pull(in)
+          })
+      }
+
+      logic -> control
+    }
+  }
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class StageActorRefBenchmark {
+  import StageActorRefBenchmark._
+
+  implicit val system: ActorSystem = ActorSystem("StageActorRefBenchmark")
+
+  private var completion: Promise[Option[Any]] = _
+  private var control: Control = _
+  private var stageActorRef: ActorRef = _
+
+  @Setup
+  def setup(): Unit = {
+    SystemMaterializer(system).materializer
+    val materialized = Source.maybe[Any].toMat(Sink.fromGraph(new StageActorSink))(Keep.both).run()
+    completion = materialized._1
+    control = materialized._2
+    stageActorRef = control.stageActorRef
+  }
+
+  @TearDown
+  def shutdown(): Unit = {
+    completion.trySuccess(None)
+    Await.result(system.terminate(), 5.seconds)
+  }
+
+  @Benchmark
+  @OperationsPerInvocation(OperationsPerInvocation)
+  def lazy_stage_actor_ref_tell_10k(): Unit = {
+    control.reset(OperationsPerInvocation)
+    var remaining = OperationsPerInvocation
+    while (remaining > 0) {
+      stageActorRef ! CountDown
+      remaining -= 1
+    }
+    control.awaitDone()
+  }
+}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala
index 372e98f3ea..b5bc5722d7 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala
@@ -18,16 +18,19 @@ import scala.concurrent.Promise
 import scala.concurrent.duration._
 
 import org.apache.pekko
+import pekko.actor.ActorPath
 import pekko.actor.ActorRef
 import pekko.actor.Kill
 import pekko.actor.NoSerializationVerificationNeeded
 import pekko.actor.PoisonPill
 import pekko.event.Logging
 import pekko.stream._
+import pekko.stream.impl.fusing.GraphInterpreter
 import pekko.stream.stage.GraphStageLogic
 import pekko.stream.stage.GraphStageWithMaterializedValue
 import pekko.stream.stage.InHandler
 import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.EventFilter
 import pekko.testkit.ImplicitSender
 import pekko.testkit.TestEvent
@@ -181,6 +184,50 @@ class StageActorRefSpec extends StreamSpec with ImplicitSender {
       res.futureValue should ===(42)
     }
 
+    "run non-eager stage actor messages in the graph interpreter actor" in {
+      val (_, res) = Source.maybe[Int].toMat(sumStage(testActor))(Keep.both).run()
+
+      val stageRef = expectMsgType[ActorRef]
+      stageRef ! AddAndTell(1)
+      expectMsg(1)
+
+      stageRef ! ReportStageActorInterpreter
+      val location = expectMsgType[StageActorLocation]
+
+      location.stageActorParent should ===(location.interpreter)
+
+      stageRef ! StopNow
+      res.futureValue should ===(1)
+    }
+
+    "keep eagerly materialized stage actors usable before stream demand" in {
+      val (ref, probe) = Source
+        .actorRef[Int]({
+            case CompleteNow => CompletionStrategy.Immediately
+          }, PartialFunction.empty, bufferSize = 8, OverflowStrategy.fail)
+        .toMat(TestSink[Int]())(Keep.both)
+        .run()
+
+      ref ! 1
+      probe.request(1).expectNext(1)
+      ref ! CompleteNow
+      probe.expectComplete()
+    }
+
+    "keep eagerly materialized stage actors attached to the stream supervisor" in {
+      val (source, res) = Source.maybe[Int].toMat(eagerLocationStage(testActor))(Keep.both).run()
+
+      val stageRef = expectMsgType[ActorRef]
+      stageRef ! ReportEagerStageActorInterpreter
+      val location = expectMsgType[EagerStageActorLocation]
+
+      location.stageActorParent should ===(location.supervisor)
+      location.stageActorParent should !==(location.interpreter)
+
+      source.success(None)
+      res.futureValue should ===(0)
+    }
+
   }
 }
 
@@ -194,10 +241,19 @@ object StageActorRefSpec {
     case object BecomeStringEcho extends NoSerializationVerificationNeeded
     case object PullNow extends NoSerializationVerificationNeeded
     case object StopNow extends NoSerializationVerificationNeeded
+    case object ReportStageActorInterpreter extends NoSerializationVerificationNeeded
+    case object ReportEagerStageActorInterpreter extends NoSerializationVerificationNeeded
+    case object CompleteNow extends NoSerializationVerificationNeeded
+    final case class StageActorLocation(stageActorParent: ActorPath, interpreter: ActorPath)
+        extends NoSerializationVerificationNeeded
+    final case class EagerStageActorLocation(stageActorParent: ActorPath, supervisor: ActorPath, interpreter: ActorPath)
+        extends NoSerializationVerificationNeeded
   }
 
   import ControlProtocol._
 
+  def eagerLocationStage(probe: ActorRef) = EagerLocationStage(probe)
+
   case class SumTestStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] {
     val in = Inlet[Int]("IntSum.in")
     override val shape: SinkShape[Int] = SinkShape.of(in)
@@ -216,10 +272,12 @@ object StageActorRefSpec {
 
         def behavior(m: (ActorRef, Any)): Unit = {
           m match {
-            case (_, Add(n))                     => sum += n
-            case (_, PullNow)                    => pull(in)
-            case (sender, CallInitStageActorRef) => sender ! getStageActor(behavior).ref
-            case (_, BecomeStringEcho)           =>
+            case (_, Add(n))                           => sum += n
+            case (_, PullNow)                          => pull(in)
+            case (sender, CallInitStageActorRef)       => sender ! getStageActor(behavior).ref
+            case (sender, ReportStageActorInterpreter) =>
+              sender ! StageActorLocation(stageActor.ref.path.parent, GraphInterpreter.currentInterpreter.context.path)
+            case (_, BecomeStringEcho) =>
               getStageActor {
                 case (theSender, msg) => theSender ! msg.toString
               }
@@ -258,4 +316,52 @@ object StageActorRefSpec {
     }
   }
 
+  case class EagerLocationStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] {
+    val in = Inlet[Int]("EagerLocation.in")
+    override val shape: SinkShape[Int] = SinkShape.of(in)
+
+    override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
+      val p: Promise[Int] = Promise()
+
+      val logic = new GraphStageLogic(shape) {
+        var stageRef: ActorRef = _
+        var interpreterPath: ActorPath = _
+        var supervisorPath: ActorPath = _
+
+        override def preStart(): Unit = {
+          interpreterPath = interpreter.context.path
+          supervisorPath = interpreter.materializer.supervisor.path
+          stageRef = getEagerStageActor(interpreter.materializer) {
+            case (sender, ReportEagerStageActorInterpreter) =>
+              sender ! EagerStageActorLocation(stageRef.path.parent, supervisorPath, interpreterPath)
+            case _ => throw new RuntimeException("unexpected message")
+          }.ref
+          pull(in)
+          probe ! stageRef
+        }
+
+        setHandler(
+          in,
+          new InHandler {
+            override def onPush(): Unit = {
+              p.trySuccess(grab(in))
+              completeStage()
+            }
+
+            override def onUpstreamFinish(): Unit = {
+              p.trySuccess(0)
+              completeStage()
+            }
+
+            override def onUpstreamFailure(ex: Throwable): Unit = {
+              p.tryFailure(ex)
+              failStage(ex)
+            }
+          })
+      }
+
+      logic -> p.future
+    }
+  }
+
 }
diff --git a/stream/src/main/resources/reference.conf b/stream/src/main/resources/reference.conf
index f4e4bd3193..059d28079a 100644
--- a/stream/src/main/resources/reference.conf
+++ b/stream/src/main/resources/reference.conf
@@ -83,6 +83,13 @@ pekko {
       # Allows to accelerate message processing that happening within same actor but keep system responsive.
       sync-processing-limit = 1000
 
+      # Upper bound on stage-actor messages drained per envelope for non-eager `getStageActor` refs. Lazy
+      # stage actors batch external `tell` deliveries into a MPSC queue and elect a single drain envelope;
+      # this cap bounds the burst so that other BoundaryEvents (pull/push/complete) can still interleave
+      # naturally via the actor mailbox. Smaller = better fairness for upstream/downstream events;
+      # larger = better tell throughput. Must be >= 1.
+      stage-actor-drain-batch = 16
+
       debug {
         # Enables the fuzzing mode which increases the chance of race conditions
         # by aggressively reordering events and making certain operations more
diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala
index 5e1aec586c..b676e1c29d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala
@@ -14,6 +14,7 @@
 package org.apache.pekko.stream.stage
 
 import java.util.Spliterator
+import java.lang.invoke.{ MethodHandles, VarHandle }
 import java.util.concurrent.{ CompletionStage, ConcurrentHashMap }
 import java.util.concurrent.atomic.AtomicReference
 
@@ -27,6 +28,7 @@ import org.apache.pekko
 import pekko.{ Done, NotUsed }
 import pekko.actor._
 import pekko.annotation.InternalApi
+import pekko.dispatch.AbstractNodeQueue
 import pekko.japi.function.{ Effect, Procedure }
 import pekko.stream._
 import pekko.stream.Attributes.SourceLocation
@@ -206,29 +208,61 @@ object GraphStageLogic {
    *
    * Not for user instantiation, use [[GraphStageLogic.getStageActor]].
    */
-  final class StageActor @InternalApi() private[pekko] (
+  final class StageActor @InternalApi() private (
       materializer: Materializer,
-      getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)],
       initialReceive: StageActorRef.Receive,
-      name: String) {
+      name: String,
+      cell: ActorCell,
+      buildDispatch: StageActorRef.Receive => ((ActorRef, Any)) => Unit) {
+
+    @InternalApi private[pekko] def this(
+        materializer: Materializer,
+        getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)],
+        initialReceive: StageActorRef.Receive,
+        name: String) =
+      this(
+        materializer,
+        initialReceive,
+        name,
+        StageActor.localCell(materializer.supervisor, "Stream supervisor"),
+        receive => getAsyncCallback(receive).invoke)
+
+    @InternalApi private[pekko] def this(
+        materializer: Materializer,
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        initialReceive: StageActorRef.Receive,
+        name: String) =
+      this(
+        materializer,
+        initialReceive,
+        name,
+        StageActor.localCell(interpreter.context, "Graph interpreter"),
+        // Coalesce per-tell mailbox traffic: N tells produce 1 AsyncInput envelope (amortized).
+        receive =>
+          new StageActor.LazyDispatch(
+            interpreter,
+            logic,
+            receive.asInstanceOf[Any => Unit],
+            StageActor.drainBatchSize(materializer)))
+
+    // Monomorphic Function1 captured once; JIT can inline the apply at the FunctionRef call site.
+    private val dispatch: ((ActorRef, Any)) => Unit = buildDispatch(internalReceive)
 
-    private val callback = getAsyncCallback(internalReceive)
-
-    private def cell = materializer.supervisor match {
-      case ref: LocalActorRef => ref.underlying
-      case unknown =>
-        throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
-    }
     private val functionRef: FunctionRef = {
-      val f: (ActorRef, Any) => Unit = {
-        case (_, m @ (PoisonPill | Kill)) =>
-          materializer.logger.warning(
-            "{} message sent to StageActor({}) will be ignored, since it is not a real Actor." +
-            "Use a custom message type to communicate with it instead.",
-            m,
-            functionRef.path)
-        case pair => callback.invoke(pair)
-      }
+      // Explicit (sender, msg) lambda (not a pattern-match Function2 literal) so the PoisonPill / Kill
+      // branch matches on `msg` directly and does not allocate a Tuple2. The regular branch still
+      // constructs one tuple per tell, as required by the `((ActorRef, Any)) => Unit` public Receive type.
+      val f: (ActorRef, Any) => Unit = (sender, msg) =>
+        msg match {
+          case PoisonPill | Kill =>
+            materializer.logger.warning(
+              "{} message sent to StageActor({}) will be ignored, since it is not a real Actor."
+              + "Use a custom message type to communicate with it instead.",
+              msg,
+              functionRef.path)
+          case _ => dispatch((sender, msg))
+        }
       cell.addFunctionRef(f, name)
     }
 
@@ -275,6 +309,124 @@ object GraphStageLogic {
     type Receive = ((ActorRef, Any)) => Unit
   }
 
+  private object StageActor {
+    def localCell(ref: ActorRef, description: String): ActorCell =
+      ref match {
+        case ref: LocalActorRef => ref.underlying
+        case ref: RepointableActorRef =>
+          ref.underlying match {
+            case cell: ActorCell => cell
+            case unknown =>
+              throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]")
+          }
+        case unknown =>
+          throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]")
+      }
+
+    /**
+     * Reads `pekko.stream.materializer.stage-actor-drain-batch` from the materializer's ActorSystem config.
+     * Called once per lazy StageActor construction (never on the hot path). Bounded to `>= 1`.
+     */
+    def drainBatchSize(materializer: Materializer): Int =
+      Math.max(1, materializer.system.settings.config.getInt("pekko.stream.materializer.stage-actor-drain-batch"))
+
+    private final val SchedStateIdle: Int = 0
+    private final val SchedStateScheduled: Int = 1
+
+    /**
+     * VarHandle for [[LazyDispatch.scheduledState]]: one static handle shared across all instances; the
+     * per-instance state is just a primitive `int` on the LazyDispatch object — no AtomicBoolean wrapper.
+     * `privateLookupIn` grants access to LazyDispatch's private fields from this (same-package) companion;
+     * this is the same pattern used by [[AbstractNodeQueue]] itself.
+     */
+    private val schedStateHandle: VarHandle = {
+      val lookup = MethodHandles.privateLookupIn(classOf[LazyDispatch], MethodHandles.lookup())
+      lookup.findVarHandle(classOf[LazyDispatch], "scheduledState", java.lang.Integer.TYPE)
+    }
+
+    /**
+     * Lazy-path dispatch: producers enqueue into a Vyukov MPSC queue and elect a single drain via
+     * IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox enqueue. The drain runs on the
+     * interpreter thread, polls in a tight loop bounded by `drainBatchSize`, then either publishes IDLE
+     * (with a recheck for the publish-window race) or re-schedules another envelope to yield to other
+     * BoundaryEvents.
+     *
+     * JIT/GC notes:
+     *  - `final class` + monomorphic per-StageActor instance → JIT devirtualizes the apply at the
+     *    FunctionRef call site.
+     *  - Extends [[AbstractNodeQueue]] directly so the queue head atomic and the dispatch function share one
+     *    object (one allocation per StageActor, one fewer field deref on the producer hot path).
+     *  - `scheduledState` is a plain `@volatile var Int` updated via a shared `VarHandle` (companion-static)
+     *    so the per-instance state cost is one `int` field instead of a separate AtomicBoolean object.
+     *  - All hot-path state is `private[this]` → direct field access, no accessor methods.
+     *  - `drainBatchSize` is read once into a stack-local at the top of `drain` so the JIT can treat the loop
+     *    bound as a constant.
+     *  - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) + 1 Tuple2 (~24 bytes). The Tuple2
+     *    is forced by the public `StageActorRef.Receive` type. No AsyncInput / Envelope per tell — those are
+     *    amortized across the batch.
+     */
+    private final class LazyDispatch(
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        handler: Any => Unit,
+        drainBatchSize: Int)
+        extends AbstractNodeQueue[(ActorRef, Any)]
+        with (((ActorRef, Any)) => Unit) {
+
+      // Updated only via `schedStateHandle` (compareAndSet / setRelease). The compiler can't see those
+      // reflective accesses, hence the @nowarn for "never updated" and "never used".
+      @nowarn("cat=unused-privates")
+      @nowarn("msg=never updated")
+      @volatile private var scheduledState: Int = SchedStateIdle
+
+      // Reused across all drain batches; allocated once at construction.
+      private[this] val drainCallback: Any => Unit = (_: Any) => drain()
+
+      override def apply(pair: (ActorRef, Any)): Unit = {
+        add(pair) // Vyukov producer path: getAndSet + release-store, no CAS spin
+        // Double-checked CAS: uncontended fast path is one acquire-load; only the IDLE->SCHEDULED winner
+        // pays a CAS + mailbox push.
+        if (schedStateHandle.getAcquire(this).asInstanceOf[Int] == SchedStateIdle &&
+            schedStateHandle.compareAndSet(this, SchedStateIdle, SchedStateScheduled))
+          scheduleDrain()
+      }
+
+      private def scheduleDrain(): Unit =
+        // 1 AsyncInput + 1 Envelope per drain batch (amortized across up to drainBatchSize tells).
+        interpreter.onAsyncInput(logic, null, NoPromise, drainCallback)
+
+      private def drain(): Unit = {
+        val limit = drainBatchSize // hoisted to a local so JIT treats it as a loop-invariant constant
+        var processed = 0
+        while (processed < limit) {
+          if (interpreter.isStageCompleted(logic)) {
+            // Stage completed mid-drain; drop the remainder (matches the original per-tell behaviour where
+            // runAsyncInput silently skipped completed stages). Don't reschedule — no future drain will run.
+            while (poll() ne null) ()
+            schedStateHandle.setRelease(this, SchedStateIdle)
+            return
+          }
+          val item = poll()
+          if (item eq null) {
+            schedStateHandle.setRelease(this, SchedStateIdle)
+            // Recheck race: a producer may have added between `poll == null` and the IDLE publish above.
+            // That producer saw scheduled=SCHEDULED and skipped the mailbox send, so we must re-elect.
+            if (!isEmpty &&
+                schedStateHandle.compareAndSet(this, SchedStateIdle, SchedStateScheduled))
+              scheduleDrain()
+            return
+          }
+          handler(item)
+          processed += 1
+        }
+        // Hit batch cap with items potentially still queued. Re-schedule another envelope so other
+        // BoundaryEvents (pull/push/complete) can interleave via the actor mailbox. `scheduledState` stays
+        // SCHEDULED: concurrent producers correctly observe SCHEDULED and skip; the new envelope will drain.
+        scheduleDrain()
+      }
+    }
+  }
+
   /**
    * Internal API
    *
@@ -1339,8 +1491,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
 
   /**
    * Initialize a [[GraphStageLogic.StageActorRef]] which can be used to interact with from the outside world "as-if" a [[pekko.actor.Actor]].
-   * The messages are looped through the [[getAsyncCallback]] mechanism of [[GraphStage]] so they are safe to modify
-   * internal state of this operator.
+   * The messages are delivered through the owning stream interpreter so they are safe to modify internal state of this
+   * operator.
    *
    * This method must (the earliest) be called after the [[GraphStageLogic]] constructor has finished running,
    * for example from the [[preStart]] callback the graph operator logic provides.
@@ -1358,7 +1510,20 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
    * @return minimal actor with watch method
    */
   final protected def getStageActor(receive: ((ActorRef, Any)) => Unit): StageActor =
-    getEagerStageActor(interpreter.materializer)(receive)
+    _stageActor match {
+      case null =>
+        val currentInterpreter = interpreter
+        _stageActor = new StageActor(
+          currentInterpreter.materializer,
+          currentInterpreter,
+          this,
+          receive,
+          stageActorName)
+        _stageActor
+      case existing =>
+        existing.become(receive)
+        existing
+    }
 
   /**
    * INTERNAL API
@@ -1382,7 +1547,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
    * Override and return a name to be given to the StageActor of this operator.
    *
    * This method will be only invoked and used once, during the first [[getStageActor]]
-   * invocation whichc reates the actor, since subsequent `getStageActors` calls function
+   * invocation which creates the actor, since subsequent `getStageActors` calls function
    * like `become`, rather than creating new actors.
    *
    * Returns an empty string by default, which means that the name will a unique generated String (e.g. "$$a").
