This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 1fb53ea  Port akka-persistence-r2dbc PR #313: Dynamically 
enable/disable publish-events (#353)
1fb53ea is described below

commit 1fb53ea1f9b76e909074c7695d6be1a84c0cc833
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 7 08:53:14 2026 +0100

    Port akka-persistence-r2dbc PR #313: Dynamically enable/disable 
publish-events (#353)
    
    * Port akka-persistence-r2dbc PR #313: Dynamically enable/disable 
publish-events
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/13f4e372-88f0-4596-836c-7c38e6380b19
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix typos in EWMASpec test names
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/13f4e372-88f0-4596-836c-7c38e6380b19
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 core/src/main/resources/reference.conf             | 23 +++++-
 .../pekko/persistence/r2dbc/R2dbcSettings.scala    |  9 +++
 .../pekko/persistence/r2dbc/internal/EWMA.scala    | 84 ++++++++++++++++++++
 .../pekko/persistence/r2dbc/internal/PubSub.scala  | 87 +++++++++++++++------
 .../persistence/r2dbc/internal/EWMASpec.scala      | 89 ++++++++++++++++++++++
 .../query/EventsBySliceBacktrackingSpec.scala      | 11 ++-
 .../r2dbc/query/EventsBySlicePubSubSpec.scala      | 70 +++++++++++++++++
 .../r2dbc/query/EventsBySliceSpec.scala            |  2 +
 docs/src/main/paradox/projection.md                |  2 +-
 docs/src/main/paradox/query.md                     | 20 +++--
 project/Dependencies.scala                         |  3 +
 11 files changed, 365 insertions(+), 35 deletions(-)

diff --git a/core/src/main/resources/reference.conf 
b/core/src/main/resources/reference.conf
index 0344382..f5a5075 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -11,12 +11,27 @@ pekko.persistence.r2dbc {
     # Otherwise it would be a pinned dispatcher, see 
https://github.com/akka/akka/issues/31058
     plugin-dispatcher = "pekko.actor.default-dispatcher"
 
-    # Enable this to reduce latency of eventsBySlices. The persisted events 
will be
-    # published as Pekko messages and consumed directly by running 
eventsBySlices
-    # queries. Tradeoff is more CPU and network resources that are used. The 
events
+    # Set this to off to disable publishing of events as Pekko messages to 
running
+    # eventsBySlices queries.
+    # Tradeoff is more CPU and network resources that are used. The events
     # must still be retrieved from the database, but at a lower polling 
frequency,
     # because delivery of published messages are not guaranteed.
-    publish-events = off
+    # When this feature is enabled it will measure the throughput and 
automatically
+    # disable/enable if the throughput exceeds the configured threshold. See
+    # publish-events-dynamic configuration.
+    publish-events = on
+
+    # When publish-events is enabled it will measure the throughput and 
automatically
+    # disable/enable if the throughput exceeds the configured threshold.
+    # This configuration cannot be defined per journal, but is global for the 
ActorSystem.
+    publish-events-dynamic {
+      # If exponentially weighted moving average of measured throughput 
exceeds this
+      # threshold publishing of events is disabled. It is enabled again when 
lower than
+      # the threshold.
+      throughput-threshold = 300
+      # The interval of the throughput measurements.
+      throughput-collect-interval = 10 seconds
+    }
 
     # replay filter not needed for this plugin
     replay-filter.mode = off
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index 6a5d368..b3978f2 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -291,3 +291,12 @@ trait UseConnnectionFactory {
 
   val useConnectionFactory: String = config.getString("use-connection-factory")
 }
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+final class PublishEventsDynamicSettings(config: Config) {
+  val throughputThreshold: Int = config.getInt("throughput-threshold")
+  val throughputCollectInterval: FiniteDuration = 
config.getDuration("throughput-collect-interval").toScala
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EWMA.scala 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EWMA.scala
new file mode 100644
index 0000000..0fb10d3
--- /dev/null
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EWMA.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import scala.concurrent.duration.FiniteDuration
+
+import org.apache.pekko.annotation.InternalApi
+
+/**
+ * The exponentially weighted moving average (EWMA) approach captures 
short-term movements in volatility for a
+ * conditional volatility forecasting model. By virtue of its alpha, or decay 
factor, this provides a statistical
+ * streaming data model that is exponentially biased towards newer entries.
+ *
+ * https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+ *
+ * An EWMA only needs the most recent forecast value to be kept, as opposed to 
a standard moving average model.
+ *
+ * Original source code taken from
+ * 
https://github.com/akka/akka/blob/main/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/EWMA.scala
+ *
+ * @param alpha
+ *   decay factor, sets how quickly the exponential weighting decays for past 
data compared to new data, see
+ *   https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+ *
+ * @param value
+ *   the current exponentially weighted moving average, e.g. Y(n - 1), or, the 
sampled value resulting from the previous
+ *   smoothing iteration. This value is always used as the previous EWMA to 
calculate the new EWMA.
+ */
+@InternalApi private[pekko] final case class EWMA(value: Double, alpha: 
Double) {
+
+  require(0.0 <= alpha && alpha <= 1.0, "alpha must be between 0.0 and 1.0")
+
+  val nanoTime = System.nanoTime()
+
+  /**
+   * Calculates the exponentially weighted moving average for a given 
monitored data set.
+   *
+   * @param xn
+   *   the new data point
+   * @return
+   *   a new EWMA with the updated value
+   */
+  def :+(xn: Double): EWMA = {
+    val newValue = (alpha * xn) + (1 - alpha) * value
+    copy(value = newValue)
+  }
+
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object EWMA {
+
+  /**
+   * math.log(2)
+   */
+  private val LogOf2 = 0.69315
+
+  /**
+   * Calculate the alpha (decay factor) from specified half-life and interval 
between observations. Half-life is the
+   * interval over which the weights decrease by a factor of two. The 
relevance of each data sample is halved for every
+   * passing half-life duration,
+   * i.e. after 4 times the half-life, a data sample's relevance is reduced to 
6% of its original relevance. The initial
+   * relevance of a data sample is given by `1 - 0.5 ^ (collect-interval / 
half-life)`.
+   */
+  def alpha(halfLife: FiniteDuration, collectInterval: FiniteDuration): Double 
= {
+    val halfLifeMillis = halfLife.toMillis
+    require(halfLife.toMillis > 0, "halfLife must be > 0 s")
+    val decayRate = LogOf2 / halfLifeMillis
+    1 - math.exp(-decayRate * collectInterval.toMillis)
+  }
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
index 8946f34..f8d9562 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
@@ -17,6 +17,7 @@ import java.net.URLEncoder
 import java.nio.charset.StandardCharsets
 import java.time.Instant
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.pekko
 import pekko.actor.typed.ActorRef
@@ -30,12 +31,16 @@ import pekko.persistence.PersistentRepr
 import pekko.persistence.journal.Tagged
 import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.r2dbc.PublishEventsDynamicSettings
 import pekko.persistence.typed.PersistenceId
+import org.slf4j.LoggerFactory
 
 /**
  * INTERNAL API
  */
 @InternalApi private[pekko] object PubSub extends ExtensionId[PubSub] {
+  private val log = LoggerFactory.getLogger(classOf[PubSub])
+
   def createExtension(system: ActorSystem[_]): PubSub = new PubSub(system)
 
   // Java API
@@ -47,9 +52,21 @@ import pekko.persistence.typed.PersistenceId
  * INTERNAL API
  */
 @InternalApi private[pekko] class PubSub(system: ActorSystem[_]) extends 
Extension {
+  import PubSub.log
+
   private val topics = new ConcurrentHashMap[String, ActorRef[Any]]
   private val persistenceExt = Persistence(system)
 
+  private val settings = new PublishEventsDynamicSettings(
+    
system.settings.config.getConfig("pekko.persistence.r2dbc.journal.publish-events-dynamic"))
+  private val throughputCollectIntervalMillis = 
settings.throughputCollectInterval.toMillis
+  private val throughputThreshold = settings.throughputThreshold.toDouble
+  private val throughputSampler =
+    math.min(1000, math.max(1, settings.throughputThreshold / 10)) // 1/10 of 
threshold, but between 1-1000
+  private val throughputCounter = new AtomicLong
+  @volatile private var throughput =
+    EWMA(0.0, EWMA.alpha(settings.throughputCollectInterval * 2, 
settings.throughputCollectInterval))
+
   def eventTopic[Event](entityType: String, slice: Int): 
ActorRef[Topic.Command[EventEnvelope[Event]]] = {
     val name = topicName(entityType, slice)
     topics
@@ -61,29 +78,55 @@ import pekko.persistence.typed.PersistenceId
     URLEncoder.encode(s"r2dbc-$entityType-$slice", 
StandardCharsets.UTF_8.name())
 
   def publish(pr: PersistentRepr, timestamp: Instant): Unit = {
-    val pid = pr.persistenceId
-    val entityType = PersistenceId.extractEntityType(pid)
-    val slice = persistenceExt.sliceForPersistenceId(pid)
-
-    val offset = TimestampOffset(timestamp, timestamp, Map(pid -> 
pr.sequenceNr))
-    val payload =
-      pr.payload match {
-        case Tagged(payload, _) =>
-          // eventsByTag not implemented (see issue #82), but events can still 
be tagged, so we unwrap this tagged event.
-          payload
-
-        case other => other
+
+    val n = throughputCounter.incrementAndGet()
+    if (n % throughputSampler == 0) {
+      val ewma = throughput
+      val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000
+      if (durationMillis >= throughputCollectIntervalMillis) {
+        // doesn't have to be exact so "missed" or duplicate concurrent calls 
don't matter
+        throughputCounter.set(0L)
+        val rps = n * 1000.0 / durationMillis
+        val newEwma = ewma :+ rps
+        throughput = newEwma
+        if (ewma.value < throughputThreshold && newEwma.value >= 
throughputThreshold) {
+          log.info("Disabled publishing of events. Throughput greater than 
[{}] events/s", throughputThreshold)
+        } else if (ewma.value >= throughputThreshold && newEwma.value < 
throughputThreshold) {
+          log.info("Enabled publishing of events. Throughput less than [{}] 
events/s", throughputThreshold)
+        } else {
+          log.debug(
+            "Publishing of events is {}. Throughput is [{}] events/s",
+            if (newEwma.value < throughputThreshold) "enabled" else "disabled",
+            newEwma.value)
+        }
       }
+    }
+
+    if (throughput.value < throughputThreshold) {
+      val pid = pr.persistenceId
+      val entityType = PersistenceId.extractEntityType(pid)
+      val slice = persistenceExt.sliceForPersistenceId(pid)
+
+      val offset = TimestampOffset(timestamp, timestamp, Map(pid -> 
pr.sequenceNr))
+      val payload =
+        pr.payload match {
+          case Tagged(payload, _) =>
+            // eventsByTag not implemented (see issue #82), but events can 
still be tagged, so we unwrap this tagged event.
+            payload
+
+          case other => other
+        }
 
-    val envelope = new EventEnvelope(
-      offset,
-      pid,
-      pr.sequenceNr,
-      Option(payload),
-      timestamp.toEpochMilli,
-      pr.metadata,
-      entityType,
-      slice)
-    eventTopic(entityType, slice) ! Topic.Publish(envelope)
+      val envelope = new EventEnvelope(
+        offset,
+        pid,
+        pr.sequenceNr,
+        Option(payload),
+        timestamp.toEpochMilli,
+        pr.metadata,
+        entityType,
+        slice)
+      eventTopic(entityType, slice) ! Topic.Publish(envelope)
+    }
   }
 }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/EWMASpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/EWMASpec.scala
new file mode 100644
index 0000000..205ee3b
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/EWMASpec.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import scala.concurrent.duration._
+
+import org.scalatest.TestSuite
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/**
+ * Original source code taken from
+ * 
https://github.com/akka/akka/blob/main/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/EWMASpec.scala
+ */
+class EWMASpec extends AnyWordSpec with TestSuite with Matchers {
+
+  "EWMA" should {
+
+    "calculate same ewma for constant values" in {
+      val ds = EWMA(value = 100.0, alpha = 0.18) :+
+        100.0 :+ 100.0 :+ 100.0
+      ds.value should ===(100.0 +- 0.001)
+    }
+
+    "calculate correct ewma for normal decay" in {
+      val d0 = EWMA(value = 1000.0, alpha = 2.0 / (1 + 10))
+      d0.value should ===(1000.0 +- 0.01)
+      val d1 = d0 :+ 10.0
+      d1.value should ===(820.0 +- 0.01)
+      val d2 = d1 :+ 10.0
+      d2.value should ===(672.73 +- 0.01)
+      val d3 = d2 :+ 10.0
+      d3.value should ===(552.23 +- 0.01)
+      val d4 = d3 :+ 10.0
+      d4.value should ===(453.64 +- 0.01)
+
+      val dn = (1 to 100).foldLeft(d0)((d, _) => d :+ 10.0)
+      dn.value should ===(10.0 +- 0.1)
+    }
+
+    "calculate ewma for alpha 1.0, max bias towards latest value" in {
+      val d0 = EWMA(value = 100.0, alpha = 1.0)
+      d0.value should ===(100.0 +- 0.01)
+      val d1 = d0 :+ 1.0
+      d1.value should ===(1.0 +- 0.01)
+      val d2 = d1 :+ 57.0
+      d2.value should ===(57.0 +- 0.01)
+      val d3 = d2 :+ 10.0
+      d3.value should ===(10.0 +- 0.01)
+    }
+
+    "calculate alpha from half-life and collect interval" in {
+      // according to 
https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+      val expectedAlpha = 0.1
+      // alpha = 2.0 / (1 + N)
+      val n = 19
+      val halfLife = n.toDouble / 2.8854
+      val collectInterval = 1.second
+      val halfLifeDuration = (halfLife * 1000).millis
+      EWMA.alpha(halfLifeDuration, collectInterval) should ===(expectedAlpha 
+- 0.001)
+    }
+
+    "calculate sane alpha from short half-life" in {
+      val alpha = EWMA.alpha(1.millis, 3.seconds)
+      alpha should be <= 1.0
+      alpha should be >= 0.0
+      alpha should ===(1.0 +- 0.001)
+    }
+
+    "calculate sane alpha from long half-life" in {
+      val alpha = EWMA.alpha(1.day, 3.seconds)
+      alpha should be <= 1.0
+      alpha should be >= 0.0
+      alpha should ===(0.0 +- 0.001)
+    }
+
+  }
+}
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index ad655b5..3b244b5 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -35,11 +35,20 @@ import pekko.persistence.typed.PersistenceId
 import pekko.serialization.SerializationExtension
 import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.scaladsl.TestSink
+import com.typesafe.config.ConfigFactory
 import org.scalatest.wordspec.AnyWordSpecLike
 import org.slf4j.LoggerFactory
 
+object EventsBySliceBacktrackingSpec {
+  private val config = ConfigFactory
+    .parseString("""
+    pekko.persistence.r2dbc.journal.publish-events = off
+    """)
+    .withFallback(TestConfig.config)
+}
+
 class EventsBySliceBacktrackingSpec
-    extends ScalaTestWithActorTestKit(TestConfig.config)
+    extends ScalaTestWithActorTestKit(EventsBySliceBacktrackingSpec.config)
     with AnyWordSpecLike
     with TestDbLifecycle
     with TestData
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index 47e7f4a..553446b 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -15,9 +15,13 @@ package org.apache.pekko.persistence.r2dbc.query
 
 import java.time.Instant
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.internal.pubsub.TopicImpl
@@ -39,6 +43,7 @@ import pekko.stream.scaladsl.Sink
 import pekko.stream.scaladsl.Source
 import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.typed.scaladsl.ActorFlow
 import com.typesafe.config.Config
 import com.typesafe.config.ConfigFactory
 import org.scalatest.wordspec.AnyWordSpecLike
@@ -50,9 +55,15 @@ object EventsBySlicePubSubSpec {
         .parseString("""
     pekko.persistence.r2dbc {
       journal.publish-events = on
+      journal.publish-events-dynamic {
+        throughput-threshold = 50
+        throughput-collect-interval = 1 second
+      }
+
       # no events from database query, only via pub-sub
       behind-current-time = 5 minutes
     }
+    pekko.actor.testkit.typed.filter-leeway = 20.seconds
     """)
         
.withFallback(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.unresolvedConfig))
     )
@@ -178,6 +189,65 @@ class EventsBySlicePubSubSpec
       out shouldBe List(envA1, envA2, envA3, envB1, envA1, envB2) // envA1 was 
evicted and therefore duplicate
     }
 
+    "dynamically enable/disable publishing based on throughput" in new Setup {
+      import pekko.actor.typed.scaladsl.adapter._
+
+      val consumerProbe = createTestProbe[EventEnvelope[String]]()
+
+      query
+        .eventsBySlices[String](setupEntityType, slice, slice, NoOffset)
+        .runWith(
+          Sink.actorRef(consumerProbe.ref.toClassic, onCompleteMessage = 
"done", onFailureMessage = _.getMessage))
+
+      val topicStatsProbe = createTestProbe[TopicImpl.TopicStats]()
+      eventually {
+        PubSub(typedSystem).eventTopic[String](setupEntityType, slice) ! 
TopicImpl.GetTopicStats(topicStatsProbe.ref)
+        topicStatsProbe.receiveMessage().localSubscriberCount shouldBe 1
+      }
+
+      for (i <- 1 to 10) {
+        persister ! PersistWithAck(s"e-$i", probe.ref)
+        probe.expectMessage(Done)
+      }
+
+      consumerProbe.receiveMessages(10)
+
+      LoggingTestKit.info("Disabled publishing of events").expect {
+        val done1 = Source(11 to 600)
+          .throttle(200, 1.second)
+          .via(ActorFlow.ask[Int, PersistWithAck, Done](1)(persister) { case 
(i, replyTo) =>
+            PersistWithAck(s"e-$i", replyTo)
+          })
+          .runWith(Sink.ignore)
+
+        Await.result(done1, 20.seconds)
+      }
+
+      var count = 0
+      var lookForMore = true
+      while (lookForMore) {
+        try {
+          consumerProbe.receiveMessage(1.second)
+          count += 1
+        } catch {
+          case _: AssertionError => lookForMore = false // timeout
+        }
+      }
+      count should be <= 500
+
+      LoggingTestKit.info("Enabled publishing of events").expect {
+        val done2 = Source(601 to 800)
+          .throttle(20, 1.second)
+          .via(ActorFlow.ask[Int, PersistWithAck, Done](1)(persister) { case 
(i, replyTo) =>
+            PersistWithAck(s"e-$i", replyTo)
+          })
+          .runWith(Sink.ignore)
+
+        Await.result(done2, 20.seconds)
+      }
+
+    }
+
   }
 
 }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
index e506c9f..26c0408 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
@@ -62,6 +62,8 @@ object EventsBySliceSpec {
     pekko.persistence.r2dbc.query.behind-current-time = 500 millis
     pekko.persistence.r2dbc-small-buffer = $${pekko.persistence.r2dbc}
 
+    pekko.persistence.r2dbc.journal.publish-events = off
+
     # this is used by the "read in chunks" test
     pekko.persistence.r2dbc-small-buffer.query {
       buffer-size = 4
diff --git a/docs/src/main/paradox/projection.md 
b/docs/src/main/paradox/projection.md
index 550203a..9caf0da 100644
--- a/docs/src/main/paradox/projection.md
+++ b/docs/src/main/paradox/projection.md
@@ -236,7 +236,7 @@ The supported offset types of the `R2dbcProjection` are:
 
 ## Publish events for lower latency
 
-To reduce the latency until the Projection finds and process new events you 
can enable the feature described in @ref:[eventsBySlices 
documentation](query.md#publish-events-for-lower-latency-of-eventsbyslices).
+See @ref:[eventsBySlices 
documentation](query.md#publish-events-for-lower-latency-of-eventsbyslices).
 
 ## Compatibility with plugin configuration at runtime
 
diff --git a/docs/src/main/paradox/query.md b/docs/src/main/paradox/query.md
index a560c10..5eb621d 100644
--- a/docs/src/main/paradox/query.md
+++ b/docs/src/main/paradox/query.md
@@ -94,16 +94,22 @@ The `eventsBySlices` query polls the database periodically 
to find new events. B
 few seconds, see `pekko.persistence.r2dbc.query.refresh-interval` in the 
@ref:[Configuration](#configuration).
 This interval can be reduced for lower latency, with the drawback of querying 
the database more frequently.
 
-If you need latency below a few 100 milliseconds you can enable a feature that 
will publish the events within
-the Pekko Cluster instead of reducing `refresh-interval`. Running 
`eventsBySlices` will subscribe to the events
-and emit them directly without waiting for next query poll. The tradeoff is 
that more CPU and network resources
-are used. The events must still be retrieved from the database, but at a lower 
polling frequency,
-because delivery of published messages are not guaranteed.
+To reduce the latency there is a feature that will publish the events within 
the Pekko Cluster. Running
+`eventsBySlices` will subscribe to the events and emit them directly without 
waiting for next query poll.
+The tradeoff is that more CPU and network resources are used. The events must 
still be retrieved from the database,
+but at a lower polling frequency, because delivery of published messages is 
not guaranteed.
 
-Enable publishing of events with configuration:
+This feature is enabled by default and it will measure the throughput and 
automatically disable/enable if
+the exponentially weighted moving average of measured throughput exceeds the 
configured threshold.
 
 ```
-pekko.persistence.r2dbc.journal.publish-events = on
+pekko.persistence.r2dbc.journal.publish-events-dynamic.throughput-threshold = 300
+```
+
+Disable publishing of events with configuration:
+
+```
+pekko.persistence.r2dbc.journal.publish-events = off
 ```
 
 ## Durable state queries
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index ce19afe..9af221c 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -51,6 +51,7 @@ object Dependencies {
     val pekkoShardingTyped = "org.apache.pekko" %% 
"pekko-cluster-sharding-typed" % PekkoVersion % Test
     val pekkoStream = "org.apache.pekko" %% "pekko-stream" % PekkoVersion % 
Test
     val pekkoStreamTestkit = "org.apache.pekko" %% "pekko-stream-testkit" % 
PekkoVersion % Test
+    val pekkoStreamTyped = "org.apache.pekko" %% "pekko-stream-typed" % 
PekkoVersion % Test
     val pekkoTestkit = "org.apache.pekko" %% "pekko-testkit" % PekkoVersion % 
Test
 
     val pekkoProjectionEventSourced =
@@ -78,6 +79,7 @@ object Dependencies {
     r2dbcMysql % "provided,test",
     TestDeps.pekkoPersistenceTck,
     TestDeps.pekkoStreamTestkit,
+    TestDeps.pekkoStreamTyped,
     TestDeps.pekkoActorTestkitTyped,
     TestDeps.pekkoJackson,
     TestDeps.logback,
@@ -122,6 +124,7 @@ object Dependencies {
     TestDeps.pekkoSlf4j,
     TestDeps.pekkoStream,
     TestDeps.pekkoStreamTestkit,
+    TestDeps.pekkoStreamTyped,
     TestDeps.pekkoTestkit)
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to