This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new a36d126509 Scale MapAsyncPartitionedSpec patience with test timefactor
(#2884)
a36d126509 is described below
commit a36d126509620614883c68bf91b7f1981c2a8745
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Wed Apr 22 17:43:42 2026 +0800
Scale MapAsyncPartitionedSpec patience with test timefactor (#2884)
* Scale MapAsyncPartitionedSpec patience with test timefactor
JDK 25 nightly runs can take longer to complete the property-based ordered
mapAsyncPartitioned checks. Use the configured test timefactor when deriving
ScalaFutures patience so the suite tracks CI dilation instead of timing out at
a fixed 60 seconds.
* Stabilize ReliableDeliveryShardingSpec on JDK 25
Motivation:
JDK 25 nightly reproductions showed the RequestNext demand test could
observe a legitimate redelivery before the buffered message was re-sent, making
the assertion order-sensitive.
Modification:
Update ReliableDeliveryShardingSpec to fish for the expected buffered
message and tolerate duplicate earlier deliveries during the renewed-demand
race window.
Result:
The sharding delivery spec remains strict about eventually observing the
buffered message while no longer flaking on JDK 25 scheduling differences.
---
.../typed/delivery/ReliableDeliveryShardingSpec.scala | 15 ++++++++++++++-
.../apache/pekko/stream/MapAsyncPartitionedSpec.scala | 17 +++++++++++------
2 files changed, 25 insertions(+), 7 deletions(-)
diff --git
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
index 4e6becb941..56a30704ee 100644
---
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
+++
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
@@ -19,6 +19,7 @@ import scala.concurrent.duration._
import org.apache.pekko
import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.FishingOutcomes
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@@ -306,7 +307,19 @@ class ReliableDeliveryShardingSpec
// when new demand the buffered messages will be sent
seq5.producerController ! ProducerControllerImpl.Request(confirmedSeqNr
= 5L, requestUpToSeqNr = 10, true, false)
- val seq6 = shardingProbe.receiveMessage().message
+ val seq6 = shardingProbe
+ .fishForMessage(testKit.testKitSettings.dilated(3.seconds), "waiting
for buffered msg-6 after renewed demand") {
+ case ShardingEnvelope("entity-1", SequencedMessage(_, _,
TestConsumer.Job("msg-6"), _, _)) =>
+ FishingOutcomes.complete
+ case ShardingEnvelope("entity-1", SequencedMessage(_, _,
TestConsumer.Job(_), _, _)) =>
+ // A redelivery can race with the renewed demand, so tolerate
buffered msg-6 arriving
+ // after duplicate earlier messages.
+ FishingOutcomes.continueAndIgnore
+ case other =>
+ FishingOutcomes.fail(s"Unexpected message while waiting for
buffered msg-6: [$other]")
+ }
+ .last
+ .message
seq6.message should ===(TestConsumer.Job("msg-6"))
val next9 = producerProbe.receiveMessage()
diff --git
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
index 3a84b3c47d..0b8efdda0c 100644
---
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
+++
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
@@ -44,6 +44,7 @@ import pekko.stream.scaladsl.{
Zip
}
import pekko.stream.testkit.scaladsl.TestSink
+import pekko.testkit.TestKitExtension
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
@@ -107,18 +108,22 @@ class MapAsyncPartitionedSpec
import MapAsyncPartitionedSpec.TestData._
- // These suites materialize many short-lived streams. On busy CI nodes,
- // JDK 25 makes the 1000-sample property checks noticeably more expensive
(#2573).
- override implicit def patienceConfig: PatienceConfig = PatienceConfig(
- timeout = 60.seconds,
- interval = 100.millis)
-
private val heavyPropertyChecks = minSuccessful(100)
private implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty,
"test-system")
private val executor: ExecutorService = Executors.newCachedThreadPool()
private implicit val ec: ExecutionContext =
ExecutionContext.fromExecutor(executor)
+ // These suites materialize many short-lived streams. On busy CI nodes,
+ // JDK 25 makes the property checks noticeably more expensive, so bind
+ // ScalaFutures patience to the test timefactor used by CI.
+ private val testTimeFactor =
TestKitExtension(system.classicSystem).TestTimeFactor
+ private def dilated(duration: FiniteDuration): FiniteDuration =
+ FiniteDuration((duration.toNanos * testTimeFactor + 0.5).toLong,
java.util.concurrent.TimeUnit.NANOSECONDS)
+ override implicit def patienceConfig: PatienceConfig = PatienceConfig(
+ timeout = dilated(60.seconds),
+ interval = dilated(100.millis))
+
override protected def afterAll(): Unit = {
system.terminate()
system.whenTerminated.futureValue
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]