This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 2433475 Port akka-persistence-r2dbc PRs #345 and #349: backtracking improvements (#357)
2433475 is described below
commit 24334751c32445400ef9fa5024a3b2ede077f4a4
Author: PJ Fanning <[email protected]>
AuthorDate: Wed May 6 13:17:29 2026 +0100
Port akka-persistence-r2dbc PRs #345 and #349: backtracking improvements (#357)
* Port akka PRs #345 and #349: backtracking improvements
Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/851bb153-b88c-4506-b57b-20bf731c7219
Co-authored-by: pjfanning <[email protected]>
* Create backtracking-improvements.excludes
* Update backtracking-improvements.excludes
---------
Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../backtracking-improvements.excludes | 25 ++++++++++
.../persistence/r2dbc/internal/BySliceQuery.scala | 33 ++++++++++---
.../query/EventsBySliceBacktrackingSpec.scala | 57 ++++++++++++++++++++++
.../r2dbc/query/EventsBySlicePerfSpec.scala | 3 +-
4 files changed, 111 insertions(+), 7 deletions(-)
diff --git a/core/src/main/mima-filters/2.0.x.backwards.excludes/backtracking-improvements.excludes b/core/src/main/mima-filters/2.0.x.backwards.excludes/backtracking-improvements.excludes
new file mode 100644
index 0000000..9673c35
--- /dev/null
+++ b/core/src/main/mima-filters/2.0.x.backwards.excludes/backtracking-improvements.excludes
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Caused by https://github.com/apache/pekko-persistence-r2dbc/pull/357
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.copy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.unapply")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.copy*")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState._6")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState._7")
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
index 9be7ae7..dc5515d 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
@@ -43,12 +43,13 @@ import org.slf4j.Logger
object QueryState {
val empty: QueryState =
- QueryState(TimestampOffset.Zero, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, Buckets.empty)
+ QueryState(TimestampOffset.Zero, 0, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, Buckets.empty)
}
final case class QueryState(
latest: TimestampOffset,
rowCount: Int,
+ rowCountSinceBacktracking: Long,
queryCount: Long,
idleCount: Long,
backtrackingCount: Int,
@@ -348,8 +349,7 @@ import org.slf4j.Logger
}
def switchFromBacktracking(state: QueryState): Boolean = {
- // backtrackingCount is for fairness, to not run too many backtracking queries in a row
- state.backtracking && (state.backtrackingCount >= 3 || state.rowCount < settings.bufferSize - 1)
+ state.backtracking && state.rowCount < settings.bufferSize - 1
}
def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
@@ -357,11 +357,15 @@ import org.slf4j.Logger
val newState =
if (settings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero &&
(newIdleCount >= 5 ||
+ state.rowCountSinceBacktracking + state.rowCount >= settings.bufferSize * 3 ||
JDuration
.between(state.latestBacktracking.timestamp, state.latest.timestamp)
.compareTo(halfBacktrackingWindow) > 0)) {
// FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0`
+ // Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
+ // the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow
+
// switching to backtracking
val fromOffset =
if (state.latestBacktracking == TimestampOffset.Zero)
@@ -371,18 +375,25 @@ import org.slf4j.Logger
state.copy(
rowCount = 0,
+ rowCountSinceBacktracking = 0,
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
backtrackingCount = 1,
latestBacktracking = fromOffset)
} else if (switchFromBacktracking(state)) {
// switch from backtracking
- state.copy(rowCount = 0, queryCount = state.queryCount + 1, idleCount = newIdleCount, backtrackingCount = 0)
+ state.copy(
+ rowCount = 0,
+ rowCountSinceBacktracking = 0,
+ queryCount = state.queryCount + 1,
+ idleCount = newIdleCount,
+ backtrackingCount = 0)
} else {
// continue
val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
state.copy(
rowCount = 0,
+ rowCountSinceBacktracking = state.rowCountSinceBacktracking + state.rowCount,
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
backtrackingCount = newBacktrackingCount)
@@ -395,12 +406,21 @@ import org.slf4j.Logger
val fromTimestamp = newState.nextQueryFromTimestamp
val toTimestamp = newState.nextQueryToTimestamp(settings.bufferSize)
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
+ val backtrackingInfo =
+ if (newState.backtracking && !state.backtracking)
+ s" switching to backtracking mode, [${state.rowCountSinceBacktracking + state.rowCount}] events behind,"
+ else if (!newState.backtracking && state.backtracking)
+ " switching from backtracking mode,"
+ else if (newState.backtracking && state.backtracking)
+ " in backtracking mode,"
+ else
+ ""
log.debug(
"{} next query [{}]{} from slices [{} - {}], between time [{} - {}]. {}",
logPrefix,
newState.queryCount: java.lang.Long,
- if (newState.backtracking) " in backtracking mode" else "",
+ backtrackingInfo,
minSlice: java.lang.Integer,
maxSlice: java.lang.Integer,
fromTimestamp,
@@ -408,6 +428,7 @@ import org.slf4j.Logger
if (newIdleCount >= 3) s"Idle in [$newIdleCount] queries."
else if (state.backtracking) s"Found [${state.rowCount}] rows in previous backtracking query."
else s"Found [${state.rowCount}] rows in previous query.")
+ }
newState ->
Some(
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index b32d13c..ad655b5 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -21,6 +21,7 @@ import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorSystem
import pekko.persistence.query.NoOffset
+import pekko.persistence.query.Offset
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.r2dbc.Dialect
@@ -162,6 +163,62 @@ class EventsBySliceBacktrackingSpec
env9.persistenceId shouldBe pid1
env9.sequenceNr shouldBe 4L
}
+
+ "emit from backtracking after first normal query" in {
+ val entityType = nextEntityType()
+ val pid1 = nextPid(entityType)
+ val pid2 = nextPid(entityType)
+ val slice1 = query.sliceForPersistenceId(pid1)
+ val slice2 = query.sliceForPersistenceId(pid2)
+ val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
+
+ // don't let behind-current-time be a reason for not finding events
+ val startTime = Instant.now().minusSeconds(10 * 60)
+
+ writeEvent(slice1, pid1, 1L, startTime, "e1-1")
+ writeEvent(slice1, pid1, 2L, startTime.plusMillis(2), "e1-2")
+ writeEvent(slice1, pid1, 3L, startTime.plusMillis(4), "e1-3")
+
+ def startQuery(offset: Offset): TestSubscriber.Probe[EventEnvelope[String]] =
+ query
+ .eventsBySlices[String](entityType, 0, persistenceExt.numberOfSlices - 1, offset)
+ .runWith(sinkProbe)
+ .request(100)
+
+ def expect(env: EventEnvelope[String], pid: String, seqNr: Long, eventOption: Option[String]): Offset = {
+ env.persistenceId shouldBe pid
+ env.sequenceNr shouldBe seqNr
+ env.eventOption shouldBe eventOption
+ env.offset
+ }
+
+ val result1 = startQuery(NoOffset)
+ expect(result1.expectNext(), pid1, 1L, Some("e1-1"))
+ expect(result1.expectNext(), pid1, 2L, Some("e1-2"))
+ expect(result1.expectNext(), pid1, 3L, Some("e1-3"))
+
+ // first backtracking query kicks in immediately after the first normal query has finished
+ // and it also emits duplicates (by design)
+ expect(result1.expectNext(), pid1, 1L, None)
+ expect(result1.expectNext(), pid1, 2L, None)
+ val offset1 = expect(result1.expectNext(), pid1, 3L, None)
+ result1.cancel()
+
+ // write delayed events from pid2
+ writeEvent(slice2, pid2, 1L, startTime.plusMillis(1), "e2-1")
+ writeEvent(slice2, pid2, 2L, startTime.plusMillis(3), "e2-2")
+ writeEvent(slice2, pid2, 3L, startTime.plusMillis(5), "e2-3")
+
+ val result2 = startQuery(offset1)
+ // backtracking
+ expect(result2.expectNext(), pid1, 1L, None)
+ expect(result2.expectNext(), pid2, 1L, None)
+ expect(result2.expectNext(), pid1, 2L, None)
+ expect(result2.expectNext(), pid2, 2L, None)
+ expect(result2.expectNext(), pid1, 3L, None)
+ // from normal query
+ expect(result2.expectNext(), pid2, 3L, Some("e2-3"))
+ }
}
}
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
index 85b045b..5b12b34 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
@@ -49,11 +49,12 @@ class EventsBySlicePerfSpec
"retrieve from several slices" in {
// increase these properties for "real" testing
+ // also, remove LogCapturing and change logback log levels for "real" testing
val numberOfPersisters = 30
val numberOfEvents = 5
val writeConcurrency = 10
val numberOfSliceRanges = 4
- val iterations = 3
+ val iterations = 2
val totalNumberOfEvents = numberOfPersisters * numberOfEvents
val entityType = nextEntityType()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]