This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-loop-region
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 83b9b7d65bcbf607ed0007895755e4b204516acc
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Sep 9 23:06:34 2025 -0700

    init
---
 .../engine/architecture/controller/WorkflowScheduler.scala     | 10 +++++++++-
 .../architecture/controller/execution/RegionExecution.scala    |  2 +-
 .../architecture/controller/execution/WorkflowExecution.scala  |  5 +----
 .../worker/promisehandlers/AssignPortHandler.scala             |  4 ++--
 .../dashboard/user/workflow/WorkflowExecutionsResource.scala   |  5 +++++
 5 files changed, 18 insertions(+), 8 deletions(-)

diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala
 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala
index 732b970f03..a705067bb9 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -19,6 +19,7 @@
 
 package edu.uci.ics.amber.engine.architecture.controller
 
+import edu.uci.ics.amber.core.executor.OpExecWithClassName
 import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity
 import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
 import edu.uci.ics.amber.engine.architecture.scheduling.{
@@ -33,6 +34,7 @@ class WorkflowScheduler(
 ) extends java.io.Serializable {
   var physicalPlan: PhysicalPlan = _
   private var schedule: Schedule = _
+  var lastRegion: Set[Region] = _
 
   /**
     * Update the schedule to be executed, based on the given physicalPlan.
@@ -50,6 +52,12 @@ class WorkflowScheduler(
     this.physicalPlan = updatedPhysicalPlan
   }
 
-  def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else 
schedule.next()
+  def getNextRegions: Set[Region] = {
+    val region: Set[Region] = if (!schedule.hasNext) Set() else schedule.next()
+    val isAgg = region.head.physicalOps.exists { op 
=>op.opExecInitInfo.asInstanceOf[OpExecWithClassName].className.contains("Aggregate")}
+    if (isAgg) lastRegion = region
+    println("ergergerg", lastRegion)
+    if (lastRegion != null) lastRegion else region
+  }
 
 }
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/RegionExecution.scala
 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/RegionExecution.scala
index e78bac2956..a00532df27 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/RegionExecution.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/RegionExecution.scala
@@ -59,7 +59,7 @@ case class RegionExecution(region: Region) {
       physicalOpId: PhysicalOpIdentity,
       inheritOperatorExecution: Option[OperatorExecution] = None
   ): OperatorExecution = {
-    assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution 
already exists.")
+    //assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution 
already exists.")
 
     operatorExecutions.getOrElseUpdate(
       physicalOpId,
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/WorkflowExecution.scala
 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index b36e8b27b4..4120090e6e 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -45,10 +45,7 @@ case class WorkflowExecution() {
     */
   def initRegionExecution(region: Region): RegionExecution = {
     // ensure the region execution hasn't been initialized already.
-    assert(
-      !regionExecutions.contains(region.id),
-      s"RegionExecution of ${region.id} already initialized."
-    )
+    //assert(!regionExecutions.contains(region.id), s"RegionExecution of 
${region.id} already initialized.")
     regionExecutions.getOrElseUpdate(region.id, RegionExecution(region))
   }
 
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
index f28735a295..13eb8361c9 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
@@ -28,7 +28,7 @@ import 
edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{
 }
 import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import 
edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
-import 
edu.uci.ics.amber.engine.architecture.worker.statistics.WorkerState.{PAUSED, 
READY, RUNNING}
+import 
edu.uci.ics.amber.engine.architecture.worker.statistics.WorkerState.{PAUSED, 
READY, RUNNING, COMPLETED}
 import 
edu.uci.ics.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage
 
 import java.net.URI
@@ -51,7 +51,7 @@ trait AssignPortHandler {
         // Same as AddInputChannelHandler
         dp.inputGateway.getChannel(channelId).setPortId(msg.portId)
         dp.inputManager.getPort(msg.portId).channels.add(channelId)
-        dp.stateManager.assertState(READY, RUNNING, PAUSED)
+        dp.stateManager.assertState(READY, RUNNING, PAUSED, COMPLETED)
       }
     } else {
       val storageURIOption: Option[URI] = msg.storageUris.head match {
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
index c3b63a2df8..619c2a6182 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
@@ -116,6 +116,11 @@ object WorkflowExecutionsResource {
           OPERATOR_PORT_EXECUTIONS.RESULT_URI
         )
         .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString)
+        .onConflict(
+          OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID,
+          OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID
+        )
+        .doNothing()
         .execute()
     } else {
       ExecutionResourcesMapping.addResourceUri(eid, uri)

Reply via email to