This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new bbb1e8efc9 chore(stream): remove dead ActorProcessor,
ActorProcessorImpl, and ExposedPublisherReceive (#3027)
bbb1e8efc9 is described below
commit bbb1e8efc9bf094041117f619980d99350b0378e
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Jun 1 04:00:35 2026 +0800
chore(stream): remove dead ActorProcessor, ActorProcessorImpl, and
ExposedPublisherReceive (#3027)
Motivation:
These @InternalApi classes were unused remnants of the legacy actor-based
stream runtime. ActorProcessor and ActorProcessorImpl were never
instantiated or extended anywhere in the codebase. ExposedPublisherReceive
was only used by the dead ActorProcessorImpl.
Modification:
- Remove ActorProcessor class and object (lines 31-60)
- Remove ActorProcessorImpl class and object (lines 265-330)
- Remove ExposedPublisherReceive.scala entirely (43 lines)
- Clean up unused imports in ActorProcessor.scala
- Add MiMa exclusion filters for the removed @InternalApi classes
Result:
-147 lines of dead code removed. BatchingInputBuffer and SimpleOutputs
remain (used by FanIn/FanOut for TLSActor).
Tests:
- stream/compile: passes
- stream/mimaReportBinaryIssues: passes (with exclusions)
- stream-tests (FlowSpec, SourceSpec, SinkSpec, PublisherSinkSpec): 166/166
pass
- TLS tests: pre-existing macOS BindException failures (same on main)
Refs: #2860
---
.../remove-dead-actorprocessor.excludes | 26 +++++
.../apache/pekko/stream/impl/ActorProcessor.scala | 105 +--------------------
.../stream/impl/ExposedPublisherReceive.scala | 43 ---------
3 files changed, 27 insertions(+), 147 deletions(-)
diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-dead-actorprocessor.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-dead-actorprocessor.excludes
new file mode 100644
index 0000000000..44a981c516
--- /dev/null
+++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-dead-actorprocessor.excludes
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Remove dead ActorProcessor, ActorProcessorImpl, and ExposedPublisherReceive
+# These @InternalApi classes were unused remnants of the legacy actor-based stream runtime.
+# Only BatchingInputBuffer and SimpleOutputs remain (used by FanIn/FanOut for TLSActor).
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessor")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessor$")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessorImpl")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessorImpl$")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessorImpl$SubscriptionTimeout$")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ExposedPublisherReceive")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala
index 101ce73b83..9c159f622d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala
@@ -19,45 +19,9 @@ import org.apache.pekko
import pekko.actor._
import pekko.annotation.InternalApi
import pekko.event.Logging
-import pekko.stream.{ AbruptTerminationException, Attributes }
-import pekko.stream.ActorAttributes
import pekko.stream.impl.ActorSubscriberMessage.{ OnComplete, OnError, OnNext, OnSubscribe }
-import org.reactivestreams.{ Processor, Subscriber, Subscription }
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] object ActorProcessor {
-
- def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
- val p = new ActorProcessor[I, O](impl)
- // Resolve cyclic dependency with actor. This MUST be the first message no matter what.
- impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]])
- p
- }
-}
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] class ActorProcessor[I, O](impl: ActorRef)
- extends ActorPublisher[O](impl)
- with Processor[I, O] {
- override def onSubscribe(s: Subscription): Unit = {
- ReactiveStreamsCompliance.requireNonNullSubscription(s)
- impl ! OnSubscribe(s)
- }
- override def onError(t: Throwable): Unit = {
- ReactiveStreamsCompliance.requireNonNullException(t)
- impl ! OnError(t)
- }
- override def onComplete(): Unit = impl ! OnComplete
- override def onNext(elem: I): Unit = {
- ReactiveStreamsCompliance.requireNonNullElement(elem)
- impl ! OnNext(elem)
- }
-}
+import org.reactivestreams.{ Subscriber, Subscription }
/**
* INTERNAL API
@@ -261,70 +225,3 @@ import org.reactivestreams.{ Processor, Subscriber, Subscription }
}
}
-
-private[pekko] object ActorProcessorImpl {
- case object SubscriptionTimeout
-}
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] abstract class ActorProcessorImpl(attributes: Attributes)
- extends Actor
- with ActorLogging
- with Pump {
-
- private val debugLoggingEnabled = attributes.mandatoryAttribute[ActorAttributes.DebugLogging].enabled
-
- protected val primaryInputs: Inputs = {
- val initialInputBufferSize = attributes.mandatoryAttribute[Attributes.InputBuffer].initial
- new BatchingInputBuffer(initialInputBufferSize, this) {
- override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e)
- }
- }
-
- protected val primaryOutputs: Outputs = new SimpleOutputs(self, this)
- def subTimeoutHandling: Receive
-
- /**
- * Subclass may override [[#activeReceive]]
- */
- final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) {
- override def receiveExposedPublisher(ep: ExposedPublisher): Unit = {
- primaryOutputs.subreceive(ep)
- context.become(activeReceive)
- }
- }
-
- def activeReceive: Receive =
- primaryInputs.subreceive.orElse[Any, Unit](primaryOutputs.subreceive).orElse(subTimeoutHandling)
-
- protected def onError(e: Throwable): Unit = fail(e)
-
- protected def fail(e: Throwable): Unit = {
- if (debugLoggingEnabled)
- log.debug("fail due to: {}", e.getMessage)
- primaryInputs.cancel()
- primaryOutputs.error(e)
- context.stop(self)
- }
-
- override def pumpFinished(): Unit = {
- primaryInputs.cancel()
- primaryOutputs.complete()
- context.stop(self)
- }
-
- override def pumpFailed(e: Throwable): Unit = fail(e)
-
- override def postStop(): Unit = {
- primaryInputs.cancel()
- primaryOutputs.error(AbruptTerminationException(self))
- }
-
- override def postRestart(reason: Throwable): Unit = {
- super.postRestart(reason)
- throw new IllegalStateException("This actor cannot be restarted", reason)
- }
-
-}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ExposedPublisherReceive.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ExposedPublisherReceive.scala
deleted file mode 100644
index 50c4cbe1d8..0000000000
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/ExposedPublisherReceive.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.stream.impl
-
-import org.apache.pekko
-import pekko.actor.Actor
-import pekko.annotation.InternalApi
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] abstract class ExposedPublisherReceive(activeReceive: Actor.Receive, unhandled: Any => Unit)
- extends Actor.Receive {
- private var stash = List.empty[Any]
-
- def isDefinedAt(o: Any): Boolean = true
-
- def apply(o: Any): Unit = o match {
- case ep: ExposedPublisher =>
- receiveExposedPublisher(ep)
- if (stash.nonEmpty) {
- // we don't use sender() so this is alright
- stash.reverse.foreach { msg =>
- activeReceive.applyOrElse(msg, unhandled)
- }
- }
- case other =>
- stash ::= other
- }
-
- def receiveExposedPublisher(ep: ExposedPublisher): Unit
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]