This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch pr-2878-clean in repository https://gitbox.apache.org/repos/asf/pekko.git
commit bdd787011603497b42b185d57124956b8b216d9f Author: He-Pin <[email protected]> AuthorDate: Sun Apr 26 00:34:18 2026 +0800 test(bench-jmh): add TlsBenchmark comparing TlsGraphStage vs legacy actor path Motivation: PR #2878 introduces TlsGraphStage as an internal alternative to the legacy actor-based TLS path. Without a JMH benchmark we have no reproducible way to compare per-record encrypt/decrypt overhead or short-lived-connection (handshake) cost across the two implementations, which is the primary technical justification for the rewrite. Modification: - New TlsBenchmark.scala under bench-jmh with two scenarios: - warmRoundTrip: 100 payloads through a fresh client+server pair per invocation, marked @OperationsPerInvocation(100) so the reported score is per round-trip. The single handshake paid per invocation is amortized over those 100 records, so the score is dominated by per-record overhead. The sink terminates by cancelling once the expected byte count is received, propagating a clean shutdown through both TLS sessions. - coldHandshake: each invocation builds a fresh client+server pair and completes the handshake by exchanging a single small payload. Models the cost profile of short-lived connections (e.g. one HTTPS request). - @Param crosses two implementations (legacy / graphstage) with three payload sizes (256 B / 4 KiB / 64 KiB) covering control-plane, typical-HTTP, and streaming-chunk regimes. The payload size only affects warmRoundTrip; coldHandshake always sends a fixed 4-byte payload. - Companion object holds keystore/truststore loading and the TLSv1.2 cipher whitelist used by both scenarios. - bench-jmh/src/main/resources/{keystore,truststore}: copied from stream-tests so the benchmark module is self-contained and does not depend on the test classpath. Result: - `sbt 'project bench-jmh' 'Jmh/run -i 5 -wi 3 -f1 -t1 .*TlsBenchmark.*'` produces stable warm and cold numbers for both implementations. - Per-cell fork isolation (one `-p implementation=<impl> -p payloadSize=<bytes>` combination per JMH invocation) avoids resource accumulation across cells. 
References: - PR #2878 - Issue #2860 --- bench-jmh/src/main/resources/keystore | Bin 0 -> 2397 bytes bench-jmh/src/main/resources/truststore | Bin 0 -> 857 bytes .../org/apache/pekko/stream/io/TlsBenchmark.scala | 197 +++++++++++++++++++++ 3 files changed, 197 insertions(+) diff --git a/bench-jmh/src/main/resources/keystore b/bench-jmh/src/main/resources/keystore new file mode 100644 index 0000000000..2b0237562b Binary files /dev/null and b/bench-jmh/src/main/resources/keystore differ diff --git a/bench-jmh/src/main/resources/truststore b/bench-jmh/src/main/resources/truststore new file mode 100644 index 0000000000..3cc1983600 Binary files /dev/null and b/bench-jmh/src/main/resources/truststore differ diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/io/TlsBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/io/TlsBenchmark.scala new file mode 100644 index 0000000000..72044a6f25 --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/io/TlsBenchmark.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. 
+ */ + +package org.apache.pekko.stream.io + +import java.security.{ KeyStore, SecureRandom } +import java.util.concurrent.TimeUnit +import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLEngine, SSLSession, TrustManagerFactory } + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.{ Success, Try } + +import com.typesafe.config.{ Config, ConfigFactory } +import org.openjdk.jmh.annotations._ + +import org.apache.pekko +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream._ +import pekko.stream.TLSProtocol._ +import pekko.stream.impl.io.{ TlsGraphStage, TlsModule } +import pekko.stream.scaladsl._ +import pekko.util.ByteString + +/** + * JMH benchmark comparing the legacy actor-based TLS path (`TlsModule`) to the + * GraphStage path (`TlsGraphStage`). + * + * - `warmRoundTrip` pushes 100 payloads through a client+server echo loop. Each + * invocation materializes a fresh client+server pair (the `SSLContext`, actor + * system and prepared `payloads` are shared across invocations, but the engines + * and streams are not), so one TLS handshake is paid per invocation and amortized + * over the 100 records counted by `@OperationsPerInvocation(100)`. The score + * therefore mostly reflects per-record encrypt/decrypt overhead, with 1/100 of a + * handshake folded into each operation. + * - `coldHandshake` measures the cost of materializing a fresh client+server pair + * and completing the TLS handshake before transferring a tiny payload. This + * represents short-lived connections (e.g. HTTPS request/response).
+ * + * Parameters: `implementation` selects the TLS stage ("legacy" uses `TlsModule`, + * any other value uses `TlsGraphStage`; see `makeBidi`). `payloadSize` is the size + * in bytes of each record sent by `warmRoundTrip`; `coldHandshake` always sends + * the 4-byte "ping" payload, so its results do not vary with `payloadSize`. + * + * Run with: + * {{{ + * sbt "bench-jmh/Jmh/run -i 5 -wi 3 -f1 -t1 .*TlsBenchmark.*" + * }}} + */ +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(1) +class TlsBenchmark { + /** + * Actor-system configuration: logging is switched off, the default dispatcher + * throughput is raised to 1024 and the default mailbox is + * `SingleConsumerOnlyUnboundedMailbox`; all other settings fall back to + * `ConfigFactory.load()`. NOTE(review): `.stripMargin` is a no-op here because + * the literal contains no `|` margin characters. + */ + private val config: Config = ConfigFactory.parseString(""" + pekko { + log-config-on-start = off + log-dead-letters-during-shutdown = off + stdout-loglevel = "OFF" + loglevel = "OFF" + actor.default-dispatcher { + throughput = 1024 + } + actor.default-mailbox { + mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox" + } + }""".stripMargin).withFallback(ConfigFactory.load()) + /** + * State assigned in `setup()`; `system` is terminated in `shutdown()`. + * Presumably `system` is implicit so `runWith` can derive a materializer from it. + */ + implicit var system: ActorSystem = _ + private var sslContext: SSLContext = _ + private var ciphers: Array[String] = _ + /** + * TLS stage built by `makeBidi`: "legacy" selects `TlsModule`, any other value + * selects `TlsGraphStage`. + */ + @Param(Array("legacy", "graphstage")) + var implementation: String = _ + + // 256 B = control message; 4 KiB = typical HTTP request; 64 KiB = streaming chunk + @Param(Array("256", "4096", "65536")) + var payloadSize: Int = _ + /** + * `payload` is `payloadSize` bytes of 'a'; `payloads` holds 100 `SendBytes` + * records wrapping it. Both are built in `setup()` and reused by every + * `warmRoundTrip` invocation. NOTE(review): the count 100 is repeated in + * `warmRoundTrip` (`records`, `@OperationsPerInvocation`); keep them in sync. + */ + private var payload: ByteString = _ + private var payloads: scala.collection.immutable.IndexedSeq[SslTlsOutbound] = _ + /** + * JMH `@Setup` hook (no `Level` given): creates the actor system, touches + * `SystemMaterializer(system).materializer` (result discarded; presumably to + * initialize it before measurement), builds the TLSv1.2 `SSLContext` and cipher + * list, and prepares the payloads. + */ + @Setup + def setup(): Unit = { + system = ActorSystem("TlsBenchmark", config) + SystemMaterializer(system).materializer + + sslContext = TlsBenchmark.initSslContext("TLSv1.2") + ciphers = TlsBenchmark.TLS12Ciphers.toArray + + payload = ByteString(Array.fill[Byte](payloadSize)('a'.toByte)) + payloads = (0 until 100).map(_ => SendBytes(payload)) + } + /** + * JMH `@TearDown` hook: terminates the actor system, blocking for up to 10 seconds. + */ + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 10.seconds) + } + /** + * Creates a new `SSLEngine` from `sslContext`, in client mode iff `role == Client`, + * restricted to the `ciphers` suites and the TLSv1.2 protocol. + */ + private def engine(role: TLSRole): SSLEngine = { + val e = sslContext.createSSLEngine() + e.setUseClientMode(role == Client) + e.setEnabledCipherSuites(ciphers) + e.setEnabledProtocols(Array("TLSv1.2")) + e + } + /** + * Builds one TLS endpoint for the selected `implementation`; the `SSLEngine` is + * supplied through a `() => engine(role)` factory. "legacy" wraps `TlsModule` + * with `Attributes.none`; otherwise `TlsGraphStage` is used and marked with + * `Attributes.asyncBoundary`. NOTE(review): only the GraphStage variant gets an + * explicit async boundary here, presumably to mirror the legacy stage running in + * its own actor; confirm the two variants are comparable. + * + * @param role client or server side of the connection + * @param closing TLS closing behaviour passed to the stage + * @param verifySession session check passed to the stage; accepts every session by default + */ + private def makeBidi( + role: TLSRole, + closing: TLSClosing, + verifySession:
SSLSession => Try[Unit] = _ => Success(())) + : BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = + if (implementation == "legacy") + BidiFlow.fromGraph( + TlsModule(Attributes.none, () => engine(role), verifySession, closing)) + else + BidiFlow + .fromGraph(new TlsGraphStage(() => engine(role), verifySession, closing)) + .withAttributes(Attributes.asyncBoundary) + + /** + * Warm round-trip: 100 payloads through a fresh client+server pair. Handshake + * happens once per invocation and is amortized over 100 records. The sink + * cancels once it has received the expected number of bytes, which propagates + * back through both TLS sessions and shuts the stream down cleanly. + * + * The `echo` flow is joined to the server's plaintext side and returns every + * decrypted `SessionBytes` as `SendBytes`. On the client side `scan` keeps a + * running byte total, `dropWhile` skips totals below `expected`, and + * `Sink.headOption` takes the first total that reaches it. + * + * NOTE(review): if the stream completed before `expected` bytes arrived, + * `Sink.headOption` would yield `None` and, because the awaited value is + * discarded, the invocation would still count as a successful operation; + * consider asserting on the result. `records` must stay equal to the size of + * `payloads` and to `@OperationsPerInvocation`. + */ + @Benchmark + @OperationsPerInvocation(100) + def warmRoundTrip(): Unit = { + val records = 100 + val expected = payload.size * records + val client = makeBidi(Client, IgnoreComplete) + val server = makeBidi(Server, IgnoreComplete) + val echo = Flow[SslTlsInbound].collect { case SessionBytes(_, b) => SendBytes(b) } + + val done = Source(payloads) + .via(client.atop(server.reversed).join(echo)) + .collect { case SessionBytes(_, b) => b } + .scan(0)((acc, b) => acc + b.size) + .dropWhile(_ < expected) + .runWith(Sink.headOption) + + Await.result(done, 30.seconds) + } + + /** + * Cold handshake: each invocation builds a fresh client+server pair and + * completes the handshake by exchanging a single small payload. Use this + * to measure short-lived connection cost (e.g. one HTTPS request).
+ * + * The 4-byte "ping" payload is echoed back by the server side and the received + * bytes are concatenated until at least 4 bytes have arrived. `payloadSize` is + * not used, so results do not vary with it. NOTE(review): as in `warmRoundTrip`, + * a `None` result from `Sink.headOption` would be silently discarded. + */ + @Benchmark + def coldHandshake(): Unit = { + val client = makeBidi(Client, IgnoreComplete) + val server = makeBidi(Server, IgnoreComplete) + val tiny = ByteString("ping") + val echo = Flow[SslTlsInbound].collect { case SessionBytes(_, b) => SendBytes(b) } + + val done = Source + .single[SslTlsOutbound](SendBytes(tiny)) + .via(client.atop(server.reversed).join(echo)) + .collect { case SessionBytes(_, b) => b } + .scan(ByteString.empty)(_ ++ _) + .dropWhile(_.size < tiny.size) + .runWith(Sink.headOption) + + Await.result(done, 30.seconds) + } +} + /** + * Shared fixtures for `TlsBenchmark`: the TLSv1.2 cipher suites and the + * `SSLContext` factory used by `setup()`. + */ +object TlsBenchmark { + /** + * Cipher suites enabled on every engine the benchmark creates: ECDHE-RSA key + * exchange with AES-GCM (128- and 256-bit keys). + */ + val TLS12Ciphers: Set[String] = Set( + "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", + "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384") + /** + * Builds an `SSLContext` for `protocol` whose key and trust managers are loaded + * from the classpath resources `/keystore` and `/truststore`, both opened with the + * password "changeme" (also used for the key entries), and initialized with a new + * `SecureRandom`. + * + * NOTE(review): `getResourceAsStream` returns null when a resource is missing, and + * `KeyStore.load` with a null stream initializes an empty store instead of + * failing, so a missing resource would only surface later as a handshake failure; + * consider failing fast. The resource streams are also never closed. + */ + def initSslContext(protocol: String): SSLContext = { + val password = "changeme" + + val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) + keyStore.load(getClass.getResourceAsStream("/keystore"), password.toCharArray) + + val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) + trustStore.load(getClass.getResourceAsStream("/truststore"), password.toCharArray) + + val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + keyManagerFactory.init(keyStore, password.toCharArray) + + val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + trustManagerFactory.init(trustStore) + + val context = SSLContext.getInstance(protocol) + context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom) + context + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
