This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 645b76fbc9 Runtime plugin configuration for DurableState (#3058)
645b76fbc9 is described below
commit 645b76fbc91d0bda70374b79563ec17b15cb1de1
Author: Domantas Petrauskas <[email protected]>
AuthorDate: Sun Jun 14 18:26:22 2026 +0300
Runtime plugin configuration for DurableState (#3058)
* Runtime plugin configuration for DurableState
Motivation:
Like EventSourcedBehavior, DurableStateBehavior should allow configuring its
persistence plugin at runtime so the same plugin class can back multiple,
isolated stores (e.g. the pekko-persistence-r2dbc plugin).
Modification:
Add DurableStateBehavior.withDurableStateStorePluginConfig (Scala) and an
overridable durableStateStorePluginConfig (Java), threaded through
DurableStateBehaviorImpl, DurableStateSettings, and BehaviorSetup. Add
Config-accepting overloads of DurableStateStoreRegistry.durableStateStoreFor
and getDurableStateStoreFor that merge the runtime config ahead of the
system
config. The recovery-timeout is now resolved from the merged config so a
plugin
defined only at runtime resolves correctly.
Result:
DurableStateBehavior can be configured with a runtime plugin Config, and the
store registry resolves the plugin using it.
Tests:
- sbt persistence/mimaReportBinaryIssues
persistence-typed/mimaReportBinaryIssues - success
- sbt persistence-typed-tests/testOnly (all state.scaladsl and
state.javadsl specs) - All tests passed
References:
Fixes #1798
* Use new-file ASF header for RuntimeDurableStateStoreSpec
* Address review for DurableState runtime config
Motivation:
Review feedback on PR #3058: the runtime `pluginConfig` parameter shadowed
the private `pluginConfig(pluginId)` method, and the new runtime plugin
configuration feature was only covered by a Scala spec.
Modification:
Rename the runtime `Config` parameter to `runtimeConfig` in
`durableStateStoreFor`/`getDurableStateStoreFor` to avoid shadowing, and add
a Java DSL `RuntimeDurableStateStoreTest` mirroring the Scala spec to keep
the Scala and Java DSLs in test parity.
Result:
No name collision with the private method, and the Java DSL runtime plugin
config and `getDurableStateStoreFor(clazz, pluginId, config)` are covered.
Tests:
- sbt "persistence-typed-tests/Test/testOnly
org.apache.pekko.persistence.typed.state.javadsl.RuntimeDurableStateStoreTest"
- Passed 1
- sbt "persistence-typed-tests/Test/javafmtCheck" (JDK 17) - success
- sbt "persistence-typed-tests/headerCheckAll" - success
References:
Refs #3058
---
.../javadsl/RuntimeDurableStateStoreTest.java | 160 +++++++++++++++++++++
.../scaladsl/RuntimeDurableStateStoreSpec.scala | 100 +++++++++++++
.../durablestate-runtime-config.excludes | 19 +++
.../typed/state/internal/BehaviorSetup.scala | 6 +-
.../state/internal/DurableStateBehaviorImpl.scala | 12 +-
.../state/internal/DurableStateSettings.scala | 34 ++++-
.../typed/state/javadsl/DurableStateBehavior.scala | 18 ++-
.../state/scaladsl/DurableStateBehavior.scala | 10 ++
.../state/DurableStateStoreRegistry.scala | 33 ++++-
9 files changed, 381 insertions(+), 11 deletions(-)
diff --git
a/persistence-typed-tests/src/test/java/org/apache/pekko/persistence/typed/state/javadsl/RuntimeDurableStateStoreTest.java
b/persistence-typed-tests/src/test/java/org/apache/pekko/persistence/typed/state/javadsl/RuntimeDurableStateStoreTest.java
new file mode 100644
index 0000000000..49b2c7f8af
--- /dev/null
+++
b/persistence-typed-tests/src/test/java/org/apache/pekko/persistence/typed/state/javadsl/RuntimeDurableStateStoreTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.typed.state.javadsl;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.testkit.typed.annotations.JUnitJupiterTestKit;
+import org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit;
+import org.apache.pekko.actor.testkit.typed.javadsl.JUnitJupiterTestKitBuilder;
+import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturingExtension;
+import
org.apache.pekko.actor.testkit.typed.javadsl.TestKitJUnitJupiterExtension;
+import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
+import org.apache.pekko.actor.typed.ActorRef;
+import org.apache.pekko.actor.typed.Behavior;
+import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
+import org.apache.pekko.persistence.state.javadsl.DurableStateUpdateStore;
+import org.apache.pekko.persistence.state.javadsl.GetObjectResult;
+import
org.apache.pekko.persistence.testkit.state.PersistenceTestKitDurableStateStoreProvider;
+import org.apache.pekko.persistence.typed.PersistenceId;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(TestKitJUnitJupiterExtension.class)
+@ExtendWith(LogCapturingExtension.class)
+public class RuntimeDurableStateStoreTest {
+
+ @JUnitJupiterTestKit public ActorTestKit testKit = new
JUnitJupiterTestKitBuilder().build();
+
+ static Config config(String store) {
+ return ConfigFactory.parseString(
+ store
+ + " {\n"
+ + " state.class = \""
+ + PersistenceTestKitDurableStateStoreProvider.class.getName()
+ + "\"\n"
+ + "}\n");
+ }
+
+ interface Command {}
+
+ static final class Save implements Command {
+ final String text;
+ final ActorRef<Done> replyTo;
+
+ Save(String text, ActorRef<Done> replyTo) {
+ this.text = text;
+ this.replyTo = replyTo;
+ }
+ }
+
+ static final class ShowMeWhatYouGot implements Command {
+ final ActorRef<String> replyTo;
+
+ ShowMeWhatYouGot(ActorRef<String> replyTo) {
+ this.replyTo = replyTo;
+ }
+ }
+
+ enum Stop implements Command {
+ INSTANCE
+ }
+
+ static final class Actor extends DurableStateBehavior<Command, String> {
+ private final String store;
+
+ static Behavior<Command> create(String persistenceId, String store) {
+ return new Actor(persistenceId, store);
+ }
+
+ private Actor(String persistenceId, String store) {
+ super(PersistenceId.ofUniqueId(persistenceId));
+ this.store = store;
+ }
+
+ @Override
+ public String emptyState() {
+ return "";
+ }
+
+ @Override
+ public CommandHandler<Command, String> commandHandler() {
+ return newCommandHandlerBuilder()
+ .forAnyState()
+ .onCommand(Save.class, this::onSave)
+ .onCommand(ShowMeWhatYouGot.class, this::onShow)
+ .onCommand(Stop.class, (state, cmd) -> Effect().stop())
+ .build();
+ }
+
+ private Effect<String> onSave(String state, Save cmd) {
+ String newState =
+ Stream.of(state, cmd.text).filter(s ->
!s.isEmpty()).collect(Collectors.joining("|"));
+ return Effect().persist(newState).thenRun(() ->
cmd.replyTo.tell(Done.getInstance()));
+ }
+
+ private Effect<String> onShow(String state, ShowMeWhatYouGot cmd) {
+ cmd.replyTo.tell(state);
+ return Effect().none();
+ }
+
+ @Override
+ public String durableStateStorePluginId() {
+ return store + ".state";
+ }
+
+ @Override
+ public Optional<Config> durableStateStorePluginConfig() {
+ return Optional.of(config(store));
+ }
+ }
+
+ @Test
+ public void configureAtRuntimeInIsolatedInstances() throws Exception {
+ TestProbe<Done> probe = testKit.createTestProbe();
+
+ // one actor in each store with same id
+ ActorRef<Command> s1 = testKit.spawn(Actor.create("id1", "store1"));
+ ActorRef<Command> s2 = testKit.spawn(Actor.create("id1", "store2"));
+ s1.tell(new Save("s1m1", probe.ref()));
+ probe.receiveMessage();
+ s2.tell(new Save("s2m1", probe.ref()));
+ probe.receiveMessage();
+
+ assertStore("store1", "s1m1");
+ assertStore("store2", "s2m1");
+ }
+
+ private void assertStore(String store, String expectedState) throws
Exception {
+ @SuppressWarnings("unchecked")
+ DurableStateUpdateStore<String> durableStateStore =
+ DurableStateStoreRegistry.get(testKit.system())
+ .getDurableStateStoreFor(
+ DurableStateUpdateStore.class, store + ".state",
config(store));
+ GetObjectResult<String> result =
+ durableStateStore.getObject("id1").toCompletableFuture().get(3,
TimeUnit.SECONDS);
+ assertEquals(Optional.of(expectedState), result.value());
+ }
+}
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala
new file mode 100644
index 0000000000..4dc4bdb9e4
--- /dev/null
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.typed.state.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.Behavior
+import pekko.persistence.state.DurableStateStoreRegistry
+import pekko.persistence.state.scaladsl.DurableStateUpdateStore
+import
pekko.persistence.testkit.state.PersistenceTestKitDurableStateStoreProvider
+import pekko.persistence.typed.PersistenceId
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+object RuntimeDurableStateStoreSpec {
+
+ private object Actor {
+ sealed trait Command
+ final case class Save(text: String, replyTo: ActorRef[Done]) extends
Command
+ final case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends
Command
+ case object Stop extends Command
+
+ def apply(persistenceId: String, store: String): Behavior[Command] =
+ DurableStateBehavior[Command, String](
+ PersistenceId.ofUniqueId(persistenceId),
+ "",
+ (state, cmd) =>
+ cmd match {
+ case Save(text, replyTo) =>
+ Effect.persist(Seq(state,
text).filter(_.nonEmpty).mkString("|")).thenRun(_ => replyTo ! Done)
+ case ShowMeWhatYouGot(replyTo) =>
+ replyTo ! state
+ Effect.none
+ case Stop =>
+ Effect.stop()
+ })
+ .withDurableStateStorePluginId(s"$store.state")
+ .withDurableStateStorePluginConfig(Some(config(store)))
+ }
+
+ private def config(store: String): Config =
+ ConfigFactory.parseString(s"""
+ $store {
+ state.class =
"${classOf[PersistenceTestKitDurableStateStoreProvider].getName}"
+ }
+ """)
+}
+
+class RuntimeDurableStateStoreSpec extends ScalaTestWithActorTestKit with
AnyWordSpecLike with LogCapturing {
+
+ import RuntimeDurableStateStoreSpec._
+
+ "The durable state store plugin" must {
+
+ "be possible to configure at runtime and use in multiple isolated
instances" in {
+ val probe = createTestProbe[Any]()
+
+ {
+ // one actor in each store with same id
+ val s1 = spawn(Actor("id1", "store1"))
+ val s2 = spawn(Actor("id1", "store2"))
+ s1 ! Actor.Save("s1m1", probe.ref)
+ probe.receiveMessage()
+ s2 ! Actor.Save("s2m1", probe.ref)
+ probe.receiveMessage()
+ }
+
+ {
+ def assertStore(store: String, expectedState: String) = {
+ val durableStateStore = DurableStateStoreRegistry(system)
+
.durableStateStoreFor[DurableStateUpdateStore[String]](s"$store.state",
config(store))
+ durableStateStore.getObject("id1").futureValue.value shouldBe
Some(expectedState)
+ }
+
+ assertStore("store1", "s1m1")
+ assertStore("store2", "s2m1")
+ }
+ }
+ }
+}
diff --git
a/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes
new file mode 100644
index 0000000000..b86dd3685c
--- /dev/null
+++
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Add DurableStateBehavior.withDurableStateStorePluginConfig
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.state.scaladsl.DurableStateBehavior.withDurableStateStorePluginConfig")
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala
index c74421ab20..332bc62e25 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala
@@ -30,6 +30,8 @@ import
pekko.persistence.typed.state.internal.InternalProtocol.RecoveryTimeout
import pekko.persistence.typed.state.scaladsl.DurableStateBehavior
import pekko.util.OptionVal
+import com.typesafe.config.ConfigFactory
+
import org.slf4j.Logger
import org.slf4j.MDC
@@ -57,7 +59,9 @@ private[pekko] final class BehaviorSetup[C, S](
// Any instead S because adapter may change the type
val durableStateStore: DurableStateUpdateStore[Any] =
DurableStateStoreRegistry(context.system.toClassic)
-
.durableStateStoreFor[DurableStateUpdateStore[Any]](settings.durableStateStorePluginId)
+ .durableStateStoreFor[DurableStateUpdateStore[Any]](
+ settings.durableStateStorePluginId,
+ settings.durableStateStorePluginConfig.getOrElse(ConfigFactory.empty))
def selfClassic: ClassicActorRef = context.self.toClassic
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
index 65061dffe7..bb6662d07d 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
@@ -35,6 +35,8 @@ import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.SnapshotAdapter
import pekko.persistence.typed.state.scaladsl._
+import com.typesafe.config.Config
+
import org.slf4j.LoggerFactory
@InternalApi
@@ -60,6 +62,7 @@ private[pekko] final case class
DurableStateBehaviorImpl[Command, State](
commandHandler: DurableStateBehavior.CommandHandler[Command, State],
loggerClass: Class[?],
durableStateStorePluginId: Option[String] = None,
+ durableStateStorePluginConfig: Option[Config] = None,
tag: String = "",
snapshotAdapter: SnapshotAdapter[State] =
NoOpSnapshotAdapter.instance[State],
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
@@ -80,7 +83,11 @@ private[pekko] final case class
DurableStateBehaviorImpl[Command, State](
case _ => false
}
if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass)
- val settings = DurableStateSettings(ctx.system,
durableStateStorePluginId.getOrElse(""), customStashCapacity)
+ val settings = DurableStateSettings(
+ ctx.system,
+ durableStateStorePluginId.getOrElse(""),
+ durableStateStorePluginConfig,
+ customStashCapacity)
// stashState outside supervise because StashState should survive restarts
due to persist failures
val stashState = new
StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
@@ -170,6 +177,9 @@ private[pekko] final case class
DurableStateBehaviorImpl[Command, State](
copy(durableStateStorePluginId = if (id != "") Some(id) else None)
}
+ override def withDurableStateStorePluginConfig(config: Option[Config]):
DurableStateBehavior[Command, State] =
+ copy(durableStateStorePluginConfig = config)
+
override def withTag(tag: String): DurableStateBehavior[Command, State] =
copy(tag = tag)
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
index 33082fcb3d..5071c659a7 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
@@ -34,11 +34,25 @@ import com.typesafe.config.Config
system: ActorSystem[?],
durableStateStorePluginId: String,
customStashCapacity: Option[Int]): DurableStateSettings =
- apply(system.settings.config, durableStateStorePluginId,
customStashCapacity)
+ apply(system, durableStateStorePluginId, None, customStashCapacity)
+
+ def apply(
+ system: ActorSystem[?],
+ durableStateStorePluginId: String,
+ durableStateStorePluginConfig: Option[Config],
+ customStashCapacity: Option[Int]): DurableStateSettings =
+ apply(system.settings.config, durableStateStorePluginId,
durableStateStorePluginConfig, customStashCapacity)
+
+ def apply(
+ config: Config,
+ durableStateStorePluginId: String,
+ customStashCapacity: Option[Int]): DurableStateSettings =
+ apply(config, durableStateStorePluginId, None, customStashCapacity)
def apply(
config: Config,
durableStateStorePluginId: String,
+ durableStateStorePluginConfig: Option[Config],
customStashCapacity: Option[Int]): DurableStateSettings = {
val typedConfig = config.getConfig("pekko.persistence.typed")
@@ -54,7 +68,8 @@ import com.typesafe.config.Config
val logOnStashing = typedConfig.getBoolean("log-stashing")
- val durableStateStoreConfig = durableStateStoreConfigFor(config,
durableStateStorePluginId)
+ val durableStateStoreConfig =
+ durableStateStoreConfigFor(config, durableStateStorePluginId,
durableStateStorePluginConfig)
val recoveryTimeout: FiniteDuration =
durableStateStoreConfig.getDuration("recovery-timeout",
TimeUnit.MILLISECONDS).millis
@@ -69,20 +84,26 @@ import com.typesafe.config.Config
logOnStashing = logOnStashing,
recoveryTimeout,
durableStateStorePluginId,
+ durableStateStorePluginConfig,
useContextLoggerForInternalLogging,
recurseWhenUnstashingReadOnlyCommands)
}
- private def durableStateStoreConfigFor(config: Config, pluginId: String):
Config = {
+ private def durableStateStoreConfigFor(
+ config: Config,
+ pluginId: String,
+ pluginConfig: Option[Config]): Config = {
+ val mergedConfig =
pluginConfig.map(_.withFallback(config)).getOrElse(config)
+
def defaultPluginId = {
- val configPath = config.getString("pekko.persistence.state.plugin")
+ val configPath = mergedConfig.getString("pekko.persistence.state.plugin")
Persistence.verifyPluginConfigIsDefined(configPath, "Default
DurableStateStore")
configPath
}
val configPath = if (pluginId == "") defaultPluginId else pluginId
- Persistence.verifyPluginConfigExists(config, configPath,
"DurableStateStore")
-
config.getConfig(configPath).withFallback(config.getConfig("pekko.persistence.state-plugin-fallback"))
+ Persistence.verifyPluginConfigExists(mergedConfig, configPath,
"DurableStateStore")
+
mergedConfig.getConfig(configPath).withFallback(mergedConfig.getConfig("pekko.persistence.state-plugin-fallback"))
}
}
@@ -97,6 +118,7 @@ private[pekko] final case class DurableStateSettings(
logOnStashing: Boolean,
recoveryTimeout: FiniteDuration,
durableStateStorePluginId: String,
+ durableStateStorePluginConfig: Option[Config],
useContextLoggerForInternalLogging: Boolean,
recurseWhenUnstashingReadOnlyCommands: Boolean) {
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala
index 54b3b6003a..4af86d9582 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala
@@ -28,6 +28,10 @@ import pekko.persistence.typed.SnapshotAdapter
import pekko.persistence.typed.state.internal
import pekko.persistence.typed.state.internal._
import pekko.persistence.typed.state.scaladsl
+
+import scala.jdk.OptionConverters._
+
+import com.typesafe.config.Config
import org.jspecify.annotations.Nullable
/**
@@ -114,6 +118,14 @@ abstract class DurableStateBehavior[Command, State]
private[pekko] (
*/
def durableStateStorePluginId: String = ""
+ /**
+ * Override and define the `DurableStateStore` plugin config that this actor
should use instead of the default.
+ * This is useful when the same plugin class is configured for multiple,
isolated stores at runtime.
+ *
+ * @since 2.0.0
+ */
+ def durableStateStorePluginConfig: Optional[Config] = Optional.empty()
+
/**
* The tag that can be used in persistence query.
*/
@@ -140,7 +152,11 @@ abstract class DurableStateBehavior[Command, State]
private[pekko] (
persistenceId,
emptyState,
(state, cmd) => commandHandler()(state,
cmd).asInstanceOf[EffectImpl[State]],
-
getClass).withTag(tag).snapshotAdapter(snapshotAdapter()).withDurableStateStorePluginId(durableStateStorePluginId)
+ getClass)
+ .withTag(tag)
+ .snapshotAdapter(snapshotAdapter())
+ .withDurableStateStorePluginId(durableStateStorePluginId)
+ .withDurableStateStorePluginConfig(durableStateStorePluginConfig.toScala)
val handler = signalHandler()
val behaviorWithSignalHandler =
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
index ec44f3abfa..3a014744bf 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
@@ -15,6 +15,8 @@ package org.apache.pekko.persistence.typed.state.scaladsl
import scala.annotation.tailrec
+import com.typesafe.config.Config
+
import org.apache.pekko
import pekko.actor.typed.BackoffSupervisorStrategy
import pekko.actor.typed.Behavior
@@ -146,6 +148,14 @@ object DurableStateBehavior {
*/
def withDurableStateStorePluginId(id: String): DurableStateBehavior[Command,
State]
+ /**
+ * Change the `DurableStateStore` plugin config that this actor should use.
+ * This is useful when the same plugin class is configured for multiple,
isolated stores at runtime.
+ *
+ * @since 2.0.0
+ */
+ def withDurableStateStorePluginConfig(config: Option[Config]):
DurableStateBehavior[Command, State]
+
/**
* The tag that can used in persistence query
*/
diff --git
a/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala
b/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala
index 64beff8d51..bc6704ebb0 100644
---
a/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala
+++
b/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala
@@ -30,6 +30,7 @@ import pekko.persistence.PluginProvider
import pekko.persistence.state.scaladsl.DurableStateStore
import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
/**
* Persistence extension for queries.
@@ -67,9 +68,12 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem)
configPath
}
- private def pluginIdOrDefault(pluginId: String): String = {
+ private def pluginIdOrDefault(pluginId: String): String =
+ pluginIdOrDefault(pluginId, ConfigFactory.empty)
+
+ private def pluginIdOrDefault(pluginId: String, pluginConfig: Config):
String = {
val configPath = if (isEmpty(pluginId)) defaultPluginId else pluginId
- Persistence.verifyPluginConfigExists(systemConfig, configPath,
"DurableStateStore")
+
Persistence.verifyPluginConfigExists(pluginConfig.withFallback(systemConfig),
configPath, "DurableStateStore")
configPath
}
@@ -91,6 +95,17 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem)
pluginFor(pluginIdOrDefault(pluginId),
pluginConfig(pluginId)).scaladslPlugin.asInstanceOf[T]
}
+ /**
+ * Scala API: Returns the
[[pekko.persistence.state.scaladsl.DurableStateStore]] specified by the given
+ * configuration entry. The provided `runtimeConfig` is used to configure
the plugin at runtime, taking
+ * precedence over the plugin configuration defined in the actor system
configuration.
+ *
+ * @since 2.0.0
+ */
+ final def durableStateStoreFor[T <: scaladsl.DurableStateStore[?]](pluginId:
String, runtimeConfig: Config): T = {
+ pluginFor(pluginIdOrDefault(pluginId, runtimeConfig),
runtimeConfig).scaladslPlugin.asInstanceOf[T]
+ }
+
/**
* Java API: Returns the
[[pekko.persistence.state.javadsl.DurableStateStore]] specified by the given
* configuration entry.
@@ -101,4 +116,18 @@ class DurableStateStoreRegistry(system:
ExtendedActorSystem)
pluginFor(pluginIdOrDefault(pluginId),
pluginConfig(pluginId)).javadslPlugin.asInstanceOf[T]
}
+ /**
+ * Java API: Returns the
[[pekko.persistence.state.javadsl.DurableStateStore]] specified by the given
+ * configuration entry. The provided `runtimeConfig` is used to configure
the plugin at runtime, taking
+ * precedence over the plugin configuration defined in the actor system
configuration.
+ *
+ * @since 2.0.0
+ */
+ final def getDurableStateStoreFor[T <: javadsl.DurableStateStore[?]](
+ @nowarn("msg=never used") clazz: Class[T], // FIXME generic Class could
be problematic in Java
+ pluginId: String,
+ runtimeConfig: Config): T = {
+ pluginFor(pluginIdOrDefault(pluginId, runtimeConfig),
runtimeConfig).javadslPlugin.asInstanceOf[T]
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]