This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch issue-2860-remove-dead-actorprocessor in repository https://gitbox.apache.org/repos/asf/pekko.git
commit c668685c925a5bee184c839dabb87d207094108b Author: He-Pin <[email protected]> AuthorDate: Mon Jun 1 03:20:17 2026 +0800 chore(stream): remove dead ActorProcessor, ActorProcessorImpl, and ExposedPublisherReceive Motivation: These @InternalApi classes were unused remnants of the legacy actor-based stream runtime. ActorProcessor and ActorProcessorImpl were never instantiated or extended anywhere in the codebase. ExposedPublisherReceive was only used by the dead ActorProcessorImpl. Modification: - Remove ActorProcessor class and object (lines 31-60) - Remove ActorProcessorImpl class and object (lines 265-330) - Remove ExposedPublisherReceive.scala entirely (43 lines) - Clean up unused imports in ActorProcessor.scala - Add MiMa exclusion filters for the removed @InternalApi classes Result: -147 lines of dead code removed. BatchingInputBuffer and SimpleOutputs remain (used by FanIn/FanOut for TLSActor). Tests: - stream/compile: passes - stream/mimaReportBinaryIssues: passes (with exclusions) - stream-tests (FlowSpec, SourceSpec, SinkSpec, PublisherSinkSpec): 166/166 pass - TLS tests: pre-existing macOS BindException failures (same on main) Refs: #2860 --- .../remove-dead-actorprocessor.excludes | 26 +++++ .../apache/pekko/stream/impl/ActorProcessor.scala | 105 +-------------------- .../stream/impl/ExposedPublisherReceive.scala | 43 --------- 3 files changed, 27 insertions(+), 147 deletions(-) diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-dead-actorprocessor.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-dead-actorprocessor.excludes new file mode 100644 index 0000000000..44a981c516 --- /dev/null +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-dead-actorprocessor.excludes @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Remove dead ActorProcessor, ActorProcessorImpl, and ExposedPublisherReceive +# These @InternalApi classes were unused remnants of the legacy actor-based stream runtime. +# Only BatchingInputBuffer and SimpleOutputs remain (used by FanIn/FanOut for TLSActor). +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessor") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessor$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessorImpl") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessorImpl$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessorImpl$SubscriptionTimeout$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ExposedPublisherReceive") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala index 101ce73b83..9c159f622d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala @@ -19,45 +19,9 @@ import org.apache.pekko import pekko.actor._ import pekko.annotation.InternalApi import pekko.event.Logging -import pekko.stream.{ AbruptTerminationException, Attributes } -import pekko.stream.ActorAttributes import pekko.stream.impl.ActorSubscriberMessage.{ OnComplete, OnError, OnNext, OnSubscribe } -import org.reactivestreams.{ Processor, Subscriber, Subscription } - -/** - * INTERNAL API - */ -@InternalApi private[pekko] object ActorProcessor { - - def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { - val p = new ActorProcessor[I, O](impl) - // Resolve cyclic dependency with actor. This MUST be the first message no matter what. - impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) - p - } -} - -/** - * INTERNAL API - */ -@InternalApi private[pekko] class ActorProcessor[I, O](impl: ActorRef) - extends ActorPublisher[O](impl) - with Processor[I, O] { - override def onSubscribe(s: Subscription): Unit = { - ReactiveStreamsCompliance.requireNonNullSubscription(s) - impl ! OnSubscribe(s) - } - override def onError(t: Throwable): Unit = { - ReactiveStreamsCompliance.requireNonNullException(t) - impl ! OnError(t) - } - override def onComplete(): Unit = impl ! OnComplete - override def onNext(elem: I): Unit = { - ReactiveStreamsCompliance.requireNonNullElement(elem) - impl ! OnNext(elem) - } -} +import org.reactivestreams.{ Subscriber, Subscription } /** * INTERNAL API @@ -261,70 +225,3 @@ import org.reactivestreams.{ Processor, Subscriber, Subscription } } } - -private[pekko] object ActorProcessorImpl { - case object SubscriptionTimeout -} - -/** - * INTERNAL API - */ -@InternalApi private[pekko] abstract class ActorProcessorImpl(attributes: Attributes) - extends Actor - with ActorLogging - with Pump { - - private val debugLoggingEnabled = attributes.mandatoryAttribute[ActorAttributes.DebugLogging].enabled - - protected val primaryInputs: Inputs = { - val initialInputBufferSize = attributes.mandatoryAttribute[Attributes.InputBuffer].initial - new BatchingInputBuffer(initialInputBufferSize, this) { - override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e) - } - } - - protected val primaryOutputs: Outputs = new SimpleOutputs(self, this) - def subTimeoutHandling: Receive - - /** - * Subclass may override [[#activeReceive]] - */ - final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) { - override def receiveExposedPublisher(ep: ExposedPublisher): Unit = { - primaryOutputs.subreceive(ep) - context.become(activeReceive) - } - } - - def activeReceive: Receive = - primaryInputs.subreceive.orElse[Any, Unit](primaryOutputs.subreceive).orElse(subTimeoutHandling) - - protected def onError(e: Throwable): Unit = fail(e) - - protected def fail(e: Throwable): Unit = { - if (debugLoggingEnabled) - log.debug("fail due to: {}", e.getMessage) - primaryInputs.cancel() - primaryOutputs.error(e) - context.stop(self) - } - - override def pumpFinished(): Unit = { - primaryInputs.cancel() - primaryOutputs.complete() - context.stop(self) - } - - override def pumpFailed(e: Throwable): Unit = fail(e) - - override def postStop(): Unit = { - primaryInputs.cancel() - primaryOutputs.error(AbruptTerminationException(self)) - } - - override def postRestart(reason: Throwable): Unit = { - super.postRestart(reason) - throw new IllegalStateException("This actor cannot be restarted", reason) - } - -} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ExposedPublisherReceive.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ExposedPublisherReceive.scala deleted file mode 100644 index 50c4cbe1d8..0000000000 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/ExposedPublisherReceive.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com> - */ - -package org.apache.pekko.stream.impl - -import org.apache.pekko -import pekko.actor.Actor -import pekko.annotation.InternalApi - -/** - * INTERNAL API - */ -@InternalApi private[pekko] abstract class ExposedPublisherReceive(activeReceive: Actor.Receive, unhandled: Any => Unit) - extends Actor.Receive { - private var stash = List.empty[Any] - - def isDefinedAt(o: Any): Boolean = true - - def apply(o: Any): Unit = o match { - case ep: ExposedPublisher => - receiveExposedPublisher(ep) - if (stash.nonEmpty) { - // we don't use sender() so this is alright - stash.reverse.foreach { msg => - activeReceive.applyOrElse(msg, unhandled) - } - } - case other => - stash ::= other - } - - def receiveExposedPublisher(ep: ExposedPublisher): Unit -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
