This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/pekko-persistence-cassandra.git
The following commit(s) were added to refs/heads/main by this push:
new cb97e5b flaky: make EventsByTagPersistenceIdCleanupSpec more robust
(#408)
cb97e5b is described below
commit cb97e5bd694e13d4ea3f872122340c862a7b2a69
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 28 11:58:57 2026 +0100
flaky: make EventsByTagPersistenceIdCleanupSpec more robust (#408)
* fix: make EventsByTagPersistenceIdCleanupSpec more robust against timing
flakiness
* fix: correct withProbe call to use single parameter list syntax
* scalafmt
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
---
.../cassandra/query/EventsByTagSpec.scala | 31 +++++++++++++---------
1 file changed, 18 insertions(+), 13 deletions(-)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
index 396578f..47d86f9 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
@@ -1371,26 +1371,31 @@ class EventsByTagPersistenceIdCleanupSpec extends
AbstractEventsByTagSpec(Events
"PersistenceId cleanup" must {
"drop state and trigger new persistence id lookup periodically" in {
// https://github.com/apache/pekko-persistence-cassandra/issues/33
+ // Use a unique pid per test run to prevent events written in one
attempt from
+ // polluting subsequent attempts when eventually retries.
+ val pid = s"cleanup-${java.util.UUID.randomUUID()}"
eventually(timeout(20.seconds), interval(3.second)) {
val t1: LocalDateTime = LocalDateTime.now(ZoneOffset.UTC).minusDays(2)
- val event1 = PersistentRepr(s"cleanup-1", 1, "cleanup")
+ val event1 = PersistentRepr(s"cleanup-1", 1, pid)
writeTaggedEvent(t1, event1, Set("cleanup-tag"), 1, bucketSize)
val query =
queries.eventsByTag("cleanup-tag",
TimeBasedUUID(Uuids.startOf(t1.toInstant(ZoneOffset.UTC).toEpochMilli - 1L)))
- val probe = query.runWith(TestSink[Any]())
- probe.request(10)
- probe.expectNextPF { case e @ EventEnvelope(_, "cleanup", 1L,
"cleanup-1") => e }
- probe.expectNoMessage(cleanupPeriod + 250.millis)
-
- // the metadata for pid cleanup should have been removed meaning the
next event will be delayed
- val event2 = PersistentRepr(s"cleanup-2", 2, "cleanup")
- writeTaggedEvent(event2, Set("cleanup-tag"), 2, bucketSize)
-
- // we don't now when exactly the next persistence id scan will be
- probe.expectNoMessage(newPersistenceIdScan - 100.millis)
- probe.expectNextPF { case e @ EventEnvelope(_, "cleanup", 2L,
"cleanup-2") => e }
+ withProbe(query.runWith(TestSink[Any]()),
+ { probe =>
+ probe.request(10)
+ probe.expectNextPF { case e @ EventEnvelope(_, `pid`, 1L,
"cleanup-1") => e }
+ probe.expectNoMessage(cleanupPeriod + 250.millis)
+
+ // the metadata for pid should have been removed meaning the next
event will be delayed
+ val event2 = PersistentRepr(s"cleanup-2", 2, pid)
+ writeTaggedEvent(event2, Set("cleanup-tag"), 2, bucketSize)
+
+ // we don't know exactly when the next persistence id scan will
be, so avoid
+ // asserting on the delay itself; just verify that event2 is
eventually delivered
+ probe.expectNextPF { case e @ EventEnvelope(_, `pid`, 2L,
"cleanup-2") => e }
+ })
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]