This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 2433475  Port akka-persistence-r2dbc PRs #345 and #349: backtracking improvements (#357)
2433475 is described below

commit 24334751c32445400ef9fa5024a3b2ede077f4a4
Author: PJ Fanning <[email protected]>
AuthorDate: Wed May 6 13:17:29 2026 +0100

    Port akka-persistence-r2dbc PRs #345 and #349: backtracking improvements (#357)
    
    * Port akka PRs #345 and #349: backtracking improvements
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/851bb153-b88c-4506-b57b-20bf731c7219
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Create backtracking-improvements.excludes
    
    * Update backtracking-improvements.excludes
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../backtracking-improvements.excludes             | 25 ++++++++++
 .../persistence/r2dbc/internal/BySliceQuery.scala  | 33 ++++++++++---
 .../query/EventsBySliceBacktrackingSpec.scala      | 57 ++++++++++++++++++++++
 .../r2dbc/query/EventsBySlicePerfSpec.scala        |  3 +-
 4 files changed, 111 insertions(+), 7 deletions(-)

diff --git a/core/src/main/mima-filters/2.0.x.backwards.excludes/backtracking-improvements.excludes b/core/src/main/mima-filters/2.0.x.backwards.excludes/backtracking-improvements.excludes
new file mode 100644
index 0000000..9673c35
--- /dev/null
+++ b/core/src/main/mima-filters/2.0.x.backwards.excludes/backtracking-improvements.excludes
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Caused by https://github.com/apache/pekko-persistence-r2dbc/pull/357
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.copy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.unapply")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.copy*")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState._6")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState._7")
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
index 9be7ae7..dc5515d 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
@@ -43,12 +43,13 @@ import org.slf4j.Logger
 
   object QueryState {
     val empty: QueryState =
-      QueryState(TimestampOffset.Zero, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, Buckets.empty)
+      QueryState(TimestampOffset.Zero, 0, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, Buckets.empty)
   }
 
   final case class QueryState(
       latest: TimestampOffset,
       rowCount: Int,
+      rowCountSinceBacktracking: Long,
       queryCount: Long,
       idleCount: Long,
       backtrackingCount: Int,
@@ -348,8 +349,7 @@ import org.slf4j.Logger
     }
 
     def switchFromBacktracking(state: QueryState): Boolean = {
-      // backtrackingCount is for fairness, to not run too many backtracking queries in a row
-      state.backtracking && (state.backtrackingCount >= 3 || state.rowCount < settings.bufferSize - 1)
+      state.backtracking && state.rowCount < settings.bufferSize - 1
     }
 
     def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
@@ -357,11 +357,15 @@ import org.slf4j.Logger
       val newState =
         if (settings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero &&
           (newIdleCount >= 5 ||
+          state.rowCountSinceBacktracking + state.rowCount >= settings.bufferSize * 3 ||
           JDuration
             .between(state.latestBacktracking.timestamp, state.latest.timestamp)
             .compareTo(halfBacktrackingWindow) > 0)) {
           // FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0`
 
+          // Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
+          // the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow
+
           // switching to backtracking
           val fromOffset =
             if (state.latestBacktracking == TimestampOffset.Zero)
@@ -371,18 +375,25 @@ import org.slf4j.Logger
 
           state.copy(
             rowCount = 0,
+            rowCountSinceBacktracking = 0,
             queryCount = state.queryCount + 1,
             idleCount = newIdleCount,
             backtrackingCount = 1,
             latestBacktracking = fromOffset)
         } else if (switchFromBacktracking(state)) {
           // switch from backtracking
-          state.copy(rowCount = 0, queryCount = state.queryCount + 1, idleCount = newIdleCount, backtrackingCount = 0)
+          state.copy(
+            rowCount = 0,
+            rowCountSinceBacktracking = 0,
+            queryCount = state.queryCount + 1,
+            idleCount = newIdleCount,
+            backtrackingCount = 0)
         } else {
           // continue
           val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
           state.copy(
             rowCount = 0,
+            rowCountSinceBacktracking = state.rowCountSinceBacktracking + state.rowCount,
             queryCount = state.queryCount + 1,
             idleCount = newIdleCount,
             backtrackingCount = newBacktrackingCount)
@@ -395,12 +406,21 @@ import org.slf4j.Logger
       val fromTimestamp = newState.nextQueryFromTimestamp
       val toTimestamp = newState.nextQueryToTimestamp(settings.bufferSize)
 
-      if (log.isDebugEnabled())
+      if (log.isDebugEnabled()) {
+        val backtrackingInfo =
+          if (newState.backtracking && !state.backtracking)
+            s" switching to backtracking mode, [${state.rowCountSinceBacktracking + state.rowCount}] events behind,"
+          else if (!newState.backtracking && state.backtracking)
+            " switching from backtracking mode,"
+          else if (newState.backtracking && state.backtracking)
+            " in backtracking mode,"
+          else
+            ""
         log.debug(
           "{} next query [{}]{} from slices [{} - {}], between time [{} - {}]. {}",
           logPrefix,
           newState.queryCount: java.lang.Long,
-          if (newState.backtracking) " in backtracking mode" else "",
+          backtrackingInfo,
           minSlice: java.lang.Integer,
           maxSlice: java.lang.Integer,
           fromTimestamp,
@@ -408,6 +428,7 @@ import org.slf4j.Logger
           if (newIdleCount >= 3) s"Idle in [$newIdleCount] queries."
           else if (state.backtracking) s"Found [${state.rowCount}] rows in previous backtracking query."
           else s"Found [${state.rowCount}] rows in previous query.")
+      }
 
       newState ->
       Some(
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index b32d13c..ad655b5 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -21,6 +21,7 @@ import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorSystem
 import pekko.persistence.query.NoOffset
+import pekko.persistence.query.Offset
 import pekko.persistence.query.PersistenceQuery
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.r2dbc.Dialect
@@ -162,6 +163,62 @@ class EventsBySliceBacktrackingSpec
       env9.persistenceId shouldBe pid1
       env9.sequenceNr shouldBe 4L
     }
+
+    "emit from backtracking after first normal query" in {
+      val entityType = nextEntityType()
+      val pid1 = nextPid(entityType)
+      val pid2 = nextPid(entityType)
+      val slice1 = query.sliceForPersistenceId(pid1)
+      val slice2 = query.sliceForPersistenceId(pid2)
+      val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
+
+      // don't let behind-current-time be a reason for not finding events
+      val startTime = Instant.now().minusSeconds(10 * 60)
+
+      writeEvent(slice1, pid1, 1L, startTime, "e1-1")
+      writeEvent(slice1, pid1, 2L, startTime.plusMillis(2), "e1-2")
+      writeEvent(slice1, pid1, 3L, startTime.plusMillis(4), "e1-3")
+
+      def startQuery(offset: Offset): TestSubscriber.Probe[EventEnvelope[String]] =
+        query
+          .eventsBySlices[String](entityType, 0, persistenceExt.numberOfSlices - 1, offset)
+          .runWith(sinkProbe)
+          .request(100)
+
+      def expect(env: EventEnvelope[String], pid: String, seqNr: Long, eventOption: Option[String]): Offset = {
+        env.persistenceId shouldBe pid
+        env.sequenceNr shouldBe seqNr
+        env.eventOption shouldBe eventOption
+        env.offset
+      }
+
+      val result1 = startQuery(NoOffset)
+      expect(result1.expectNext(), pid1, 1L, Some("e1-1"))
+      expect(result1.expectNext(), pid1, 2L, Some("e1-2"))
+      expect(result1.expectNext(), pid1, 3L, Some("e1-3"))
+
+      // first backtracking query kicks in immediately after the first normal query has finished
+      // and it also emits duplicates (by design)
+      expect(result1.expectNext(), pid1, 1L, None)
+      expect(result1.expectNext(), pid1, 2L, None)
+      val offset1 = expect(result1.expectNext(), pid1, 3L, None)
+      result1.cancel()
+
+      // write delayed events from pid2
+      writeEvent(slice2, pid2, 1L, startTime.plusMillis(1), "e2-1")
+      writeEvent(slice2, pid2, 2L, startTime.plusMillis(3), "e2-2")
+      writeEvent(slice2, pid2, 3L, startTime.plusMillis(5), "e2-3")
+
+      val result2 = startQuery(offset1)
+      // backtracking
+      expect(result2.expectNext(), pid1, 1L, None)
+      expect(result2.expectNext(), pid2, 1L, None)
+      expect(result2.expectNext(), pid1, 2L, None)
+      expect(result2.expectNext(), pid2, 2L, None)
+      expect(result2.expectNext(), pid1, 3L, None)
+      // from normal query
+      expect(result2.expectNext(), pid2, 3L, Some("e2-3"))
+    }
   }
 
 }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
index 85b045b..5b12b34 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
@@ -49,11 +49,12 @@ class EventsBySlicePerfSpec
 
     "retrieve from several slices" in {
       // increase these properties for "real" testing
+      // also, remove LogCapturing and change logback log levels for "real" testing
       val numberOfPersisters = 30
       val numberOfEvents = 5
       val writeConcurrency = 10
       val numberOfSliceRanges = 4
-      val iterations = 3
+      val iterations = 2
       val totalNumberOfEvents = numberOfPersisters * numberOfEvents
 
       val entityType = nextEntityType()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to