This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new fd5d87b35a Port akka-core PR #31889: fix coordinator role in cluster 
sharding (#2918)
fd5d87b35a is described below

commit fd5d87b35ac49f28a9ded27456386b98225d2280
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 27 10:29:48 2026 +0200

    Port akka-core PR #31889: fix coordinator role in cluster sharding (#2918)
    
    * Port akka-core PR #31889: fix coordinator role in cluster sharding
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko/sessions/cb8ef4a6-c1d0-4b62-9062-a08e1cd41cfe
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix compile error: use explicit val for rememberEntities in coordinator 
role spec
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko/sessions/5359fe45-b406-4eab-8f69-1e169fa4bfda
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 cluster-sharding/src/main/resources/reference.conf |   5 +-
 .../pekko/cluster/sharding/ClusterSharding.scala   | 197 ++++++++++++---------
 .../cluster/sharding/ClusterShardingSettings.scala |  11 ++
 .../pekko/cluster/sharding/ShardRegion.scala       |  11 +-
 .../internal/DDataRememberEntitiesShardStore.scala |   1 -
 .../ClusterShardingCoordinatorRoleSpec.scala       | 175 ++++++++++++++++++
 .../apache/pekko/cluster/ddata/Replicator.scala    |   2 +-
 7 files changed, 312 insertions(+), 90 deletions(-)

diff --git a/cluster-sharding/src/main/resources/reference.conf 
b/cluster-sharding/src/main/resources/reference.conf
index d079825d10..65de646647 100644
--- a/cluster-sharding/src/main/resources/reference.conf
+++ b/cluster-sharding/src/main/resources/reference.conf
@@ -368,8 +368,9 @@ pekko.cluster.sharding {
   coordinator-singleton = ${pekko.cluster.singleton}
 
 
-  # Copies the role for the coordinator singleton from the shards role instead 
of using the one provided in the
-  # "pekko.cluster.sharding.coordinator-singleton.role"
+  # By default, the role for the coordinator singleton is the same as the role 
for the shards
+  # "pekko.cluster.sharding.role". Set this to off to use the role from
+  # "pekko.cluster.sharding.coordinator-singleton.role" for the coordinator 
singleton.
   coordinator-singleton-role-override = on
 
   coordinator-state {
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
index 6eee343108..d8515977a4 100755
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
@@ -318,6 +318,12 @@ class ClusterSharding(system: ExtendedActorSystem) extends 
Extension {
     } else {
       log.debug("Starting Shard Region Proxy [{}] (no actors will be hosted on 
this node)...", typeName)
 
+      if (settings.shouldHostCoordinator(cluster)) {
+        // when using another coordinator role than the shard role the 
coordinator may have to be started
+        val startCoordinatorMsg = StartCoordinatorIfNeeded(typeName, settings, 
allocationStrategy)
+        guardian ! startCoordinatorMsg
+      }
+
       startProxy(
         typeName,
         settings.role,
@@ -702,6 +708,11 @@ private[pekko] object ClusterShardingGuardian {
       extractEntityId: ShardRegion.ExtractEntityId,
       extractShardId: ShardRegion.ExtractShardId)
       extends NoSerializationVerificationNeeded
+  final case class StartCoordinatorIfNeeded(
+      typeName: String,
+      settings: ClusterShardingSettings,
+      allocationStrategy: ShardAllocationStrategy)
+      extends NoSerializationVerificationNeeded
   final case class Started(shardRegion: ActorRef) extends 
NoSerializationVerificationNeeded
 }
 
@@ -711,6 +722,7 @@ private[pekko] object ClusterShardingGuardian {
  */
 private[pekko] class ClusterShardingGuardian extends Actor {
   import ClusterShardingGuardian._
+  import ShardCoordinator.ShardAllocationStrategy
 
   val cluster = Cluster(context.system)
   val sharding = ClusterSharding(context.system)
@@ -724,35 +736,106 @@ private[pekko] class ClusterShardingGuardian extends 
Actor {
   private def coordinatorPath(encName: String): String =
     (self.path / coordinatorSingletonManagerName(encName) / "singleton" / 
"coordinator").toStringWithoutAddress
 
-  private def replicatorSettings(shardingSettings: ClusterShardingSettings) = {
+  private def replicatorSettings(role: Option[String], settings: 
ClusterShardingSettings) = {
     val configuredSettings =
       
ReplicatorSettings(context.system.settings.config.getConfig("pekko.cluster.sharding.distributed-data"))
     // Use members within the data center and with the given role (if any)
-    val replicatorRoles = Set(ClusterSettings.DcRolePrefix + 
cluster.settings.SelfDataCenter) ++ shardingSettings.role
+    val replicatorRoles = Set(ClusterSettings.DcRolePrefix + 
cluster.settings.SelfDataCenter) ++ role
     val settingsWithRoles = configuredSettings.withRoles(replicatorRoles)
-    if (shardingSettings.rememberEntities)
+    if (settings.rememberEntities)
       settingsWithRoles
     else
       settingsWithRoles.withDurableKeys(Set.empty[String])
   }
 
-  private def replicator(settings: ClusterShardingSettings): ActorRef = {
-    if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData 
||
-      settings.stateStoreMode == 
ClusterShardingSettings.RememberEntitiesStoreCustom) {
-      // one Replicator per role
-      replicatorByRole.get(settings.role) match {
-        case Some(ref) => ref
-        case None      =>
-          val name = settings.role match {
-            case Some(r) => URLEncoder.encode(r, ByteString.UTF_8) + 
"Replicator"
-            case None    => "replicator"
-          }
-          val ref = 
context.actorOf(Replicator.props(replicatorSettings(settings)), name)
-          replicatorByRole = replicatorByRole.updated(settings.role, ref)
-          ref
-      }
-    } else
-      context.system.deadLetters
+  private def replicator(role: Option[String], settings: 
ClusterShardingSettings): ActorRef = {
+    // one Replicator per role
+    replicatorByRole.get(role) match {
+      case Some(ref) => ref
+      case None      =>
+        val name = role match {
+          case Some(r) => URLEncoder.encode(r, ByteString.UTF_8) + "Replicator"
+          case None    => "replicator"
+        }
+        val ref = context.actorOf(Replicator.props(replicatorSettings(role, 
settings)), name)
+        replicatorByRole = replicatorByRole.updated(role, ref)
+        ref
+    }
+  }
+
+  private def rememberEntitiesStoreProvider(
+      typeName: String,
+      settings: ClusterShardingSettings): Option[RememberEntitiesProvider] = {
+    if (!settings.rememberEntities) None
+    else {
+      // with the deprecated persistence state store mode we always use the 
event sourced provider for shard regions
+      // and no store for coordinator (the coordinator is a PersistentActor in 
that case)
+      val rememberEntitiesProvider =
+        if (settings.stateStoreMode == 
ClusterShardingSettings.StateStoreModePersistence) {
+          ClusterShardingSettings.RememberEntitiesStoreEventsourced
+        } else {
+          settings.rememberEntitiesStore
+        }
+      Some(rememberEntitiesProvider match {
+        case ClusterShardingSettings.RememberEntitiesStoreDData =>
+          // must run Replicator for DDataRememberEntities on both shard and 
coordinator roles (if different)
+          val role = if (settings.role == settings.coordinatorSingletonRole) 
settings.role else None
+          val rep = replicator(role, settings)
+          new DDataRememberEntitiesProvider(typeName, settings, 
majorityMinCap, rep)
+        case ClusterShardingSettings.RememberEntitiesStoreEventsourced =>
+          new EventSourcedRememberEntitiesProvider(typeName, settings)
+        case ClusterShardingSettings.RememberEntitiesStoreCustom =>
+          new CustomStateStoreModeProvider(typeName, context.system, settings)
+        case unknown =>
+          throw new IllegalArgumentException(s"Unknown store type: $unknown") 
// compiler exhaustiveness check pleaser
+      })
+    }
+  }
+
+  private def startCoordinatorIfNeeded(
+      typeName: String,
+      allocationStrategy: ShardAllocationStrategy,
+      rememberEntitiesStoreProvider: Option[RememberEntitiesProvider],
+      settings: ClusterShardingSettings): Unit = {
+    import settings.tuningParameters.coordinatorFailureBackoff
+
+    val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
+    val cName = coordinatorSingletonManagerName(encName)
+    if (settings.shouldHostCoordinator(cluster) && 
context.child(cName).isEmpty) {
+      val coordinatorProps =
+        if (settings.stateStoreMode == 
ClusterShardingSettings.StateStoreModePersistence) {
+          ShardCoordinator.props(typeName, settings, allocationStrategy)
+        } else {
+          val rep = replicator(settings.coordinatorSingletonRole, settings)
+          ShardCoordinator.props(
+            typeName,
+            settings,
+            allocationStrategy,
+            rep,
+            majorityMinCap,
+            rememberEntitiesStoreProvider)
+        }
+      val singletonProps =
+        BackoffOpts
+          .onStop(
+            childProps = coordinatorProps,
+            childName = "coordinator",
+            minBackoff = coordinatorFailureBackoff,
+            maxBackoff = coordinatorFailureBackoff * 5,
+            randomFactor = 0.2)
+          .withFinalStopMessage(_ == ShardCoordinator.Internal.Terminate)
+          .props
+          .withDeploy(Deploy.local)
+
+      val singletonSettings =
+        
settings.coordinatorSingletonSettings.withSingletonName("singleton").withRole(settings.coordinatorSingletonRole)
+
+      context.actorOf(
+        ClusterSingletonManager
+          .props(singletonProps, terminationMessage = 
ShardCoordinator.Internal.Terminate, singletonSettings)
+          .withDispatcher(context.props.dispatcher),
+        name = cName)
+    }
   }
 
   def receive: Receive = {
@@ -765,69 +848,11 @@ private[pekko] class ClusterShardingGuardian extends 
Actor {
           allocationStrategy,
           handOffStopMessage) =>
       try {
-        import settings.role
-        import settings.tuningParameters.coordinatorFailureBackoff
-
-        val rep = replicator(settings)
-        val rememberEntitiesStoreProvider: Option[RememberEntitiesProvider] =
-          if (!settings.rememberEntities) None
-          else {
-            // with the deprecated persistence state store mode we always use 
the event sourced provider for shard regions
-            // and no store for coordinator (the coordinator is a 
PersistentActor in that case)
-            val rememberEntitiesProvider =
-              if (settings.stateStoreMode == 
ClusterShardingSettings.StateStoreModePersistence) {
-                ClusterShardingSettings.RememberEntitiesStoreEventsourced
-              } else {
-                settings.rememberEntitiesStore
-              }
-            Some(rememberEntitiesProvider match {
-              case ClusterShardingSettings.RememberEntitiesStoreDData =>
-                new DDataRememberEntitiesProvider(typeName, settings, 
majorityMinCap, rep)
-              case ClusterShardingSettings.RememberEntitiesStoreEventsourced =>
-                new EventSourcedRememberEntitiesProvider(typeName, settings)
-              case ClusterShardingSettings.RememberEntitiesStoreCustom =>
-                new CustomStateStoreModeProvider(typeName, context.system, 
settings)
-              case unknown =>
-                throw new IllegalArgumentException(s"Unknown store type: 
$unknown") // compiler exhaustiveness check pleaser
-            })
-          }
-
         val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
-        val cName = coordinatorSingletonManagerName(encName)
         val cPath = coordinatorPath(encName)
         val shardRegion = context.child(encName).getOrElse {
-          if (context.child(cName).isEmpty) {
-            val coordinatorProps =
-              if (settings.stateStoreMode == 
ClusterShardingSettings.StateStoreModePersistence) {
-                ShardCoordinator.props(typeName, settings, allocationStrategy)
-              } else {
-                ShardCoordinator
-                  .props(typeName, settings, allocationStrategy, rep, 
majorityMinCap, rememberEntitiesStoreProvider)
-              }
-            val singletonProps =
-              BackoffOpts
-                .onStop(
-                  childProps = coordinatorProps,
-                  childName = "coordinator",
-                  minBackoff = coordinatorFailureBackoff,
-                  maxBackoff = coordinatorFailureBackoff * 5,
-                  randomFactor = 0.2)
-                .withFinalStopMessage(_ == ShardCoordinator.Internal.Terminate)
-                .props
-                .withDeploy(Deploy.local)
-
-            val singletonSettings = if 
(settings.coordinatorSingletonOverrideRole) {
-              
settings.coordinatorSingletonSettings.withSingletonName("singleton").withRole(role)
-            } else {
-              
settings.coordinatorSingletonSettings.withSingletonName("singleton")
-            }
-
-            context.actorOf(
-              ClusterSingletonManager
-                .props(singletonProps, terminationMessage = 
ShardCoordinator.Internal.Terminate, singletonSettings)
-                .withDispatcher(context.props.dispatcher),
-              name = cName)
-          }
+          val remEntitiesStoreProvider = 
rememberEntitiesStoreProvider(typeName, settings)
+          startCoordinatorIfNeeded(typeName, allocationStrategy, 
remEntitiesStoreProvider, settings)
 
           context.actorOf(
             ShardRegion
@@ -839,7 +864,7 @@ private[pekko] class ClusterShardingGuardian extends Actor {
                 extractEntityId = extractEntityId,
                 extractShardId = extractShardId,
                 handOffStopMessage = handOffStopMessage,
-                rememberEntitiesStoreProvider)
+                remEntitiesStoreProvider)
               .withDispatcher(context.props.dispatcher),
             name = encName)
         }
@@ -854,6 +879,7 @@ private[pekko] class ClusterShardingGuardian extends Actor {
 
     case StartProxy(typeName, dataCenter, settings, extractEntityId, 
extractShardId) =>
       try {
+
         val encName = URLEncoder.encode(s"${typeName}Proxy", ByteString.UTF_8)
         val cPath = coordinatorPath(URLEncoder.encode(typeName, 
ByteString.UTF_8))
         // it must be possible to start several proxies, one per data center
@@ -882,6 +908,15 @@ private[pekko] class ClusterShardingGuardian extends Actor 
{
           sender() ! Status.Failure(e)
       }
 
+    case StartCoordinatorIfNeeded(typeName, settings, allocationStrategy) =>
+      try {
+        val remEntitiesStoreProvider = rememberEntitiesStoreProvider(typeName, 
settings)
+        startCoordinatorIfNeeded(typeName, allocationStrategy, 
remEntitiesStoreProvider, settings)
+      } catch {
+        case NonFatal(_) =>
+        // don't restart
+      }
+
   }
 
 }
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
index f57df942b3..bb1f5c1445 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
@@ -984,6 +984,17 @@ final class ClusterShardingSettings(
   private[pekko] def shouldHostShard(cluster: Cluster): Boolean =
     role.forall(cluster.selfMember.roles.contains)
 
+  /** If true, this node should run the shard coordinator, otherwise just a 
shard proxy or shard region on this node. */
+  @InternalApi
+  private[pekko] def shouldHostCoordinator(cluster: Cluster): Boolean =
+    coordinatorSingletonRole.forall(cluster.selfMember.roles.contains)
+
+  /**
+   * INTERNAL API
+   */
+  @InternalApi private[pekko] def coordinatorSingletonRole: Option[String] =
+    if (coordinatorSingletonOverrideRole) role else 
coordinatorSingletonSettings.role
+
   @InternalApi
   private[pekko] val passivationStrategy: 
ClusterShardingSettings.PassivationStrategy =
     ClusterShardingSettings.PassivationStrategy(this)
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
index a6aca98d88..d6347f4ee6 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
@@ -641,6 +641,7 @@ private[pekko] class ShardRegion(
 
   // sort by age, oldest first
   val ageOrdering = Member.ageOrdering
+  // membersByAge is only used for tracking where coordinator is running
   var membersByAge: immutable.SortedSet[Member] = 
immutable.SortedSet.empty(ageOrdering)
   // membersByAge contains members with these status
   private val memberStatusOfInterest: Set[MemberStatus] =
@@ -721,8 +722,8 @@ private[pekko] class ShardRegion(
     case None    => ClusterSettings.DcRolePrefix + 
cluster.settings.SelfDataCenter
   }
 
-  def matchingRole(member: Member): Boolean =
-    member.hasRole(targetDcRole) && role.forall(member.hasRole)
+  def matchingCoordinatorRole(member: Member): Boolean =
+    member.hasRole(targetDcRole) && 
settings.coordinatorSingletonRole.forall(member.hasRole)
 
   /**
    * When leaving the coordinator singleton is started rather quickly on next
@@ -784,7 +785,7 @@ private[pekko] class ShardRegion(
     changeMembers(
       immutable.SortedSet
         .empty(ageOrdering)
-        .union(state.members.filter(m => memberStatusOfInterest(m.status) && 
matchingRole(m))))
+        .union(state.members.filter(m => memberStatusOfInterest(m.status) && 
matchingCoordinatorRole(m))))
   }
 
   def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match {
@@ -798,7 +799,7 @@ private[pekko] class ShardRegion(
     case MemberRemoved(m, _) =>
       if (m.uniqueAddress == cluster.selfUniqueAddress)
         context.stop(self)
-      else if (matchingRole(m))
+      else if (matchingCoordinatorRole(m))
         changeMembers(membersByAge.filterNot(_.uniqueAddress == 
m.uniqueAddress))
 
     case MemberDowned(m) =>
@@ -818,7 +819,7 @@ private[pekko] class ShardRegion(
   }
 
   private def addMember(m: Member): Unit = {
-    if (matchingRole(m) && memberStatusOfInterest(m.status)) {
+    if (matchingCoordinatorRole(m) && memberStatusOfInterest(m.status)) {
       // replace, it's possible that the status, or upNumber is changed
       changeMembers(membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress) 
+ m)
     }
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
index 0ff285c06f..9667bc60af 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
@@ -92,7 +92,6 @@ private[pekko] final class DDataRememberEntitiesShardStore(
     extends Actor
     with Stash
     with ActorLogging {
-
   import DDataRememberEntitiesShardStore._
 
   implicit val ec: ExecutionContext = context.dispatcher
diff --git 
a/cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sharding/ClusterShardingCoordinatorRoleSpec.scala
 
b/cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sharding/ClusterShardingCoordinatorRoleSpec.scala
new file mode 100644
index 0000000000..d1bb7c545b
--- /dev/null
+++ 
b/cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sharding/ClusterShardingCoordinatorRoleSpec.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2019-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.cluster.sharding
+
+import scala.concurrent.duration._
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+
+import org.apache.pekko
+import pekko.actor.ActorRef
+import pekko.actor.PoisonPill
+import pekko.actor.Props
+import pekko.cluster.sharding.MultiNodeClusterShardingSpec.EntityActor
+import pekko.testkit._
+
+class ClusterShardingCoordinatorRoleSpecConfig(
+    mode: String,
+    rememberEntities: Boolean,
+    rememberEntitiesStore: String = 
ClusterShardingSettings.RememberEntitiesStoreDData)
+    extends MultiNodeClusterShardingConfig(
+      mode,
+      rememberEntities,
+      rememberEntitiesStore = rememberEntitiesStore,
+      additionalConfig = """
+  pekko.cluster.sharding {
+    role = shard
+    coordinator-singleton-role-override = off
+    coordinator-singleton.role = coordinator
+    entity-restart-backoff = 100 ms
+  }  
+  """) {
+
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+
+  val coordinatorConfig: Config = 
ConfigFactory.parseString("""pekko.cluster.roles = [ "coordinator" ]""")
+  val shardConfig: Config = ConfigFactory.parseString("""pekko.cluster.roles = 
[ "shard" ]""")
+
+  nodeConfig(first, second)(coordinatorConfig)
+
+  nodeConfig(third, fourth)(shardConfig)
+
+}
+
+class DDataClusterShardingCoordinatorRoleSpec
+    extends ClusterShardingCoordinatorRoleSpec(
+      new ClusterShardingCoordinatorRoleSpecConfig(
+        ClusterShardingSettings.StateStoreModeDData,
+        rememberEntities = false))
+class DDataClusterShardingCoordinatorRoleSpecMultiJvmNode1 extends 
DDataClusterShardingCoordinatorRoleSpec
+class DDataClusterShardingCoordinatorRoleSpecMultiJvmNode2 extends 
DDataClusterShardingCoordinatorRoleSpec
+class DDataClusterShardingCoordinatorRoleSpecMultiJvmNode3 extends 
DDataClusterShardingCoordinatorRoleSpec
+class DDataClusterShardingCoordinatorRoleSpecMultiJvmNode4 extends 
DDataClusterShardingCoordinatorRoleSpec
+
+class DDataRememberClusterShardingCoordinatorRoleSpec
+    extends ClusterShardingCoordinatorRoleSpec(
+      new ClusterShardingCoordinatorRoleSpecConfig(
+        ClusterShardingSettings.StateStoreModeDData,
+        rememberEntities = true))
+class DDataRememberClusterShardingCoordinatorRoleSpecMultiJvmNode1
+    extends DDataRememberClusterShardingCoordinatorRoleSpec
+class DDataRememberClusterShardingCoordinatorRoleSpecMultiJvmNode2
+    extends DDataRememberClusterShardingCoordinatorRoleSpec
+class DDataRememberClusterShardingCoordinatorRoleSpecMultiJvmNode3
+    extends DDataRememberClusterShardingCoordinatorRoleSpec
+class DDataRememberClusterShardingCoordinatorRoleSpecMultiJvmNode4
+    extends DDataRememberClusterShardingCoordinatorRoleSpec
+
+abstract class ClusterShardingCoordinatorRoleSpec(multiNodeConfig: 
ClusterShardingCoordinatorRoleSpecConfig)
+    extends MultiNodeClusterShardingSpec(multiNodeConfig)
+    with ImplicitSender {
+
+  import multiNodeConfig.{ first, fourth, second, third }
+
+  private val rememberEntities = multiNodeConfig.rememberEntities
+
+  def startSharding(probe: ActorRef): ActorRef = {
+    startSharding(
+      system,
+      typeName = "Entity",
+      entityProps = Props(new EntityActor(probe)),
+      settings = 
ClusterShardingSettings(system).withRememberEntities(multiNodeConfig.rememberEntities),
+      extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId,
+      extractShardId = MultiNodeClusterShardingSpec.intExtractShardId)
+  }
+
+  private lazy val region = ClusterSharding(system).shardRegion("Entity")
+
+  private val entityProbe = TestProbe()
+
+  s"Cluster sharding (${multiNodeConfig.mode}, remember 
${multiNodeConfig.rememberEntities}) with separate coordinator role" must {
+
+    "use nodes with shard role" in within(30.seconds) {
+      startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, 
second, third))
+
+      join(first, first)
+      join(second, first)
+      join(third, first)
+      startSharding(entityProbe.ref)
+
+      runOn(third) {
+        region ! 1
+        expectMsg(1)
+        lastSender.path.address.hasLocalScope should ===(true)
+        entityProbe.expectMsg(EntityActor.Started(lastSender))
+      }
+      runOn(first, second) {
+        region ! 1
+        expectMsg(1)
+        lastSender.path should be(node(third) / "system" / "sharding" / 
"Entity" / "1" / "1")
+        entityProbe.expectNoMessage()
+      }
+
+      enterBarrier("first-ok")
+
+      if (rememberEntities) {
+        runOn(third) {
+          region ! 2
+          expectMsg(2)
+          watch(lastSender)
+          val ref = lastSender
+          ref ! PoisonPill
+          expectTerminated(ref)
+          entityProbe.expectMsg(EntityActor.Started(ref))
+          // and then started again by remember entities
+          // not same ActorRef, but same path
+          entityProbe.expectMsgType[EntityActor.Started].ref.path should 
be(ref.path)
+        }
+        runOn(first, second) {
+          entityProbe.expectNoMessage()
+        }
+
+      }
+
+      enterBarrier("after-1")
+    }
+
+    if (rememberEntities) {
+      "restart remembered entities" in {
+        join(fourth, second)
+
+        runOn(first) {
+          cluster.leave(third)
+          cluster.leave(first)
+        }
+        enterBarrier("first left")
+
+        runOn(fourth) {
+          // started by remember entities on new node
+          entityProbe.expectMsgType[EntityActor.Started](20.seconds)
+        }
+        runOn(second) {
+          entityProbe.expectNoMessage()
+        }
+
+        enterBarrier("after-2")
+      }
+    }
+
+  }
+}
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
index 8c876ad785..45f910393d 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
@@ -1283,7 +1283,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   require(!cluster.isTerminated, "Cluster node must not be terminated")
   require(
     roles.subsetOf(cluster.selfRoles),
-    s"This cluster member [$selfAddress] doesn't have all the roles 
[${roles.mkString(", ")}]")
+    s"This cluster member [$selfAddress] with roles 
[${cluster.selfRoles.mkString(", ")}] doesn't have all the roles 
[${roles.mkString(", ")}]")
 
   private val payloadSizeAggregator = {
     val sizeExceeding = settings.logDataSizeExceeding.getOrElse(Int.MaxValue)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to