This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 850fd85176 feat: introduce materialized execution mode (#4158)
850fd85176 is described below
commit 850fd8517673626d62808487fa8994be5c1df906
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jan 23 18:15:40 2026 -0800
feat: introduce materialized execution mode (#4158)
### What changes were proposed in this PR?
We are introducing a materialized execution mode alongside the existing
pipelined execution mode.
To support this, a new workflow setting called execution mode has been
added, along with a corresponding backend flag. Materialized and
pipelined execution are treated at the same level and are represented as
an enum in the configuration.
We also introduce a configuration flag named `defaultExecutionMode`, whose
default value is `PIPELINED`.
| Execution Mode | Demo |
| ------------- | ------------- |
| Pipelined | (see PR #4158) |
| Materialized | (see PR #4158) |
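For reference, a condensed sketch of how the new setting is modeled on the
backend; it mirrors the `WorkflowSettings.scala` and `ExecutionMode.java`
changes in the diff below and is a simplified excerpt, not the full source:

```scala
// Sketch: executionMode lives in WorkflowSettings and defaults to the value of
// gui.workflow-workspace.default-execution-mode (PIPELINED). ExecutionMode is a
// Java enum with PIPELINED and MATERIALIZED values (see ExecutionMode.java below).
import org.apache.texera.config.GuiConfig

case class WorkflowSettings(
    dataTransferBatchSize: Int = 400,
    executionMode: ExecutionMode =
      ExecutionMode.valueOf(GuiConfig.guiWorkflowWorkspaceDefaultExecutionMode),
    outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty
)
```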
### Any related issues, documentation, discussions?
This PR resolves issue #4157.
### How was this PR tested?
Tested with the existing test cases as well as new test cases added for the
MATERIALIZED ExecutionMode:
- Execution Tests (`DataProcessingSpec.scala`): Added 10 test cases
verifying that workflows with various operator combinations (CSV/JSONL
sources, filters, aggregations, joins) execute correctly in MATERIALIZED
mode and produce the same results as PIPELINED mode.
- Region Tests (`CostBasedScheduleGeneratorSpec.scala`): Added 5 test
cases verifying that CostBasedScheduleGenerator correctly creates one
region per operator in MATERIALIZED mode, with assertions confirming that
(1) only one state is explored, (2) the number of regions equals the
number of operators, (3) each region contains exactly one operator, and
(4) all physical links become materialized region boundaries (see the
sketch after this list).
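A condensed sketch of the region-test pattern described above (the full
versions are in `CostBasedScheduleGeneratorSpec.scala` in the diff below; the
fixtures `buildWorkflow`, `TestOperators`, and `CONTROLLER` come from the
existing test utilities):

```scala
import scala.jdk.CollectionConverters._

// Build a one-operator workflow whose settings request MATERIALIZED execution.
val csvOpDesc = TestOperators.smallCsvScanOpDesc()
val materializedContext = new WorkflowContext(
  workflowSettings = WorkflowSettings(
    dataTransferBatchSize = 400,
    executionMode = ExecutionMode.MATERIALIZED
  )
)
val workflow = buildWorkflow(List(csvOpDesc), List(), materializedContext)

// The fully materialized search evaluates exactly one baseline plan.
val result = new CostBasedScheduleGenerator(workflow.context, workflow.physicalPlan, CONTROLLER)
  .getFullyMaterializedSearchState

assert(result.numStatesExplored == 1)                   // (1) one state explored
assert(result.regionDAG.vertexSet().size() ==
  workflow.physicalPlan.operators.size)                 // (2) one region per operator
assert(result.regionDAG.vertexSet().asScala
  .forall(_.getOperators.size == 1))                    // (3) single-operator regions
assert(result.regionDAG.edgeSet().size() ==
  workflow.physicalPlan.links.size)                     // (4) links become region boundaries
```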
### Was this PR authored or co-authored using generative AI tooling?
No.
---
.../scheduling/CostBasedScheduleGenerator.scala | 36 ++-
.../apache/texera/workflow/WorkflowCompiler.scala | 5 +-
.../CostBasedScheduleGeneratorSpec.scala | 311 ++++++++++++++++++++-
.../amber/engine/e2e/DataProcessingSpec.scala | 145 +++++++++-
common/config/src/main/resources/gui.conf | 5 +-
.../scala/org/apache/texera/config/GuiConfig.scala | 2 +
.../{WorkflowSettings.scala => ExecutionMode.java} | 12 +-
.../amber/core/workflow/WorkflowContext.scala | 4 +-
.../amber/core/workflow/WorkflowSettings.scala | 6 +-
.../texera/service/resource/ConfigResource.scala | 1 +
frontend/src/app/app.module.ts | 2 +
.../app/common/service/gui-config.service.mock.ts | 2 +
frontend/src/app/common/type/gui-config.ts | 2 +
frontend/src/app/common/type/workflow.ts | 6 +
.../component/user-dashboard-test-fixtures.ts | 4 +-
.../user/user-workflow/user-workflow.component.ts | 7 +-
.../left-panel/settings/settings.component.html | 17 +-
.../left-panel/settings/settings.component.scss | 4 -
.../left-panel/settings/settings.component.ts | 64 +++--
.../model/workflow-action.service.ts | 7 +-
20 files changed, 587 insertions(+), 55 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 0b68aad221..401ccddc0a 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -304,10 +304,15 @@ class CostBasedScheduleGenerator(
*/
private def createRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = {
val searchResultFuture: Future[SearchResult] = Future {
- if (ApplicationConfig.useTopDownSearch)
- topDownSearch(globalSearch = ApplicationConfig.useGlobalSearch)
- else
- bottomUpSearch(globalSearch = ApplicationConfig.useGlobalSearch)
+ workflowContext.workflowSettings.executionMode match {
+ case ExecutionMode.MATERIALIZED =>
+ getFullyMaterializedSearchState
+ case ExecutionMode.PIPELINED =>
+ if (ApplicationConfig.useTopDownSearch)
+ topDownSearch(globalSearch = ApplicationConfig.useGlobalSearch)
+ else
+ bottomUpSearch(globalSearch = ApplicationConfig.useGlobalSearch)
+ }
}
val searchResult = Try(
Await.result(searchResultFuture,
ApplicationConfig.searchTimeoutMilliseconds.milliseconds)
@@ -477,6 +482,29 @@ class CostBasedScheduleGenerator(
)
}
+ /** Constructs a baseline fully materialized region plan (one operator per
region) and evaluates its cost. */
+ def getFullyMaterializedSearchState: SearchResult = {
+ val startTime = System.nanoTime()
+
+ val (regionDAG, cost) =
+ tryConnectRegionDAG(physicalPlan.links) match {
+ case Left(dag) => (dag, allocateResourcesAndEvaluateCost(dag))
+ case Right(_) =>
+ (
+ new DirectedAcyclicGraph[Region, RegionLink](classOf[RegionLink]),
+ Double.PositiveInfinity
+ )
+ }
+
+ SearchResult(
+ state = Set.empty,
+ regionDAG = regionDAG,
+ cost = cost,
+ searchTimeNanoSeconds = System.nanoTime() - startTime,
+ numStatesExplored = 1
+ )
+ }
+
/**
* Another direction to perform the search. Depending on the configuration,
either a global search or a greedy search
* will be performed to find an optimal plan. The search starts from a plan
where all edges are materialized, and
diff --git
a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
index 50d07a9819..b93aa3e4db 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
@@ -151,8 +151,9 @@ class WorkflowCompiler(
val (physicalPlan, outputPortsNeedingStorage) =
expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None)
- context.workflowSettings =
- WorkflowSettings(context.workflowSettings.dataTransferBatchSize,
outputPortsNeedingStorage)
+ context.workflowSettings = context.workflowSettings.copy(
+ outputPortsNeedingStorage = outputPortsNeedingStorage
+ )
Workflow(context, logicalPlan, physicalPlan)
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
index 07c1758d9e..7d5227c36b 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
@@ -19,7 +19,12 @@
package org.apache.texera.amber.engine.architecture.scheduling
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.{
+ ExecutionMode,
+ PortIdentity,
+ WorkflowContext,
+ WorkflowSettings
+}
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
import org.apache.texera.amber.engine.e2e.TestUtils.buildWorkflow
import org.apache.texera.amber.operator.TestOperators
@@ -27,6 +32,8 @@ import org.apache.texera.workflow.LogicalLink
import org.scalamock.scalatest.MockFactory
import org.scalatest.flatspec.AnyFlatSpec
+import scala.jdk.CollectionConverters._
+
class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
"CostBasedRegionPlanGenerator" should "finish bottom-up search using
different pruning techniques with correct number of states explored in
csv->->filter->join->filter2 workflow" in {
@@ -206,4 +213,306 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec
with MockFactory {
}
+ // MATERIALIZED ExecutionMode tests - each operator should be a separate
region
+ "CostBasedRegionPlanGenerator" should "create separate region for each
operator in MATERIALIZED mode for simple csv workflow" in {
+ val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+ val materializedContext = new WorkflowContext(
+ workflowSettings = WorkflowSettings(
+ dataTransferBatchSize = 400,
+ executionMode = ExecutionMode.MATERIALIZED
+ )
+ )
+ val workflow = buildWorkflow(
+ List(csvOpDesc),
+ List(),
+ materializedContext
+ )
+
+ val scheduleGenerator = new CostBasedScheduleGenerator(
+ workflow.context,
+ workflow.physicalPlan,
+ CONTROLLER
+ )
+ val result = scheduleGenerator.getFullyMaterializedSearchState
+
+ // Should only explore 1 state (fully materialized)
+ assert(result.numStatesExplored == 1)
+
+ // Each physical operator should be in its own region
+ val regions = result.regionDAG.vertexSet().asScala
+ val numPhysicalOps = workflow.physicalPlan.operators.size
+ assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions,
got ${regions.size}")
+
+ // Each region should contain exactly 1 operator
+ regions.foreach { region =>
+ assert(
+ region.getOperators.size == 1,
+ s"Expected region to have 1 operator, got ${region.getOperators.size}"
+ )
+ }
+ }
+
+ "CostBasedRegionPlanGenerator" should "create separate region for each
operator in MATERIALIZED mode for csv->keyword workflow" in {
+ val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+ val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+ val materializedContext = new WorkflowContext(
+ workflowSettings = WorkflowSettings(
+ dataTransferBatchSize = 400,
+ executionMode = ExecutionMode.MATERIALIZED
+ )
+ )
+ val workflow = buildWorkflow(
+ List(csvOpDesc, keywordOpDesc),
+ List(
+ LogicalLink(
+ csvOpDesc.operatorIdentifier,
+ PortIdentity(),
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity()
+ )
+ ),
+ materializedContext
+ )
+
+ val scheduleGenerator = new CostBasedScheduleGenerator(
+ workflow.context,
+ workflow.physicalPlan,
+ CONTROLLER
+ )
+ val result = scheduleGenerator.getFullyMaterializedSearchState
+
+ // Should only explore 1 state (fully materialized)
+ assert(result.numStatesExplored == 1)
+
+ // Each physical operator should be in its own region
+ val regions = result.regionDAG.vertexSet().asScala
+ val numPhysicalOps = workflow.physicalPlan.operators.size
+ assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions,
got ${regions.size}")
+
+ // Each region should contain exactly 1 operator
+ regions.foreach { region =>
+ assert(
+ region.getOperators.size == 1,
+ s"Expected region to have 1 operator, got ${region.getOperators.size}"
+ )
+ }
+
+ // All links should be materialized (represented as region links)
+ val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+ val numPhysicalLinks = workflow.physicalPlan.links.size
+ assert(
+ numRegionLinks == numPhysicalLinks,
+ s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+ )
+ }
+
+ "CostBasedRegionPlanGenerator" should "create separate region for each
operator in MATERIALIZED mode for csv->keyword->count workflow" in {
+ val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+ val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+ val countOpDesc = TestOperators.aggregateAndGroupByDesc(
+ "Region",
+ org.apache.texera.amber.operator.aggregate.AggregationFunction.COUNT,
+ List[String]()
+ )
+ val materializedContext = new WorkflowContext(
+ workflowSettings = WorkflowSettings(
+ dataTransferBatchSize = 400,
+ executionMode = ExecutionMode.MATERIALIZED
+ )
+ )
+ val workflow = buildWorkflow(
+ List(csvOpDesc, keywordOpDesc, countOpDesc),
+ List(
+ LogicalLink(
+ csvOpDesc.operatorIdentifier,
+ PortIdentity(),
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity()
+ ),
+ LogicalLink(
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity(),
+ countOpDesc.operatorIdentifier,
+ PortIdentity()
+ )
+ ),
+ materializedContext
+ )
+
+ val scheduleGenerator = new CostBasedScheduleGenerator(
+ workflow.context,
+ workflow.physicalPlan,
+ CONTROLLER
+ )
+ val result = scheduleGenerator.getFullyMaterializedSearchState
+
+ // Should only explore 1 state (fully materialized)
+ assert(result.numStatesExplored == 1)
+
+ // Each physical operator should be in its own region
+ val regions = result.regionDAG.vertexSet().asScala
+ val numPhysicalOps = workflow.physicalPlan.operators.size
+ assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions,
got ${regions.size}")
+
+ // Each region should contain exactly 1 operator
+ regions.foreach { region =>
+ assert(
+ region.getOperators.size == 1,
+ s"Expected region to have 1 operator, got ${region.getOperators.size}"
+ )
+ }
+
+ // All links should be materialized (represented as region links)
+ val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+ val numPhysicalLinks = workflow.physicalPlan.links.size
+ assert(
+ numRegionLinks == numPhysicalLinks,
+ s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+ )
+ }
+
+ "CostBasedRegionPlanGenerator" should "create separate region for each
operator in MATERIALIZED mode for join workflow" in {
+ val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
+ val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
+ val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
+ val materializedContext = new WorkflowContext(
+ workflowSettings = WorkflowSettings(
+ dataTransferBatchSize = 400,
+ executionMode = ExecutionMode.MATERIALIZED
+ )
+ )
+ val workflow = buildWorkflow(
+ List(
+ headerlessCsvOpDesc1,
+ headerlessCsvOpDesc2,
+ joinOpDesc
+ ),
+ List(
+ LogicalLink(
+ headerlessCsvOpDesc1.operatorIdentifier,
+ PortIdentity(),
+ joinOpDesc.operatorIdentifier,
+ PortIdentity()
+ ),
+ LogicalLink(
+ headerlessCsvOpDesc2.operatorIdentifier,
+ PortIdentity(),
+ joinOpDesc.operatorIdentifier,
+ PortIdentity(1)
+ )
+ ),
+ materializedContext
+ )
+
+ val scheduleGenerator = new CostBasedScheduleGenerator(
+ workflow.context,
+ workflow.physicalPlan,
+ CONTROLLER
+ )
+ val result = scheduleGenerator.getFullyMaterializedSearchState
+
+ // Should only explore 1 state (fully materialized)
+ assert(result.numStatesExplored == 1)
+
+ // Each physical operator should be in its own region
+ val regions = result.regionDAG.vertexSet().asScala
+ val numPhysicalOps = workflow.physicalPlan.operators.size
+ assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions,
got ${regions.size}")
+
+ // Each region should contain exactly 1 operator
+ regions.foreach { region =>
+ assert(
+ region.getOperators.size == 1,
+ s"Expected region to have 1 operator, got ${region.getOperators.size}"
+ )
+ }
+
+ // All links should be materialized (represented as region links)
+ val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+ val numPhysicalLinks = workflow.physicalPlan.links.size
+ assert(
+ numRegionLinks == numPhysicalLinks,
+ s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+ )
+ }
+
+ "CostBasedRegionPlanGenerator" should "create separate region for each
operator in MATERIALIZED mode for complex csv->->filter->join->filter2
workflow" in {
+ val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
+ val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+ val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
+ val keywordOpDesc2 = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+ val materializedContext = new WorkflowContext(
+ workflowSettings = WorkflowSettings(
+ dataTransferBatchSize = 400,
+ executionMode = ExecutionMode.MATERIALIZED
+ )
+ )
+ val workflow = buildWorkflow(
+ List(
+ headerlessCsvOpDesc1,
+ keywordOpDesc,
+ joinOpDesc,
+ keywordOpDesc2
+ ),
+ List(
+ LogicalLink(
+ headerlessCsvOpDesc1.operatorIdentifier,
+ PortIdentity(),
+ joinOpDesc.operatorIdentifier,
+ PortIdentity()
+ ),
+ LogicalLink(
+ headerlessCsvOpDesc1.operatorIdentifier,
+ PortIdentity(),
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity()
+ ),
+ LogicalLink(
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity(),
+ joinOpDesc.operatorIdentifier,
+ PortIdentity(1)
+ ),
+ LogicalLink(
+ joinOpDesc.operatorIdentifier,
+ PortIdentity(),
+ keywordOpDesc2.operatorIdentifier,
+ PortIdentity()
+ )
+ ),
+ materializedContext
+ )
+
+ val scheduleGenerator = new CostBasedScheduleGenerator(
+ workflow.context,
+ workflow.physicalPlan,
+ CONTROLLER
+ )
+ val result = scheduleGenerator.getFullyMaterializedSearchState
+
+ // Should only explore 1 state (fully materialized)
+ assert(result.numStatesExplored == 1)
+
+ // Each physical operator should be in its own region
+ val regions = result.regionDAG.vertexSet().asScala
+ val numPhysicalOps = workflow.physicalPlan.operators.size
+ assert(regions.size == numPhysicalOps, s"Expected $numPhysicalOps regions,
got ${regions.size}")
+
+ // Each region should contain exactly 1 operator
+ regions.foreach { region =>
+ assert(
+ region.getOperators.size == 1,
+ s"Expected region to have 1 operator, got ${region.getOperators.size}"
+ )
+ }
+
+ // All links should be materialized (represented as region links)
+ val numRegionLinks = result.regionDAG.edgeSet().asScala.size
+ val numPhysicalLinks = workflow.physicalPlan.links.size
+ assert(
+ numRegionLinks == numPhysicalLinks,
+ s"Expected $numPhysicalLinks region links, got $numRegionLinks"
+ )
+ }
+
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index b341c1a8f6..69ee9c6a5f 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -28,7 +28,12 @@ import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.{AttributeType, Tuple}
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflow.{
+ ExecutionMode,
+ PortIdentity,
+ WorkflowContext,
+ WorkflowSettings
+}
import org.apache.texera.amber.engine.architecture.controller._
import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
@@ -69,6 +74,13 @@ class DataProcessingSpec
val workflowContext: WorkflowContext = new WorkflowContext()
+ val materializedWorkflowContext: WorkflowContext = new WorkflowContext(
+ workflowSettings = WorkflowSettings(
+ dataTransferBatchSize = 400,
+ executionMode = ExecutionMode.MATERIALIZED
+ )
+ )
+
override protected def beforeEach(): Unit = {
setUpWorkflowExecutionData()
}
@@ -340,4 +352,135 @@ class DataProcessingSpec
)
executeWorkflow(workflow)
}
+
+ "Engine" should "execute headerlessCsv->keyword workflow with MATERIALIZED
mode" in {
+ val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
+ val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+ val workflow = buildWorkflow(
+ List(headerlessCsvOpDesc, keywordOpDesc),
+ List(
+ LogicalLink(
+ headerlessCsvOpDesc.operatorIdentifier,
+ PortIdentity(),
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity()
+ )
+ ),
+ materializedWorkflowContext
+ )
+ executeWorkflow(workflow)
+ }
+
+ "Engine" should "execute csv workflow with MATERIALIZED mode" in {
+ val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+ val workflow = buildWorkflow(
+ List(csvOpDesc),
+ List(),
+ materializedWorkflowContext
+ )
+ executeWorkflow(workflow)
+ }
+
+ "Engine" should "execute csv->keyword workflow with MATERIALIZED mode" in {
+ val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+ val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+ val workflow = buildWorkflow(
+ List(csvOpDesc, keywordOpDesc),
+ List(
+ LogicalLink(
+ csvOpDesc.operatorIdentifier,
+ PortIdentity(),
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity()
+ )
+ ),
+ materializedWorkflowContext
+ )
+ executeWorkflow(workflow)
+ }
+
+ "Engine" should "execute csv->keyword->count workflow with MATERIALIZED
mode" in {
+ val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+ val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+ val countOpDesc =
+ TestOperators.aggregateAndGroupByDesc("Region",
AggregationFunction.COUNT, List[String]())
+ val workflow = buildWorkflow(
+ List(csvOpDesc, keywordOpDesc, countOpDesc),
+ List(
+ LogicalLink(
+ csvOpDesc.operatorIdentifier,
+ PortIdentity(),
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity()
+ ),
+ LogicalLink(
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity(),
+ countOpDesc.operatorIdentifier,
+ PortIdentity()
+ )
+ ),
+ materializedWorkflowContext
+ )
+ executeWorkflow(workflow)
+ }
+
+ "Engine" should "execute csv->keyword->averageAndGroupBy workflow with
MATERIALIZED mode" in {
+ val csvOpDesc = TestOperators.smallCsvScanOpDesc()
+ val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
+ val averageAndGroupByOpDesc =
+ TestOperators.aggregateAndGroupByDesc(
+ "Units Sold",
+ AggregationFunction.AVERAGE,
+ List[String]("Country")
+ )
+ val workflow = buildWorkflow(
+ List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc),
+ List(
+ LogicalLink(
+ csvOpDesc.operatorIdentifier,
+ PortIdentity(),
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity()
+ ),
+ LogicalLink(
+ keywordOpDesc.operatorIdentifier,
+ PortIdentity(),
+ averageAndGroupByOpDesc.operatorIdentifier,
+ PortIdentity()
+ )
+ ),
+ materializedWorkflowContext
+ )
+ executeWorkflow(workflow)
+ }
+
+ "Engine" should "execute csv->(csv->)->join workflow with MATERIALIZED mode"
in {
+ val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
+ val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
+ val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
+ val workflow = buildWorkflow(
+ List(
+ headerlessCsvOpDesc1,
+ headerlessCsvOpDesc2,
+ joinOpDesc
+ ),
+ List(
+ LogicalLink(
+ headerlessCsvOpDesc1.operatorIdentifier,
+ PortIdentity(),
+ joinOpDesc.operatorIdentifier,
+ PortIdentity()
+ ),
+ LogicalLink(
+ headerlessCsvOpDesc2.operatorIdentifier,
+ PortIdentity(),
+ joinOpDesc.operatorIdentifier,
+ PortIdentity(1)
+ )
+ ),
+ materializedWorkflowContext
+ )
+ executeWorkflow(workflow)
+ }
}
diff --git a/common/config/src/main/resources/gui.conf
b/common/config/src/main/resources/gui.conf
index 8039441b13..d58d94ac7b 100644
--- a/common/config/src/main/resources/gui.conf
+++ b/common/config/src/main/resources/gui.conf
@@ -63,8 +63,11 @@ gui {
default-data-transfer-batch-size = 400
default-data-transfer-batch-size =
${?GUI_WORKFLOW_WORKSPACE_DEFAULT_DATA_TRANSFER_BATCH_SIZE}
+ # default execution mode for workflows, can be either MATERIALIZED or
PIPELINED
+ default-execution-mode = PIPELINED
+ default-execution-mode = ${?GUI_WORKFLOW_WORKSPACE_DEFAULT_EXECUTION_MODE}
+
# whether selecting files from datasets instead of the local file system.
- # The user system must be enabled to make this flag work!
selecting-files-from-datasets-enabled = true
selecting-files-from-datasets-enabled =
${?GUI_WORKFLOW_WORKSPACE_SELECTING_FILES_FROM_DATASETS_ENABLED}
diff --git
a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala
b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala
index 14016f4374..adc789c984 100644
--- a/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala
+++ b/common/config/src/main/scala/org/apache/texera/config/GuiConfig.scala
@@ -47,6 +47,8 @@ object GuiConfig {
conf.getBoolean("gui.workflow-workspace.auto-attribute-correction-enabled")
val guiWorkflowWorkspaceDefaultDataTransferBatchSize: Int =
conf.getInt("gui.workflow-workspace.default-data-transfer-batch-size")
+ val guiWorkflowWorkspaceDefaultExecutionMode: String =
+ conf.getString("gui.workflow-workspace.default-execution-mode")
val guiWorkflowWorkspaceSelectingFilesFromDatasetsEnabled: Boolean =
conf.getBoolean("gui.workflow-workspace.selecting-files-from-datasets-enabled")
val guiWorkflowWorkspaceWorkflowExecutionsTrackingEnabled: Boolean =
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
similarity index 80%
copy from
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
copy to
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
index 88ebcb068f..c02690e4e5 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/ExecutionMode.java
@@ -17,9 +17,11 @@
* under the License.
*/
-package org.apache.texera.amber.core.workflow
+package org.apache.texera.amber.core.workflow;
-case class WorkflowSettings(
- dataTransferBatchSize: Int,
- outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty
-)
+public enum ExecutionMode {
+ PIPELINED,
+ MATERIALIZED;
+
+ public static ExecutionMode fromString(String value) { return
valueOf(value); }
+}
\ No newline at end of file
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
index ee7659d0ca..bf2911a4e5 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
@@ -29,9 +29,7 @@ import org.apache.texera.amber.core.workflow.WorkflowContext.{
object WorkflowContext {
val DEFAULT_EXECUTION_ID: ExecutionIdentity = ExecutionIdentity(1L)
val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L)
- val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings(
- dataTransferBatchSize = 400 // TODO: make this configurable
- )
+ val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings()
}
class WorkflowContext(
var workflowId: WorkflowIdentity = DEFAULT_WORKFLOW_ID,
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
index 88ebcb068f..c4a86d3538 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
@@ -19,7 +19,11 @@
package org.apache.texera.amber.core.workflow
+import org.apache.texera.config.GuiConfig
+
case class WorkflowSettings(
- dataTransferBatchSize: Int,
+ dataTransferBatchSize: Int = 400,
+ executionMode: ExecutionMode =
+
ExecutionMode.valueOf(GuiConfig.guiWorkflowWorkspaceDefaultExecutionMode),
outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty
)
diff --git
a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
index 30c657746f..b7517d81eb 100644
---
a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
+++
b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala
@@ -46,6 +46,7 @@ class ConfigResource {
"timetravelEnabled" -> GuiConfig.guiWorkflowWorkspaceTimetravelEnabled,
"productionSharedEditingServer" ->
GuiConfig.guiWorkflowWorkspaceProductionSharedEditingServer,
"defaultDataTransferBatchSize" ->
GuiConfig.guiWorkflowWorkspaceDefaultDataTransferBatchSize,
+ "defaultExecutionMode" ->
GuiConfig.guiWorkflowWorkspaceDefaultExecutionMode,
"workflowEmailNotificationEnabled" ->
GuiConfig.guiWorkflowWorkspaceWorkflowEmailNotificationEnabled,
"sharingComputingUnitEnabled" ->
ComputingUnitConfig.sharingComputingUnitEnabled,
"operatorConsoleMessageBufferSize" ->
GuiConfig.guiWorkflowWorkspaceOperatorConsoleMessageBufferSize,
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index 73feecfdba..9ddf4bbcc2 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -181,6 +181,7 @@ import { AdminSettingsComponent } from
"./dashboard/component/admin/settings/adm
import { FormlyRepeatDndComponent } from
"./common/formly/repeat-dnd/repeat-dnd.component";
import { NzInputNumberModule } from "ng-zorro-antd/input-number";
import { NzCheckboxModule } from "ng-zorro-antd/checkbox";
+import { NzRadioModule } from "ng-zorro-antd/radio";
registerLocaleData(en);
@@ -344,6 +345,7 @@ registerLocaleData(en);
NzProgressModule,
NzInputNumberModule,
NzCheckboxModule,
+ NzRadioModule,
],
providers: [
provideNzI18n(en_US),
diff --git a/frontend/src/app/common/service/gui-config.service.mock.ts
b/frontend/src/app/common/service/gui-config.service.mock.ts
index daa8adfd22..179259c5a9 100644
--- a/frontend/src/app/common/service/gui-config.service.mock.ts
+++ b/frontend/src/app/common/service/gui-config.service.mock.ts
@@ -20,6 +20,7 @@
import { Injectable } from "@angular/core";
import { Observable, of } from "rxjs";
import { GuiConfig } from "../type/gui-config";
+import { ExecutionMode } from "../type/workflow";
/**
* Mock GuiConfigService for testing purposes.
@@ -42,6 +43,7 @@ export class MockGuiConfigService {
productionSharedEditingServer: false,
pythonLanguageServerPort: "3000",
defaultDataTransferBatchSize: 100,
+ defaultExecutionMode: ExecutionMode.PIPELINED,
workflowEmailNotificationEnabled: false,
sharingComputingUnitEnabled: false,
operatorConsoleMessageBufferSize: 1000,
diff --git a/frontend/src/app/common/type/gui-config.ts
b/frontend/src/app/common/type/gui-config.ts
index b47dfa0ab1..d8786c1dc0 100644
--- a/frontend/src/app/common/type/gui-config.ts
+++ b/frontend/src/app/common/type/gui-config.ts
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+import { ExecutionMode } from "./workflow";
// Please refer to core/config/src/main/resources/gui.conf for the definition
of each config item
export interface GuiConfig {
@@ -33,6 +34,7 @@ export interface GuiConfig {
productionSharedEditingServer: boolean;
pythonLanguageServerPort: string;
defaultDataTransferBatchSize: number;
+ defaultExecutionMode: ExecutionMode;
workflowEmailNotificationEnabled: boolean;
sharingComputingUnitEnabled: boolean;
operatorConsoleMessageBufferSize: number;
diff --git a/frontend/src/app/common/type/workflow.ts
b/frontend/src/app/common/type/workflow.ts
index 6792df9d7d..8e1c1c7e85 100644
--- a/frontend/src/app/common/type/workflow.ts
+++ b/frontend/src/app/common/type/workflow.ts
@@ -20,8 +20,14 @@
import { WorkflowMetadata } from
"../../dashboard/type/workflow-metadata.interface";
import { CommentBox, OperatorLink, OperatorPredicate, Point } from
"../../workspace/types/workflow-common.interface";
+export enum ExecutionMode {
+ PIPELINED = "PIPELINED",
+ MATERIALIZED = "MATERIALIZED",
+}
+
export interface WorkflowSettings {
dataTransferBatchSize: number;
+ executionMode: ExecutionMode;
}
/**
diff --git
a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts
b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts
index e87b5acd22..09a158e336 100644
--- a/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts
+++ b/frontend/src/app/dashboard/component/user-dashboard-test-fixtures.ts
@@ -19,7 +19,7 @@
//All times in test Workflows are in PST because our local machine's timezone
is PST
-import { Workflow, WorkflowContent } from "../../common/type/workflow";
+import { ExecutionMode, Workflow, WorkflowContent } from
"../../common/type/workflow";
import { DashboardEntry } from "../type/dashboard-entry";
import { DashboardProject } from "../type/dashboard-project.interface";
@@ -39,7 +39,7 @@ export const testWorkflowContent = (operatorTypes: string[]):
WorkflowContent =>
commentBoxes: [],
links: [],
operatorPositions: {},
- settings: { dataTransferBatchSize: 400 },
+ settings: { dataTransferBatchSize: 400, executionMode:
ExecutionMode.PIPELINED },
});
export const testWorkflow1: Workflow = {
diff --git
a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
index 4903e70bcb..75f61a223c 100644
---
a/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
+++
b/frontend/src/app/dashboard/component/user/user-workflow/user-workflow.component.ts
@@ -31,7 +31,7 @@ import { DashboardEntry, UserInfo } from
"../../../type/dashboard-entry";
import { UserService } from "../../../../common/service/user/user.service";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
import { NotificationService } from
"../../../../common/service/notification/notification.service";
-import { WorkflowContent } from "../../../../common/type/workflow";
+import { ExecutionMode, WorkflowContent } from
"../../../../common/type/workflow";
import { NzUploadFile } from "ng-zorro-antd/upload";
import * as JSZip from "jszip";
import { FiltersComponent } from "../filters/filters.component";
@@ -230,7 +230,10 @@ export class UserWorkflowComponent implements
AfterViewInit {
commentBoxes: [],
links: [],
operatorPositions: {},
- settings: { dataTransferBatchSize:
this.config.env.defaultDataTransferBatchSize },
+ settings: {
+ dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize,
+ executionMode: this.config.env.defaultExecutionMode,
+ },
};
let localPid = this.pid;
this.workflowPersistService
diff --git
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
index af16bb0dee..ceeea480af 100644
---
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
+++
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.html
@@ -16,12 +16,25 @@
specific language governing permissions and limitations
under the License.
-->
-
<div class="settings-container">
- <h4>Workflow Settings</h4>
<form
[formGroup]="settingsForm"
class="form-inline">
+ <b>Execution Mode:</b>
+ <nz-radio-group formControlName="executionMode">
+ <label
+ nz-radio
+ [nzValue]="ExecutionMode.PIPELINED"
+ >Pipelined</label
+ >
+ <br />
+ <label
+ nz-radio
+ [nzValue]="ExecutionMode.MATERIALIZED"
+ >Materialized</label
+ >
+ </nz-radio-group>
+ <br />
<div class="form-group">
<label for="dataTransferBatchSize">Data Transfer Batch Size:</label>
<input
diff --git
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss
index 3dfd30e046..16a468e92d 100644
---
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss
+++
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.scss
@@ -21,10 +21,6 @@
padding: 10px;
}
-h4 {
- margin-bottom: 15px;
-}
-
.form-inline {
display: flex;
flex-direction: column;
diff --git
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
index ead28857ff..d7ab644a89 100644
---
a/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
+++
b/frontend/src/app/workspace/component/left-panel/settings/settings.component.ts
@@ -24,7 +24,7 @@ import { WorkflowActionService } from
"../../../service/workflow-graph/model/wor
import { WorkflowPersistService } from
"src/app/common/service/workflow-persist/workflow-persist.service";
import { UserService } from "../../../../common/service/user/user.service";
import { NotificationService } from
"src/app/common/service/notification/notification.service";
-import { GuiConfigService } from
"../../../../common/service/gui-config.service";
+import { ExecutionMode } from "../../../../common/type/workflow";
@UntilDestroy()
@Component({
@@ -33,43 +33,50 @@ import { GuiConfigService } from
"../../../../common/service/gui-config.service"
styleUrls: ["./settings.component.scss"],
})
export class SettingsComponent implements OnInit {
- settingsForm!: FormGroup;
- currentDataTransferBatchSize!: number;
- isSaving: boolean = false;
+ settingsForm: FormGroup;
constructor(
private fb: FormBuilder,
private workflowActionService: WorkflowActionService,
private workflowPersistService: WorkflowPersistService,
private userService: UserService,
- private notificationService: NotificationService,
- private config: GuiConfigService
- ) {}
-
- ngOnInit(): void {
- this.currentDataTransferBatchSize =
-
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize
||
- this.config.env.defaultDataTransferBatchSize;
-
+ private notificationService: NotificationService
+ ) {
this.settingsForm = this.fb.group({
- dataTransferBatchSize: [this.currentDataTransferBatchSize,
[Validators.required, Validators.min(1)]],
+ dataTransferBatchSize: [
+
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize,
+ [Validators.required, Validators.min(1)],
+ ],
+ executionMode:
[this.workflowActionService.getWorkflowContent().settings.executionMode],
});
+ }
- this.settingsForm.valueChanges.pipe(untilDestroyed(this)).subscribe(value
=> {
- if (this.settingsForm.valid) {
- this.confirmUpdateDataTransferBatchSize(value.dataTransferBatchSize);
- }
- });
+ ngOnInit(): void {
+ this.settingsForm
+ .get("dataTransferBatchSize")!
+ .valueChanges.pipe(untilDestroyed(this))
+ .subscribe((batchSize: number) => {
+ if (this.settingsForm.get("dataTransferBatchSize")!.valid) {
+ this.confirmUpdateDataTransferBatchSize(batchSize);
+ }
+ });
+
+ this.settingsForm
+ .get("executionMode")!
+ .valueChanges.pipe(untilDestroyed(this))
+ .subscribe((mode: ExecutionMode) => {
+ this.updateExecutionMode(mode);
+ });
this.workflowActionService
.workflowChanged()
.pipe(untilDestroyed(this))
.subscribe(() => {
- this.currentDataTransferBatchSize =
-
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize
||
- this.config.env.defaultDataTransferBatchSize;
this.settingsForm.patchValue(
- { dataTransferBatchSize: this.currentDataTransferBatchSize },
+ {
+ dataTransferBatchSize:
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize,
+ executionMode:
this.workflowActionService.getWorkflowContent().settings.executionMode,
+ },
{ emitEvent: false }
);
});
@@ -85,13 +92,18 @@ export class SettingsComponent implements OnInit {
}
public persistWorkflow(): void {
- this.isSaving = true;
this.workflowPersistService
.persistWorkflow(this.workflowActionService.getWorkflow())
.pipe(untilDestroyed(this))
.subscribe({
error: (e: unknown) => this.notificationService.error((e as
Error).message),
- })
- .add(() => (this.isSaving = false));
+ });
}
+
+ public updateExecutionMode(mode: ExecutionMode) {
+ this.workflowActionService.updateExecutionMode(mode);
+ this.persistWorkflow();
+ }
+
+ protected readonly ExecutionMode = ExecutionMode;
}
diff --git
a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
index ddee22465a..32a5e3a2db 100644
---
a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
+++
b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
@@ -21,7 +21,7 @@ import { Injectable } from "@angular/core";
import * as joint from "jointjs";
import { BehaviorSubject, merge, Observable, Subject } from "rxjs";
-import { Workflow, WorkflowContent, WorkflowSettings } from
"../../../../common/type/workflow";
+import { ExecutionMode, Workflow, WorkflowContent, WorkflowSettings } from
"../../../../common/type/workflow";
import { WorkflowMetadata } from
"../../../../dashboard/type/workflow-metadata.interface";
import {
Comment,
@@ -127,6 +127,7 @@ export class WorkflowActionService {
private getDefaultSettings(): WorkflowSettings {
return {
dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize,
+ executionMode: this.config.env.defaultExecutionMode,
};
}
@@ -807,6 +808,10 @@ export class WorkflowActionService {
}
}
+ public updateExecutionMode(mode: ExecutionMode): void {
+ this.setWorkflowSettings({ ...this.workflowSettings, executionMode: mode
});
+ }
+
public clearWorkflow(): void {
this.destroySharedModel();
this.setWorkflowMetadata(undefined);