This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new ab12a8f5ec Port akka-core#31834: Add ScheduledClock to reduce nanoTime
overhead in recency-based passivation (#2766)
ab12a8f5ec is described below
commit ab12a8f5ecc711dcd5b73f9772f6bdda28ddb72d
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Mar 22 10:46:09 2026 +0100
Port akka-core#31834: Add ScheduledClock to reduce nanoTime overhead in
recency-based passivation (#2766)
* Initial plan
* Port akka-core PR #31834: Add ScheduledClock for recency-based
passivation strategies
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/76266e9b-5d21-4c79-976f-82c1cc8b6f54
* add mima exclude
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../org/apache/pekko/util/FrequencyListSpec.scala | 13 +--
.../org/apache/pekko/util/RecencyListSpec.scala | 15 +--
.../org/apache/pekko/util/ScheduledClockSpec.scala | 76 +++++++++++++
.../pekko/util/SegmentedRecencyListSpec.scala | 13 +--
.../scala/org/apache/pekko/util/TestClock.scala | 30 +++++
.../add-scheduled-clock.excludes | 22 ++++
actor/src/main/resources/reference.conf | 4 +
.../main/scala/org/apache/pekko/util/Clock.scala | 121 +++++++++++++++++++++
.../org/apache/pekko/util/FrequencyList.scala | 6 +-
.../scala/org/apache/pekko/util/RecencyList.scala | 18 +--
.../apache/pekko/util/SegmentedRecencyList.scala | 8 +-
.../org/apache/pekko/cluster/sharding/Shard.scala | 3 +-
.../internal/EntityPassivationStrategy.scala | 77 +++++++------
.../sharding/passivation/CompositeSpec.scala | 4 +-
.../passivation/EntityPassivationSpec.scala | 7 +-
.../cluster/sharding/passivation/IdleSpec.scala | 4 +-
.../passivation/LeastFrequentlyUsedSpec.scala | 4 +-
.../passivation/LeastRecentlyUsedSpec.scala | 4 +-
.../passivation/MostRecentlyUsedSpec.scala | 4 +-
.../sharding/passivation/simulator/Simulator.scala | 52 +++++----
20 files changed, 365 insertions(+), 120 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/util/FrequencyListSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/FrequencyListSpec.scala
index b3dbe1837b..6a9b8d2cad 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/util/FrequencyListSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/FrequencyListSpec.scala
@@ -18,16 +18,7 @@ import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
-object FrequencyListSpec {
- // controlled clock for testing recency windows
- // durations are always in seconds
- class TestClock extends RecencyList.Clock {
- private var time = 0L
- def tick(): Unit = time += 1
- override def currentTime(): Long = time
- override def earlierTime(duration: FiniteDuration): Long = currentTime() -
duration.toSeconds
- }
-}
+object FrequencyListSpec {}
class FrequencyListSpec extends AnyWordSpec with Matchers {
@@ -102,7 +93,7 @@ class FrequencyListSpec extends AnyWordSpec with Matchers {
}
"track overall recency of elements when enabled" in {
- val clock = new RecencyListSpec.TestClock
+ val clock = new TestClock
val frequency = new FrequencyList[String](dynamicAging = false,
OptionVal.Some(clock))
check(frequency, Nil)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/util/RecencyListSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/RecencyListSpec.scala
index 0e4766f91d..57afb642e3 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/util/RecencyListSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/RecencyListSpec.scala
@@ -18,16 +18,7 @@ import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
-object RecencyListSpec {
- // controlled clock for testing recency windows
- // durations are always in seconds
- class TestClock extends RecencyList.Clock {
- private var time = 0L
- def tick(): Unit = time += 1
- override def currentTime(): Long = time
- override def earlierTime(duration: FiniteDuration): Long = currentTime() -
duration.toSeconds
- }
-}
+object RecencyListSpec {}
class RecencyListSpec extends AnyWordSpec with Matchers {
@@ -41,8 +32,8 @@ class RecencyListSpec extends AnyWordSpec with Matchers {
"RecencyList" must {
"track recency of elements" in {
- val clock = new RecencyListSpec.TestClock
- val recency = new RecencyList[String](clock)
+ val clock = new TestClock
+ val recency = RecencyList[String](clock)
check(recency, Nil)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/util/ScheduledClockSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/ScheduledClockSpec.scala
new file mode 100644
index 0000000000..b614deea6c
--- /dev/null
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/ScheduledClockSpec.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2016-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.util
+
+import scala.concurrent.duration._
+
+import org.apache.pekko.testkit.PekkoSpec
+import org.apache.pekko.testkit.TimingTest
+
+class ScheduledClockSpec extends PekkoSpec {
+
+ "ScheduledClock" must {
+
+ "update at given interval" taggedAs TimingTest in {
+ val interval = 100.millis
+ val clock = new ScheduledClock(interval, system.scheduler,
system.dispatcher)
+ val t1 = clock.currentTime()
+ // interval is much smaller but to avoid flaky test due to pauses we
sleep longer
+ Thread.sleep(2000)
+ val t2 = clock.currentTime()
+ (t2 - t1) shouldBe >=(interval.toNanos)
+
+ Thread.sleep(2000)
+ val t3 = clock.currentTime()
+ (t3 - t2) shouldBe >=(interval.toNanos)
+ }
+
+ "increment each call to currentTime" in {
+ val interval = 10.seconds // don't update
+ val clock = new ScheduledClock(interval, system.scheduler,
system.dispatcher)
+ val t1 = clock.currentTime()
+ val t2 = clock.currentTime()
+ val t3 = clock.currentTime()
+ (t2 - t1) shouldBe 1L
+ (t3 - t2) shouldBe 1L
+ }
+
+ "not go backwards" taggedAs TimingTest in {
+ val interval = 20.millis
+ val clock = new ScheduledClock(interval, system.scheduler,
system.dispatcher)
+
+ // run a few threads updating in the background
+ val backgroundTasks = (1 to 3).map { _ =>
+ system.scheduler.scheduleWithFixedDelay(10.millis, 10.millis) { () =>
+ (1 to 1000).foreach { _ =>
+ clock.currentTime()
+ }
+ }(system.dispatcher)
+ }
+ try {
+ var t = clock.currentTime()
+ (1 to 1000000).foreach { n =>
+ val t2 = clock.currentTime()
+ (t2 - t) shouldBe >=(0L)
+ t = t2
+ if (n % 100000 == 0)
+ Thread.sleep(10)
+ }
+ } finally {
+ backgroundTasks.foreach(_.cancel())
+ }
+ }
+
+ }
+}
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/util/SegmentedRecencyListSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/SegmentedRecencyListSpec.scala
index 99aaa560dd..1f7e2fe195 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/util/SegmentedRecencyListSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/util/SegmentedRecencyListSpec.scala
@@ -18,16 +18,7 @@ import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
-object SegmentedRecencyListSpec {
- // controlled clock for testing recency windows
- // durations are always in seconds
- class TestClock extends RecencyList.Clock {
- private var time = 0L
- def tick(): Unit = time += 1
- override def currentTime(): Long = time
- override def earlierTime(duration: FiniteDuration): Long = currentTime() -
duration.toSeconds
- }
-}
+object SegmentedRecencyListSpec {}
class SegmentedRecencyListSpec extends AnyWordSpec with Matchers {
@@ -129,7 +120,7 @@ class SegmentedRecencyListSpec extends AnyWordSpec with
Matchers {
}
"remove overall least recent elements" in {
- val clock = new SegmentedRecencyListSpec.TestClock
+ val clock = new TestClock
val recency = new SegmentedRecencyList[String](initialLimits = List(5,
5), OptionVal.Some(clock))
check(recency, Nil)
diff --git a/actor-tests/src/test/scala/org/apache/pekko/util/TestClock.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/TestClock.scala
new file mode 100644
index 0000000000..200044b500
--- /dev/null
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/TestClock.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.util
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Controlled clock for testing recency windows.
+ * Durations are always in seconds.
+ */
+class TestClock extends Clock {
+ private var time = 0L
+
+ def tick(): Unit = time += 1
+
+ override def currentTime(): Long = time
+
+ override def earlierTime(duration: FiniteDuration): Long = currentTime() -
duration.toSeconds
+}
diff --git
a/actor/src/main/mima-filters/2.0.x.backwards.excludes/add-scheduled-clock.excludes
b/actor/src/main/mima-filters/2.0.x.backwards.excludes/add-scheduled-clock.excludes
new file mode 100644
index 0000000000..b841a9191f
--- /dev/null
+++
b/actor/src/main/mima-filters/2.0.x.backwards.excludes/add-scheduled-clock.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Remove deprecated method in Scheduler
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.FrequencyList#withOverallRecency.empty*")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.util.RecencyList$Clock")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.util.RecencyList$NanoClock")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.SegmentedRecencyList#withOverallRecency.empty")
diff --git a/actor/src/main/resources/reference.conf
b/actor/src/main/resources/reference.conf
index aa435557ae..5bb1c64888 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -921,6 +921,10 @@ pekko {
shutdown-timeout = 5s
}
+ # How frequent the clock for recency-based strategies is updated. Can be set
to 0 for usage of
+ # `System.nanoTime` for each call but that might have some overhead for high
message throughput.
+ scheduled-clock-interval = 1 s
+
io {
# By default the select loops run on dedicated threads, hence using a
diff --git a/actor/src/main/scala/org/apache/pekko/util/Clock.scala
b/actor/src/main/scala/org/apache/pekko/util/Clock.scala
new file mode 100644
index 0000000000..2226a2fec6
--- /dev/null
+++ b/actor/src/main/scala/org/apache/pekko/util/Clock.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.util
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.annotation.tailrec
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.actor.ClassicActorSystemProvider
+import pekko.actor.ExtendedActorSystem
+import pekko.actor.Extension
+import pekko.actor.ExtensionId
+import pekko.actor.ExtensionIdProvider
+import pekko.actor.Scheduler
+import pekko.annotation.InternalApi
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object Clock extends ExtensionId[Clock] with
ExtensionIdProvider {
+ override def get(system: ActorSystem): Clock = super.get(system)
+
+ override def get(system: ClassicActorSystemProvider): Clock =
super.get(system)
+
+ override def lookup = Clock
+
+ override def createExtension(system: ExtendedActorSystem): Clock = {
+ import scala.concurrent.duration._
+ val interval =
system.settings.config.getDuration("pekko.scheduled-clock-interval",
TimeUnit.MILLISECONDS).millis
+ if (interval > Duration.Zero)
+ new ScheduledClock(interval, system.scheduler, system.dispatcher)
+ else new NanoClock
+ }
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] trait Clock extends Extension {
+ def currentTime(): Long
+
+ def earlierTime(duration: FiniteDuration): Long
+}
+
+/**
+ * INTERNAL API: Clock backed by `System.nanoTime`.
+ */
+@InternalApi private[pekko] final class NanoClock extends Clock {
+ override def currentTime(): Long = System.nanoTime()
+
+ override def earlierTime(duration: FiniteDuration): Long = currentTime() -
duration.toNanos
+}
+
+/**
+ * INTERNAL API: Clock backed by `System.nanoTime` but only calls `nanoTime`
with the given interval using the `scheduler`.
+ * This has the benefit of not calling `nanoTime` too often when exact
timestamps are not needed.
+ * The `currentTime` never moves backwards (but overflows to negative in same
way as `nanoTime`).
+ * Subsequent calls to `currentTime` will increment the "time" with 1, unless
for very high frequency where it may
+ * keep the same time value until next background update.
+ */
+@InternalApi private[pekko] final class ScheduledClock(
+ updateInterval: FiniteDuration,
+ scheduler: Scheduler,
+ executionContext: ExecutionContext)
+ extends Clock {
+ private val time = new AtomicLong(System.nanoTime())
+ @volatile private var updatedTime = time.get()
+
+ private val maxIncrement = math.max(updateInterval.toNanos - 100000, 0L)
+ @volatile var maxIncrementReached = false
+
+ scheduler.scheduleWithFixedDelay(updateInterval, updateInterval) { () =>
+ update()
+ }(executionContext)
+
+ @tailrec private def update(): Unit = {
+ val current = time.get()
+ val now = System.nanoTime()
+ val newTime =
+ if (now - current >= 0L) now // the diff also handles the case of
Long.MaxValue overflow to negative
+ else current
+
+ if (time.compareAndSet(current, newTime)) {
+ updatedTime = newTime
+ maxIncrementReached = false
+ } else {
+ // concurrent update via currentTime(), try again
+ update()
+ }
+ }
+
+ override def currentTime(): Long = {
+ if (maxIncrementReached) {
+ time.get()
+ } else {
+ val now = time.incrementAndGet()
+ if (now - updatedTime >= maxIncrement)
+ maxIncrementReached = true
+ now
+ }
+ }
+
+ override def earlierTime(duration: FiniteDuration): Long =
+ time.get() - duration.toNanos
+}
diff --git a/actor/src/main/scala/org/apache/pekko/util/FrequencyList.scala
b/actor/src/main/scala/org/apache/pekko/util/FrequencyList.scala
index 9f66fc9e59..772aaddd0a 100644
--- a/actor/src/main/scala/org/apache/pekko/util/FrequencyList.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/FrequencyList.scala
@@ -27,8 +27,8 @@ private[pekko] object FrequencyList {
new FrequencyList[A](dynamicAging, clock = OptionVal.None)
object withOverallRecency {
- def empty[A](dynamicAging: Boolean = false): FrequencyList[A] =
- new FrequencyList[A](dynamicAging, OptionVal.Some(new
RecencyList.NanoClock))
+ def empty[A](clock: Clock, dynamicAging: Boolean = false):
FrequencyList[A] =
+ new FrequencyList[A](dynamicAging, OptionVal.Some(clock))
}
private final class FrequencyNode[A](val priority: Long) {
@@ -60,7 +60,7 @@ private[pekko] object FrequencyList {
* Dynamic aging can be enabled for least frequently used policies, to
automatically 'age' the whole cache on evictions.
*/
@InternalApi
-private[pekko] final class FrequencyList[A](dynamicAging: Boolean, clock:
OptionVal[RecencyList.Clock]) {
+private[pekko] final class FrequencyList[A](dynamicAging: Boolean, clock:
OptionVal[Clock]) {
import FrequencyList.{ FrequencyNode, Node }
private val frequency = new DoubleLinkedList[FrequencyNode[A]](
diff --git a/actor/src/main/scala/org/apache/pekko/util/RecencyList.scala
b/actor/src/main/scala/org/apache/pekko/util/RecencyList.scala
index c7708d50e2..69ca2072f2 100644
--- a/actor/src/main/scala/org/apache/pekko/util/RecencyList.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/RecencyList.scala
@@ -23,22 +23,16 @@ import org.apache.pekko.annotation.InternalApi
*/
@InternalApi
private[pekko] object RecencyList {
- def empty[A]: RecencyList[A] = new RecencyList[A](new NanoClock)
+ def emptyWithNanoClock[A]: RecencyList[A] =
+ RecencyList[A](new NanoClock)
+
+ def apply[A](clock: Clock): RecencyList[A] =
+ new RecencyList[A](clock)
private final class Node[A](val value: A) {
var lessRecent, moreRecent: OptionVal[Node[A]] = OptionVal.None
var timestamp: Long = 0L
}
-
- trait Clock {
- def currentTime(): Long
- def earlierTime(duration: FiniteDuration): Long
- }
-
- final class NanoClock extends Clock {
- override def currentTime(): Long = System.nanoTime()
- override def earlierTime(duration: FiniteDuration): Long = currentTime() -
duration.toNanos
- }
}
/**
@@ -49,7 +43,7 @@ private[pekko] object RecencyList {
* Implemented using a doubly-linked list plus hash map for lookup, so that
all operations are constant time.
*/
@InternalApi
-private[pekko] final class RecencyList[A](clock: RecencyList.Clock) {
+private[pekko] final class RecencyList[A] private (clock: Clock) {
import RecencyList.Node
private val recency = new DoubleLinkedList[Node[A]](
diff --git
a/actor/src/main/scala/org/apache/pekko/util/SegmentedRecencyList.scala
b/actor/src/main/scala/org/apache/pekko/util/SegmentedRecencyList.scala
index 0e5320720b..a3ff6cb60e 100644
--- a/actor/src/main/scala/org/apache/pekko/util/SegmentedRecencyList.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/SegmentedRecencyList.scala
@@ -27,8 +27,8 @@ private[pekko] object SegmentedRecencyList {
new SegmentedRecencyList[A](limits, OptionVal.None)
object withOverallRecency {
- def empty[A](limits: immutable.Seq[Int]): SegmentedRecencyList[A] =
- new SegmentedRecencyList[A](limits, OptionVal.Some(new
RecencyList.NanoClock))
+ def empty[A](clock: Clock, limits: immutable.Seq[Int]):
SegmentedRecencyList[A] =
+ new SegmentedRecencyList[A](limits, OptionVal.Some(clock))
}
private final class Node[A](val value: A) {
@@ -47,9 +47,7 @@ private[pekko] object SegmentedRecencyList {
* Implemented using doubly-linked lists plus hash map for lookup, so that all
operations are constant time.
*/
@InternalApi
-private[pekko] final class SegmentedRecencyList[A](
- initialLimits: immutable.Seq[Int],
- clock: OptionVal[RecencyList.Clock]) {
+private[pekko] final class SegmentedRecencyList[A](initialLimits:
immutable.Seq[Int], clock: OptionVal[Clock]) {
import SegmentedRecencyList.Node
private var limits: immutable.IndexedSeq[Int] = initialLimits.toIndexedSeq
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
index 3792484f86..7c018326b9 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
@@ -48,6 +48,7 @@ import pekko.coordination.lease.scaladsl.Lease
import pekko.coordination.lease.scaladsl.LeaseProvider
import pekko.event.LoggingAdapter
import pekko.pattern.pipe
+import pekko.util.Clock
import pekko.util.MessageBufferMap
import pekko.util.OptionVal
import pekko.util.PrettyDuration._
@@ -470,7 +471,7 @@ private[pekko] class Shard(
private var handOffStopper: Option[ActorRef] = None
private var preparingForShutdown = false
- private val passivationStrategy = EntityPassivationStrategy(settings)
+ private val passivationStrategy = EntityPassivationStrategy(settings, clock
= () => Clock(context.system))
import context.dispatcher
private val passivateIntervalTask =
passivationStrategy.scheduledInterval.map { interval =>
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/EntityPassivationStrategy.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/EntityPassivationStrategy.scala
index e057331506..2806b5c563 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/EntityPassivationStrategy.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/EntityPassivationStrategy.scala
@@ -21,6 +21,7 @@ import pekko.annotation.InternalApi
import pekko.cluster.sharding.ClusterShardingSettings
import pekko.cluster.sharding.ShardRegion.EntityId
import pekko.util.{ FrequencyList, RecencyList, SegmentedRecencyList }
+import pekko.util.Clock
import pekko.util.FastFrequencySketch
import pekko.util.FrequencySketch
import pekko.util.OptionVal
@@ -36,26 +37,26 @@ private[pekko] object EntityPassivationStrategy {
val none: PassivateEntities = immutable.Seq.empty[EntityId]
}
- def apply(settings: ClusterShardingSettings): EntityPassivationStrategy = {
+ def apply(settings: ClusterShardingSettings, clock: () => Clock):
EntityPassivationStrategy = {
settings.passivationStrategy match {
case ClusterShardingSettings.IdlePassivationStrategy(timeout, interval)
=>
- new IdleEntityPassivationStrategy(new IdleCheck(timeout, interval))
+ new IdleEntityPassivationStrategy(new IdleCheck(timeout, interval),
clock())
case ClusterShardingSettings.LeastRecentlyUsedPassivationStrategy(limit,
segmented, idle) =>
val idleCheck = idle.map(idle => new IdleCheck(idle.timeout,
idle.interval))
- if (segmented.isEmpty) new
LeastRecentlyUsedEntityPassivationStrategy(limit, idleCheck)
- else new SegmentedLeastRecentlyUsedEntityPassivationStrategy(limit,
segmented, idleCheck)
+ if (segmented.isEmpty) new
LeastRecentlyUsedEntityPassivationStrategy(limit, idleCheck, clock())
+ else new SegmentedLeastRecentlyUsedEntityPassivationStrategy(limit,
segmented, idleCheck, clock)
case ClusterShardingSettings.MostRecentlyUsedPassivationStrategy(limit,
idle) =>
val idleCheck = idle.map(idle => new IdleCheck(idle.timeout,
idle.interval))
- new MostRecentlyUsedEntityPassivationStrategy(limit, idleCheck)
+ new MostRecentlyUsedEntityPassivationStrategy(limit, idleCheck,
clock())
case
ClusterShardingSettings.LeastFrequentlyUsedPassivationStrategy(limit,
dynamicAging, idle) =>
val idleCheck = idle.map(idle => new IdleCheck(idle.timeout,
idle.interval))
- new LeastFrequentlyUsedEntityPassivationStrategy(limit, dynamicAging,
idleCheck)
+ new LeastFrequentlyUsedEntityPassivationStrategy(limit, dynamicAging,
idleCheck, clock)
case composite: ClusterShardingSettings.CompositePassivationStrategy =>
- val main = ActiveEntities(composite.mainStrategy,
composite.idle.isDefined)
+ val main = ActiveEntities(composite.mainStrategy,
composite.idle.isDefined, clock)
if (main eq NoActiveEntities) DisabledEntityPassivationStrategy
else {
val initialLimit = composite.limit
- val window = ActiveEntities(composite.windowStrategy,
composite.idle.isDefined)
+ val window = ActiveEntities(composite.windowStrategy,
composite.idle.isDefined, clock)
val initialWindowProportion = if (window eq NoActiveEntities) 0.0
else composite.initialWindowProportion
val minimumWindowProportion = if (window eq NoActiveEntities) 0.0
else composite.minimumWindowProportion
val maximumWindowProportion = if (window eq NoActiveEntities) 0.0
else composite.maximumWindowProportion
@@ -153,11 +154,12 @@ private[pekko] final class IdleCheck(val timeout:
FiniteDuration, val interval:
* @param idleCheck passivate idle entities after the given timeout, checking
every interval
*/
@InternalApi
-private[pekko] final class IdleEntityPassivationStrategy(idleCheck: IdleCheck)
extends EntityPassivationStrategy {
+private[pekko] final class IdleEntityPassivationStrategy(idleCheck: IdleCheck,
clock: Clock)
+ extends EntityPassivationStrategy {
import EntityPassivationStrategy.PassivateEntities
- private val recencyList = RecencyList.empty[EntityId]
+ private val recencyList = RecencyList[EntityId](clock)
override val scheduledInterval: Option[FiniteDuration] =
Some(idleCheck.interval)
@@ -211,12 +213,15 @@ private[pekko] abstract class
LimitBasedEntityPassivationStrategy(initialLimit:
* @param initialLimit initial active entity capacity for a shard region
* @param idleCheck optionally passivate idle entities after the given
timeout, checking every interval
*/
-private[pekko] final class
LeastRecentlyUsedEntityPassivationStrategy(initialLimit: Int, idleCheck:
Option[IdleCheck])
+private[pekko] final class LeastRecentlyUsedEntityPassivationStrategy(
+ initialLimit: Int,
+ idleCheck: Option[IdleCheck],
+ clock: Clock)
extends LimitBasedEntityPassivationStrategy(initialLimit) {
import EntityPassivationStrategy.PassivateEntities
- val active = new LeastRecentlyUsedReplacementPolicy(initialLimit)
+ val active = new LeastRecentlyUsedReplacementPolicy(initialLimit, clock)
override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities =
active.updateLimit(perShardLimit)
@@ -248,12 +253,13 @@ private[pekko] final class
LeastRecentlyUsedEntityPassivationStrategy(initialLim
private[pekko] final class SegmentedLeastRecentlyUsedEntityPassivationStrategy(
initialLimit: Int,
proportions: immutable.Seq[Double],
- idleCheck: Option[IdleCheck])
+ idleCheck: Option[IdleCheck],
+ clock: () => Clock)
extends LimitBasedEntityPassivationStrategy(initialLimit) {
import EntityPassivationStrategy.PassivateEntities
- val active = new SegmentedLeastRecentlyUsedReplacementPolicy(initialLimit,
proportions, idleCheck.isDefined)
+ val active = new SegmentedLeastRecentlyUsedReplacementPolicy(initialLimit,
proportions, idleCheck.isDefined, clock)
override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities =
active.updateLimit(perShardLimit)
@@ -278,12 +284,15 @@ private[pekko] final class
SegmentedLeastRecentlyUsedEntityPassivationStrategy(
* @param idleCheck optionally passivate idle entities after the given
timeout, checking every interval
*/
@InternalApi
-private[pekko] final class
MostRecentlyUsedEntityPassivationStrategy(initialLimit: Int, idleCheck:
Option[IdleCheck])
+private[pekko] final class MostRecentlyUsedEntityPassivationStrategy(
+ initialLimit: Int,
+ idleCheck: Option[IdleCheck],
+ clock: Clock)
extends LimitBasedEntityPassivationStrategy(initialLimit) {
import EntityPassivationStrategy.PassivateEntities
- val active = new MostRecentlyUsedReplacementPolicy(initialLimit)
+ val active = new MostRecentlyUsedReplacementPolicy(initialLimit, clock)
override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities =
active.updateLimit(perShardLimit)
@@ -312,12 +321,13 @@ private[pekko] final class
MostRecentlyUsedEntityPassivationStrategy(initialLimi
private[pekko] final class LeastFrequentlyUsedEntityPassivationStrategy(
initialLimit: Int,
dynamicAging: Boolean,
- idleCheck: Option[IdleCheck])
+ idleCheck: Option[IdleCheck],
+ clock: () => Clock)
extends LimitBasedEntityPassivationStrategy(initialLimit) {
import EntityPassivationStrategy.PassivateEntities
- val active = new LeastFrequentlyUsedReplacementPolicy(initialLimit,
dynamicAging, idleCheck.isDefined)
+ val active = new LeastFrequentlyUsedReplacementPolicy(initialLimit,
dynamicAging, idleCheck.isDefined, clock)
override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities =
active.updateLimit(perShardLimit)
@@ -337,15 +347,18 @@ private[pekko] final class
LeastFrequentlyUsedEntityPassivationStrategy(
*/
@InternalApi
private[pekko] object ActiveEntities {
- def apply(strategy: ClusterShardingSettings.PassivationStrategy,
idleEnabled: Boolean): ActiveEntities =
+ def apply(
+ strategy: ClusterShardingSettings.PassivationStrategy,
+ idleEnabled: Boolean,
+ clock: () => Clock): ActiveEntities =
strategy match {
case ClusterShardingSettings.LeastRecentlyUsedPassivationStrategy(_,
segmented, _) =>
- if (segmented.isEmpty) new
LeastRecentlyUsedReplacementPolicy(initialLimit = 0)
- else new SegmentedLeastRecentlyUsedReplacementPolicy(initialLimit = 0,
segmented, idleEnabled)
+ if (segmented.isEmpty) new
LeastRecentlyUsedReplacementPolicy(initialLimit = 0, clock())
+ else new SegmentedLeastRecentlyUsedReplacementPolicy(initialLimit = 0,
segmented, idleEnabled, clock)
case ClusterShardingSettings.MostRecentlyUsedPassivationStrategy(_, _) =>
- new MostRecentlyUsedReplacementPolicy(initialLimit = 0)
+ new MostRecentlyUsedReplacementPolicy(initialLimit = 0, clock())
case ClusterShardingSettings.LeastFrequentlyUsedPassivationStrategy(_,
dynamicAging, _) =>
- new LeastFrequentlyUsedReplacementPolicy(initialLimit = 0,
dynamicAging, idleEnabled)
+ new LeastFrequentlyUsedReplacementPolicy(initialLimit = 0,
dynamicAging, idleEnabled, clock)
case _ => NoActiveEntities
}
}
@@ -433,11 +446,11 @@ private[pekko] object NoActiveEntities extends
ActiveEntities {
* @param initialLimit initial active entity capacity for a shard
*/
@InternalApi
-private[pekko] final class LeastRecentlyUsedReplacementPolicy(initialLimit:
Int) extends ActiveEntities {
+private[pekko] final class LeastRecentlyUsedReplacementPolicy(initialLimit:
Int, clock: Clock) extends ActiveEntities {
import EntityPassivationStrategy.PassivateEntities
private var limit = initialLimit
- private val recencyList = RecencyList.empty[EntityId]
+ private val recencyList = RecencyList[EntityId](clock)
override def size: Int = recencyList.size
@@ -482,7 +495,8 @@ private[pekko] final class
LeastRecentlyUsedReplacementPolicy(initialLimit: Int)
private[pekko] final class SegmentedLeastRecentlyUsedReplacementPolicy(
initialLimit: Int,
proportions: immutable.Seq[Double],
- idleEnabled: Boolean)
+ idleEnabled: Boolean,
+ clock: () => Clock)
extends ActiveEntities {
import EntityPassivationStrategy.PassivateEntities
@@ -495,7 +509,7 @@ private[pekko] final class
SegmentedLeastRecentlyUsedReplacementPolicy(
}
private val segmentedRecencyList =
- if (idleEnabled)
SegmentedRecencyList.withOverallRecency.empty[EntityId](segmentLimits)
+ if (idleEnabled)
SegmentedRecencyList.withOverallRecency.empty[EntityId](clock(), segmentLimits)
else SegmentedRecencyList.empty[EntityId](segmentLimits)
override def size: Int = segmentedRecencyList.size
@@ -532,11 +546,11 @@ private[pekko] final class
SegmentedLeastRecentlyUsedReplacementPolicy(
* @param initialLimit initial active entity capacity for a shard
*/
@InternalApi
-private[pekko] final class MostRecentlyUsedReplacementPolicy(initialLimit:
Int) extends ActiveEntities {
+private[pekko] final class MostRecentlyUsedReplacementPolicy(initialLimit:
Int, clock: Clock) extends ActiveEntities {
import EntityPassivationStrategy.PassivateEntities
private var limit = initialLimit
- private val recencyList = RecencyList.empty[EntityId]
+ private val recencyList = RecencyList[EntityId](clock)
override def size: Int = recencyList.size
@@ -578,7 +592,8 @@ private[pekko] final class
MostRecentlyUsedReplacementPolicy(initialLimit: Int)
private[pekko] final class LeastFrequentlyUsedReplacementPolicy(
initialLimit: Int,
dynamicAging: Boolean,
- idleEnabled: Boolean)
+ idleEnabled: Boolean,
+ clock: () => Clock)
extends ActiveEntities {
import EntityPassivationStrategy.PassivateEntities
@@ -586,7 +601,7 @@ private[pekko] final class
LeastFrequentlyUsedReplacementPolicy(
private var limit = initialLimit
private val frequencyList =
- if (idleEnabled)
FrequencyList.withOverallRecency.empty[EntityId](dynamicAging)
+ if (idleEnabled) FrequencyList.withOverallRecency.empty[EntityId](clock(),
dynamicAging)
else FrequencyList.empty[EntityId](dynamicAging)
override def size: Int = frequencyList.size
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/CompositeSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/CompositeSpec.scala
index 29590a1e8a..f9fc1f7ca7 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/CompositeSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/CompositeSpec.scala
@@ -390,7 +390,7 @@ class CompositeWithIdleSpec extends
AbstractEntityPassivationSpec(CompositeSpec.
"passivate entities when they haven't seen messages for the configured
timeout" in {
val region = start()
- val lastSendNanoTime1 = System.nanoTime()
+ val lastSendNanoTime1 = clock.currentTime()
region ! Envelope(shard = 1, id = 1, message = "A")
region ! Envelope(shard = 1, id = 2, message = "B")
@@ -401,7 +401,7 @@ class CompositeWithIdleSpec extends
AbstractEntityPassivationSpec(CompositeSpec.
Thread.sleep((configuredIdleTimeout / 2).toMillis)
region ! Envelope(shard = 1, id = 3, message = "E")
Thread.sleep((configuredIdleTimeout / 2).toMillis)
- val lastSendNanoTime2 = System.nanoTime()
+ val lastSendNanoTime2 = clock.currentTime()
region ! Envelope(shard = 1, id = 3, message = "F")
expectReceived(id = 1, message = "A")
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/EntityPassivationSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/EntityPassivationSpec.scala
index a2e77c3a61..7ef0d613e6 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/EntityPassivationSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/EntityPassivationSpec.scala
@@ -26,6 +26,7 @@ import pekko.cluster.sharding.ShardRegion
import pekko.testkit.PekkoSpec
import pekko.testkit.TestProbe
import pekko.testkit.WithLogCapturing
+import pekko.util.Clock
import org.scalatest.concurrent.Eventually
@@ -42,6 +43,7 @@ object EntityPassivationSpec {
pekko.remote.artery.canonical.port = 0
pekko.cluster.sharding.verbose-debug-logging = on
pekko.cluster.sharding.fail-on-invalid-entity-state-transition = on
+ pekko.scheduled-clock-interval = 100 ms
""")
val disabledConfig: Config = ConfigFactory.parseString("""
@@ -62,9 +64,10 @@ object EntityPassivationSpec {
}
class Entity(probes: Map[String, ActorRef]) extends Actor {
+ private val clock = Clock(context.system)
def id = context.self.path.name
- def received(message: Any) = probes(id) ! Entity.Received(id, message,
System.nanoTime())
+ def received(message: Any) = probes(id) ! Entity.Received(id, message,
clock.currentTime())
def receive = {
case Entity.Stop =>
@@ -99,6 +102,8 @@ abstract class AbstractEntityPassivationSpec(config: Config,
expectedEntities: I
settings.passivationStrategySettings.idleEntitySettings.fold(Duration.Zero)(_.timeout)
val configuredActiveEntityLimit: Int =
settings.passivationStrategySettings.activeEntityLimit.getOrElse(0)
+ lazy val clock: Clock = Clock(system)
+
val probes: Map[Int, TestProbe] = (1 to expectedEntities).map(id => id ->
TestProbe()).toMap
val probeRefs: Map[String, ActorRef] = probes.map { case (id, probe) =>
id.toString -> probe.ref }
val stateProbe: TestProbe = TestProbe()
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/IdleSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/IdleSpec.scala
index 158fb21d85..a3bfde5287 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/IdleSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/IdleSpec.scala
@@ -37,7 +37,7 @@ class IdleSpec extends
AbstractEntityPassivationSpec(IdleSpec.config, expectedEn
"passivate entities when they haven't seen messages for the configured
duration" in {
val region = start()
- val lastSendNanoTime1 = System.nanoTime()
+ val lastSendNanoTime1 = clock.currentTime()
region ! Envelope(shard = 1, id = 1, message = "A")
region ! Envelope(shard = 2, id = 2, message = "B")
Thread.sleep((configuredIdleTimeout / 2).toMillis)
@@ -45,7 +45,7 @@ class IdleSpec extends
AbstractEntityPassivationSpec(IdleSpec.config, expectedEn
Thread.sleep((configuredIdleTimeout / 2).toMillis)
region ! Envelope(shard = 2, id = 2, message = "D")
Thread.sleep((configuredIdleTimeout / 2).toMillis)
- val lastSendNanoTime2 = System.nanoTime()
+ val lastSendNanoTime2 = clock.currentTime()
region ! Envelope(shard = 2, id = 2, message = "E")
expectReceived(id = 1, message = "A")
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/LeastFrequentlyUsedSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/LeastFrequentlyUsedSpec.scala
index 3b52bdb04e..0fb1c91b4f 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/LeastFrequentlyUsedSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/LeastFrequentlyUsedSpec.scala
@@ -303,7 +303,7 @@ class LeastFrequentlyUsedWithIdleSpec
"passivate entities when they haven't seen messages for the configured
timeout" in {
val region = start()
- val lastSendNanoTime1 = System.nanoTime()
+ val lastSendNanoTime1 = clock.currentTime()
region ! Envelope(shard = 1, id = 1, message = "A")
region ! Envelope(shard = 1, id = 2, message = "B")
@@ -314,7 +314,7 @@ class LeastFrequentlyUsedWithIdleSpec
Thread.sleep((configuredIdleTimeout / 2).toMillis)
region ! Envelope(shard = 1, id = 3, message = "E")
Thread.sleep((configuredIdleTimeout / 2).toMillis)
- val lastSendNanoTime2 = System.nanoTime()
+ val lastSendNanoTime2 = clock.currentTime()
region ! Envelope(shard = 1, id = 3, message = "F")
expectReceived(id = 1, message = "A")
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/LeastRecentlyUsedSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/LeastRecentlyUsedSpec.scala
index 66164a3fec..b077fd9298 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/LeastRecentlyUsedSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/LeastRecentlyUsedSpec.scala
@@ -263,7 +263,7 @@ class LeastRecentlyUsedWithIdleSpec
"passivate entities when they haven't seen messages for the configured
timeout" in {
val region = start()
- val lastSendNanoTime1 = System.nanoTime()
+ val lastSendNanoTime1 = clock.currentTime()
region ! Envelope(shard = 1, id = 1, message = "A")
region ! Envelope(shard = 1, id = 2, message = "B")
@@ -274,7 +274,7 @@ class LeastRecentlyUsedWithIdleSpec
Thread.sleep((configuredIdleTimeout / 2).toMillis)
region ! Envelope(shard = 1, id = 3, message = "E")
Thread.sleep((configuredIdleTimeout / 2).toMillis)
- val lastSendNanoTime2 = System.nanoTime()
+ val lastSendNanoTime2 = clock.currentTime()
region ! Envelope(shard = 1, id = 3, message = "F")
expectReceived(id = 1, message = "A")
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/MostRecentlyUsedSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/MostRecentlyUsedSpec.scala
index 259bf8d881..286af254ae 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/MostRecentlyUsedSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/MostRecentlyUsedSpec.scala
@@ -166,7 +166,7 @@ class MostRecentlyUsedWithIdleSpec
"passivate entities when they haven't seen messages for the configured
timeout" in {
val region = start()
- val lastSendNanoTime1 = System.nanoTime()
+ val lastSendNanoTime1 = clock.currentTime()
region ! Envelope(shard = 1, id = 1, message = "A")
region ! Envelope(shard = 1, id = 2, message = "B")
@@ -177,7 +177,7 @@ class MostRecentlyUsedWithIdleSpec
Thread.sleep((configuredIdleTimeout / 2).toMillis)
region ! Envelope(shard = 1, id = 3, message = "E")
Thread.sleep((configuredIdleTimeout / 2).toMillis)
- val lastSendNanoTime2 = System.nanoTime()
+ val lastSendNanoTime2 = clock.currentTime()
region ! Envelope(shard = 1, id = 3, message = "F")
expectReceived(id = 1, message = "A")
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
index e4a0137395..bd82635560 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/passivation/simulator/Simulator.scala
@@ -44,6 +44,7 @@ import
pekko.cluster.sharding.internal.SegmentedLeastRecentlyUsedEntityPassivati
import
pekko.cluster.sharding.internal.SegmentedLeastRecentlyUsedReplacementPolicy
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.Source
+import pekko.util.Clock
import pekko.util.OptionVal
import com.typesafe.config.ConfigFactory
@@ -63,8 +64,8 @@ object Simulator {
final case class Results(name: String, stats: ShardingStats)
def run(settings: SimulatorSettings): Unit = {
- val simulations = settings.runs.map(Simulation.apply)
implicit val system: ActorSystem = ActorSystem("simulator")
+ val simulations = settings.runs.map(s => Simulation(s, () =>
Clock(system)))
implicit val ec: ExecutionContext = system.dispatcher
Source(simulations)
.runFoldAsync(Seq.empty[Results]) { (results, simulation) =>
@@ -97,13 +98,13 @@ object Simulator {
strategyCreator: StrategyCreator)
object Simulation {
- def apply(runSettings: SimulatorSettings.RunSettings): Simulation =
+ def apply(runSettings: SimulatorSettings.RunSettings, clock: () => Clock):
Simulation =
Simulation(
name = runSettings.name,
numberOfShards = runSettings.shards,
numberOfRegions = runSettings.regions,
accessPattern = accessPattern(runSettings.pattern),
- strategyCreator = strategyCreator(runSettings))
+ strategyCreator = strategyCreator(runSettings, clock))
def accessPattern(patternSettings: SimulatorSettings.PatternSettings):
AccessPattern = patternSettings match {
case SimulatorSettings.PatternSettings.Synthetic(generator, events) =>
@@ -138,18 +139,18 @@ object Simulator {
new JoinedAccessPatterns(patterns.map(accessPattern))
}
- def strategyCreator(runSettings: SimulatorSettings.RunSettings):
StrategyCreator =
+ def strategyCreator(runSettings: SimulatorSettings.RunSettings, clock: ()
=> Clock): StrategyCreator =
runSettings.strategy match {
case SimulatorSettings.StrategySettings.Optimal(perRegionLimit) =>
new ClairvoyantStrategyCreator(perRegionLimit)
case
SimulatorSettings.StrategySettings.LeastRecentlyUsed(perRegionLimit, segmented)
=>
- new LeastRecentlyUsedStrategyCreator(perRegionLimit, segmented)
+ new LeastRecentlyUsedStrategyCreator(perRegionLimit, segmented,
clock)
case
SimulatorSettings.StrategySettings.MostRecentlyUsed(perRegionLimit) =>
- new MostRecentlyUsedStrategyCreator(perRegionLimit)
+ new MostRecentlyUsedStrategyCreator(perRegionLimit, clock())
case
SimulatorSettings.StrategySettings.LeastFrequentlyUsed(perRegionLimit,
dynamicAging) =>
- new LeastFrequentlyUsedStrategyCreator(perRegionLimit, dynamicAging)
+ new LeastFrequentlyUsedStrategyCreator(perRegionLimit, dynamicAging,
clock)
case settings: SimulatorSettings.StrategySettings.Composite =>
- new CompositeStrategyCreator(settings)
+ new CompositeStrategyCreator(settings, clock)
case SimulatorSettings.StrategySettings.NoStrategy =>
DisabledStrategyCreator
}
@@ -305,32 +306,35 @@ object Simulator {
override def preprocess(access: Access): Access = access
}
- final class LeastRecentlyUsedStrategyCreator(perRegionLimit: Int, segmented:
immutable.Seq[Double])
+ final class LeastRecentlyUsedStrategyCreator(
+ perRegionLimit: Int,
+ segmented: immutable.Seq[Double],
+ clock: () => Clock)
extends PassivationStrategyCreator {
override def create(shardId: ShardId): SimulatedStrategy =
new PassivationStrategy(
if (segmented.nonEmpty)
- new
SegmentedLeastRecentlyUsedEntityPassivationStrategy(perRegionLimit, segmented,
idleCheck = None)
- else new LeastRecentlyUsedEntityPassivationStrategy(perRegionLimit,
idleCheck = None))
+ new
SegmentedLeastRecentlyUsedEntityPassivationStrategy(perRegionLimit, segmented,
idleCheck = None, clock)
+ else new LeastRecentlyUsedEntityPassivationStrategy(perRegionLimit,
idleCheck = None, clock()))
}
- final class MostRecentlyUsedStrategyCreator(perRegionLimit: Int) extends
PassivationStrategyCreator {
+ final class MostRecentlyUsedStrategyCreator(perRegionLimit: Int, clock:
Clock) extends PassivationStrategyCreator {
override def create(shardId: ShardId): SimulatedStrategy =
- new PassivationStrategy(new
MostRecentlyUsedEntityPassivationStrategy(perRegionLimit, idleCheck = None))
+ new PassivationStrategy(new
MostRecentlyUsedEntityPassivationStrategy(perRegionLimit, idleCheck = None,
clock))
}
- final class LeastFrequentlyUsedStrategyCreator(perRegionLimit: Int,
dynamicAging: Boolean)
+ final class LeastFrequentlyUsedStrategyCreator(perRegionLimit: Int,
dynamicAging: Boolean, clock: () => Clock)
extends PassivationStrategyCreator {
override def create(shardId: ShardId): SimulatedStrategy =
new PassivationStrategy(
- new LeastFrequentlyUsedEntityPassivationStrategy(perRegionLimit,
dynamicAging, idleCheck = None))
+ new LeastFrequentlyUsedEntityPassivationStrategy(perRegionLimit,
dynamicAging, idleCheck = None, clock))
}
- final class CompositeStrategyCreator(settings:
SimulatorSettings.StrategySettings.Composite)
+ final class CompositeStrategyCreator(settings:
SimulatorSettings.StrategySettings.Composite, clock: () => Clock)
extends PassivationStrategyCreator {
override def create(shardId: ShardId): SimulatedStrategy = {
- val main = activeEntities(settings.main)
- val window = activeEntities(settings.window)
+ val main = activeEntities(settings.main, clock)
+ val window = activeEntities(settings.window, clock)
val initialWindowProportion = if (window eq NoActiveEntities) 0.0 else
settings.initialWindowProportion
val minimumWindowProportion = if (window eq NoActiveEntities) 0.0 else
settings.minimumWindowProportion
val maximumWindowProportion = if (window eq NoActiveEntities) 0.0 else
settings.maximumWindowProportion
@@ -351,16 +355,18 @@ object Simulator {
idleCheck = None))
}
- private def activeEntities(strategySettings:
SimulatorSettings.StrategySettings): ActiveEntities =
+ private def activeEntities(
+ strategySettings: SimulatorSettings.StrategySettings,
+ clock: () => Clock): ActiveEntities =
strategySettings match {
case
SimulatorSettings.StrategySettings.LeastRecentlyUsed(perRegionLimit, segmented)
if segmented.isEmpty =>
- new LeastRecentlyUsedReplacementPolicy(perRegionLimit)
+ new LeastRecentlyUsedReplacementPolicy(perRegionLimit, clock())
case
SimulatorSettings.StrategySettings.LeastRecentlyUsed(perRegionLimit, segmented)
=>
- new SegmentedLeastRecentlyUsedReplacementPolicy(perRegionLimit,
segmented, idleEnabled = false)
+ new SegmentedLeastRecentlyUsedReplacementPolicy(perRegionLimit,
segmented, idleEnabled = false, clock)
case
SimulatorSettings.StrategySettings.MostRecentlyUsed(perRegionLimit) =>
- new MostRecentlyUsedReplacementPolicy(perRegionLimit)
+ new MostRecentlyUsedReplacementPolicy(perRegionLimit, clock())
case
SimulatorSettings.StrategySettings.LeastFrequentlyUsed(perRegionLimit,
dynamicAging) =>
- new LeastFrequentlyUsedReplacementPolicy(perRegionLimit,
dynamicAging, idleEnabled = false)
+ new LeastFrequentlyUsedReplacementPolicy(perRegionLimit,
dynamicAging, idleEnabled = false, clock)
case _ => NoActiveEntities
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]