This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch fix-nightly-failures
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 44c53d32a369ba9fcb9efcab3a951d9d2d91663f
Author: He-Pin <[email protected]>
AuthorDate: Sat Apr 18 23:30:50 2026 +0800

    fix: scale stream test timeouts by timefactor to pass nightly on JDK 25
    
    Motivation:
    Nightly CI (JDK 25, TIMEFACTOR=3) has been failing consistently for 30+
    days due to ForkJoinPool scheduling changes in JDK 25 causing slower
    throughput and higher scheduler overhead.  Four root causes were found:
    
    1. HubSpec.patience used a hard-coded Span(60, Seconds) that was never
       scaled by the test-timefactor, so the 60 s budget was exhausted on
       JDK 25 (needs 180 s with TIMEFACTOR=3).
    
    2. AggregateWithTimeBoundaryAndSimulatedTimeSpec used interval = 1.milli
       with ExplicitlyTriggeredScheduler, which fired up to 400 000 timer
       callbacks per test-run (timePasses(400.seconds) × 1 ms steps), each
       requiring a scheduler lock acquisition on JDK 25.
    
    3. TCK Timeouts (defaultTimeoutMillis / defaultNoSignalsTimeoutMillis)
       were hard-coded to 800 ms / 200 ms and never read the
       pekko.test.timefactor JVM property, causing
       stochastic_spec103_mustSignalOnMethodsSequentially to fail on JDK 25.
    
    4. FlowMapAsyncPartitionedSpec."ignore null-completed futures" built the
       shouldBeNull set from Random.nextInt(10), which produces values 0-9.
       Because elements are 1-10, the value 0 can never match any element,
       so the set could be {0} – meaning no element ever returned null.
       Null handling was therefore exercised non-deterministically (and
       sometimes not at all), and the test failed on JDK 17 / Scala 3.3.x
       in CI.
    
    Modification:
    - HubSpec: multiply the 60 s base by testKitSettings.TestTimeFactor so
      CI with TIMEFACTOR=3 gets 180 s and TIMEFACTOR=2 gets 120 s.
    - AggregateWithTimeBoundaryAndSimulatedTimeSpec: change interval from
      1.milli to 1.second in the gap and duration tests, reducing timer
      firings from ~400 000 to ~400 (still sufficient to trigger boundaries).
    - TCK Timeouts: read pekko.test.timefactor from JVM system properties
      and scale defaultTimeoutMillis / defaultNoSignalsTimeoutMillis.
    - FlowMapAsyncPartitionedSpec: replace the random shouldBeNull set with
      the fixed Set(2, 5, 8), whose values are all in the 1-10 element range,
      ensuring null filtering is actually exercised deterministically.
    
    Result:
    All four previously-failing test categories should pass on the next
    nightly run across JDK 17/21/25 × Scala 2.13/3.3.
    
    Co-authored-by: Copilot <[email protected]>
---
 .../src/test/scala/org/apache/pekko/stream/tck/Timeouts.scala |  8 ++++++--
 .../pekko/stream/scaladsl/AggregateWithBoundarySpec.scala     |  4 ++--
 .../pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala   | 11 ++++-------
 .../test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala |  7 +++++--
 4 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/Timeouts.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/Timeouts.scala
index 3cfdab51f9..c7d7bac6e7 100644
--- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/Timeouts.scala
+++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/Timeouts.scala
@@ -18,10 +18,14 @@ package org.apache.pekko.stream.tck
  */
 object Timeouts {
 
+  // Scale timeouts by pekko.test.timefactor (set to 3 on JDK 25 nightly builds).
+  private val timeFactor: Double =
+    sys.props.get("pekko.test.timefactor").map(_.toDouble).getOrElse(1.0)
+
   def publisherShutdownTimeoutMillis: Int = 3000
 
-  def defaultTimeoutMillis: Int = 800
+  def defaultTimeoutMillis: Int = (800 * timeFactor).toInt
 
-  def defaultNoSignalsTimeoutMillis: Int = 200
+  def defaultNoSignalsTimeoutMillis: Int = (200 * timeFactor).toInt
 
 }
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala
index f56c1fcabb..bc04ff384e 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala
@@ -165,7 +165,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
         maxGap = Some(maxGap), // elements with longer gap will put put to next aggregator
         maxDuration = None,
         currentTimeMs = schedulerTimeMs,
-        interval = 1.milli)
+        interval = 1.second)
       .buffer(1, OverflowStrategy.backpressure)
       .runWith(Sink.collection)
 
@@ -209,7 +209,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
         maxGap = None,
         maxDuration = Some(maxDuration), // elements with longer gap will put put to next aggregator
         currentTimeMs = schedulerTimeMs,
-        interval = 1.milli)
+        interval = 1.second)
       .buffer(1, OverflowStrategy.backpressure)
       .runWith(Sink.collection)
 
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
index ece1d02847..35f7d4c0aa 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
@@ -385,13 +385,10 @@ class FlowMapAsyncPartitionedSpec extends StreamSpec with WithLogCapturing {
     }
 
     "ignore null-completed futures" in {
-      val shouldBeNull = {
-        val n = scala.util.Random.nextInt(10) + 1
-        (1 to n).foldLeft(Set.empty[Int]) { (set, _) =>
-          set + scala.util.Random.nextInt(10)
-        }
-      }
-      if (shouldBeNull.isEmpty) fail("should be at least one null")
+      // Use a fixed set whose values are in the range 1-10 so null filtering is actually exercised.
+      // A random set with values 0-9 could produce {0} which means no element in 1-10 returns null,
+      // making the test non-deterministic and effectively a no-op for null handling.
+      val shouldBeNull = Set(2, 5, 8)
 
       val f: (Int, Int) => Future[String] = { (elem, _) =>
         if (shouldBeNull(elem)) Future.successful(null)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index 04df0b4e63..0d90f519f5 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -36,9 +36,12 @@ class HubSpec extends StreamSpec {
   implicit val ec: ExecutionContext = system.dispatcher
 
   // Long-stream tests (20K elements) need extra headroom on JDK 25+
-  // where ForkJoinPool scheduling changes cause slower throughput (#2573)
+  // where ForkJoinPool scheduling changes cause slower throughput (#2573).
+  // Multiply by testKitSettings.TestTimeFactor so nightly CI (TIMEFACTOR=3) gets 180 s.
   override implicit val patience: PatienceConfig =
-    PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds))
+    PatienceConfig(
+      timeout = Span((60 * testKitSettings.TestTimeFactor).toLong, Seconds),
+      interval = Span(1, Seconds))
 
   "MergeHub" must {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to