This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git
commit 038ebb725649e46142348a536441aed79c51bd92
Author: Kunwoo Park <[email protected]>
AuthorDate: Mon Oct 27 10:27:46 2025 -0700

    Add operator id to the table
---
 .../main/python/core/models/big_object_manager.R   | 119 +++++++++++++++++++--
 .../InitializeExecutorHandler.scala                 |   1 +
 .../amber/core/executor/ExecutionContext.scala      |  23 +++-
 .../source/scan/FileScanSourceOpExec.scala          |   8 +-
 sql/texera_ddl.sql                                  |   1 +
 sql/updates/big_object.sql                          |   1 +
 6 files changed, 141 insertions(+), 12 deletions(-)

diff --git a/amber/src/main/python/core/models/big_object_manager.R b/amber/src/main/python/core/models/big_object_manager.R
index c365c11141..9e33f23149 100644
--- a/amber/src/main/python/core/models/big_object_manager.R
+++ b/amber/src/main/python/core/models/big_object_manager.R
@@ -27,17 +27,40 @@ if (!require("aws.s3", quietly = TRUE)) {
 
 # BigObjectStream Reference Class
 # Provides stream-like access to big object content
+#
+# Uses S3 streaming connection (downloads on-demand, not upfront!)
+# Memory usage: O(1) - only current chunk in memory
 BigObjectStream <- setRefClass("BigObjectStream",
   fields = list(conn = "ANY", uri = "character", is_closed = "logical"),
   methods = list(
-    initialize = function(raw_bytes, uri_val) {
-      conn <<- rawConnection(raw_bytes, open = "rb")
+    initialize = function(s3_conn, uri_val) {
+      # Store the S3 connection (NOT raw bytes!)
+      # This enables true streaming (downloads on-demand)
+      conn <<- s3_conn
       uri <<- uri_val
       is_closed <<- FALSE
     },
     read = function(n = -1L) {
       if (is_closed) stop("Stream is closed")
-      readBin(conn, "raw", if (n == -1L) 1e9 else n)
+
+      if (n == -1L) {
+        # Read all remaining data in chunks to avoid memory spike
+        # Downloads incrementally from S3 (not all at once!)
+        result <- raw(0)
+        chunk_size <- 10 * 1024 * 1024 # 10MB chunks
+        repeat {
+          chunk <- tryCatch(
+            readBin(conn, "raw", chunk_size),
+            error = function(e) raw(0)
+          )
+          if (length(chunk) == 0) break
+          result <- c(result, chunk)
+        }
+        result
+      } else {
+        # Read exactly n bytes (downloads only n bytes from S3!)
+        readBin(conn, "raw", n)
+      }
     },
     close = function() {
       if (!is_closed) {
@@ -73,6 +96,9 @@ BigObjectManager <- list(
     parts <- strsplit(sub("^s3://", "", uri), "/", fixed = TRUE)[[1]]
     if (length(parts) < 2) stop(paste("Invalid S3 URI format:", uri))
 
+    bucket <- parts[1]
+    key <- paste(parts[-1], collapse = "/")
+
     # Configure S3 credentials from environment variables
     Sys.setenv(
       AWS_ACCESS_KEY_ID = Sys.getenv("STORAGE_S3_AUTH_USERNAME", "texera_minio"),
@@ -81,19 +107,96 @@
       AWS_DEFAULT_REGION = Sys.getenv("STORAGE_S3_REGION", "us-west-2")
     )
 
-    # Fetch object from S3
-    raw_bytes <- tryCatch(
-      aws.s3::get_object(
+    # Create TRUE STREAMING connection to S3
+    # This does NOT download the file - downloads happen on-demand as you read()!
+ message("BigObjectManager: Opening streaming connection for: ", uri) + start_time <- Sys.time() + + s3_conn <- tryCatch( + aws.s3::s3connection( + object = key, + bucket = bucket, + region = Sys.getenv("AWS_DEFAULT_REGION"), + base_url = Sys.getenv("AWS_S3_ENDPOINT"), + use_https = grepl("^https://", Sys.getenv("AWS_S3_ENDPOINT")) + ), + error = function(e) stop(paste("Failed to open streaming connection for", uri, ":", conditionMessage(e))) + ) + + open_time <- Sys.time() + message(sprintf("BigObjectManager: Streaming connection established in %.2f seconds", + as.numeric(difftime(open_time, start_time, units = "secs")))) + message("BigObjectManager: Data will be downloaded on-demand as you read() - O(1) memory!") + + BigObjectStream$new(s3_conn, uri) + }, + + # Fast path for reading RDS files - downloads directly to disk, skips memory + readRDS = function(pointer_or_uri) { + # Extract from list if needed (for backward compatibility) + if (is.list(pointer_or_uri) && length(pointer_or_uri) == 1) { + pointer_or_uri <- pointer_or_uri[[1]] + } + + # Get URI string + uri <- if (inherits(pointer_or_uri, "BigObjectPointer")) { + pointer_or_uri$uri + } else if (is.character(pointer_or_uri)) { + pointer_or_uri + } else { + stop("Expected BigObjectPointer or character URI") + } + + if (!grepl("^s3://", uri)) stop(paste("Invalid S3 URI:", uri)) + + # Parse s3://bucket/key + parts <- strsplit(sub("^s3://", "", uri), "/", fixed = TRUE)[[1]] + if (length(parts) < 2) stop(paste("Invalid S3 URI format:", uri)) + + # Configure S3 credentials from environment variables + Sys.setenv( + AWS_ACCESS_KEY_ID = Sys.getenv("STORAGE_S3_AUTH_USERNAME", "texera_minio"), + AWS_SECRET_ACCESS_KEY = Sys.getenv("STORAGE_S3_AUTH_PASSWORD", "password"), + AWS_S3_ENDPOINT = Sys.getenv("STORAGE_S3_ENDPOINT", "localhost:9000"), + AWS_DEFAULT_REGION = Sys.getenv("STORAGE_S3_REGION", "us-west-2") + ) + + message("BigObjectManager: Fast readRDS for: ", uri) + start_time <- Sys.time() + + # Download directly to temp file (skips loading into memory!) 
+    temp_file <- tempfile(fileext = ".rds")
+    tryCatch(
+      aws.s3::save_object(
         object = paste(parts[-1], collapse = "/"),
         bucket = parts[1],
+        file = temp_file,
         region = Sys.getenv("AWS_DEFAULT_REGION"),
         base_url = Sys.getenv("AWS_S3_ENDPOINT"),
         use_https = grepl("^https://", Sys.getenv("AWS_S3_ENDPOINT"))
       ),
-      error = function(e) stop(paste("Failed to open", uri, ":", conditionMessage(e)))
+      error = function(e) stop(paste("Failed to download", uri, ":", conditionMessage(e)))
     )
 
-    BigObjectStream$new(raw_bytes, uri)
+    download_time <- Sys.time()
+    file_size_gb <- file.info(temp_file)$size / 1e9
+    message(sprintf("BigObjectManager: Downloaded %.2f GB to disk in %.2f seconds",
+                    file_size_gb,
+                    as.numeric(difftime(download_time, start_time, units = "secs"))))
+
+    # Read RDS from temp file
+    result <- base::readRDS(temp_file)
+    read_time <- Sys.time()
+    message(sprintf("BigObjectManager: readRDS() took %.2f seconds",
+                    as.numeric(difftime(read_time, download_time, units = "secs"))))
+
+    unlink(temp_file) # Clean up
+
+    total_time <- Sys.time()
+    message(sprintf("BigObjectManager: Total time: %.2f seconds",
+                    as.numeric(difftime(total_time, start_time, units = "secs"))))
+
+    result
   }
 )
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index b1d6b2f609..871d1a2efb 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -45,6 +45,7 @@ trait InitializeExecutorHandler {
 
     // Set execution context for this thread
     ExecutionContext.setExecutionId(req.executionId)
+    ExecutionContext.setOperatorId(VirtualIdentityUtils.getPhysicalOpId(actorId).logicalOpId.id)
 
     dp.executor = req.opExecInitInfo match {
       case OpExecWithClassName(className, descString) =>
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
index af33e84d2a..f448f3eb59 100644
--- a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
+++ b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
@@ -21,10 +21,12 @@ package org.apache.amber.core.executor
 
 /**
  * ExecutionContext provides thread-local access to execution metadata.
- * This allows operator executors to access execution ID.
+ * This allows operator executors to access execution ID and operator ID.
  */
 object ExecutionContext {
   private val executionIdThreadLocal: ThreadLocal[Option[Int]] =
     ThreadLocal.withInitial(() => None)
+  private val operatorIdThreadLocal: ThreadLocal[Option[String]] =
+    ThreadLocal.withInitial(() => None)
 
   /**
    * Sets the execution ID for the current thread.
@@ -43,10 +45,27 @@
   }
 
   /**
-   * Clears the execution ID for the current thread.
+   * Sets the operator ID for the current thread.
+   * Should be called when initializing an executor.
+   */
+  def setOperatorId(operatorId: String): Unit = {
+    operatorIdThreadLocal.set(Some(operatorId))
+  }
+
+  /**
+   * Gets the operator ID for the current thread.
+   * @return Some(operatorId) if set, None otherwise
+   */
+  def getOperatorId: Option[String] = {
+    operatorIdThreadLocal.get()
+  }
+
+  /**
+   * Clears the execution context for the current thread.
    * Should be called when cleaning up.
    */
   def clear(): Unit = {
     executionIdThreadLocal.remove()
+    operatorIdThreadLocal.remove()
   }
 }
diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
index 1b5a012b00..5013289c44 100644
--- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
@@ -87,12 +87,16 @@ class FileScanSourceOpExec private[scan] (
             new String(toByteArray(entry), desc.fileEncoding.getCharset)
           case FileAttributeType.BIG_OBJECT =>
             // For big objects, create a big object pointer from the input stream
-            // Get execution ID from thread-local context
+            // Get execution ID and operator ID from thread-local context
             val executionId = ExecutionContext.getExecutionId
               .getOrElse(
                 throw new IllegalStateException("Execution ID not set in ExecutionContext")
               )
-            BigObjectManager.create(executionId, entry)
+            val operatorId = ExecutionContext.getOperatorId
+              .getOrElse(
+                throw new IllegalStateException("Operator ID not set in ExecutionContext")
+              )
+            BigObjectManager.create(executionId, operatorId, entry)
           case _ => parseField(toByteArray(entry), desc.attributeType.getType)
         })
         TupleLike(fields.toSeq: _*)
diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql
index 188f4b8e98..a7db9ebe15 100644
--- a/sql/texera_ddl.sql
+++ b/sql/texera_ddl.sql
@@ -446,6 +446,7 @@ END $$;
 
 CREATE TABLE big_object (
     execution_id INT NOT NULL,
+    operator_id VARCHAR(100) NOT NULL,
    uri TEXT NOT NULL UNIQUE,
    creation_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (execution_id) REFERENCES workflow_executions(eid) ON DELETE CASCADE
diff --git a/sql/updates/big_object.sql b/sql/updates/big_object.sql
index 0ffaabcc2c..bc762bb346 100644
--- a/sql/updates/big_object.sql
+++ b/sql/updates/big_object.sql
@@ -27,6 +27,7 @@ SET search_path TO texera_db;
 -- ============================================
 CREATE TABLE big_object (
     execution_id INT NOT NULL,
+    operator_id VARCHAR(100) NOT NULL,
    uri TEXT NOT NULL UNIQUE,
    creation_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (execution_id) REFERENCES workflow_executions(eid) ON DELETE CASCADE
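
For readers unfamiliar with the pattern used by BigObjectStream$read(-1L) above: the same chunked-read loop, sketched in Scala over a plain java.io.InputStream. The helper name and chunk size are illustrative, not part of this commit. Note that while each network read is bounded by the chunk size, the accumulated result still grows with the object, so bounded read(n) calls remain the low-memory path.

    import java.io.{ByteArrayOutputStream, InputStream}

    // Drain a stream one bounded chunk at a time instead of one giant read.
    def readAllChunked(in: InputStream, chunkSize: Int = 10 * 1024 * 1024): Array[Byte] = {
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](chunkSize)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n) // at most one chunk sits in the buffer at a time
        n = in.read(buf)
      }
      out.toByteArray // accumulation is still O(total), as in read(-1L) above
    }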
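The two Scala changes above combine into a set-use-clear lifecycle around each executor: InitializeExecutorHandler populates both thread-locals, operators read them without plumbing IDs through every call, and clear() now removes both. A minimal sketch of that lifecycle, assuming only the ExecutionContext API shown in this diff; the runWithContext helper and the IDs are illustrative.

    import org.apache.amber.core.executor.ExecutionContext

    // Bracket a unit of executor work with the thread-local context.
    def runWithContext(executionId: Int, operatorId: String)(body: => Unit): Unit = {
      ExecutionContext.setExecutionId(executionId)
      ExecutionContext.setOperatorId(operatorId)
      try body
      finally ExecutionContext.clear() // clears BOTH thread-locals, so reused worker threads don't leak IDs
    }

    // Deep inside an operator (cf. FileScanSourceOpExec), both IDs are in reach:
    runWithContext(executionId = 1, operatorId = "scan-1") {
      val opId = ExecutionContext.getOperatorId
        .getOrElse(throw new IllegalStateException("Operator ID not set"))
      println(s"running as operator $opId")
    }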
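With operator_id recorded alongside execution_id, big objects can now be audited or cleaned up per operator rather than only per execution. A sketch of such a query over the new schema using plain JDBC; the connection URL, credentials, and IDs are illustrative, not part of this commit.

    import java.sql.DriverManager

    // List big objects created by one operator within one execution.
    val conn = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5432/texera_db", "texera", "password")
    try {
      val stmt = conn.prepareStatement(
        "SELECT uri, creation_time FROM big_object WHERE execution_id = ? AND operator_id = ?")
      stmt.setInt(1, 42)
      stmt.setString(2, "scan-1")
      val rs = stmt.executeQuery()
      while (rs.next())
        println(s"${rs.getString("uri")} created at ${rs.getTimestamp("creation_time")}")
    } finally conn.close()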
