This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new a36d126509 Scale MapAsyncPartitionedSpec patience with test timefactor 
(#2884)
a36d126509 is described below

commit a36d126509620614883c68bf91b7f1981c2a8745
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Wed Apr 22 17:43:42 2026 +0800

    Scale MapAsyncPartitionedSpec patience with test timefactor (#2884)
    
    * Scale MapAsyncPartitionedSpec patience with test timefactor
    
    JDK 25 nightly runs can take longer to complete the property-based ordered 
mapAsyncPartitioned checks. Use the configured test timefactor when deriving 
ScalaFutures patience so the suite tracks CI dilation instead of timing out at 
a fixed 60 seconds.
    
    
    * Stabilize ReliableDeliveryShardingSpec on JDK 25
    
    Motivation:
    JDK 25 nightly reproductions showed the RequestNext demand test could 
observe a legitimate redelivery before the buffered message was re-sent, making 
the assertion order-sensitive.
    
    Modification:
    Update ReliableDeliveryShardingSpec to fish for the expected buffered 
message and tolerate duplicate earlier deliveries during the renewed-demand 
race window.
    
    Result:
    The sharding delivery spec remains strict about eventually observing the 
buffered message while no longer flaking on JDK 25 scheduling differences.
---
 .../typed/delivery/ReliableDeliveryShardingSpec.scala   | 15 ++++++++++++++-
 .../apache/pekko/stream/MapAsyncPartitionedSpec.scala   | 17 +++++++++++------
 2 files changed, 25 insertions(+), 7 deletions(-)

diff --git 
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
 
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
index 4e6becb941..56a30704ee 100644
--- 
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
+++ 
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
@@ -19,6 +19,7 @@ import scala.concurrent.duration._
 
 import org.apache.pekko
 import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.FishingOutcomes
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@@ -306,7 +307,19 @@ class ReliableDeliveryShardingSpec
 
       // when new demand the buffered messages will be be sent
       seq5.producerController ! ProducerControllerImpl.Request(confirmedSeqNr 
= 5L, requestUpToSeqNr = 10, true, false)
-      val seq6 = shardingProbe.receiveMessage().message
+      val seq6 = shardingProbe
+        .fishForMessage(testKit.testKitSettings.dilated(3.seconds), "waiting 
for buffered msg-6 after renewed demand") {
+          case ShardingEnvelope("entity-1", SequencedMessage(_, _, 
TestConsumer.Job("msg-6"), _, _)) =>
+            FishingOutcomes.complete
+          case ShardingEnvelope("entity-1", SequencedMessage(_, _, 
TestConsumer.Job(_), _, _)) =>
+            // A redelivery can race with the renewed demand, so tolerate 
buffered msg-6 arriving
+            // after duplicate earlier messages.
+            FishingOutcomes.continueAndIgnore
+          case other =>
+            FishingOutcomes.fail(s"Unexpected message while waiting for 
buffered msg-6: [$other]")
+        }
+        .last
+        .message
       seq6.message should ===(TestConsumer.Job("msg-6"))
 
       val next9 = producerProbe.receiveMessage()
diff --git 
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
 
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
index 3a84b3c47d..0b8efdda0c 100644
--- 
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
+++ 
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
@@ -44,6 +44,7 @@ import pekko.stream.scaladsl.{
   Zip
 }
 import pekko.stream.testkit.scaladsl.TestSink
+import pekko.testkit.TestKitExtension
 
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.ScalaFutures
@@ -107,18 +108,22 @@ class MapAsyncPartitionedSpec
 
   import MapAsyncPartitionedSpec.TestData._
 
-  // These suites materialize many short-lived streams. On busy CI nodes,
-  // JDK 25 makes the 1000-sample property checks noticeably more expensive 
(#2573).
-  override implicit def patienceConfig: PatienceConfig = PatienceConfig(
-    timeout = 60.seconds,
-    interval = 100.millis)
-
   private val heavyPropertyChecks = minSuccessful(100)
 
   private implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, 
"test-system")
   private val executor: ExecutorService = Executors.newCachedThreadPool()
   private implicit val ec: ExecutionContext = 
ExecutionContext.fromExecutor(executor)
 
+  // These suites materialize many short-lived streams. On busy CI nodes,
+  // JDK 25 makes the property checks noticeably more expensive, so bind
+  // ScalaFutures patience to the test timefactor used by CI.
+  private val testTimeFactor = 
TestKitExtension(system.classicSystem).TestTimeFactor
+  private def dilated(duration: FiniteDuration): FiniteDuration =
+    FiniteDuration((duration.toNanos * testTimeFactor + 0.5).toLong, 
java.util.concurrent.TimeUnit.NANOSECONDS)
+  override implicit def patienceConfig: PatienceConfig = PatienceConfig(
+    timeout = dilated(60.seconds),
+    interval = dilated(100.millis))
+
   override protected def afterAll(): Unit = {
     system.terminate()
     system.whenTerminated.futureValue


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to