This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new e9bfe50  copy over changes from akka projections 1.4.1 (#504)
e9bfe50 is described below

commit e9bfe5051b814670b9b62650071271107b830e55
Author: PJ Fanning <[email protected]>
AuthorDate: Sat May 30 13:01:46 2026 +0100

    copy over changes from akka projections 1.4.1 (#504)
    
    * Port akka-projection PRs #898 and #908: batch offset inserts and fix 
grouped async offsets
    
    * scalafmt
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
---
 .../internal/InternalProjectionState.scala         | 10 +++-
 .../pekko/projection/internal/OffsetStrategy.scala |  7 +++
 .../jdbc/internal/JdbcProjectionImpl.scala         |  8 +--
 r2dbc/src/main/resources/reference.conf            |  7 +++
 .../projection/r2dbc/R2dbcProjectionSettings.scala | 27 +++++++---
 .../r2dbc/internal/R2dbcOffsetStore.scala          | 60 ++++++++++++++++------
 .../r2dbc/internal/R2dbcProjectionImpl.scala       | 19 ++++---
 .../r2dbc/scaladsl/R2dbcProjection.scala           |  4 +-
 .../slick/internal/SlickProjectionImpl.scala       |  6 ++-
 9 files changed, 105 insertions(+), 43 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
 
b/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
index 904b64b..904faa0 100644
--- 
a/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
+++ 
b/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
@@ -236,7 +236,10 @@ private[projection] abstract class 
InternalProjectionState[Offset, Envelope](
     }
   }
 
-  private def exactlyOnceProcessing(
+  /**
+   * ExactlyOnce or async store of offset by the handler adapter.
+   */
+  private def offsetStoredByHandlerProcessing(
       source: Source[ProjectionContextImpl[Offset, Envelope], NotUsed],
       recoveryStrategy: HandlerRecoveryStrategy): Source[Done, NotUsed] = {
 
@@ -444,7 +447,10 @@ private[projection] abstract class 
InternalProjectionState[Offset, Envelope](
     val composedSource: Source[Done, NotUsed] =
       offsetStrategy match {
         case ExactlyOnce(recoveryStrategyOpt) =>
-          exactlyOnceProcessing(source, 
recoveryStrategyOpt.getOrElse(settings.recoveryStrategy))
+          offsetStoredByHandlerProcessing(source, 
recoveryStrategyOpt.getOrElse(settings.recoveryStrategy))
+
+        case OffsetStoredByHandler(recoveryStrategyOpt) =>
+          offsetStoredByHandlerProcessing(source, 
recoveryStrategyOpt.getOrElse(settings.recoveryStrategy))
 
         case AtLeastOnce(afterEnvelopesOpt, orAfterDurationOpt, 
recoveryStrategyOpt) =>
           atLeastOnceProcessing(
diff --git 
a/core/src/main/scala/org/apache/pekko/projection/internal/OffsetStrategy.scala 
b/core/src/main/scala/org/apache/pekko/projection/internal/OffsetStrategy.scala
index 9c8ff4c..a693690 100644
--- 
a/core/src/main/scala/org/apache/pekko/projection/internal/OffsetStrategy.scala
+++ 
b/core/src/main/scala/org/apache/pekko/projection/internal/OffsetStrategy.scala
@@ -56,6 +56,13 @@ private[projection] final case class AtLeastOnce(
     recoveryStrategy: Option[HandlerRecoveryStrategy] = None)
     extends OffsetStrategy
 
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[projection] final case class OffsetStoredByHandler(recoveryStrategy: 
Option[HandlerRecoveryStrategy] = None)
+    extends OffsetStrategy
+
 /**
  * INTERNAL API
  */
diff --git 
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
 
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
index 7fbfd1a..3057e84 100644
--- 
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
+++ 
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
@@ -39,6 +39,7 @@ import pekko.projection.internal.HandlerStrategy
 import pekko.projection.internal.InternalProjection
 import pekko.projection.internal.InternalProjectionState
 import pekko.projection.internal.ManagementState
+import pekko.projection.internal.OffsetStoredByHandler
 import pekko.projection.internal.OffsetStrategy
 import pekko.projection.internal.ProjectionSettings
 import pekko.projection.internal.SettingsImpl
@@ -206,7 +207,7 @@ private[projection] class JdbcProjectionImpl[Offset, 
Envelope, S <: JdbcSession]
       .copy(afterEnvelopes = Some(afterEnvelopes), orAfterDuration = 
Some(afterDuration)))
 
   /**
-   * Settings for GroupedSlickProjection
+   * Settings for GroupedJdbcProjection
    */
   override def withGroup(
       groupAfterEnvelopes: Int,
@@ -218,8 +219,9 @@ private[projection] class JdbcProjectionImpl[Offset, 
Envelope, S <: JdbcSession]
   override def withRecoveryStrategy(
       recoveryStrategy: HandlerRecoveryStrategy): JdbcProjectionImpl[Offset, 
Envelope, S] = {
     val newStrategy = offsetStrategy match {
-      case s: ExactlyOnce => s.copy(recoveryStrategy = Some(recoveryStrategy))
-      case s: AtLeastOnce => s.copy(recoveryStrategy = Some(recoveryStrategy))
+      case s: ExactlyOnce           => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
+      case s: AtLeastOnce           => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
+      case s: OffsetStoredByHandler => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
       // NOTE: AtMostOnce has its own withRecoveryStrategy variant
       // this method is not available for AtMostOnceProjection
       case s: AtMostOnce => s
diff --git a/r2dbc/src/main/resources/reference.conf 
b/r2dbc/src/main/resources/reference.conf
index 6f8ff0b..7ca39ce 100644
--- a/r2dbc/src/main/resources/reference.conf
+++ b/r2dbc/src/main/resources/reference.conf
@@ -36,6 +36,13 @@ pekko.projection.r2dbc {
     # Remove old entries outside the time-window from the offset store database
     # with this frequency.
     delete-interval = 1 minute
+
+    # Batch size for inserts of timestamp offsets. Can be used to improve 
throughput for projections
+    # with many events per persistence id and that is using 
`groupedWithinAsync`.
+    # For Postgres (and Yugabyte) this will use multi-row inserts for this 
number of records. For MySQL
+    # this will use multiple single row inserts.
+    # Use 0 to disable batching and always use single row inserts.
+    offset-batch-size = 20
   }
 
   # By default it shares connection-factory with pekko-persistence-r2dbc 
(write side),
diff --git 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
index 954b1cd..1f5aa2e 100644
--- 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
+++ 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
@@ -48,7 +48,8 @@ object R2dbcProjectionSettings {
       evictInterval = config.getDuration("offset-store.evict-interval"),
       deleteInterval = config.getDuration("offset-store.delete-interval"),
       logDbCallsExceeding,
-      warnAboutFilteredEventsInFlow = 
config.getBoolean("warn-about-filtered-events-in-flow")
+      warnAboutFilteredEventsInFlow = 
config.getBoolean("warn-about-filtered-events-in-flow"),
+      offsetBatchSize = config.getInt("offset-store.offset-batch-size")
     )
   }
 
@@ -100,7 +101,8 @@ object R2dbcProjectionSettings {
     evictInterval,
     deleteInterval,
     logDbCallsExceeding,
-    warnAboutFilteredEventsInFlow
+    warnAboutFilteredEventsInFlow,
+    offsetBatchSize = 10
   )
 }
 
@@ -116,12 +118,13 @@ final class R2dbcProjectionSettings private (
     val evictInterval: JDuration,
     val deleteInterval: JDuration,
     val logDbCallsExceeding: FiniteDuration,
-    val warnAboutFilteredEventsInFlow: Boolean
+    val warnAboutFilteredEventsInFlow: Boolean,
+    val offsetBatchSize: Int
 ) extends Serializable {
 
   override def toString: String =
     s"R2dbcProjectionSettings($dialect, $schema, $offsetTable, 
$timestampOffsetTable, $managementTable, " +
-    s"$useConnectionFactory, $timeWindow, $keepNumberOfEntries, 
$evictInterval, $deleteInterval, $logDbCallsExceeding, 
$warnAboutFilteredEventsInFlow)"
+    s"$useConnectionFactory, $timeWindow, $keepNumberOfEntries, 
$evictInterval, $deleteInterval, $logDbCallsExceeding, 
$warnAboutFilteredEventsInFlow, $offsetBatchSize)"
 
   override def equals(other: Any): Boolean =
     other match {
@@ -132,7 +135,8 @@ final class R2dbcProjectionSettings private (
         timeWindow == that.timeWindow && keepNumberOfEntries == 
that.keepNumberOfEntries &&
         evictInterval == that.evictInterval && deleteInterval == 
that.deleteInterval &&
         logDbCallsExceeding == that.logDbCallsExceeding &&
-        warnAboutFilteredEventsInFlow == that.warnAboutFilteredEventsInFlow
+        warnAboutFilteredEventsInFlow == that.warnAboutFilteredEventsInFlow &&
+        offsetBatchSize == that.offsetBatchSize
       case _ => false
     }
 
@@ -149,7 +153,8 @@ final class R2dbcProjectionSettings private (
       evictInterval,
       deleteInterval,
       logDbCallsExceeding,
-      warnAboutFilteredEventsInFlow
+      warnAboutFilteredEventsInFlow,
+      offsetBatchSize
     )
     val h = values.foldLeft(MurmurHash3.productSeed) { case (h, value) =>
       MurmurHash3.mix(h, value.##)
@@ -169,7 +174,8 @@ final class R2dbcProjectionSettings private (
       evictInterval: JDuration = evictInterval,
       deleteInterval: JDuration = deleteInterval,
       logDbCallsExceeding: FiniteDuration = logDbCallsExceeding,
-      warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow
+      warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
+      offsetBatchSize: Int = offsetBatchSize
   ): R2dbcProjectionSettings =
     new R2dbcProjectionSettings(
       dialect,
@@ -183,7 +189,8 @@ final class R2dbcProjectionSettings private (
       evictInterval,
       deleteInterval,
       logDbCallsExceeding,
-      warnAboutFilteredEventsInFlow
+      warnAboutFilteredEventsInFlow,
+      offsetBatchSize
     )
 
   def withDialect(dialect: Dialect): R2dbcProjectionSettings =
@@ -222,6 +229,10 @@ final class R2dbcProjectionSettings private (
   def withWarnAboutFilteredEventsInFlow(warnAboutFilteredEventsInFlow: 
Boolean): R2dbcProjectionSettings =
     copy(warnAboutFilteredEventsInFlow = warnAboutFilteredEventsInFlow)
 
+  /** @since 2.0.0 */
+  def withOffsetBatchSize(offsetBatchSize: Int): R2dbcProjectionSettings =
+    copy(offsetBatchSize = offsetBatchSize)
+
   val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + 
offsetTable
   val timestampOffsetTableWithSchema: String = schema.map(_ + 
".").getOrElse("") + timestampOffsetTable
   val managementTableWithSchema: String = schema.map(_ + ".").getOrElse("") + 
managementTable
diff --git 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index e7f14a0..a8d4231 100644
--- 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++ 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -255,6 +255,14 @@ private[projection] class R2dbcOffsetStore(
     (projection_name, projection_key, slice, persistence_id, seq_nr, 
timestamp_offset, timestamp_consumed)
     VALUES (?,?,?,?,?,?, $timestampSql)"""
 
+  private val insertTimestampOffsetBatchSql: String = {
+    val values = (1 to settings.offsetBatchSize).map(_ => s"(?,?,?,?,?,?, 
$timestampSql)").mkString(", ")
+    sql"""
+    INSERT INTO $timestampOffsetTable
+    (projection_name, projection_key, slice, persistence_id, seq_nr, 
timestamp_offset, timestamp_consumed)
+    VALUES $values"""
+  }
+
   // delete less than a timestamp
   private val deleteOldTimestampOffsetSql: String = sql"""
     DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND 
projection_name = ? AND timestamp_offset < ?
@@ -559,7 +567,7 @@ private[projection] class R2dbcOffsetStore(
     }
   }
 
-  protected def bindTimestampOffsetRecord(stmt: Statement, record: Record): 
Statement = {
+  protected def bindTimestampOffsetRecord(stmt: Statement, record: Record, 
bindStartIndex: Int = 0): Statement = {
     val slice = persistenceExt.sliceForPersistenceId(record.pid)
     val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
     val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
@@ -569,12 +577,12 @@ private[projection] class R2dbcOffsetStore(
         s"[$minSlice - $maxSlice] but received slice [$slice] for 
persistenceId [${record.pid}]")
 
     stmt
-      .bind(0, projectionId.name)
-      .bind(1, projectionId.key)
-      .bind(2, slice)
-      .bind(3, record.pid)
-      .bind(4, record.seqNr)
-      .bind(5, record.timestamp)
+      .bind(bindStartIndex, projectionId.name)
+      .bind(bindStartIndex + 1, projectionId.key)
+      .bind(bindStartIndex + 2, slice)
+      .bind(bindStartIndex + 3, record.pid)
+      .bind(bindStartIndex + 4, record.seqNr)
+      .bind(bindStartIndex + 5, record.timestamp)
   }
 
   protected def insertTimestampOffsetInTx(conn: Connection, records: 
immutable.IndexedSeq[Record]): Future[Long] = {
@@ -582,19 +590,39 @@ private[projection] class R2dbcOffsetStore(
 
     logger.trace2("saving timestamp offset [{}], {}", records.last.timestamp, 
records)
 
-    val statement = conn.createStatement(insertTimestampOffsetSql)
-
     if (records.size == 1) {
+      val statement = conn.createStatement(insertTimestampOffsetSql)
       val boundStatement = bindTimestampOffsetRecord(statement, records.head)
       R2dbcExecutor.updateOneInTx(boundStatement)
     } else {
-      // TODO Try Batch without bind parameters for better performance. Risk 
of sql injection for these parameters is low.
-      val boundStatement =
-        records.foldLeft(statement) { (stmt, rec) =>
-          stmt.add()
-          bindTimestampOffsetRecord(stmt, rec)
+      val batchSize = settings.offsetBatchSize
+      val batches = if (batchSize > 0) records.size / batchSize else 0
+      val batchResult = if (batches > 0) {
+        val batchStatements = (0 until batches).map { i =>
+          val stmt = conn.createStatement(insertTimestampOffsetBatchSql)
+          records.slice(i * batchSize, i * batchSize + 
batchSize).zipWithIndex.foreach {
+            case (rec, recIdx) =>
+              bindTimestampOffsetRecord(stmt, rec, recIdx * 6) // 6 bind 
parameters per record
+          }
+          stmt
         }
-      R2dbcExecutor.updateBatchInTx(boundStatement)
+        R2dbcExecutor.updateInTx(batchStatements).map(_.sum)
+      } else Future.successful(0L)
+
+      batchResult.flatMap { batchResultCount =>
+        val remainingRecords = records.drop(batches * batchSize)
+        if (remainingRecords.nonEmpty) {
+          val statement = conn.createStatement(insertTimestampOffsetSql)
+          val boundStatement = remainingRecords.foldLeft(statement) { (stmt, 
rec) =>
+            stmt.add()
+            bindTimestampOffsetRecord(stmt, rec)
+          }
+          // This "batch" statement is not efficient, see issue #897
+          R2dbcExecutor
+            .updateBatchInTx(boundStatement)
+            .map(_ + batchResultCount)(ExecutionContext.parasitic)
+        } else Future.successful(batchResultCount)
+      }
     }
   }
 
@@ -1106,7 +1134,7 @@ private[projection] class R2dbcOffsetStore(
       case change: DurableStateChange[_] if 
change.offset.isInstanceOf[TimestampOffset] =>
         // in case additional types are added
         throw new IllegalArgumentException(
-          s"DurableStateChange [${change.getClass.getName}] not implemented 
yet. Please report bug at 
https://github.com/apache/pekko-persistence-r2dbc/issues")
+          s"DurableStateChange [${change.getClass.getName}] not implemented 
yet. Please report bug at https://github.com/apache/pekko-projection/issues")
       case _ => None
     }
   }
diff --git 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index ab8cf1a..18d7fa2 100644
--- 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++ 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -56,6 +56,7 @@ import pekko.projection.internal.HandlerStrategy
 import pekko.projection.internal.InternalProjection
 import pekko.projection.internal.InternalProjectionState
 import pekko.projection.internal.ManagementState
+import pekko.projection.internal.OffsetStoredByHandler
 import pekko.projection.internal.OffsetStrategy
 import pekko.projection.internal.ProjectionContextImpl
 import pekko.projection.internal.ProjectionSettings
@@ -392,17 +393,14 @@ private[projection] object R2dbcProjectionImpl {
           } else {
             Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, 
sourceProvider))).flatMap {
               loadedEnvelopes =>
+                val offsets = 
loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector
                 val filteredEnvelopes = 
loadedEnvelopes.filterNot(isFilteredEvent)
                 if (filteredEnvelopes.isEmpty) {
-                  offsetStore.addInflights(loadedEnvelopes)
-                  FutureDone
+                  offsetStore.saveOffsets(offsets)
                 } else {
-                  delegate
-                    .process(filteredEnvelopes)
-                    .map { _ =>
-                      offsetStore.addInflights(loadedEnvelopes)
-                      Done
-                    }
+                  delegate.process(filteredEnvelopes).flatMap { _ =>
+                    offsetStore.saveOffsets(offsets)
+                  }
                 }
             }
           }
@@ -584,8 +582,9 @@ private[projection] class R2dbcProjectionImpl[Offset, 
Envelope](
   override def withRecoveryStrategy(
       recoveryStrategy: HandlerRecoveryStrategy): R2dbcProjectionImpl[Offset, 
Envelope] = {
     val newStrategy = offsetStrategy match {
-      case s: ExactlyOnce => s.copy(recoveryStrategy = Some(recoveryStrategy))
-      case s: AtLeastOnce => s.copy(recoveryStrategy = Some(recoveryStrategy))
+      case s: ExactlyOnce           => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
+      case s: AtLeastOnce           => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
+      case s: OffsetStoredByHandler => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
       // NOTE: AtMostOnce has its own withRecoveryStrategy variant
       // this method is not available for AtMostOnceProjection
       case s: AtMostOnce => s
diff --git 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/scaladsl/R2dbcProjection.scala
 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/scaladsl/R2dbcProjection.scala
index 5e48a76..c7c7246 100644
--- 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/scaladsl/R2dbcProjection.scala
+++ 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/scaladsl/R2dbcProjection.scala
@@ -14,7 +14,6 @@
 package org.apache.pekko.projection.r2dbc.scaladsl
 
 import scala.collection.immutable
-import scala.concurrent.duration.Duration
 
 import org.apache.pekko
 import pekko.Done
@@ -30,6 +29,7 @@ import pekko.projection.internal.ExactlyOnce
 import pekko.projection.internal.FlowHandlerStrategy
 import pekko.projection.internal.GroupedHandlerStrategy
 import pekko.projection.internal.NoopStatusObserver
+import pekko.projection.internal.OffsetStoredByHandler
 import pekko.projection.internal.SingleHandlerStrategy
 import pekko.projection.r2dbc.R2dbcProjectionSettings
 import pekko.projection.r2dbc.internal.R2dbcProjectionImpl
@@ -318,7 +318,7 @@ object R2dbcProjection {
       settingsOpt = None,
       sourceProvider,
       restartBackoffOpt = None,
-      offsetStrategy = AtLeastOnce(afterEnvelopes = Some(1), orAfterDuration = 
Some(Duration.Zero)),
+      offsetStrategy = OffsetStoredByHandler(),
       handlerStrategy = GroupedHandlerStrategy(adaptedHandler),
       NoopStatusObserver,
       offsetStore)
diff --git 
a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
 
b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
index 27c4671..bdb43be 100644
--- 
a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
+++ 
b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
@@ -38,6 +38,7 @@ import pekko.projection.internal.HandlerStrategy
 import pekko.projection.internal.InternalProjection
 import pekko.projection.internal.InternalProjectionState
 import pekko.projection.internal.ManagementState
+import pekko.projection.internal.OffsetStoredByHandler
 import pekko.projection.internal.OffsetStrategy
 import pekko.projection.internal.ProjectionSettings
 import pekko.projection.internal.SettingsImpl
@@ -138,8 +139,9 @@ private[projection] class SlickProjectionImpl[Offset, 
Envelope, P <: JdbcProfile
       recoveryStrategy: HandlerRecoveryStrategy): SlickProjectionImpl[Offset, 
Envelope, P] = {
     val newStrategy =
       offsetStrategy match {
-        case s: ExactlyOnce => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
-        case s: AtLeastOnce => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
+        case s: ExactlyOnce           => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
+        case s: AtLeastOnce           => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
+        case s: OffsetStoredByHandler => s.copy(recoveryStrategy = 
Some(recoveryStrategy))
         // NOTE: AtMostOnce has its own withRecoveryStrategy variant
         // this method is not available for AtMostOnceProjection
         case s: AtMostOnce => s


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to