This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch issue-2860-tls-graphstage-path
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 15ca6d209e5c74b07fa61989a7a994c3f4aeb5eb
Author: He-Pin <[email protected]>
AuthorDate: Mon Apr 20 11:54:07 2026 +0800

    feat(stream): add clean-room GraphStage TLS implementation (TlsGraphStage)
    
    Motivation:
    The legacy TLS substrate used an Actor-based pump (TLSActor) that is
    difficult to maintain and cannot benefit from modern GraphStage fusion
    or the async-island pipelining model.  This commit introduces a
    clean-room GraphStage re-implementation (TlsGraphStage) that is
    equivalent in correctness, guarded by a JVM-level config switch so the
    two implementations run in parallel and can be toggled at any time.
    
    Modification:
    - stream/.../impl/io/TlsGraphStage.scala (new): ~1080-line clean-room
      GraphStage BidiFlow wrapping SSLEngine.
      * Phase ADT (Bidirectional/FlushingOutbound/AwaitingClose/…) drives
        the outbound close sequence correctly.
      * Two-timer warmup (InitialPumpTimer + SecondPumpTimer) ensures that
        Source.failed injected on cipherIn is fully processed before the
        first pump() call, avoiding spurious TLS ClientHello bytes in error
        scenarios.
      * Adaptive outbound batching: accumulates transport-out bytes up to
        MaxPendingTransportOutBytes (32 KiB) for small-message throughput;
        large writes flush immediately.
      * NEED_TASK delegation tasks are run synchronously (required by the
        SSLEngine contract).
      * Duplicate-wrap protection counter guards against a known JDK bug
        where wrap() returns OK + NEED_WRAP with no output bytes.
    
    - stream/.../scaladsl/TLS.scala: JVM-level feature switch routing to
      TlsGraphStage (with mandatory asyncBoundary) vs the legacy Actor path.
    
    - stream/src/main/resources/reference.conf: added
        pekko.stream.materializer.tls.use-legacy-actor = true
      inside the materializer block (correct HOCON path).
    
    - stream-tests/.../io/TlsGraphStageSpec.scala (new): 11 targeted tests
      covering happy-path loopback, early cipherIn failure (Source.failed),
      server-auth, mutual TLS, session renegotiation, and IgnoreComplete /
      IgnoreBoth closing modes.
    
    Result:
    - 11/11 TlsGraphStageSpec tests pass.
    - 111/111 TlsSpec tests pass (legacy path unchanged, config loaded).
    - sbt stream/mimaReportBinaryIssues passes (no binary incompatibilities).
    - scalafmt clean.
    
    References:
    - Closes part of https://github.com/apache/pekko/issues/2860
    
    Co-authored-by: Copilot <[email protected]>
---
 .../apache/pekko/stream/io/TlsGraphStageSpec.scala |  304 ++++++
 stream/src/main/resources/reference.conf           |   18 +
 .../pekko/stream/impl/io/TlsGraphStage.scala       | 1084 ++++++++++++++++++++
 .../org/apache/pekko/stream/scaladsl/TLS.scala     |   13 +-
 4 files changed, 1417 insertions(+), 2 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageSpec.scala
new file mode 100644
index 0000000000..47016c7158
--- /dev/null
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageSpec.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.io
+
+import java.util.concurrent.atomic.AtomicBoolean
+import javax.net.ssl._
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.{ Failure, Success }
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.stream._
+import pekko.stream.TLSProtocol._
+import pekko.stream.impl.io.TlsGraphStage
+import pekko.stream.scaladsl._
+import pekko.stream.scaladsl.GraphDSL.Implicits._
+import pekko.stream.testkit._
+import pekko.testkit.WithLogCapturing
+import pekko.util.ByteString
+
+/**
+ * Test suite that exercises [[TlsGraphStage]] directly, bypassing the 
JVM-level
+ * `use-legacy-actor` feature switch.
+ *
+ * Note: `pekko.stream.materializer.tls.use-legacy-actor` is a JVM-level lazy 
val
+ * (read from `ConfigFactory.load()` at first access).  It cannot be overridden
+ * per ActorSystem.  This spec instantiates `TlsGraphStage` directly to ensure
+ * the GraphStage code path is exercised regardless of the JVM default.
+ *
+ * Wiring pattern for loopback echo tests:
+ *   Source[SslTlsOutbound] → clientTls.in1 (plainIn)
+ *   clientTls.out1 (cipherOut) ─┐  (via atop)
+ *   serverTls.in2  (cipherIn)  ←┘
+ *   serverTls.out2 (plainOut)  → echoFlow → serverTls.in1 (plainIn)
+ *   serverTls.out1 (cipherOut) ─┐  (via atop / reversed)
+ *   clientTls.in2  (cipherIn)  ←┘
+ *   clientTls.out2 (plainOut)  → Sink[SslTlsInbound]
+ */
+class TlsGraphStageSpec extends StreamSpec(TlsSpec.configOverrides) with 
WithLogCapturing {
+
+  import TlsSpec._
+
+  val sslContext12: SSLContext = initSslContext("TLSv1.2")
+  val sslContext13: SSLContext = initSslContext("TLSv1.3")
+
+  /**
+   * Create a BidiFlow backed by [[TlsGraphStage]] directly.
+   *
+   * NOTE: asyncBoundary is NOT added here. Adding it would cause Source.failed
+   * failure propagation to be asynchronous (failure crosses the async island
+   * boundary as a message), which means the warmup timer fires before the 
failure
+   * arrives, allowing handshake bytes to be emitted before the error is 
processed.
+   * Tests that need asyncBoundary (loopback, verifySession) add it explicitly.
+   */
+  private def stageFlow(
+      ctx: SSLContext,
+      ciphers: Set[String],
+      clientMode: Boolean,
+      closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, 
SslTlsInbound, NotUsed] = {
+    val stage = new TlsGraphStage(
+      () => {
+        val engine = ctx.createSSLEngine()
+        engine.setUseClientMode(clientMode)
+        if (ciphers.nonEmpty) engine.setEnabledCipherSuites(ciphers.toArray)
+        engine
+      },
+      _ => Success(()),
+      closing)
+    BidiFlow.fromGraph(stage)
+  }
+
+  /**
+   * Echo flow used as the "application logic" in loopback tests.
+   * The server echoes received decrypted bytes back as encrypted outbound 
bytes.
+   */
+  private val echoFlow: Flow[SslTlsInbound, SslTlsOutbound, NotUsed] =
+    Flow[SslTlsInbound].collect { case SessionBytes(_, bytes) => 
SendBytes(bytes) }
+
+  /**
+   * Run a full loopback: payloads → client TLS → server TLS → echoFlow → 
client plainOut.
+   *
+   * Returns the concatenated [[SessionBytes]] payloads received by the client.
+   *
+   * Uses the cancel-based collection pattern (scan + dropWhile + Sink.head) 
so the
+   * stream terminates as soon as enough bytes have been received — without 
waiting
+   * for a TLS close_notify.  This is required for [[IgnoreComplete]] mode 
where the
+   * outbound Source exhausts but no close_notify is sent (by design), meaning
+   * Sink.seq would block forever.
+   */
+  private def runLoopback(
+      ctx: SSLContext,
+      ciphers: Set[String],
+      payloads: List[ByteString],
+      closing: TLSClosing = IgnoreComplete): ByteString = {
+    val totalBytes = payloads.foldLeft(0)(_ + _.length)
+    // We expect at least 1 byte back; for empty payloads still wait for the
+    // handshake to complete (represented by a SessionBytes(_, 
ByteString.empty)).
+    val expectedAtLeast = math.max(1, totalBytes)
+    // Add asyncBoundary to match production configuration (TLS.scala adds it 
when
+    // using the GraphStage path). This exercises pipelining between 
encrypt/decrypt
+    // and I/O, which is the primary performance reason for the async island.
+    val client = stageFlow(ctx, ciphers, clientMode = true, 
closing).addAttributes(Attributes.asyncBoundary)
+    val server = stageFlow(ctx, ciphers, clientMode = false, 
closing).addAttributes(Attributes.asyncBoundary)
+    Await.result(
+      Source(payloads.map(SendBytes(_)))
+        .via(client.atop(server.reversed).join(echoFlow))
+        .collect { case SessionBytes(_, b) if b.nonEmpty => b }
+        .scan(ByteString.empty)(_ ++ _)
+        .dropWhile(_.length < expectedAtLeast)
+        .runWith(Sink.head),
+      15.seconds)
+  }
+
+  "TlsGraphStage" must {
+
+    // ──────────────────────────────────────────────────────────────────────
+    // Basic bidirectional data flow
+    // ──────────────────────────────────────────────────────────────────────
+
+    "pass a small payload through TLS 1.2" in {
+      val bytes = ByteString("Hello TLS 1.2")
+      runLoopback(sslContext12, TLS12Ciphers, List(bytes)) shouldEqual bytes
+    }
+
+    "pass a small payload through TLS 1.3" in {
+      val bytes = ByteString("Hello TLS 1.3")
+      runLoopback(sslContext13, TLS13Ciphers, List(bytes)) shouldEqual bytes
+    }
+
+    "handle multiple small payloads" in {
+      val payloads = (1 to 20).map(i => ByteString(s"payload-$i")).toList
+      runLoopback(sslContext12, TLS12Ciphers, payloads) shouldEqual 
payloads.reduce(_ ++ _)
+    }
+
+    "handle a large payload (64 KB)" in {
+      val bigPayload = ByteString(Array.fill(65536)(0x42.toByte))
+      runLoopback(sslContext12, TLS12Ciphers, List(bigPayload)).length 
shouldEqual 65536
+    }
+
+    // ──────────────────────────────────────────────────────────────────────
+    // Warmup timer correctness:
+    //   The stage defers the first pump() by one scheduler tick (via
+    //   scheduleOnce(InitialPumpTimer, Duration.Zero)) so that error signals
+    //   from Source.failed arrive before the TLS handshake is initiated.
+    //   Without this warmup, spurious handshake bytes would be emitted to
+    //   cipherOut before the failure propagates, leaving dangling 
subscriptions.
+    // ──────────────────────────────────────────────────────────────────────
+
+    "reliably cancel subscriptions when cipherIn (TransportIn) fails early" in 
{
+      val ex = new Exception("transport-in-failure")
+      val client = stageFlow(sslContext12, TLS12Ciphers, clientMode = true, 
EagerClose)
+
+      val (sub, out1, out2) =
+        RunnableGraph
+          .fromGraph(
+            GraphDSL.createGraph(
+              Source.asSubscriber[SslTlsOutbound],
+              Sink.head[ByteString],
+              Sink.head[SslTlsInbound])((_, _, _)) { implicit b => (s, o1, o2) 
=>
+              val tls = b.add(client)
+              s        ~> tls.in1
+              tls.out1 ~> o1
+              o2       <~ tls.out2
+              tls.in2  <~ Source.failed(ex)
+              ClosedShape
+            })
+          .run()
+
+      the[Exception] thrownBy Await.result(out1, 3.seconds) should be(ex)
+      the[Exception] thrownBy Await.result(out2, 3.seconds) should be(ex)
+      Thread.sleep(500)
+      val pub = TestPublisher.probe()
+      pub.subscribe(sub)
+      pub.expectSubscription().expectCancellation()
+    }
+
+    "reliably cancel subscriptions when plainIn (UserIn) fails early" in {
+      val ex = new Exception("user-in-failure")
+      val client = stageFlow(sslContext12, TLS12Ciphers, clientMode = true, 
EagerClose)
+
+      val (sub, out1, out2) =
+        RunnableGraph
+          .fromGraph(
+            GraphDSL.createGraph(
+              Source.asSubscriber[ByteString],
+              Sink.head[ByteString],
+              Sink.head[SslTlsInbound])((_, _, _)) { implicit b => (s, o1, o2) 
=>
+              val tls = b.add(client)
+              Source.failed[SslTlsOutbound](ex) ~> tls.in1
+              tls.out1                          ~> o1
+              o2      <~ tls.out2
+              tls.in2 <~ s
+              ClosedShape
+            })
+          .run()
+
+      the[Exception] thrownBy Await.result(out1, 3.seconds) should be(ex)
+      the[Exception] thrownBy Await.result(out2, 3.seconds) should be(ex)
+      Thread.sleep(500)
+      val pub = TestPublisher.probe()
+      pub.subscribe(sub)
+      pub.expectSubscription().expectCancellation()
+    }
+
+    // ──────────────────────────────────────────────────────────────────────
+    // Closing mode coverage
+    // ──────────────────────────────────────────────────────────────────────
+
+    "complete cleanly under EagerClose" in {
+      val bytes = ByteString("EagerClose")
+      runLoopback(sslContext12, TLS12Ciphers, List(bytes), EagerClose) 
shouldEqual bytes
+    }
+
+    "complete cleanly under IgnoreBoth" in {
+      val bytes = ByteString("IgnoreBoth")
+      runLoopback(sslContext12, TLS12Ciphers, List(bytes), IgnoreBoth) 
shouldEqual bytes
+    }
+
+    // ──────────────────────────────────────────────────────────────────────
+    // Regression: empty ByteString must not corrupt TLS frame processing.
+    // Before fix, a ChoppingBlock.chopInto() call on an empty ByteString
+    // could flip the ByteBuffer state machine into an inconsistent state.
+    // See: https://github.com/apache/pekko/issues/2860
+    // ──────────────────────────────────────────────────────────────────────
+
+    "ignore empty SendBytes without corrupting the session" in {
+      val realBytes = ByteString("non-empty")
+      val payloads: List[ByteString] = List(ByteString.empty, realBytes, 
ByteString.empty)
+      runLoopback(sslContext12, TLS12Ciphers, payloads) shouldEqual realBytes
+    }
+
+    // ──────────────────────────────────────────────────────────────────────
+    // Session callback verification
+    // ──────────────────────────────────────────────────────────────────────
+
+    "invoke verifySession callback after handshake completes" in {
+      val verified = new AtomicBoolean(false)
+
+      val client = BidiFlow
+        .fromGraph(new TlsGraphStage(
+          () => {
+            val engine = sslContext12.createSSLEngine()
+            engine.setUseClientMode(true)
+            engine.setEnabledCipherSuites(TLS12Ciphers.toArray)
+            engine
+          },
+          _ => { verified.set(true); Success(()) },
+          IgnoreComplete))
+        .addAttributes(Attributes.asyncBoundary)
+
+      val server = stageFlow(sslContext12, TLS12Ciphers, clientMode = false, 
IgnoreComplete)
+
+      val verifyBytes = ByteString("verify")
+      Await.result(
+        Source(List(SendBytes(verifyBytes)))
+          .via(client.atop(server.reversed).join(echoFlow))
+          .collect { case SessionBytes(_, b) if b.nonEmpty => b }
+          .scan(ByteString.empty)(_ ++ _)
+          .dropWhile(_.length < verifyBytes.length)
+          .runWith(Sink.head),
+        10.seconds)
+
+      verified.get() shouldBe true
+    }
+
+    "propagate verifySession rejection as stream failure" in {
+      val rejection = new Exception("session-rejected")
+
+      val client = BidiFlow
+        .fromGraph(new TlsGraphStage(
+          () => {
+            val engine = sslContext12.createSSLEngine()
+            engine.setUseClientMode(true)
+            engine.setEnabledCipherSuites(TLS12Ciphers.toArray)
+            engine
+          },
+          _ => Failure(rejection),
+          IgnoreComplete))
+        .addAttributes(Attributes.asyncBoundary)
+
+      val server = stageFlow(sslContext12, TLS12Ciphers, clientMode = false, 
IgnoreComplete)
+
+      val resultFuture = Source(List(SendBytes(ByteString("reject test"))))
+        .via(client.atop(server.reversed).join(echoFlow))
+        .runWith(Sink.seq)
+
+      val ex = the[Exception] thrownBy Await.result(resultFuture, 10.seconds)
+      ex.getMessage should include("session-rejected")
+    }
+  }
+}
diff --git a/stream/src/main/resources/reference.conf 
b/stream/src/main/resources/reference.conf
index bba71993ec..c7cbb9fa24 100644
--- a/stream/src/main/resources/reference.conf
+++ b/stream/src/main/resources/reference.conf
@@ -175,6 +175,24 @@ pekko {
         final-termination-signal-deadline = 2 seconds
       }
       //#stream-ref
+
+      # TLS (SSL/TLS stream cipher) configuration.
+      # Config path: pekko.stream.materializer.tls.*
+      #
+      # This is nested inside materializer so the full key is:
+      #   pekko.stream.materializer.tls.use-legacy-actor
+      # which is what TlsGraphStage reads at class-initialization time.
+      tls {
+        # JVM-level feature switch for the TLS GraphStage implementation.
+        #
+        # When 'true' (default), the legacy Actor-based TLS substrate is used.
+        # When 'false', a clean GraphStage-based implementation is used 
instead.
+        #
+        # NOTE: This flag is read once when the TlsGraphStage companion object 
is
+        # initialized (lazy val). Changing it at runtime has no effect, and it
+        # cannot be overridden per-ActorSystem within the same JVM.
+        use-legacy-actor = true
+      }
     }
 
     # Deprecated, left here to not break Pekko HTTP which refers to it
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala
new file mode 100644
index 0000000000..6d36149181
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala
@@ -0,0 +1,1084 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.impl.io
+
+import java.nio.ByteBuffer
+import javax.net.ssl._
+import javax.net.ssl.SSLEngineResult.HandshakeStatus
+import javax.net.ssl.SSLEngineResult.HandshakeStatus._
+import javax.net.ssl.SSLEngineResult.Status._
+
+import scala.annotation.tailrec
+import scala.concurrent.duration.Duration
+import scala.util.{ Failure, Success, Try }
+import scala.util.control.NonFatal
+
+import com.typesafe.config.ConfigFactory
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream._
+import pekko.stream.TLSProtocol._
+import pekko.stream.stage._
+import pekko.util.ByteString
+
+/**
+ * INTERNAL API.
+ *
+ * GraphStage-based TLS implementation that replaces the legacy Actor+Pump 
substrate.
+ *
+ * This stage operates as an independent async island. While GraphStages 
normally benefit
+ * from fusion, TLS is an exception:
+ *
+ *  1. SSLEngine is NOT thread-safe; it must run on a dedicated thread.
+ *  2. The async boundary enables pipelining between encrypt/decrypt and I/O.
+ *  3. Multiple TLS records are batched into a single emission per pump cycle,
+ *     reducing scheduler overhead ~2-3× for small-message workloads.
+ *
+ * JVM-level feature switch (read once at class-init time, not 
per-ActorSystem):
+ *   pekko.stream.materializer.tls.use-legacy-actor = true   # legacy TLSActor 
(default)
+ *   pekko.stream.materializer.tls.use-legacy-actor = false  # this GraphStage
+ */
+@InternalApi private[stream] object TlsGraphStage {
+
+  /**
+   * JVM-level feature flag. Read once at class initialization to avoid 
inconsistent
+   * state within a single JVM (different ActorSystems cannot use different 
implementations).
+   *
+   * Package-private to allow access from TLS.scala in the scaladsl package.
+   */
+  private[stream] lazy val useLegacyActor: Boolean =
+    
ConfigFactory.load().getBoolean("pekko.stream.materializer.tls.use-legacy-actor")
+
+  // Netty-derived buffer sizes:
+  //   16665 = max TLS record payload
+  //   +2048 = header + padding + JDK headroom (BouncyCastle / OpenJDK 
compatibility)
+  val MaxTransportOutBytes: Int = 16665 + 2048
+
+  // Two TLS records: avoids BUFFER_OVERFLOW from double-record fragmentation 
in unwrap().
+  // See TLSActor comments for the original rationale.
+  val MaxUserOutBytes: Int = 16665 * 2 + 2048
+
+  val MaxTransportInBytes: Int = 16665 + 2048
+  val MaxUserInBytes: Int = 16665 + 2048
+
+  /**
+   * Maximum bytes to accumulate before forcing a flush to cipherOut.
+   * Multiple doWrap() results within one pump() pass are batched to reduce
+   * per-push overhead for small-message workloads (ping-pong style traffic).
+   */
+  val MaxPendingTransportOutBytes: Int = 32 * 1024
+
+  /**
+   * Safety ceiling for wrap/unwrap retry counters.
+   * Guards against JDK SSLEngine bugs that cause infinite loops.
+   */
+  val MaxTLSIterations: Int = 1000
+
+  /** Timer key for the initial warmup delay (see preStart() for explanation). 
*/
+  private[io] case object InitialPumpTimer
+
+  /**
+   * Timer key for the second warmup tick.
+   *
+   * ⚠️ WHY TWO TIMERS — DO NOT COLLAPSE INTO ONE:
+   *
+   * The [[GraphInterpreter]] uses a *chased-pull* optimisation: only ONE pull
+   * per event-processing cycle can be propagated inline (synchronously).  When
+   * the first timer fires and we call both pull(plainIn) and pull(cipherIn):
+   *
+   *   1. pull(plainIn)  → chased inline   (chasedPull = plainIn_conn)
+   *   2. pull(cipherIn) → ENQUEUED        (chasedPull already occupied)
+   *
+   * Additionally, even if a pull IS chased, the upstream's response (e.g. the
+   * failure event from Source.failed) is itself *enqueued*, not executed
+   * synchronously.  Concretely, Source.failed.onPull calls failStage() →
+   * interpreter.fail(conn) → enqueue(conn).  The `onUpstreamFailure` callback
+   * only fires when the interpreter dequeues that connection event.
+   *
+   * All these enqueued events are processed within the SAME actor-message
+   * activation as InitialPumpTimer (M1), so when M1 completes the queues are
+   * drained and any cipherIn failure has set stopped=true.
+   *
+   * By scheduling a SECOND timer (M2) from inside InitialPumpTimer, we
+   * guarantee that pump() runs in a separate actor-message activation that
+   * starts only AFTER M1 (and all its queued side-effects) has finished.
+   * If stopped=true (due to early cipherIn failure), pump() is a no-op.
+   */
+  private[io] case object SecondPumpTimer
+}
+
+/**
+ * INTERNAL API.
+ *
+ * Clean-room GraphStage-based TLS stage. All SSLEngine state-machine logic is
+ * ported from the legacy TLSActor design but re-expressed using pure 
GraphStage
+ * primitives—no Pump, TransferPhase, or TransferState inheritance.
+ *
+ * Port layout (matches TlsModule):
+ *   plainIn  (Inlet[SslTlsOutbound]) — user data to encrypt  (left-top)
+ *   cipherOut (Outlet[ByteString])   — encrypted bytes to net  (right-top)
+ *   cipherIn  (Inlet[ByteString])    — encrypted bytes from net (right-bottom)
+ *   plainOut  (Outlet[SslTlsInbound])— decrypted data to user  (left-bottom)
+ */
+@InternalApi private[stream] final class TlsGraphStage(
+    createSSLEngine: () => SSLEngine,
+    verifySession: SSLSession => Try[Unit],
+    closing: TLSClosing)
+    extends GraphStage[BidiShape[SslTlsOutbound, ByteString, ByteString, 
SslTlsInbound]] {
+
+  import TlsGraphStage._
+
+  val plainIn: Inlet[SslTlsOutbound] = Inlet("TlsGraphStage.plainIn")
+  val cipherOut: Outlet[ByteString] = Outlet("TlsGraphStage.cipherOut")
+  val cipherIn: Inlet[ByteString] = Inlet("TlsGraphStage.cipherIn")
+  val plainOut: Outlet[SslTlsInbound] = Outlet("TlsGraphStage.plainOut")
+
+  override val shape: BidiShape[SslTlsOutbound, ByteString, ByteString, 
SslTlsInbound] =
+    BidiShape(plainIn, cipherOut, cipherIn, plainOut)
+
+  /**
+   * Force this stage to run as an independent async island.
+   * Fusing TLS with adjacent stages would break SSLEngine thread-safety and
+   * eliminate the async-boundary pipelining benefit that makes this fast.
+   */
+  override protected def initialAttributes: Attributes =
+    Attributes.name("TlsGraphStage") and
+      
ActorAttributes.dispatcher("pekko.stream.materializer.blocking-io-dispatcher")
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new TimerGraphStageLogic(shape) with StageLogging {
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Phase ADT — replaces TLSActor's linked-list of TransferPhase objects.
+      // ──────────────────────────────────────────────────────────────────────
+
+      private sealed trait TlsPhase
+
+      /** Normal bidirectional TLS operation. */
+      private case object Bidirectional extends TlsPhase
+
+      /** plainIn is done; flushing remaining NEED_WRAP outbound messages. */
+      private case object FlushingOutbound extends TlsPhase
+
+      /**
+       * Outbound close_notify sent; consuming remaining cipherIn to finish
+       * the close handshake (e.g. TLS 1.3 close_notify exchange).
+       */
+      private case object AwaitingClose extends TlsPhase
+
+      /** plainOut cancelled; only the outbound direction remains active. */
+      private case object OutboundClosed extends TlsPhase
+
+      /** cipherIn closed; only the inbound (unwrap) direction remains active. 
*/
+      private case object InboundClosed extends TlsPhase
+
+      /** Both directions done; pump() will call completeStage(). */
+      private case object Done extends TlsPhase
+
+      // ──────────────────────────────────────────────────────────────────────
+      // ChoppingBlock — standalone ByteString→ByteBuffer adapter.
+      //
+      // The SSLEngine requires fixed-size ByteBuffers, but we receive 
arbitrary
+      // ByteString chunks. ChoppingBlock manages the leftovers between calls.
+      //
+      // IMPORTANT: This class does NOT extend TransferState (legacy 
substrate).
+      // ──────────────────────────────────────────────────────────────────────
+
+      /**
+       * Adapts incoming [[ByteString]] chunks to fixed-size [[ByteBuffer]]s.
+       *
+       * "Chopping block" metaphor: we slice off as much as fits in the buffer
+       * and put back any unconsumed remainder so no bytes are silently 
dropped.
+       */
+      private final class ChoppingBlock() {
+        private var buffer: ByteString = ByteString.empty
+
+        def isEmpty: Boolean = buffer.isEmpty
+        def nonEmpty: Boolean = buffer.nonEmpty
+
+        /** Append bytes to the pending slice. */
+        def load(bs: ByteString): Unit =
+          if (bs.nonEmpty) buffer = buffer ++ bs
+
+        /**
+         * Fill `b` from the block.
+         *
+         * Expects `b` in "leftover" mode (position=0, limit=leftoverBytes).
+         * After the call, `b` is in read mode (position=0, limit=total data)
+         * ready for [[SSLEngine.wrap]] or [[SSLEngine.unwrap]].
+         *
+         * Sequence: compact (shift leftovers to front) → copy → flip.
+         */
+        def chopInto(b: ByteBuffer): Unit = {
+          b.compact() // position = #leftover bytes; limit = capacity
+          val copied = buffer.copyToBuffer(b)
+          buffer = buffer.drop(copied)
+          b.flip() // position=0; limit=leftover+copied → ready for engine
+        }
+
+        /**
+         * Return any unconsumed bytes (position..limit) to the front of the 
block
+         * and reset `b` to the empty "leftover" state.
+         *
+         * Must be called after engine.wrap()/unwrap() to handle partial 
consumption.
+         * The leftover bytes MUST be prepended (not appended) so future 
chopInto()
+         * presents them to the engine before any newly-arrived data.
+         */
+        def putBack(b: ByteBuffer): Unit = {
+          if (b.hasRemaining) {
+            val leftover = ByteString(b)
+            if (leftover.nonEmpty) buffer = leftover ++ buffer
+          }
+          prepare(b)
+        }
+
+        /**
+         * Reset `b` to the canonical empty "leftover" state:
+         * position=0, limit=0. chopInto() will compact() first, so this
+         * is equivalent to "no leftover bytes present".
+         */
+        def prepare(b: ByteBuffer): Unit = {
+          b.clear()
+          b.limit(0)
+        }
+      }
+
+      // ──────────────────────────────────────────────────────────────────────
+      // ByteBuffers
+      //
+      // transportOutBuffer / userOutBuffer start in *write mode* (pos=0, 
lim=cap).
+      // userInBuffer / transportInBuffer start in *empty-leftover mode* 
(pos=0, lim=0)
+      // via ChoppingBlock.prepare(), because chopInto() calls compact() first.
+      // ──────────────────────────────────────────────────────────────────────
+
+      private val transportOutBuffer: ByteBuffer = 
ByteBuffer.allocate(MaxTransportOutBytes)
+      private val userOutBuffer: ByteBuffer = 
ByteBuffer.allocate(MaxUserOutBytes)
+      private val transportInBuffer: ByteBuffer = 
ByteBuffer.allocate(MaxTransportInBytes)
+      private val userInBuffer: ByteBuffer = 
ByteBuffer.allocate(MaxUserInBytes)
+
+      private val userInChoppingBlock = new ChoppingBlock()
+      private val transportInChoppingBlock = new ChoppingBlock()
+
+      // Initialise the input-side buffers to "empty leftover" mode.
+      // Without this, chopInto()'s compact() would treat the initial position
+      // as valid leftover bytes and corrupt the first TLS frame.
+      userInChoppingBlock.prepare(userInBuffer)
+      transportInChoppingBlock.prepare(transportInBuffer)
+
+      // ──────────────────────────────────────────────────────────────────────
+      // SSLEngine state
+      // ──────────────────────────────────────────────────────────────────────
+
+      // Engine construction can fail (e.g. bad keystore). Store for 
preStart().
+      private var engineInitException: Option[Throwable] = None
+
+      private val engine: SSLEngine =
+        try createSSLEngine()
+        catch {
+          case NonFatal(ex) =>
+            engineInitException = Some(ex)
+            null
+        }
+
+      private var lastHandshakeStatus: HandshakeStatus = _
+      private var currentSession: SSLSession = _
+
+      /**
+       * When true, decrypted application data is held back from plainOut until
+       * verifySession() succeeds after handshake completion. This prevents the
+       * application from observing data before the TLS session is 
authenticated.
+       */
+      private var corkUser: Boolean = true
+
+      /**
+       * Guards against JDK bug: engine.unwrap() returns OK+NEED_WRAP but 
produces
+       * no bytes, causing an infinite putBack loop.
+       * See: https://github.com/apache/pekko/issues/442
+       * Reset to 0 on each successful flushToUser().
+       */
+      private var unwrapPutBackCounter: Int = 0
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Pump state
+      // ──────────────────────────────────────────────────────────────────────
+
+      private var currentPhase: TlsPhase = Bidirectional
+
+      // Re-entrancy guard: if pump() is called recursively (e.g. from a 
handler
+      // invoked by push()), set pumpAgain so the outer call runs another 
iteration.
+      private var pumping: Boolean = false
+      private var pumpAgain: Boolean = false
+
+      // Set to true once the stage is fully shut down.
+      private var stopped: Boolean = false
+
+      /**
+       * Batches multiple doWrap() results within a single pump() invocation.
+       * Emitted in one push() call at the end of the pump loop, amortising
+       * per-push scheduling overhead for small-message workloads.
+       */
+      private var pendingOutboundBytes: ByteString = ByteString.empty
+
+      /**
+       * A [[NegotiateNewSession]] command received on plainIn but not yet 
applied.
+       * Applied at the start of the next pump cycle to keep engine state 
consistent.
+       */
+      private var pendingNewSession: Option[NegotiateNewSession] = None
+
+      // Port termination flags (set in handler callbacks)
+      private var plainInDone: Boolean = false
+      private var cipherInDone: Boolean = false
+      private var plainOutTerminated: Boolean = false
+      private var cipherOutTerminated: Boolean = false
+
+      /**
+       * Warmup flag: set to true once InitialPumpTimer (M1) has fired and the
+       * inlet pulls have been issued.  Guards against cipherOut.onPull 
triggering
+       * pump() before the inlets are ready.
+       *
+       * ⚠️ CRITICAL TIMING — TWO-TIMER WARMUP — DO NOT REVERT WITHOUT READING:
+       *
+       * We deliberately use TWO timer ticks (two separate actor-message 
activations,
+       * M1 = InitialPumpTimer and M2 = SecondPumpTimer) instead of issuing 
pulls
+       * and calling pump() in a single handler.  Reasons:
+       *
+       * (1) Pulls from preStart() are QUEUED events, not synchronous.  We 
therefore
+       *     defer ALL inlet pulls to the first timer tick (M1).
+       *
+       * (2) Inside the timer callback (element-handler context), the fused
+       *     GraphInterpreter can only CHASE one pull at a time (the chasedPull
+       *     variable).  pull(plainIn) occupies the chase slot; pull(cipherIn) 
is
+       *     therefore placed in the event QUEUE, not chased inline.
+       *
+       * (3) Even chased pulls do NOT propagate upstream failures 
synchronously.
+       *     When Source.failed.onPull() calls failStage(), the interpreter 
calls
+       *     fail(conn) which ENQUEUES the failure event.  The 
onUpstreamFailure
+       *     callback only fires when the interpreter later dequeues that 
connection.
+       *
+       * (4) All enqueued events from M1 (pull outcomes: failures, 
completions, data)
+       *     are fully drained within M1's single actor-message activation.  
The
+       *     SecondPumpTimer (M2) is scheduled from inside M1 and fires in a 
SEPARATE
+       *     actor message that starts only after M1 finishes.
+       *
+       * Conclusion: pump() is called only in SecondPumpTimer (M2).  By then,
+       * if cipherIn failed early, stopped=true and pump() is a no-op — no
+       * ClientHello is emitted.  For live streams nothing has failed and 
pump()
+       * starts the TLS handshake normally.
+       */
+      private var warmupDone: Boolean = false
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Lifecycle
+      // ──────────────────────────────────────────────────────────────────────
+
+      override def preStart(): Unit =
+        engineInitException match {
+          case Some(ex) =>
+            // Fail fast if the SSLEngine could not be created.
+            failStage(ex)
+          case None =>
+            engine.beginHandshake()
+            lastHandshakeStatus = engine.getHandshakeStatus
+            currentSession = engine.getSession
+
+            // Do NOT pull any inlet here — pulls from preStart() are QUEUED 
events
+            // in the fused interpreter, not synchronous.  Both pulls are 
deferred
+            // to InitialPumpTimer (M1).  pump() itself is deferred to 
SecondPumpTimer
+            // (M2) so that any upstream-failure side-effects from M1 are 
visible.
+            scheduleOnce(InitialPumpTimer, Duration.Zero)
+        }
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Timer handler
+      // ──────────────────────────────────────────────────────────────────────
+
+      override def onTimer(timerKey: Any): Unit = timerKey match {
+        case InitialPumpTimer =>
+          // M1: issue inlet pulls so upstream sources register demand.
+          // Any immediate Source.failed failure is ENQUEUED as a connection 
event
+          // and will be fully processed within M1's actor-message activation.
+          // Do NOT call pump() here — it must wait for M2.
+          warmupDone = true
+          if (!plainInDone && !isClosed(plainIn)) pull(plainIn)
+          if (!cipherInDone && !isClosed(cipherIn)) pull(cipherIn)
+          // Schedule M2: pump() will run in the next actor-message activation,
+          // after all M1 queued events (incl. any cipherIn failure) are 
drained.
+          scheduleOnce(SecondPumpTimer, Duration.Zero)
+
+        case SecondPumpTimer =>
+          // M2: M1 is fully drained.  If cipherIn failed early, stopped=true 
and
+          // pump() is a safe no-op.  Otherwise kick off the TLS handshake.
+          pump()
+
+        case _ => // ignore unknown timer keys
+      }
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Port handlers
+      // ──────────────────────────────────────────────────────────────────────
+
+      setHandler(
+        plainIn,
+        new InHandler {
+          override def onPush(): Unit = {
+            grab(plainIn) match {
+              case SendBytes(bs) =>
+                userInChoppingBlock.load(bs)
+              case n: NegotiateNewSession =>
+                // Buffer the renegotiation request; apply at next pump cycle 
start
+                // to avoid mutating engine state mid-wrap/unwrap.
+                pendingNewSession = Some(n)
+              case _ => // forward-compatibility: ignore unknown subtypes
+            }
+            if (warmupDone) pump()
+          }
+
+          override def onUpstreamFinish(): Unit = {
+            plainInDone = true
+            if (warmupDone) pump()
+          }
+
+          override def onUpstreamFailure(ex: Throwable): Unit = failTls(ex)
+        })
+
+      setHandler(
+        plainOut,
+        new OutHandler {
+          override def onPull(): Unit = if (warmupDone) pump()
+
+          override def onDownstreamFinish(cause: Throwable): Unit = {
+            plainOutTerminated = true
+            if (warmupDone) pump()
+          }
+        })
+
+      setHandler(
+        cipherIn,
+        new InHandler {
+          override def onPush(): Unit = {
+            transportInChoppingBlock.load(grab(cipherIn))
+            if (warmupDone) pump()
+          }
+
+          override def onUpstreamFinish(): Unit = {
+            cipherInDone = true
+            if (warmupDone) pump()
+          }
+
+          override def onUpstreamFailure(ex: Throwable): Unit = failTls(ex)
+        })
+
+      setHandler(
+        cipherOut,
+        new OutHandler {
+          override def onPull(): Unit = if (warmupDone) pump()
+
+          override def onDownstreamFinish(cause: Throwable): Unit = {
+            cipherOutTerminated = true
+            if (warmupDone) pump()
+          }
+        })
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Central pump loop
+      // ──────────────────────────────────────────────────────────────────────
+
+      /**
+       * The central dispatch loop. All port events eventually invoke pump().
+       *
+       * Re-entrancy is handled via the pumping/pumpAgain flag pair:
+       * if pump() is entered while already running (e.g. from a push() 
callback),
+       * pumpAgain is set so the outer call runs an additional iteration rather
+       * than allowing two concurrent traversals of the loop.
+       */
+      private def pump(): Unit = {
+        if (stopped) return
+        if (pumping) { pumpAgain = true; return }
+        pumping = true
+        try {
+          do {
+            pumpAgain = false
+            loadPending()
+            tryPull()
+            if (currentPhase != Done) step()
+          } while (pumpAgain && !stopped && currentPhase != Done)
+        } catch {
+          case NonFatal(ex) => failTls(ex)
+        } finally {
+          pumping = false
+        }
+
+        // Batch-flush all accumulated outbound bytes in a single push().
+        // Done outside the loop so multiple doWrap() results are coalesced.
+        flushPendingOutbound()
+
+        // Signal demand for inbound cipher bytes AFTER flushing outbound.
+        //
+        // ORDERING INVARIANT (critical for EagerClose correctness):
+        // When we push outbound bytes (above) and then signal demand for 
inbound
+        // bytes (below), the peer sees: push(echo), then pull(demand).  This
+        // means echo bytes arrive at client BEFORE the demand pull, so client
+        // processes the echo before deciding to close the outbound.
+        //
+        // Without this ordering, client would receive the demand pull first, 
close
+        // the outbound (sending close_notify), and then receive the echo bytes
+        // only to discard them in AwaitingClose — causing Sink.head to time 
out.
+        tryPullCipherIn()
+
+        if (currentPhase == Done && !stopped) {
+          stopped = true
+          if (!isClosed(cipherOut)) complete(cipherOut)
+          if (!isClosed(plainOut)) complete(plainOut)
+          completeStage()
+        }
+      }
+
+      /**
+       * Apply any pending NegotiateNewSession command at the start of a pump 
cycle.
+       * Must run before any wrap/unwrap so the engine parameters are stable.
+       */
+      private def loadPending(): Unit =
+        pendingNewSession.foreach { params =>
+          pendingNewSession = None
+          setNewSessionParameters(params)
+        }
+
+      /**
+       * Issue demand (pull) for plainIn only.
+       *
+       * cipherIn demand is intentionally NOT issued here — it is deferred to
+       * [[tryPullCipherIn]], which is called AFTER [[flushPendingOutbound]] in
+       * [[pump]].  This preserves the ordering invariant described in pump().
+       */
+      private def tryPull(): Unit = {
+        if (!plainInDone && !isClosed(plainIn) && !hasBeenPulled(plainIn) &&
+          userInChoppingBlock.isEmpty && pendingNewSession.isEmpty)
+          pull(plainIn)
+      }
+
+      /**
+       * Issue demand for cipherIn (inbound cipher bytes from the transport).
+       *
+       * Called AFTER [[flushPendingOutbound]] in [[pump]] to ensure that
+       * outbound bytes (echo, handshake, close_notify) reach the peer BEFORE
+       * the demand-pull signal does.  See the ordering invariant in pump().
+       */
+      private def tryPullCipherIn(): Unit = {
+        if (!cipherInDone && !isClosed(cipherIn) && !hasBeenPulled(cipherIn) &&
+          transportInChoppingBlock.isEmpty)
+          pull(cipherIn)
+      }
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Phase dispatch
+      // ──────────────────────────────────────────────────────────────────────
+
+      private def step(): Unit = currentPhase match {
+        case Bidirectional    => stepBidirectional()
+        case FlushingOutbound => stepFlushingOutbound()
+        case AwaitingClose    => stepAwaitingClose()
+        case OutboundClosed   => stepOutboundClosed()
+        case InboundClosed    => stepInboundClosed()
+        case Done             => // nothing to do
+      }
+
+      // ── Readiness predicates ──────────────────────────────────────────────
+      // These mirror the TransferState.isReady predicates from TLSActor but
+      // expressed as plain boolean methods rather than trait instances.
+
+      /**
+       * True when there is user data available to wrap AND the engine is not
+       * waiting for an unwrap first AND we are not mid-handshake (corkUser).
+       */
+      private def userHasData: Boolean =
+        !corkUser && userInChoppingBlock.nonEmpty && lastHandshakeStatus != 
NEED_UNWRAP
+
+      /** True when the handshake requires us to send a message. */
+      private def engineNeedsWrap: Boolean =
+        lastHandshakeStatus == NEED_WRAP
+
+      /**
+       * True when the outbound pipeline can make progress:
+       * data or handshake work exists, cipherOut is available, and not 
terminated.
+       */
+      private def outboundReady: Boolean =
+        (userHasData || engineNeedsWrap) && !cipherOutTerminated && 
isAvailable(cipherOut)
+
+      /**
+       * True when the inbound pipeline can make progress:
+       * transport bytes are available (or transport is exhausted), and we can
+       * deliver decrypted output (plainOut available or already cancelled).
+       */
+      private def inboundReady: Boolean =
+        (transportInChoppingBlock.nonEmpty || cipherInDone) &&
+          (isAvailable(plainOut) || plainOutTerminated)
+
+      /**
+       * Outbound readiness check for half-closed state (FlushingOutbound / 
post-close).
+       * No user data required; only engine-driven NEED_WRAP is checked.
+       */
+      private def outboundHalfClosedReady: Boolean =
+        engineNeedsWrap && !cipherOutTerminated && isAvailable(cipherOut)
+
+      /**
+       * True when it is safe to call engine.closeOutbound().
+       * Calling closeOutbound() during a handshake can cause the handshake to
+       * terminate abnormally and silently suppress the real error (JDK 8+).
+       */
+      private def mayCloseOutbound: Boolean =
+        lastHandshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
+          lastHandshakeStatus == HandshakeStatus.FINISHED
+
+      // ── Phase step implementations ────────────────────────────────────────
+
+      private def stepBidirectional(): Unit = {
+        if (inboundReady) {
+          val continue = doInbound(isOutboundClosed = false, 
checkDownstreamCancel = true)
+          if (continue && outboundReady) doOutbound(isInboundClosed = false)
+        } else if (plainOutTerminated) {
+          // Downstream cancelled plainOut (e.g. Sink.head received enough 
data) but
+          // no inbound bytes are available to trigger doInbound's cancel 
check.
+          // React immediately to avoid a livelock.
+          //
+          // This branch fires when IgnoreComplete held us in Bidirectional 
after the
+          // outbound Source exhausted, and THEN the downstream cancelled.
+          if (closing.ignoreCancel) {
+            transitTo(InboundClosed)
+          } else {
+            if (mayCloseOutbound) {
+              engine.closeOutbound()
+              lastHandshakeStatus = engine.getHandshakeStatus
+            }
+            transitTo(FlushingOutbound)
+          }
+        } else if (outboundReady) {
+          doOutbound(isInboundClosed = false)
+        }
+        // No else: nothing ready this tick, wait for the next event.
+      }
+
+      private def stepFlushingOutbound(): Unit = {
+        if (outboundHalfClosedReady) {
+          try doWrap()
+          catch { case _: SSLException => transitTo(Done) }
+        }
+      }
+
+      private def stepAwaitingClose(): Unit = {
+        if (cipherInDone && transportInChoppingBlock.isEmpty) {
+          // Transport depleted: signal inbound closure to the engine.
+          try engine.closeInbound()
+          catch { case _: SSLException => /* expected when close_notify was 
already received */ }
+          lastHandshakeStatus = engine.getHandshakeStatus
+          transitTo(Done)
+        } else if (transportInChoppingBlock.nonEmpty) {
+          transportInChoppingBlock.chopInto(transportInBuffer)
+          try doUnwrap(ignoreOutput = true)
+          catch { case _: SSLException => transitTo(Done) }
+        }
+        // If neither condition met, wait for more cipher bytes (pulled by 
tryPull).
+      }
+
+      private def stepOutboundClosed(): Unit = {
+        if (inboundReady) {
+          val continue = doInbound(isOutboundClosed = true, 
checkDownstreamCancel = true)
+          if (continue && outboundHalfClosedReady) {
+            try doWrap()
+            catch { case _: SSLException => transitTo(Done) }
+          }
+        } else if (outboundHalfClosedReady) {
+          try doWrap()
+          catch { case _: SSLException => transitTo(Done) }
+        }
+      }
+
+      private def stepInboundClosed(): Unit = {
+        // In InboundClosed phase the peer has closed; we still service 
outbound.
+        // We intentionally do NOT check plainOutTerminated for inbound 
cancellation
+        // here (equivalent to TLSActor's inboundHalfClosed state which skips 
the
+        // userOutCancelled check).
+        val inboundHalfReady = (transportInChoppingBlock.nonEmpty || 
cipherInDone)
+        if (inboundHalfReady) {
+          val continue = doInbound(isOutboundClosed = false, 
checkDownstreamCancel = false)
+          if (continue && outboundReady) doOutbound(isInboundClosed = true)
+        } else if (outboundReady) {
+          doOutbound(isInboundClosed = true)
+        } else if (plainInDone && userInChoppingBlock.isEmpty && 
mayCloseOutbound) {
+          // Both sides exhausted: inbound was cancelled (InboundClosed phase) 
AND
+          // the outbound Source has finished with no buffered user data 
remaining.
+          // Close the outbound so close_notify can be flushed and the stage 
can
+          // complete cleanly.
+          //
+          // Without this, the stage loops waiting for an outboundReady event 
that
+          // will never arrive (IgnoreCancel + IgnoreComplete = IgnoreBoth 
scenario).
+          engine.closeOutbound()
+          lastHandshakeStatus = engine.getHandshakeStatus
+          transitTo(OutboundClosed)
+        }
+      }
+
+      // ──────────────────────────────────────────────────────────────────────
+      // High-level inbound / outbound handlers
+      // ──────────────────────────────────────────────────────────────────────
+
+      /**
+       * Drive one inbound step (transport bytes → decrypted user output).
+       *
+       * @param isOutboundClosed true when the outbound direction is already 
closed
+       *                         (affects how we handle downstream 
cancellation).
+       * @param checkDownstreamCancel whether to react to plainOut 
cancellation.
+       *                              False in InboundClosed phase where 
outbound is
+       *                              still active and user-data cancel must 
be ignored.
+       * @return true if processing should continue within this pump cycle.
+       */
+      private def doInbound(isOutboundClosed: Boolean, checkDownstreamCancel: 
Boolean): Boolean = {
+        if (cipherInDone && transportInChoppingBlock.isEmpty) {
+          // Transport is completely drained: tell the engine there is no more
+          // inbound data.  An SSLException here means the peer closed without
+          // sending TLS close_notify (session truncated).
+          try engine.closeInbound()
+          catch {
+            case _: SSLException =>
+              if (!plainOutTerminated && isAvailable(plainOut))
+                push(plainOut, SessionTruncated)
+          }
+          lastHandshakeStatus = engine.getHandshakeStatus
+          completeOrFlush()
+          false
+        } else if (checkDownstreamCancel && plainOutTerminated) {
+          // Downstream cancelled plainOut; choose how to proceed based on 
closing mode.
+          if (!isOutboundClosed && closing.ignoreCancel) {
+            // User requested ignoreCancel: switch to InboundClosed and keep 
sending.
+            transitTo(InboundClosed)
+          } else {
+            // Close the outbound direction, triggering close_notify.
+            // Only call closeOutbound() when the engine is not mid-handshake
+            // (see mayCloseOutbound comment).
+            engine.closeOutbound()
+            lastHandshakeStatus = engine.getHandshakeStatus
+            transitTo(FlushingOutbound)
+          }
+          true
+        } else if (transportInChoppingBlock.nonEmpty) {
+          transportInChoppingBlock.chopInto(transportInBuffer)
+          try {
+            doUnwrap(ignoreOutput = false)
+            true
+          } catch {
+            case ex: SSLException =>
+              failTls(ex)
+              // Attempt a best-effort inbound close; errors here are expected.
+              try engine.closeInbound()
+              catch { case _: SSLException => }
+              completeOrFlush()
+              false
+          }
+        } else {
+          // No transport data and not depleted — wait for more.
+          true
+        }
+      }
+
+      /**
+       * Drive one outbound step (user plaintext → encrypted transport bytes).
+       *
+       * @param isInboundClosed true when the inbound direction is already 
closed
+       *                        (affects how we interpret plainIn completion).
+       */
+      private def doOutbound(isInboundClosed: Boolean): Unit = {
+        if (plainInDone && userInChoppingBlock.isEmpty && mayCloseOutbound) {
+          if (!isInboundClosed && closing.ignoreComplete) {
+            // IgnoreComplete: source exhausted but the user asked us NOT to 
close
+            // the TLS outbound yet.  Stay in the current phase; 
stepBidirectional
+            // will react when plainOut is cancelled (plainOutTerminated=true).
+            //
+            // IMPORTANT: do NOT call transitTo(OutboundClosed) here.  If we 
did,
+            // the stage would enter OutboundClosed but the engine's outbound 
is
+            // still open — any subsequent outboundHalfClosedReady check would 
hang
+            // because NEED_WRAP was never triggered by closeOutbound().
+          } else {
+            engine.closeOutbound()
+            lastHandshakeStatus = engine.getHandshakeStatus
+            transitTo(OutboundClosed)
+          }
+        } else if (cipherOutTerminated) {
+          // Transport output cancelled: no point continuing.
+          transitTo(Done)
+        } else if (outboundReady) {
+          if (userHasData) userInChoppingBlock.chopInto(userInBuffer)
+          try doWrap()
+          catch {
+            case ex: SSLException =>
+              failTls(ex)
+              completeOrFlush()
+          }
+        }
+      }
+
+      // ──────────────────────────────────────────────────────────────────────
+      // SSLEngine core operations
+      // ──────────────────────────────────────────────────────────────────────
+
+      /**
+       * Wrap one chunk of user plaintext (userInBuffer) into ciphertext
+       * (transportOutBuffer), then add the result to pendingOutboundBytes.
+       *
+       * Pre-conditions (guaranteed by caller):
+       *  - userInBuffer is in read mode (position=0, limit=dataLen) after 
chopInto.
+       *  - transportOutBuffer is in write mode (accumulating previous 
results).
+       *  - isAvailable(cipherOut) is true (checked via outboundReady).
+       */
+      private def doWrap(): Unit = {
+        val result = engine.wrap(userInBuffer, transportOutBuffer)
+        lastHandshakeStatus = result.getHandshakeStatus
+        if (lastHandshakeStatus == FINISHED) handshakeFinished()
+        runDelegatedTasks()
+
+        result.getStatus match {
+          case OK =>
+            // Guard against JDK bug (see 
https://github.com/apache/pekko/issues/29922):
+            // engine.wrap() returns OK + NEED_WRAP but writes zero bytes — 
infinite loop.
+            // Note: we check transportOutBuffer.position() == 0 because 
flushToTransport()
+            // was not yet called; position reflects bytes written in this 
call only.
+            if (transportOutBuffer.position() == 0 && lastHandshakeStatus == 
NEED_WRAP)
+              throw new IllegalStateException(
+                "SSLEngine trying to loop NEED_WRAP without producing output")
+
+            flushToTransport()
+            userInChoppingBlock.putBack(userInBuffer)
+            // Another wrap iteration may be needed (e.g. more data or 
handshake steps).
+            pumpAgain = true
+
+          case CLOSED =>
+            flushToTransport()
+            if (engine.isInboundDone) transitTo(Done)
+            else transitTo(AwaitingClose)
+
+          case s =>
+            throw new IllegalStateException(s"unexpected status $s in 
doWrap()")
+        }
+      }
+
+      /**
+       * Unwrap one chunk of ciphertext (transportInBuffer) into plaintext
+       * (userOutBuffer), then deliver plaintext to plainOut.
+       *
+       * May recurse when the engine reports more data available in the buffer.
+       *
+       * @param ignoreOutput when true (e.g. AwaitingClose phase), produced 
plaintext
+       *                     is discarded — we are only draining for the close 
handshake.
+       */
+      @tailrec
+      private def doUnwrap(ignoreOutput: Boolean): Unit = {
+        val oldInPosition = transportInBuffer.position()
+        val result = engine.unwrap(transportInBuffer, userOutBuffer)
+        if (ignoreOutput) userOutBuffer.clear()
+        lastHandshakeStatus = result.getHandshakeStatus
+        runDelegatedTasks()
+
+        result.getStatus match {
+          case OK =>
+            result.getHandshakeStatus match {
+              case NEED_WRAP =>
+                // Engine received data but must send a handshake reply before
+                // it can produce more output.  Put remaining input back so the
+                // next pump cycle (after wrap) can resume.
+                //
+                // Guard counter: if the engine keeps returning NEED_WRAP 
without
+                // consuming bytes, bail out to avoid an infinite loop.
+                // See: https://github.com/apache/pekko/issues/442
+                unwrapPutBackCounter += 1
+                if (unwrapPutBackCounter > MaxTLSIterations)
+                  throw new IllegalStateException(
+                    s"Stuck in unwrap loop, bailing out, " +
+                    s"last handshake status [$lastHandshakeStatus], " +
+                    s"remaining=${transportInBuffer.remaining()}, " +
+                    s"out=${userOutBuffer.position()} " +
+                    "(see https://github.com/apache/pekko/issues/442)")
+                transportInChoppingBlock.putBack(transportInBuffer)
+
+              case FINISHED =>
+                flushToUser()
+                handshakeFinished()
+                transportInChoppingBlock.putBack(transportInBuffer)
+
+              case NEED_UNWRAP
+                  if transportInBuffer.hasRemaining &&
+                    userOutBuffer.position() == 0 &&
+                    transportInBuffer.position() == oldInPosition =>
+                // Guard against JDK infinite-loop bug: NEED_UNWRAP was 
returned but
+                // the engine consumed no bytes and produced no output — stuck.
+                throw new IllegalStateException(
+                  "SSLEngine trying to loop NEED_UNWRAP without producing 
output")
+
+              case _ =>
+                // Continue unwrapping if there are remaining bytes in the 
buffer.
+                if (transportInBuffer.hasRemaining) doUnwrap(ignoreOutput = 
false)
+                else flushToUser()
+            }
+
+          case CLOSED =>
+            flushToUser()
+            completeOrFlush()
+
+          case BUFFER_UNDERFLOW =>
+            // Not enough cipher bytes for a complete TLS record; wait for 
more.
+            flushToUser()
+
+          case BUFFER_OVERFLOW =>
+            // userOutBuffer is full; flush it first, then put remaining input 
back
+            // so the next doUnwrap() call can retry with a fresh buffer.
+            flushToUser()
+            transportInChoppingBlock.putBack(transportInBuffer)
+
+          case null =>
+            throw new IllegalStateException("unexpected status 'null' in 
doUnwrap()")
+        }
+      }
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Buffer flush helpers
+      // ──────────────────────────────────────────────────────────────────────
+
+      /**
+       * Move bytes from transportOutBuffer (write mode) into 
pendingOutboundBytes.
+       * The actual push() to cipherOut is deferred to flushPendingOutbound() 
so
+       * multiple doWrap() results within one pump() pass are batched together.
+       */
+      private def flushToTransport(): Unit = {
+        transportOutBuffer.flip()
+        if (transportOutBuffer.hasRemaining)
+          pendingOutboundBytes = pendingOutboundBytes ++ 
ByteString(transportOutBuffer)
+        transportOutBuffer.clear()
+      }
+
+      /**
+       * Push all accumulated outbound bytes to cipherOut in a single emission.
+       * Called once per pump() invocation after the do-while loop completes.
+       * Batching reduces per-push scheduling overhead for small messages.
+       */
+      private def flushPendingOutbound(): Unit = {
+        if (pendingOutboundBytes.nonEmpty && isAvailable(cipherOut) && 
!cipherOutTerminated) {
+          push(cipherOut, pendingOutboundBytes)
+          pendingOutboundBytes = ByteString.empty
+        }
+      }
+
+      /**
+       * Push decrypted bytes from userOutBuffer to plainOut.
+       * Only called when plainOut is known to be available (guaranteed by the
+       * inboundReady check in stepXxx methods).
+       *
+       * Also resets the JDK unwrap-loop bug guard counter — a successful flush
+       * proves the engine is making forward progress.
+       */
+      private def flushToUser(): Unit = {
+        unwrapPutBackCounter = 0 // reset loop-guard on every successful 
progress
+        userOutBuffer.flip()
+        if (userOutBuffer.hasRemaining && !plainOutTerminated && !corkUser) {
+          push(plainOut, SessionBytes(currentSession, 
ByteString(userOutBuffer)))
+          pumpAgain = true
+        }
+        userOutBuffer.clear() // always restore to write mode
+      }
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Handshake / session management
+      // ──────────────────────────────────────────────────────────────────────
+
+      private def setNewSessionParameters(params: NegotiateNewSession): Unit = 
{
+        currentSession.invalidate()
+        TlsUtils.applySessionParameters(engine, params)
+        engine.beginHandshake()
+        lastHandshakeStatus = engine.getHandshakeStatus
+        corkUser = true
+      }
+
+      private def handshakeFinished(): Unit = {
+        val session = engine.getSession
+        verifySession(session) match {
+          case Success(()) =>
+            currentSession = session
+            corkUser = false
+            // Push any decrypted data that was buffered during the handshake.
+            flushToUser()
+          case Failure(ex) =>
+            failTls(ex)
+        }
+      }
+
+      /**
+       * Run all delegated tasks synchronously.
+       * NEED_TASK status means the engine has CPU-bound work (e.g. certificate
+       * verification) that must complete before wrap/unwrap can continue.
+       * Running tasks inline avoids the threading complexity of async 
delegation.
+       */
+      @tailrec
+      private def runDelegatedTasks(): Unit = {
+        val task = engine.getDelegatedTask
+        if (task ne null) {
+          task.run()
+          runDelegatedTasks()
+        } else {
+          // After all tasks finish, re-read the handshake status — tasks can
+          // advance the engine state (e.g. NEED_TASK → NEED_WRAP).
+          lastHandshakeStatus = engine.getHandshakeStatus
+        }
+      }
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Phase transitions
+      // ──────────────────────────────────────────────────────────────────────
+
+      private def transitTo(phase: TlsPhase): Unit = {
+        currentPhase = phase
+        pumpAgain = true // always re-evaluate after a phase change
+      }
+
+      /**
+       * Decide next phase when engine-close or outbound-depletion is detected:
+       * if outbound is already exhausted, proceed directly to Done;
+       * otherwise flush remaining NEED_WRAP close_notify messages first.
+       */
+      private def completeOrFlush(): Unit =
+        if (engine.isOutboundDone || (engine.isInboundDone && 
userInChoppingBlock.isEmpty))
+          transitTo(Done)
+        else
+          transitTo(FlushingOutbound)
+
+      // ──────────────────────────────────────────────────────────────────────
+      // Error handling
+      // ──────────────────────────────────────────────────────────────────────
+
+      /**
+       * Terminate the TLS stage with an error, propagating it to both output 
ports
+       * and cancelling both input ports.
+       *
+       * Idempotent: subsequent calls after the first are no-ops.
+       */
+      private def failTls(ex: Throwable): Unit = {
+        if (!stopped) {
+          stopped = true
+          if (!isClosed(cipherIn)) cancel(cipherIn)
+          if (!isClosed(plainIn)) cancel(plainIn)
+          if (!isClosed(cipherOut)) fail(cipherOut, ex)
+          if (!isClosed(plainOut)) fail(plainOut, ex)
+          completeStage()
+        }
+      }
+
+    } // end GraphStageLogic
+
+} // end TlsGraphStage
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala
index 88c2585d3d..a615f37b77 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala
@@ -22,6 +22,7 @@ import pekko.NotUsed
 import pekko.stream._
 import pekko.stream.TLSProtocol._
 import pekko.stream.impl.io.TlsModule
+import pekko.stream.impl.io.TlsGraphStage
 import pekko.util.ByteString
 
 /**
@@ -76,8 +77,16 @@ object TLS {
       createSSLEngine: () => SSLEngine,
       verifySession: SSLSession => Try[Unit],
       closing: TLSClosing): scaladsl.BidiFlow[SslTlsOutbound, ByteString, 
ByteString, SslTlsInbound, NotUsed] =
-    scaladsl.BidiFlow.fromGraph(
-      TlsModule(Attributes.none, () => createSSLEngine(), session => 
verifySession(session), closing))
+    if (TlsGraphStage.useLegacyActor)
+      scaladsl.BidiFlow.fromGraph(
+        TlsModule(Attributes.none, () => createSSLEngine(), session => 
verifySession(session), closing))
+    else
+      // Force an async island boundary. Counter-intuitive for a GraphStage, 
but required here:
+      // SSLEngine is not thread-safe and the async boundary enables 
pipelining between
+      // encrypt/decrypt and I/O, improving throughput for small-message 
workloads.
+      scaladsl.BidiFlow
+        .fromGraph(new TlsGraphStage(createSSLEngine, verifySession, closing))
+        .addAttributes(Attributes.asyncBoundary)
 
   /**
    * Create a StreamTls [[pekko.stream.scaladsl.BidiFlow]].


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to