This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch pr-2878-clean in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 666b0fcdd3366515027b90d9fbea1992ff7610fb Author: He-Pin <[email protected]> AuthorDate: Sun Apr 26 00:33:28 2026 +0800 test(stream): run TlsSpec against both TLS implementations + add GraphStage edge cases Motivation: After wiring the TlsGraphStage routing layer in cc73eced66, the existing TlsSpec suite only exercised whichever implementation `TLS.UseLegacyActor` selected at JVM startup. Without a way to drive both paths in the same JVM run, regression coverage was halved. We also lacked focused tests for GraphStage-only properties (async-island isolation, BUFFER_UNDERFLOW recovery on fragmented input, plainOut backpressure, per-materialization engine isolation) that the legacy actor path either does not have or trivially passes by virtue of its different substrate. Modification: - TlsSpec.scala: extract the suite body into AbstractTlsSpec parameterized by `useLegacyActor`. Replace direct `TLS(...)` calls with a `tlsBidi` helper that bypasses the global `TLS.UseLegacyActor` switch and instead instantiates either `TlsModule` (legacy) or `TlsGraphStage` (new) based on the subclass parameter. `TlsSpec` exercises the legacy actor path; the new `TlsGraphStageSpec` exercises the GraphStage path with the identical body. Suite name suffix differentiates output. - New TlsGraphStageEdgeCasesSpec.scala: five focused cases for properties unique to the GraphStage path: async-island wrapping, BUFFER_UNDERFLOW recovery from one-byte cipher fragments, plainOut backpressure under 10x16KiB blocks, per-materialization engine isolation across three consecutive runs, and EagerClose-with-empty-source clean completion. Tests follow the same termination model as TlsSpec (both sides use `IgnoreComplete`, sink drives termination via cancellation after a byte-count threshold). Result: - Both TLS implementations now run the full 111-case suite in the same test run. - Five additional edge-case tests guard against GraphStage-specific regressions; all pass in 1.3s on JDK 21. 
- Test infrastructure no longer depends on JVM-global config to choose which path to exercise. References: - PR #2878 --- .../stream/io/TlsGraphStageEdgeCasesSpec.scala | 168 +++++++++++++++++++++ .../scala/org/apache/pekko/stream/io/TlsSpec.scala | 39 ++++- 2 files changed, 199 insertions(+), 8 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala new file mode 100644 index 0000000000..5951895587 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.stream.io + +import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession } + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.{ Success, Try } + +import org.apache.pekko +import pekko.NotUsed +import pekko.stream._ +import pekko.stream.TLSProtocol._ +import pekko.stream.impl.io.TlsGraphStage +import pekko.stream.scaladsl._ +import pekko.stream.testkit.StreamSpec +import pekko.testkit.TestDuration +import pekko.util.ByteString + +/** + * Edge cases that specifically exercise the [[TlsGraphStage]] direct-push engine. + * These complement [[TlsGraphStageSpec]] (which mirrors the full legacy suite) by + * targeting properties that only the GraphStage path has: async-island isolation, + * BUFFER_UNDERFLOW recovery on fragmented cipher input, plainOut backpressure handling, + * and per-materialization engine isolation. 
+ * + * Tests follow the same termination model as [[TlsSpec]]: both sides use + * `IgnoreComplete`, and the sink drives termination by cancelling once it has + * received the expected number of bytes (the resulting cancel cascade closes + * the TLS sessions cleanly). + */ +class TlsGraphStageEdgeCasesSpec extends StreamSpec(TlsSpec.configOverrides) { + + private val sslContext: SSLContext = TlsSpec.initSslContext("TLSv1.2") + private val ciphers = TlsSpec.TLS12Ciphers.toArray + + private def engine(role: TLSRole): SSLEngine = { + val e = sslContext.createSSLEngine() + e.setUseClientMode(role == Client) + e.setEnabledCipherSuites(ciphers) + e.setEnabledProtocols(Array("TLSv1.2")) + e + } + + private def graphStageBidi( + role: TLSRole, + closing: TLSClosing = IgnoreComplete, + verifySession: SSLSession => Try[Unit] = _ => Success(())) + : BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = + BidiFlow + .fromGraph(new TlsGraphStage(() => engine(role), verifySession, closing)) + .withAttributes(Attributes.asyncBoundary) + + /** + * Drive a roundtrip and stop once `expectedBytes` plain bytes have been + * collected on the sink side; the resulting cancel propagates through both + * TLS sessions and lets the stream shut down cleanly. 
+ */ + private def collectExactly( + stream: Source[SslTlsInbound, NotUsed], + expectedBytes: Int, + timeout: FiniteDuration = 30.seconds): ByteString = + Await.result( + stream + .collect { case SessionBytes(_, b) => b } + .scan(ByteString.empty)(_ ++ _) + .dropWhile(_.size < expectedBytes) + .runWith(Sink.headOption), + timeout.dilated).getOrElse(ByteString.empty) + + "TlsGraphStage" must { + + "be wrapped in its own async island via Attributes.asyncBoundary" in { + val client = graphStageBidi(Client) + val server = graphStageBidi(Server) + val echo = Flow[SslTlsInbound].collect { case SessionBytes(_, b) => SendBytes(b) } + val payload = ByteString("ping") + val received = collectExactly( + Source.single[SslTlsOutbound](SendBytes(payload)) + .via(client.atop(server.reversed).join(echo)), + payload.size) + received shouldBe payload + } + + "decode cipher payloads delivered as one-byte fragments (BUFFER_UNDERFLOW recovery)" in { + // Fragmenting cipher chunks to one byte each is the most aggressive way to force + // BUFFER_UNDERFLOW: every TLS record header (5 bytes) and the entire payload arrive + // across many separate `onPush` events. The stage must keep pulling more bytes + // instead of deadlocking once it sees its first underflow. 
+ val client = graphStageBidi(Client) + val server = graphStageBidi(Server) + val fragmenter = Flow[ByteString].mapConcat(_.toIndexedSeq.map(b => ByteString(b))) + val transport = BidiFlow.fromFlows(fragmenter, fragmenter) + val echo = Flow[SslTlsInbound].collect { case SessionBytes(_, b) => SendBytes(b) } + val payload = ByteString("0123456789" * 200) // 2000 bytes of varied content + val received = collectExactly( + Source.single[SslTlsOutbound](SendBytes(payload)) + .via(client.atop(transport).atop(server.reversed).join(echo)), + payload.size, + timeout = 60.seconds) + received shouldBe payload + } + + "deliver all bytes when plainOut downstream is slow (backpressure)" in { + val client = graphStageBidi(Client) + val server = graphStageBidi(Server) + // 10x ~16 KiB blocks cross several TLS records and sustained plainOut + // backpressure forces the stage to buffer/wait without losing data. + val blocks = (0 until 10).map(_ => ByteString(("a" * 16384))) + val totalSize = blocks.map(_.length).sum + val echo = Flow[SslTlsInbound].collect { case SessionBytes(_, b) => SendBytes(b) } + val received = Await.result( + Source(blocks) + .map[SslTlsOutbound](b => SendBytes(b)) + .via(client.atop(server.reversed).join(echo)) + .collect { case SessionBytes(_, b) => b } + .throttle(1, 10.millis) + .scan(ByteString.empty)(_ ++ _) + .dropWhile(_.size < totalSize) + .runWith(Sink.headOption), + 60.seconds.dilated).getOrElse(ByteString.empty) + received.length shouldBe totalSize + received shouldBe blocks.foldLeft(ByteString.empty)(_ ++ _) + } + + "give each materialization its own SSLEngine (no shared state)" in { + // If the engine factory were memoized incorrectly, running the same flow twice + // would reuse a closed engine and the second run would fail. 
+ def round(payload: ByteString): ByteString = { + val client = graphStageBidi(Client) + val server = graphStageBidi(Server) + val echo = Flow[SslTlsInbound].collect { case SessionBytes(_, b) => SendBytes(b) } + collectExactly( + Source.single[SslTlsOutbound](SendBytes(payload)) + .via(client.atop(server.reversed).join(echo)), + payload.size) + } + + round(ByteString("first")) shouldBe ByteString("first") + round(ByteString("second")) shouldBe ByteString("second") + round(ByteString("third")) shouldBe ByteString("third") + } + + "complete cleanly when source is empty (CompletedImmediately variant)" in { + // Both sides use EagerClose: an empty source completes immediately, the + // close cascade fires before any payload travels, and both stages must + // finalize without errors. No bytes should be delivered. + val client = graphStageBidi(Client, EagerClose) + val server = graphStageBidi(Server, EagerClose) + val echo = Flow[SslTlsInbound].collect { case SessionBytes(_, b) => SendBytes(b) } + val received = Await.result( + Source.empty[SslTlsOutbound] + .via(client.atop(server.reversed).join(echo)) + .collect { case SessionBytes(_, b) => b } + .runFold(ByteString.empty)(_ ++ _), + 30.seconds.dilated) + received shouldBe ByteString.empty + } + } +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsSpec.scala index 10f3655f81..580817f140 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsSpec.scala @@ -23,7 +23,7 @@ import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ -import scala.util.Random +import scala.util.{ Random, Success } import org.apache.pekko import pekko.NotUsed @@ -31,6 +31,7 @@ import pekko.pattern.{ after => later } import pekko.stream._ import pekko.stream.TLSProtocol._ import 
pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import pekko.stream.impl.io.{ TlsGraphStage, TlsModule } import pekko.stream.scaladsl._ import pekko.stream.stage._ import pekko.stream.testkit._ @@ -111,12 +112,34 @@ object TlsSpec { """ } -class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing { +class TlsSpec extends AbstractTlsSpec(useLegacyActor = true) +class TlsGraphStageSpec extends AbstractTlsSpec(useLegacyActor = false) + +abstract class AbstractTlsSpec(useLegacyActor: Boolean) + extends StreamSpec(TlsSpec.configOverrides) + with WithLogCapturing { import GraphDSL.Implicits._ import TlsSpec._ import system.dispatcher - "SslTls" must { + /** + * Build a TLS BidiFlow without going through the global `TLS.UseLegacyActor` switch so + * each subclass can independently exercise the legacy actor path or the GraphStage path + * within the same JVM. + */ + protected def tlsBidi( + createSSLEngine: () => SSLEngine, + verifySession: SSLSession => scala.util.Try[Unit] = _ => Success(()), + closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = + if (useLegacyActor) + BidiFlow.fromGraph( + TlsModule(Attributes.none, () => createSSLEngine(), verifySession, closing)) + else + BidiFlow + .fromGraph(new TlsGraphStage(createSSLEngine, verifySession, closing)) + .withAttributes(Attributes.asyncBoundary) + + s"SslTls (${if (useLegacyActor) "legacy actor path" else "GraphStage path"})" must { "work for TLSv1.2" must { workFor("TLSv1.2", TLS12Ciphers) } "work for TLSv1.3" must { workFor("TLSv1.3", TLS13Ciphers) } @@ -163,13 +186,13 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing } def clientTls(closing: TLSClosing) = - TLS(() => createSSLEngine(sslContext, Client), closing) + tlsBidi(() => createSSLEngine(sslContext, Client), closing = closing) def badClientTls(closing: TLSClosing) = - TLS(() => createSSLEngine(initWithTrust("/badtruststore", protocol), Client), 
closing) + tlsBidi(() => createSSLEngine(initWithTrust("/badtruststore", protocol), Client), closing = closing) def serverTls(closing: TLSClosing) = - TLS(() => createSSLEngine(sslContext, Server), closing) + tlsBidi(() => createSSLEngine(sslContext, Server), closing = closing) trait Named { def name: String = @@ -567,9 +590,9 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing case SessionTruncated => SendBytes(ByteString.empty) case SessionBytes(_, b) => SendBytes(b) } - val clientTls = TLS( + val clientTls = tlsBidi( () => createSSLEngine2(sslContext, Client, hostnameVerification = true, hostInfo = Some((hostName, 80))), - EagerClose) + closing = EagerClose) val flow = clientTls.atop(serverTls(EagerClose).reversed).join(rhs) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
