This is an automated email from the ASF dual-hosted git repository.
xiaozhenliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 84f696e1ed refactor(config): remove user-sys enabled flag (#3831)
84f696e1ed is described below
commit 84f696e1ed5319a968e5a52d81b012f803b11c83
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Thu Oct 9 20:47:55 2025 -0700
refactor(config): remove user-sys enabled flag (#3831)
# Purpose
This PR is a successor of #3782. Because the non-user-system mode is no
longer used or maintained, this PR removes the flag that switched the
user system between enabled and disabled, and keeps only the
user-system-enabled mode.
# Content
- Removed the `user-sys.enabled` flag, both in the frontend and backend.
- Removed all the if-else statements based on this flag in the codebase.
Only the cases of user system being enabled are kept.
- Removed `ExecutionResourcesMapping` in the backend as it is no longer
needed.
- Removed `WorkflowCacheService` in the frontend as it is no longer
needed.
---------
Co-authored-by: Xinyuan Lin <[email protected]>
---
.../uci/ics/texera/web/ComputingUnitMaster.scala | 44 ++--
.../uci/ics/texera/web/TexeraWebApplication.scala | 4 +-
.../edu/uci/ics/texera/web/auth/JwtAuth.scala | 31 +--
.../texera/web/resource/auth/AuthResource.scala | 4 -
.../web/resource/auth/GoogleAuthResource.scala | 2 -
.../user/workflow/WorkflowExecutionsResource.scala | 187 +++++++---------
.../web/service/ExecutionConsoleService.scala | 31 ++-
.../texera/web/service/ExecutionStatsService.scala | 122 +++++------
.../service/ExecutionsMetadataPersistService.scala | 3 -
.../web/service/WorkflowExecutionService.scala | 3 -
.../ics/texera/web/service/WorkflowService.scala | 49 ++---
.../texera/service/resource/ConfigResource.scala | 3 +-
core/config/src/main/resources/user-system.conf | 3 -
.../uci/ics/texera/config/UserSystemConfig.scala | 1 -
core/gui/src/app/app-routing.module.ts | 4 +-
.../app/common/service/gui-config.service.mock.ts | 1 -
.../src/app/common/service/gui-config.service.ts | 6 +-
.../app/common/service/user/auth-guard.service.ts | 2 +-
.../src/app/common/service/user/user.service.ts | 7 +-
core/gui/src/app/common/type/gui-config.ts | 1 -
.../input-autocomplete.component.ts | 2 +-
.../component/left-panel/left-panel.component.ts | 5 +-
.../workspace/component/menu/menu.component.html | 23 +-
.../app/workspace/component/menu/menu.component.ts | 2 +-
.../operator-property-edit-frame.component.ts | 1 -
.../app/workspace/component/workspace.component.ts | 108 +++-------
.../workflow-cache/workflow-cache.service.spec.ts | 234 ---------------------
.../workflow-cache/workflow-cache.service.ts | 41 ----
.../workflow-websocket.service.ts | 4 +-
.../storage/result/ExecutionResourcesMapping.scala | 75 -------
30 files changed, 244 insertions(+), 759 deletions(-)
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
index 7760267311..6de230ddd5 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
@@ -156,30 +156,28 @@ class ComputingUnitMaster extends
io.dropwizard.Application[Configuration] with
new
WebsocketPayloadSizeTuner(ApplicationConfig.maxWorkflowWebsocketRequestPayloadSizeKb)
)
- if (UserSystemConfig.isUserSystemEnabled) {
- val timeToLive: Int = ApplicationConfig.sinkStorageTTLInSecs
- if (ApplicationConfig.cleanupAllExecutionResults) {
- // do one time cleanup of collections that were not closed gracefully
before restart/crash
- // retrieve all executions that were executing before the reboot.
- val allExecutionsBeforeRestart: List[WorkflowExecutions] =
- WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(-1)
- cleanExecutions(
- allExecutionsBeforeRestart,
- statusByte => {
- if (statusByte != maptoStatusCode(COMPLETED)) {
- maptoStatusCode(FAILED) // for incomplete executions, mark them
as failed.
- } else {
- statusByte
- }
+ val timeToLive: Int = ApplicationConfig.sinkStorageTTLInSecs
+ if (ApplicationConfig.cleanupAllExecutionResults) {
+ // do one time cleanup of collections that were not closed gracefully
before restart/crash
+ // retrieve all executions that were executing before the reboot.
+ val allExecutionsBeforeRestart: List[WorkflowExecutions] =
+ WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(-1)
+ cleanExecutions(
+ allExecutionsBeforeRestart,
+ statusByte => {
+ if (statusByte != maptoStatusCode(COMPLETED)) {
+ maptoStatusCode(FAILED) // for incomplete executions, mark them as
failed.
+ } else {
+ statusByte
}
- )
- }
- scheduleRecurringCallThroughActorSystem(
- 2.seconds,
- ApplicationConfig.sinkStorageCleanUpCheckIntervalInSecs.seconds
- ) {
- recurringCheckExpiredResults(timeToLive)
- }
+ }
+ )
+ }
+ scheduleRecurringCallThroughActorSystem(
+ 2.seconds,
+ ApplicationConfig.sinkStorageCleanUpCheckIntervalInSecs.seconds
+ ) {
+ recurringCheckExpiredResults(timeToLive)
}
environment.jersey.register(classOf[WorkflowExecutionsResource])
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala
index a74b40c671..577c4aff4f 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala
@@ -152,8 +152,6 @@ class TexeraWebApplication
environment.jersey.register(classOf[AdminSettingsResource])
environment.jersey.register(classOf[AIAssistantResource])
- if (UserSystemConfig.isUserSystemEnabled) {
- AuthResource.createAdminUser()
- }
+ AuthResource.createAdminUser()
}
}
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala
index 0847102929..3a4488c3fb 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala
@@ -30,26 +30,17 @@ import io.dropwizard.setup.Environment
@Deprecated
object JwtAuth {
def setupJwtAuth(environment: Environment): Unit = {
- if (UserSystemConfig.isUserSystemEnabled) {
- // register JWT Auth layer
- environment.jersey.register(
- new AuthDynamicFeature(
- new JwtAuthFilter.Builder[SessionUser]()
- .setJwtConsumer(jwtConsumer)
- .setRealm("realm")
- .setPrefix("Bearer")
- .setAuthenticator(UserAuthenticator)
- .setAuthorizer(UserRoleAuthorizer)
- .buildAuthFilter()
- )
+ // register JWT Auth layer
+ environment.jersey.register(
+ new AuthDynamicFeature(
+ new JwtAuthFilter.Builder[SessionUser]()
+ .setJwtConsumer(jwtConsumer)
+ .setRealm("realm")
+ .setPrefix("Bearer")
+ .setAuthenticator(UserAuthenticator)
+ .setAuthorizer(UserRoleAuthorizer)
+ .buildAuthFilter()
)
- } else {
- // register Guest Auth layer
- environment.jersey.register(
- new AuthDynamicFeature(
- new
GuestAuthFilter.Builder().setAuthorizer(UserRoleAuthorizer).buildAuthFilter()
- )
- )
- }
+ )
}
}
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala
index ccccdd768a..23e0bb04e8 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala
@@ -89,8 +89,6 @@ class AuthResource {
@POST
@Path("/login")
def login(request: UserLoginRequest): TokenIssueResponse = {
- if (!UserSystemConfig.isUserSystemEnabled)
- throw new NotAcceptableException("User System is disabled on the
backend!")
retrieveUserByUsernameAndPassword(request.username, request.password)
match {
case Some(user) =>
TokenIssueResponse(jwtToken(jwtClaims(user,
TOKEN_EXPIRE_TIME_IN_MINUTES)))
@@ -101,8 +99,6 @@ class AuthResource {
@POST
@Path("/register")
def register(request: UserRegistrationRequest): TokenIssueResponse = {
- if (!UserSystemConfig.isUserSystemEnabled)
- throw new NotAcceptableException("User System is disabled on the
backend!")
val username = request.username
if (username == null) throw new NotAcceptableException("Username cannot be
null.")
if (username.trim.isEmpty) throw new NotAcceptableException("Username
cannot be empty.")
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala
index d49d4d41a5..3bdc0f02eb 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala
@@ -57,8 +57,6 @@ class GoogleAuthResource {
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/login")
def login(credential: String): TokenIssueResponse = {
- if (!UserSystemConfig.isUserSystemEnabled)
- throw new NotAcceptableException("User System is disabled on the
backend!")
val idToken =
new GoogleIdTokenVerifier.Builder(new NetHttpTransport,
GsonFactory.getDefaultInstance)
.setAudience(
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
index c3b63a2df8..24433b76cd 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
@@ -19,7 +19,6 @@
package edu.uci.ics.texera.web.resource.dashboard.user.workflow
-import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSResourceType,
VFSURIFactory}
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.virtualidentity._
@@ -28,13 +27,12 @@ import
edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, Repla
import edu.uci.ics.amber.engine.common.Utils.{maptoStatusCode,
stringToAggregatedState}
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
+import edu.uci.ics.texera.auth.SessionUser
import edu.uci.ics.texera.dao.SqlServer
+import edu.uci.ics.texera.dao.SqlServer.withTransaction
import edu.uci.ics.texera.dao.jooq.generated.Tables._
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.WorkflowExecutionsDao
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
-import edu.uci.ics.texera.auth.SessionUser
-import edu.uci.ics.texera.config.UserSystemConfig
-import edu.uci.ics.texera.dao.SqlServer.withTransaction
import edu.uci.ics.texera.web.model.http.request.result.ResultExportRequest
import
edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._
import edu.uci.ics.texera.web.service.{ExecutionsMetadataPersistService,
ResultExportService}
@@ -107,19 +105,15 @@ object WorkflowExecutionsResource {
globalPortId: GlobalPortIdentity,
uri: URI
): Unit = {
- if (UserSystemConfig.isUserSystemEnabled) {
- context
- .insertInto(OPERATOR_PORT_EXECUTIONS)
- .columns(
- OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID,
- OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID,
- OPERATOR_PORT_EXECUTIONS.RESULT_URI
- )
- .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString)
- .execute()
- } else {
- ExecutionResourcesMapping.addResourceUri(eid, uri)
- }
+ context
+ .insertInto(OPERATOR_PORT_EXECUTIONS)
+ .columns(
+ OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID,
+ OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID,
+ OPERATOR_PORT_EXECUTIONS.RESULT_URI
+ )
+ .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString)
+ .execute()
}
def insertOperatorExecutions(
@@ -158,45 +152,37 @@ object WorkflowExecutionsResource {
}
def getResultUrisByExecutionId(eid: ExecutionIdentity): List[URI] = {
- if (UserSystemConfig.isUserSystemEnabled) {
- context
- .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI)
- .from(OPERATOR_PORT_EXECUTIONS)
- .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
- .fetchInto(classOf[String])
- .asScala
- .toList
- .filter(uri => uri != null && uri.nonEmpty)
- .map(URI.create)
- } else {
- ExecutionResourcesMapping.getResourceURIs(eid)
- }
+ context
+ .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI)
+ .from(OPERATOR_PORT_EXECUTIONS)
+ .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+ .fetchInto(classOf[String])
+ .asScala
+ .toList
+ .filter(uri => uri != null && uri.nonEmpty)
+ .map(URI.create)
}
def getConsoleMessagesUriByExecutionId(eid: ExecutionIdentity): List[URI] =
- if (UserSystemConfig.isUserSystemEnabled)
- context
- .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
- .from(OPERATOR_EXECUTIONS)
- .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
- .fetchInto(classOf[String])
- .asScala
- .toList
- .filter(uri => uri != null && uri.nonEmpty)
- .map(URI.create)
- else Nil
+ context
+ .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
+ .from(OPERATOR_EXECUTIONS)
+ .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+ .fetchInto(classOf[String])
+ .asScala
+ .toList
+ .filter(uri => uri != null && uri.nonEmpty)
+ .map(URI.create)
def getRuntimeStatsUriByExecutionId(eid: ExecutionIdentity): Option[URI] =
- if (UserSystemConfig.isUserSystemEnabled)
- Option(
- context
- .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
- .from(WORKFLOW_EXECUTIONS)
- .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
- .fetchOneInto(classOf[String])
- ).filter(_.nonEmpty)
- .map(URI.create)
- else None
+ Option(
+ context
+ .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
+ .from(WORKFLOW_EXECUTIONS)
+ .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
+ .fetchOneInto(classOf[String])
+ ).filter(_.nonEmpty)
+ .map(URI.create)
def getWorkflowExecutions(
wid: Integer,
@@ -239,18 +225,14 @@ object WorkflowExecutionsResource {
}
def deleteConsoleMessageAndExecutionResultUris(eid: ExecutionIdentity): Unit
= {
- if (UserSystemConfig.isUserSystemEnabled) {
- context
- .delete(OPERATOR_PORT_EXECUTIONS)
- .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
- .execute()
- context
- .delete(OPERATOR_EXECUTIONS)
- .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
- .execute()
- } else {
- ExecutionResourcesMapping.removeExecutionResources(eid)
- }
+ context
+ .delete(OPERATOR_PORT_EXECUTIONS)
+ .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+ .execute()
+ context
+ .delete(OPERATOR_EXECUTIONS)
+ .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+ .execute()
}
/**
@@ -316,22 +298,20 @@ object WorkflowExecutionsResource {
* @param eid Execution ID associated with the runtime statistics document.
*/
def updateRuntimeStatsSize(eid: ExecutionIdentity): Unit = {
- if (UserSystemConfig.isUserSystemEnabled) {
- val statsUriOpt = context
- .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
- .from(WORKFLOW_EXECUTIONS)
- .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
- .fetchOptionalInto(classOf[String])
- .map(URI.create)
+ val statsUriOpt = context
+ .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
+ .from(WORKFLOW_EXECUTIONS)
+ .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
+ .fetchOptionalInto(classOf[String])
+ .map(URI.create)
- if (statsUriOpt.isPresent) {
- val size =
DocumentFactory.openDocument(statsUriOpt.get)._1.getTotalFileSize
- context
- .update(WORKFLOW_EXECUTIONS)
- .set(WORKFLOW_EXECUTIONS.RUNTIME_STATS_SIZE,
Integer.valueOf(size.toInt))
- .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
- .execute()
- }
+ if (statsUriOpt.isPresent) {
+ val size =
DocumentFactory.openDocument(statsUriOpt.get)._1.getTotalFileSize
+ context
+ .update(WORKFLOW_EXECUTIONS)
+ .set(WORKFLOW_EXECUTIONS.RUNTIME_STATS_SIZE,
Integer.valueOf(size.toInt))
+ .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
+ .execute()
}
}
@@ -342,24 +322,22 @@ object WorkflowExecutionsResource {
* @param opId Operator ID of the corresponding operator.
*/
def updateConsoleMessageSize(eid: ExecutionIdentity, opId:
OperatorIdentity): Unit = {
- if (UserSystemConfig.isUserSystemEnabled) {
- val uriOpt = context
- .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
- .from(OPERATOR_EXECUTIONS)
+ val uriOpt = context
+ .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
+ .from(OPERATOR_EXECUTIONS)
+ .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+ .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq(opId.id))
+ .fetchOptionalInto(classOf[String])
+ .map(URI.create)
+
+ if (uriOpt.isPresent) {
+ val size = DocumentFactory.openDocument(uriOpt.get)._1.getTotalFileSize
+ context
+ .update(OPERATOR_EXECUTIONS)
+ .set(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_SIZE,
Integer.valueOf(size.toInt))
.where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
.and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq(opId.id))
- .fetchOptionalInto(classOf[String])
- .map(URI.create)
-
- if (uriOpt.isPresent) {
- val size = DocumentFactory.openDocument(uriOpt.get)._1.getTotalFileSize
- context
- .update(OPERATOR_EXECUTIONS)
- .set(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_SIZE,
Integer.valueOf(size.toInt))
- .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
- .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq(opId.id))
- .execute()
- }
+ .execute()
}
}
@@ -368,7 +346,6 @@ object WorkflowExecutionsResource {
* this method finds the URI for a globalPortId that both: 1. matches the
logicalOpId and outputPortId, and
* 2. is an external port. Currently the lookup is O(n), where n is the
number of globalPortIds for this execution.
* TODO: Optimize the lookup once the frontend also has information about
physical operators.
- * TODO: Remove the case of using ExecutionResourceMapping when user system
is permenantly enabled even in dev mode.
*/
def getResultUriByLogicalPortId(
eid: ExecutionIdentity,
@@ -386,18 +363,14 @@ object WorkflowExecutionsResource {
}
val urisOfEid: List[URI] =
- if (UserSystemConfig.isUserSystemEnabled) {
- context
- .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI)
- .from(OPERATOR_PORT_EXECUTIONS)
-
.where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
- .fetchInto(classOf[String])
- .asScala
- .toList
- .map(URI.create)
- } else {
- ExecutionResourcesMapping.getResourceURIs(eid)
- }
+ context
+ .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI)
+ .from(OPERATOR_PORT_EXECUTIONS)
+ .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+ .fetchInto(classOf[String])
+ .asScala
+ .toList
+ .map(URI.create)
urisOfEid.find(isMatchingExternalPortURI)
}
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala
index f69a1c81d3..929689fcf7 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala
@@ -139,8 +139,7 @@ class ExecutionConsoleService(
private val consoleMessageOpIdToWriterMap: mutable.Map[String,
BufferedItemWriter[Tuple]] =
mutable.Map()
- private val consoleWriterThread: Option[ExecutorService] =
-
Option.when(UserSystemConfig.isUserSystemEnabled)(Executors.newSingleThreadExecutor())
+ private val consoleWriterThread: ExecutorService =
Executors.newSingleThreadExecutor()
private def getOrCreateWriter(opId: OperatorIdentity):
BufferedItemWriter[Tuple] = {
consoleMessageOpIdToWriterMap.getOrElseUpdate(
@@ -261,21 +260,19 @@ class ExecutionConsoleService(
consoleMessage: ConsoleMessage
): ExecutionConsoleStore = {
// Write the original full message to the database
- consoleWriterThread.foreach { thread =>
- thread.execute(() => {
- val writer = getOrCreateWriter(OperatorIdentity(opId))
- try {
- val tuple = new Tuple(
- ResultSchema.consoleMessagesSchema,
- Array(consoleMessage.toProtoString)
- )
- writer.putOne(tuple)
- } catch {
- case e: Exception =>
- logger.error(s"Error while writing console message for operator
$opId", e)
- }
- })
- }
+ consoleWriterThread.execute(() => {
+ val writer = getOrCreateWriter(OperatorIdentity(opId))
+ try {
+ val tuple = new Tuple(
+ ResultSchema.consoleMessagesSchema,
+ Array(consoleMessage.toProtoString)
+ )
+ writer.putOne(tuple)
+ } catch {
+ case e: Exception =>
+ logger.error(s"Error while writing console message for operator
$opId", e)
+ }
+ })
// Process the message (truncate if needed) and update store
val truncatedMessage = processConsoleMessage(consoleMessage)
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala
index 6aabe7c4eb..9235fdaabf 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala
@@ -72,30 +72,26 @@ class ExecutionStatsService(
) extends SubscriptionManager
with LazyLogging {
private val (metricsPersistThread, runtimeStatsWriter) = {
- if (UserSystemConfig.isUserSystemEnabled) {
- val thread = Executors.newSingleThreadExecutor()
- val uri = VFSURIFactory.createRuntimeStatisticsURI(
- workflowContext.workflowId,
- workflowContext.executionId
- )
- val writer = DocumentFactory
- .createDocument(uri, ResultSchema.runtimeStatisticsSchema)
- .writer("runtime_statistics")
- .asInstanceOf[BufferedItemWriter[Tuple]]
- WorkflowExecutionsResource.updateRuntimeStatsUri(
- workflowContext.workflowId.id,
- workflowContext.executionId.id,
- uri
- )
- writer.open()
- (Some(thread), Some(writer))
- } else {
- (None, None)
- }
+ val thread = Executors.newSingleThreadExecutor()
+ val uri = VFSURIFactory.createRuntimeStatisticsURI(
+ workflowContext.workflowId,
+ workflowContext.executionId
+ )
+ val writer = DocumentFactory
+ .createDocument(uri, ResultSchema.runtimeStatisticsSchema)
+ .writer("runtime_statistics")
+ .asInstanceOf[BufferedItemWriter[Tuple]]
+ WorkflowExecutionsResource.updateRuntimeStatsUri(
+ workflowContext.workflowId.id,
+ workflowContext.executionId.id,
+ uri
+ )
+ writer.open()
+ (thread, writer)
}
- private var lastPersistedMetrics: Option[Map[String, OperatorMetrics]] =
- Option.when(UserSystemConfig.isUserSystemEnabled)(Map.empty[String,
OperatorMetrics])
+ private var lastPersistedMetrics: Map[String, OperatorMetrics] =
+ Map.empty[String, OperatorMetrics]
registerCallbacks()
@@ -189,12 +185,10 @@ class ExecutionStatsService(
stateStore.statsStore.updateState { statsStore =>
statsStore.withOperatorInfo(evt.operatorMetrics)
}
- metricsPersistThread.foreach { thread =>
- thread.execute(() => {
- storeRuntimeStatistics(computeStatsDiff(evt.operatorMetrics))
- lastPersistedMetrics = Some(evt.operatorMetrics)
- })
- }
+ metricsPersistThread.execute(() => {
+ storeRuntimeStatistics(computeStatsDiff(evt.operatorMetrics))
+ lastPersistedMetrics = evt.operatorMetrics
+ })
})
)
}
@@ -204,13 +198,11 @@ class ExecutionStatsService(
case ExecutionStateUpdate(state: WorkflowAggregatedState.Recognized)
if Set(COMPLETED, FAILED, KILLED).contains(state) =>
logger.info("Workflow execution terminated. Commit runtime
statistics.")
- runtimeStatsWriter.foreach { writer =>
- try {
- writer.close()
- } catch {
- case e: Exception =>
- logger.error("Failed to close runtime statistics writer", e)
- }
+ try {
+ runtimeStatsWriter.close()
+ } catch {
+ case e: Exception =>
+ logger.error("Failed to close runtime statistics writer", e)
}
case _ =>
}
@@ -225,15 +217,12 @@ class ExecutionStatsService(
OperatorStatistics(Seq.empty, Seq.empty, 0, 0, 0, 0)
)
- // Retrieve the last persisted metrics or default to an empty map
- val lastMetrics = lastPersistedMetrics.getOrElse(Map.empty)
-
// Determine new and old keys
- val newKeys = newMetrics.keySet.diff(lastMetrics.keySet)
- val oldKeys = lastMetrics.keySet.diff(newMetrics.keySet)
+ val newKeys = newMetrics.keySet.diff(lastPersistedMetrics.keySet)
+ val oldKeys = lastPersistedMetrics.keySet.diff(newMetrics.keySet)
// Update last metrics with default metrics for new keys
- val updatedLastMetrics = lastMetrics ++ newKeys.map(_ -> defaultMetrics)
+ val updatedLastMetrics = lastPersistedMetrics ++ newKeys.map(_ ->
defaultMetrics)
// Combine new metrics with old metrics for keys that are no longer present
val completeMetricsMap = newMetrics ++ oldKeys.map(key => key ->
updatedLastMetrics(key))
@@ -258,34 +247,29 @@ class ExecutionStatsService(
private def storeRuntimeStatistics(
operatorStatistics: scala.collection.immutable.Map[String,
OperatorMetrics]
): Unit = {
- runtimeStatsWriter match {
- case Some(writer) =>
- try {
- operatorStatistics.foreach {
- case (operatorId, stat) =>
- val runtimeStats = new Tuple(
- ResultSchema.runtimeStatisticsSchema,
- Array(
- operatorId,
- new java.sql.Timestamp(System.currentTimeMillis()),
-
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum,
-
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum,
-
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum,
-
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum,
- stat.operatorStatistics.dataProcessingTime,
- stat.operatorStatistics.controlProcessingTime,
- stat.operatorStatistics.idleTime,
- stat.operatorStatistics.numWorkers,
- maptoStatusCode(stat.operatorState).toInt
- )
- )
- writer.putOne(runtimeStats)
- }
- } catch {
- case err: Throwable => logger.error("error occurred when storing
runtime statistics", err)
- }
- case None =>
- logger.warn("Runtime statistics writer is not available.")
+ try {
+ operatorStatistics.foreach {
+ case (operatorId, stat) =>
+ val runtimeStats = new Tuple(
+ ResultSchema.runtimeStatisticsSchema,
+ Array(
+ operatorId,
+ new java.sql.Timestamp(System.currentTimeMillis()),
+
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum,
+
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum,
+
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum,
+
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum,
+ stat.operatorStatistics.dataProcessingTime,
+ stat.operatorStatistics.controlProcessingTime,
+ stat.operatorStatistics.idleTime,
+ stat.operatorStatistics.numWorkers,
+ maptoStatusCode(stat.operatorState).toInt
+ )
+ )
+ runtimeStatsWriter.putOne(runtimeStats)
+ }
+ } catch {
+ case err: Throwable => logger.error("error occurred when storing runtime
statistics", err)
}
}
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala
index c1f49a8470..16f7e3cd7a 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala
@@ -57,7 +57,6 @@ object ExecutionsMetadataPersistService extends LazyLogging {
environmentVersion: String,
computingUnitId: Integer
): ExecutionIdentity = {
- if (!UserSystemConfig.isUserSystemEnabled) return DEFAULT_EXECUTION_ID
// first retrieve the latest version of this workflow
val vid = getLatestVersion(workflowId.id.toInt)
val newExecution = new WorkflowExecutions()
@@ -77,7 +76,6 @@ object ExecutionsMetadataPersistService extends LazyLogging {
}
def tryGetExistingExecution(executionId: ExecutionIdentity):
Option[WorkflowExecutions] = {
- if (!UserSystemConfig.isUserSystemEnabled) return None
try {
Some(workflowExecutionsDao.fetchOneByEid(executionId.id.toInt))
} catch {
@@ -90,7 +88,6 @@ object ExecutionsMetadataPersistService extends LazyLogging {
def tryUpdateExistingExecution(
executionId: ExecutionIdentity
)(updateFunc: WorkflowExecutions => Unit): Unit = {
- if (!UserSystemConfig.isUserSystemEnabled) return
try {
val execution = workflowExecutionsDao.fetchOneByEid(executionId.id.toInt)
updateFunc(execution)
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
index b6c8cbb88e..4430180d4b 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
@@ -50,9 +50,6 @@ object WorkflowExecutionService {
workflowId: WorkflowIdentity,
computingUnitId: Int
): Option[ExecutionIdentity] = {
- if (!UserSystemConfig.isUserSystemEnabled) {
- return Some(DEFAULT_EXECUTION_ID)
- }
WorkflowExecutionsResource
.getLatestExecutionID(workflowId.id.toInt, computingUnitId)
.map(eid => new ExecutionIdentity(eid.longValue()))
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala
index 1fee982fde..a5f47e1153 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala
@@ -207,35 +207,32 @@ class WorkflowService(
req.computingUnitId
)
- if (UserSystemConfig.isUserSystemEnabled) {
- // enable only if we have mysql
- if (ApplicationConfig.faultToleranceLogRootFolder.isDefined) {
- val writeLocation =
ApplicationConfig.faultToleranceLogRootFolder.get.resolve(
- s"${workflowContext.workflowId}/${workflowContext.executionId}/"
- )
-
ExecutionsMetadataPersistService.tryUpdateExistingExecution(workflowContext.executionId)
{
- execution => execution.setLogLocation(writeLocation.toString)
- }
- controllerConf = controllerConf.copy(faultToleranceConfOpt =
- Some(FaultToleranceConfig(writeTo = writeLocation))
- )
+ if (ApplicationConfig.faultToleranceLogRootFolder.isDefined) {
+ val writeLocation =
ApplicationConfig.faultToleranceLogRootFolder.get.resolve(
+ s"${workflowContext.workflowId}/${workflowContext.executionId}/"
+ )
+
ExecutionsMetadataPersistService.tryUpdateExistingExecution(workflowContext.executionId)
{
+ execution => execution.setLogLocation(writeLocation.toString)
}
- if (req.replayFromExecution.isDefined) {
- val replayInfo = req.replayFromExecution.get
- ExecutionsMetadataPersistService
- .tryGetExistingExecution(ExecutionIdentity(replayInfo.eid))
- .foreach { execution =>
- val readLocation = new URI(execution.getLogLocation)
- controllerConf = controllerConf.copy(stateRestoreConfOpt =
- Some(
- StateRestoreConfig(
- readFrom = readLocation,
- replayDestination =
EmbeddedControlMessageIdentity(replayInfo.interaction)
- )
+ controllerConf = controllerConf.copy(faultToleranceConfOpt =
+ Some(FaultToleranceConfig(writeTo = writeLocation))
+ )
+ }
+ if (req.replayFromExecution.isDefined) {
+ val replayInfo = req.replayFromExecution.get
+ ExecutionsMetadataPersistService
+ .tryGetExistingExecution(ExecutionIdentity(replayInfo.eid))
+ .foreach { execution =>
+ val readLocation = new URI(execution.getLogLocation)
+ controllerConf = controllerConf.copy(stateRestoreConfOpt =
+ Some(
+ StateRestoreConfig(
+ readFrom = readLocation,
+ replayDestination =
EmbeddedControlMessageIdentity(replayInfo.interaction)
)
)
- }
- }
+ )
+ }
}
val executionStateStore = new ExecutionStateStore()
diff --git
a/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala
b/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala
index 3270e39af1..269001e6a3 100644
---
a/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala
+++
b/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala
@@ -65,7 +65,6 @@ class ConfigResource {
def getUserSystemConfig: Map[String, Any] =
Map(
// flags from the user-system.conf
- "inviteOnly" -> UserSystemConfig.inviteOnly,
- "userSystemEnabled" -> UserSystemConfig.isUserSystemEnabled
+ "inviteOnly" -> UserSystemConfig.inviteOnly
)
}
diff --git a/core/config/src/main/resources/user-system.conf b/core/config/src/main/resources/user-system.conf
index 72f5e23d23..166ce2fc8f 100644
--- a/core/config/src/main/resources/user-system.conf
+++ b/core/config/src/main/resources/user-system.conf
@@ -17,9 +17,6 @@
# See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines.
user-sys {
- enabled = true
- enabled = ${?USER_SYS_ENABLED}
-
admin-username = "texera"
admin-username = ${?USER_SYS_ADMIN_USERNAME}
diff --git a/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala b/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala
index d64ee7ff4c..eb7b91ad6e 100644
--- a/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala
+++ b/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala
@@ -27,7 +27,6 @@ object UserSystemConfig {
private val logger = Logger.getLogger(getClass.getName)
// User system
- val isUserSystemEnabled: Boolean = conf.getBoolean("user-sys.enabled")
val adminUsername: String = conf.getString("user-sys.admin-username")
val adminPassword: String = conf.getString("user-sys.admin-password")
val googleClientId: String = conf.getString("user-sys.google.clientId")
diff --git a/core/gui/src/app/app-routing.module.ts b/core/gui/src/app/app-routing.module.ts
index fffe3f8e42..6d7a27868c 100644
--- a/core/gui/src/app/app-routing.module.ts
+++ b/core/gui/src/app/app-routing.module.ts
@@ -49,9 +49,7 @@ const rootRedirectGuard: CanActivateFn = () => {
const config = inject(GuiConfigService);
const router = inject(Router);
try {
- if (config.env.userSystemEnabled) {
- return router.parseUrl(DASHBOARD_ABOUT);
- }
+ return router.parseUrl(DASHBOARD_ABOUT);
} catch {
// config not loaded yet, swallow the error and let the app handle it
}
diff --git a/core/gui/src/app/common/service/gui-config.service.mock.ts b/core/gui/src/app/common/service/gui-config.service.mock.ts
index f16a3697f0..392f8447ee 100644
--- a/core/gui/src/app/common/service/gui-config.service.mock.ts
+++ b/core/gui/src/app/common/service/gui-config.service.mock.ts
@@ -30,7 +30,6 @@ export class MockGuiConfigService {
private _config: GuiConfig = {
exportExecutionResultEnabled: false,
autoAttributeCorrectionEnabled: false,
- userSystemEnabled: true,
selectingFilesFromDatasetsEnabled: false,
localLogin: true,
googleLogin: true,
diff --git a/core/gui/src/app/common/service/gui-config.service.ts b/core/gui/src/app/common/service/gui-config.service.ts
index 65fd2f59af..a2e164efeb 100644
--- a/core/gui/src/app/common/service/gui-config.service.ts
+++ b/core/gui/src/app/common/service/gui-config.service.ts
@@ -31,10 +31,8 @@ export class GuiConfigService {
load(): Observable<GuiConfig> {
// Fetch both GUI config and user system config in parallel
- const guiConfig$ = this.http.get<Omit<GuiConfig, "userSystemEnabled" | "inviteOnly">>(
- `${AppSettings.getApiEndpoint()}/config/gui`
- );
- const userSystemConfig$ = this.http.get<{ userSystemEnabled: boolean; inviteOnly: boolean }>(
+ const guiConfig$ = this.http.get<Omit<GuiConfig, "inviteOnly">>(`${AppSettings.getApiEndpoint()}/config/gui`);
+ const userSystemConfig$ = this.http.get<{ inviteOnly: boolean }>(
`${AppSettings.getApiEndpoint()}/config/user-system`
);
diff --git a/core/gui/src/app/common/service/user/auth-guard.service.ts b/core/gui/src/app/common/service/user/auth-guard.service.ts
index ba6ef4dd9a..c013735417 100644
--- a/core/gui/src/app/common/service/user/auth-guard.service.ts
+++ b/core/gui/src/app/common/service/user/auth-guard.service.ts
@@ -35,7 +35,7 @@ export class AuthGuardService implements CanActivate {
private config: GuiConfigService
) {}
canActivate(route: ActivatedRouteSnapshot, state: RouterStateSnapshot):
boolean {
- if (this.userService.isLogin() || !this.config.env.userSystemEnabled) {
+ if (this.userService.isLogin()) {
return true;
} else {
this.router.navigate([DASHBOARD_ABOUT], { queryParams: { returnUrl:
state.url === "/" ? null : state.url } });
diff --git a/core/gui/src/app/common/service/user/user.service.ts b/core/gui/src/app/common/service/user/user.service.ts
index 0d42910cc1..88ab020c08 100644
--- a/core/gui/src/app/common/service/user/user.service.ts
+++ b/core/gui/src/app/common/service/user/user.service.ts
@@ -41,12 +41,9 @@ export class UserService {
private authService: AuthService,
private config: GuiConfigService
) {
- if (this.config.env.userSystemEnabled) {
- const user = this.authService.loginWithExistingToken();
- this.changeUser(user);
- }
+ const user = this.authService.loginWithExistingToken();
+ this.changeUser(user);
}
-
public getCurrentUser(): User | undefined {
return this.currentUser;
}
diff --git a/core/gui/src/app/common/type/gui-config.ts b/core/gui/src/app/common/type/gui-config.ts
index f7c1181b18..c634ebb6fe 100644
--- a/core/gui/src/app/common/type/gui-config.ts
+++ b/core/gui/src/app/common/type/gui-config.ts
@@ -21,7 +21,6 @@
export interface GuiConfig {
exportExecutionResultEnabled: boolean;
autoAttributeCorrectionEnabled: boolean;
- userSystemEnabled: boolean;
selectingFilesFromDatasetsEnabled: boolean;
localLogin: boolean;
googleLogin: boolean;
diff --git a/core/gui/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts b/core/gui/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
index 0d510624d9..38839f377e 100644
--- a/core/gui/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
+++ b/core/gui/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
@@ -68,7 +68,7 @@ export class InputAutoCompleteComponent extends
FieldType<FieldTypeConfig> {
}
get enableDatasetSource(): boolean {
- return this.config.env.userSystemEnabled && this.config.env.selectingFilesFromDatasetsEnabled;
+ return this.config.env.selectingFilesFromDatasetsEnabled;
}
get isFileSelectionEnabled(): boolean {
diff --git a/core/gui/src/app/workspace/component/left-panel/left-panel.component.ts b/core/gui/src/app/workspace/component/left-panel/left-panel.component.ts
index 6c8f32c4b9..23cd41c122 100644
--- a/core/gui/src/app/workspace/component/left-panel/left-panel.component.ts
+++ b/core/gui/src/app/workspace/component/left-panel/left-panel.component.ts
@@ -49,7 +49,7 @@ export class LeftPanelComponent implements OnDestroy, OnInit,
AfterViewInit {
items = [
{ component: null, title: "", icon: "", enabled: true },
{ component: OperatorMenuComponent, title: "Operators", icon: "appstore",
enabled: true },
- { component: VersionsListComponent, title: "Versions", icon: "schedule",
enabled: false },
+ { component: VersionsListComponent, title: "Versions", icon: "schedule",
enabled: true },
{
component: SettingsComponent,
title: "Settings",
@@ -93,9 +93,8 @@ export class LeftPanelComponent implements OnDestroy, OnInit,
AfterViewInit {
}
private updateItemsWithConfig(): void {
- this.items[2].enabled = this.config.env.userSystemEnabled; // Versions
this.items[4].enabled = this.config.env.workflowExecutionsTrackingEnabled;
// Execution History
- this.items[5].enabled = this.config.env.userSystemEnabled &&
this.config.env.timetravelEnabled; // Time Travel
+ this.items[5].enabled = this.config.env.timetravelEnabled; // Time Travel
}
ngOnInit(): void {
diff --git a/core/gui/src/app/workspace/component/menu/menu.component.html b/core/gui/src/app/workspace/component/menu/menu.component.html
index 7abddf5df9..d75d918d43 100644
--- a/core/gui/src/app/workspace/component/menu/menu.component.html
+++ b/core/gui/src/app/workspace/component/menu/menu.component.html
@@ -18,20 +18,8 @@
-->
<div id="menu-container">
- <div
- *ngIf="!this.config.env.userSystemEnabled"
- id="logo">
- <a href="{{this.config.env.userSystemEnabled?'/dashboard/workflow':'/'}}">
- <img
- alt="Texera"
- height="{{this.config.env.userSystemEnabled?75:38}}"
- src="assets/logos/full_logo_small.png" />
- </a>
- </div>
<div id="menu-content">
- <div
- id="menu-user"
- *ngIf="this.config.env.userSystemEnabled">
+ <div id="menu-user">
<div id="metadata">
<button
(click)="closeParticularVersionDisplay()"
@@ -99,7 +87,6 @@
*ngIf="!displayParticularWorkflowVersion">
<a [routerLink]="DASHBOARD_USER_WORKFLOW">
<button
- *ngIf="this.config.env.userSystemEnabled"
nz-button
title="dashboard">
<i
@@ -109,7 +96,6 @@
</a>
<button
(click)="onClickCreateNewWorkflow()"
- *ngIf="this.config.env.userSystemEnabled"
nz-button
title="create new">
<i
@@ -118,7 +104,6 @@
</button>
<button
(click)="persistWorkflow()"
- *ngIf="this.config.env.userSystemEnabled"
[disabled]="!userService.isLogin() || isSaving ||
!isWorkflowModifiable"
nz-button
title="save">
@@ -361,7 +346,7 @@
(click)="onClickRunHandler()"
nz-popover
nzPopoverTitle="Execution Settings"
- [nzPopoverTrigger]="this.config.env.userSystemEnabled?'hover':null"
+ [nzPopoverTrigger]="'hover'"
[nzPopoverContent]="executionSettings"
nzPopoverPlacement="bottom"
[disabled]="runDisable || (!workflowWebsocketService.isConnected &&
computingUnitStatus !== ComputingUnitState.NoComputingUnit) ||
displayParticularWorkflowVersion || selectedComputingUnit?.accessPrivilege !==
Privilege.WRITE"
@@ -374,10 +359,10 @@
<span> {{ runButtonText }}</span>
</button>
<button
- *ngIf="this.config.env.userSystemEnabled &&
this.config.env.timetravelEnabled"
+ *ngIf="this.config.env.timetravelEnabled"
(click)="handleCheckpoint()"
title="take checkpoint"
- [disabled]="!this.config.env.userSystemEnabled || executionState !==
ExecutionState.Paused"
+ [disabled]="executionState !== ExecutionState.Paused"
id="checkpoint-button"
nz-button
nzType="primary">
diff --git a/core/gui/src/app/workspace/component/menu/menu.component.ts b/core/gui/src/app/workspace/component/menu/menu.component.ts
index bdeea578b3..6f257bffbf 100644
--- a/core/gui/src/app/workspace/component/menu/menu.component.ts
+++ b/core/gui/src/app/workspace/component/menu/menu.component.ts
@@ -730,7 +730,7 @@ export class MenuComponent implements OnInit, OnDestroy {
// Regular workflow execution - already connected
this.executeWorkflowService.executeWorkflowWithEmailNotification(
this.currentExecutionName || "Untitled Execution",
- this.config.env.workflowEmailNotificationEnabled &&
this.config.env.userSystemEnabled
+ this.config.env.workflowEmailNotificationEnabled
);
}
diff --git a/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts b/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
index 8cc463ff8f..5d457e9050 100644
--- a/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
+++ b/core/gui/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
@@ -461,7 +461,6 @@ export class OperatorPropertyEditFrameComponent implements
OnInit, OnChanges, On
}
// if presetService is ready and operator property allows presets, setup
formly field to display presets
if (
- this.config.env.userSystemEnabled &&
this.config.env.userPresetEnabled &&
mapSource["enable-presets"] !== undefined &&
this.currentOperatorId !== undefined
diff --git a/core/gui/src/app/workspace/component/workspace.component.ts b/core/gui/src/app/workspace/component/workspace.component.ts
index bf66fe0c04..8958f08df1 100644
--- a/core/gui/src/app/workspace/component/workspace.component.ts
+++ b/core/gui/src/app/workspace/component/workspace.component.ts
@@ -25,7 +25,6 @@ import { WorkflowPersistService } from
"../../common/service/workflow-persist/wo
import { Workflow } from "../../common/type/workflow";
import { OperatorMetadataService } from
"../service/operator-metadata/operator-metadata.service";
import { UndoRedoService } from "../service/undo-redo/undo-redo.service";
-import { WorkflowCacheService } from
"../service/workflow-cache/workflow-cache.service";
import { WorkflowActionService } from
"../service/workflow-graph/model/workflow-action.service";
import { NzMessageService } from "ng-zorro-antd/message";
import { debounceTime, distinctUntilChanged, filter, switchMap, throttleTime }
from "rxjs/operators";
@@ -78,7 +77,6 @@ export class WorkspaceComponent implements AfterViewInit,
OnInit, OnDestroy {
private operatorReuseCacheStatusService: OperatorReuseCacheStatusService,
// end of additional services
private undoRedoService: UndoRedoService,
- private workflowCacheService: WorkflowCacheService,
private workflowPersistService: WorkflowPersistService,
private workflowActionService: WorkflowActionService,
private location: Location,
@@ -111,9 +109,8 @@ export class WorkspaceComponent implements AfterViewInit,
OnInit, OnDestroy {
ngAfterViewInit(): void {
/**
- * On initialization of the workspace, there could be three cases:
+ * On initialization of the workspace, there could be two cases:
*
- * - with userSystem enabled, usually during prod mode:
* 1. Accessed by URL `/`, no workflow is in the URL (Cold Start):
- - A new `WorkflowActionService.DEFAULT_WORKFLOW` is created, which
is an empty workflow with undefined id.
* - After an Auto-persist being triggered by a WorkflowAction event,
it will create a new workflow in the database
@@ -122,29 +119,19 @@ export class WorkspaceComponent implements AfterViewInit,
OnInit, OnDestroy {
* - It will retrieve the workflow from database with the given ID.
Because it has an ID, it will be linked to the database
* - Auto-persist will be triggered upon all workspace events.
*
- * - with userSystem disabled, during dev mode:
- * 1. Accessed by URL `/`, with a workflow cached (refresh manually):
- * - This will trigger the WorkflowCacheService to load the workflow
from cache.
- * - Auto-cache will be triggered upon all workspace events.
- *
- * WorkflowActionService is the single source of the workflow
representation. Both WorkflowCacheService and WorkflowPersistService are
- * reflecting changes from WorkflowActionService.
+ * WorkflowActionService is the single source of the workflow
representation. WorkflowPersistService reflects
+ * changes from WorkflowActionService.
*/
// clear the current workspace, reset as
`WorkflowActionService.DEFAULT_WORKFLOW`
this.workflowActionService.resetAsNewWorkflow();
-
- if (this.config.env.userSystemEnabled) {
- // if a workflow id is present in the route, display loading spinner
immediately while loading
- const widInRoute = this.route.snapshot.params.id;
- if (widInRoute) {
- this.isLoading = true;
- this.workflowActionService.disableWorkflowModification();
- }
-
- this.onWIDChange();
- this.updateViewCount();
+ // if a workflow id is present in the route, display loading spinner
immediately while loading
+ const widInRoute = this.route.snapshot.params.id;
+ if (widInRoute) {
+ this.isLoading = true;
+ this.workflowActionService.disableWorkflowModification();
}
-
+ this.onWIDChange();
+ this.updateViewCount();
this.registerLoadOperatorMetadata();
this.codeEditorService.vc = this.codeEditorViewRef;
}
@@ -160,16 +147,6 @@ export class WorkspaceComponent implements AfterViewInit,
OnInit, OnDestroy {
this.workflowActionService.clearWorkflow();
}
- registerAutoCacheWorkFlow(): void {
- this.workflowActionService
- .workflowChanged()
- .pipe(debounceTime(SAVE_DEBOUNCE_TIME_IN_MS))
- .pipe(untilDestroyed(this))
- .subscribe(() => {
-
this.workflowCacheService.setCacheWorkflow(this.workflowActionService.getWorkflow());
- });
- }
-
registerAutoPersistWorkflow(): void {
// make sure it is only registered once
if (this.autoPersistRegistered) {
@@ -262,58 +239,25 @@ export class WorkspaceComponent implements AfterViewInit,
OnInit, OnDestroy {
.pipe(untilDestroyed(this))
.subscribe(() => {
let wid = this.route.snapshot.params.id;
- if (this.config.env.userSystemEnabled) {
- // load workflow with wid if presented in the URL
- if (wid) {
- // show loading spinner right away while waiting for workflow to
load
- this.isLoading = true;
- // temporarily disable modification to prevent editing an empty
workflow before real data is loaded
- this.workflowActionService.disableWorkflowModification();
- // if wid is present in the url, load it from the backend once the
user info is ready
- this.userService
- .userChanged()
- .pipe(untilDestroyed(this))
- .subscribe(() => {
- this.loadWorkflowWithId(wid);
- });
- } else {
- // no workflow to load; directly register auto persist for
brand-new workflow
- this.registerAutoPersistWorkflow();
- }
+ // load workflow with wid if presented in the URL
+ if (wid) {
+ // show loading spinner right away while waiting for workflow to load
+ this.isLoading = true;
+ // temporarily disable modification to prevent editing an empty
workflow before real data is loaded
+ this.workflowActionService.disableWorkflowModification();
+ // if wid is present in the url, load it from the backend once the
user info is ready
+ this.userService
+ .userChanged()
+ .pipe(untilDestroyed(this))
+ .subscribe(() => {
+ this.loadWorkflowWithId(wid);
+ });
} else {
- // remember URL fragment
- const fragment = this.route.snapshot.fragment;
- // fetch the cached workflow first
- const cachedWorkflow = this.workflowCacheService.getCachedWorkflow();
- // responsible for saving the existing workflow in cache
- this.registerAutoCacheWorkFlow();
- // load the cached workflow
- this.workflowActionService.reloadWorkflow(cachedWorkflow);
- // set the URL fragment to previous value
- // because reloadWorkflow will highlight/unhighlight all elements
- // which will change the URL fragment
- this.router.navigate([], {
- relativeTo: this.route,
- fragment: fragment !== null ? fragment : undefined,
- preserveFragment: false,
- });
- // highlight the operator, comment box, or link in the URL fragment
- if (fragment) {
- if
(this.workflowActionService.getTexeraGraph().hasElementWithID(fragment)) {
- this.workflowActionService.highlightElements(false, fragment);
- } else {
- this.notificationService.error(`Element ${fragment} doesn't
exist`);
- // remove the fragment from the URL
- this.router.navigate([], { relativeTo: this.route });
- }
- }
- // clear stack
- this.undoRedoService.clearUndoStack();
- this.undoRedoService.clearRedoStack();
+ // no workflow to load; directly register auto persist for brand-new
workflow
+ this.registerAutoPersistWorkflow();
}
});
}
-
onWIDChange() {
this.workflowActionService
.workflowMetaDataChanged()
@@ -327,7 +271,6 @@ export class WorkspaceComponent implements AfterViewInit,
OnInit, OnDestroy {
this.writeAccess = !metadata.readonly;
});
}
-
updateViewCount() {
let wid = this.route.snapshot.params.id;
let uid = this.userService.getCurrentUser()?.uid;
@@ -337,7 +280,6 @@ export class WorkspaceComponent implements AfterViewInit,
OnInit, OnDestroy {
.pipe(untilDestroyed(this))
.subscribe();
}
-
public triggerCenter(): void {
this.workflowActionService.getTexeraGraph().triggerCenterEvent();
}
diff --git a/core/gui/src/app/workspace/service/workflow-cache/workflow-cache.service.spec.ts b/core/gui/src/app/workspace/service/workflow-cache/workflow-cache.service.spec.ts
deleted file mode 100644
index 332731d9d9..0000000000
--- a/core/gui/src/app/workspace/service/workflow-cache/workflow-cache.service.spec.ts
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// import { inject, TestBed } from '@angular/core/testing';
-
-// import { CacheWorkflowService } from './cache-workflow.service';
-// import {
-// mockPoint,
-// mockResultPredicate,
-// mockScanPredicate,
-// mockScanResultLink
-// } from '../workflow-graph/model/mock-workflow-data';
-// import { WorkflowActionService } from
'../workflow-graph/model/workflow-action.service';
-// import { UndoRedoService } from '../undo-redo/undo-redo.service';
-// import { marbles } from 'rxjs-marbles';
-// import { OperatorLink, OperatorPredicate, Point } from
'../../types/workflow-common.interface';
-// import { OperatorMetadataService } from
'../operator-metadata/operator-metadata.service';
-// import { HttpClient } from '@angular/common/http';
-// import { JointUIService } from '../joint-ui/joint-ui.service';
-// import { StubOperatorMetadataService } from
'../operator-metadata/stub-operator-metadata.service';
-// import { WorkflowUtilService } from
'../workflow-graph/util/workflow-util.service';
-// import { WorkflowInfo } from '../../../common/type/workflow';
-// import { PlainGroup } from '../workflow-graph/model/operator-group';
-
-// describe('CacheWorkFlowService', () => {
-// let autoSaveWorkflowService: CacheWorkflowService;
-// let workflowActionService: WorkflowActionService;
-// beforeEach(() => {
-// TestBed.configureTestingModule({
-// providers: [
-// CacheWorkflowService,
-// WorkflowActionService,
-// UndoRedoService,
-// JointUIService,
-// WorkflowUtilService,
-// {provide: OperatorMetadataService, useClass:
StubOperatorMetadataService},
-// {provide: HttpClient}
-// ]
-// });
-
-// // remove all items in local storage before each test
-// localStorage.clear();
-// autoSaveWorkflowService = TestBed.inject(CacheWorkflowService);
-// workflowActionService = TestBed.inject(WorkflowActionService);
-// });
-
-// it('should be created', inject([CacheWorkflowService], (service:
CacheWorkflowService) => {
-// expect(service).toBeTruthy();
-// }));
-
-// it('should check if the local storage is updated when operator add event
is triggered', marbles((m) => {
-// autoSaveWorkflowService.handleAutoCacheWorkFlow();
-// m.hot('-e-').do(() =>
workflowActionService.addOperator(mockScanPredicate, mockPoint))
-// .delay(100).subscribe(
-// () => {
-// // get items in the storage
-// const savedWorkflowJson = localStorage.getItem('workflow');
-// if (!savedWorkflowJson) {
-// expect(false).toBeTruthy();
-// return;
-// }
-
-// const savedWorkflow: WorkflowInfo = JSON.parse(savedWorkflowJson);
-// expect(savedWorkflow.operators.length).toEqual(1);
-//
expect(savedWorkflow.operators[0].operatorID).toEqual(mockScanPredicate.operatorID);
-// expect(savedWorkflow.operators[0]).toEqual(mockScanPredicate);
-//
expect(savedWorkflow.operatorPositions[mockScanPredicate.operatorID]).toEqual(mockPoint);
-// }
-// );
-// }));
-
-// it('should check if the local storage is updated when operator delete
event is triggered', marbles((m) => {
-// autoSaveWorkflowService.handleAutoCacheWorkFlow();
-// m.hot('-e-').do(() => {
-// workflowActionService.addOperator(mockScanPredicate, mockPoint);
-// workflowActionService.deleteOperator(mockScanPredicate.operatorID);
-// })
-// .delay(100).subscribe(
-// () => {
-// // get items in the storage
-// const savedWorkflowJson = localStorage.getItem('workflow');
-// if (!savedWorkflowJson) {
-// expect(false).toBeTruthy();
-// return;
-// }
-
-// const savedWorkflow: WorkflowInfo = JSON.parse(savedWorkflowJson);
-// expect(savedWorkflow.operators.length).toEqual(0);
-// }
-// );
-// }));
-
-// it('should check if the local storage is updated when link add event is
triggered', marbles((m) => {
-// autoSaveWorkflowService.handleAutoCacheWorkFlow();
-// m.hot('-e-').do(() => {
-// workflowActionService.addOperator(mockScanPredicate, mockPoint);
-// workflowActionService.addOperator(mockResultPredicate, mockPoint);
-// workflowActionService.addLink(mockScanResultLink);
-// })
-// .delay(100).subscribe(
-// () => {
-// // get items in the storage
-// const savedWorkflowJson = localStorage.getItem('workflow');
-// if (!savedWorkflowJson) {
-// expect(false).toBeTruthy();
-// return;
-// }
-
-// const savedWorkflow: WorkflowInfo = JSON.parse(savedWorkflowJson);
-// expect(savedWorkflow.operators.length).toEqual(2);
-// expect(savedWorkflow.links.length).toEqual(1);
-// expect(savedWorkflow.links[0]).toEqual(mockScanResultLink);
-// }
-// );
-// }));
-
-// it('should check if the local storage is updated when link delete event
is triggered', marbles((m) => {
-// autoSaveWorkflowService.handleAutoCacheWorkFlow();
-// m.hot('-e-').do(() => {
-// workflowActionService.addOperator(mockScanPredicate, mockPoint);
-// workflowActionService.addOperator(mockResultPredicate, mockPoint);
-// workflowActionService.addLink(mockScanResultLink);
-// workflowActionService.deleteLink(mockScanResultLink.source,
mockScanResultLink.target);
-// })
-// .delay(100).subscribe(
-// () => {
-// // get items in the storage
-// const savedWorkflowJson = localStorage.getItem('workflow');
-// if (!savedWorkflowJson) {
-// expect(false).toBeTruthy();
-// return;
-// }
-
-// const savedWorkflow: WorkflowInfo = JSON.parse(savedWorkflowJson);
-// expect(savedWorkflow.operators.length).toEqual(2);
-// expect(savedWorkflow.links.length).toEqual(0);
-// }
-// );
-// }));
-
-// it(`should check if the local storage is updated when operator delete
event is triggered when there
-// exists a link on the deleted operator`, marbles((m) => {
-// autoSaveWorkflowService.handleAutoCacheWorkFlow();
-// m.hot('-e-').do(() => {
-// workflowActionService.addOperator(mockScanPredicate, mockPoint);
-// workflowActionService.addOperator(mockResultPredicate, mockPoint);
-// workflowActionService.addLink(mockScanResultLink);
-// workflowActionService.deleteOperator(mockScanPredicate.operatorID);
-// })
-// .delay(100).subscribe(
-// () => {
-// // get items in the storage
-// const savedWorkflowJson = localStorage.getItem('workflow');
-// if (!savedWorkflowJson) {
-// expect(false).toBeTruthy();
-// return;
-// }
-
-// const savedWorkflow: WorkflowInfo = JSON.parse(savedWorkflowJson);
-// expect(savedWorkflow.operators.length).toEqual(1);
-// expect(savedWorkflow.operators[0]).toEqual(mockResultPredicate);
-// expect(savedWorkflow.links.length).toEqual(0);
-// }
-// );
-// }));
-
-// it('should check if the local storage is updated when operator property
change event is triggered', marbles((m) => {
-// autoSaveWorkflowService.handleAutoCacheWorkFlow();
-// const mockProperties = {tableName: 'mockTableName'};
-// m.hot('-e-').do(() => {
-// workflowActionService.addOperator(mockScanPredicate, mockPoint);
-//
workflowActionService.setOperatorProperty(mockScanPredicate.operatorID,
mockProperties);
-// })
-// .delay(100).subscribe(
-// () => {
-// // get items in the storage
-// const savedWorkflowJson = localStorage.getItem('workflow');
-// if (!savedWorkflowJson) {
-// expect(false).toBeTruthy();
-// return;
-// }
-
-// const savedWorkflow: WorkflowInfo = JSON.parse(savedWorkflowJson);
-// expect(savedWorkflow.operators.length).toEqual(1);
-//
expect(savedWorkflow.operators[0].operatorProperties).toEqual(mockProperties);
-// }
-// );
-// }));
-
-// it('should successfully loaded what is stored inside local storage when
"loadWorkflow()" is called ', marbles((m) => {
-// const operatorPositions: { [key: string]: Point } = {};
-// operatorPositions[mockScanPredicate.operatorID] = mockPoint;
-// const operators: OperatorPredicate[] = [];
-// operators.push(mockScanPredicate);
-// const links: OperatorLink[] = [];
-// const groups: PlainGroup[] = [];
-
-// const mockWorkflow: WorkflowInfo = {
-// operators, operatorPositions, links, groups, breakpoints: {}
-// };
-
-// localStorage.setItem('workflow', JSON.stringify(mockWorkflow));
-
-// autoSaveWorkflowService.loadWorkflow();
-
-// const savedWorkflowJson = localStorage.getItem('workflow');
-// if (!savedWorkflowJson) {
-// expect(false).toBeTruthy();
-// return;
-// }
-
-// const savedWorkflow: WorkflowInfo = JSON.parse(savedWorkflowJson);
-
-// expect(savedWorkflow.operators.length).toEqual(1);
-// expect(savedWorkflow.operators[0]).toEqual(mockScanPredicate);
-
-// }));
-// });
diff --git a/core/gui/src/app/workspace/service/workflow-cache/workflow-cache.service.ts b/core/gui/src/app/workspace/service/workflow-cache/workflow-cache.service.ts
deleted file mode 100644
index ff751fa858..0000000000
--- a/core/gui/src/app/workspace/service/workflow-cache/workflow-cache.service.ts
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import { Injectable } from "@angular/core";
-import { Workflow } from "../../../common/type/workflow";
-import { localGetObject, localRemoveObject, localSetObject } from
"../../../common/util/storage";
-
-@Injectable({
- providedIn: "root",
-})
-export class WorkflowCacheService {
- private static readonly WORKFLOW_KEY: string = "workflow";
-
- public getCachedWorkflow(): Readonly<Workflow> | undefined {
- return localGetObject<Workflow>(WorkflowCacheService.WORKFLOW_KEY);
- }
-
- public resetCachedWorkflow() {
- localRemoveObject(WorkflowCacheService.WORKFLOW_KEY);
- }
-
- public setCacheWorkflow(workflow: Workflow | undefined): void {
- localSetObject(WorkflowCacheService.WORKFLOW_KEY, workflow);
- }
-}
diff --git a/core/gui/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts b/core/gui/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
index 201d797f41..90029227dd 100644
--- a/core/gui/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
+++ b/core/gui/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
@@ -108,9 +108,7 @@ export class WorkflowWebsocketService {
"&uid=" +
uId +
(isDefined(cuId) ? `&cuid=${cuId}` : "") +
- (this.config.env.userSystemEnabled && AuthService.getAccessToken() !==
null
- ? "&access-token=" + AuthService.getAccessToken()
- : "");
+ (AuthService.getAccessToken() !== null ? "&access-token=" +
AuthService.getAccessToken() : "");
console.log("websocketUrl", websocketUrl);
this.websocket = webSocket<TexeraWebsocketEvent |
TexeraWebsocketRequest>(websocketUrl);
// setup reconnection logic
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala
deleted file mode 100644
index faf7183473..0000000000
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package edu.uci.ics.amber.core.storage.result
-
-import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
-
-import java.net.URI
-import scala.collection.mutable
-
-/**
- * ExecutionResourcesMapping is a singleton for keeping track of resources associated with each execution.
- * It maintains a mapping from execution ID to a list of URIs, which point to resources like the result storage.
- *
- * Currently, this mapping is only used during the resource clean-up phase.
- *
- * This design has one limitation: the singleton is only accessible on the master node.
- * While this aligns with the current system design, improvements are needed in the
- * future to enhance scalability and flexibility.
- *
- * TODO: Move the mappings to an external, distributed, and persistent location to eliminate the master-node
- * dependency.
- */
-object ExecutionResourcesMapping {
-
- private val executionIdToExecutionResourcesMapping: mutable.Map[ExecutionIdentity, List[URI]] =
- mutable.Map.empty
-
- /**
- * Get the URIs of given execution Id
- * @param executionIdentity the target execution id
- * @return
- */
- def getResourceURIs(executionIdentity: ExecutionIdentity): List[URI] = {
- executionIdToExecutionResourcesMapping.getOrElseUpdate(executionIdentity, List())
- }
-
- /**
- * Add the URI to the mapping
- * @param executionIdentity the target execution
- * @param uri the URI of the resource
- */
- def addResourceUri(executionIdentity: ExecutionIdentity, uri: URI): Unit = {
- executionIdToExecutionResourcesMapping.updateWith(executionIdentity) {
- case Some(existingUris) => Some(uri :: existingUris) // Prepend URI to the existing list
- case None => Some(List(uri)) // Create a new list if key doesn't exist
- }
- }
-
- /**
- * Remove all resources associated with a given execution ID.
- *
- * @param executionIdentity the target execution ID
- * @return true if the entry was removed, false if it did not exist
- */
- def removeExecutionResources(executionIdentity: ExecutionIdentity): Boolean = {
- executionIdToExecutionResourcesMapping.remove(executionIdentity).isDefined
- }
-}