This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new a410f9d2 Emit all messages across sequence number gaps in
messagesWithBatch (#517) (#521)
a410f9d2 is described below
commit a410f9d21d08ac0e150a4cf3848bfce9f3bb394c
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jun 14 16:59:50 2026 +0100
Emit all messages across sequence number gaps in messagesWithBatch (#517)
(#521)
* Emit all messages across sequence number gaps in messagesWithBatch #516
* The query windowing introduced by #180 treated a windowed batch returning
fewer messages than batchSize as the end of the journal: bounded streams
(recovery, currentEventsByPersistenceId) completed early and silently
dropped every message beyond a gap left by deleted messages, and live
streams stalled forever polling an empty window. #195 fixed only the gap
at the head of the journal.
* Rework the unfoldAsync state from FlowControl into a QueryPlan state
machine (QueryRemaining, QueryWindow, PollRemaining, Complete): a short
windowed batch is no longer conclusive and falls back to one query over
the full remaining range, from which gaps cannot hide messages.
* Keep windowing as the dense fast path of #180: after a full batch shows
the journal to be dense, queries stay bounded to [from, from + batchSize].
* Span the full remaining range on the first query, crossing a deleted
journal head (snapshot cleanup) in a single round trip, subsuming the
#195 special case.
* Poll the full remaining range when tailing: a windowed poll can never
reach messages appended beyond a trailing gap.
* Specify the gap state machine in MessagesWithBatchTest (core, in-memory
journal stub) and the database-coupled behavior in
MessagesWithBatchDatabaseContractTest (H2 plus the five integration
databases): hard and soft deletes, a prefix purge through the journal's
delete, and serialization failures.
* Use a non-empty writer in the corrupt-row test fixture
Oracle treats the empty string as NULL, so inserting the synthetic
corrupt row with writer = "" violated the NOT NULL constraint on
EVENT_JOURNAL.WRITER (ORA-01400) and failed only the Oracle integration
tests. The production write path always carries a writer UUID, so this is
a test-fixture-only fix.
* Use the standard Apache header on the new test files
These files contain no Akka-derived code, so they should carry the full
ASF header (the headerLicense the build stamps on new files) rather than
the shorter "derived from Akka" variant, per a maintainer review note.
* Note batchSize+1 window width in LimitWindowingStreamTest
Co-authored-by: Diego D. Mucciolo
<[email protected]>
---
.../dao/BaseJournalDaoWithReadMessages.scala | 136 +++++---
.../journal/dao/LimitWindowingStreamTest.scala | 1 +
.../MessagesWithBatchDatabaseContractTest.scala | 195 +++++++++++
.../jdbc/journal/dao/MessagesWithBatchTest.scala | 370 +++++++++++++++++++++
.../MessagesWithBatchDatabaseContractTest.scala | 47 +++
5 files changed, 701 insertions(+), 48 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index 30f35697..414dc25f 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -19,7 +19,6 @@ import pekko.NotUsed
import pekko.actor.Scheduler
import pekko.annotation.InternalApi
import pekko.persistence.PersistentRepr
-import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue,
ContinueDelayed, Stop }
import pekko.stream.Materializer
import pekko.stream.scaladsl.{ Sink, Source }
@@ -28,6 +27,7 @@ import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
+ import BaseJournalDaoWithReadMessages._
implicit val ec: ExecutionContext
implicit val mat: Materializer
@@ -42,7 +42,7 @@ trait BaseJournalDaoWithReadMessages extends
JournalDaoWithReadMessages {
}
/**
- * separate this method for unit tests.
+ * Separate this method for unit tests.
*/
@InternalApi
private[dao] def internalBatchStream(
@@ -50,54 +50,94 @@ trait BaseJournalDaoWithReadMessages extends
JournalDaoWithReadMessages {
fromSequenceNr: Long,
toSequenceNr: Long,
batchSize: Int,
- refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
- val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
+ refreshInterval: Option[(FiniteDuration, Scheduler)]
+ ): Source[Seq[Try[(PersistentRepr, Long)]], NotUsed] = {
+
+ val normalizedBatchSize = Math.max(1, batchSize)
+ val normalizedFromSequenceNr = Math.max(1, fromSequenceNr)
+
+ def pollOrComplete(from: Long): QueryPlan = refreshInterval match {
+ case Some((delay, scheduler)) => PollRemaining(from, delay, scheduler)
+ case None => Complete
+ }
+
+ // A windowed query spans [from, min(from + batchSize, toSequenceNr)]; at
full width that is
+ // batchSize + 1 sequence numbers, which lets a window tolerate one
missing sequence number
+ // and still return a full batch.
+ def windowedQuery(from: Long): QueryWindow = {
+ val endInclusive =
+ if (from <= (Long.MaxValue - normalizedBatchSize)) Math.min(from +
normalizedBatchSize, toSequenceNr)
+ else toSequenceNr // from + batchSize overflows; fall back to the full
remaining range
+ QueryWindow(from, endInclusive)
+ }
+
+ def retrieveBatch(from: Long, endInclusive: Long):
Future[Option[(QueryPlan, Seq[Try[(PersistentRepr, Long)]])]] =
+ messages(persistenceId, from, endInclusive,
normalizedBatchSize).runWith(Sink.seq).map { batch =>
+ // Messages are ordered by sequence number, therefore the last one is
the largest
+ val lastSeqNrInBatch: Option[Long] = batch.lastOption match {
+ case Some(Success((repr, _))) => Some(repr.sequenceNr)
+ case Some(Failure(cause)) => throw cause // fail the returned
Future
+ case None => None
+ }
+ val hasReachedToSequenceNr = lastSeqNrInBatch.exists(_ >= toSequenceNr)
+ val isFullBatch = batch.size == normalizedBatchSize
+ val wasWindowedQuery = endInclusive < toSequenceNr
+ // fromSequenceNr beyond toSequenceNr requests an empty range, which
must complete
+ // rather than poll a range that can never produce messages
+ val isEmptyRange = from > toSequenceNr
+ val nextFrom: Long = lastSeqNrInBatch.map(_ + 1).getOrElse(from)
+
+ // Deleted messages leave gaps, so a window may hold fewer live
messages than batchSize
+ // even when more messages exist beyond it. A short batch is
conclusive only when the
+ // queried range reached toSequenceNr, otherwise requery the full
remaining range.
+ val nextQueryPlan: QueryPlan =
+ if (hasReachedToSequenceNr || isEmptyRange) Complete
+ else if (isFullBatch) windowedQuery(nextFrom)
+ else if (wasWindowedQuery) QueryRemaining(nextFrom)
+ else pollOrComplete(nextFrom)
+
+ Some((nextQueryPlan, batch))
+ }
+
+ // A full-range first query crosses a leading gap (e.g. journal purged up
to a snapshot)
+ // in a single round trip; the LIMITed query costs the same as a windowed
one when dense.
+ // Once a full batch shows the journal to be dense, reads switch to
bounded windows, the
+ // perf safeguard introduced by PR #180, and fall back to a full-range
query on a gap.
Source
- .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr,
Long)]]]((firstSequenceNr, Continue)) {
- case (from, control) =>
- def limitWindow(from: Long): Long = {
- if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue -
batchSize) < from) {
- toSequenceNr
- } else {
- Math.min(from + batchSize, toSequenceNr)
- }
- }
-
- def retrieveNextBatch(): Future[Option[((Long, FlowControl),
Seq[Try[(PersistentRepr, Long)]])]] = {
- for {
- xs <- messages(persistenceId, from, limitWindow(from),
batchSize).runWith(Sink.seq)
- } yield {
- val hasMoreEvents = xs.size == batchSize
- // Events are ordered by sequence number, therefore the last one
is the largest)
- val lastSeqNrInBatch: Option[Long] = xs.lastOption match {
- case Some(Success((repr, _))) => Some(repr.sequenceNr)
- case Some(Failure(e)) => throw e // fail the returned
Future
- case None => None
- }
- val hasLastEvent = lastSeqNrInBatch.exists(_ >= toSequenceNr)
- val nextControl: FlowControl =
- if (hasLastEvent || from > toSequenceNr) Stop
- else if (hasMoreEvents) Continue
- else if (refreshInterval.isEmpty) Stop
- else ContinueDelayed
-
- val nextFrom: Long = lastSeqNrInBatch match {
- // Continue querying from the last sequence number (the events
are ordered)
- case Some(lastSeqNr) => lastSeqNr + 1
- case None => from
- }
- Some(((nextFrom, nextControl), xs))
- }
- }
-
- control match {
- case Stop => Future.successful(None)
- case Continue => retrieveNextBatch()
- case ContinueDelayed =>
- val (delay, scheduler) = refreshInterval.get
- pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
- }
+ .unfoldAsync[QueryPlan, Seq[Try[(PersistentRepr,
Long)]]](QueryRemaining(normalizedFromSequenceNr)) {
+ case Complete =>
+ Future.successful(None)
+ case QueryWindow(from, endInclusive) =>
+ retrieveBatch(from, endInclusive)
+ case QueryRemaining(from) =>
+ retrieveBatch(from, toSequenceNr)
+ case PollRemaining(from, delay, scheduler) =>
+ pekko.pattern.after(delay, scheduler)(retrieveBatch(from,
toSequenceNr))
}
}
+}
+
+private[dao] object BaseJournalDaoWithReadMessages {
+
+ /** The query the batch stream will run. */
+ private sealed trait QueryPlan
+
+ /**
+ * Query the window [from, endInclusive]: the dense fast path, planned only
after a full batch
+ * showed the messages to be dense. Windowing bounds the range a query
scans; it is
+ * not load-bearing for correctness, since a short window falls back to
[[QueryRemaining]].
+ */
+ private final case class QueryWindow(from: Long, endInclusive: Long) extends
QueryPlan
+
+ /**
+ * Query the whole remaining range [from, toSequenceNr]: planned first, and
whenever a short
+ * windowed batch leaves the remainder undetermined, since gaps cannot hide
messages from an
+ * unwindowed query.
+ */
+ private final case class QueryRemaining(from: Long) extends QueryPlan
+
+ /** Poll the remaining range after `delay`: the live tail, when the range is
exhausted but may grow. */
+ private final case class PollRemaining(from: Long, delay: FiniteDuration,
scheduler: Scheduler) extends QueryPlan
+ private case object Complete extends QueryPlan
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
index be2679fc..bc351063 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -74,6 +74,7 @@ abstract class LimitWindowingStreamTest(configFile: String)
lastInsert.futureValue(Timeout(totalMessages.seconds))
val readMessagesDao = dao.asInstanceOf[BaseJournalDaoWithReadMessages]
+ // Asserts batch/message counts only. Windows are intentionally
batchSize+1 wide (see BaseJournalDaoWithReadMessages.windowedQuery).
val messagesSrc =
readMessagesDao.internalBatchStream(persistenceId, 0, totalMessages,
batchSize = fetchSize, None)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchDatabaseContractTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchDatabaseContractTest.scala
new file mode 100644
index 00000000..b631a78b
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchDatabaseContractTest.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.journal.dao
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.actor.ActorSystem
+import pekko.persistence.jdbc.query.{ H2Cleaner, QueryTestSpec }
+import pekko.persistence.{ AtomicWrite, PersistentRepr }
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.stream.{ Materializer, SystemMaterializer }
+
+import java.io.NotSerializableException
+import java.util.UUID
+import scala.collection.immutable
+import scala.concurrent.duration._
+import scala.concurrent.{ ExecutionContext, Future }
+
+/**
+ * Covers the database-coupled side of the batched message stream: hard
deletes as a
+ * representative contract check of the `messages` query (ascending order,
limit, inclusive
+ * range), soft deletes (the `deleted` flag filter), the journal's own prefix
purge
+ * (`delete`), and serialization failures. The full gap state-machine
specification lives
+ * in MessagesWithBatchTest.
+ */
+abstract class MessagesWithBatchDatabaseContractTest(configFile: String)
extends QueryTestSpec(configFile) {
+
+ private lazy val journalQueries =
+ new JournalQueries(profile, journalConfig.eventJournalTableConfiguration,
journalConfig.eventTagTableConfiguration)
+
+ it should "emit all messages when a hard-deleted gap is wider than the batch
size" in
+ withActorSystem { implicit system =>
+ withSetup { (dao, persistenceId) =>
+ persistMessages(dao, persistenceId, count = 4)
+ hardDelete(persistenceId, 2, 3)
+
+ val emitted = emittedSequenceNrs(dao, persistenceId, toSequenceNr = 4,
batchSize = 1)
+
+ emitted shouldBe Seq(1L, 4L)
+ }
+ }
+
+ it should "emit messages beyond a hard-deleted gap wider than the batch size
when polling with a refresh interval" in
+ withActorSystem { implicit system =>
+ withSetup { (dao, persistenceId) =>
+ persistMessages(dao, persistenceId, count = 4)
+ hardDelete(persistenceId, 2, 3)
+
+ val emitted =
+ emittedSequenceNrs(dao, persistenceId, toSequenceNr = 4, batchSize =
1, count = Some(2),
+ refreshInterval = Some(50.millis))
+
+ emitted shouldBe Seq(1L, 4L)
+ }
+ }
+
+ it should "emit all messages when a gap mixes hard-deleted and soft-deleted
messages" in
+ withActorSystem { implicit system =>
+ withSetup { (dao, persistenceId) =>
+ persistMessages(dao, persistenceId, count = 5)
+ hardDelete(persistenceId, 2)
+ softDelete(persistenceId, 3)
+ hardDelete(persistenceId, 4)
+
+ val emitted = emittedSequenceNrs(dao, persistenceId, toSequenceNr = 5,
batchSize = 1)
+
+ emitted shouldBe Seq(1L, 5L)
+ }
+ }
+
+ it should "complete with no messages when every message is soft-deleted" in
+ withActorSystem { implicit system =>
+ withSetup { (dao, persistenceId) =>
+ persistMessages(dao, persistenceId, count = 3)
+ softDelete(persistenceId, 1, 2, 3)
+
+ val emitted = emittedSequenceNrs(dao, persistenceId, toSequenceNr = 3,
batchSize = 1)
+
+ emitted shouldBe empty
+ }
+ }
+
+ // The journal's prefix purge hard-deletes the messages below the highest
affected sequence
+ // number and soft-deletes that one, leaving a leading gap: the scenario
behind issue #516
+ it should "emit the remaining messages after a prefix purge through the
journal's delete" in
+ withActorSystem { implicit system =>
+ withSetup { (dao, persistenceId) =>
+ persistMessages(dao, persistenceId, count = 4)
+ dao.delete(persistenceId, 3).futureValue
+
+ val emitted = emittedSequenceNrs(dao, persistenceId, toSequenceNr = 4,
batchSize = 1)
+
+ emitted shouldBe Seq(4L)
+ }
+ }
+
+ it should "fail the stream when the last message in a batch cannot be
deserialized" in
+ withActorSystem { implicit system =>
+ withSetup { (dao, persistenceId) =>
+ persistMessages(dao, persistenceId, count = 1)
+ persistCorruptMessage(persistenceId, sequenceNr = 2)
+
+ val result = dao
+ .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr =
2, batchSize = 2,
+ refreshInterval = None)
+ .runWith(Sink.seq)
+
+ result.failed.futureValue shouldBe a[NotSerializableException]
+ }
+ }
+
+ private implicit def materializer(implicit system: ActorSystem):
Materializer =
+ SystemMaterializer(system).materializer
+
+ private def withSetup(f: (JournalDao, String) => Unit)(implicit system:
ActorSystem): Unit = {
+ implicit val ec: ExecutionContext = system.dispatcher
+ withDao(dao => f(dao, UUID.randomUUID().toString))
+ }
+
+ private def persistMessages(dao: JournalDao, persistenceId: String, count:
Int): Unit = {
+ val writerUuid = UUID.randomUUID().toString
+ val payload = Array.fill(8)('a'.toByte)
+ val writes = (1 to count).map { sequenceNr =>
+ AtomicWrite(immutable.Seq(PersistentRepr(payload, sequenceNr,
persistenceId, writerUuid = writerUuid)))
+ }
+ dao.asyncWriteMessages(writes).futureValue
+ }
+
+ private def hardDelete(persistenceId: String, sequenceNrs: Long*): Unit = {
+ import profile.api._
+ val deleteRows = journalQueries.JournalTable
+ .filter(row => row.persistenceId === persistenceId &&
row.sequenceNumber.inSet(sequenceNrs))
+ .delete
+ db.run(deleteRows).futureValue
+ }
+
+ private def softDelete(persistenceId: String, sequenceNrs: Long*): Unit =
+ sequenceNrs.foreach { sequenceNr =>
+ db.run(journalQueries.markSeqNrJournalMessagesAsDeleted(persistenceId,
sequenceNr)).futureValue
+ }
+
+ /** Inserts a journal row whose serializer id is unknown, so reading it
yields a Failure. */
+ private def persistCorruptMessage(persistenceId: String, sequenceNr: Long):
Unit = {
+ import profile.api._
+ val unknownSerializerId = 999999
+ val corruptRow = JournalTables.JournalPekkoSerializationRow(Long.MinValue,
deleted = false, persistenceId,
+ sequenceNr, writer = UUID.randomUUID().toString, writeTimestamp = 0L,
adapterManifest = "",
+ eventPayload = Array.fill(8)('x'.toByte),
+ eventSerId = unknownSerializerId, eventSerManifest = "", metaPayload =
None, metaSerId = None,
+ metaSerManifest = None)
+ db.run(journalQueries.JournalTable += corruptRow).futureValue
+ }
+
+ /**
+ * Collects the sequence numbers the batched message stream emits. A polling
stream may
+ * never complete (it ends only on reaching `toSequenceNr`), so pass `count` to bound it.
+ */
+ private def emittedSequenceNrs(dao: JournalDao, persistenceId: String,
toSequenceNr: Long, batchSize: Int,
+ refreshInterval: Option[FiniteDuration] = None, count: Option[Int] =
None)(
+ implicit system: ActorSystem): Seq[Long] = {
+ val sequenceNrs = sequenceNrSource(dao, persistenceId, toSequenceNr,
batchSize, refreshInterval)
+ count
+ .fold(sequenceNrs)(sequenceNrs.take(_))
+ .completionTimeout(10.seconds)
+ .runWith(Sink.seq)
+ .futureValue
+ }
+
+ private def sequenceNrSource(dao: JournalDao, persistenceId: String,
toSequenceNr: Long,
+ batchSize: Int, refreshInterval: Option[FiniteDuration])(implicit
system: ActorSystem): Source[Long, NotUsed] =
+ dao
+ .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr,
batchSize,
+ refreshInterval.map(_ -> system.scheduler))
+ .mapAsync(1)(Future.fromTry)
+ .map { case (repr, _) => repr.sequenceNr }
+}
+
+final class H2MessagesWithBatchDatabaseContractTest
+ extends MessagesWithBatchDatabaseContractTest("h2-application.conf")
+ with H2Cleaner
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchTest.scala
new file mode 100644
index 00000000..561e08e9
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/MessagesWithBatchTest.scala
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.journal.dao
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.actor.ActorSystem
+import pekko.persistence.PersistentRepr
+import pekko.persistence.jdbc.SimpleSpec
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.stream.testkit.TestSubscriber
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.{ Materializer, SystemMaterializer }
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import scala.concurrent.duration._
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
+import scala.util.{ Failure, Success, Try }
+
+/**
+ * Specifies the batched message stream against an in-memory journal, where a
missing
+ * sequence number stands for a deleted message. The stream must emit all live
messages
+ * regardless of such gaps. Database-coupled behavior (soft-delete filtering,
serialization
+ * failures, per-database query semantics) is covered by
MessagesWithBatchDatabaseContractTest.
+ */
+final class MessagesWithBatchTest extends SimpleSpec {
+
+ private implicit val system: ActorSystem =
ActorSystem("MessagesWithBatchTest")
+ private implicit val materializer: Materializer =
SystemMaterializer(system).materializer
+
+ private val persistenceId = "pid"
+
+ // SimpleSpec leaves ScalaFutures' default 150ms patience in place, too
tight for CI
+ private implicit val pc: PatienceConfig = PatienceConfig(timeout =
10.seconds)
+
+ override def afterAll(): Unit = {
+ system.terminate().futureValue
+ super.afterAll()
+ }
+
+ "messagesWithBatch" should "emit all messages when a single missing message
lies within the batch window" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 3L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 3, batchSize = 2)
+
+ emitted shouldBe Seq(1L, 3L)
+ }
+
+ it should "emit all messages when a single missing message lies within the
batch window of size one" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 3L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 3, batchSize = 1)
+
+ emitted shouldBe Seq(1L, 3L)
+ }
+
+ it should "emit all messages when a gap is wider than the batch size" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 4, batchSize = 1)
+
+ emitted shouldBe Seq(1L, 4L)
+ }
+
+ it should "emit all messages when a batch window contains fewer live
messages than the batch size" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L, 4L, 6L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 6, batchSize = 2)
+
+ emitted shouldBe Seq(1L, 2L, 4L, 6L)
+ }
+
+ it should "emit messages beyond a gap wider than the batch size when polling
with a refresh interval" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L))
+
+ val emitted =
+ emittedSequenceNrs(dao, toSequenceNr = 4, batchSize = 1, count =
Some(2), refreshInterval = Some(50.millis))
+
+ emitted shouldBe Seq(1L, 4L)
+ }
+
+ it should "emit all messages when messages at the start of the journal are
missing" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(3L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 3, batchSize = 1)
+
+ emitted shouldBe Seq(3L)
+ }
+
+ it should "emit all messages when a gap is wider than the batch size up to
an unbounded toSequenceNr" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = Long.MaxValue,
batchSize = 1)
+
+ emitted shouldBe Seq(1L, 4L)
+ }
+
+ it should "emit messages beyond a gap when polling up to an unbounded
toSequenceNr" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L))
+
+ val emitted =
+ emittedSequenceNrs(dao, toSequenceNr = Long.MaxValue, batchSize = 1,
count = Some(2),
+ refreshInterval = Some(50.millis))
+
+ emitted shouldBe Seq(1L, 4L)
+ }
+
+ it should "emit all messages when the batch size is zero" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 2, batchSize = 0)
+
+ emitted shouldBe Seq(1L, 2L)
+ }
+
+ it should "emit all messages across multiple gaps wider than the batch size"
in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L, 5L, 8L, 9L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 9, batchSize = 1)
+
+ emitted shouldBe Seq(1L, 4L, 5L, 8L, 9L)
+ }
+
+ it should "keep emitting newly appended messages after crossing a gap in a
live stream" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L))
+ val probe = sequenceNrProbe(dao, toSequenceNr = Long.MaxValue, batchSize =
1, refreshInterval = Some(50.millis))
+
+ probe.request(4)
+ probe.expectNext(1L, 4L)
+ dao.append(5L, 6L)
+
+ probe.expectNext(5L, 6L)
+ probe.cancel()
+ }
+
+ it should "emit only newly appended messages when polling resumes after a
short non-empty batch" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L))
+ val probe = sequenceNrProbe(dao, toSequenceNr = Long.MaxValue, batchSize =
2, refreshInterval = Some(50.millis))
+
+ probe.request(3)
+ probe.expectNext(1L)
+ dao.append(2L, 3L)
+
+ probe.expectNext(2L, 3L)
+ probe.cancel()
+ }
+
+ it should "complete after emitting all messages when the journal ends with a
gap" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 4, batchSize = 2)
+
+ emitted shouldBe Seq(1L, 2L)
+ }
+
+ it should
+ "keep polling without emitting or completing when a live stream reaches a
trailing gap below toSequenceNr" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L))
+ val probe = sequenceNrProbe(dao, toSequenceNr = 4, batchSize = 2,
refreshInterval = Some(50.millis))
+
+ probe.request(4)
+ probe.expectNext(1L, 2L)
+
+ probe.expectNoMessage(500.millis)
+ probe.cancel()
+ }
+
+ it should "complete a bounded live stream once appended messages reach
toSequenceNr" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L))
+ val probe = sequenceNrProbe(dao, toSequenceNr = 4, batchSize = 2,
refreshInterval = Some(50.millis))
+
+ probe.request(5)
+ probe.expectNext(1L, 2L)
+ dao.append(3L, 4L)
+
+ probe.expectNext(3L, 4L)
+ probe.expectComplete()
+ }
+
+ it should "emit messages with sequence numbers near Long.MaxValue without
overflowing the batch window" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(Long.MaxValue - 2,
Long.MaxValue - 1))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = Long.MaxValue,
batchSize = 1,
+ fromSequenceNr = Long.MaxValue - 2)
+
+ emitted shouldBe Seq(Long.MaxValue - 2, Long.MaxValue - 1)
+ }
+
+ it should "complete with no messages when the journal is empty" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq.empty)
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 3, batchSize = 1)
+
+ emitted shouldBe empty
+ }
+
+ it should "emit messages from a fromSequenceNr that lies inside a gap" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 4L, 5L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 5, batchSize = 1,
fromSequenceNr = 2)
+
+ emitted shouldBe Seq(4L, 5L)
+ }
+
+ // The read journal defaults fromSequenceNr to 0, below the journal's
1-based numbering
+ it should "emit all messages when fromSequenceNr is zero" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 2, batchSize = 1,
fromSequenceNr = 0)
+
+ emitted shouldBe Seq(1L, 2L)
+ }
+
+ it should "complete without emitting when fromSequenceNr exceeds
toSequenceNr" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L, 3L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 2, batchSize = 1,
fromSequenceNr = 3)
+
+ emitted shouldBe empty
+ }
+
+ it should "not emit messages beyond toSequenceNr when crossing a gap" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 5L, 6L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 4, batchSize = 1)
+
+ emitted shouldBe Seq(1L)
+ }
+
+ it should "emit all messages when single-message gaps occur in separate
batch windows" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 3L, 4L, 6L, 7L,
9L))
+
+ val emitted = emittedSequenceNrs(dao, toSequenceNr = 9, batchSize = 1)
+
+ emitted shouldBe Seq(1L, 3L, 4L, 6L, 7L, 9L)
+ }
+
+ it should "fail the stream when the last message of a batch cannot be read"
in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L),
corruptSequenceNrs = Set(2L))
+
+ val result = dao
+ .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr = 2,
batchSize = 2, refreshInterval = None)
+ .runWith(Sink.seq)
+
+ result.failed.futureValue shouldBe a[CorruptMessageException]
+ }
+
+ // Only a Failure in the last position fails the stream (the next query
needs its sequence
+ // number); a mid-batch Failure is passed through for the consumer to handle
+ it should "emit an unreadable message as a failed element followed by the
remaining messages" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L, 3L),
corruptSequenceNrs = Set(2L))
+
+ val emitted = dao
+ .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr = 3,
batchSize = 3, refreshInterval = None)
+ .runWith(Sink.seq)
+ .futureValue
+
+ emitted.map(_.toOption.map { case (repr, _) => repr.sequenceNr }) shouldBe
Seq(Some(1L), None, Some(3L))
+ }
+
+ // The first batch query must span the entire remaining range, so a journal
whose head is
+ // deleted (e.g. purged up to a snapshot) is crossed in a single round trip
instead of an
+ // empty windowed query followed by a full-range one.
+ it should "cross a leading gap with a single query spanning the full
remaining range" in {
+ val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(100L, 101L, 102L))
+
+ dao
+ .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr =
102, batchSize = 3, refreshInterval = None)
+ .runWith(Sink.seq)
+ .futureValue
+
+ dao.queriedRanges shouldBe Seq(1L -> 102L)
+ }
+
+  // Once a full batch shows the journal to be dense, subsequent queries must be bounded
+  // windows rather than full-range scans: the perf safeguard introduced by PR #180
+  it should "query bounded windows after a full batch shows the journal to be dense" in {
+    val dao = new InMemoryJournalDao(liveSequenceNrs = Seq(1L, 2L, 3L, 4L, 5L, 6L))
+
+    dao
+      .messagesWithBatch(persistenceId, fromSequenceNr = 1, toSequenceNr = 6, batchSize = 2, refreshInterval = None)
+      .runWith(Sink.seq)
+      .futureValue
+
+    // Expected: one full-range query first, then windows of [from, from + batchSize];
+    // the final window ends at toSequenceNr.
+    dao.queriedRanges shouldBe Seq(1L -> 6L, 3L -> 5L, 5L -> 6L)
+  }
+
+  /**
+   * Collects the sequence numbers the batched message stream emits. A polling stream never
+   * completes on its own, so pass `count` to bound it.
+   *
+   * @param dao journal stub the stream reads from
+   * @param toSequenceNr upper sequence-number bound forwarded to the stream
+   * @param batchSize batch size forwarded to the stream
+   * @param fromSequenceNr lower sequence-number bound forwarded to the stream; defaults to 1
+   * @param refreshInterval optional polling interval forwarded to the stream
+   * @param count when defined, only this many elements are taken before the stream is completed
+   * @return the emitted sequence numbers, in emission order
+   */
+  private def emittedSequenceNrs(dao: InMemoryJournalDao, toSequenceNr: Long, batchSize: Int,
+      fromSequenceNr: Long = 1, refreshInterval: Option[FiniteDuration] = None,
+      count: Option[Int] = None): Seq[Long] = {
+    val sequenceNrs = sequenceNrSource(dao, fromSequenceNr, toSequenceNr, batchSize, refreshInterval)
+    count
+      .fold(sequenceNrs)(sequenceNrs.take(_))
+      // Fail the stream rather than hang if it has not completed after 10 seconds.
+      .completionTimeout(10.seconds)
+      .runWith(Sink.seq)
+      .futureValue
+  }
+
+  /**
+   * Runs the sequence-number stream, read from sequence number 1, into a `TestSink` and
+   * returns the resulting probe so the calling test controls demand and expectations.
+   */
+  private def sequenceNrProbe(dao: InMemoryJournalDao, toSequenceNr: Long, batchSize: Int,
+      refreshInterval: Option[FiniteDuration]): TestSubscriber.Probe[Long] =
+    sequenceNrSource(dao, fromSequenceNr = 1, toSequenceNr, batchSize, refreshInterval)
+      .runWith(TestSink[Long]())
+
+  /**
+   * Builds the batched message stream for `persistenceId` and reduces every element to its
+   * sequence number. `refreshInterval`, when defined, is paired with the actor system's
+   * scheduler before being handed to `messagesWithBatch`.
+   */
+  private def sequenceNrSource(dao: InMemoryJournalDao, fromSequenceNr: Long, toSequenceNr: Long,
+      batchSize: Int, refreshInterval: Option[FiniteDuration]): Source[Long, NotUsed] =
+    dao
+      .messagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, batchSize,
+        refreshInterval.map(_ -> system.scheduler))
+      // Unwrap each Try: a Failure element becomes a failed Future, which fails the stream.
+      .mapAsync(1)(Future.fromTry)
+      .map { case (repr, _) => repr.sequenceNr }
+
+  /**
+   * Serves batches from an in-memory set of live sequence numbers, honoring the contract of
+   * `messages`: ascending order, inclusive bounds, at most `max` elements. Sequence numbers
+   * in `corruptSequenceNrs` yield Failure elements, as unreadable rows do. Records each
+   * queried range and supports appends for live-stream tests.
+   *
+   * @param liveSequenceNrs sequence numbers initially present in the journal (sorted on construction)
+   * @param corruptSequenceNrs sequence numbers served as Failure elements instead of messages
+   */
+  private final class InMemoryJournalDao(liveSequenceNrs: Seq[Long], corruptSequenceNrs: Set[Long] = Set.empty)
+      extends BaseJournalDaoWithReadMessages {
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val mat: Materializer = materializer
+
+    // Replaced wholesale by `append`. NOTE(review): presumably @volatile because `messages`
+    // may be invoked from a stream thread while the test thread appends - confirm.
+    @volatile private var sequenceNrs: Vector[Long] = liveSequenceNrs.sorted.toVector
+    private val ranges = new ConcurrentLinkedQueue[(Long, Long)]
+
+    /** The `(fromSequenceNr, toSequenceNr)` pair of every `messages` call so far, in call order. */
+    def queriedRanges: Seq[(Long, Long)] = ranges.asScala.toSeq
+
+    /** Adds sequence numbers to the journal, keeping it sorted ascending. */
+    def append(appendedSequenceNrs: Long*): Unit =
+      sequenceNrs = (sequenceNrs ++ appendedSequenceNrs).sorted
+
+    override def messages(
+        persistenceId: String,
+        fromSequenceNr: Long,
+        toSequenceNr: Long,
+        max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = {
+      ranges.add(fromSequenceNr -> toSequenceNr)
+      // Clamp before narrowing: a plain `max.toInt` wraps for values above Int.MaxValue
+      // (Long.MaxValue.toInt == -1), which would make `take` return an empty batch.
+      val limit = math.min(max, Int.MaxValue.toLong).toInt
+      val batch = sequenceNrs
+        .filter(sequenceNr => sequenceNr >= fromSequenceNr && sequenceNr <= toSequenceNr)
+        .take(limit)
+        .map { sequenceNr =>
+          if (corruptSequenceNrs.contains(sequenceNr)) Failure(new CorruptMessageException(sequenceNr))
+          else Success(PersistentRepr("payload", sequenceNr, persistenceId) -> sequenceNr)
+        }
+      Source(batch.toList)
+    }
+  }
+
+  /** Error carried by the Failure elements the in-memory journal stub serves for corrupt sequence numbers. */
+  private final class CorruptMessageException(sequenceNr: Long)
+      extends RuntimeException(s"Cannot read message $sequenceNr")
+}
diff --git a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/MessagesWithBatchDatabaseContractTest.scala b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/MessagesWithBatchDatabaseContractTest.scala
new file mode 100644
index 00000000..c8e7cb53
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/MessagesWithBatchDatabaseContractTest.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.integration
+
+import org.apache.pekko.persistence.jdbc.journal.dao.MessagesWithBatchDatabaseContractTest
+import org.apache.pekko.persistence.jdbc.query.{
+ MariaDBCleaner,
+ MysqlCleaner,
+ OracleCleaner,
+ PostgresCleaner,
+ SqlServerCleaner
+}
+
+/**
+ * Runs the [[MessagesWithBatchDatabaseContractTest]] suite with the
+ * `postgres-application.conf` configuration and the [[PostgresCleaner]] mixin.
+ */
+final class PostgresMessagesWithBatchDatabaseContractTest
+    extends MessagesWithBatchDatabaseContractTest("postgres-application.conf")
+    with PostgresCleaner
+
+/**
+ * Runs the [[MessagesWithBatchDatabaseContractTest]] suite with the
+ * `mysql-application.conf` configuration and the [[MysqlCleaner]] mixin.
+ */
+final class MySQLMessagesWithBatchDatabaseContractTest
+    extends MessagesWithBatchDatabaseContractTest("mysql-application.conf")
+    with MysqlCleaner
+
+/**
+ * Runs the [[MessagesWithBatchDatabaseContractTest]] suite with the
+ * `mariadb-application.conf` configuration and the [[MariaDBCleaner]] mixin.
+ */
+final class MariaDBMessagesWithBatchDatabaseContractTest
+    extends MessagesWithBatchDatabaseContractTest("mariadb-application.conf")
+    with MariaDBCleaner
+
+/**
+ * Runs the [[MessagesWithBatchDatabaseContractTest]] suite with the
+ * `oracle-application.conf` configuration and the [[OracleCleaner]] mixin.
+ */
+final class OracleMessagesWithBatchDatabaseContractTest
+    extends MessagesWithBatchDatabaseContractTest("oracle-application.conf")
+    with OracleCleaner
+
+/**
+ * Runs the [[MessagesWithBatchDatabaseContractTest]] suite with the
+ * `sqlserver-application.conf` configuration and the [[SqlServerCleaner]] mixin.
+ */
+final class SqlServerMessagesWithBatchDatabaseContractTest
+    extends MessagesWithBatchDatabaseContractTest("sqlserver-application.conf")
+    with SqlServerCleaner
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]