This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new c2c5557258 test: dilate timeouts in FlowPrefixAndTailSpec and
FlowFlatMapConcatParallelismSpec (#2902)
c2c5557258 is described below
commit c2c5557258540157a7f9d06a3dc19a67b7097411
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Apr 25 21:42:23 2026 +0800
test: dilate timeouts in FlowPrefixAndTailSpec and
FlowFlatMapConcatParallelismSpec (#2902)
Both specs used hardcoded timeouts that were too tight on JDK 25 under busy CI:
- FlowPrefixAndTailSpec used `Await.result(_, 3.seconds)` in 14 places
- FlowFlatMapConcatParallelismSpec used `PatienceConfig(timeout = Span(60, Seconds))`
Wrap with TestKit dilation so the budget tracks `pekko.test.timefactor`
the same way HubSpec and MapAsyncPartitionedSpec already do. No
operator-side changes; PrefixAndTail and FlattenConcat are unmodified.
---
.../FlowFlatMapConcatParallelismSpec.scala | 7 ++++--
.../stream/scaladsl/FlowPrefixAndTailSpec.scala | 29 +++++++++++-----------
2 files changed, 20 insertions(+), 16 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
index df7a7ddb14..483838aece 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
@@ -40,9 +40,12 @@ class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
""") with ScriptedTest with FutureTimeoutSupport {
// 100K-element tests need extra headroom, especially on JDK 25+ where
- // ForkJoinPool scheduling changes slow down highly-parallel workloads (#2573)
+ // ForkJoinPool scheduling changes slow down highly-parallel workloads (#2573).
+ // Dilation makes the timeout track CI's pekko.test.timefactor.
override implicit val patience: PatienceConfig =
- PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds))
+ PatienceConfig(
+ timeout = Span((60 * testKitSettings.TestTimeFactor).toLong, Seconds),
+ interval = Span(1, Seconds))
val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala
index c148cb6123..1a6cf80852 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala
@@ -22,6 +22,7 @@ import scala.util.control.NoStackTrace
import org.apache.pekko
import pekko.stream._
import pekko.stream.testkit._
+import pekko.testkit.TestDuration
class FlowPrefixAndTailSpec extends StreamSpec("""
pekko.stream.materializer.initial-input-buffer-size = 2
@@ -37,7 +38,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
"work on empty input" in {
val futureSink = newHeadSink
val fut = Source.empty.prefixAndTail(10).runWith(futureSink)
- val (prefix, tailFlow) = Await.result(fut, 3.seconds)
+ val (prefix, tailFlow) = Await.result(fut, 3.seconds.dilated)
prefix should be(Nil)
val tailSubscriber = TestSubscriber.manualProbe[Int]()
tailFlow.to(Sink.fromSubscriber(tailSubscriber)).run()
@@ -47,7 +48,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
"work on short input" in {
val futureSink = newHeadSink
val fut = Source(List(1, 2, 3)).prefixAndTail(10).runWith(futureSink)
- val (prefix, tailFlow) = Await.result(fut, 3.seconds)
+ val (prefix, tailFlow) = Await.result(fut, 3.seconds.dilated)
prefix should be(List(1, 2, 3))
val tailSubscriber = TestSubscriber.manualProbe[Int]()
tailFlow.to(Sink.fromSubscriber(tailSubscriber)).run()
@@ -57,40 +58,40 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
"work on longer inputs" in {
val futureSink = newHeadSink
val fut = Source(1 to 10).prefixAndTail(5).runWith(futureSink)
- val (takes, tail) = Await.result(fut, 3.seconds)
+ val (takes, tail) = Await.result(fut, 3.seconds.dilated)
takes should be(1 to 5)
val futureSink2 = Sink.head[immutable.Seq[Int]]
val fut2 = tail.grouped(6).runWith(futureSink2)
- Await.result(fut2, 3.seconds) should be(6 to 10)
+ Await.result(fut2, 3.seconds.dilated) should be(6 to 10)
}
"handle zero take count" in {
val futureSink = newHeadSink
val fut = Source(1 to 10).prefixAndTail(0).runWith(futureSink)
- val (takes, tail) = Await.result(fut, 3.seconds)
+ val (takes, tail) = Await.result(fut, 3.seconds.dilated)
takes should be(Nil)
val futureSink2 = Sink.head[immutable.Seq[Int]]
val fut2 = tail.grouped(11).runWith(futureSink2)
- Await.result(fut2, 3.seconds) should be(1 to 10)
+ Await.result(fut2, 3.seconds.dilated) should be(1 to 10)
}
"handle negative take count" in {
val futureSink = newHeadSink
val fut = Source(1 to 10).prefixAndTail(-1).runWith(futureSink)
- val (takes, tail) = Await.result(fut, 3.seconds)
+ val (takes, tail) = Await.result(fut, 3.seconds.dilated)
takes should be(Nil)
val futureSink2 = Sink.head[immutable.Seq[Int]]
val fut2 = tail.grouped(11).runWith(futureSink2)
- Await.result(fut2, 3.seconds) should be(1 to 10)
+ Await.result(fut2, 3.seconds.dilated) should be(1 to 10)
}
"work if size of take is equal to stream size" in {
val futureSink = newHeadSink
val fut = Source(1 to 10).prefixAndTail(10).runWith(futureSink)
- val (takes, tail) = Await.result(fut, 3.seconds)
+ val (takes, tail) = Await.result(fut, 3.seconds.dilated)
takes should be(1 to 10)
val subscriber = TestSubscriber.manualProbe[Int]()
@@ -101,7 +102,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
"throw if tail is attempted to be materialized twice" in {
val futureSink = newHeadSink
val fut = Source(1 to 3).prefixAndTail(1).runWith(futureSink)
- val (prefix, tail) = Await.result(fut, 3.seconds)
+ val (prefix, tail) = Await.result(fut, 3.seconds.dilated)
prefix should be(Seq(1))
val subscriber1 = TestSubscriber.probe[Int]()
@@ -132,7 +133,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
val futureSink = newHeadSink
val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink)(tightTimeoutMaterializer)
- val (takes, tail) = Await.result(fut, 3.seconds)
+ val (takes, tail) = Await.result(fut, 3.seconds.dilated)
takes should be(Seq(1))
val subscriber = TestSubscriber.probe[Int]()
@@ -151,7 +152,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
val futureSink = newHeadSink
val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink)(tightTimeoutMaterializer)
- val (takes, tail) = Await.result(fut, 3.seconds)
+ val (takes, tail) = Await.result(fut, 3.seconds.dilated)
takes should be(Seq(1))
val subscriber = TestSubscriber.probe[Int]()
@@ -165,7 +166,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
"shut down main stage if substream is empty, even when not subscribed" in {
val futureSink = newHeadSink
val fut = Source.single(1).prefixAndTail(1).runWith(futureSink)
- val (takes, _) = Await.result(fut, 3.seconds)
+ val (takes, _) = Await.result(fut, 3.seconds.dilated)
takes should be(Seq(1))
}
@@ -279,7 +280,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
val s = pub.expectSubscription()
s.sendNext(0)
- val (_, tail) = Await.result(f, 3.seconds)
+ val (_, tail) = Await.result(f, 3.seconds.dilated)
val tailPub = tail.runWith(Sink.asPublisher(false))
s.sendComplete()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]