This is an automated email from the ASF dual-hosted git repository.
xiaozhenliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new c7fcd9f427 refactor(amber): move resource allocator into cost
estimator (#3550)
c7fcd9f427 is described below
commit c7fcd9f4278f850e3c1456866ba6f25f85a8f489
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Wed Aug 20 18:25:08 2025 -0700
refactor(amber): move resource allocator into cost estimator (#3550)
For the cost-based scheduler in Amber, ideally, the ResourceAllocator
should be part of the CostEstimator. However, for historical reasons and
because the current ResourceAllocator is not cost-based at all, these two
modules have been kept separate.
To make the modules clearer and to make future updates to the
ResourceAllocator easier, this PR merges the ResourceAllocator into the
CostEstimator.
Note that because the ResourceAllocator does not currently produce a real
cost, the CostEstimator still uses a cost model that does not rely on the
resource allocator. In the future, when the ResourceAllocator does
produce a real cost, we can further consolidate these two modules.
### Code Changes
- `ResourceAllocator` is now inside `CostEstimator` for
`CostBasedScheduleGenerator`
- `allocateResource()` is now executed as part of the search process
for `CostBasedScheduleGenerator` (previously it was done after the
search), and is merged with `estimate()` into
`CostEstimator.allocateResourcesAndEstimateCost`, which the generator
calls from `allocateResourcesAndEvaluateCost` (formerly `evaluate`)
- Moved some methods only used by `ExpansionGreedyScheduleGenerator`
from the base class into that class.
---
.../scheduling/CostBasedScheduleGenerator.scala | 59 ++++++++--------
.../architecture/scheduling/CostEstimator.scala | 30 ++++++--
.../ExpansionGreedyScheduleGenerator.scala | 71 ++++++++++++++++++-
.../scheduling/ScheduleGenerator.scala | 79 ++--------------------
.../architecture/scheduling/SchedulingUtils.scala | 57 ++++++++++++++++
.../resourcePolicies/ResourceAllocator.scala | 10 +--
.../scheduling/DefaultCostEstimatorSpec.scala | 35 ++++++++--
7 files changed, 223 insertions(+), 118 deletions(-)
diff --git
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 59ef4deaf8..2d26c3fed5 100644
---
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -29,6 +29,7 @@ import edu.uci.ics.amber.core.workflow.{
PhysicalPlan,
WorkflowContext
}
+import
edu.uci.ics.amber.engine.architecture.scheduling.SchedulingUtils.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.config.{
IntermediateInputPortConfig,
OutputPortConfig,
@@ -68,7 +69,11 @@ class CostBasedScheduleGenerator(
)
private val costEstimator =
- new DefaultCostEstimator(workflowContext = workflowContext, actorId =
actorId)
+ new DefaultCostEstimator(
+ workflowContext = workflowContext,
+ resourceAllocator = resourceAllocator,
+ actorId = actorId
+ )
def generate(): (Schedule, PhysicalPlan) = {
val startTime = System.nanoTime()
@@ -323,7 +328,6 @@ class CostBasedScheduleGenerator(
)
val regionDAG = searchResult.regionDAG
- allocateResource(regionDAG)
regionDAG
}
@@ -397,10 +401,7 @@ class CostBasedScheduleGenerator(
def updateOptimumIfApplicable(regionDAG: DirectedAcyclicGraph[Region,
RegionLink]): Unit = {
if (oEarlyStop) schedulableStates.add(currentState)
// Calculate the current state's cost and update the bestResult if
it's lower
- val cost =
- evaluate(
- RegionPlan(regionDAG.vertexSet().asScala.toSet,
regionDAG.edgeSet().asScala.toSet)
- )
+ val cost = allocateResourcesAndEvaluateCost(regionDAG)
if (cost < bestResult.cost) {
bestResult = SearchResult(currentState, regionDAG, cost)
}
@@ -453,12 +454,7 @@ class CostBasedScheduleGenerator(
physicalPlan.getBlockingAndDependeeLinks ++ neighborState
) match {
case Left(regionDAG) =>
- evaluate(
- RegionPlan(
- regionDAG.vertexSet().asScala.toSet,
- regionDAG.edgeSet().asScala.toSet
- )
- )
+ allocateResourcesAndEvaluateCost(regionDAG)
case Right(_) =>
Double.MaxValue
}
@@ -544,10 +540,7 @@ class CostBasedScheduleGenerator(
*/
def updateOptimumIfApplicable(regionDAG: DirectedAcyclicGraph[Region,
RegionLink]): Unit = {
// Calculate the current state's cost and update the bestResult if
it's lower
- val cost =
- evaluate(
- RegionPlan(regionDAG.vertexSet().asScala.toSet,
regionDAG.edgeSet().asScala.toSet)
- )
+ val cost = allocateResourcesAndEvaluateCost(regionDAG)
if (cost < bestResult.cost) {
bestResult = SearchResult(currentState, regionDAG, cost)
}
@@ -577,12 +570,7 @@ class CostBasedScheduleGenerator(
physicalPlan.getBlockingAndDependeeLinks ++ neighborState
) match {
case Left(regionDAG) =>
- evaluate(
- RegionPlan(
- regionDAG.vertexSet().asScala.toSet,
- regionDAG.edgeSet().asScala.toSet
- )
- )
+ allocateResourcesAndEvaluateCost(regionDAG)
case Right(_) =>
Double.MaxValue
}
@@ -601,16 +589,33 @@ class CostBasedScheduleGenerator(
}
/**
- * The cost function used by the search. Takes a region plan, generates one
or more (to be done in the future)
- * schedules based on the region plan, and calculates the cost of the
schedule(s) using Cost Estimator. Uses the cost
- * of the best schedule (currently only considers one schedule) as the cost
of the region plan.
+ * Takes a region DAG, generates one or more (to be done in the future)
schedules based on the region DAG, allocates
+ * resources to each region in the region DAG, and calculates the cost of
the schedule(s) using Cost Estimator. Uses
+ * the cost of the best schedule (currently only considers one schedule) as
the cost of the region DAG.
*
* @return A cost determined by the cost estimator.
*/
- private def evaluate(regionPlan: RegionPlan): Double = {
+ private def allocateResourcesAndEvaluateCost(
+ regionDAG: DirectedAcyclicGraph[Region, RegionLink]
+ ): Double = {
+ val regionPlan =
+ RegionPlan(regionDAG.vertexSet().asScala.toSet,
regionDAG.edgeSet().asScala.toSet)
val schedule = generateScheduleFromRegionPlan(regionPlan)
// In the future we may allow multiple regions in a level and split the
resources.
- schedule.map(level => level.map(region => costEstimator.estimate(region,
1)).sum).sum
+ schedule
+ .map(level =>
+ level
+ .map(region => {
+ val (resourceConfig, regionCost) =
+ costEstimator.allocateResourcesAndEstimateCost(region, 1)
+ // Update the region in the regionDAG to be the new region with
resources allocated.
+ val regionWithResourceConfig = region.copy(resourceConfig =
Some(resourceConfig))
+ replaceVertex(regionDAG, region, regionWithResourceConfig)
+ regionCost
+ })
+ .sum
+ )
+ .sum
}
}
diff --git
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
index 6e7d5d8b85..78964f57bb 100644
---
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
@@ -24,6 +24,8 @@ import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.amber.core.workflow.WorkflowContext
import
edu.uci.ics.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST
+import edu.uci.ics.amber.engine.architecture.scheduling.config.ResourceConfig
+import
edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.ResourceAllocator
import edu.uci.ics.amber.engine.common.AmberLogging
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.SqlServer.withTransaction
@@ -36,7 +38,15 @@ import scala.util.{Failure, Success, Try}
* A cost estimator should estimate a cost of running a region under the
given resource constraints as units.
*/
trait CostEstimator {
- def estimate(region: Region, resourceUnits: Int): Double
+
+ /**
+ * Uses the given resource units to allocate resources to the region, and
determine a cost based on the allocation.
+ *
+ * Note currently the ResourceAllocator is not cost-based and thus we use a
cost model that does not rely on the
+ * allocator, i.e., the cost estimation process is external to the
ResourceAllocator.
+ * @return A ResourceConfig for the region and an estimated cost of this
region.
+ */
+ def allocateResourcesAndEstimateCost(region: Region, resourceUnits: Int):
(ResourceConfig, Double)
}
object DefaultCostEstimator {
@@ -50,6 +60,7 @@ object DefaultCostEstimator {
*/
class DefaultCostEstimator(
workflowContext: WorkflowContext,
+ val resourceAllocator: ResourceAllocator,
val actorId: ActorVirtualIdentity
) extends CostEstimator
with AmberLogging {
@@ -73,8 +84,15 @@ class DefaultCostEstimator(
case Some(_) =>
}
- override def estimate(region: Region, resourceUnits: Int): Double = {
- this.operatorEstimatedTimeOption match {
+ override def allocateResourcesAndEstimateCost(
+ region: Region,
+ resourceUnits: Int
+ ): (ResourceConfig, Double) = {
+ // Currently the dummy cost from resourceAllocator is discarded.
+ val (resourceConfig, _) = resourceAllocator.allocate(region)
+ // We use a cost model that does not rely on the resource allocation.
+ // TODO: Once the ResourceAllocator actually calculates a cost, we can use
its calculated cost.
+ val cost = this.operatorEstimatedTimeOption match {
case Some(operatorEstimatedTime) =>
// Use past statistics (wall-clock runtime). We use the execution time
of the longest-running
// operator in each region to represent the region's execution time,
and use the sum of all the regions'
@@ -89,11 +107,9 @@ class DefaultCostEstimator(
// Without past statistics (e.g., first execution), we use number of
ports needing storage as the cost.
// Each port needing storage has a portConfig.
// This is independent of the schedule / resource allocator.
- region.resourceConfig match {
- case Some(config) => config.portConfigs.size
- case None => 0
- }
+ resourceConfig.portConfigs.size
}
+ (resourceConfig, cost)
}
/**
diff --git
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
index 9db1a8988d..8cbc7075bf 100644
---
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
@@ -29,7 +29,7 @@ import edu.uci.ics.amber.core.workflow.{
PhysicalPlan,
WorkflowContext
}
-import
edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
+import SchedulingUtils.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.config.{
IntermediateInputPortConfig,
OutputPortConfig,
@@ -37,11 +37,12 @@ import
edu.uci.ics.amber.engine.architecture.scheduling.config.{
}
import org.jgrapht.alg.connectivity.BiconnectivityInspector
import org.jgrapht.graph.DirectedAcyclicGraph
+import org.jgrapht.traverse.TopologicalOrderIterator
import java.net.URI
import scala.annotation.tailrec
import scala.collection.mutable
-import scala.jdk.CollectionConverters.CollectionHasAsScala
+import scala.jdk.CollectionConverters.{CollectionHasAsScala,
IteratorHasAsScala}
@deprecated(
"This greedy schedule generator will be removed in the future. Use
CostBasedScheduleGenerator instead."
@@ -361,6 +362,72 @@ class ExpansionGreedyScheduleGenerator(
newPhysicalPlan
}
+ private def allocateResource(
+ regionDAG: DirectedAcyclicGraph[Region, RegionLink]
+ ): Unit = {
+ // generate the resource configs
+ new TopologicalOrderIterator(regionDAG).asScala
+ .foreach(region => {
+ val (resourceConfig, _) = resourceAllocator.allocate(region)
+ val regionWithResourceConfig = region.copy(resourceConfig =
Some(resourceConfig))
+ replaceVertex(regionDAG, region, regionWithResourceConfig)
+ })
+ }
+
+ private def getRegions(
+ physicalOpId: PhysicalOpIdentity,
+ regionDAG: DirectedAcyclicGraph[Region, RegionLink]
+ ): Set[Region] = {
+ regionDAG
+ .vertexSet()
+ .asScala
+ .filter(region => region.getOperators.map(_.id).contains(physicalOpId))
+ .toSet
+ }
+
+ /**
+ * For a dependee input link, although it connects two regions A->B, we
include this link and its toOp in region A
+ * so that the dependee link will be completed first.
+ */
+ private def populateDependeeLinks(
+ regionDAG: DirectedAcyclicGraph[Region, RegionLink]
+ ): Unit = {
+
+ val dependeeLinks = physicalPlan
+ .topologicalIterator()
+ .flatMap { physicalOpId =>
+ val upstreamPhysicalOpIds =
physicalPlan.getUpstreamPhysicalOpIds(physicalOpId)
+ upstreamPhysicalOpIds.flatMap { upstreamPhysicalOpId =>
+ physicalPlan
+ .getLinksBetween(upstreamPhysicalOpId, physicalOpId)
+ .filter(link =>
+ physicalPlan
+ .getOperator(physicalOpId)
+ .isInputLinkDependee(link)
+ )
+ }
+ }
+ .toSet
+
+ dependeeLinks
+ .flatMap { link => getRegions(link.fromOpId, regionDAG).map(region =>
region -> link) }
+ .groupBy(_._1)
+ .view
+ .mapValues(_.map(_._2))
+ .foreach {
+ case (region, links) =>
+ val newRegion = region.copy(
+ physicalLinks = region.physicalLinks ++ links,
+ physicalOps =
+ region.getOperators ++ links.map(_.toOpId).map(id =>
physicalPlan.getOperator(id)),
+ ports = region.getPorts ++ links.map(dependeeLink =>
+ GlobalPortIdentity(dependeeLink.toOpId, dependeeLink.toPortId,
input = true)
+ )
+ )
+ replaceVertex(regionDAG, region, newRegion)
+ }
+ }
+
/**
* This function creates and connects a region DAG while conducting
materialization replacement.
* It keeps attempting to create a region DAG from the given PhysicalPlan.
When failed, a list
diff --git
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
index ad6187104d..41405ebf3a 100644
---
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
@@ -22,7 +22,6 @@ package edu.uci.ics.amber.engine.architecture.scheduling
import edu.uci.ics.amber.config.ApplicationConfig
import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity
import edu.uci.ics.amber.core.workflow._
-import
edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
DefaultResourceAllocator,
ExecutionClusterInfo
@@ -69,6 +68,12 @@ abstract class ScheduleGenerator(
var physicalPlan: PhysicalPlan
) {
private val executionClusterInfo = new ExecutionClusterInfo()
+ val resourceAllocator =
+ new DefaultResourceAllocator(
+ physicalPlan,
+ executionClusterInfo,
+ workflowContext.workflowSettings
+ )
def generate(): (Schedule, PhysicalPlan)
@@ -121,76 +126,4 @@ abstract class ScheduleGenerator(
}.toMap
Schedule(levelSets)
}
-
- def allocateResource(
- regionDAG: DirectedAcyclicGraph[Region, RegionLink]
- ): Unit = {
-
- val resourceAllocator =
- new DefaultResourceAllocator(
- physicalPlan,
- executionClusterInfo,
- workflowContext.workflowSettings
- )
- // generate the resource configs
- new TopologicalOrderIterator(regionDAG).asScala
- .foreach(region => {
- val (newRegion, _) = resourceAllocator.allocate(region)
- replaceVertex(regionDAG, region, newRegion)
- })
- }
-
- def getRegions(
- physicalOpId: PhysicalOpIdentity,
- regionDAG: DirectedAcyclicGraph[Region, RegionLink]
- ): Set[Region] = {
- regionDAG
- .vertexSet()
- .asScala
- .filter(region => region.getOperators.map(_.id).contains(physicalOpId))
- .toSet
- }
-
- /**
- * For a dependee input link, although it connects two regions A->B, we
include this link and its toOp in region A
- * so that the dependee link will be completed first.
- */
- def populateDependeeLinks(
- regionDAG: DirectedAcyclicGraph[Region, RegionLink]
- ): Unit = {
-
- val dependeeLinks = physicalPlan
- .topologicalIterator()
- .flatMap { physicalOpId =>
- val upstreamPhysicalOpIds =
physicalPlan.getUpstreamPhysicalOpIds(physicalOpId)
- upstreamPhysicalOpIds.flatMap { upstreamPhysicalOpId =>
- physicalPlan
- .getLinksBetween(upstreamPhysicalOpId, physicalOpId)
- .filter(link =>
- physicalPlan
- .getOperator(physicalOpId)
- .isInputLinkDependee(link)
- )
- }
- }
- .toSet
-
- dependeeLinks
- .flatMap { link => getRegions(link.fromOpId, regionDAG).map(region =>
region -> link) }
- .groupBy(_._1)
- .view
- .mapValues(_.map(_._2))
- .foreach {
- case (region, links) =>
- val newRegion = region.copy(
- physicalLinks = region.physicalLinks ++ links,
- physicalOps =
- region.getOperators ++ links.map(_.toOpId).map(id =>
physicalPlan.getOperator(id)),
- ports = region.getPorts ++ links.map(dependeeLink =>
- GlobalPortIdentity(dependeeLink.toOpId, dependeeLink.toPortId,
input = true)
- )
- )
- replaceVertex(regionDAG, region, newRegion)
- }
- }
}
diff --git
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/SchedulingUtils.scala
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/SchedulingUtils.scala
new file mode 100644
index 0000000000..46ba8580b1
--- /dev/null
+++
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/SchedulingUtils.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package edu.uci.ics.amber.engine.architecture.scheduling
+
+import org.jgrapht.graph.DirectedAcyclicGraph
+
+import scala.jdk.CollectionConverters.CollectionHasAsScala
+
+object SchedulingUtils {
+
+ def replaceVertex(
+ graph: DirectedAcyclicGraph[Region, RegionLink],
+ oldVertex: Region,
+ newVertex: Region
+ ): Unit = {
+ if (oldVertex.equals(newVertex)) {
+ return
+ }
+ graph.addVertex(newVertex)
+ graph
+ .outgoingEdgesOf(oldVertex)
+ .asScala
+ .toList
+ .foreach(oldEdge => {
+ val dest = graph.getEdgeTarget(oldEdge)
+ graph.removeEdge(oldEdge)
+ graph.addEdge(newVertex, dest, RegionLink(newVertex.id, dest.id))
+ })
+ graph
+ .incomingEdgesOf(oldVertex)
+ .asScala
+ .toList
+ .foreach(oldEdge => {
+ val source = graph.getEdgeSource(oldEdge)
+ graph.removeEdge(oldEdge)
+ graph.addEdge(source, newVertex, RegionLink(source.id, newVertex.id))
+ })
+ graph.removeVertex(oldVertex)
+ }
+}
diff --git
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
index 7a7f9add58..39f3f5b9ac 100644
---
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
@@ -33,7 +33,7 @@ import java.net.URI
import scala.collection.mutable
trait ResourceAllocator {
- def allocate(region: Region): (Region, Double)
+ def allocate(region: Region): (ResourceConfig, Double)
}
class DefaultResourceAllocator(
@@ -58,14 +58,14 @@ class DefaultResourceAllocator(
*
* @param region The region for which to allocate resources.
* @return A tuple containing:
- * 1) A new Region instance with new resource configuration.
- * 2) An estimated cost of the workflow with the new resource
configuration,
+ * 1) A resource configuration.
+ * 2) An estimated cost of the workflow with the resource
configuration,
* represented as a Double value (currently set to 0, but will be
* updated in the future).
*/
def allocate(
region: Region
- ): (Region, Double) = {
+ ): (ResourceConfig, Double) = {
val opToOperatorConfigMapping = region.getOperators
.map(physicalOp => physicalOp.id ->
OperatorConfig(generateWorkerConfigs(physicalOp)))
@@ -137,7 +137,7 @@ class DefaultResourceAllocator(
portConfigs
)
- (region.copy(resourceConfig = Some(resourceConfig)), 0)
+ (resourceConfig, 0)
}
/**
diff --git
a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala
b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala
index c4fb8408e6..1700c26e9f 100644
---
a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala
+++
b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala
@@ -25,6 +25,10 @@ import edu.uci.ics.amber.core.storage.{DocumentFactory,
VFSURIFactory}
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity,
WorkflowContext}
+import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
+ DefaultResourceAllocator,
+ ExecutionClusterInfo
+}
import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER
import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow
import edu.uci.ics.amber.operator.TestOperators
@@ -128,9 +132,16 @@ class DefaultCostEstimatorSpec
),
new WorkflowContext()
)
+ val resourceAllocator =
+ new DefaultResourceAllocator(
+ workflow.physicalPlan,
+ new ExecutionClusterInfo(),
+ workflow.context.workflowSettings
+ )
val costEstimator = new DefaultCostEstimator(
workflow.context,
+ resourceAllocator,
CONTROLLER
)
val ports = workflow.physicalPlan.operators.flatMap(op =>
@@ -148,7 +159,7 @@ class DefaultCostEstimatorSpec
ports = ports
)
- val costOfRegion = costEstimator.estimate(region, 1)
+ val (_, costOfRegion) =
costEstimator.allocateResourcesAndEstimateCost(region, 1)
assert(costOfRegion == 0)
}
@@ -215,8 +226,16 @@ class DefaultCostEstimatorSpec
writer.putOne(keywordOpRuntimeStatistics)
writer.close()
+ val resourceAllocator =
+ new DefaultResourceAllocator(
+ workflow.physicalPlan,
+ new ExecutionClusterInfo(),
+ workflow.context.workflowSettings
+ )
+
val costEstimator = new DefaultCostEstimator(
workflow.context,
+ resourceAllocator,
CONTROLLER
)
@@ -235,7 +254,7 @@ class DefaultCostEstimatorSpec
ports = ports
)
- val costOfRegion = costEstimator.estimate(region, 1)
+ val (_, costOfRegion) =
costEstimator.allocateResourcesAndEstimateCost(region, 1)
assert(costOfRegion != 0)
}
@@ -337,12 +356,20 @@ class DefaultCostEstimatorSpec
val keywordRegion =
searchResult.regionDAG.vertexSet().asScala.filter(region =>
region.physicalOps.size == 1).head
+ val resourceAllocator =
+ new DefaultResourceAllocator(
+ workflow.physicalPlan,
+ new ExecutionClusterInfo(),
+ workflow.context.workflowSettings
+ )
+
val costEstimator = new DefaultCostEstimator(
workflow.context,
+ resourceAllocator,
CONTROLLER
)
- val groupByRegionCost = costEstimator.estimate(groupByRegion, 1)
+ val (_, groupByRegionCost) =
costEstimator.allocateResourcesAndEstimateCost(groupByRegion, 1)
val groupByOperatorCost =
(groupByOpRuntimeStatistics.getField(6).asInstanceOf[Long] +
groupByOpRuntimeStatistics.getField(7).asInstanceOf[Long]) / 1e9
@@ -352,7 +379,7 @@ class DefaultCostEstimatorSpec
// The GroupBy operator has a longer running time.
assert(groupByRegionCost == groupByOperatorCost)
- val keywordRegionCost = costEstimator.estimate(keywordRegion, 1)
+ val (_, keywordRegionCost) =
costEstimator.allocateResourcesAndEstimateCost(keywordRegion, 1)
val keywordOperatorCost =
(keywordOpRuntimeStatistics.getField(6).asInstanceOf[Long] +
keywordOpRuntimeStatistics.getField(7).asInstanceOf[Long]) / 1e9