This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 645b76fbc9 Runtime plugin configuration for DurableState (#3058)
645b76fbc9 is described below

commit 645b76fbc91d0bda70374b79563ec17b15cb1de1
Author: Domantas Petrauskas <[email protected]>
AuthorDate: Sun Jun 14 18:26:22 2026 +0300

    Runtime plugin configuration for DurableState (#3058)
    
    * Runtime plugin configuration for DurableState
    
    Motivation:
    Like EventSourcedBehavior, DurableStateBehavior should allow configuring its
    persistence plugin at runtime so the same plugin class can back multiple,
    isolated stores (e.g. the pekko-persistence-r2dbc plugin).
    
    Modification:
    Add DurableStateBehavior.withDurableStateStorePluginConfig (Scala) and an
    overridable durableStateStorePluginConfig (Java), threaded through
    DurableStateBehaviorImpl, DurableStateSettings, and BehaviorSetup. Add
    Config-accepting overloads of DurableStateStoreRegistry.durableStateStoreFor
    and getDurableStateStoreFor that merge the runtime config ahead of the 
system
    config. The recovery-timeout is now resolved from the merged config so a 
plugin
    defined only at runtime resolves correctly.
    
    Result:
    DurableStateBehavior can be configured with a runtime plugin Config, and the
    store registry resolves the plugin using it.
    
    Tests:
    - sbt persistence/mimaReportBinaryIssues 
persistence-typed/mimaReportBinaryIssues - success
    - sbt persistence-typed-tests/testOnly (all state.scaladsl and 
state.javadsl specs) - All tests passed
    
    References:
    Fixes #1798
    
    * Use new-file ASF header for RuntimeDurableStateStoreSpec
    
    * Address review for DurableState runtime config
    
    Motivation:
    Review feedback on PR #3058: the runtime `pluginConfig` parameter shadowed
    the private `pluginConfig(pluginId)` method, and the new runtime plugin
    configuration feature was only covered by a Scala spec.
    
    Modification:
    Rename the runtime `Config` parameter to `runtimeConfig` in
    `durableStateStoreFor`/`getDurableStateStoreFor` to avoid shadowing, and add
    a Java DSL `RuntimeDurableStateStoreTest` mirroring the Scala spec to keep
    the Scala and Java DSLs in test parity.
    
    Result:
    No name collision with the private method, and the Java DSL runtime plugin
    config and `getDurableStateStoreFor(clazz, pluginId, config)` are covered.
    
    Tests:
    - sbt "persistence-typed-tests/Test/testOnly 
org.apache.pekko.persistence.typed.state.javadsl.RuntimeDurableStateStoreTest" 
- Passed 1
    - sbt "persistence-typed-tests/Test/javafmtCheck" (JDK 17) - success
    - sbt "persistence-typed-tests/headerCheckAll" - success
    
    References:
    Refs #3058
---
 .../javadsl/RuntimeDurableStateStoreTest.java      | 160 +++++++++++++++++++++
 .../scaladsl/RuntimeDurableStateStoreSpec.scala    | 100 +++++++++++++
 .../durablestate-runtime-config.excludes           |  19 +++
 .../typed/state/internal/BehaviorSetup.scala       |   6 +-
 .../state/internal/DurableStateBehaviorImpl.scala  |  12 +-
 .../state/internal/DurableStateSettings.scala      |  34 ++++-
 .../typed/state/javadsl/DurableStateBehavior.scala |  18 ++-
 .../state/scaladsl/DurableStateBehavior.scala      |  10 ++
 .../state/DurableStateStoreRegistry.scala          |  33 ++++-
 9 files changed, 381 insertions(+), 11 deletions(-)

diff --git 
a/persistence-typed-tests/src/test/java/org/apache/pekko/persistence/typed/state/javadsl/RuntimeDurableStateStoreTest.java
 
b/persistence-typed-tests/src/test/java/org/apache/pekko/persistence/typed/state/javadsl/RuntimeDurableStateStoreTest.java
new file mode 100644
index 0000000000..49b2c7f8af
--- /dev/null
+++ 
b/persistence-typed-tests/src/test/java/org/apache/pekko/persistence/typed/state/javadsl/RuntimeDurableStateStoreTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.typed.state.javadsl;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.testkit.typed.annotations.JUnitJupiterTestKit;
+import org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit;
+import org.apache.pekko.actor.testkit.typed.javadsl.JUnitJupiterTestKitBuilder;
+import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturingExtension;
+import 
org.apache.pekko.actor.testkit.typed.javadsl.TestKitJUnitJupiterExtension;
+import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
+import org.apache.pekko.actor.typed.ActorRef;
+import org.apache.pekko.actor.typed.Behavior;
+import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
+import org.apache.pekko.persistence.state.javadsl.DurableStateUpdateStore;
+import org.apache.pekko.persistence.state.javadsl.GetObjectResult;
+import 
org.apache.pekko.persistence.testkit.state.PersistenceTestKitDurableStateStoreProvider;
+import org.apache.pekko.persistence.typed.PersistenceId;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(TestKitJUnitJupiterExtension.class)
+@ExtendWith(LogCapturingExtension.class)
+public class RuntimeDurableStateStoreTest {
+
+  @JUnitJupiterTestKit public ActorTestKit testKit = new 
JUnitJupiterTestKitBuilder().build();
+
+  static Config config(String store) {
+    return ConfigFactory.parseString(
+        store
+            + " {\n"
+            + "  state.class = \""
+            + PersistenceTestKitDurableStateStoreProvider.class.getName()
+            + "\"\n"
+            + "}\n");
+  }
+
+  interface Command {}
+
+  static final class Save implements Command {
+    final String text;
+    final ActorRef<Done> replyTo;
+
+    Save(String text, ActorRef<Done> replyTo) {
+      this.text = text;
+      this.replyTo = replyTo;
+    }
+  }
+
+  static final class ShowMeWhatYouGot implements Command {
+    final ActorRef<String> replyTo;
+
+    ShowMeWhatYouGot(ActorRef<String> replyTo) {
+      this.replyTo = replyTo;
+    }
+  }
+
+  enum Stop implements Command {
+    INSTANCE
+  }
+
+  static final class Actor extends DurableStateBehavior<Command, String> {
+    private final String store;
+
+    static Behavior<Command> create(String persistenceId, String store) {
+      return new Actor(persistenceId, store);
+    }
+
+    private Actor(String persistenceId, String store) {
+      super(PersistenceId.ofUniqueId(persistenceId));
+      this.store = store;
+    }
+
+    @Override
+    public String emptyState() {
+      return "";
+    }
+
+    @Override
+    public CommandHandler<Command, String> commandHandler() {
+      return newCommandHandlerBuilder()
+          .forAnyState()
+          .onCommand(Save.class, this::onSave)
+          .onCommand(ShowMeWhatYouGot.class, this::onShow)
+          .onCommand(Stop.class, (state, cmd) -> Effect().stop())
+          .build();
+    }
+
+    private Effect<String> onSave(String state, Save cmd) {
+      String newState =
+          Stream.of(state, cmd.text).filter(s -> 
!s.isEmpty()).collect(Collectors.joining("|"));
+      return Effect().persist(newState).thenRun(() -> 
cmd.replyTo.tell(Done.getInstance()));
+    }
+
+    private Effect<String> onShow(String state, ShowMeWhatYouGot cmd) {
+      cmd.replyTo.tell(state);
+      return Effect().none();
+    }
+
+    @Override
+    public String durableStateStorePluginId() {
+      return store + ".state";
+    }
+
+    @Override
+    public Optional<Config> durableStateStorePluginConfig() {
+      return Optional.of(config(store));
+    }
+  }
+
+  @Test
+  public void configureAtRuntimeInIsolatedInstances() throws Exception {
+    TestProbe<Done> probe = testKit.createTestProbe();
+
+    // one actor in each store with same id
+    ActorRef<Command> s1 = testKit.spawn(Actor.create("id1", "store1"));
+    ActorRef<Command> s2 = testKit.spawn(Actor.create("id1", "store2"));
+    s1.tell(new Save("s1m1", probe.ref()));
+    probe.receiveMessage();
+    s2.tell(new Save("s2m1", probe.ref()));
+    probe.receiveMessage();
+
+    assertStore("store1", "s1m1");
+    assertStore("store2", "s2m1");
+  }
+
+  private void assertStore(String store, String expectedState) throws 
Exception {
+    @SuppressWarnings("unchecked")
+    DurableStateUpdateStore<String> durableStateStore =
+        DurableStateStoreRegistry.get(testKit.system())
+            .getDurableStateStoreFor(
+                DurableStateUpdateStore.class, store + ".state", 
config(store));
+    GetObjectResult<String> result =
+        durableStateStore.getObject("id1").toCompletableFuture().get(3, 
TimeUnit.SECONDS);
+    assertEquals(Optional.of(expectedState), result.value());
+  }
+}
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala
new file mode 100644
index 0000000000..4dc4bdb9e4
--- /dev/null
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/RuntimeDurableStateStoreSpec.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.typed.state.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.Behavior
+import pekko.persistence.state.DurableStateStoreRegistry
+import pekko.persistence.state.scaladsl.DurableStateUpdateStore
+import 
pekko.persistence.testkit.state.PersistenceTestKitDurableStateStoreProvider
+import pekko.persistence.typed.PersistenceId
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+object RuntimeDurableStateStoreSpec {
+
+  private object Actor {
+    sealed trait Command
+    final case class Save(text: String, replyTo: ActorRef[Done]) extends 
Command
+    final case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends 
Command
+    case object Stop extends Command
+
+    def apply(persistenceId: String, store: String): Behavior[Command] =
+      DurableStateBehavior[Command, String](
+        PersistenceId.ofUniqueId(persistenceId),
+        "",
+        (state, cmd) =>
+          cmd match {
+            case Save(text, replyTo) =>
+              Effect.persist(Seq(state, 
text).filter(_.nonEmpty).mkString("|")).thenRun(_ => replyTo ! Done)
+            case ShowMeWhatYouGot(replyTo) =>
+              replyTo ! state
+              Effect.none
+            case Stop =>
+              Effect.stop()
+          })
+        .withDurableStateStorePluginId(s"$store.state")
+        .withDurableStateStorePluginConfig(Some(config(store)))
+  }
+
+  private def config(store: String): Config =
+    ConfigFactory.parseString(s"""
+      $store {
+        state.class = 
"${classOf[PersistenceTestKitDurableStateStoreProvider].getName}"
+      }
+    """)
+}
+
+class RuntimeDurableStateStoreSpec extends ScalaTestWithActorTestKit with 
AnyWordSpecLike with LogCapturing {
+
+  import RuntimeDurableStateStoreSpec._
+
+  "The durable state store plugin" must {
+
+    "be possible to configure at runtime and use in multiple isolated 
instances" in {
+      val probe = createTestProbe[Any]()
+
+      {
+        // one actor in each store with same id
+        val s1 = spawn(Actor("id1", "store1"))
+        val s2 = spawn(Actor("id1", "store2"))
+        s1 ! Actor.Save("s1m1", probe.ref)
+        probe.receiveMessage()
+        s2 ! Actor.Save("s2m1", probe.ref)
+        probe.receiveMessage()
+      }
+
+      {
+        def assertStore(store: String, expectedState: String) = {
+          val durableStateStore = DurableStateStoreRegistry(system)
+            
.durableStateStoreFor[DurableStateUpdateStore[String]](s"$store.state", 
config(store))
+          durableStateStore.getObject("id1").futureValue.value shouldBe 
Some(expectedState)
+        }
+
+        assertStore("store1", "s1m1")
+        assertStore("store2", "s2m1")
+      }
+    }
+  }
+}
diff --git 
a/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes
 
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes
new file mode 100644
index 0000000000..b86dd3685c
--- /dev/null
+++ 
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/durablestate-runtime-config.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Add DurableStateBehavior.withDurableStateStorePluginConfig
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.state.scaladsl.DurableStateBehavior.withDurableStateStorePluginConfig")
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala
index c74421ab20..332bc62e25 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/BehaviorSetup.scala
@@ -30,6 +30,8 @@ import 
pekko.persistence.typed.state.internal.InternalProtocol.RecoveryTimeout
 import pekko.persistence.typed.state.scaladsl.DurableStateBehavior
 import pekko.util.OptionVal
 
+import com.typesafe.config.ConfigFactory
+
 import org.slf4j.Logger
 import org.slf4j.MDC
 
@@ -57,7 +59,9 @@ private[pekko] final class BehaviorSetup[C, S](
   // Any instead S because adapter may change the type
   val durableStateStore: DurableStateUpdateStore[Any] =
     DurableStateStoreRegistry(context.system.toClassic)
-      
.durableStateStoreFor[DurableStateUpdateStore[Any]](settings.durableStateStorePluginId)
+      .durableStateStoreFor[DurableStateUpdateStore[Any]](
+        settings.durableStateStorePluginId,
+        settings.durableStateStorePluginConfig.getOrElse(ConfigFactory.empty))
 
   def selfClassic: ClassicActorRef = context.self.toClassic
 
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
index 65061dffe7..bb6662d07d 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
@@ -35,6 +35,8 @@ import pekko.persistence.typed.PersistenceId
 import pekko.persistence.typed.SnapshotAdapter
 import pekko.persistence.typed.state.scaladsl._
 
+import com.typesafe.config.Config
+
 import org.slf4j.LoggerFactory
 
 @InternalApi
@@ -60,6 +62,7 @@ private[pekko] final case class 
DurableStateBehaviorImpl[Command, State](
     commandHandler: DurableStateBehavior.CommandHandler[Command, State],
     loggerClass: Class[?],
     durableStateStorePluginId: Option[String] = None,
+    durableStateStorePluginConfig: Option[Config] = None,
     tag: String = "",
     snapshotAdapter: SnapshotAdapter[State] = 
NoOpSnapshotAdapter.instance[State],
     supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
@@ -80,7 +83,11 @@ private[pekko] final case class 
DurableStateBehaviorImpl[Command, State](
       case _                                => false
     }
     if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass)
-    val settings = DurableStateSettings(ctx.system, 
durableStateStorePluginId.getOrElse(""), customStashCapacity)
+    val settings = DurableStateSettings(
+      ctx.system,
+      durableStateStorePluginId.getOrElse(""),
+      durableStateStorePluginConfig,
+      customStashCapacity)
 
     // stashState outside supervise because StashState should survive restarts 
due to persist failures
     val stashState = new 
StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
@@ -170,6 +177,9 @@ private[pekko] final case class 
DurableStateBehaviorImpl[Command, State](
     copy(durableStateStorePluginId = if (id != "") Some(id) else None)
   }
 
+  override def withDurableStateStorePluginConfig(config: Option[Config]): 
DurableStateBehavior[Command, State] =
+    copy(durableStateStorePluginConfig = config)
+
   override def withTag(tag: String): DurableStateBehavior[Command, State] =
     copy(tag = tag)
 
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
index 33082fcb3d..5071c659a7 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateSettings.scala
@@ -34,11 +34,25 @@ import com.typesafe.config.Config
       system: ActorSystem[?],
       durableStateStorePluginId: String,
       customStashCapacity: Option[Int]): DurableStateSettings =
-    apply(system.settings.config, durableStateStorePluginId, 
customStashCapacity)
+    apply(system, durableStateStorePluginId, None, customStashCapacity)
+
+  def apply(
+      system: ActorSystem[?],
+      durableStateStorePluginId: String,
+      durableStateStorePluginConfig: Option[Config],
+      customStashCapacity: Option[Int]): DurableStateSettings =
+    apply(system.settings.config, durableStateStorePluginId, 
durableStateStorePluginConfig, customStashCapacity)
+
+  def apply(
+      config: Config,
+      durableStateStorePluginId: String,
+      customStashCapacity: Option[Int]): DurableStateSettings =
+    apply(config, durableStateStorePluginId, None, customStashCapacity)
 
   def apply(
       config: Config,
       durableStateStorePluginId: String,
+      durableStateStorePluginConfig: Option[Config],
       customStashCapacity: Option[Int]): DurableStateSettings = {
     val typedConfig = config.getConfig("pekko.persistence.typed")
 
@@ -54,7 +68,8 @@ import com.typesafe.config.Config
 
     val logOnStashing = typedConfig.getBoolean("log-stashing")
 
-    val durableStateStoreConfig = durableStateStoreConfigFor(config, 
durableStateStorePluginId)
+    val durableStateStoreConfig =
+      durableStateStoreConfigFor(config, durableStateStorePluginId, 
durableStateStorePluginConfig)
     val recoveryTimeout: FiniteDuration =
       durableStateStoreConfig.getDuration("recovery-timeout", 
TimeUnit.MILLISECONDS).millis
 
@@ -69,20 +84,26 @@ import com.typesafe.config.Config
       logOnStashing = logOnStashing,
       recoveryTimeout,
       durableStateStorePluginId,
+      durableStateStorePluginConfig,
       useContextLoggerForInternalLogging,
       recurseWhenUnstashingReadOnlyCommands)
   }
 
-  private def durableStateStoreConfigFor(config: Config, pluginId: String): 
Config = {
+  private def durableStateStoreConfigFor(
+      config: Config,
+      pluginId: String,
+      pluginConfig: Option[Config]): Config = {
+    val mergedConfig = 
pluginConfig.map(_.withFallback(config)).getOrElse(config)
+
     def defaultPluginId = {
-      val configPath = config.getString("pekko.persistence.state.plugin")
+      val configPath = mergedConfig.getString("pekko.persistence.state.plugin")
       Persistence.verifyPluginConfigIsDefined(configPath, "Default 
DurableStateStore")
       configPath
     }
 
     val configPath = if (pluginId == "") defaultPluginId else pluginId
-    Persistence.verifyPluginConfigExists(config, configPath, 
"DurableStateStore")
-    
config.getConfig(configPath).withFallback(config.getConfig("pekko.persistence.state-plugin-fallback"))
+    Persistence.verifyPluginConfigExists(mergedConfig, configPath, 
"DurableStateStore")
+    
mergedConfig.getConfig(configPath).withFallback(mergedConfig.getConfig("pekko.persistence.state-plugin-fallback"))
   }
 
 }
@@ -97,6 +118,7 @@ private[pekko] final case class DurableStateSettings(
     logOnStashing: Boolean,
     recoveryTimeout: FiniteDuration,
     durableStateStorePluginId: String,
+    durableStateStorePluginConfig: Option[Config],
     useContextLoggerForInternalLogging: Boolean,
     recurseWhenUnstashingReadOnlyCommands: Boolean) {
 
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala
index 54b3b6003a..4af86d9582 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/javadsl/DurableStateBehavior.scala
@@ -28,6 +28,10 @@ import pekko.persistence.typed.SnapshotAdapter
 import pekko.persistence.typed.state.internal
 import pekko.persistence.typed.state.internal._
 import pekko.persistence.typed.state.scaladsl
+
+import scala.jdk.OptionConverters._
+
+import com.typesafe.config.Config
 import org.jspecify.annotations.Nullable
 
 /**
@@ -114,6 +118,14 @@ abstract class DurableStateBehavior[Command, State] 
private[pekko] (
    */
   def durableStateStorePluginId: String = ""
 
+  /**
+   * Override and define the `DurableStateStore` plugin config that this actor 
should use instead of the default.
+   * This is useful when the same plugin class is configured for multiple, 
isolated stores at runtime.
+   *
+   * @since 2.0.0
+   */
+  def durableStateStorePluginConfig: Optional[Config] = Optional.empty()
+
   /**
    * The tag that can be used in persistence query.
    */
@@ -140,7 +152,11 @@ abstract class DurableStateBehavior[Command, State] 
private[pekko] (
       persistenceId,
       emptyState,
       (state, cmd) => commandHandler()(state, 
cmd).asInstanceOf[EffectImpl[State]],
-      
getClass).withTag(tag).snapshotAdapter(snapshotAdapter()).withDurableStateStorePluginId(durableStateStorePluginId)
+      getClass)
+      .withTag(tag)
+      .snapshotAdapter(snapshotAdapter())
+      .withDurableStateStorePluginId(durableStateStorePluginId)
+      .withDurableStateStorePluginConfig(durableStateStorePluginConfig.toScala)
 
     val handler = signalHandler()
     val behaviorWithSignalHandler =
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
index ec44f3abfa..3a014744bf 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehavior.scala
@@ -15,6 +15,8 @@ package org.apache.pekko.persistence.typed.state.scaladsl
 
 import scala.annotation.tailrec
 
+import com.typesafe.config.Config
+
 import org.apache.pekko
 import pekko.actor.typed.BackoffSupervisorStrategy
 import pekko.actor.typed.Behavior
@@ -146,6 +148,14 @@ object DurableStateBehavior {
    */
   def withDurableStateStorePluginId(id: String): DurableStateBehavior[Command, 
State]
 
+  /**
+   * Change the `DurableStateStore` plugin config that this actor should use.
+   * This is useful when the same plugin class is configured for multiple, 
isolated stores at runtime.
+   *
+   * @since 2.0.0
+   */
+  def withDurableStateStorePluginConfig(config: Option[Config]): 
DurableStateBehavior[Command, State]
+
   /**
    * The tag that can used in persistence query
    */
diff --git 
a/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala
 
b/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala
index 64beff8d51..bc6704ebb0 100644
--- 
a/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala
+++ 
b/persistence/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreRegistry.scala
@@ -30,6 +30,7 @@ import pekko.persistence.PluginProvider
 import pekko.persistence.state.scaladsl.DurableStateStore
 
 import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
 
 /**
  * Persistence extension for queries.
@@ -67,9 +68,12 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem)
     configPath
   }
 
-  private def pluginIdOrDefault(pluginId: String): String = {
+  private def pluginIdOrDefault(pluginId: String): String =
+    pluginIdOrDefault(pluginId, ConfigFactory.empty)
+
+  private def pluginIdOrDefault(pluginId: String, pluginConfig: Config): 
String = {
     val configPath = if (isEmpty(pluginId)) defaultPluginId else pluginId
-    Persistence.verifyPluginConfigExists(systemConfig, configPath, 
"DurableStateStore")
+    
Persistence.verifyPluginConfigExists(pluginConfig.withFallback(systemConfig), 
configPath, "DurableStateStore")
     configPath
   }
 
@@ -91,6 +95,17 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem)
     pluginFor(pluginIdOrDefault(pluginId), 
pluginConfig(pluginId)).scaladslPlugin.asInstanceOf[T]
   }
 
+  /**
+   * Scala API: Returns the 
[[pekko.persistence.state.scaladsl.DurableStateStore]] specified by the given
+   * configuration entry. The provided `runtimeConfig` is used to configure 
the plugin at runtime, taking
+   * precedence over the plugin configuration defined in the actor system 
configuration.
+   *
+   * @since 2.0.0
+   */
+  final def durableStateStoreFor[T <: scaladsl.DurableStateStore[?]](pluginId: 
String, runtimeConfig: Config): T = {
+    pluginFor(pluginIdOrDefault(pluginId, runtimeConfig), 
runtimeConfig).scaladslPlugin.asInstanceOf[T]
+  }
+
   /**
    * Java API: Returns the 
[[pekko.persistence.state.javadsl.DurableStateStore]] specified by the given
    * configuration entry.
@@ -101,4 +116,18 @@ class DurableStateStoreRegistry(system: 
ExtendedActorSystem)
     pluginFor(pluginIdOrDefault(pluginId), 
pluginConfig(pluginId)).javadslPlugin.asInstanceOf[T]
   }
 
+  /**
+   * Java API: Returns the 
[[pekko.persistence.state.javadsl.DurableStateStore]] specified by the given
+   * configuration entry. The provided `runtimeConfig` is used to configure 
the plugin at runtime, taking
+   * precedence over the plugin configuration defined in the actor system 
configuration.
+   *
+   * @since 2.0.0
+   */
+  final def getDurableStateStoreFor[T <: javadsl.DurableStateStore[?]](
+      @nowarn("msg=never used") clazz: Class[T], // FIXME generic Class could 
be problematic in Java
+      pluginId: String,
+      runtimeConfig: Config): T = {
+    pluginFor(pluginIdOrDefault(pluginId, runtimeConfig), 
runtimeConfig).javadslPlugin.asInstanceOf[T]
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to