This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new c2c5557258 test: dilate timeouts in FlowPrefixAndTailSpec and 
FlowFlatMapConcatParallelismSpec (#2902)
c2c5557258 is described below

commit c2c5557258540157a7f9d06a3dc19a67b7097411
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Apr 25 21:42:23 2026 +0800

    test: dilate timeouts in FlowPrefixAndTailSpec and 
FlowFlatMapConcatParallelismSpec (#2902)
    
    Both specs hit hardcoded patience that was tight on JDK 25 + busy CI:
    - FlowPrefixAndTailSpec used `Await.result(_, 3.seconds)` in 14 places
    - FlowFlatMapConcatParallelismSpec used `PatienceConfig(timeout = 
60.seconds)`
    
    Wrap with TestKit dilation so the budget tracks `pekko.test.timefactor`
    the same way HubSpec and MapAsyncPartitionedSpec already do. No
    operator-side changes; PrefixAndTail and FlattenConcat are unmodified.
---
 .../FlowFlatMapConcatParallelismSpec.scala         |  7 ++++--
 .../stream/scaladsl/FlowPrefixAndTailSpec.scala    | 29 +++++++++++-----------
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
index df7a7ddb14..483838aece 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
@@ -40,9 +40,12 @@ class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
   """) with ScriptedTest with FutureTimeoutSupport {
 
   // 100K-element tests need extra headroom, especially on JDK 25+ where
-  // ForkJoinPool scheduling changes slow down highly-parallel workloads 
(#2573)
+  // ForkJoinPool scheduling changes slow down highly-parallel workloads 
(#2573).
+  // Dilation makes the timeout track CI's pekko.test.timefactor.
   override implicit val patience: PatienceConfig =
-    PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds))
+    PatienceConfig(
+      timeout = Span((60 * testKitSettings.TestTimeFactor).toLong, Seconds),
+      interval = Span(1, Seconds))
 
   val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right)
 
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala
index c148cb6123..1a6cf80852 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala
@@ -22,6 +22,7 @@ import scala.util.control.NoStackTrace
 import org.apache.pekko
 import pekko.stream._
 import pekko.stream.testkit._
+import pekko.testkit.TestDuration
 
 class FlowPrefixAndTailSpec extends StreamSpec("""
     pekko.stream.materializer.initial-input-buffer-size = 2
@@ -37,7 +38,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
     "work on empty input" in {
       val futureSink = newHeadSink
       val fut = Source.empty.prefixAndTail(10).runWith(futureSink)
-      val (prefix, tailFlow) = Await.result(fut, 3.seconds)
+      val (prefix, tailFlow) = Await.result(fut, 3.seconds.dilated)
       prefix should be(Nil)
       val tailSubscriber = TestSubscriber.manualProbe[Int]()
       tailFlow.to(Sink.fromSubscriber(tailSubscriber)).run()
@@ -47,7 +48,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
     "work on short input" in {
       val futureSink = newHeadSink
       val fut = Source(List(1, 2, 3)).prefixAndTail(10).runWith(futureSink)
-      val (prefix, tailFlow) = Await.result(fut, 3.seconds)
+      val (prefix, tailFlow) = Await.result(fut, 3.seconds.dilated)
       prefix should be(List(1, 2, 3))
       val tailSubscriber = TestSubscriber.manualProbe[Int]()
       tailFlow.to(Sink.fromSubscriber(tailSubscriber)).run()
@@ -57,40 +58,40 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
     "work on longer inputs" in {
       val futureSink = newHeadSink
       val fut = Source(1 to 10).prefixAndTail(5).runWith(futureSink)
-      val (takes, tail) = Await.result(fut, 3.seconds)
+      val (takes, tail) = Await.result(fut, 3.seconds.dilated)
       takes should be(1 to 5)
 
       val futureSink2 = Sink.head[immutable.Seq[Int]]
       val fut2 = tail.grouped(6).runWith(futureSink2)
-      Await.result(fut2, 3.seconds) should be(6 to 10)
+      Await.result(fut2, 3.seconds.dilated) should be(6 to 10)
     }
 
     "handle zero take count" in {
       val futureSink = newHeadSink
       val fut = Source(1 to 10).prefixAndTail(0).runWith(futureSink)
-      val (takes, tail) = Await.result(fut, 3.seconds)
+      val (takes, tail) = Await.result(fut, 3.seconds.dilated)
       takes should be(Nil)
 
       val futureSink2 = Sink.head[immutable.Seq[Int]]
       val fut2 = tail.grouped(11).runWith(futureSink2)
-      Await.result(fut2, 3.seconds) should be(1 to 10)
+      Await.result(fut2, 3.seconds.dilated) should be(1 to 10)
     }
 
     "handle negative take count" in {
       val futureSink = newHeadSink
       val fut = Source(1 to 10).prefixAndTail(-1).runWith(futureSink)
-      val (takes, tail) = Await.result(fut, 3.seconds)
+      val (takes, tail) = Await.result(fut, 3.seconds.dilated)
       takes should be(Nil)
 
       val futureSink2 = Sink.head[immutable.Seq[Int]]
       val fut2 = tail.grouped(11).runWith(futureSink2)
-      Await.result(fut2, 3.seconds) should be(1 to 10)
+      Await.result(fut2, 3.seconds.dilated) should be(1 to 10)
     }
 
     "work if size of take is equal to stream size" in {
       val futureSink = newHeadSink
       val fut = Source(1 to 10).prefixAndTail(10).runWith(futureSink)
-      val (takes, tail) = Await.result(fut, 3.seconds)
+      val (takes, tail) = Await.result(fut, 3.seconds.dilated)
       takes should be(1 to 10)
 
       val subscriber = TestSubscriber.manualProbe[Int]()
@@ -101,7 +102,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
     "throw if tail is attempted to be materialized twice" in {
       val futureSink = newHeadSink
       val fut = Source(1 to 3).prefixAndTail(1).runWith(futureSink)
-      val (prefix, tail) = Await.result(fut, 3.seconds)
+      val (prefix, tail) = Await.result(fut, 3.seconds.dilated)
       prefix should be(Seq(1))
 
       val subscriber1 = TestSubscriber.probe[Int]()
@@ -132,7 +133,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
 
       val futureSink = newHeadSink
       val fut = Source(1 to 
2).prefixAndTail(1).runWith(futureSink)(tightTimeoutMaterializer)
-      val (takes, tail) = Await.result(fut, 3.seconds)
+      val (takes, tail) = Await.result(fut, 3.seconds.dilated)
       takes should be(Seq(1))
 
       val subscriber = TestSubscriber.probe[Int]()
@@ -151,7 +152,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
 
       val futureSink = newHeadSink
       val fut = Source(1 to 
2).prefixAndTail(1).runWith(futureSink)(tightTimeoutMaterializer)
-      val (takes, tail) = Await.result(fut, 3.seconds)
+      val (takes, tail) = Await.result(fut, 3.seconds.dilated)
       takes should be(Seq(1))
 
       val subscriber = TestSubscriber.probe[Int]()
@@ -165,7 +166,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
     "shut down main stage if substream is empty, even when not subscribed" in {
       val futureSink = newHeadSink
       val fut = Source.single(1).prefixAndTail(1).runWith(futureSink)
-      val (takes, _) = Await.result(fut, 3.seconds)
+      val (takes, _) = Await.result(fut, 3.seconds.dilated)
       takes should be(Seq(1))
     }
 
@@ -279,7 +280,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
       val s = pub.expectSubscription()
       s.sendNext(0)
 
-      val (_, tail) = Await.result(f, 3.seconds)
+      val (_, tail) = Await.result(f, 3.seconds.dilated)
 
       val tailPub = tail.runWith(Sink.asPublisher(false))
       s.sendComplete()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to