This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch issue-2860-remove-dead-actorprocessor
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit c668685c925a5bee184c839dabb87d207094108b
Author: He-Pin <[email protected]>
AuthorDate: Mon Jun 1 03:20:17 2026 +0800

    chore(stream): remove dead ActorProcessor, ActorProcessorImpl, and ExposedPublisherReceive
    
    Motivation:
    These @InternalApi classes were unused remnants of the legacy actor-based
    stream runtime. ActorProcessor and ActorProcessorImpl were never
    instantiated or extended anywhere in the codebase. ExposedPublisherReceive
    was only used by the dead ActorProcessorImpl.
    
    Modification:
    - Remove ActorProcessor class and object (lines 31-60)
    - Remove ActorProcessorImpl class and object (lines 265-330)
    - Remove ExposedPublisherReceive.scala entirely (43 lines)
    - Clean up unused imports in ActorProcessor.scala
    - Add MiMa exclusion filters for the removed @InternalApi classes
    
    Result:
    -147 lines of dead code removed. BatchingInputBuffer and SimpleOutputs
    remain (used by FanIn/FanOut for TLSActor).
    
    Tests:
    - stream/compile: passes
    - stream/mimaReportBinaryIssues: passes (with exclusions)
    - stream-tests (FlowSpec, SourceSpec, SinkSpec, PublisherSinkSpec): 166/166 pass
    - TLS tests: pre-existing macOS BindException failures (same on main)
    
    Refs: #2860
---
 .../remove-dead-actorprocessor.excludes            |  26 +++++
 .../apache/pekko/stream/impl/ActorProcessor.scala  | 105 +--------------------
 .../stream/impl/ExposedPublisherReceive.scala      |  43 ---------
 3 files changed, 27 insertions(+), 147 deletions(-)

diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-dead-actorprocessor.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-dead-actorprocessor.excludes
new file mode 100644
index 0000000000..44a981c516
--- /dev/null
+++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-dead-actorprocessor.excludes
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Remove dead ActorProcessor, ActorProcessorImpl, and ExposedPublisherReceive
+# These @InternalApi classes were unused remnants of the legacy actor-based stream runtime.
+# Only BatchingInputBuffer and SimpleOutputs remain (used by FanIn/FanOut for TLSActor).
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessor")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessor$")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessorImpl")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessorImpl$")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ActorProcessorImpl$SubscriptionTimeout$")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.ExposedPublisherReceive")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala
index 101ce73b83..9c159f622d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorProcessor.scala
@@ -19,45 +19,9 @@ import org.apache.pekko
 import pekko.actor._
 import pekko.annotation.InternalApi
 import pekko.event.Logging
-import pekko.stream.{ AbruptTerminationException, Attributes }
-import pekko.stream.ActorAttributes
 import pekko.stream.impl.ActorSubscriberMessage.{ OnComplete, OnError, OnNext, OnSubscribe }
 
-import org.reactivestreams.{ Processor, Subscriber, Subscription }
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] object ActorProcessor {
-
-  def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
-    val p = new ActorProcessor[I, O](impl)
-    // Resolve cyclic dependency with actor. This MUST be the first message no matter what.
-    impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]])
-    p
-  }
-}
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] class ActorProcessor[I, O](impl: ActorRef)
-    extends ActorPublisher[O](impl)
-    with Processor[I, O] {
-  override def onSubscribe(s: Subscription): Unit = {
-    ReactiveStreamsCompliance.requireNonNullSubscription(s)
-    impl ! OnSubscribe(s)
-  }
-  override def onError(t: Throwable): Unit = {
-    ReactiveStreamsCompliance.requireNonNullException(t)
-    impl ! OnError(t)
-  }
-  override def onComplete(): Unit = impl ! OnComplete
-  override def onNext(elem: I): Unit = {
-    ReactiveStreamsCompliance.requireNonNullElement(elem)
-    impl ! OnNext(elem)
-  }
-}
+import org.reactivestreams.{ Subscriber, Subscription }
 
 /**
  * INTERNAL API
@@ -261,70 +225,3 @@ import org.reactivestreams.{ Processor, Subscriber, Subscription }
   }
 
 }
-
-private[pekko] object ActorProcessorImpl {
-  case object SubscriptionTimeout
-}
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] abstract class ActorProcessorImpl(attributes: Attributes)
-    extends Actor
-    with ActorLogging
-    with Pump {
-
-  private val debugLoggingEnabled = attributes.mandatoryAttribute[ActorAttributes.DebugLogging].enabled
-
-  protected val primaryInputs: Inputs = {
-    val initialInputBufferSize = attributes.mandatoryAttribute[Attributes.InputBuffer].initial
-    new BatchingInputBuffer(initialInputBufferSize, this) {
-      override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e)
-    }
-  }
-
-  protected val primaryOutputs: Outputs = new SimpleOutputs(self, this)
-  def subTimeoutHandling: Receive
-
-  /**
-   * Subclass may override [[#activeReceive]]
-   */
-  final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) {
-    override def receiveExposedPublisher(ep: ExposedPublisher): Unit = {
-      primaryOutputs.subreceive(ep)
-      context.become(activeReceive)
-    }
-  }
-
-  def activeReceive: Receive =
-    primaryInputs.subreceive.orElse[Any, Unit](primaryOutputs.subreceive).orElse(subTimeoutHandling)
-
-  protected def onError(e: Throwable): Unit = fail(e)
-
-  protected def fail(e: Throwable): Unit = {
-    if (debugLoggingEnabled)
-      log.debug("fail due to: {}", e.getMessage)
-    primaryInputs.cancel()
-    primaryOutputs.error(e)
-    context.stop(self)
-  }
-
-  override def pumpFinished(): Unit = {
-    primaryInputs.cancel()
-    primaryOutputs.complete()
-    context.stop(self)
-  }
-
-  override def pumpFailed(e: Throwable): Unit = fail(e)
-
-  override def postStop(): Unit = {
-    primaryInputs.cancel()
-    primaryOutputs.error(AbruptTerminationException(self))
-  }
-
-  override def postRestart(reason: Throwable): Unit = {
-    super.postRestart(reason)
-    throw new IllegalStateException("This actor cannot be restarted", reason)
-  }
-
-}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ExposedPublisherReceive.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ExposedPublisherReceive.scala
deleted file mode 100644
index 50c4cbe1d8..0000000000
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/ExposedPublisherReceive.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.stream.impl
-
-import org.apache.pekko
-import pekko.actor.Actor
-import pekko.annotation.InternalApi
-
-/**
- * INTERNAL API
- */
-@InternalApi private[pekko] abstract class ExposedPublisherReceive(activeReceive: Actor.Receive, unhandled: Any => Unit)
-    extends Actor.Receive {
-  private var stash = List.empty[Any]
-
-  def isDefinedAt(o: Any): Boolean = true
-
-  def apply(o: Any): Unit = o match {
-    case ep: ExposedPublisher =>
-      receiveExposedPublisher(ep)
-      if (stash.nonEmpty) {
-        // we don't use sender() so this is alright
-        stash.reverse.foreach { msg =>
-          activeReceive.applyOrElse(msg, unhandled)
-        }
-      }
-    case other =>
-      stash ::= other
-  }
-
-  def receiveExposedPublisher(ep: ExposedPublisher): Unit
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to