This is an automated email from the ASF dual-hosted git repository. linxinyuan pushed a commit to branch xinyuan-remove-storage-mode in repository https://gitbox.apache.org/repos/asf/texera.git
commit b4c897bc08cfb4632a7c0c4b038801c4b753b95a Author: Xinyuan Lin <[email protected]> AuthorDate: Fri Feb 13 14:04:22 2026 -0800 init --- .../websocket/event/WebResultUpdateEvent.scala | 3 +- .../web/service/ExecutionResultService.scala | 5 +- common/config/src/main/resources/storage.conf | 2 - .../amber/config/EnvironmentalVariable.scala | 5 - .../apache/texera/amber/config/StorageConfig.scala | 3 - .../amber/core/storage/DocumentFactory.scala | 102 +++++++++------------ .../result-table-frame.component.ts | 17 +--- .../workflow-result/workflow-result.service.ts | 12 +-- .../workspace/types/execute-workflow.interface.ts | 1 - 9 files changed, 50 insertions(+), 100 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala index 5689eaafbc..7e31be1ca7 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala @@ -23,6 +23,5 @@ import org.apache.texera.web.service.ExecutionResultService.WebResultUpdate case class WebResultUpdateEvent( updates: Map[String, WebResultUpdate], - tableStats: Map[String, Map[String, Map[String, Any]]], - resultStorageMode: String + tableStats: Map[String, Map[String, Map[String, Any]]] ) extends TexeraWebSocketEvent diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala index 8810e9891f..285c836b60 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala @@ -382,7 +382,7 @@ class ExecutionResultService( outputPort.mode == OutputMode.SINGLE_SNAPSHOT } - if (StorageConfig.resultStorageMode == ICEBERG && !hasSingleSnapshot) { + if (!hasSingleSnapshot) { val storageUri = WorkflowExecutionsResource .getResultUriByLogicalPortId( executionId, @@ -408,8 +408,7 @@ class ExecutionResultService( Iterable( WebResultUpdateEvent( buf.toMap, - allTableStats.toMap, - StorageConfig.resultStorageMode.toLowerCase + allTableStats.toMap ) ) }) diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 85a62b77a3..276d1491cd 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -17,8 +17,6 @@ # See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines. storage { - result-storage-mode = iceberg # either mongodb or iceberg, mongodb will be deprecated soon - result-storage-mode = ${?STORAGE_RESULT_MODE} # Configuration for Apache Iceberg, used for storing the workflow results & stats iceberg { diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala index 099e12260d..1adc323305 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala @@ -44,11 +44,6 @@ object EnvironmentalVariable { */ val ENV_USER_JWT_TOKEN = "USER_JWT_TOKEN" - /** - * Variables in storage.conf - */ - val ENV_RESULT_STORAGE_MODE = "STORAGE_RESULT_MODE" - // JDBC val ENV_JDBC_URL = "STORAGE_JDBC_URL" val ENV_JDBC_USERNAME = "STORAGE_JDBC_USERNAME" diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala index c5bd330286..3bc1e05a9b 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala @@ -28,9 +28,6 @@ object StorageConfig { // Load configuration private val conf: Config = ConfigFactory.parseResources("storage.conf").resolve() - // General storage settings - val resultStorageMode: String = conf.getString("storage.result-storage-mode") - // JDBC specifics val jdbcUrl: String = conf.getString("storage.jdbc.url") val jdbcUrlForTestCases: String = conf.getString("storage.jdbc.url-for-test-cases") diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index 4c37c33bb2..15949ef471 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -76,32 +76,25 @@ object DocumentFactory { throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } - StorageConfig.resultStorageMode.toLowerCase match { - case ICEBERG => - val icebergSchema = IcebergUtil.toIcebergSchema(schema) - IcebergUtil.createTable( - IcebergCatalogInstance.getInstance(), - namespace, - storageKey, - icebergSchema, - overrideIfExists = true - ) - val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord - val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => - IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) - - new IcebergDocument[Tuple]( - namespace, - storageKey, - icebergSchema, - serde, - deserde - ) - case unsupportedMode => - throw new IllegalArgumentException( - s"Storage mode '$unsupportedMode' is not supported" - ) - } + val icebergSchema = IcebergUtil.toIcebergSchema(schema) + IcebergUtil.createTable( + IcebergCatalogInstance.getInstance(), + namespace, + storageKey, + icebergSchema, + overrideIfExists = true + ) + val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord + val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => + IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) + + new IcebergDocument[Tuple]( + namespace, + storageKey, + icebergSchema, + serde, + deserde + ) case unsupportedScheme => throw new UnsupportedOperationException( s"Unsupported URI scheme: $unsupportedScheme for creating the document" @@ -130,38 +123,31 @@ object DocumentFactory { throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } - StorageConfig.resultStorageMode.toLowerCase match { - case ICEBERG => - val table = IcebergUtil - .loadTableMetadata( - IcebergCatalogInstance.getInstance(), - namespace, - storageKey - ) - .getOrElse( - throw new IllegalArgumentException("No storage is found for the given URI") - ) - - val amberSchema = IcebergUtil.fromIcebergSchema(table.schema()) - val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord - val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => - IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) - - ( - new IcebergDocument[Tuple]( - namespace, - storageKey, - table.schema(), - serde, - deserde - ), - Some(amberSchema) - ) - case mode => - throw new IllegalArgumentException( - s"Storage mode '$mode' is not supported" - ) - } + val table = IcebergUtil + .loadTableMetadata( + IcebergCatalogInstance.getInstance(), + namespace, + storageKey + ) + .getOrElse( + throw new IllegalArgumentException("No storage is found for the given URI") + ) + + val amberSchema = IcebergUtil.fromIcebergSchema(table.schema()) + val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord + val deserde: (IcebergSchema, Record) => Tuple = (schema, record) => + IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema)) + + ( + new IcebergDocument[Tuple]( + namespace, + storageKey, + table.schema(), + serde, + deserde + ), + Some(amberSchema) + ) case unsupportedScheme => throw new UnsupportedOperationException( s"Unsupported URI scheme: $unsupportedScheme for opening the document" diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts index abb6daa882..383c6daa71 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts @@ -29,7 +29,6 @@ import { RowModalComponent } from "../result-panel-modal.component"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { DomSanitizer, SafeHtml } from "@angular/platform-browser"; import { ResultExportationComponent } from "../../result-exportation/result-exportation.component"; -import { SchemaAttribute } from "../../../types/workflow-compiling.interface"; import { WorkflowStatusService } from "../../../service/workflow-status/workflow-status.service"; import { GuiConfigService } from "../../../../common/service/gui-config.service"; @@ -74,8 +73,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { tableStats: Record<string, Record<string, number>> = {}; prevTableStats: Record<string, Record<string, number>> = {}; widthPercent: string = ""; - sinkStorageMode: string = ""; - private schema: ReadonlyArray<SchemaAttribute> = []; isOperatorFinished: boolean = false; constructor( @@ -101,7 +98,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { this.tableStats = paginatedResultService.getStats(); this.prevTableStats = this.tableStats; - this.schema = paginatedResultService.getSchema(); } } } @@ -160,13 +156,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { } }); - this.workflowResultService - .getSinkStorageMode() - .pipe(untilDestroyed(this)) - .subscribe(sinkStorageMode => { - this.sinkStorageMode = sinkStorageMode; - }); - this.resizeService.currentSize.pipe(untilDestroyed(this)).subscribe(size => { this.panelHeight = size.height; this.adjustPageSizeBasedOnPanelSize(size.height); @@ -179,7 +168,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { if (this.operatorId) { const paginatedResultService = this.workflowResultService.getPaginatedResultService(this.operatorId); if (paginatedResultService) { - this.schema = paginatedResultService.getSchema(); } } } @@ -207,8 +195,8 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { compare(field: string, stats: string): SafeHtml { let current = this.tableStats[field][stats]; let previous = this.prevTableStats[field][stats]; - let currentStr = ""; - let previousStr = ""; + let currentStr: string; + let previousStr: string; if (typeof current === "number" && typeof previous === "number") { currentStr = current.toFixed(2); @@ -370,7 +358,6 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { .subscribe(pageData => { if (this.currentPageIndex === pageData.pageIndex) { this.setupResultTable(pageData.table, paginatedResultService.getCurrentTotalNumTuples()); - this.schema = pageData.schema; this.changeDetectorRef.detectChanges(); } }); diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index 96937ccbec..9fd18e0f16 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -29,7 +29,7 @@ import { } from "../../types/execute-workflow.interface"; import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service"; import { PaginatedResultEvent, WorkflowAvailableResultEvent } from "../../types/workflow-websocket.interface"; -import { BehaviorSubject, map, Observable, of, pairwise, ReplaySubject, Subject } from "rxjs"; +import { map, Observable, of, pairwise, ReplaySubject, Subject } from "rxjs"; import { v4 as uuid } from "uuid"; import { IndexableObject } from "../../types/result-table.interface"; import { isDefined } from "../../../common/util/predicate"; @@ -49,13 +49,11 @@ export class WorkflowResultService { private resultUpdateStream = new Subject<Record<string, WebResultUpdate | undefined>>(); private resultTableStats = new ReplaySubject<Record<string, Record<string, Record<string, number>>>>(1); private resultInitiateStream = new Subject<string>(); - private sinkStorageModeSubject = new BehaviorSubject<string>(""); constructor(private wsService: WorkflowWebsocketService) { this.wsService.subscribeToEvent("WebResultUpdateEvent").subscribe(event => { this.handleResultUpdate(event.updates); this.handleTableStatsUpdate(event.tableStats); - this.handleSinkStorageModeUpdate(event.sinkStorageMode); }); this.wsService .subscribeToEvent("WorkflowAvailableResultEvent") @@ -165,14 +163,6 @@ export class WorkflowResultService { this.resultTableStats.next(event); } - private handleSinkStorageModeUpdate(sinkStorageMode: string): void { - this.sinkStorageModeSubject.next(sinkStorageMode); - } - - public getSinkStorageMode(): BehaviorSubject<string> { - return this.sinkStorageModeSubject; - } - private getOrInitPaginatedResultService(operatorID: string): OperatorPaginationResultService { let service = this.getPaginatedResultService(operatorID); if (!service) { diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts b/frontend/src/app/workspace/types/execute-workflow.interface.ts index 23ade23199..1633e4bfdf 100644 --- a/frontend/src/app/workspace/types/execute-workflow.interface.ts +++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts @@ -120,7 +120,6 @@ export interface WorkflowResultUpdateEvent extends Readonly<{ updates: WorkflowResultUpdate; tableStats: WorkflowResultTableStats; - sinkStorageMode: string; }> {} // user-defined type guards to check the type of the result update
