This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch pr-2878-clean
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 666b0fcdd3366515027b90d9fbea1992ff7610fb
Author: He-Pin <[email protected]>
AuthorDate: Sun Apr 26 00:33:28 2026 +0800

    test(stream): run TlsSpec against both TLS implementations + add GraphStage edge cases
    
    Motivation:
    After wiring the TlsGraphStage routing layer in cc73eced66, the existing
    TlsSpec suite only exercised whichever implementation `TLS.UseLegacyActor`
    selected at JVM startup. Without a way to drive both paths in the same
    JVM run, regression coverage was halved. We also lacked focused tests for
    GraphStage-only properties (async-island isolation, BUFFER_UNDERFLOW
    recovery on fragmented input, plainOut backpressure, per-materialization
    engine isolation) that the legacy actor path either does not have or
    trivially passes by virtue of its different substrate.
    
    Modification:
    - TlsSpec.scala: extract the suite body into AbstractTlsSpec parameterized
      by `useLegacyActor`. Replace direct `TLS(...)` calls with a `tlsBidi`
      helper that bypasses the global `TLS.UseLegacyActor` switch and instead
      instantiates either `TlsModule` (legacy) or `TlsGraphStage` (new) based
      on the subclass parameter. `TlsSpec` exercises the legacy actor path;
      the new `TlsGraphStageSpec` exercises the GraphStage path with the
      identical body. Suite name suffix differentiates output.
    - New TlsGraphStageEdgeCasesSpec.scala: five focused cases for properties
      unique to the GraphStage path: async-island wrapping, BUFFER_UNDERFLOW
      recovery from one-byte cipher fragments, plainOut backpressure under
      10x16KiB blocks, per-materialization engine isolation across three
      consecutive runs, and EagerClose-with-empty-source clean completion.
      Apart from the EagerClose case, tests follow the same termination model
      as TlsSpec (both sides use `IgnoreComplete`, sink drives termination via
      cancellation after a byte-count threshold).
    
    Result:
    - Both TLS implementations now run the full 111-case suite in the same
      test run.
    - Five additional edge-case tests guard against GraphStage-specific
      regressions; all pass in 1.3s on JDK 21.
    - Test infrastructure no longer depends on JVM-global config to choose
      which path to exercise.
    
    References:
    - PR #2878
---
 .../stream/io/TlsGraphStageEdgeCasesSpec.scala     | 168 +++++++++++++++++++++
 .../scala/org/apache/pekko/stream/io/TlsSpec.scala |  39 ++++-
 2 files changed, 199 insertions(+), 8 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala
new file mode 100644
index 0000000000..5951895587
--- /dev/null
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.stream.io
+
+import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession }
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.{ Success, Try }
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.stream._
+import pekko.stream.TLSProtocol._
+import pekko.stream.impl.io.TlsGraphStage
+import pekko.stream.scaladsl._
+import pekko.stream.testkit.StreamSpec
+import pekko.testkit.TestDuration
+import pekko.util.ByteString
+
+/**
+ * Edge cases that specifically exercise the [[TlsGraphStage]] direct-push engine.
+ * These complement [[TlsGraphStageSpec]] (which mirrors the full legacy suite) by
+ * targeting properties that only the GraphStage path has: async-island isolation,
+ * BUFFER_UNDERFLOW recovery on fragmented cipher input, plainOut backpressure handling,
+ * and per-materialization engine isolation.
+ *
+ * Except for the empty-source case (which uses `EagerClose`), tests follow the
+ * same termination model as [[TlsSpec]]: both sides use `IgnoreComplete`, and the
+ * sink drives termination by cancelling once it has received the expected number
+ * of bytes (the resulting cancel cascade closes the TLS sessions cleanly).
+ */
+class TlsGraphStageEdgeCasesSpec extends StreamSpec(TlsSpec.configOverrides) {
+
+  private val sslContext: SSLContext = TlsSpec.initSslContext("TLSv1.2")
+  private val ciphers = TlsSpec.TLS12Ciphers.toArray
+
+  /**
+   * Creates a fresh `SSLEngine` from the shared TLSv1.2 context.
+   *
+   * @param role `Client` puts the engine in client mode; any other role leaves it in server mode
+   * @return an engine restricted to the spec's cipher suites and to the TLSv1.2 protocol
+   */
+  private def engine(role: TLSRole): SSLEngine = {
+    val sslEngine = sslContext.createSSLEngine()
+    sslEngine.setUseClientMode(role == Client)
+    sslEngine.setEnabledCipherSuites(ciphers)
+    sslEngine.setEnabledProtocols(Array("TLSv1.2"))
+    sslEngine
+  }
+
+  /**
+   * Builds a TLS `BidiFlow` backed directly by [[TlsGraphStage]] and marks it with
+   * `Attributes.asyncBoundary`.
+   *
+   * @param role          selects client or server mode for the engine created per call of the factory
+   * @param closing       TLS closing behaviour, `IgnoreComplete` unless overridden
+   * @param verifySession session check handed to the stage; accepts every session by default
+   */
+  private def graphStageBidi(
+      role: TLSRole,
+      closing: TLSClosing = IgnoreComplete,
+      verifySession: SSLSession => Try[Unit] = _ => Success(()))
+      : BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = {
+    val stage = new TlsGraphStage(() => engine(role), verifySession, closing)
+    BidiFlow.fromGraph(stage).withAttributes(Attributes.asyncBoundary)
+  }
+
+  /**
+   * Runs `stream`, accumulating the plain bytes of every `SessionBytes` element, and
+   * returns the first accumulated buffer holding at least `expectedBytes` bytes.
+   * Taking only that first buffer cancels the stream; the cancel propagates through
+   * both TLS sessions and lets the stream shut down cleanly.
+   *
+   * @param stream        source of decrypted TLS events to run
+   * @param expectedBytes number of plain bytes to wait for
+   * @param timeout       upper bound for the wait (dilated by the test-kit time factor)
+   * @return the accumulated bytes, or an empty `ByteString` if the stream completed first
+   */
+  private def collectExactly(
+      stream: Source[SslTlsInbound, NotUsed],
+      expectedBytes: Int,
+      timeout: FiniteDuration = 30.seconds): ByteString = {
+    val firstFullBuffer = stream
+      .collect { case SessionBytes(_, bytes) => bytes }
+      .scan(ByteString.empty)(_ ++ _)
+      .dropWhile(_.size < expectedBytes)
+      .runWith(Sink.headOption)
+    Await.result(firstFullBuffer, timeout.dilated).getOrElse(ByteString.empty)
+  }
+
+  "TlsGraphStage" must {
+
+    "be wrapped in its own async island via Attributes.asyncBoundary" in {
+      // Both stages carry the async-boundary attribute added by graphStageBidi; a small
+      // payload must still complete the client -> server -> echo -> client roundtrip.
+      val message = ByteString("ping")
+      val echoBack = Flow[SslTlsInbound].collect { case SessionBytes(_, bytes) => SendBytes(bytes) }
+      val roundTrip = graphStageBidi(Client).atop(graphStageBidi(Server).reversed).join(echoBack)
+      val outbound = Source.single[SslTlsOutbound](SendBytes(message)).via(roundTrip)
+      collectExactly(outbound, message.size) shouldBe message
+    }
+
+    "decode cipher payloads delivered as one-byte fragments (BUFFER_UNDERFLOW recovery)" in {
+      // Splitting every cipher chunk into single bytes is the most aggressive way to
+      // force BUFFER_UNDERFLOW: each 5-byte TLS record header and the whole payload
+      // arrive over many separate pushes. The stage has to keep pulling more bytes
+      // after its first underflow instead of deadlocking.
+      val message = ByteString("0123456789" * 200) // 2000 bytes of varied content
+      val oneByteChunks = Flow[ByteString].mapConcat(_.toIndexedSeq.map(b => ByteString(b)))
+      // The same fragmenting flow is applied to both directions of the transport.
+      val fragmentingTransport = BidiFlow.fromFlows(oneByteChunks, oneByteChunks)
+      val echoBack = Flow[SslTlsInbound].collect { case SessionBytes(_, bytes) => SendBytes(bytes) }
+      val roundTrip = graphStageBidi(Client)
+        .atop(fragmentingTransport)
+        .atop(graphStageBidi(Server).reversed)
+        .join(echoBack)
+      val outbound = Source.single[SslTlsOutbound](SendBytes(message)).via(roundTrip)
+      collectExactly(outbound, message.size, timeout = 60.seconds) shouldBe message
+    }
+
+    "deliver all bytes when plainOut downstream is slow (backpressure)" in {
+      // Ten 16 KiB blocks cross several TLS records; throttling the decrypted side keeps
+      // plainOut under sustained backpressure, so the stage must buffer/wait without
+      // losing data.
+      val chunks = Vector.fill(10)(ByteString("a" * 16384))
+      val expected = chunks.foldLeft(ByteString.empty)(_ ++ _)
+      val echoBack = Flow[SslTlsInbound].collect { case SessionBytes(_, bytes) => SendBytes(bytes) }
+      val roundTrip = graphStageBidi(Client).atop(graphStageBidi(Server).reversed).join(echoBack)
+      val slowlyCollected = Source(chunks)
+        .map[SslTlsOutbound](chunk => SendBytes(chunk))
+        .via(roundTrip)
+        .collect { case SessionBytes(_, bytes) => bytes }
+        .throttle(1, 10.millis)
+        .scan(ByteString.empty)(_ ++ _)
+        .dropWhile(_.size < expected.size)
+        .runWith(Sink.headOption)
+      val received = Await.result(slowlyCollected, 60.seconds.dilated).getOrElse(ByteString.empty)
+      received.length shouldBe expected.length
+      received shouldBe expected
+    }
+
+    "give each materialization its own SSLEngine (no shared state)" in {
+      // Build the blueprint once and materialize that same graph three times. Creating
+      // fresh stages for every round would hide an engine cached on the stage instance;
+      // with a shared blueprint, reusing an already closed engine makes later runs fail.
+      val echo = Flow[SslTlsInbound].collect { case SessionBytes(_, b) => SendBytes(b) }
+      val roundTrip = graphStageBidi(Client).atop(graphStageBidi(Server).reversed).join(echo)
+
+      // Each call is a separate materialization of the shared `roundTrip` blueprint.
+      def round(payload: ByteString): ByteString =
+        collectExactly(
+          Source.single[SslTlsOutbound](SendBytes(payload)).via(roundTrip),
+          payload.size)
+
+      round(ByteString("first")) shouldBe ByteString("first")
+      round(ByteString("second")) shouldBe ByteString("second")
+      round(ByteString("third")) shouldBe ByteString("third")
+    }
+
+    "complete cleanly when source is empty (CompletedImmediately variant)" in {
+      // With EagerClose on both sides an empty source completes at once, so the close
+      // cascade starts before any payload travels. Both stages must finish without
+      // errors and no plain bytes may be delivered.
+      val echoBack = Flow[SslTlsInbound].collect { case SessionBytes(_, bytes) => SendBytes(bytes) }
+      val roundTrip = graphStageBidi(Client, EagerClose)
+        .atop(graphStageBidi(Server, EagerClose).reversed)
+        .join(echoBack)
+      val allPlainBytes = Source.empty[SslTlsOutbound]
+        .via(roundTrip)
+        .collect { case SessionBytes(_, bytes) => bytes }
+        .runFold(ByteString.empty)(_ ++ _)
+      Await.result(allPlainBytes, 30.seconds.dilated) shouldBe ByteString.empty
+    }
+  }
+}
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsSpec.scala
index 10f3655f81..580817f140 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsSpec.scala
@@ -23,7 +23,7 @@ import scala.collection.immutable
 import scala.concurrent.Await
 import scala.concurrent.Future
 import scala.concurrent.duration._
-import scala.util.Random
+import scala.util.{ Random, Success }
 
 import org.apache.pekko
 import pekko.NotUsed
@@ -31,6 +31,7 @@ import pekko.pattern.{ after => later }
 import pekko.stream._
 import pekko.stream.TLSProtocol._
 import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
+import pekko.stream.impl.io.{ TlsGraphStage, TlsModule }
 import pekko.stream.scaladsl._
 import pekko.stream.stage._
 import pekko.stream.testkit._
@@ -111,12 +112,34 @@ object TlsSpec {
     """
 }
 
-class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with 
WithLogCapturing {
+class TlsSpec extends AbstractTlsSpec(useLegacyActor = true)
+class TlsGraphStageSpec extends AbstractTlsSpec(useLegacyActor = false)
+
+abstract class AbstractTlsSpec(useLegacyActor: Boolean)
+    extends StreamSpec(TlsSpec.configOverrides)
+    with WithLogCapturing {
   import GraphDSL.Implicits._
   import TlsSpec._
   import system.dispatcher
 
-  "SslTls" must {
+  /**
+   * Creates the TLS BidiFlow for the implementation selected by `useLegacyActor`.
+   * `TlsModule` or `TlsGraphStage` is constructed directly rather than via the global
+   * `TLS.UseLegacyActor` switch, so each subclass can exercise its own path within the
+   * same JVM.
+   */
+  protected def tlsBidi(
+      createSSLEngine: () => SSLEngine,
+      verifySession: SSLSession => scala.util.Try[Unit] = _ => Success(()),
+      closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
+    if (!useLegacyActor) {
+      // GraphStage path: the stage is given an explicit async boundary.
+      val stage = new TlsGraphStage(createSSLEngine, verifySession, closing)
+      BidiFlow.fromGraph(stage).withAttributes(Attributes.asyncBoundary)
+    } else
+      // Legacy path: the module is built with no extra attributes.
+      BidiFlow.fromGraph(
+        TlsModule(Attributes.none, () => createSSLEngine(), verifySession, closing))
+
+  s"SslTls (${if (useLegacyActor) "legacy actor path" else "GraphStage 
path"})" must {
     "work for TLSv1.2" must { workFor("TLSv1.2", TLS12Ciphers) }
 
     "work for TLSv1.3" must { workFor("TLSv1.3", TLS13Ciphers) }
@@ -163,13 +186,13 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) 
with WithLogCapturing
       }
 
       def clientTls(closing: TLSClosing) =
-        TLS(() => createSSLEngine(sslContext, Client), closing)
+        tlsBidi(() => createSSLEngine(sslContext, Client), closing = closing)
 
       def badClientTls(closing: TLSClosing) =
-        TLS(() => createSSLEngine(initWithTrust("/badtruststore", protocol), 
Client), closing)
+        tlsBidi(() => createSSLEngine(initWithTrust("/badtruststore", 
protocol), Client), closing = closing)
 
       def serverTls(closing: TLSClosing) =
-        TLS(() => createSSLEngine(sslContext, Server), closing)
+        tlsBidi(() => createSSLEngine(sslContext, Server), closing = closing)
 
       trait Named {
         def name: String =
@@ -567,9 +590,9 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) 
with WithLogCapturing
             case SessionTruncated   => SendBytes(ByteString.empty)
             case SessionBytes(_, b) => SendBytes(b)
           }
-          val clientTls = TLS(
+          val clientTls = tlsBidi(
             () => createSSLEngine2(sslContext, Client, hostnameVerification = 
true, hostInfo = Some((hostName, 80))),
-            EagerClose)
+            closing = EagerClose)
 
           val flow = clientTls.atop(serverTls(EagerClose).reversed).join(rhs)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to