This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 1fb53ea Port akka-persistence-r2dbc PR #313: Dynamically
enable/disable publish-events (#353)
1fb53ea is described below
commit 1fb53ea1f9b76e909074c7695d6be1a84c0cc833
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 7 08:53:14 2026 +0100
Port akka-persistence-r2dbc PR #313: Dynamically enable/disable
publish-events (#353)
* Port akka-persistence-r2dbc PR #313: Dynamically enable/disable
publish-events
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/13f4e372-88f0-4596-836c-7c38e6380b19
Co-authored-by: pjfanning <[email protected]>
* Fix typos in EWMASpec test names
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/13f4e372-88f0-4596-836c-7c38e6380b19
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
core/src/main/resources/reference.conf | 23 +++++-
.../pekko/persistence/r2dbc/R2dbcSettings.scala | 9 +++
.../pekko/persistence/r2dbc/internal/EWMA.scala | 84 ++++++++++++++++++++
.../pekko/persistence/r2dbc/internal/PubSub.scala | 87 +++++++++++++++------
.../persistence/r2dbc/internal/EWMASpec.scala | 89 ++++++++++++++++++++++
.../query/EventsBySliceBacktrackingSpec.scala | 11 ++-
.../r2dbc/query/EventsBySlicePubSubSpec.scala | 70 +++++++++++++++++
.../r2dbc/query/EventsBySliceSpec.scala | 2 +
docs/src/main/paradox/projection.md | 2 +-
docs/src/main/paradox/query.md | 20 +++--
project/Dependencies.scala | 3 +
11 files changed, 365 insertions(+), 35 deletions(-)
diff --git a/core/src/main/resources/reference.conf
b/core/src/main/resources/reference.conf
index 0344382..f5a5075 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -11,12 +11,27 @@ pekko.persistence.r2dbc {
# Otherwise it would be a pinned dispatcher, see
https://github.com/akka/akka/issues/31058
plugin-dispatcher = "pekko.actor.default-dispatcher"
- # Enable this to reduce latency of eventsBySlices. The persisted events
will be
- # published as Pekko messages and consumed directly by running
eventsBySlices
- # queries. Tradeoff is more CPU and network resources that are used. The
events
+ # Set this to off to disable publishing of events as Pekko messages to
running
+ # eventsBySlices queries.
+ # Tradeoff is more CPU and network resources that are used. The events
# must still be retrieved from the database, but at a lower polling
frequency,
# because delivery of published messages are not guaranteed.
- publish-events = off
+ # When this feature is enabled it will measure the throughput and
automatically
+ # disable/enable if the throughput exceeds the configured threshold. See
+ # publish-events-dynamic configuration.
+ publish-events = on
+
+ # When publish-events is enabled it will measure the throughput and
automatically
+ # disable/enable if the throughput exceeds the configured threshold.
+ # This configuration cannot be defined per journal, but is global for the
ActorSystem.
+ publish-events-dynamic {
+ # If exponentially weighted moving average of measured throughput
exceeds this
+ # threshold publishing of events is disabled. It is enabled again when
lower than
+ # the threshold.
+ throughput-threshold = 300
+ # The interval of the throughput measurements.
+ throughput-collect-interval = 10 seconds
+ }
# replay filter not needed for this plugin
replay-filter.mode = off
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index 6a5d368..b3978f2 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -291,3 +291,12 @@ trait UseConnnectionFactory {
val useConnectionFactory: String = config.getString("use-connection-factory")
}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+final class PublishEventsDynamicSettings(config: Config) {
+ val throughputThreshold: Int = config.getInt("throughput-threshold")
+ val throughputCollectInterval: FiniteDuration =
config.getDuration("throughput-collect-interval").toScala
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EWMA.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EWMA.scala
new file mode 100644
index 0000000..0fb10d3
--- /dev/null
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EWMA.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import scala.concurrent.duration.FiniteDuration
+
+import org.apache.pekko.annotation.InternalApi
+
+/**
+ * The exponentially weighted moving average (EWMA) approach captures
short-term movements in volatility for a
+ * conditional volatility forecasting model. By virtue of its alpha, or decay
factor, this provides a statistical
+ * streaming data model that is exponentially biased towards newer entries.
+ *
+ * https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+ *
+ * An EWMA only needs the most recent forecast value to be kept, as opposed to
a standard moving average model.
+ *
+ * Original source code taken from from
+ *
https://github.com/akka/akka/blob/main/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/EWMA.scala
+ *
+ * @param alpha
+ * decay factor, sets how quickly the exponential weighting decays for past
data compared to new data, see
+ * https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+ *
+ * @param value
+ * the current exponentially weighted moving average, e.g. Y(n - 1), or, the
sampled value resulting from the previous
+ * smoothing iteration. This value is always used as the previous EWMA to
calculate the new EWMA.
+ */
+@InternalApi private[pekko] final case class EWMA(value: Double, alpha:
Double) {
+
+ require(0.0 <= alpha && alpha <= 1.0, "alpha must be between 0.0 and 1.0")
+
+ val nanoTime = System.nanoTime()
+
+ /**
+ * Calculates the exponentially weighted moving average for a given
monitored data set.
+ *
+ * @param xn
+ * the new data point
+ * @return
+ * a new EWMA with the updated value
+ */
+ def :+(xn: Double): EWMA = {
+ val newValue = (alpha * xn) + (1 - alpha) * value
+ copy(value = newValue)
+ }
+
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object EWMA {
+
+ /**
+ * math.log(2)
+ */
+ private val LogOf2 = 0.69315
+
+ /**
+ * Calculate the alpha (decay factor) from specified half-life and interval
between observations. Half-life is the
+ * interval over which the weights decrease by a factor of two. The
relevance of each data sample is halved for every
+ * passing half-life duration,
+ * i.e. after 4 times the half-life, a data sample's relevance is reduced to
6% of its original relevance. The initial
+ * relevance of a data sample is given by `1 – 0.5 ^ (collect-interval /
half-life)`.
+ */
+ def alpha(halfLife: FiniteDuration, collectInterval: FiniteDuration): Double
= {
+ val halfLifeMillis = halfLife.toMillis
+ require(halfLife.toMillis > 0, "halfLife must be > 0 s")
+ val decayRate = LogOf2 / halfLifeMillis
+ 1 - math.exp(-decayRate * collectInterval.toMillis)
+ }
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
index 8946f34..f8d9562 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
@@ -17,6 +17,7 @@ import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
import org.apache.pekko
import pekko.actor.typed.ActorRef
@@ -30,12 +31,16 @@ import pekko.persistence.PersistentRepr
import pekko.persistence.journal.Tagged
import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.r2dbc.PublishEventsDynamicSettings
import pekko.persistence.typed.PersistenceId
+import org.slf4j.LoggerFactory
/**
* INTERNAL API
*/
@InternalApi private[pekko] object PubSub extends ExtensionId[PubSub] {
+ private val log = LoggerFactory.getLogger(classOf[PubSub])
+
def createExtension(system: ActorSystem[_]): PubSub = new PubSub(system)
// Java API
@@ -47,9 +52,21 @@ import pekko.persistence.typed.PersistenceId
* INTERNAL API
*/
@InternalApi private[pekko] class PubSub(system: ActorSystem[_]) extends
Extension {
+ import PubSub.log
+
private val topics = new ConcurrentHashMap[String, ActorRef[Any]]
private val persistenceExt = Persistence(system)
+ private val settings = new PublishEventsDynamicSettings(
+
system.settings.config.getConfig("pekko.persistence.r2dbc.journal.publish-events-dynamic"))
+ private val throughputCollectIntervalMillis =
settings.throughputCollectInterval.toMillis
+ private val throughputThreshold = settings.throughputThreshold.toDouble
+ private val throughputSampler =
+ math.min(1000, math.max(1, settings.throughputThreshold / 10)) // 1/10 of
threshold, but between 1-1000
+ private val throughputCounter = new AtomicLong
+ @volatile private var throughput =
+ EWMA(0.0, EWMA.alpha(settings.throughputCollectInterval * 2,
settings.throughputCollectInterval))
+
def eventTopic[Event](entityType: String, slice: Int):
ActorRef[Topic.Command[EventEnvelope[Event]]] = {
val name = topicName(entityType, slice)
topics
@@ -61,29 +78,55 @@ import pekko.persistence.typed.PersistenceId
URLEncoder.encode(s"r2dbc-$entityType-$slice",
StandardCharsets.UTF_8.name())
def publish(pr: PersistentRepr, timestamp: Instant): Unit = {
- val pid = pr.persistenceId
- val entityType = PersistenceId.extractEntityType(pid)
- val slice = persistenceExt.sliceForPersistenceId(pid)
-
- val offset = TimestampOffset(timestamp, timestamp, Map(pid ->
pr.sequenceNr))
- val payload =
- pr.payload match {
- case Tagged(payload, _) =>
- // eventsByTag not implemented (see issue #82), but events can still
be tagged, so we unwrap this tagged event.
- payload
-
- case other => other
+
+ val n = throughputCounter.incrementAndGet()
+ if (n % throughputSampler == 0) {
+ val ewma = throughput
+ val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000
+ if (durationMillis >= throughputCollectIntervalMillis) {
+ // doesn't have to be exact so "missed" or duplicate concurrent calls
don't matter
+ throughputCounter.set(0L)
+ val rps = n * 1000.0 / durationMillis
+ val newEwma = ewma :+ rps
+ throughput = newEwma
+ if (ewma.value < throughputThreshold && newEwma.value >=
throughputThreshold) {
+ log.info("Disabled publishing of events. Throughput greater than
[{}] events/s", throughputThreshold)
+ } else if (ewma.value >= throughputThreshold && newEwma.value <
throughputThreshold) {
+ log.info("Enabled publishing of events. Throughput less than [{}]
events/s", throughputThreshold)
+ } else {
+ log.debug(
+ "Publishing of events is {}. Throughput is [{}] events/s",
+ if (newEwma.value < throughputThreshold) "enabled" else "disabled",
+ newEwma.value)
+ }
}
+ }
+
+ if (throughput.value < throughputThreshold) {
+ val pid = pr.persistenceId
+ val entityType = PersistenceId.extractEntityType(pid)
+ val slice = persistenceExt.sliceForPersistenceId(pid)
+
+ val offset = TimestampOffset(timestamp, timestamp, Map(pid ->
pr.sequenceNr))
+ val payload =
+ pr.payload match {
+ case Tagged(payload, _) =>
+ // eventsByTag not implemented (see issue #82), but events can
still be tagged, so we unwrap this tagged event.
+ payload
+
+ case other => other
+ }
- val envelope = new EventEnvelope(
- offset,
- pid,
- pr.sequenceNr,
- Option(payload),
- timestamp.toEpochMilli,
- pr.metadata,
- entityType,
- slice)
- eventTopic(entityType, slice) ! Topic.Publish(envelope)
+ val envelope = new EventEnvelope(
+ offset,
+ pid,
+ pr.sequenceNr,
+ Option(payload),
+ timestamp.toEpochMilli,
+ pr.metadata,
+ entityType,
+ slice)
+ eventTopic(entityType, slice) ! Topic.Publish(envelope)
+ }
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/EWMASpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/EWMASpec.scala
new file mode 100644
index 0000000..205ee3b
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/EWMASpec.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import scala.concurrent.duration._
+
+import org.scalatest.TestSuite
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/**
+ * Original source code taken from
+ *
https://github.com/akka/akka/blob/main/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/EWMASpec.scala
+ */
+class EWMASpec extends AnyWordSpec with TestSuite with Matchers {
+
+ "EWMA" should {
+
+ "calculate same ewma for constant values" in {
+ val ds = EWMA(value = 100.0, alpha = 0.18) :+
+ 100.0 :+ 100.0 :+ 100.0
+ ds.value should ===(100.0 +- 0.001)
+ }
+
+ "calculate correct ewma for normal decay" in {
+ val d0 = EWMA(value = 1000.0, alpha = 2.0 / (1 + 10))
+ d0.value should ===(1000.0 +- 0.01)
+ val d1 = d0 :+ 10.0
+ d1.value should ===(820.0 +- 0.01)
+ val d2 = d1 :+ 10.0
+ d2.value should ===(672.73 +- 0.01)
+ val d3 = d2 :+ 10.0
+ d3.value should ===(552.23 +- 0.01)
+ val d4 = d3 :+ 10.0
+ d4.value should ===(453.64 +- 0.01)
+
+ val dn = (1 to 100).foldLeft(d0)((d, _) => d :+ 10.0)
+ dn.value should ===(10.0 +- 0.1)
+ }
+
+ "calculate ewma for alpha 1.0, max bias towards latest value" in {
+ val d0 = EWMA(value = 100.0, alpha = 1.0)
+ d0.value should ===(100.0 +- 0.01)
+ val d1 = d0 :+ 1.0
+ d1.value should ===(1.0 +- 0.01)
+ val d2 = d1 :+ 57.0
+ d2.value should ===(57.0 +- 0.01)
+ val d3 = d2 :+ 10.0
+ d3.value should ===(10.0 +- 0.01)
+ }
+
+ "calculate alpha from half-life and collect interval" in {
+ // according to
https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+ val expectedAlpha = 0.1
+ // alpha = 2.0 / (1 + N)
+ val n = 19
+ val halfLife = n.toDouble / 2.8854
+ val collectInterval = 1.second
+ val halfLifeDuration = (halfLife * 1000).millis
+ EWMA.alpha(halfLifeDuration, collectInterval) should ===(expectedAlpha
+- 0.001)
+ }
+
+ "calculate sane alpha from short half-life" in {
+ val alpha = EWMA.alpha(1.millis, 3.seconds)
+ alpha should be <= 1.0
+ alpha should be >= 0.0
+ alpha should ===(1.0 +- 0.001)
+ }
+
+ "calculate sane alpha from long half-life" in {
+ val alpha = EWMA.alpha(1.day, 3.seconds)
+ alpha should be <= 1.0
+ alpha should be >= 0.0
+ alpha should ===(0.0 +- 0.001)
+ }
+
+ }
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index ad655b5..3b244b5 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -35,11 +35,20 @@ import pekko.persistence.typed.PersistenceId
import pekko.serialization.SerializationExtension
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
+import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory
+object EventsBySliceBacktrackingSpec {
+ private val config = ConfigFactory
+ .parseString("""
+ pekko.persistence.r2dbc.journal.publish-events = off
+ """)
+ .withFallback(TestConfig.config)
+}
+
class EventsBySliceBacktrackingSpec
- extends ScalaTestWithActorTestKit(TestConfig.config)
+ extends ScalaTestWithActorTestKit(EventsBySliceBacktrackingSpec.config)
with AnyWordSpecLike
with TestDbLifecycle
with TestData
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index 47e7f4a..553446b 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -15,9 +15,13 @@ package org.apache.pekko.persistence.r2dbc.query
import java.time.Instant
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.internal.pubsub.TopicImpl
@@ -39,6 +43,7 @@ import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.typed.scaladsl.ActorFlow
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
@@ -50,9 +55,15 @@ object EventsBySlicePubSubSpec {
.parseString("""
pekko.persistence.r2dbc {
journal.publish-events = on
+ journal.publish-events-dynamic {
+ throughput-threshold = 50
+ throughput-collect-interval = 1 second
+ }
+
# no events from database query, only via pub-sub
behind-current-time = 5 minutes
}
+ pekko.actor.testkit.typed.filter-leeway = 20.seconds
""")
.withFallback(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.unresolvedConfig))
)
@@ -178,6 +189,65 @@ class EventsBySlicePubSubSpec
out shouldBe List(envA1, envA2, envA3, envB1, envA1, envB2) // envA1 was
evicted and therefore duplicate
}
+ "dynamically enable/disable publishing based on throughput" in new Setup {
+ import pekko.actor.typed.scaladsl.adapter._
+
+ val consumerProbe = createTestProbe[EventEnvelope[String]]()
+
+ query
+ .eventsBySlices[String](setupEntityType, slice, slice, NoOffset)
+ .runWith(
+ Sink.actorRef(consumerProbe.ref.toClassic, onCompleteMessage =
"done", onFailureMessage = _.getMessage))
+
+ val topicStatsProbe = createTestProbe[TopicImpl.TopicStats]()
+ eventually {
+ PubSub(typedSystem).eventTopic[String](setupEntityType, slice) !
TopicImpl.GetTopicStats(topicStatsProbe.ref)
+ topicStatsProbe.receiveMessage().localSubscriberCount shouldBe 1
+ }
+
+ for (i <- 1 to 10) {
+ persister ! PersistWithAck(s"e-$i", probe.ref)
+ probe.expectMessage(Done)
+ }
+
+ consumerProbe.receiveMessages(10)
+
+ LoggingTestKit.info("Disabled publishing of events").expect {
+ val done1 = Source(11 to 600)
+ .throttle(200, 1.second)
+ .via(ActorFlow.ask[Int, PersistWithAck, Done](1)(persister) { case
(i, replyTo) =>
+ PersistWithAck(s"e-$i", replyTo)
+ })
+ .runWith(Sink.ignore)
+
+ Await.result(done1, 20.seconds)
+ }
+
+ var count = 0
+ var lookForMore = true
+ while (lookForMore) {
+ try {
+ consumerProbe.receiveMessage(1.second)
+ count += 1
+ } catch {
+ case _: AssertionError => lookForMore = false // timeout
+ }
+ }
+ count should be <= 500
+
+ LoggingTestKit.info("Enabled publishing of events").expect {
+ val done2 = Source(601 to 800)
+ .throttle(20, 1.second)
+ .via(ActorFlow.ask[Int, PersistWithAck, Done](1)(persister) { case
(i, replyTo) =>
+ PersistWithAck(s"e-$i", replyTo)
+ })
+ .runWith(Sink.ignore)
+
+ Await.result(done2, 20.seconds)
+ }
+
+ }
+
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
index e506c9f..26c0408 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
@@ -62,6 +62,8 @@ object EventsBySliceSpec {
pekko.persistence.r2dbc.query.behind-current-time = 500 millis
pekko.persistence.r2dbc-small-buffer = $${pekko.persistence.r2dbc}
+ pekko.persistence.r2dbc.journal.publish-events = off
+
# this is used by the "read in chunks" test
pekko.persistence.r2dbc-small-buffer.query {
buffer-size = 4
diff --git a/docs/src/main/paradox/projection.md
b/docs/src/main/paradox/projection.md
index 550203a..9caf0da 100644
--- a/docs/src/main/paradox/projection.md
+++ b/docs/src/main/paradox/projection.md
@@ -236,7 +236,7 @@ The supported offset types of the `R2dbcProjection` are:
## Publish events for lower latency
-To reduce the latency until the Projection finds and process new events you
can enable the feature described in @ref:[eventsBySlices
documentation](query.md#publish-events-for-lower-latency-of-eventsbyslices).
+See @ref:[eventsBySlices
documentation](query.md#publish-events-for-lower-latency-of-eventsbyslices).
## Compatibility with plugin configuration at runtime
diff --git a/docs/src/main/paradox/query.md b/docs/src/main/paradox/query.md
index a560c10..5eb621d 100644
--- a/docs/src/main/paradox/query.md
+++ b/docs/src/main/paradox/query.md
@@ -94,16 +94,22 @@ The `eventsBySlices` query polls the database periodically
to find new events. B
few seconds, see `pekko.persistence.r2dbc.query.refresh-interval` in the
@ref:[Configuration](#configuration).
This interval can be reduced for lower latency, with the drawback of querying
the database more frequently.
-If you need latency below a few 100 milliseconds you can enable a feature that
will publish the events within
-the Pekko Cluster instead of reducing `refresh-interval`. Running
`eventsBySlices` will subscribe to the events
-and emit them directly without waiting for next query poll. The tradeoff is
that more CPU and network resources
-are used. The events must still be retrieved from the database, but at a lower
polling frequency,
-because delivery of published messages are not guaranteed.
+To reduce the latency there is a feature that will publish the events within
the Pekko Cluster. Running
+`eventsBySlices` will subscribe to the events and emit them directly without
waiting for next query poll.
+The tradeoff is that more CPU and network resources are used. The events must
still be retrieved from the database,
+but at a lower polling frequency, because delivery of published messages are
not guaranteed.
-Enable publishing of events with configuration:
+This feature is enabled by default and it will measure the throughput and
automatically disable/enable if
+the exponentially weighted moving average of measured throughput exceeds the
configured threshold.
```
-pekko.persistence.r2dbc.journal.publish-events = on
+pekko.persistence.r2dbc.publish-events-dynamic.throughput-threshold = 300
+```
+
+Disable publishing of events with configuration:
+
+```
+pekko.persistence.r2dbc.journal.publish-events = off
```
## Durable state queries
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index ce19afe..9af221c 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -51,6 +51,7 @@ object Dependencies {
val pekkoShardingTyped = "org.apache.pekko" %%
"pekko-cluster-sharding-typed" % PekkoVersion % Test
val pekkoStream = "org.apache.pekko" %% "pekko-stream" % PekkoVersion %
Test
val pekkoStreamTestkit = "org.apache.pekko" %% "pekko-stream-testkit" %
PekkoVersion % Test
+ val pekkoStreamTyped = "org.apache.pekko" %% "pekko-stream-typed" %
PekkoVersion % Test
val pekkoTestkit = "org.apache.pekko" %% "pekko-testkit" % PekkoVersion %
Test
val pekkoProjectionEventSourced =
@@ -78,6 +79,7 @@ object Dependencies {
r2dbcMysql % "provided,test",
TestDeps.pekkoPersistenceTck,
TestDeps.pekkoStreamTestkit,
+ TestDeps.pekkoStreamTyped,
TestDeps.pekkoActorTestkitTyped,
TestDeps.pekkoJackson,
TestDeps.logback,
@@ -122,6 +124,7 @@ object Dependencies {
TestDeps.pekkoSlf4j,
TestDeps.pekkoStream,
TestDeps.pekkoStreamTestkit,
+ TestDeps.pekkoStreamTyped,
TestDeps.pekkoTestkit)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]