This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 044ff9a Add EventsByPersistenceIdGapSpec tests for gap handling (#398)
044ff9a is described below
commit 044ff9a494e8b0349a4d5c44e2dcf075458b8ec1
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Jun 10 20:07:16 2026 +0100
Add EventsByPersistenceIdGapSpec tests for gap handling (#398)
* Add EventsByPersistenceIdGapSpec tests for PostgreSQL gap handling
* try to fix mysql test
* Refine comments in EventsByPersistenceIdGapSpec
Removed redundant comment about PostgreSQL numbered parameter syntax in
hardDeleteSeqNrs method.
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
---
.../r2dbc/query/EventsByPersistenceIdGapSpec.scala | 252 +++++++++++++++++++++
1 file changed, 252 insertions(+)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdGapSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdGapSpec.scala
new file mode 100644
index 0000000..31ef689
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdGapSpec.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.query
+
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.r2dbc.TestActors.Persister
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup
+import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+import pekko.stream.scaladsl.Sink
+import pekko.stream.testkit.scaladsl.TestSink
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/**
+ * Tests covering sequence-number gap handling in eventsByPersistenceId and
+ * currentEventsByPersistenceId queries.
+ *
+ * Gaps arise when journal rows are hard-deleted (physical removal) or
+ * soft-deleted (the `deleted` flag is set to `true`, e.g. via the delete-marker
+ * left by deleteEventsTo / deleteMessagesTo). Both cases must be transparent to
+ * the consumer: every non-deleted event in the requested range must be emitted
+ * exactly once, in order.
+ *
+ * The test config uses `buffer-size = 1` to force the ContinuousQuery pagination
+ * to issue multiple database round trips and so exercise the gap-crossing logic.
+ *
+ * Inspired by pekko-persistence-jdbc#517
+ * (MessagesWithBatchDatabaseContractTest).
+ */
+object EventsByPersistenceIdGapSpec {
+  // With a buffer-size of 1 the ContinuousQuery pagination needs several
+  // round trips, which makes gaps that span a batch boundary observable.
+  val config = {
+    val bufferOverride = ConfigFactory.parseString("pekko.persistence.r2dbc.buffer-size = 1")
+    bufferOverride.withFallback(TestConfig.config)
+  }
+
+  // Dialect name read from the merged test configuration.
+  val dialect: String = config.getString("pekko.persistence.r2dbc.dialect")
+}
+
+class EventsByPersistenceIdGapSpec
+ extends ScalaTestWithActorTestKit(EventsByPersistenceIdGapSpec.config)
+ with AnyWordSpecLike
+ with TestDbLifecycle
+ with TestData
+ with LogCapturing {
+
+  // Exposes the test kit's actor system under the name `typedSystem`.
+  override def typedSystem: ActorSystem[_] = system
+
+  // Read journal under test, looked up through the PersistenceQuery extension.
+  private val query: R2dbcReadJournal =
+    PersistenceQuery(system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
+
+  // ---------------------------------------------------------------------------
+  // Helpers
+  // ---------------------------------------------------------------------------
+
+  /**
+   * Persist `count` events with payloads `e-1` .. `e-count` for `pid` through a
+   * Persister actor, then stop that actor.
+   *
+   * Each event is acknowledged before the next one is sent. Nothing is
+   * returned; the tests in this spec rely on a fresh `pid` ending up with
+   * sequence numbers 1..count.
+   *
+   * @param pid   persistence id to write to
+   * @param count number of events to persist
+   */
+  private def persistEvents(pid: String, count: Int): Unit = {
+    val ackProbe = createTestProbe[Done]()
+    val persister = spawn(Persister(pid))
+    for (i <- 1 to count) {
+      persister ! Persister.PersistWithAck(s"e-$i", ackProbe.ref)
+      // Wait for the ack before sending the next event.
+      ackProbe.expectMessage(10.seconds, Done)
+    }
+    testKit.stop(persister)
+  }
+
+  /**
+   * Hard-delete specific rows from the journal table by directly issuing a
+   * DELETE statement. This simulates the scenario from pekko-persistence-jdbc
+   * issue #516 where messages are removed without leaving a delete marker.
+   *
+   * @param pid    persistence id whose rows are removed
+   * @param seqNrs sequence numbers to remove; when empty no statement is
+   *               executed, because `seq_nr IN ()` is not valid SQL
+   */
+  private def hardDeleteSeqNrs(pid: String, seqNrs: Long*): Unit =
+    if (seqNrs.nonEmpty) {
+      val table = journalSettings.journalTableWithSchema
+      // MySQL R2DBC uses positional `?` markers, everything else is treated as
+      // PostgreSQL-style numbered markers ($1, $2, ...).
+      val mysql = EventsByPersistenceIdGapSpec.dialect == "mysql"
+      val pidMarker = if (mysql) "?" else "$1"
+      // $1 is taken by the persistence id, so the seqNr markers start at $2.
+      val seqNrMarkers = seqNrs.indices.map(i => if (mysql) "?" else s"$$${i + 2}").mkString(", ")
+      val sql = s"DELETE FROM $table WHERE persistence_id = $pidMarker AND seq_nr IN ($seqNrMarkers)"
+
+      r2dbcExecutor
+        .updateOne("hard-delete seqNrs") { connection =>
+          // Bind indexes are zero-based: 0 is the persistence id, 1..n the seqNrs.
+          val stmt = connection.createStatement(sql).bind(0, pid)
+          seqNrs.zipWithIndex.foreach { case (seqNr, idx) => stmt.bind(idx + 1, seqNr) }
+          stmt
+        }
+        .futureValue
+      ()
+    }
+
+  /**
+   * Run `currentEventsByPersistenceId` for `pid` over the range `from`..`to`,
+   * block until the stream has completed and return the sequence numbers of
+   * the emitted envelopes in emission order.
+   */
+  private def currentSeqNrs(pid: String, from: Long = 1L, to: Long = Long.MaxValue): Seq[Long] = {
+    val envelopes = query.currentEventsByPersistenceId(pid, from, to)
+    val collected = envelopes.map(_.sequenceNr).runWith(Sink.seq)
+    collected.futureValue
+  }
+
+  // ---------------------------------------------------------------------------
+  // currentEventsByPersistenceId
+  // ---------------------------------------------------------------------------
+
+  "currentEventsByPersistenceId" should {
+
+    "emit all events when a hard-deleted gap is wider than the buffer size" in {
+      val persistenceId = nextPid()
+      persistEvents(persistenceId, 4)
+      // Two consecutive rows are removed while buffer-size is 1.
+      hardDeleteSeqNrs(persistenceId, 2L, 3L)
+
+      currentSeqNrs(persistenceId, to = 4L) shouldBe Seq(1L, 4L)
+    }
+
+    "emit all events across multiple hard-deleted gaps" in {
+      val persistenceId = nextPid()
+      persistEvents(persistenceId, 8)
+      // Two separate gaps: [2,3] and [6,7]
+      hardDeleteSeqNrs(persistenceId, 2L, 3L, 6L, 7L)
+
+      currentSeqNrs(persistenceId, to = 8L) shouldBe Seq(1L, 4L, 5L, 8L)
+    }
+
+    "emit all events when a gap mixes hard-deleted and soft-deleted (delete-marker) rows" in {
+      // Events 1..5 are persisted. deleteEventsTo(3) hard-deletes 1..3 and
+      // leaves a delete marker (deleted=true) at seqNr 3; seqNr 4 is then
+      // hard-deleted separately, so only seqNr 5 survives and must be returned.
+      val persistenceId = nextPid()
+      persistEvents(persistenceId, 5)
+
+      new EventSourcedCleanup(system).deleteEventsTo(persistenceId, 3L).futureValue
+
+      hardDeleteSeqNrs(persistenceId, 4L)
+
+      currentSeqNrs(persistenceId, to = 5L) shouldBe Seq(5L)
+    }
+
+    "complete with no events when every event is covered by a delete marker" in {
+      // deleteEventsTo(3) hard-deletes 1..3 and inserts a delete marker at 3.
+      // The query covers [1,3] and the marker has deleted=true, so nothing
+      // may be emitted.
+      val persistenceId = nextPid()
+      persistEvents(persistenceId, 3)
+
+      new EventSourcedCleanup(system).deleteEventsTo(persistenceId, 3L).futureValue
+
+      currentSeqNrs(persistenceId, to = 3L) shouldBe empty
+    }
+
+    // The journal's deleteEventsTo hard-deletes everything up to the target
+    // sequence number and leaves a single delete marker (deleted=true) at that
+    // number. Events beyond the target must still be discoverable – this is
+    // the "prefix purge" scenario from pekko-persistence-jdbc#516.
+    "emit the remaining events after a prefix purge via deleteEventsTo" in {
+      val persistenceId = nextPid()
+      persistEvents(persistenceId, 4)
+
+      new EventSourcedCleanup(system).deleteEventsTo(persistenceId, 3L).futureValue
+
+      currentSeqNrs(persistenceId, to = 4L) shouldBe Seq(4L)
+    }
+
+    "emit remaining events after deleteEventsTo leaves a gap wider than the buffer size" in {
+      val persistenceId = nextPid()
+      persistEvents(persistenceId, 6)
+
+      new EventSourcedCleanup(system).deleteEventsTo(persistenceId, 4L).futureValue
+
+      currentSeqNrs(persistenceId, to = 6L) shouldBe Seq(5L, 6L)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // eventsByPersistenceId (live query; the tests pass toSequenceNr = 4 and
+  // expect the stream to complete once that sequence number was emitted)
+  // ---------------------------------------------------------------------------
+
+  "eventsByPersistenceId (live query)" should {
+
+    "emit all events when a hard-deleted gap is wider than the buffer size" in {
+      val persistenceId = nextPid()
+      persistEvents(persistenceId, 4)
+      hardDeleteSeqNrs(persistenceId, 2L, 3L)
+
+      val probe = query
+        .eventsByPersistenceId(persistenceId, 1L, 4L)
+        .map(_.sequenceNr)
+        .runWith(TestSink[Long]())
+
+      // Demand covers the whole range although only two events survive.
+      probe
+        .request(4)
+        .expectNextN(Seq(1L, 4L))
+        .expectComplete()
+    }
+
+    "emit the remaining events after a prefix purge via deleteEventsTo (live query)" in {
+      val persistenceId = nextPid()
+      persistEvents(persistenceId, 4)
+
+      new EventSourcedCleanup(system).deleteEventsTo(persistenceId, 3L).futureValue
+
+      val probe = query
+        .eventsByPersistenceId(persistenceId, 1L, 4L)
+        .map(_.sequenceNr)
+        .runWith(TestSink[Long]())
+
+      probe
+        .request(4)
+        .expectNext(4L)
+        .expectComplete()
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]