This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch pr-2878-clean
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit bdd787011603497b42b185d57124956b8b216d9f
Author: He-Pin <[email protected]>
AuthorDate: Sun Apr 26 00:34:18 2026 +0800

    test(bench-jmh): add TlsBenchmark comparing TlsGraphStage vs legacy actor path
    
    Motivation:
    PR #2878 introduces TlsGraphStage as an internal alternative to the
    legacy actor-based TLS path. Without a JMH benchmark we have no
    reproducible way to compare per-record encrypt/decrypt overhead or
    short-lived-connection (handshake) cost across the two implementations,
    which is the primary technical justification for the rewrite.
    
    Modification:
    - New TlsBenchmark.scala under bench-jmh with two scenarios:
      - warmRoundTrip: 100 payloads through a fresh client+server pair per
        invocation, marked @OperationsPerInvocation(100) so the reported
        score is per round-trip. Handshake cost is amortized away by the
        iteration count, isolating per-record overhead. Sink terminates by
        cancelling once the expected byte count is received, propagating a
        clean shutdown through both TLS sessions.
      - coldHandshake: each invocation builds a fresh client+server pair and
        completes the handshake by exchanging a single small payload. Models
        the cost profile of short-lived connections (e.g. one HTTPS request).
    - @Param crosses two implementations (legacy / graphstage) with three
      payload sizes (256 B / 4 KiB / 64 KiB) covering control-plane,
      typical-HTTP, and streaming-chunk regimes.
    - Companion object holds keystore/truststore loading and the TLSv1.2
      cipher whitelist used by both scenarios.
    - bench-jmh/src/main/resources/{keystore,truststore}: copied from
      stream-tests so the benchmark module is self-contained and does not
      depend on the test classpath.
    
    Result:
    - `sbt 'project bench-jmh' 'Jmh/run -i 5 -wi 3 -f1 -t1 .*TlsBenchmark.*'`
      produces stable warm and cold numbers for both implementations.
    - Per-cell fork isolation (one `-p implementation -p payloadSize` per
      JMH invocation) avoids resource accumulation across cells.
    
    References:
    - PR #2878
    - Issue #2860
---
 bench-jmh/src/main/resources/keystore              | Bin 0 -> 2397 bytes
 bench-jmh/src/main/resources/truststore            | Bin 0 -> 857 bytes
 .../org/apache/pekko/stream/io/TlsBenchmark.scala  | 197 +++++++++++++++++++++
 3 files changed, 197 insertions(+)

diff --git a/bench-jmh/src/main/resources/keystore 
b/bench-jmh/src/main/resources/keystore
new file mode 100644
index 0000000000..2b0237562b
Binary files /dev/null and b/bench-jmh/src/main/resources/keystore differ
diff --git a/bench-jmh/src/main/resources/truststore 
b/bench-jmh/src/main/resources/truststore
new file mode 100644
index 0000000000..3cc1983600
Binary files /dev/null and b/bench-jmh/src/main/resources/truststore differ
diff --git 
a/bench-jmh/src/main/scala/org/apache/pekko/stream/io/TlsBenchmark.scala 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/io/TlsBenchmark.scala
new file mode 100644
index 0000000000..72044a6f25
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/io/TlsBenchmark.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.stream.io
+
+import java.security.{ KeyStore, SecureRandom }
+import java.util.concurrent.TimeUnit
+import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLEngine, SSLSession, 
TrustManagerFactory }
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.{ Success, Try }
+
+import com.typesafe.config.{ Config, ConfigFactory }
+import org.openjdk.jmh.annotations._
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.actor.ActorSystem
+import pekko.stream._
+import pekko.stream.TLSProtocol._
+import pekko.stream.impl.io.{ TlsGraphStage, TlsModule }
+import pekko.stream.scaladsl._
+import pekko.util.ByteString
+
+/**
+ * JMH benchmark comparing the legacy actor-based TLS path (`TlsModule`) to the
+ * GraphStage path (`TlsGraphStage`).
+ *
+ *  - `warmRoundTrip` drives 100 payloads through a client+server echo loop.
+ *    Each invocation materializes a fresh pair (new SSL engines), so one
+ *    handshake is paid per invocation and amortized over its 100 records,
+ *    which approximates the per-record encrypt/decrypt overhead.
+ *  - `coldHandshake` measures the cost of materializing a fresh client+server
+ *    pair and completing the TLS handshake before transferring a tiny payload.
+ *    This represents short-lived connections (e.g. HTTPS request/response).
+ *
+ * Run with:
+ * {{{
+ *   sbt "bench-jmh/Jmh/run -i 5 -wi 3 -f1 -t1 .*TlsBenchmark.*"
+ * }}}
+ */
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(1)
+class TlsBenchmark {
+
+  private val config: Config = ConfigFactory.parseString("""
+      pekko {
+        log-config-on-start = off
+        log-dead-letters-during-shutdown = off
+        stdout-loglevel = "OFF"
+        loglevel = "OFF"
+        actor.default-dispatcher {
+          throughput = 1024
+        }
+        actor.default-mailbox {
+          mailbox-type = 
"org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
+        }
+      }""".stripMargin).withFallback(ConfigFactory.load())
+
+  implicit var system: ActorSystem = _
+  private var sslContext: SSLContext = _
+  private var ciphers: Array[String] = _
+
+  @Param(Array("legacy", "graphstage"))
+  var implementation: String = _
+
+  // 256 B = control message; 4 KiB = typical HTTP request; 64 KiB = streaming chunk
+  @Param(Array("256", "4096", "65536"))
+  var payloadSize: Int = _
+
+  private var payload: ByteString = _
+  private var payloads: scala.collection.immutable.IndexedSeq[SslTlsOutbound] 
= _
+
+  @Setup
+  def setup(): Unit = {
+    system = ActorSystem("TlsBenchmark", config)
+    SystemMaterializer(system).materializer
+
+    sslContext = TlsBenchmark.initSslContext("TLSv1.2")
+    ciphers = TlsBenchmark.TLS12Ciphers.toArray
+
+    payload = ByteString(Array.fill[Byte](payloadSize)('a'.toByte))
+    payloads = (0 until 100).map(_ => SendBytes(payload))
+  }
+
+  @TearDown
+  def shutdown(): Unit = {
+    Await.result(system.terminate(), 10.seconds)
+  }
+
+  private def engine(role: TLSRole): SSLEngine = {
+    val e = sslContext.createSSLEngine()
+    e.setUseClientMode(role == Client)
+    e.setEnabledCipherSuites(ciphers)
+    e.setEnabledProtocols(Array("TLSv1.2"))
+    e
+  }
+
+  private def makeBidi(
+      role: TLSRole,
+      closing: TLSClosing,
+      verifySession: SSLSession => Try[Unit] = _ => Success(()))
+      : BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, 
NotUsed] =
+    if (implementation == "legacy")
+      BidiFlow.fromGraph(
+        TlsModule(Attributes.none, () => engine(role), verifySession, closing))
+    else
+      BidiFlow
+        .fromGraph(new TlsGraphStage(() => engine(role), verifySession, 
closing))
+        .withAttributes(Attributes.asyncBoundary)
+
+  /**
+   * Warm round-trip: 100 payloads through a fresh client+server pair.
+   * Handshake happens once per invocation and is amortized over 100 records.
+   * The sink cancels once it has received the expected number of bytes, which
+   * propagates back through both TLS sessions and shuts the stream down cleanly.
+   */
+  @Benchmark
+  @OperationsPerInvocation(100)
+  def warmRoundTrip(): Unit = {
+    val records = 100
+    val expected = payload.size * records
+    val client = makeBidi(Client, IgnoreComplete)
+    val server = makeBidi(Server, IgnoreComplete)
+    val echo = Flow[SslTlsInbound].collect { case SessionBytes(_, b) => 
SendBytes(b) }
+
+    val done = Source(payloads)
+      .via(client.atop(server.reversed).join(echo))
+      .collect { case SessionBytes(_, b) => b }
+      .scan(0)((acc, b) => acc + b.size)
+      .dropWhile(_ < expected)
+      .runWith(Sink.headOption)
+
+    Await.result(done, 30.seconds)
+  }
+
+  /**
+   * Cold handshake: each invocation builds a fresh client+server pair and
+   * completes the handshake by exchanging a single small payload. Use this
+   * to measure short-lived connection cost (e.g. one HTTPS request).
+   */
+  @Benchmark
+  def coldHandshake(): Unit = {
+    val client = makeBidi(Client, IgnoreComplete)
+    val server = makeBidi(Server, IgnoreComplete)
+    val tiny = ByteString("ping")
+    val echo = Flow[SslTlsInbound].collect { case SessionBytes(_, b) => 
SendBytes(b) }
+
+    val done = Source
+      .single[SslTlsOutbound](SendBytes(tiny))
+      .via(client.atop(server.reversed).join(echo))
+      .collect { case SessionBytes(_, b) => b }
+      .scan(ByteString.empty)(_ ++ _)
+      .dropWhile(_.size < tiny.size)
+      .runWith(Sink.headOption)
+
+    Await.result(done, 30.seconds)
+  }
+}
+
+object TlsBenchmark {
+
+  val TLS12Ciphers: Set[String] = Set(
+    "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
+    "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384")
+
+  def initSslContext(protocol: String): SSLContext = {
+    val password = "changeme"
+
+    val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
+    keyStore.load(getClass.getResourceAsStream("/keystore"), 
password.toCharArray)
+
+    val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
+    trustStore.load(getClass.getResourceAsStream("/truststore"), 
password.toCharArray)
+
+    val keyManagerFactory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
+    keyManagerFactory.init(keyStore, password.toCharArray)
+
+    val trustManagerFactory = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
+    trustManagerFactory.init(trustStore)
+
+    val context = SSLContext.getInstance(protocol)
+    context.init(keyManagerFactory.getKeyManagers, 
trustManagerFactory.getTrustManagers, new SecureRandom)
+    context
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to