This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-cm-for-loop-mat-dcm
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-cm-for-loop-mat-dcm by
this push:
new f6b64b0fbe update
f6b64b0fbe is described below
commit f6b64b0fbe9f31eeb93f43a9d72487265b05c9bc
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Feb 4 19:58:40 2026 -0800
update
---
.../engine/architecture/controller/WorkflowScheduler.scala | 6 +++++-
.../architecture/controller/execution/RegionExecution.scala | 2 --
.../architecture/controller/execution/WorkflowExecution.scala | 5 -----
.../texera/amber/engine/architecture/scheduling/Schedule.scala | 10 +++++++++-
4 files changed, 14 insertions(+), 9 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index 9dcf3ad4bf..aa51f3f0cc 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -52,6 +52,10 @@ class WorkflowScheduler(
this.physicalPlan = updatedPhysicalPlan
}
- def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else
schedule.next()
+ def getNextRegions: Set[Region] = {
+ val region : Set[Region] = if (!schedule.hasNext) Set() else
schedule.loopNext()
+ println("current Region: " + region)
+ region
+ }
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala
index d5939c2e3b..e905c2b044 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala
@@ -59,8 +59,6 @@ case class RegionExecution(region: Region) {
physicalOpId: PhysicalOpIdentity,
inheritOperatorExecution: Option[OperatorExecution] = None
): OperatorExecution = {
- assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution
already exists.")
-
operatorExecutions.getOrElseUpdate(
physicalOpId,
inheritOperatorExecution
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index dea9b692a4..b8b6d68091 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -44,11 +44,6 @@ case class WorkflowExecution() {
* @throws AssertionError if the `RegionExecution` has already been
initialized.
*/
def initRegionExecution(region: Region): RegionExecution = {
- // ensure the region execution hasn't been initialized already.
- assert(
- !regionExecutions.contains(region.id),
- s"RegionExecution of ${region.id} already initialized."
- )
regionExecutions.getOrElseUpdate(region.id, RegionExecution(region))
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
index 6f34c9ed1e..47474b8478 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
@@ -21,14 +21,22 @@ package
org.apache.texera.amber.engine.architecture.scheduling
case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends
Iterator[Set[Region]] {
private var currentLevel = levelSets.keys.minOption.getOrElse(0)
-
+ private var loopStartLevel = currentLevel
def getRegions: List[Region] = levelSets.values.flatten.toList
override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel)
override def next(): Set[Region] = {
val regions = levelSets(currentLevel)
+
if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-"))))
loopStartLevel = currentLevel
currentLevel += 1
regions
}
+
+ def loopNext(): Set[Region] = {
+ val regions = levelSets(currentLevel)
+
if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))))
currentLevel = loopStartLevel
+ else currentLevel += 1
+ regions
+ }
}