This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new fd5d87b35a Port akka-core PR #31889: fix coordinator role in cluster
sharding (#2918)
fd5d87b35a is described below
commit fd5d87b35ac49f28a9ded27456386b98225d2280
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 27 10:29:48 2026 +0200
Port akka-core PR #31889: fix coordinator role in cluster sharding (#2918)
* Port akka-core PR #31889: fix coordinator role in cluster sharding
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/cb8ef4a6-c1d0-4b62-9062-a08e1cd41cfe
Co-authored-by: pjfanning <[email protected]>
* Fix compile error: use explicit val for rememberEntities in coordinator
role spec
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/5359fe45-b406-4eab-8f69-1e169fa4bfda
Co-authored-by: pjfanning <[email protected]>
* scalafmt
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
cluster-sharding/src/main/resources/reference.conf | 5 +-
.../pekko/cluster/sharding/ClusterSharding.scala | 197 ++++++++++++---------
.../cluster/sharding/ClusterShardingSettings.scala | 11 ++
.../pekko/cluster/sharding/ShardRegion.scala | 11 +-
.../internal/DDataRememberEntitiesShardStore.scala | 1 -
.../ClusterShardingCoordinatorRoleSpec.scala | 175 ++++++++++++++++++
.../apache/pekko/cluster/ddata/Replicator.scala | 2 +-
7 files changed, 312 insertions(+), 90 deletions(-)
diff --git a/cluster-sharding/src/main/resources/reference.conf
b/cluster-sharding/src/main/resources/reference.conf
index d079825d10..65de646647 100644
--- a/cluster-sharding/src/main/resources/reference.conf
+++ b/cluster-sharding/src/main/resources/reference.conf
@@ -368,8 +368,9 @@ pekko.cluster.sharding {
coordinator-singleton = ${pekko.cluster.singleton}
- # Copies the role for the coordinator singleton from the shards role instead
of using the one provided in the
- # "pekko.cluster.sharding.coordinator-singleton.role"
+ # By default, the role for the coordinator singleton is the same as the role
for the shards,
+ # i.e. "pekko.cluster.sharding.role". Set this to off to use the role from
+ # "pekko.cluster.sharding.coordinator-singleton.role" for the coordinator
singleton instead.
coordinator-singleton-role-override = on
coordinator-state {
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
index 6eee343108..d8515977a4 100755
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
@@ -318,6 +318,12 @@ class ClusterSharding(system: ExtendedActorSystem) extends
Extension {
} else {
log.debug("Starting Shard Region Proxy [{}] (no actors will be hosted on
this node)...", typeName)
+ if (settings.shouldHostCoordinator(cluster)) {
+ // when the coordinator role differs from the shard role, the
coordinator may have to be started on this proxy-only node
+ val startCoordinatorMsg = StartCoordinatorIfNeeded(typeName, settings,
allocationStrategy)
+ guardian ! startCoordinatorMsg
+ }
+
startProxy(
typeName,
settings.role,
@@ -702,6 +708,11 @@ private[pekko] object ClusterShardingGuardian {
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId)
extends NoSerializationVerificationNeeded
+ final case class StartCoordinatorIfNeeded(
+ typeName: String,
+ settings: ClusterShardingSettings,
+ allocationStrategy: ShardAllocationStrategy)
+ extends NoSerializationVerificationNeeded
final case class Started(shardRegion: ActorRef) extends
NoSerializationVerificationNeeded
}
@@ -711,6 +722,7 @@ private[pekko] object ClusterShardingGuardian {
*/
private[pekko] class ClusterShardingGuardian extends Actor {
import ClusterShardingGuardian._
+ import ShardCoordinator.ShardAllocationStrategy
val cluster = Cluster(context.system)
val sharding = ClusterSharding(context.system)
@@ -724,35 +736,106 @@ private[pekko] class ClusterShardingGuardian extends
Actor {
private def coordinatorPath(encName: String): String =
(self.path / coordinatorSingletonManagerName(encName) / "singleton" /
"coordinator").toStringWithoutAddress
- private def replicatorSettings(shardingSettings: ClusterShardingSettings) = {
+ private def replicatorSettings(role: Option[String], settings:
ClusterShardingSettings) = {
val configuredSettings =
ReplicatorSettings(context.system.settings.config.getConfig("pekko.cluster.sharding.distributed-data"))
// Use members within the data center and with the given role (if any)
- val replicatorRoles = Set(ClusterSettings.DcRolePrefix +
cluster.settings.SelfDataCenter) ++ shardingSettings.role
+ val replicatorRoles = Set(ClusterSettings.DcRolePrefix +
cluster.settings.SelfDataCenter) ++ role
val settingsWithRoles = configuredSettings.withRoles(replicatorRoles)
- if (shardingSettings.rememberEntities)
+ if (settings.rememberEntities)
settingsWithRoles
else
settingsWithRoles.withDurableKeys(Set.empty[String])
}
- private def replicator(settings: ClusterShardingSettings): ActorRef = {
- if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData
||
- settings.stateStoreMode ==
ClusterShardingSettings.RememberEntitiesStoreCustom) {
- // one Replicator per role
- replicatorByRole.get(settings.role) match {
- case Some(ref) => ref
- case None =>
- val name = settings.role match {
- case Some(r) => URLEncoder.encode(r, ByteString.UTF_8) +
"Replicator"
- case None => "replicator"
- }
- val ref =
context.actorOf(Replicator.props(replicatorSettings(settings)), name)
- replicatorByRole = replicatorByRole.updated(settings.role, ref)
- ref
- }
- } else
- context.system.deadLetters
+ private def replicator(role: Option[String], settings:
ClusterShardingSettings): ActorRef = {
+ // one Replicator per role
+ replicatorByRole.get(role) match {
+ case Some(ref) => ref
+ case None =>
+ val name = role match {
+ case Some(r) => URLEncoder.encode(r, ByteString.UTF_8) + "Replicator"
+ case None => "replicator"
+ }
+ val ref = context.actorOf(Replicator.props(replicatorSettings(role,
settings)), name)
+ replicatorByRole = replicatorByRole.updated(role, ref)
+ ref
+ }
+ }
+
+ private def rememberEntitiesStoreProvider(
+ typeName: String,
+ settings: ClusterShardingSettings): Option[RememberEntitiesProvider] = {
+ if (!settings.rememberEntities) None
+ else {
+ // with the deprecated persistence state store mode we always use the
event sourced provider for shard regions
+ // and no store for coordinator (the coordinator is a PersistentActor in
that case)
+ val rememberEntitiesProvider =
+ if (settings.stateStoreMode ==
ClusterShardingSettings.StateStoreModePersistence) {
+ ClusterShardingSettings.RememberEntitiesStoreEventsourced
+ } else {
+ settings.rememberEntitiesStore
+ }
+ Some(rememberEntitiesProvider match {
+ case ClusterShardingSettings.RememberEntitiesStoreDData =>
+ // must run Replicator for DDataRememberEntities on both shard and
coordinator roles (if different)
+ val role = if (settings.role == settings.coordinatorSingletonRole)
settings.role else None
+ val rep = replicator(role, settings)
+ new DDataRememberEntitiesProvider(typeName, settings,
majorityMinCap, rep)
+ case ClusterShardingSettings.RememberEntitiesStoreEventsourced =>
+ new EventSourcedRememberEntitiesProvider(typeName, settings)
+ case ClusterShardingSettings.RememberEntitiesStoreCustom =>
+ new CustomStateStoreModeProvider(typeName, context.system, settings)
+ case unknown =>
+ throw new IllegalArgumentException(s"Unknown store type: $unknown")
// compiler exhaustiveness check pleaser
+ })
+ }
+ }
+
+ private def startCoordinatorIfNeeded(
+ typeName: String,
+ allocationStrategy: ShardAllocationStrategy,
+ rememberEntitiesStoreProvider: Option[RememberEntitiesProvider],
+ settings: ClusterShardingSettings): Unit = {
+ import settings.tuningParameters.coordinatorFailureBackoff
+
+ val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
+ val cName = coordinatorSingletonManagerName(encName)
+ if (settings.shouldHostCoordinator(cluster) &&
context.child(cName).isEmpty) {
+ val coordinatorProps =
+ if (settings.stateStoreMode ==
ClusterShardingSettings.StateStoreModePersistence) {
+ ShardCoordinator.props(typeName, settings, allocationStrategy)
+ } else {
+ val rep = replicator(settings.coordinatorSingletonRole, settings)
+ ShardCoordinator.props(
+ typeName,
+ settings,
+ allocationStrategy,
+ rep,
+ majorityMinCap,
+ rememberEntitiesStoreProvider)
+ }
+ val singletonProps =
+ BackoffOpts
+ .onStop(
+ childProps = coordinatorProps,
+ childName = "coordinator",
+ minBackoff = coordinatorFailureBackoff,
+ maxBackoff = coordinatorFailureBackoff * 5,
+ randomFactor = 0.2)
+ .withFinalStopMessage(_ == ShardCoordinator.Internal.Terminate)
+ .props
+ .withDeploy(Deploy.local)
+
+ val singletonSettings =
+
settings.coordinatorSingletonSettings.withSingletonName("singleton").withRole(settings.coordinatorSingletonRole)
+
+ context.actorOf(
+ ClusterSingletonManager
+ .props(singletonProps, terminationMessage =
ShardCoordinator.Internal.Terminate, singletonSettings)
+ .withDispatcher(context.props.dispatcher),
+ name = cName)
+ }
}
def receive: Receive = {
@@ -765,69 +848,11 @@ private[pekko] class ClusterShardingGuardian extends
Actor {
allocationStrategy,
handOffStopMessage) =>
try {
- import settings.role
- import settings.tuningParameters.coordinatorFailureBackoff
-
- val rep = replicator(settings)
- val rememberEntitiesStoreProvider: Option[RememberEntitiesProvider] =
- if (!settings.rememberEntities) None
- else {
- // with the deprecated persistence state store mode we always use
the event sourced provider for shard regions
- // and no store for coordinator (the coordinator is a
PersistentActor in that case)
- val rememberEntitiesProvider =
- if (settings.stateStoreMode ==
ClusterShardingSettings.StateStoreModePersistence) {
- ClusterShardingSettings.RememberEntitiesStoreEventsourced
- } else {
- settings.rememberEntitiesStore
- }
- Some(rememberEntitiesProvider match {
- case ClusterShardingSettings.RememberEntitiesStoreDData =>
- new DDataRememberEntitiesProvider(typeName, settings,
majorityMinCap, rep)
- case ClusterShardingSettings.RememberEntitiesStoreEventsourced =>
- new EventSourcedRememberEntitiesProvider(typeName, settings)
- case ClusterShardingSettings.RememberEntitiesStoreCustom =>
- new CustomStateStoreModeProvider(typeName, context.system,
settings)
- case unknown =>
- throw new IllegalArgumentException(s"Unknown store type:
$unknown") // compiler exhaustiveness check pleaser
- })
- }
-
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
- val cName = coordinatorSingletonManagerName(encName)
val cPath = coordinatorPath(encName)
val shardRegion = context.child(encName).getOrElse {
- if (context.child(cName).isEmpty) {
- val coordinatorProps =
- if (settings.stateStoreMode ==
ClusterShardingSettings.StateStoreModePersistence) {
- ShardCoordinator.props(typeName, settings, allocationStrategy)
- } else {
- ShardCoordinator
- .props(typeName, settings, allocationStrategy, rep,
majorityMinCap, rememberEntitiesStoreProvider)
- }
- val singletonProps =
- BackoffOpts
- .onStop(
- childProps = coordinatorProps,
- childName = "coordinator",
- minBackoff = coordinatorFailureBackoff,
- maxBackoff = coordinatorFailureBackoff * 5,
- randomFactor = 0.2)
- .withFinalStopMessage(_ == ShardCoordinator.Internal.Terminate)
- .props
- .withDeploy(Deploy.local)
-
- val singletonSettings = if
(settings.coordinatorSingletonOverrideRole) {
-
settings.coordinatorSingletonSettings.withSingletonName("singleton").withRole(role)
- } else {
-
settings.coordinatorSingletonSettings.withSingletonName("singleton")
- }
-
- context.actorOf(
- ClusterSingletonManager
- .props(singletonProps, terminationMessage =
ShardCoordinator.Internal.Terminate, singletonSettings)
- .withDispatcher(context.props.dispatcher),
- name = cName)
- }
+ val remEntitiesStoreProvider =
rememberEntitiesStoreProvider(typeName, settings)
+ startCoordinatorIfNeeded(typeName, allocationStrategy,
remEntitiesStoreProvider, settings)
context.actorOf(
ShardRegion
@@ -839,7 +864,7 @@ private[pekko] class ClusterShardingGuardian extends Actor {
extractEntityId = extractEntityId,
extractShardId = extractShardId,
handOffStopMessage = handOffStopMessage,
- rememberEntitiesStoreProvider)
+ remEntitiesStoreProvider)
.withDispatcher(context.props.dispatcher),
name = encName)
}
@@ -854,6 +879,7 @@ private[pekko] class ClusterShardingGuardian extends Actor {
case StartProxy(typeName, dataCenter, settings, extractEntityId,
extractShardId) =>
try {
+
val encName = URLEncoder.encode(s"${typeName}Proxy", ByteString.UTF_8)
val cPath = coordinatorPath(URLEncoder.encode(typeName,
ByteString.UTF_8))
// it must be possible to start several proxies, one per data center
@@ -882,6 +908,15 @@ private[pekko] class ClusterShardingGuardian extends Actor
{
sender() ! Status.Failure(e)
}
+ case StartCoordinatorIfNeeded(typeName, settings, allocationStrategy) =>
+ try {
+ val remEntitiesStoreProvider = rememberEntitiesStoreProvider(typeName,
settings)
+ startCoordinatorIfNeeded(typeName, allocationStrategy,
remEntitiesStoreProvider, settings)
+ } catch {
+ case NonFatal(_) =>
+ // don't restart
+ }
+
}
}
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
index f57df942b3..bb1f5c1445 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingSettings.scala
@@ -984,6 +984,17 @@ final class ClusterShardingSettings(
private[pekko] def shouldHostShard(cluster: Cluster): Boolean =
role.forall(cluster.selfMember.roles.contains)
+ /** If true, this node should run the shard coordinator, otherwise just a
shard proxy or shard region on this node. */
+ @InternalApi
+ private[pekko] def shouldHostCoordinator(cluster: Cluster): Boolean =
+ coordinatorSingletonRole.forall(cluster.selfMember.roles.contains)
+
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] def coordinatorSingletonRole: Option[String] =
+ if (coordinatorSingletonOverrideRole) role else
coordinatorSingletonSettings.role
+
@InternalApi
private[pekko] val passivationStrategy:
ClusterShardingSettings.PassivationStrategy =
ClusterShardingSettings.PassivationStrategy(this)
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
index a6aca98d88..d6347f4ee6 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
@@ -641,6 +641,7 @@ private[pekko] class ShardRegion(
// sort by age, oldest first
val ageOrdering = Member.ageOrdering
+ // membersByAge is only used for tracking where coordinator is running
var membersByAge: immutable.SortedSet[Member] =
immutable.SortedSet.empty(ageOrdering)
// membersByAge contains members with these status
private val memberStatusOfInterest: Set[MemberStatus] =
@@ -721,8 +722,8 @@ private[pekko] class ShardRegion(
case None => ClusterSettings.DcRolePrefix +
cluster.settings.SelfDataCenter
}
- def matchingRole(member: Member): Boolean =
- member.hasRole(targetDcRole) && role.forall(member.hasRole)
+ def matchingCoordinatorRole(member: Member): Boolean =
+ member.hasRole(targetDcRole) &&
settings.coordinatorSingletonRole.forall(member.hasRole)
/**
* When leaving the coordinator singleton is started rather quickly on next
@@ -784,7 +785,7 @@ private[pekko] class ShardRegion(
changeMembers(
immutable.SortedSet
.empty(ageOrdering)
- .union(state.members.filter(m => memberStatusOfInterest(m.status) &&
matchingRole(m))))
+ .union(state.members.filter(m => memberStatusOfInterest(m.status) &&
matchingCoordinatorRole(m))))
}
def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match {
@@ -798,7 +799,7 @@ private[pekko] class ShardRegion(
case MemberRemoved(m, _) =>
if (m.uniqueAddress == cluster.selfUniqueAddress)
context.stop(self)
- else if (matchingRole(m))
+ else if (matchingCoordinatorRole(m))
changeMembers(membersByAge.filterNot(_.uniqueAddress ==
m.uniqueAddress))
case MemberDowned(m) =>
@@ -818,7 +819,7 @@ private[pekko] class ShardRegion(
}
private def addMember(m: Member): Unit = {
- if (matchingRole(m) && memberStatusOfInterest(m.status)) {
+ if (matchingCoordinatorRole(m) && memberStatusOfInterest(m.status)) {
// replace, it's possible that the status, or upNumber is changed
changeMembers(membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress)
+ m)
}
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
index 0ff285c06f..9667bc60af 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
@@ -92,7 +92,6 @@ private[pekko] final class DDataRememberEntitiesShardStore(
extends Actor
with Stash
with ActorLogging {
-
import DDataRememberEntitiesShardStore._
implicit val ec: ExecutionContext = context.dispatcher
diff --git
a/cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sharding/ClusterShardingCoordinatorRoleSpec.scala
b/cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sharding/ClusterShardingCoordinatorRoleSpec.scala
new file mode 100644
index 0000000000..d1bb7c545b
--- /dev/null
+++
b/cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sharding/ClusterShardingCoordinatorRoleSpec.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2019-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.cluster.sharding
+
+import scala.concurrent.duration._
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+
+import org.apache.pekko
+import pekko.actor.ActorRef
+import pekko.actor.PoisonPill
+import pekko.actor.Props
+import pekko.cluster.sharding.MultiNodeClusterShardingSpec.EntityActor
+import pekko.testkit._
+
+class ClusterShardingCoordinatorRoleSpecConfig(
+ mode: String,
+ rememberEntities: Boolean,
+ rememberEntitiesStore: String =
ClusterShardingSettings.RememberEntitiesStoreDData)
+ extends MultiNodeClusterShardingConfig(
+ mode,
+ rememberEntities,
+ rememberEntitiesStore = rememberEntitiesStore,
+ additionalConfig = """
+ pekko.cluster.sharding {
+ role = shard
+ coordinator-singleton-role-override = off
+ coordinator-singleton.role = coordinator
+ entity-restart-backoff = 100 ms
+ }
+ """) {
+
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+ val fourth = role("fourth")
+
+ val coordinatorConfig: Config =
ConfigFactory.parseString("""pekko.cluster.roles = [ "coordinator" ]""")
+ val shardConfig: Config = ConfigFactory.parseString("""pekko.cluster.roles =
[ "shard" ]""")
+
+ nodeConfig(first, second)(coordinatorConfig)
+
+ nodeConfig(third, fourth)(shardConfig)
+
+}
+
+class DDataClusterShardingCoordinatorRoleSpec
+ extends ClusterShardingCoordinatorRoleSpec(
+ new ClusterShardingCoordinatorRoleSpecConfig(
+ ClusterShardingSettings.StateStoreModeDData,
+ rememberEntities = false))
+class DDataClusterShardingCoordinatorRoleSpecMultiJvmNode1 extends
DDataClusterShardingCoordinatorRoleSpec
+class DDataClusterShardingCoordinatorRoleSpecMultiJvmNode2 extends
DDataClusterShardingCoordinatorRoleSpec
+class DDataClusterShardingCoordinatorRoleSpecMultiJvmNode3 extends
DDataClusterShardingCoordinatorRoleSpec
+class DDataClusterShardingCoordinatorRoleSpecMultiJvmNode4 extends
DDataClusterShardingCoordinatorRoleSpec
+
+class DDataRememberClusterShardingCoordinatorRoleSpec
+ extends ClusterShardingCoordinatorRoleSpec(
+ new ClusterShardingCoordinatorRoleSpecConfig(
+ ClusterShardingSettings.StateStoreModeDData,
+ rememberEntities = true))
+class DDataRememberClusterShardingCoordinatorRoleSpecMultiJvmNode1
+ extends DDataRememberClusterShardingCoordinatorRoleSpec
+class DDataRememberClusterShardingCoordinatorRoleSpecMultiJvmNode2
+ extends DDataRememberClusterShardingCoordinatorRoleSpec
+class DDataRememberClusterShardingCoordinatorRoleSpecMultiJvmNode3
+ extends DDataRememberClusterShardingCoordinatorRoleSpec
+class DDataRememberClusterShardingCoordinatorRoleSpecMultiJvmNode4
+ extends DDataRememberClusterShardingCoordinatorRoleSpec
+
+abstract class ClusterShardingCoordinatorRoleSpec(multiNodeConfig:
ClusterShardingCoordinatorRoleSpecConfig)
+ extends MultiNodeClusterShardingSpec(multiNodeConfig)
+ with ImplicitSender {
+
+ import multiNodeConfig.{ first, fourth, second, third }
+
+ private val rememberEntities = multiNodeConfig.rememberEntities
+
+ def startSharding(probe: ActorRef): ActorRef = {
+ startSharding(
+ system,
+ typeName = "Entity",
+ entityProps = Props(new EntityActor(probe)),
+ settings =
ClusterShardingSettings(system).withRememberEntities(multiNodeConfig.rememberEntities),
+ extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId,
+ extractShardId = MultiNodeClusterShardingSpec.intExtractShardId)
+ }
+
+ private lazy val region = ClusterSharding(system).shardRegion("Entity")
+
+ private val entityProbe = TestProbe()
+
+ s"Cluster sharding (${multiNodeConfig.mode}, remember
${multiNodeConfig.rememberEntities}) with separate coordinator role" must {
+
+ "use nodes with shard role" in within(30.seconds) {
+ startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first,
second, third))
+
+ join(first, first)
+ join(second, first)
+ join(third, first)
+ startSharding(entityProbe.ref)
+
+ runOn(third) {
+ region ! 1
+ expectMsg(1)
+ lastSender.path.address.hasLocalScope should ===(true)
+ entityProbe.expectMsg(EntityActor.Started(lastSender))
+ }
+ runOn(first, second) {
+ region ! 1
+ expectMsg(1)
+ lastSender.path should be(node(third) / "system" / "sharding" /
"Entity" / "1" / "1")
+ entityProbe.expectNoMessage()
+ }
+
+ enterBarrier("first-ok")
+
+ if (rememberEntities) {
+ runOn(third) {
+ region ! 2
+ expectMsg(2)
+ watch(lastSender)
+ val ref = lastSender
+ ref ! PoisonPill
+ expectTerminated(ref)
+ entityProbe.expectMsg(EntityActor.Started(ref))
+ // and then started again by remember entities
+ // not same ActorRef, but same path
+ entityProbe.expectMsgType[EntityActor.Started].ref.path should
be(ref.path)
+ }
+ runOn(first, second) {
+ entityProbe.expectNoMessage()
+ }
+
+ }
+
+ enterBarrier("after-1")
+ }
+
+ if (rememberEntities) {
+ "restart remembered entities" in {
+ join(fourth, second)
+
+ runOn(first) {
+ cluster.leave(third)
+ cluster.leave(first)
+ }
+ enterBarrier("first left")
+
+ runOn(fourth) {
+ // started by remember entities on new node
+ entityProbe.expectMsgType[EntityActor.Started](20.seconds)
+ }
+ runOn(second) {
+ entityProbe.expectNoMessage()
+ }
+
+ enterBarrier("after-2")
+ }
+ }
+
+ }
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
index 8c876ad785..45f910393d 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
@@ -1283,7 +1283,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
require(!cluster.isTerminated, "Cluster node must not be terminated")
require(
roles.subsetOf(cluster.selfRoles),
- s"This cluster member [$selfAddress] doesn't have all the roles
[${roles.mkString(", ")}]")
+ s"This cluster member [$selfAddress] with roles
[${cluster.selfRoles.mkString(", ")}] doesn't have all the roles
[${roles.mkString(", ")}]")
private val payloadSizeAggregator = {
val sizeExceeding = settings.logDataSizeExceeding.getOrElse(Int.MaxValue)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]