This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch fix-nightly-failures in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 44c53d32a369ba9fcb9efcab3a951d9d2d91663f Author: He-Pin <[email protected]> AuthorDate: Sat Apr 18 23:30:50 2026 +0800 fix: scale stream test timeouts by timefactor to pass nightly on JDK 25 Motivation: Nightly CI (JDK 25, TIMEFACTOR=3) has been failing consistently for 30+ days due to ForkJoinPool scheduling changes in JDK 25 causing slower throughput and higher scheduler overhead. Four root causes were found: 1. HubSpec.patience used a hard-coded Span(60, Seconds) that was never scaled by the test time factor, so the 60 s budget was exhausted on JDK 25 (it needs 180 s with TIMEFACTOR=3). 2. AggregateWithTimeBoundaryAndSimulatedTimeSpec used interval = 1.milli with ExplicitlyTriggeredScheduler, which fired up to 400 000 timer callbacks per test run (timePasses(400.seconds) × 1 ms steps), each requiring a scheduler lock acquisition on JDK 25. 3. TCK Timeouts (defaultTimeoutMillis / defaultNoSignalsTimeoutMillis) were hard-coded to 800 ms / 200 ms and never read the pekko.test.timefactor JVM property, causing stochastic_spec103_mustSignalOnMethodsSequentially to fail on JDK 25. 4. FlowMapAsyncPartitionedSpec."ignore null-completed futures" built the shouldBeNull set from Random.nextInt(10), which produces values 0-9. Because the stream elements are 1-10, the value 0 can never match any element, so the set could be {0}. In that case no element returned null and the null-handling assertion was vacuous. The random set also made the test non-deterministic, and it failed on JDK 17 / Scala 3.3.x in CI. Modification: - HubSpec: multiply the 60 s base by testKitSettings.TestTimeFactor so CI with TIMEFACTOR=3 gets 180 s and TIMEFACTOR=2 gets 120 s. - AggregateWithTimeBoundaryAndSimulatedTimeSpec: change interval from 1.milli to 1.second in the gap and duration tests, reducing timer firings from ~400 000 to ~400 (still frequent enough to trigger the boundaries). - TCK Timeouts: read pekko.test.timefactor from JVM system properties (defaulting to 1.0 when unset) and scale defaultTimeoutMillis / defaultNoSignalsTimeoutMillis by it. 
- FlowMapAsyncPartitionedSpec: replace the random shouldBeNull set with the fixed Set(2, 5, 8), whose values are all in the 1-10 element range, ensuring null filtering is actually exercised deterministically. Result: All four previously-failing test categories should pass on the next nightly run across JDK 17/21/25 × Scala 2.13/3.3. Co-authored-by: Copilot <[email protected]> --- .../src/test/scala/org/apache/pekko/stream/tck/Timeouts.scala | 8 ++++++-- .../pekko/stream/scaladsl/AggregateWithBoundarySpec.scala | 4 ++-- .../pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala | 11 ++++------- .../test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala | 7 +++++-- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/Timeouts.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/Timeouts.scala index 3cfdab51f9..c7d7bac6e7 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/Timeouts.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/Timeouts.scala @@ -18,10 +18,14 @@ package org.apache.pekko.stream.tck */ object Timeouts { + // Scale timeouts by pekko.test.timefactor (set to 3 on JDK 25 nightly builds). 
+ private val timeFactor: Double = + sys.props.get("pekko.test.timefactor").map(_.toDouble).getOrElse(1.0) + def publisherShutdownTimeoutMillis: Int = 3000 - def defaultTimeoutMillis: Int = 800 + def defaultTimeoutMillis: Int = (800 * timeFactor).toInt - def defaultNoSignalsTimeoutMillis: Int = 200 + def defaultNoSignalsTimeoutMillis: Int = (200 * timeFactor).toInt } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala index f56c1fcabb..bc04ff384e 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala @@ -165,7 +165,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with maxGap = Some(maxGap), // elements with longer gap will put put to next aggregator maxDuration = None, currentTimeMs = schedulerTimeMs, - interval = 1.milli) + interval = 1.second) .buffer(1, OverflowStrategy.backpressure) .runWith(Sink.collection) @@ -209,7 +209,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with maxGap = None, maxDuration = Some(maxDuration), // elements with longer gap will put put to next aggregator currentTimeMs = schedulerTimeMs, - interval = 1.milli) + interval = 1.second) .buffer(1, OverflowStrategy.backpressure) .runWith(Sink.collection) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala index ece1d02847..35f7d4c0aa 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala @@ -385,13 +385,10 @@ class 
FlowMapAsyncPartitionedSpec extends StreamSpec with WithLogCapturing { } "ignore null-completed futures" in { - val shouldBeNull = { - val n = scala.util.Random.nextInt(10) + 1 - (1 to n).foldLeft(Set.empty[Int]) { (set, _) => - set + scala.util.Random.nextInt(10) - } - } - if (shouldBeNull.isEmpty) fail("should be at least one null") + // Use a fixed set whose values are in the range 1-10 so null filtering is actually exercised. + // A random set with values 0-9 could produce {0} which means no element in 1-10 returns null, + // making the test non-deterministic and effectively a no-op for null handling. + val shouldBeNull = Set(2, 5, 8) val f: (Int, Int) => Future[String] = { (elem, _) => if (shouldBeNull(elem)) Future.successful(null) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index 04df0b4e63..0d90f519f5 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -36,9 +36,12 @@ class HubSpec extends StreamSpec { implicit val ec: ExecutionContext = system.dispatcher // Long-stream tests (20K elements) need extra headroom on JDK 25+ - // where ForkJoinPool scheduling changes cause slower throughput (#2573) + // where ForkJoinPool scheduling changes cause slower throughput (#2573). + // Multiply by testKitSettings.TestTimeFactor so nightly CI (TIMEFACTOR=3) gets 180 s. override implicit val patience: PatienceConfig = - PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds)) + PatienceConfig( + timeout = Span((60 * testKitSettings.TestTimeFactor).toLong, Seconds), + interval = Span(1, Seconds)) "MergeHub" must { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
