This is an automated email from the ASF dual-hosted git repository.

xiaozhenliu pushed a commit to branch xiaozhen-resource-allocator-in-cost-estimator
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 8bd36d680a02705341d0607220c8f6194e723da1
Author: Xiao-zhen-Liu <[email protected]>
AuthorDate: Tue Aug 19 19:57:03 2025 -0700

    refactoring to address comments.
---
 .../scheduling/CostBasedScheduleGenerator.scala      |  6 ++++--
 .../architecture/scheduling/CostEstimator.scala      | 20 ++++++++------------
 .../resourcePolicies/ResourceAllocator.scala         | 10 +++++-----
 3 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 1fa013049b..2d26c3fed5 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -606,9 +606,11 @@ class CostBasedScheduleGenerator(
       .map(level =>
         level
           .map(region => {
-            val (newRegion, regionCost) = costEstimator.allocateResourcesAndEstimateCost(region, 1)
+            val (resourceConfig, regionCost) =
+              costEstimator.allocateResourcesAndEstimateCost(region, 1)
             // Update the region in the regionDAG to be the new region with resources allocated.
-            replaceVertex(regionDAG, region, newRegion)
+            val regionWithResourceConfig = region.copy(resourceConfig = Some(resourceConfig))
+            replaceVertex(regionDAG, region, regionWithResourceConfig)
             regionCost
           })
           .sum
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
index 0e51328094..78964f57bb 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
@@ -24,13 +24,12 @@ import edu.uci.ics.amber.core.tuple.Tuple
 import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity
 import edu.uci.ics.amber.core.workflow.WorkflowContext
 import edu.uci.ics.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST
-import edu.uci.ics.amber.engine.architecture.scheduling.SchedulingUtils.replaceVertex
+import edu.uci.ics.amber.engine.architecture.scheduling.config.ResourceConfig
 import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.ResourceAllocator
 import edu.uci.ics.amber.engine.common.AmberLogging
 import edu.uci.ics.texera.dao.SqlServer
 import edu.uci.ics.texera.dao.SqlServer.withTransaction
 import edu.uci.ics.texera.dao.jooq.generated.Tables.{WORKFLOW_EXECUTIONS, WORKFLOW_VERSION}
-import org.jgrapht.graph.DirectedAcyclicGraph
 
 import java.net.URI
 import scala.util.{Failure, Success, Try}
@@ -45,9 +44,9 @@ trait CostEstimator {
     *
     * Note currently the ResourceAllocator is not cost-based and thus we use a cost model that does not rely on the
     * allocator, i.e., the cost estimation process is external to the ResourceAllocator.
-    * @return An updated region with allocated resources and an estimated cost of this region.
+    * @return A ResourceConfig for the region and an estimated cost of this region.
     */
-  def allocateResourcesAndEstimateCost(region: Region, resourceUnits: Int): (Region, Double)
+  def allocateResourcesAndEstimateCost(region: Region, resourceUnits: Int): (ResourceConfig, Double)
 }
 
 object DefaultCostEstimator {
@@ -88,9 +87,9 @@ class DefaultCostEstimator(
   override def allocateResourcesAndEstimateCost(
       region: Region,
       resourceUnits: Int
-  ): (Region, Double) = {
+  ): (ResourceConfig, Double) = {
     // Currently the dummy cost from resourceAllocator is discarded.
-    val (newRegion, _) = resourceAllocator.allocate(region)
+    val (resourceConfig, _) = resourceAllocator.allocate(region)
     // We use a cost model that does not rely on the resource allocation.
     // TODO: Once the ResourceAllocator actually calculates a cost, we can use its calculated cost.
     val cost = this.operatorEstimatedTimeOption match {
@@ -99,7 +98,7 @@ class DefaultCostEstimator(
         // operator in each region to represent the region's execution time, and use the sum of all the regions'
         // execution time as the wall-clock runtime of the workflow.
         // This assumes a schedule is a total-order of the regions.
-        val opExecutionTimes = newRegion.getOperators.map(op => {
+        val opExecutionTimes = region.getOperators.map(op => {
           operatorEstimatedTime.getOrElse(op.id.logicalOpId.id, DEFAULT_OPERATOR_COST)
         })
         val longestRunningOpExecutionTime = opExecutionTimes.max
@@ -108,12 +107,9 @@ class DefaultCostEstimator(
         // Without past statistics (e.g., first execution), we use number of ports needing storage as the cost.
         // Each port needing storage has a portConfig.
         // This is independent of the schedule / resource allocator.
-        newRegion.resourceConfig match {
-          case Some(config) => config.portConfigs.size
-          case None         => 0
-        }
+        resourceConfig.portConfigs.size
     }
-    (newRegion, cost)
+    (resourceConfig, cost)
   }
 
   /**
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
index 7a7f9add58..39f3f5b9ac 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
@@ -33,7 +33,7 @@ import java.net.URI
 import scala.collection.mutable
 
 trait ResourceAllocator {
-  def allocate(region: Region): (Region, Double)
+  def allocate(region: Region): (ResourceConfig, Double)
 }
 
 class DefaultResourceAllocator(
@@ -58,14 +58,14 @@ class DefaultResourceAllocator(
     *
     * @param region The region for which to allocate resources.
     * @return A tuple containing:
-    *         1) A new Region instance with new resource configuration.
-    *         2) An estimated cost of the workflow with the new resource configuration,
+    *         1) A resource configuration.
+    *         2) An estimated cost of the workflow with the resource configuration,
     *         represented as a Double value (currently set to 0, but will be
     *         updated in the future).
     */
   def allocate(
       region: Region
-  ): (Region, Double) = {
+  ): (ResourceConfig, Double) = {
 
     val opToOperatorConfigMapping = region.getOperators
       .map(physicalOp => physicalOp.id -> OperatorConfig(generateWorkerConfigs(physicalOp)))
@@ -137,7 +137,7 @@ class DefaultResourceAllocator(
       portConfigs
     )
 
-    (region.copy(resourceConfig = Some(resourceConfig)), 0)
+    (resourceConfig, 0)
   }
 
   /**

Reply via email to