This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new a410f9d2 Emit all messages across sequence number gaps in messagesWithBatch (#517) (#521)
a410f9d2 is described below

commit a410f9d21d08ac0e150a4cf3848bfce9f3bb394c
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jun 14 16:59:50 2026 +0100

    Emit all messages across sequence number gaps in messagesWithBatch (#517) (#521)
    
    * Emit all messages across sequence number gaps in messagesWithBatch #516
    
    * The query windowing introduced by #180 treated a windowed batch returning
      fewer messages than batchSize as the end of the journal: bounded streams
      (recovery, currentEventsByPersistenceId) completed early and silently
      dropped every message beyond a gap left by deleted messages, and live
      streams stalled forever polling an empty window. #195 fixed only the gap
      at the head of the journal.
    * Rework the unfoldAsync state from FlowControl into a QueryPlan state
      machine (QueryRemaining, QueryWindow, PollRemaining, Complete): a short
      windowed batch is no longer conclusive and falls back to one query over
      the full remaining range, which gaps cannot hide messages from.
    * Keep windowing as the dense fast path of #180: after a full batch shows
      the journal to be dense, queries stay bounded to [from, from + batchSize].
    * Span the full remaining range on the first query, crossing a deleted
      journal head (snapshot cleanup) in a single round trip, subsuming the
      #195 special case.
    * Poll the full remaining range when tailing: a windowed poll can never
      reach messages appended beyond a trailing gap.
    * Specify the gap state machine in MessagesWithBatchTest (core, in-memory
      journal stub) and the database-coupled behavior in
      MessagesWithBatchDatabaseContractTest (H2 plus the five integration
      databases): hard and soft deletes, a prefix purge through the journal's
      delete, and serialization failures.
    
    * Use a non-empty writer in the corrupt-row test fixture
    
    Oracle treats the empty string as NULL, so inserting the synthetic
    corrupt row with writer = "" violated the NOT NULL constraint on
    EVENT_JOURNAL.WRITER (ORA-01400) and failed only the Oracle integration
    tests. The production write path always carries a writer UUID, so this is
    a test-fixture-only fix.
    
    * Use the standard Apache header on the new test files
    
    These files contain no Akka-derived code, so they should carry the full
    ASF header (the headerLicense the build stamps on new files) rather than
    the shorter "derived from Akka" variant, per a maintainer review note.
    
    * Note batchSize+1 window width in LimitWindowingStreamTest
    
    Co-authored-by: Diego D. Mucciolo <[email protected]>
---
 .../dao/BaseJournalDaoWithReadMessages.scala       | 136 +++++---
 .../journal/dao/LimitWindowingStreamTest.scala     |   1 +
 .../MessagesWithBatchDatabaseContractTest.scala    | 195 +++++++++++
 .../jdbc/journal/dao/MessagesWithBatchTest.scala   | 370 +++++++++++++++++++++
 .../MessagesWithBatchDatabaseContractTest.scala    |  47 +++
 5 files changed, 701 insertions(+), 48 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index 30f35697..414dc25f 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -19,7 +19,6 @@ import pekko.NotUsed
 import pekko.actor.Scheduler
 import pekko.annotation.InternalApi
 import pekko.persistence.PersistentRepr
-import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, 
ContinueDelayed, Stop }
 import pekko.stream.Materializer
 import pekko.stream.scaladsl.{ Sink, Source }
 
@@ -28,6 +27,7 @@ import scala.concurrent.{ ExecutionContext, Future }
 import scala.util.{ Failure, Success, Try }
 
 trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
+  import BaseJournalDaoWithReadMessages._
 
   implicit val ec: ExecutionContext
   implicit val mat: Materializer
@@ -42,7 +42,7 @@ trait BaseJournalDaoWithReadMessages extends 
JournalDaoWithReadMessages {
   }
 
   /**
-   * separate this method for unit tests.
+   * Separate this method for unit tests.
    */
   @InternalApi
   private[dao] def internalBatchStream(
@@ -50,54 +50,94 @@ trait BaseJournalDaoWithReadMessages extends 
JournalDaoWithReadMessages {
       fromSequenceNr: Long,
       toSequenceNr: Long,
       batchSize: Int,
-      refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
-    val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
+      refreshInterval: Option[(FiniteDuration, Scheduler)]
+  ): Source[Seq[Try[(PersistentRepr, Long)]], NotUsed] = {
+
+    val normalizedBatchSize = Math.max(1, batchSize)
+    val normalizedFromSequenceNr = Math.max(1, fromSequenceNr)
+
+    def pollOrComplete(from: Long): QueryPlan = refreshInterval match {
+      case Some((delay, scheduler)) => PollRemaining(from, delay, scheduler)
+      case None                     => Complete
+    }
+
+    // A windowed query spans [from, min(from + batchSize, toSequenceNr)]; at 
full width that is
+    // batchSize + 1 sequence numbers, which lets a window tolerate one 
missing sequence number
+    // and still return a full batch.
+    def windowedQuery(from: Long): QueryWindow = {
+      val endInclusive =
+        if (from <= (Long.MaxValue - normalizedBatchSize)) Math.min(from + 
normalizedBatchSize, toSequenceNr)
+        else toSequenceNr // from + batchSize overflows; fall back to the full 
remaining range
+      QueryWindow(from, endInclusive)
+    }
+
+    def retrieveBatch(from: Long, endInclusive: Long): 
Future[Option[(QueryPlan, Seq[Try[(PersistentRepr, Long)]])]] =
+      messages(persistenceId, from, endInclusive, 
normalizedBatchSize).runWith(Sink.seq).map { batch =>
+        // Messages are ordered by sequence number, therefore the last one is 
the largest
+        val lastSeqNrInBatch: Option[Long] = batch.lastOption match {
+          case Some(Success((repr, _))) => Some(repr.sequenceNr)
+          case Some(Failure(cause))     => throw cause // fail the returned 
Future
+          case None                     => None
+        }
+        val hasReachedToSequenceNr = lastSeqNrInBatch.exists(_ >= toSequenceNr)
+        val isFullBatch = batch.size == normalizedBatchSize
+        val wasWindowedQuery = endInclusive < toSequenceNr
+        // fromSequenceNr beyond toSequenceNr requests an empty range, which 
must complete
+        // rather than poll a range that can never produce messages
+        val isEmptyRange = from > toSequenceNr
+        val nextFrom: Long = lastSeqNrInBatch.map(_ + 1).getOrElse(from)
+
+        // Deleted messages leave gaps, so a window may hold fewer live 
messages than batchSize
+        // even when more messages exist beyond it. A short batch is 
conclusive only when the
+        // queried range reached toSequenceNr, otherwise requery the full 
remaining range.
+        val nextQueryPlan: QueryPlan =
+          if (hasReachedToSequenceNr || isEmptyRange) Complete
+          else if (isFullBatch) windowedQuery(nextFrom)
+          else if (wasWindowedQuery) QueryRemaining(nextFrom)
+          else pollOrComplete(nextFrom)
+
+        Some((nextQueryPlan, batch))
+      }
+
+    // A full-range first query crosses a leading gap (e.g. journal purged up 
to a snapshot)
+    // in a single round trip; the LIMITed query costs the same as a windowed 
one when dense.
+    // Once a full batch shows the journal to be dense, reads switch to 
bounded windows, the
+    // perf safeguard introduced by PR #180, and fall back to a full-range 
query on a gap.
     Source
-      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, 
Long)]]]((firstSequenceNr, Continue)) {
-        case (from, control) =>
-          def limitWindow(from: Long): Long = {
-            if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue - 
batchSize) < from) {
-              toSequenceNr
-            } else {
-              Math.min(from + batchSize, toSequenceNr)
-            }
-          }
-
-          def retrieveNextBatch(): Future[Option[((Long, FlowControl), 
Seq[Try[(PersistentRepr, Long)]])]] = {
-            for {
-              xs <- messages(persistenceId, from, limitWindow(from), 
batchSize).runWith(Sink.seq)
-            } yield {
-              val hasMoreEvents = xs.size == batchSize
-              // Events are ordered by sequence number, therefore the last one 
is the largest)
-              val lastSeqNrInBatch: Option[Long] = xs.lastOption match {
-                case Some(Success((repr, _))) => Some(repr.sequenceNr)
-                case Some(Failure(e))         => throw e // fail the returned 
Future
-                case None                     => None
-              }
-              val hasLastEvent = lastSeqNrInBatch.exists(_ >= toSequenceNr)
-              val nextControl: FlowControl =
-                if (hasLastEvent || from > toSequenceNr) Stop
-                else if (hasMoreEvents) Continue
-                else if (refreshInterval.isEmpty) Stop
-                else ContinueDelayed
-
-              val nextFrom: Long = lastSeqNrInBatch match {
-                // Continue querying from the last sequence number (the events 
are ordered)
-                case Some(lastSeqNr) => lastSeqNr + 1
-                case None            => from
-              }
-              Some(((nextFrom, nextControl), xs))
-            }
-          }
-
-          control match {
-            case Stop     => Future.successful(None)
-            case Continue => retrieveNextBatch()
-            case ContinueDelayed =>
-              val (delay, scheduler) = refreshInterval.get
-              pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
-          }
+      .unfoldAsync[QueryPlan, Seq[Try[(PersistentRepr, 
Long)]]](QueryRemaining(normalizedFromSequenceNr)) {
+        case Complete =>
+          Future.successful(None)
+        case QueryWindow(from, endInclusive) =>
+          retrieveBatch(from, endInclusive)
+        case QueryRemaining(from) =>
+          retrieveBatch(from, toSequenceNr)
+        case PollRemaining(from, delay, scheduler) =>
+          pekko.pattern.after(delay, scheduler)(retrieveBatch(from, 
toSequenceNr))
       }
   }
+}
+
+private[dao] object BaseJournalDaoWithReadMessages {
+
+  /** The query the batch stream will run. */
+  private sealed trait QueryPlan
+
+  /**
+   * Query the window [from, endInclusive]: the dense fast path, planned only 
after a full batch
+   * showed the messages to be dense. Windowing bounds the range a query 
scans; it is
+   * not load-bearing for correctness, since a short window falls back to 
[[QueryRemaining]].
+   */
+  private final case class QueryWindow(from: Long, endInclusive: Long) extends 
QueryPlan
+
+  /**
+   * Query the whole remaining range [from, toSequenceNr]: planned first, and 
whenever a short
+   * windowed batch leaves the remainder undetermined, since gaps cannot hide 
messages from an
+   * unwindowed query.
+   */
+  private final case class QueryRemaining(from: Long) extends QueryPlan
+
+  /** Poll the remaining range after `delay`: the live tail, when the range is 
exhausted but may grow. */
+  private final case class PollRemaining(from: Long, delay: FiniteDuration, 
scheduler: Scheduler) extends QueryPlan
 
+  private case object Complete extends QueryPlan
 }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
index be2679fc..bc351063 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -74,6 +74,7 @@ abstract class LimitWindowingStreamTest(configFile: String)
 
       lastInsert.futureValue(Timeout(totalMessages.seconds))
       val readMessagesDao = dao.asInstanceOf[BaseJournalDaoWithReadMessages]
+      // Asserts batch/message counts only. Windows are intentionally 
batchSize+1 wide (see BaseJournalDaoWithReadMessages.windowedQuery).
       val messagesSrc =
         readMessagesDao.internalBatchStream(persistenceId, 0, totalMessages, 
batchSize = fetchSize, None)
 
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchDatabaseContractTest.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchDatabaseContractTest.scala
new file mode 100644
index 00000000..b631a78b
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchDatabaseContractTest.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.journal.dao
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.actor.ActorSystem
+import pekko.persistence.jdbc.query.{ H2Cleaner, QueryTestSpec }
+import pekko.persistence.{ AtomicWrite, PersistentRepr }
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.stream.{ Materializer, SystemMaterializer }
+
+import java.io.NotSerializableException
+import java.util.UUID
+import scala.collection.immutable
+import scala.concurrent.duration._
+import scala.concurrent.{ ExecutionContext, Future }
+
+/**
+ * Covers the database-coupled side of the batched message stream: hard 
deletes as a
+ * representative contract check of the `messages` query (ascending order, 
limit, inclusive
+ * range), soft deletes (the `deleted` flag filter), the journal's own prefix 
purge
+ * (`delete`), and serialization failures. The full gap state-machine 
specification lives
+ * in MessagesWithBatchTest.
+ */
+abstract class MessagesWithBatchDatabaseContractTest(configFile: String) 
extends QueryTestSpec(configFile) {
+
+  private lazy val journalQueries =
+    new JournalQueries(profile, journalConfig.eventJournalTableConfiguration, 
journalConfig.eventTagTableConfiguration)
+
+  it should "emit all messages when a hard-deleted gap is wider than the batch 
size" in
+  withActorSystem { implicit system =>
+    withSetup { (dao, persistenceId) =>
+      persistMessages(dao, persistenceId, count = 4)
+      hardDelete(persistenceId, 2, 3)
+
+      val emitted = emittedSequenceNrs(dao, persistenceId, toSequenceNr = 4, 
batchSize = 1)
+
+      emitted shouldBe Seq(1L, 4L)
+    }
+  }
+
+  it should "emit messages beyond a hard-deleted gap wider than the batch size 
when polling with a refresh interval" in
+  withActorSystem { implicit system =>
+    withSetup { (dao, persistenceId) =>
+      persistMessages(dao, persistenceId, count = 4)
+      hardDelete(persistenceId, 2, 3)
+
+      val emitted =
+        emittedSequenceNrs(dao, persistenceId, toSequenceNr = 4, batchSize = 
1, count = Some(2),
+          refreshInterval = Some(50.millis))
+
+      emitted shouldBe Seq(1L, 4L)
+    }
+  }
+
+  it should "emit all messages when a gap mixes hard-deleted and soft-deleted 
messages" in
+  withActorSystem { implicit system =>
+    withSetup { (dao, persistenceId) =>
+      persistMessages(dao, persistenceId, count = 5)
+      hardDelete(persistenceId, 2)
+      softDelete(persistenceId, 3)
+      hardDelete(persistenceId, 4)
+
+      val emitted = emittedSequenceNrs(dao, persistenceId, toSequenceNr = 5, 
batchSize = 1)
+
+      emitted shouldBe Seq(1L, 5L)
+    }
+  }
+
+  it should "complete with no messages when every message is soft-deleted" in
+  withActorSystem { implicit system =>
+    withSetup { (dao, persistenceId) =>
+      persistMessages(dao, persistenceId, count = 3)
+      softDelete(persistenceId, 1, 2, 3)
+
+      val emitted = emittedSequenceNrs(dao, persistenceId, toSequenceNr = 3, 
batchSize = 1)
+
+      emitted shouldBe empty
+    }
+  }
+
+  // The journal's prefix purge hard-deletes the messages below the highest 
affected sequence
+  // number and soft-deletes that one, leaving a leading gap: the scenario 
behind issue #516
+  it should "emit the remaining messages after a prefix purge through the 
journal's delete" in
+  withActorSystem { implicit system =>
+    withSetup { (dao, persistenceId) =>
+      persistMessages(dao, persistenceId, count = 4)
+      dao.delete(persistenceId, 3).futureValue
+
+      val emitted = emittedSequenceNrs(dao, persistenceId, toSequenceNr = 4, 
batchSize = 1)
+
+      emitted shouldBe Seq(4L)
+    }
+  }
+
+  it should "fail the stream when the last message in a batch cannot be 
deserialized" in
+  withActorSystem { implicit system =>
+    withSetup { (dao, persistenceId) =>
+      persistMessages(dao, persistenceId, count = 1)
+      persistCorruptMessage(persistenceId, sequenceNr = 2)
+
+      val result = dao
+        .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr = 
2, batchSize = 2,
+          refreshInterval = None)
+        .runWith(Sink.seq)
+
+      result.failed.futureValue shouldBe a[NotSerializableException]
+    }
+  }
+
+  private implicit def materializer(implicit system: ActorSystem): 
Materializer =
+    SystemMaterializer(system).materializer
+
+  private def withSetup(f: (JournalDao, String) => Unit)(implicit system: 
ActorSystem): Unit = {
+    implicit val ec: ExecutionContext = system.dispatcher
+    withDao(dao => f(dao, UUID.randomUUID().toString))
+  }
+
+  private def persistMessages(dao: JournalDao, persistenceId: String, count: 
Int): Unit = {
+    val writerUuid = UUID.randomUUID().toString
+    val payload = Array.fill(8)('a'.toByte)
+    val writes = (1 to count).map { sequenceNr =>
+      AtomicWrite(immutable.Seq(PersistentRepr(payload, sequenceNr, 
persistenceId, writerUuid = writerUuid)))
+    }
+    dao.asyncWriteMessages(writes).futureValue
+  }
+
+  private def hardDelete(persistenceId: String, sequenceNrs: Long*): Unit = {
+    import profile.api._
+    val deleteRows = journalQueries.JournalTable
+      .filter(row => row.persistenceId === persistenceId && 
row.sequenceNumber.inSet(sequenceNrs))
+      .delete
+    db.run(deleteRows).futureValue
+  }
+
+  private def softDelete(persistenceId: String, sequenceNrs: Long*): Unit =
+    sequenceNrs.foreach { sequenceNr =>
+      db.run(journalQueries.markSeqNrJournalMessagesAsDeleted(persistenceId, 
sequenceNr)).futureValue
+    }
+
+  /** Inserts a journal row whose serializer id is unknown, so reading it 
yields a Failure. */
+  private def persistCorruptMessage(persistenceId: String, sequenceNr: Long): 
Unit = {
+    import profile.api._
+    val unknownSerializerId = 999999
+    val corruptRow = JournalTables.JournalPekkoSerializationRow(Long.MinValue, 
deleted = false, persistenceId,
+      sequenceNr, writer = UUID.randomUUID().toString, writeTimestamp = 0L, 
adapterManifest = "",
+      eventPayload = Array.fill(8)('x'.toByte),
+      eventSerId = unknownSerializerId, eventSerManifest = "", metaPayload = 
None, metaSerId = None,
+      metaSerManifest = None)
+    db.run(journalQueries.JournalTable += corruptRow).futureValue
+  }
+
+  /**
+   * Collects the sequence numbers the batched message stream emits. A polling 
stream never
+   * completes on its own, so pass `count` to bound it.
+   */
+  private def emittedSequenceNrs(dao: JournalDao, persistenceId: String, 
toSequenceNr: Long, batchSize: Int,
+      refreshInterval: Option[FiniteDuration] = None, count: Option[Int] = 
None)(
+      implicit system: ActorSystem): Seq[Long] = {
+    val sequenceNrs = sequenceNrSource(dao, persistenceId, toSequenceNr, 
batchSize, refreshInterval)
+    count
+      .fold(sequenceNrs)(sequenceNrs.take(_))
+      .completionTimeout(10.seconds)
+      .runWith(Sink.seq)
+      .futureValue
+  }
+
+  private def sequenceNrSource(dao: JournalDao, persistenceId: String, 
toSequenceNr: Long,
+      batchSize: Int, refreshInterval: Option[FiniteDuration])(implicit 
system: ActorSystem): Source[Long, NotUsed] =
+    dao
+      .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr, 
batchSize,
+        refreshInterval.map(_ -> system.scheduler))
+      .mapAsync(1)(Future.fromTry)
+      .map { case (repr, _) => repr.sequenceNr }
+}
+
+final class H2MessagesWithBatchDatabaseContractTest
+    extends MessagesWithBatchDatabaseContractTest("h2-application.conf")
+    with H2Cleaner
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchTest.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchTest.scala
new file mode 100644
index 00000000..561e08e9
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchTest.scala
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.journal.dao
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.actor.ActorSystem
+import pekko.persistence.PersistentRepr
+import pekko.persistence.jdbc.SimpleSpec
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.stream.testkit.TestSubscriber
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.{ Materializer, SystemMaterializer }
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import scala.concurrent.duration._
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
+import scala.util.{ Failure, Success, Try }
+
+/**
+ * Specifies the batched message stream against an in-memory journal, where a 
missing
+ * sequence number stands for a deleted message. The stream must emit all live 
messages
+ * regardless of such gaps. Database-coupled behavior (soft-delete filtering, 
serialization
+ * failures, per-database query semantics) is covered by 
MessagesWithBatchDatabaseContractTest.
+ */
+final class MessagesWithBatchTest extends SimpleSpec {
+
+  private implicit val system: ActorSystem = 
ActorSystem("MessagesWithBatchTest")
+  private implicit val materializer: Materializer = 
SystemMaterializer(system).materializer
+
+  private val persistenceId = "pid"
+
+  // SimpleSpec leaves ScalaFutures' default 150ms patience in place, too 
tight for CI
+  private implicit val pc: PatienceConfig = PatienceConfig(timeout = 
10.seconds)
+
+  override def afterAll(): Unit = {
+    system.terminate().futureValue
+    super.afterAll()
+  }
+
+  "messagesWithBatch" should "emit all messages when a single missing message 
lies within the batch window" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 3L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 3, batchSize = 2)
+
+    emitted shouldBe Seq(1L, 3L)
+  }
+
+  it should "emit all messages when a single missing message lies within the 
batch window of size one" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 3L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 3, batchSize = 1)
+
+    emitted shouldBe Seq(1L, 3L)
+  }
+
+  it should "emit all messages when a gap is wider than the batch size" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 4, batchSize = 1)
+
+    emitted shouldBe Seq(1L, 4L)
+  }
+
+  it should "emit all messages when a batch window contains fewer live 
messages than the batch size" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L, 4L, 6L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 6, batchSize = 2)
+
+    emitted shouldBe Seq(1L, 2L, 4L, 6L)
+  }
+
+  it should "emit messages beyond a gap wider than the batch size when polling 
with a refresh interval" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L))
+
+    val emitted =
+      emittedSequenceNrs(dao, toSequenceNr = 4, batchSize = 1, count = 
Some(2), refreshInterval = Some(50.millis))
+
+    emitted shouldBe Seq(1L, 4L)
+  }
+
+  it should "emit all messages when messages at the start of the journal are 
missing" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(3L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 3, batchSize = 1)
+
+    emitted shouldBe Seq(3L)
+  }
+
+  it should "emit all messages when a gap is wider than the batch size up to 
an unbounded toSequenceNr" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = Long.MaxValue, 
batchSize = 1)
+
+    emitted shouldBe Seq(1L, 4L)
+  }
+
+  it should "emit messages beyond a gap when polling up to an unbounded 
toSequenceNr" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L))
+
+    val emitted =
+      emittedSequenceNrs(dao, toSequenceNr = Long.MaxValue, batchSize = 1, 
count = Some(2),
+        refreshInterval = Some(50.millis))
+
+    emitted shouldBe Seq(1L, 4L)
+  }
+
+  it should "emit all messages when the batch size is zero" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 2, batchSize = 0)
+
+    emitted shouldBe Seq(1L, 2L)
+  }
+
+  it should "emit all messages across multiple gaps wider than the batch size" 
in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L, 5L, 8L, 9L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 9, batchSize = 1)
+
+    emitted shouldBe Seq(1L, 4L, 5L, 8L, 9L)
+  }
+
+  it should "keep emitting newly appended messages after crossing a gap in a 
live stream" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L))
+    val probe = sequenceNrProbe(dao, toSequenceNr = Long.MaxValue, batchSize = 
1, refreshInterval = Some(50.millis))
+
+    probe.request(4)
+    probe.expectNext(1L, 4L)
+    dao.append(5L, 6L)
+
+    probe.expectNext(5L, 6L)
+    probe.cancel()
+  }
+
+  it should "emit only newly appended messages when polling resumes after a 
short non-empty batch" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L))
+    val probe = sequenceNrProbe(dao, toSequenceNr = Long.MaxValue, batchSize = 
2, refreshInterval = Some(50.millis))
+
+    probe.request(3)
+    probe.expectNext(1L)
+    dao.append(2L, 3L)
+
+    probe.expectNext(2L, 3L)
+    probe.cancel()
+  }
+
+  it should "complete after emitting all messages when the journal ends with a 
gap" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 4, batchSize = 2)
+
+    emitted shouldBe Seq(1L, 2L)
+  }
+
+  it should
+  "keep polling without emitting or completing when a live stream reaches a 
trailing gap below toSequenceNr" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L))
+    val probe = sequenceNrProbe(dao, toSequenceNr = 4, batchSize = 2, 
refreshInterval = Some(50.millis))
+
+    probe.request(4)
+    probe.expectNext(1L, 2L)
+
+    probe.expectNoMessage(500.millis)
+    probe.cancel()
+  }
+
+  it should "complete a bounded live stream once appended messages reach 
toSequenceNr" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L))
+    val probe = sequenceNrProbe(dao, toSequenceNr = 4, batchSize = 2, 
refreshInterval = Some(50.millis))
+
+    probe.request(5)
+    probe.expectNext(1L, 2L)
+    dao.append(3L, 4L)
+
+    probe.expectNext(3L, 4L)
+    probe.expectComplete()
+  }
+
+  it should "emit messages with sequence numbers near Long.MaxValue without 
overflowing the batch window" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(Long.MaxValue - 2, 
Long.MaxValue - 1))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = Long.MaxValue, 
batchSize = 1,
+      fromSequenceNr = Long.MaxValue - 2)
+
+    emitted shouldBe Seq(Long.MaxValue - 2, Long.MaxValue - 1)
+  }
+
+  it should "complete with no messages when the journal is empty" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq.empty)
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 3, batchSize = 1)
+
+    emitted shouldBe empty
+  }
+
+  it should "emit messages from a fromSequenceNr that lies inside a gap" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L, 5L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 5, batchSize = 1, 
fromSequenceNr = 2)
+
+    emitted shouldBe Seq(4L, 5L)
+  }
+
+  // The read journal defaults fromSequenceNr to 0, below the journal's 
1-based numbering
+  it should "emit all messages when fromSequenceNr is zero" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 2, batchSize = 1, 
fromSequenceNr = 0)
+
+    emitted shouldBe Seq(1L, 2L)
+  }
+
+  it should "complete without emitting when fromSequenceNr exceeds 
toSequenceNr" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L, 3L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 2, batchSize = 1, 
fromSequenceNr = 3)
+
+    emitted shouldBe empty
+  }
+
+  it should "not emit messages beyond toSequenceNr when crossing a gap" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 5L, 6L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 4, batchSize = 1)
+
+    emitted shouldBe Seq(1L)
+  }
+
+  it should "emit all messages when single-message gaps occur in separate 
batch windows" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 3L, 4L, 6L, 7L, 
9L))
+
+    val emitted = emittedSequenceNrs(dao, toSequenceNr = 9, batchSize = 1)
+
+    emitted shouldBe Seq(1L, 3L, 4L, 6L, 7L, 9L)
+  }
+
+  it should "fail the stream when the last message of a batch cannot be read" 
in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L), 
corruptSequenceNrs = Set(2L))
+
+    val result = dao
+      .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr = 2, 
batchSize = 2, refreshInterval = None)
+      .runWith(Sink.seq)
+
+    result.failed.futureValue shouldBe a[CorruptMessageException]
+  }
+
+  // Only a Failure in the last position fails the stream (the next query 
needs its sequence
+  // number); a mid-batch Failure is passed through for the consumer to handle
+  it should "emit an unreadable message as a failed element followed by the 
remaining messages" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L, 3L), 
corruptSequenceNrs = Set(2L))
+
+    val emitted = dao
+      .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr = 3, 
batchSize = 3, refreshInterval = None)
+      .runWith(Sink.seq)
+      .futureValue
+
+    emitted.map(_.toOption.map { case (repr, _) => repr.sequenceNr }) shouldBe 
Seq(Some(1L), None, Some(3L))
+  }
+
+  // The first batch query must span the entire remaining range, so a journal 
whose head is
+  // deleted (e.g. purged up to a snapshot) is crossed in a single round trip 
instead of an
+  // empty windowed query followed by a full-range one.
+  it should "cross a leading gap with a single query spanning the full 
remaining range" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(100L, 101L, 102L))
+
+    dao
+      .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr = 
102, batchSize = 3, refreshInterval = None)
+      .runWith(Sink.seq)
+      .futureValue
+
+    dao.queriedRanges shouldBe Seq(1L -> 102L)
+  }
+
+  // Performance safeguard introduced by PR #180: once a full batch has shown the journal to
+  // be dense, every later query must be a bounded window rather than a full-range scan.
+  it should "query bounded windows after a full batch shows the journal to be dense" in {
+    val journal = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L, 3L, 4L, 5L, 6L))
+    val batched = journal.messagesWithBatch(
+      persistenceId,
+      fromSequenceNr = 1,
+      toSequenceNr = 6,
+      batchSize = 2,
+      refreshInterval = None)
+
+    // Drain the stream; only the recorded query ranges are of interest here.
+    batched.runWith(Sink.seq).futureValue
+
+    journal.queriedRanges shouldBe Seq((1L, 6L), (3L, 5L), (5L, 6L))
+  }
+
+  /**
+   * Runs the batched message stream and returns the sequence numbers it emitted. A polling
+   * stream never completes on its own, so callers of a polling stream must pass `count` to
+   * stop it after that many elements.
+   */
+  private def emittedSequenceNrs(
+      dao: InMemoryJournalDao,
+      toSequenceNr: Long,
+      batchSize: Int,
+      fromSequenceNr: Long = 1,
+      refreshInterval: Option[FiniteDuration] = None,
+      count: Option[Int] = None): Seq[Long] = {
+    val unbounded = sequenceNrSource(dao, fromSequenceNr, toSequenceNr, batchSize, refreshInterval)
+    val bounded = count match {
+      case Some(limit) => unbounded.take(limit)
+      case None        => unbounded
+    }
+    // The completion timeout is a backstop against a stream that never finishes.
+    bounded.completionTimeout(10.seconds).runWith(Sink.seq).futureValue
+  }
+
+  /** Runs the sequence-number stream, read from sequence number 1, into a `TestSink` probe. */
+  private def sequenceNrProbe(
+      dao: InMemoryJournalDao,
+      toSequenceNr: Long,
+      batchSize: Int,
+      refreshInterval: Option[FiniteDuration]): TestSubscriber.Probe[Long] = {
+    val sequenceNrs = sequenceNrSource(dao, fromSequenceNr = 1, toSequenceNr, batchSize, refreshInterval)
+    sequenceNrs.runWith(TestSink[Long]())
+  }
+
+  /**
+   * Adapts `messagesWithBatch` to a stream of plain sequence numbers: each `Try` element is
+   * unwrapped, and the sequence number is read from the message it carries.
+   */
+  private def sequenceNrSource(
+      dao: InMemoryJournalDao,
+      fromSequenceNr: Long,
+      toSequenceNr: Long,
+      batchSize: Int,
+      refreshInterval: Option[FiniteDuration]): Source[Long, NotUsed] = {
+    // messagesWithBatch takes the refresh interval paired with the scheduler to poll on.
+    val polling = refreshInterval.map(interval => (interval, system.scheduler))
+    val messages = dao.messagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, batchSize, polling)
+    messages
+      .mapAsync(1)(Future.fromTry) // a Failure element becomes a failed Future here
+      .map { element =>
+        val (repr, _) = element
+        repr.sequenceNr
+      }
+  }
+
+  /**
+   * In-memory stand-in for a journal DAO. `messages` serves the live sequence numbers in
+   * ascending order, within the inclusive `[fromSequenceNr, toSequenceNr]` bounds and capped
+   * at `max` elements. A sequence number listed in `corruptSequenceNrs` is served as a
+   * Failure element instead of a message, as an unreadable row would be. Every queried
+   * range is recorded for later inspection, and `append` adds sequence numbers so that
+   * live-stream tests can grow the journal.
+   *
+   * @param liveSequenceNrs    sequence numbers initially present in the journal (any order)
+   * @param corruptSequenceNrs sequence numbers that are served as Failure elements
+   */
+  private final class InMemoryJournalDao(liveSequenceNrs: Seq[Long], corruptSequenceNrs: Set[Long] = Set.empty)
+      extends BaseJournalDaoWithReadMessages {
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val mat: Materializer = materializer
+
+    // Replaced wholesale by `append`. Volatile, presumably because `messages` may run on a
+    // stream thread while the test thread appends.
+    @volatile private var sequenceNrs: Vector[Long] = liveSequenceNrs.sorted.toVector
+    private val ranges = new ConcurrentLinkedQueue[(Long, Long)]
+
+    /** The `(fromSequenceNr, toSequenceNr)` pair of every `messages` call so far, in call order. */
+    def queriedRanges: Seq[(Long, Long)] = ranges.asScala.toSeq
+
+    /** Adds sequence numbers to the live set, keeping it sorted. */
+    def append(appendedSequenceNrs: Long*): Unit =
+      sequenceNrs = (sequenceNrs ++ appendedSequenceNrs).sorted
+
+    /**
+     * Records the queried range, then emits at most `max` live sequence numbers within the
+     * inclusive bounds: a Failure for each corrupt one, a Success carrying a `PersistentRepr`
+     * with the fixed payload "payload" for the others.
+     */
+    override def messages(
+        persistenceId: String,
+        fromSequenceNr: Long,
+        toSequenceNr: Long,
+        max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = {
+      ranges.add(fromSequenceNr -> toSequenceNr)
+      // Clamp before narrowing: a plain `max.toInt` truncates large limits (for example
+      // Long.MaxValue becomes -1), which would make `take` return an empty batch.
+      val limit = math.min(max, Int.MaxValue.toLong).toInt
+      val batch = sequenceNrs
+        .filter(sequenceNr => sequenceNr >= fromSequenceNr && sequenceNr <= toSequenceNr)
+        .take(limit)
+        .map { sequenceNr =>
+          if (corruptSequenceNrs.contains(sequenceNr)) Failure(new CorruptMessageException(sequenceNr))
+          else Success(PersistentRepr("payload", sequenceNr, persistenceId) -> sequenceNr)
+        }
+      Source(batch.toList)
+    }
+  }
+
+  /** Failure carried by the element that stands in for an unreadable journal message. */
+  private final class CorruptMessageException(
+      sequenceNr: Long
+  ) extends RuntimeException(s"Cannot read message $sequenceNr")
+}
diff --git a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/MessagesWithBatchDatabaseContractTest.scala b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/MessagesWithBatchDatabaseContractTest.scala
new file mode 100644
index 00000000..c8e7cb53
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/MessagesWithBatchDatabaseContractTest.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.integration
+
+import 
org.apache.pekko.persistence.jdbc.journal.dao.MessagesWithBatchDatabaseContractTest
+import org.apache.pekko.persistence.jdbc.query.{
+  MariaDBCleaner,
+  MysqlCleaner,
+  OracleCleaner,
+  PostgresCleaner,
+  SqlServerCleaner
+}
+
+final class PostgresMessagesWithBatchDatabaseContractTest // shared contract test bound to the Postgres config and cleaner
+    extends MessagesWithBatchDatabaseContractTest("postgres-application.conf")
+    with PostgresCleaner
+
+final class MySQLMessagesWithBatchDatabaseContractTest // shared contract test bound to the MySQL config and cleaner
+    extends MessagesWithBatchDatabaseContractTest("mysql-application.conf")
+    with MysqlCleaner
+
+final class MariaDBMessagesWithBatchDatabaseContractTest // shared contract test bound to the MariaDB config and cleaner
+    extends MessagesWithBatchDatabaseContractTest("mariadb-application.conf")
+    with MariaDBCleaner
+
+final class OracleMessagesWithBatchDatabaseContractTest // shared contract test bound to the Oracle config and cleaner
+    extends MessagesWithBatchDatabaseContractTest("oracle-application.conf")
+    with OracleCleaner
+
+final class SqlServerMessagesWithBatchDatabaseContractTest // shared contract test bound to the SQL Server config and cleaner
+    extends MessagesWithBatchDatabaseContractTest("sqlserver-application.conf")
+    with SqlServerCleaner


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to