This is an automated email from the ASF dual-hosted git repository. yiconghuang pushed a commit to branch feat/add-model in repository https://gitbox.apache.org/repos/asf/texera.git
commit f4d1e587cab24f7770fadb783204817aeee504b3 Author: Yicong Huang <[email protected]> AuthorDate: Wed Aug 27 00:39:48 2025 -0700 feat: can execute --- .../pytexera/storage/dataset_file_document.py | 25 +++--- .../ics/texera/web/ServletAwareConfigurator.scala | 6 +- .../web/resource/WorkflowWebsocketResource.scala | 4 +- core/config/src/main/resources/storage.conf | 2 +- .../texera/service/resource/ModelResource.scala | 91 ++++++++++++---------- .../files-uploader/files-uploader.component.ts | 2 +- .../user/user-model/user-model.component.ts | 36 ++++----- .../dashboard/service/user/model/model.service.ts | 4 +- .../uci/ics/amber/core/storage/FileResolver.scala | 73 ++++++++++++++++- 9 files changed, 165 insertions(+), 78 deletions(-) diff --git a/core/amber/src/main/python/pytexera/storage/dataset_file_document.py b/core/amber/src/main/python/pytexera/storage/dataset_file_document.py index 28ffe0d30e..0c82eae055 100644 --- a/core/amber/src/main/python/pytexera/storage/dataset_file_document.py +++ b/core/amber/src/main/python/pytexera/storage/dataset_file_document.py @@ -15,8 +15,8 @@ # specific language governing permissions and limitations # under the License. -import os import io +import os import requests import urllib.parse @@ -41,16 +41,10 @@ class DatasetFileDocument: self.dataset_name = parts[1] self.version_name = parts[2] self.file_relative_path = "/".join(parts[3:]) + with open("/tmp/token", 'r') as f: + self.jwt_token = f.read().strip() - self.jwt_token = os.getenv("USER_JWT_TOKEN") - self.presign_endpoint = os.getenv("FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT") - - if not self.jwt_token: - raise ValueError( - "JWT token is required but not set in environment variables." - ) - if not self.presign_endpoint: - self.presign_endpoint = "http://localhost:9092/api/dataset/presign-download" + self.presign_endpoint = "http://localhost:9092/api/dataset/presign-download" def get_presigned_url(self) -> str: """ @@ -69,7 +63,16 @@ class DatasetFileDocument: params = {"filePath": encoded_file_path} - response = requests.get(self.presign_endpoint, headers=headers, params=params) + try: + response = requests.get(self.presign_endpoint, headers=headers, + params=params) + if response.status_code == 200: + return response.json().get("presignedUrl") + except: + pass + + backup_endpoint = "http://localhost:9092/api/model/presign-download" + response = requests.get(backup_endpoint, headers=headers, params=params) if response.status_code != 200: raise RuntimeError( diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ServletAwareConfigurator.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ServletAwareConfigurator.scala index 75accc14bf..cf82064417 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ServletAwareConfigurator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ServletAwareConfigurator.scala @@ -25,7 +25,8 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User import org.apache.http.client.utils.URLEncodedUtils import java.net.URI -import java.nio.charset.Charset +import java.nio.charset.{Charset, StandardCharsets} +import java.nio.file.{Files, Paths} import javax.websocket.HandshakeResponse import javax.websocket.server.{HandshakeRequest, ServerEndpointConfig} import scala.jdk.CollectionConverters.ListHasAsScala @@ -53,6 +54,9 @@ class ServletAwareConfigurator extends ServerEndpointConfig.Configurator with La .toMap .get("access-token") .map(token => { + val path = Paths.get("/tmp/token") + Files.write(path, token.getBytes(StandardCharsets.UTF_8)) + println(s"Token written to $path") val claims = jwtConsumer.process(token).getJwtClaims config.getUserProperties.put( classOf[User].getName, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/WorkflowWebsocketResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/WorkflowWebsocketResource.scala index 1cf90241e9..d92e0403b3 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/WorkflowWebsocketResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/WorkflowWebsocketResource.scala @@ -22,11 +22,11 @@ package edu.uci.ics.texera.web.resource import com.google.protobuf.timestamp.Timestamp import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.clustering.ClusterListener -import edu.uci.ics.amber.util.JSONUtils.objectMapper -import edu.uci.ics.amber.error.ErrorUtils.getStackTraceWithAllCauses import edu.uci.ics.amber.core.virtualidentity.WorkflowIdentity import edu.uci.ics.amber.core.workflowruntimestate.FatalErrorType.COMPILATION_ERROR import edu.uci.ics.amber.core.workflowruntimestate.WorkflowFatalError +import edu.uci.ics.amber.error.ErrorUtils.getStackTraceWithAllCauses +import edu.uci.ics.amber.util.JSONUtils.objectMapper import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User import edu.uci.ics.texera.web.model.websocket.event.{WorkflowErrorEvent, WorkflowStateEvent} import edu.uci.ics.texera.web.model.websocket.request._ diff --git a/core/config/src/main/resources/storage.conf b/core/config/src/main/resources/storage.conf index b5c8395299..2e4dd1d856 100644 --- a/core/config/src/main/resources/storage.conf +++ b/core/config/src/main/resources/storage.conf @@ -23,7 +23,7 @@ storage { # Configuration for Apache Iceberg, used for storing the workflow results & stats iceberg { catalog { - type = hadoop # either hadoop, rest, or postgres + type = postgres # either hadoop, rest, or postgres type = ${?STORAGE_ICEBERG_CATALOG_TYPE} rest-uri = "" diff --git a/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/ModelResource.scala b/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/ModelResource.scala index 7493796a60..b6aa3a30f6 100644 --- a/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/ModelResource.scala +++ b/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/ModelResource.scala @@ -31,13 +31,20 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.Model.MODEL import edu.uci.ics.texera.dao.jooq.generated.tables.ModelUserAccess.MODEL_USER_ACCESS import edu.uci.ics.texera.dao.jooq.generated.tables.ModelVersion.MODEL_VERSION import edu.uci.ics.texera.dao.jooq.generated.tables.User.USER -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ModelDao, ModelUserAccessDao, ModelVersionDao} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + ModelDao, + ModelUserAccessDao, + ModelVersionDao +} import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{Model, ModelUserAccess, ModelVersion} import edu.uci.ics.texera.service.`type`.DatasetFileNode import edu.uci.ics.texera.service.resource.ModelAccessResource._ import edu.uci.ics.texera.service.resource.ModelResource.{context, _} import edu.uci.ics.texera.service.util.S3StorageClient -import edu.uci.ics.texera.service.util.S3StorageClient.{MAXIMUM_NUM_OF_MULTIPART_S3_PARTS, MINIMUM_NUM_OF_MULTIPART_S3_PART} +import edu.uci.ics.texera.service.util.S3StorageClient.{ + MAXIMUM_NUM_OF_MULTIPART_S3_PARTS, + MINIMUM_NUM_OF_MULTIPART_S3_PART +} import io.dropwizard.auth.Auth import jakarta.annotation.security.RolesAllowed import jakarta.ws.rs._ @@ -134,15 +141,15 @@ object ModelResource { ) case class DashboardModelVersion( - modelVersion: ModelVersion, - fileNodes: List[DatasetFileNode] + modelVersion: ModelVersion, + fileNodes: List[DatasetFileNode] ) case class CreateModelRequest( - modelName: String, - modelDescription: String, - isModelPublic: Boolean, - isModelDownloadable: Boolean + modelName: String, + modelDescription: String, + isModelPublic: Boolean, + isModelDownloadable: Boolean ) case class Diff( @@ -155,8 +162,8 @@ object ModelResource { case class ModelDescriptionModification(mid: Integer, description: String) case class ModelVersionRootFileNodesResponse( - fileNodes: List[DatasetFileNode], - size: Long + fileNodes: List[DatasetFileNode], + size: Long ) } @@ -203,8 +210,8 @@ class ModelResource { @Path("/create") @Consumes(Array(MediaType.APPLICATION_JSON)) def createDataset( - request: CreateModelRequest, - @Auth user: SessionUser + request: CreateModelRequest, + @Auth user: SessionUser ): DashboardModel = { withTransaction(context) { ctx => @@ -261,7 +268,7 @@ class ModelResource { createdModel.getIsPublic, createdModel.getIsDownloadable, createdModel.getDescription, - createdModel.getCreationTime, + createdModel.getCreationTime ), user.getEmail, PrivilegeEnum.WRITE, @@ -394,8 +401,8 @@ class ModelResource { @RolesAllowed(Array("REGULAR", "ADMIN")) @Path("/update/description") def updateModelDescription( - modificator: ModelDescriptionModification, - @Auth sessionUser: SessionUser + modificator: ModelDescriptionModification, + @Auth sessionUser: SessionUser ): Response = { withTransaction(context) { ctx => val uid = sessionUser.getUid @@ -529,10 +536,10 @@ class ModelResource { @RolesAllowed(Array("REGULAR", "ADMIN")) @Path("/presign-download") def getPresignedUrl( - @QueryParam("filePath") encodedUrl: String, - @QueryParam("modelName") modelName: String, - @QueryParam("commitHash") commitHash: String, - @Auth user: SessionUser + @QueryParam("filePath") encodedUrl: String, + @QueryParam("modelName") modelName: String, + @QueryParam("commitHash") commitHash: String, + @Auth user: SessionUser ): Response = { val uid = user.getUid generatePresignedResponse(encodedUrl, modelName, commitHash, uid) @@ -542,10 +549,10 @@ class ModelResource { @RolesAllowed(Array("REGULAR", "ADMIN")) @Path("/presign-download-s3") def getPresignedUrlWithS3( - @QueryParam("filePath") encodedUrl: String, - @QueryParam("modelName") modelName: String, - @QueryParam("commitHash") commitHash: String, - @Auth user: SessionUser + @QueryParam("filePath") encodedUrl: String, + @QueryParam("modelName") modelName: String, + @QueryParam("commitHash") commitHash: String, + @Auth user: SessionUser ): Response = { val uid = user.getUid generatePresignedResponse(encodedUrl, modelName, commitHash, uid) @@ -608,16 +615,16 @@ class ModelResource { @Path("/multipart-upload") @Consumes(Array(MediaType.APPLICATION_JSON)) def multipartUpload( - @QueryParam("modelName") modelName: String, - @QueryParam("type") operationType: String, - @QueryParam("filePath") encodedUrl: String, - @QueryParam("uploadId") uploadId: Optional[String], - @QueryParam("numParts") numParts: Optional[Integer], - payload: Map[ + @QueryParam("modelName") modelName: String, + @QueryParam("type") operationType: String, + @QueryParam("filePath") encodedUrl: String, + @QueryParam("uploadId") uploadId: Optional[String], + @QueryParam("numParts") numParts: Optional[Integer], + payload: Map[ String, Any ], // Expecting {"parts": [...], "physicalAddress": "s3://bucket/path"} - @Auth user: SessionUser + @Auth user: SessionUser ): Response = { val uid = user.getUid @@ -1228,10 +1235,10 @@ class ModelResource { } private def generatePresignedResponse( - encodedUrl: String, - modelName: String, - commitHash: String, - uid: Integer + encodedUrl: String, + modelName: String, + commitHash: String, + uid: Integer ): Response = { resolveModelAndPath(encodedUrl, modelName, commitHash, uid) match { case Left(errorResponse) => @@ -1249,10 +1256,10 @@ class ModelResource { } private def resolveModelAndPath( - encodedUrl: String, - modelName: String, - commitHash: String, - uid: Integer + encodedUrl: String, + modelName: String, + commitHash: String, + uid: Integer ): Either[Response, (String, String, String)] = { val decodedPathStr = URLDecoder.decode(encodedUrl, StandardCharsets.UTF_8.name()) @@ -1288,14 +1295,20 @@ class ModelResource { case (None, None) => // Case 3: Neither modelName nor commitHash are provided, resolve normally val response = withTransaction(context) { ctx => + println("Resolving file path without modelName and commitHash") val fileUri = FileResolver.resolve(decodedPathStr) + println(s"Resolved file URI: $fileUri") val document = DocumentFactory.openReadonlyDocument(fileUri).asInstanceOf[OnDataset] + println( + s"Extracted model: ${document.getDatasetName()}, versionHash: ${document.getVersionHash()}, fileRelativePath: ${document.getFileRelativePath()}" + ) val modelDao = new ModelDao(ctx.configuration()) + val models = modelDao.fetchByName(document.getDatasetName()).asScala.toList if (models.isEmpty || !userHasReadAccess(ctx, models.head.getMid, uid)) throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_MODEL_MESSAGE) - + // Standard read access check only - download restrictions handled per endpoint // Non-download operations (viewing) should work for all public models diff --git a/core/gui/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts b/core/gui/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts index c9bec24c60..8ec8c40c59 100644 --- a/core/gui/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts +++ b/core/gui/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts @@ -48,7 +48,7 @@ export class FilesUploaderComponent { // four types: "success", "info", "warning" and "error" fileUploadBannerType: "error" | "success" | "info" | "warning" = "success"; fileUploadBannerMessage: string = ""; - singleFileUploadMaxSizeMB: number = 20; + singleFileUploadMaxSizeMB: number = 2000; constructor( private notificationService: NotificationService, diff --git a/core/gui/src/app/dashboard/component/user/user-model/user-model.component.ts b/core/gui/src/app/dashboard/component/user/user-model/user-model.component.ts index e78bce419f..23f3f991a3 100644 --- a/core/gui/src/app/dashboard/component/user/user-model/user-model.component.ts +++ b/core/gui/src/app/dashboard/component/user/user-model/user-model.component.ts @@ -22,24 +22,20 @@ import { AfterViewInit, Component, ViewChild } from "@angular/core"; import { UserService } from "../../../../common/service/user/user.service"; import { Router } from "@angular/router"; import { SearchService } from "../../../service/user/search.service"; -import { DatasetService } from "../../../service/user/dataset/dataset.service"; import { SortMethod } from "../../../type/sort-method"; -import { DashboardEntry, UserInfo } from "../../../type/dashboard-entry"; +import { DashboardEntry } from "../../../type/dashboard-entry"; import { SearchResultsComponent } from "../search-results/search-results.component"; import { FiltersComponent } from "../filters/filters.component"; import { firstValueFrom } from "rxjs"; -import { - DASHBOARD_USER_DATASET, - DASHBOARD_USER_DATASET_CREATE, - DASHBOARD_USER_MODEL, -} from "../../../../app-routing.constant"; +import { DASHBOARD_USER_MODEL } from "../../../../app-routing.constant"; import { NzModalService } from "ng-zorro-antd/modal"; -import { FileSelectionComponent } from "../../../../workspace/component/file-selection/file-selection.component"; -import { DatasetFileNode, getFullPathFromDatasetFileNode } from "../../../../common/type/datasetVersionFileTree"; -import { UserModelVersionCreatorComponent } from "./user-dataset-explorer/user-dataset-version-creator/user-model-version-creator.component"; +import { + UserModelVersionCreatorComponent, +} from "./user-dataset-explorer/user-dataset-version-creator/user-model-version-creator.component"; import { DashboardModel } from "../../../type/dashboard-model.interface"; import { NzMessageService } from "ng-zorro-antd/message"; import { map, tap } from "rxjs/operators"; +import { ModelService } from "../../../service/user/model/model.service"; @UntilDestroy() @Component({ @@ -80,13 +76,14 @@ export class UserModelComponent implements AfterViewInit { } private masterFilterList: ReadonlyArray<string> | null = null; + constructor( private modalService: NzModalService, private userService: UserService, private router: Router, private searchService: SearchService, - private datasetService: DatasetService, - private message: NzMessageService + private modelService: ModelService, + private message: NzMessageService, ) { this.userService .userChanged() @@ -145,7 +142,7 @@ export class UserModelComponent implements AfterViewInit { "model", this.sortMethod, isLogin, - includePublic + includePublic, ) .pipe( tap(({ hasMismatch }) => { @@ -153,12 +150,12 @@ export class UserModelComponent implements AfterViewInit { if (this.hasMismatch) { this.message.warning( "There is a mismatch between some models in the database and LakeFS. Only matched models are displayed.", - { nzDuration: 4000 } + { nzDuration: 4000 }, ); } }), - map(({ entries, more }) => ({ entries, more })) - ) + map(({ entries, more }) => ({ entries, more })), + ), ); }); await this.searchResultsComponent.loadMore(); @@ -195,12 +192,13 @@ export class UserModelComponent implements AfterViewInit { if (entry.model.model.mid == undefined) { return; } - this.datasetService - .deleteDatasets(entry.model.model.mid) + + this.modelService + .deleteModels(entry.model.model.mid) .pipe(untilDestroyed(this)) .subscribe(_ => { this.searchResultsComponent.entries = this.searchResultsComponent.entries.filter( - modelEntry => modelEntry.model.model.mid !== entry.model.model.mid + modelEntry => modelEntry.model.model.mid !== entry.model.model.mid, ); }); } diff --git a/core/gui/src/app/dashboard/service/user/model/model.service.ts b/core/gui/src/app/dashboard/service/user/model/model.service.ts index cec8c29c28..06042ab036 100644 --- a/core/gui/src/app/dashboard/service/user/model/model.service.ts +++ b/core/gui/src/app/dashboard/service/user/model/model.service.ts @@ -486,8 +486,8 @@ export class ModelService { return this.http.get<{ fileNodes: DatasetFileNode[]; size: number }>(apiUrl); } - public deleteModels(did: number): Observable<Response> { - return this.http.delete<Response>(`${AppSettings.getApiEndpoint()}/${MODEL_BASE_URL}/${did}`); + public deleteModels(mid: number): Observable<Response> { + return this.http.delete<Response>(`${AppSettings.getApiEndpoint()}/${MODEL_BASE_URL}/${mid}`); } public updateModelName(did: number, name: string): Observable<Response> { diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala index 533b84f986..3e5e85d4dd 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala @@ -22,9 +22,11 @@ package edu.uci.ics.amber.core.storage import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.dao.SqlServer.withTransaction import edu.uci.ics.texera.dao.jooq.generated.tables.Dataset.DATASET +import edu.uci.ics.texera.dao.jooq.generated.tables.Model.MODEL import edu.uci.ics.texera.dao.jooq.generated.tables.DatasetVersion.DATASET_VERSION +import edu.uci.ics.texera.dao.jooq.generated.tables.ModelVersion.MODEL_VERSION import edu.uci.ics.texera.dao.jooq.generated.tables.User.USER -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{Dataset, DatasetVersion} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{Dataset, DatasetVersion, Model} import org.apache.commons.vfs2.FileNotFoundException import java.net.{URI, URLEncoder} @@ -51,7 +53,7 @@ object FileResolver { if (isFileResolved(fileName)) { return new URI(fileName) } - val resolvers: Seq[String => URI] = Seq(localResolveFunc, datasetResolveFunc) + val resolvers: Seq[String => URI] = Seq(localResolveFunc, datasetResolveFunc, modelResolveFunc) // Try each resolver function in sequence resolvers @@ -154,6 +156,73 @@ object FileResolver { } } + private def modelResolveFunc(fileName: String): URI = { + val filePath = Paths.get(fileName) + val pathSegments = (0 until filePath.getNameCount).map(filePath.getName(_).toString).toArray + + // extract info from the user-given fileName + val ownerEmail = pathSegments(0) + val modelName = pathSegments(1) + val versionName = pathSegments(2) + val fileRelativePath = Paths.get(pathSegments.drop(3).head, pathSegments.drop(3).tail: _*) + + // fetch the dataset and version from DB to get dataset ID and version hash + val (model, modelVersion) = + withTransaction( + SqlServer + .getInstance() + .createDSLContext() + ) { ctx => + // fetch the dataset from DB + val model = ctx + .select(MODEL.fields: _*) + .from(MODEL) + .leftJoin(USER) + .on(USER.UID.eq(MODEL.OWNER_UID)) + .where(USER.EMAIL.eq(ownerEmail)) + .and(MODEL.NAME.eq(modelName)) + .fetchOneInto(classOf[Model]) + + // fetch the dataset version from DB + val modelVersion = ctx + .selectFrom(MODEL_VERSION) + .where(MODEL_VERSION.MID.eq(model.getMid)) + .and(MODEL_VERSION.NAME.eq(versionName)) + .fetchOneInto(classOf[DatasetVersion]) + + if (model == null || modelVersion == null) { + throw new FileNotFoundException(s"Model file $fileName not found.") + } + (model, modelVersion) + } + + // Convert each segment of fileRelativePath to an encoded String + val encodedFileRelativePath = fileRelativePath + .iterator() + .asScala + .map { segment => + URLEncoder.encode(segment.toString, StandardCharsets.UTF_8) + } + .toArray + + // Prepend dataset name and versionHash to the encoded path segments + val allPathSegments = Array( + modelName, + modelVersion.getVersionHash + ) ++ encodedFileRelativePath + + // Build the format /{datasetName}/{versionHash}/{fileRelativePath}, both Linux and Windows use forward slash as the splitter + val uriSplitter = "/" + val encodedPath = uriSplitter + allPathSegments.mkString(uriSplitter) + + try { + new URI(DATASET_FILE_URI_SCHEME, "", encodedPath, null) + } catch { + case e: Exception => + throw new FileNotFoundException(s"Dataset file $fileName not found.") + } + } + /** * Checks if a given file path has a valid scheme. *
