This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 850fd85176 feat: introduce materialized execution mode (#4158)
850fd85176 is described below

commit 850fd8517673626d62808487fa8994be5c1df906
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jan 23 18:15:40 2026 -0800

    feat: introduce materialized execution mode (#4158)
    
    ### What changes were proposed in this PR?
    We are introducing a materialized execution mode alongside the existing
    pipelined execution mode.
    
    To support this, a new workflow setting called execution mode has been
    added, along with a corresponding backend flag. Materialized and
    pipelined execution are treated at the same level and are represented as
    an enum in the configuration.
    
    We also introduce a flag named `defaultExecutionMode`, whose default
    value is set to `PIPELINED`.
    
    | Execution Mode | Demo |
    | ------------- | ------------- |
    | Pipelined | ![Pipelined](https://github.com/user-attachments/assets/f4d23d97-7e79-4214-b7d2-ed48391acf8a) |
    | Materialized | ![Materialized](https://github.com/user-attachments/assets/14dfdf8e-1a07-445e-8f4f-0085f3783104) |
    
    ### Any related issues, documentation, discussions?
    This PR resolves issue #4157.
    
    ### How was this PR tested?
    Tested with the existing test cases and test cases added for
    MATERIALIZED ExecutionMode:
    
    - Execution Tests (`DataProcessingSpec.scala`): Added 10 test cases
    verifying that workflows with various operator combinations (CSV/JSONL
    sources, filters, aggregations, joins) execute correctly in MATERIALIZED
    mode and produce the same results as PIPELINED mode.
    - Region Tests (`CostBasedScheduleGeneratorSpec.scala`): Added 5 test
    cases verifying that CostBasedScheduleGenerator correctly creates one
    region per operator in MATERIALIZED mode, with assertions confirming:
    (1) only 1 state explored, (2) number of regions equals number of
    operators, (3) each region contains exactly 1 operator, and (4) all
    physical links become materialized region boundaries.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    No.
---
 .../scheduling/CostBasedScheduleGenerator.scala    |  36 ++-
 .../apache/texera/workflow/WorkflowCompiler.scala  |   5 +-
 .../CostBasedScheduleGeneratorSpec.scala           | 311 ++++++++++++++++++++-
 .../amber/engine/e2e/DataProcessingSpec.scala      | 145 +++++++++-
 common/config/src/main/resources/gui.conf          |   5 +-
 .../scala/org/apache/texera/config/GuiConfig.scala |   2 +
 .../{WorkflowSettings.scala => ExecutionMode.java} |  12 +-
 .../amber/core/workflow/WorkflowContext.scala      |   4 +-
 .../amber/core/workflow/WorkflowSettings.scala     |   6 +-
 .../texera/service/resource/ConfigResource.scala   |   1 +
 frontend/src/app/app.module.ts                     |   2 +
 .../app/common/service/gui-config.service.mock.ts  |   2 +
 frontend/src/app/common/type/gui-config.ts         |   2 +
 frontend/src/app/common/type/workflow.ts           |   6 +
 .../component/user-dashboard-test-fixtures.ts      |   4 +-
 .../user/user-workflow/user-workflow.component.ts  |   7 +-
 .../left-panel/settings/settings.component.html    |  17 +-
 .../left-panel/settings/settings.component.scss    |   4 -
 .../left-panel/settings/settings.component.ts      |  64 +++--
 .../model/workflow-action.service.ts               |   7 +-
 20 files changed, 587 insertions(+), 55 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 0b68aad221..401ccddc0a 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -304,10 +304,15 @@ class CostBasedScheduleGenerator(
     */
   private def createRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = {
     val searchResultFuture: Future[SearchResult] = Future {
-      if (ApplicationConfig.useTopDownSearch)
-        topDownSearch(globalSearch = ApplicationConfig.useGlobalSearch)
-      else
-        bottomUpSearch(globalSearch = ApplicationConfig.useGlobalSearch)
+      workflowContext.workflowSettings.executionMode match {
+        case ExecutionMode.MATERIALIZED =>
+          getFullyMaterializedSearchState
+        case ExecutionMode.PIPELINED =>
+          if (ApplicationConfig.useTopDownSearch)
+            topDownSearch(globalSearch = ApplicationConfig.useGlobalSearch)
+          else
+            bottomUpSearch(globalSearch = ApplicationConfig.useGlobalSearch)
+      }
     }
     val searchResult = Try(
       Await.result(searchResultFuture, 
ApplicationConfig.searchTimeoutMilliseconds.milliseconds)
@@ -477,6 +482,29 @@ class CostBasedScheduleGenerator(
     )
   }
 
+  /** Constructs a baseline fully materialized region plan (one operator per 
region) and evaluates its cost. */
+  def getFullyMaterializedSearchState: SearchResult = {
+    val startTime = System.nanoTime()
+
+    val (regionDAG, cost) =
+      tryConnectRegionDAG(physicalPlan.links) match {
+        case Left(dag) => (dag, allocateResourcesAndEvaluateCost(dag))
+        case Right(_) =>
+          (
+            new DirectedAcyclicGraph[Region, RegionLink](classOf[RegionLink]),
+            Double.PositiveInfinity
+          )
+      }
+
+    SearchResult(
+      state = Set.empty,
+      regionDAG = regionDAG,
+      cost = cost,
+      searchTimeNanoSeconds = System.nanoTime() - startTime,
+      numStatesExplored = 1
+    )
+  }
+
   /**
     * Another direction to perform the search. Depending on the configuration, 
either a global search or a greedy search
     * will be performed to find an optimal plan. The search starts from a plan 
where all edges are materialized, and
diff --git 
a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala 
b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
index 50d07a9819..b93aa3e4db 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
@@ -151,8 +151,9 @@ class WorkflowCompiler(
     val (physicalPlan, outputPortsNeedingStorage) =
       expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None)
 
-    context.workflowSettings =
-      WorkflowSettings(context.workflowSettings.dataTransferBatchSize, 
outputPortsNeedingStorage)
+    context.workflowSettings = context.workflowSettings.copy(
+      outputPortsNeedingStorage = outputPortsNeedingStorage
+    )
 
     Workflow(context, logicalPlan, physicalPlan)
   }
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
index 07c1758d9e..7d5227c36b 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
@@ -19,7 +19,12 @@
 
 package org.apache.texera.amber.engine.architecture.scheduling
 
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.{
+  ExecutionMode,
+  PortIdentity,
+  WorkflowContext,
+  WorkflowSettings
+}
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.texera.amber.engine.e2e.TestUtils.buildWorkflow
 import org.apache.texera.amber.operator.TestOperators
@@ -27,6 +32,8 @@ import org.apache.texera.workflow.LogicalLink
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.flatspec.AnyFlatSpec
 
+import scala.jdk.CollectionConverters._
+
 class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
 
   "CostBasedRegionPlanGenerator" should "finish bottom-up search using 
different pruning techniques with correct number of states explored in 
csv->->filter->join->filter2 workflow" in {
@@ -206,4 +213,306 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec 
with MockFactory {
 
   }
 
+  // MATERIALIZED ExecutionMode tests - each operator should be a separate 
region
+  "CostBasedRegionPlanGenerator" should "create separate region for each 
operator in MATERIALIZED mode for simple csv workflow" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val materializedContext = new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+    val workflow = buildWorkflow(
+      List(csvOpDesc),
+      List(),
+      materializedContext
+    )
+
+    val scheduleGenerator = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    )
+    val result = scheduleGenerator.getFullyMaterializedSearchState
+
+    // Should only explore 1 state (fully materialized)
+    assert(result.numStatesExplored == 1)
+
+    // Each physical operator should be in its own region
+    val regions = result.regionDAG.vertexSet().asScala
+    val numPhysicalOps = workflow.physicalPlan.operators.size
+    assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions, 
got ${regions.size}")
+
+    // Each region should contain exactly 1 operator
+    regions.foreach { region =>
+      assert(
+        region.getOperators.size == 1,
+        s"Expected region to have 1 operator, got ${region.getOperators.size}"
+      )
+    }
+  }
+
+  "CostBasedRegionPlanGenerator" should "create separate region for each 
operator in MATERIALIZED mode for csv->keyword workflow" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+    val materializedContext = new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+    val workflow = buildWorkflow(
+      List(csvOpDesc, keywordOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedContext
+    )
+
+    val scheduleGenerator = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    )
+    val result = scheduleGenerator.getFullyMaterializedSearchState
+
+    // Should only explore 1 state (fully materialized)
+    assert(result.numStatesExplored == 1)
+
+    // Each physical operator should be in its own region
+    val regions = result.regionDAG.vertexSet().asScala
+    val numPhysicalOps = workflow.physicalPlan.operators.size
+    assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions, 
got ${regions.size}")
+
+    // Each region should contain exactly 1 operator
+    regions.foreach { region =>
+      assert(
+        region.getOperators.size == 1,
+        s"Expected region to have 1 operator, got ${region.getOperators.size}"
+      )
+    }
+
+    // All links should be materialized (represented as region links)
+    val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+    val numPhysicalLinks = workflow.physicalPlan.links.size
+    assert(
+      numRegionLinks == numPhysicalLinks,
+      s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+    )
+  }
+
+  "CostBasedRegionPlanGenerator" should "create separate region for each 
operator in MATERIALIZED mode for csv->keyword->count workflow" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+    val countOpDesc = TestOperators.aggregateAndGroupByDesc(
+      "Region",
+      org.apache.texera.amber.operator.aggregate.AggregationFunction.COUNT,
+      List[String]()
+    )
+    val materializedContext = new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+    val workflow = buildWorkflow(
+      List(csvOpDesc, keywordOpDesc, countOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(),
+          countOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedContext
+    )
+
+    val scheduleGenerator = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    )
+    val result = scheduleGenerator.getFullyMaterializedSearchState
+
+    // Should only explore 1 state (fully materialized)
+    assert(result.numStatesExplored == 1)
+
+    // Each physical operator should be in its own region
+    val regions = result.regionDAG.vertexSet().asScala
+    val numPhysicalOps = workflow.physicalPlan.operators.size
+    assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions, 
got ${regions.size}")
+
+    // Each region should contain exactly 1 operator
+    regions.foreach { region =>
+      assert(
+        region.getOperators.size == 1,
+        s"Expected region to have 1 operator, got ${region.getOperators.size}"
+      )
+    }
+
+    // All links should be materialized (represented as region links)
+    val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+    val numPhysicalLinks = workflow.physicalPlan.links.size
+    assert(
+      numRegionLinks == numPhysicalLinks,
+      s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+    )
+  }
+
+  "CostBasedRegionPlanGenerator" should "create separate region for each 
operator in MATERIALIZED mode for join workflow" in {
+    val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
+    val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
+    val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
+    val materializedContext = new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+    val workflow = buildWorkflow(
+      List(
+        headerlessCsvOpDesc1,
+        headerlessCsvOpDesc2,
+        joinOpDesc
+      ),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc1.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          headerlessCsvOpDesc2.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity(1)
+        )
+      ),
+      materializedContext
+    )
+
+    val scheduleGenerator = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    )
+    val result = scheduleGenerator.getFullyMaterializedSearchState
+
+    // Should only explore 1 state (fully materialized)
+    assert(result.numStatesExplored == 1)
+
+    // Each physical operator should be in its own region
+    val regions = result.regionDAG.vertexSet().asScala
+    val numPhysicalOps = workflow.physicalPlan.operators.size
+    assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions, 
got ${regions.size}")
+
+    // Each region should contain exactly 1 operator
+    regions.foreach { region =>
+      assert(
+        region.getOperators.size == 1,
+        s"Expected region to have 1 operator, got ${region.getOperators.size}"
+      )
+    }
+
+    // All links should be materialized (represented as region links)
+    val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+    val numPhysicalLinks = workflow.physicalPlan.links.size
+    assert(
+      numRegionLinks == numPhysicalLinks,
+      s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+    )
+  }
+
+  "CostBasedRegionPlanGenerator" should "create separate region for each 
operator in MATERIALIZED mode for complex csv->->filter->join->filter2 
workflow" in {
+    val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+    val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
+    val keywordOpDesc2 = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+    val materializedContext = new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+    val workflow = buildWorkflow(
+      List(
+        headerlessCsvOpDesc1,
+        keywordOpDesc,
+        joinOpDesc,
+        keywordOpDesc2
+      ),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc1.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          headerlessCsvOpDesc1.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity(1)
+        ),
+        LogicalLink(
+          joinOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc2.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedContext
+    )
+
+    val scheduleGenerator = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    )
+    val result = scheduleGenerator.getFullyMaterializedSearchState
+
+    // Should only explore 1 state (fully materialized)
+    assert(result.numStatesExplored == 1)
+
+    // Each physical operator should be in its own region
+    val regions = result.regionDAG.vertexSet().asScala
+    val numPhysicalOps = workflow.physicalPlan.operators.size
+    assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions, 
got ${regions.size}")
+
+    // Each region should contain exactly 1 operator
+    regions.foreach { region =>
+      assert(
+        region.getOperators.size == 1,
+        s"Expected region to have 1 operator, got ${region.getOperators.size}"
+      )
+    }
+
+    // All links should be materialized (represented as region links)
+    val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+    val numPhysicalLinks = workflow.physicalPlan.links.size
+    assert(
+      numRegionLinks == numPhysicalLinks,
+      s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+    )
+  }
+
 }
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index b341c1a8f6..69ee9c6a5f 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -28,7 +28,12 @@ import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.{AttributeType, Tuple}
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.{
+  ExecutionMode,
+  PortIdentity,
+  WorkflowContext,
+  WorkflowSettings
+}
 import org.apache.texera.amber.engine.architecture.controller._
 import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
@@ -69,6 +74,13 @@ class DataProcessingSpec
 
   val workflowContext: WorkflowContext = new WorkflowContext()
 
+  val materializedWorkflowContext: WorkflowContext = new WorkflowContext(
+    workflowSettings = WorkflowSettings(
+      dataTransferBatchSize = 400,
+      executionMode = ExecutionMode.MATERIALIZED
+    )
+  )
+
   override protected def beforeEach(): Unit = {
     setUpWorkflowExecutionData()
   }
@@ -340,4 +352,135 @@ class DataProcessingSpec
     )
     executeWorkflow(workflow)
   }
+
+  "Engine" should "execute headerlessCsv->keyword workflow with MATERIALIZED 
mode" in {
+    val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+    val workflow = buildWorkflow(
+      List(headerlessCsvOpDesc, keywordOpDesc),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
+
+  "Engine" should "execute csv workflow with MATERIALIZED mode" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val workflow = buildWorkflow(
+      List(csvOpDesc),
+      List(),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
+
+  "Engine" should "execute csv->keyword workflow with MATERIALIZED mode" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+    val workflow = buildWorkflow(
+      List(csvOpDesc, keywordOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
+
+  "Engine" should "execute csv->keyword->count workflow with MATERIALIZED 
mode" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+    val countOpDesc =
+      TestOperators.aggregateAndGroupByDesc("Region", 
AggregationFunction.COUNT, List[String]())
+    val workflow = buildWorkflow(
+      List(csvOpDesc, keywordOpDesc, countOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(),
+          countOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
+
+  "Engine" should "execute csv->keyword->averageAndGroupBy workflow with 
MATERIALIZED mode" in {
+    val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+    val averageAndGroupByOpDesc =
+      TestOperators.aggregateAndGroupByDesc(
+        "Units Sold",
+        AggregationFunction.AVERAGE,
+        List[String]("Country")
+      )
+    val workflow = buildWorkflow(
+      List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(),
+          averageAndGroupByOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
+
+  "Engine" should "execute csv->(csv->)->join workflow with MATERIALIZED mode" 
in {
+    val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
+    val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
+    val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
+    val workflow = buildWorkflow(
+      List(
+        headerlessCsvOpDesc1,
+        headerlessCsvOpDesc2,
+        joinOpDesc
+      ),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc1.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          headerlessCsvOpDesc2.operatorIdentifier,
+          PortIdentity(),
+          joinOpDesc.operatorIdentifier,
+          PortIdentity(1)
+        )
+      ),
+      materializedWorkflowContext
+    )
+    executeWorkflow(workflow)
+  }
 }
diff --git a/common/config/src/main/resources/gui.conf 
b/common/config/src/main/resources/gui.conf
index 8039441b13..d58d94ac7b 100644
--- a/common/config/src/main/resources/gui.conf
+++ b/common/config/src/main/resources/gui.conf
@@ -63,8 +63,11 @@ gui {
     default-data-transfer-batch-size = 400
     default-data-transfer-batch-size = 
${?GUI_WORKFLOW_WORKSPACE_DEFAULT_DATA_TRANSFER_BATCH_SIZE}
 
+    # default execution mode for workflows, can be either MATERIALIZED or 
PIPELINED
+    default-execution-mode = PIPELINED
+    default-execution-mode = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_EXECUTION_MODE}
+
     # whether selecting files from datasets instead of the local file system.
-    # The user system must be enabled to make this flag work!
     selecting-files-from-datasets-enabled = true
     selecting-files-from-datasets-enabled = 
${?GUI_WORKFLOW_WORKSPACE_SELECTING_FILES_FROM_DATASETS_ENABLED}
 
diff --git 
a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala 
b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala
index 14016f4374..adc789c984 100644
--- a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala
+++ b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala
@@ -47,6 +47,8 @@ object GuiConfig {
     conf.getBoolean("gui.workflow-workspace.auto-attribute-correction-enabled")
   val guiWorkflowWorkspaceDefaultDataTransferBatchSize: Int =
     conf.getInt("gui.workflow-workspace.default-data-transfer-batch-size")
+  val guiWorkflowWorkspaceDefaultExecutionMode: String =
+    conf.getString("gui.workflow-workspace.default-execution-mode")
   val guiWorkflowWorkspaceSelectingFilesFromDatasetsEnabled: Boolean =
     
conf.getBoolean("gui.workflow-workspace.selecting-files-from-datasets-enabled")
   val guiWorkflowWorkspaceWorkflowExecutionsTrackingEnabled: Boolean =
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
similarity index 80%
copy from 
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
copy to 
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
index 88ebcb068f..c02690e4e5 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
@@ -17,9 +17,11 @@
  * under the License.
  */
 
-package org.apache.texera.amber.core.workflow
+package org.apache.texera.amber.core.workflow;
 
-case class WorkflowSettings(
-    dataTransferBatchSize: Int,
-    outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty
-)
+public enum ExecutionMode {
+    PIPELINED,
+    MATERIALIZED;
+
+    public static ExecutionMode fromString(String value) { return 
valueOf(value); }
+}
\ No newline at end of file
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
index ee7659d0ca..bf2911a4e5 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
@@ -29,9 +29,7 @@ import org.apache.texera.amber.core.workflow.WorkflowContext.{
 object WorkflowContext {
   val DEFAULT_EXECUTION_ID: ExecutionIdentity = ExecutionIdentity(1L)
   val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L)
-  val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings(
-    dataTransferBatchSize = 400 // TODO: make this configurable
-  )
+  val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings()
 }
 class WorkflowContext(
     var workflowId: WorkflowIdentity = DEFAULT_WORKFLOW_ID,
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
index 88ebcb068f..c4a86d3538 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
@@ -19,7 +19,11 @@
 
 package org.apache.texera.amber.core.workflow
 
+import org.apache.texera.config.GuiConfig
+
 case class WorkflowSettings(
-    dataTransferBatchSize: Int,
+    dataTransferBatchSize: Int = 400,
+    executionMode: ExecutionMode =
+      
ExecutionMode.valueOf(GuiConfig.guiWorkflowWorkspaceDefaultExecutionMode),
     outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty
 )
diff --git 
a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
 
b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
index 30c657746f..b7517d81eb 100644
--- 
a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
+++ 
b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
@@ -46,6 +46,7 @@ class ConfigResource {
       "timetravelEnabled" -> GuiConfig.guiWorkflowWorkspaceTimetravelEnabled,
       "productionSharedEditingServer" -> 
GuiConfig.guiWorkflowWorkspaceProductionSharedEditingServer,
       "defaultDataTransferBatchSize" -> 
GuiConfig.guiWorkflowWorkspaceDefaultDataTransferBatchSize,
+      "defaultExecutionMode" -> 
GuiConfig.guiWorkflowWorkspaceDefaultExecutionMode,
       "workflowEmailNotificationEnabled" -> 
GuiConfig.guiWorkflowWorkspaceWorkflowEmailNotificationEnabled,
       "sharingComputingUnitEnabled" -> 
ComputingUnitConfig.sharingComputingUnitEnabled,
       "operatorConsoleMessageBufferSize" -> 
GuiConfig.guiWorkflowWorkspaceOperatorConsoleMessageBufferSize,
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index 73feecfdba..9ddf4bbcc2 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -181,6 +181,7 @@ import { AdminSettingsComponent } from 
"./dashboard/component/admin/settings/adm
 import { FormlyRepeatDndComponent } from 
"./common/formly/repeat-dnd/repeat-dnd.component";
 import { NzInputNumberModule } from "ng-zorro-antd/input-number";
 import { NzCheckboxModule } from "ng-zorro-antd/checkbox";
+import { NzRadioModule } from "ng-zorro-antd/radio";
 
 registerLocaleData(en);
 
@@ -344,6 +345,7 @@ registerLocaleData(en);
     NzProgressModule,
     NzInputNumberModule,
     NzCheckboxModule,
+    NzRadioModule,
   ],
   providers: [
     provideNzI18n(en_US),
diff --git a/frontend/src/app/common/service/gui-config.service.mock.ts 
b/frontend/src/app/common/service/gui-config.service.mock.ts
index daa8adfd22..179259c5a9 100644
--- a/frontend/src/app/common/service/gui-config.service.mock.ts
+++ b/frontend/src/app/common/service/gui-config.service.mock.ts
@@ -20,6 +20,7 @@
 import { Injectable } from "@angular/core";
 import { Observable, of } from "rxjs";
 import { GuiConfig } from "../type/gui-config";
+import { ExecutionMode } from "../type/workflow";
 
 /**
  * Mock GuiConfigService for testing purposes.
@@ -42,6 +43,7 @@ export class MockGuiConfigService {
     productionSharedEditingServer: false,
     pythonLanguageServerPort: "3000",
     defaultDataTransferBatchSize: 100,
+    defaultExecutionMode: ExecutionMode.PIPELINED,
     workflowEmailNotificationEnabled: false,
     sharingComputingUnitEnabled: false,
     operatorConsoleMessageBufferSize: 1000,
diff --git a/frontend/src/app/common/type/gui-config.ts 
b/frontend/src/app/common/type/gui-config.ts
index b47dfa0ab1..d8786c1dc0 100644
--- a/frontend/src/app/common/type/gui-config.ts
+++ b/frontend/src/app/common/type/gui-config.ts
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+import { ExecutionMode } from "./workflow";
 
 // Please refer to core/config/src/main/resources/gui.conf for the definition 
of each config item
 export interface GuiConfig {
@@ -33,6 +34,7 @@ export interface GuiConfig {
   productionSharedEditingServer: boolean;
   pythonLanguageServerPort: string;
   defaultDataTransferBatchSize: number;
+  defaultExecutionMode: ExecutionMode;
   workflowEmailNotificationEnabled: boolean;
   sharingComputingUnitEnabled: boolean;
   operatorConsoleMessageBufferSize: number;
diff --git a/frontend/src/app/common/type/workflow.ts 
b/frontend/src/app/common/type/workflow.ts
index 6792df9d7d..8e1c1c7e85 100644
--- a/frontend/src/app/common/type/workflow.ts
+++ b/frontend/src/app/common/type/workflow.ts
@@ -20,8 +20,14 @@
 import { WorkflowMetadata } from 
"../../dashboard/type/workflow-metadata.interface";
 import { CommentBox, OperatorLink, OperatorPredicate, Point } from 
"../../workspace/types/workflow-common.interface";
 
+export enum ExecutionMode {
+  PIPELINED = "PIPELINED",
+  MATERIALIZED = "MATERIALIZED",
+}
+
 export interface WorkflowSettings {
   dataTransferBatchSize: number;
+  executionMode: ExecutionMode;
 }
 
 /**
diff --git 
a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts 
b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts
index e87b5acd22..09a158e336 100644
--- a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts
+++ b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts
@@ -19,7 +19,7 @@
 
 //All times in test Workflows are in PST because our local machine's timezone 
is PST
 
-import { Workflow, WorkflowContent } from "../../common/type/workflow";
+import { ExecutionMode, Workflow, WorkflowContent } from 
"../../common/type/workflow";
 import { DashboardEntry } from "../type/dashboard-entry";
 import { DashboardProject } from "../type/dashboard-project.interface";
 
@@ -39,7 +39,7 @@ export const testWorkflowContent = (operatorTypes: string[]): 
WorkflowContent =>
   commentBoxes: [],
   links: [],
   operatorPositions: {},
-  settings: { dataTransferBatchSize: 400 },
+  settings: { dataTransferBatchSize: 400, executionMode: 
ExecutionMode.PIPELINED },
 });
 
 export const testWorkflow1: Workflow = {
diff --git 
a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
 
b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
index 4903e70bcb..75f61a223c 100644
--- 
a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
+++ 
b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
@@ -31,7 +31,7 @@ import { DashboardEntry, UserInfo } from 
"../../../type/dashboard-entry";
 import { UserService } from "../../../../common/service/user/user.service";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
 import { NotificationService } from 
"../../../../common/service/notification/notification.service";
-import { WorkflowContent } from "../../../../common/type/workflow";
+import { ExecutionMode, WorkflowContent } from 
"../../../../common/type/workflow";
 import { NzUploadFile } from "ng-zorro-antd/upload";
 import * as JSZip from "jszip";
 import { FiltersComponent } from "../filters/filters.component";
@@ -230,7 +230,10 @@ export class UserWorkflowComponent implements 
AfterViewInit {
       commentBoxes: [],
       links: [],
       operatorPositions: {},
-      settings: { dataTransferBatchSize: 
this.config.env.defaultDataTransferBatchSize },
+      settings: {
+        dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize,
+        executionMode: this.config.env.defaultExecutionMode,
+      },
     };
     let localPid = this.pid;
     this.workflowPersistService
diff --git 
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
 
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
index af16bb0dee..ceeea480af 100644
--- 
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
+++ 
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
@@ -16,12 +16,25 @@
  specific language governing permissions and limitations
  under the License.
 -->
-
 <div class="settings-container">
-  <h4>Workflow Settings</h4>
   <form
     [formGroup]="settingsForm"
     class="form-inline">
+    <b>Execution Mode:</b>
+    <nz-radio-group formControlName="executionMode">
+      <label
+        nz-radio
+        [nzValue]="ExecutionMode.PIPELINED"
+        >Pipelined</label
+      >
+      <br />
+      <label
+        nz-radio
+        [nzValue]="ExecutionMode.MATERIALIZED"
+        >Materialized</label
+      >
+    </nz-radio-group>
+    <br />
     <div class="form-group">
       <label for="dataTransferBatchSize">Data Transfer Batch Size:</label>
       <input
diff --git 
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss
 
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss
index 3dfd30e046..16a468e92d 100644
--- 
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss
+++ 
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss
@@ -21,10 +21,6 @@
   padding: 10px;
 }
 
-h4 {
-  margin-bottom: 15px;
-}
-
 .form-inline {
   display: flex;
   flex-direction: column;
diff --git 
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
 
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
index ead28857ff..d7ab644a89 100644
--- 
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
+++ 
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
@@ -24,7 +24,7 @@ import { WorkflowActionService } from 
"../../../service/workflow-graph/model/wor
 import { WorkflowPersistService } from 
"src/app/common/service/workflow-persist/workflow-persist.service";
 import { UserService } from "../../../../common/service/user/user.service";
 import { NotificationService } from 
"src/app/common/service/notification/notification.service";
-import { GuiConfigService } from 
"../../../../common/service/gui-config.service";
+import { ExecutionMode } from "../../../../common/type/workflow";
 
 @UntilDestroy()
 @Component({
@@ -33,43 +33,50 @@ import { GuiConfigService } from 
"../../../../common/service/gui-config.service"
   styleUrls: ["./settings.component.scss"],
 })
 export class SettingsComponent implements OnInit {
-  settingsForm!: FormGroup;
-  currentDataTransferBatchSize!: number;
-  isSaving: boolean = false;
+  settingsForm: FormGroup;
 
   constructor(
     private fb: FormBuilder,
     private workflowActionService: WorkflowActionService,
     private workflowPersistService: WorkflowPersistService,
     private userService: UserService,
-    private notificationService: NotificationService,
-    private config: GuiConfigService
-  ) {}
-
-  ngOnInit(): void {
-    this.currentDataTransferBatchSize =
-      
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize 
||
-      this.config.env.defaultDataTransferBatchSize;
-
+    private notificationService: NotificationService
+  ) {
     this.settingsForm = this.fb.group({
-      dataTransferBatchSize: [this.currentDataTransferBatchSize, 
[Validators.required, Validators.min(1)]],
+      dataTransferBatchSize: [
+        
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize,
+        [Validators.required, Validators.min(1)],
+      ],
+      executionMode: 
[this.workflowActionService.getWorkflowContent().settings.executionMode],
     });
+  }
 
-    this.settingsForm.valueChanges.pipe(untilDestroyed(this)).subscribe(value 
=> {
-      if (this.settingsForm.valid) {
-        this.confirmUpdateDataTransferBatchSize(value.dataTransferBatchSize);
-      }
-    });
+  ngOnInit(): void {
+    this.settingsForm
+      .get("dataTransferBatchSize")!
+      .valueChanges.pipe(untilDestroyed(this))
+      .subscribe((batchSize: number) => {
+        if (this.settingsForm.get("dataTransferBatchSize")!.valid) {
+          this.confirmUpdateDataTransferBatchSize(batchSize);
+        }
+      });
+
+    this.settingsForm
+      .get("executionMode")!
+      .valueChanges.pipe(untilDestroyed(this))
+      .subscribe((mode: ExecutionMode) => {
+        this.updateExecutionMode(mode);
+      });
 
     this.workflowActionService
       .workflowChanged()
       .pipe(untilDestroyed(this))
       .subscribe(() => {
-        this.currentDataTransferBatchSize =
-          
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize 
||
-          this.config.env.defaultDataTransferBatchSize;
         this.settingsForm.patchValue(
-          { dataTransferBatchSize: this.currentDataTransferBatchSize },
+          {
+            dataTransferBatchSize: 
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize,
+            executionMode: 
this.workflowActionService.getWorkflowContent().settings.executionMode,
+          },
           { emitEvent: false }
         );
       });
@@ -85,13 +92,18 @@ export class SettingsComponent implements OnInit {
   }
 
   public persistWorkflow(): void {
-    this.isSaving = true;
     this.workflowPersistService
       .persistWorkflow(this.workflowActionService.getWorkflow())
       .pipe(untilDestroyed(this))
       .subscribe({
         error: (e: unknown) => this.notificationService.error((e as 
Error).message),
-      })
-      .add(() => (this.isSaving = false));
+      });
   }
+
+  public updateExecutionMode(mode: ExecutionMode) {
+    this.workflowActionService.updateExecutionMode(mode);
+    this.persistWorkflow();
+  }
+
+  protected readonly ExecutionMode = ExecutionMode;
 }
diff --git 
a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
 
b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
index ddee22465a..32a5e3a2db 100644
--- 
a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
+++ 
b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
@@ -21,7 +21,7 @@ import { Injectable } from "@angular/core";
 
 import * as joint from "jointjs";
 import { BehaviorSubject, merge, Observable, Subject } from "rxjs";
-import { Workflow, WorkflowContent, WorkflowSettings } from 
"../../../../common/type/workflow";
+import { ExecutionMode, Workflow, WorkflowContent, WorkflowSettings } from 
"../../../../common/type/workflow";
 import { WorkflowMetadata } from 
"../../../../dashboard/type/workflow-metadata.interface";
 import {
   Comment,
@@ -127,6 +127,7 @@ export class WorkflowActionService {
   private getDefaultSettings(): WorkflowSettings {
     return {
       dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize,
+      executionMode: this.config.env.defaultExecutionMode,
     };
   }
 
@@ -807,6 +808,10 @@ export class WorkflowActionService {
     }
   }
 
+  public updateExecutionMode(mode: ExecutionMode): void {
+    this.setWorkflowSettings({ ...this.workflowSettings, executionMode: mode 
});
+  }
+
   public clearWorkflow(): void {
     this.destroySharedModel();
     this.setWorkflowMetadata(undefined);


Reply via email to