This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 038ebb725649e46142348a536441aed79c51bd92
Author: Kunwoo Park <[email protected]>
AuthorDate: Mon Oct 27 10:27:46 2025 -0700

    Add operator id to the table
---
 .../main/python/core/models/big_object_manager.R   | 119 +++++++++++++++++++--
 .../InitializeExecutorHandler.scala                |   1 +
 .../amber/core/executor/ExecutionContext.scala     |  23 +++-
 .../source/scan/FileScanSourceOpExec.scala         |   8 +-
 sql/texera_ddl.sql                                 |   1 +
 sql/updates/big_object.sql                         |   1 +
 6 files changed, 141 insertions(+), 12 deletions(-)

diff --git a/amber/src/main/python/core/models/big_object_manager.R b/amber/src/main/python/core/models/big_object_manager.R
index c365c11141..9e33f23149 100644
--- a/amber/src/main/python/core/models/big_object_manager.R
+++ b/amber/src/main/python/core/models/big_object_manager.R
@@ -27,17 +27,40 @@ if (!require("aws.s3", quietly = TRUE)) {
 
 # BigObjectStream Reference Class
 # Provides stream-like access to big object content
+# 
+# Uses S3 streaming connection (downloads on-demand, not upfront!)
+# Memory usage: O(1) - only current chunk in memory
 BigObjectStream <- setRefClass("BigObjectStream",
     fields = list(conn = "ANY", uri = "character", is_closed = "logical"),
     methods = list(
-        initialize = function(raw_bytes, uri_val) {
-            conn <<- rawConnection(raw_bytes, open = "rb")
+        initialize = function(s3_conn, uri_val) {
+            # Store the S3 connection (NOT raw bytes!)
+            # This enables true streaming (downloads on-demand)
+            conn <<- s3_conn
             uri <<- uri_val
             is_closed <<- FALSE
         },
         read = function(n = -1L) {
             if (is_closed) stop("Stream is closed")
-            readBin(conn, "raw", if (n == -1L) 1e9 else n)
+            
+            if (n == -1L) {
+                # Read all remaining data in chunks to avoid memory spike
+                # Downloads incrementally from S3 (not all at once!)
+                result <- raw(0)
+                chunk_size <- 10 * 1024 * 1024  # 10MB chunks
+                repeat {
+                    chunk <- tryCatch(
+                        readBin(conn, "raw", chunk_size),
+                        error = function(e) raw(0)
+                    )
+                    if (length(chunk) == 0) break
+                    result <- c(result, chunk)
+                }
+                result
+            } else {
+                # Read exactly n bytes (downloads only n bytes from S3!)
+                readBin(conn, "raw", n)
+            }
         },
         close = function() {
             if (!is_closed) {
@@ -73,6 +96,9 @@ BigObjectManager <- list(
         parts <- strsplit(sub("^s3://", "", uri), "/", fixed = TRUE)[[1]]
         if (length(parts) < 2) stop(paste("Invalid S3 URI format:", uri))
         
+        bucket <- parts[1]
+        key <- paste(parts[-1], collapse = "/")
+        
         # Configure S3 credentials from environment variables
         Sys.setenv(
             AWS_ACCESS_KEY_ID = Sys.getenv("STORAGE_S3_AUTH_USERNAME", "texera_minio"),
@@ -81,19 +107,96 @@ BigObjectManager <- list(
             AWS_DEFAULT_REGION = Sys.getenv("STORAGE_S3_REGION", "us-west-2")
         )
         
-        # Fetch object from S3
-        raw_bytes <- tryCatch(
-            aws.s3::get_object(
+        # Create TRUE STREAMING connection to S3
+        # This does NOT download the file - downloads happen on-demand as you read()!
+        message("BigObjectManager: Opening streaming connection for: ", uri)
+        start_time <- Sys.time()
+        
+        s3_conn <- tryCatch(
+            aws.s3::s3connection(
+                object = key,
+                bucket = bucket,
+                region = Sys.getenv("AWS_DEFAULT_REGION"),
+                base_url = Sys.getenv("AWS_S3_ENDPOINT"),
+                use_https = grepl("^https://", Sys.getenv("AWS_S3_ENDPOINT"))
+            ),
+            error = function(e) stop(paste("Failed to open streaming connection for", uri, ":", conditionMessage(e)))
+        )
+        
+        open_time <- Sys.time()
+        message(sprintf("BigObjectManager: Streaming connection established in %.2f seconds",
+                       as.numeric(difftime(open_time, start_time, units = "secs"))))
+        message("BigObjectManager: Data will be downloaded on-demand as you 
read() - O(1) memory!")
+        
+        BigObjectStream$new(s3_conn, uri)
+    },
+    
+    # Fast path for reading RDS files - downloads directly to disk, skips memory
+    readRDS = function(pointer_or_uri) {
+        # Extract from list if needed (for backward compatibility)
+        if (is.list(pointer_or_uri) && length(pointer_or_uri) == 1) {
+            pointer_or_uri <- pointer_or_uri[[1]]
+        }
+        
+        # Get URI string
+        uri <- if (inherits(pointer_or_uri, "BigObjectPointer")) {
+            pointer_or_uri$uri
+        } else if (is.character(pointer_or_uri)) {
+            pointer_or_uri
+        } else {
+            stop("Expected BigObjectPointer or character URI")
+        }
+        
+        if (!grepl("^s3://", uri)) stop(paste("Invalid S3 URI:", uri))
+        
+        # Parse s3://bucket/key
+        parts <- strsplit(sub("^s3://", "", uri), "/", fixed = TRUE)[[1]]
+        if (length(parts) < 2) stop(paste("Invalid S3 URI format:", uri))
+        
+        # Configure S3 credentials from environment variables
+        Sys.setenv(
+            AWS_ACCESS_KEY_ID = Sys.getenv("STORAGE_S3_AUTH_USERNAME", "texera_minio"),
+            AWS_SECRET_ACCESS_KEY = Sys.getenv("STORAGE_S3_AUTH_PASSWORD", "password"),
+            AWS_S3_ENDPOINT = Sys.getenv("STORAGE_S3_ENDPOINT", "localhost:9000"),
+            AWS_DEFAULT_REGION = Sys.getenv("STORAGE_S3_REGION", "us-west-2")
+        )
+        
+        message("BigObjectManager: Fast readRDS for: ", uri)
+        start_time <- Sys.time()
+        
+        # Download directly to temp file (skips loading into memory!)
+        temp_file <- tempfile(fileext = ".rds")
+        tryCatch(
+            aws.s3::save_object(
                 object = paste(parts[-1], collapse = "/"),
                 bucket = parts[1],
+                file = temp_file,
                 region = Sys.getenv("AWS_DEFAULT_REGION"),
                 base_url = Sys.getenv("AWS_S3_ENDPOINT"),
                 use_https = grepl("^https://", Sys.getenv("AWS_S3_ENDPOINT"))
             ),
-            error = function(e) stop(paste("Failed to open", uri, ":", conditionMessage(e)))
+            error = function(e) stop(paste("Failed to download", uri, ":", conditionMessage(e)))
         )
         
-        BigObjectStream$new(raw_bytes, uri)
+        download_time <- Sys.time()
+        file_size_gb <- file.info(temp_file)$size / 1e9
+        message(sprintf("BigObjectManager: Downloaded %.2f GB to disk in %.2f 
seconds", 
+                       file_size_gb,
+                       as.numeric(difftime(download_time, start_time, units = 
"secs"))))
+        
+        # Read RDS from temp file
+        result <- base::readRDS(temp_file)
+        read_time <- Sys.time()
+        message(sprintf("BigObjectManager: readRDS() took %.2f seconds", 
+                       as.numeric(difftime(read_time, download_time, units = 
"secs"))))
+        
+        unlink(temp_file)  # Clean up
+        
+        total_time <- Sys.time()
+        message(sprintf("BigObjectManager: Total time: %.2f seconds", 
+                       as.numeric(difftime(total_time, start_time, units = 
"secs"))))
+        
+        result
     }
 )
 
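The read(n = -1L) branch above drains the S3 connection in fixed-size chunks, so no single readBin call allocates a buffer for the whole object. For readers on the JVM side of the codebase, the same chunked-drain pattern in Scala looks roughly like the sketch below (illustrative only, not part of this commit; readAll and its default chunk size are made-up names):

    import java.io.{ByteArrayOutputStream, InputStream}

    // Drain an InputStream one fixed-size chunk at a time, mirroring the
    // R loop: each read touches at most chunkSize bytes, although the
    // accumulated result still grows to the full object size.
    def readAll(in: InputStream, chunkSize: Int = 10 * 1024 * 1024): Array[Byte] = {
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](chunkSize)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
      out.toByteArray
    }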
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index b1d6b2f609..871d1a2efb 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -45,6 +45,7 @@ trait InitializeExecutorHandler {
 
     // Set execution context for this thread
     ExecutionContext.setExecutionId(req.executionId)
+    ExecutionContext.setOperatorId(VirtualIdentityUtils.getPhysicalOpId(actorId).logicalOpId.id)
 
     dp.executor = req.opExecInitInfo match {
       case OpExecWithClassName(className, descString) =>
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
index af33e84d2a..f448f3eb59 100644
--- a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
+++ b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
@@ -21,10 +21,12 @@ package org.apache.amber.core.executor
 
 /**
   * ExecutionContext provides thread-local access to execution metadata.
-  * This allows operator executors to access execution ID.
+  * This allows operator executors to access execution ID and operator ID.
   */
 object ExecutionContext {
   private val executionIdThreadLocal: ThreadLocal[Option[Int]] = ThreadLocal.withInitial(() => None)
+  private val operatorIdThreadLocal: ThreadLocal[Option[String]] =
+    ThreadLocal.withInitial(() => None)
 
   /**
     * Sets the execution ID for the current thread.
@@ -43,10 +45,27 @@ object ExecutionContext {
   }
 
   /**
-    * Clears the execution ID for the current thread.
+    * Sets the operator ID for the current thread.
+    * Should be called when initializing an executor.
+    */
+  def setOperatorId(operatorId: String): Unit = {
+    operatorIdThreadLocal.set(Some(operatorId))
+  }
+
+  /**
+    * Gets the operator ID for the current thread.
+    * @return Some(operatorId) if set, None otherwise
+    */
+  def getOperatorId: Option[String] = {
+    operatorIdThreadLocal.get()
+  }
+
+  /**
+    * Clears the execution context for the current thread.
     * Should be called when cleaning up.
     */
   def clear(): Unit = {
     executionIdThreadLocal.remove()
+    operatorIdThreadLocal.remove()
   }
 }
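Taken together with the InitializeExecutorHandler change above, the intended lifecycle is: set both IDs when the executor is initialized on a worker thread, read them from operator code running on that same thread, and clear them on teardown. A minimal sketch of that pattern (the literal ID values are invented for illustration):

    // During executor initialization (worker thread):
    ExecutionContext.setExecutionId(42)             // hypothetical execution ID
    ExecutionContext.setOperatorId("CSVFileScan-1") // hypothetical operator ID

    // Later, from operator code on the same thread:
    val executionId = ExecutionContext.getExecutionId
      .getOrElse(throw new IllegalStateException("Execution ID not set"))
    val operatorId = ExecutionContext.getOperatorId
      .getOrElse(throw new IllegalStateException("Operator ID not set"))

    // On cleanup, remove both thread-locals so a pooled thread does not
    // carry stale IDs into the next execution:
    ExecutionContext.clear()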
diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
index 1b5a012b00..5013289c44 100644
--- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
@@ -87,12 +87,16 @@ class FileScanSourceOpExec private[scan] (
                 new String(toByteArray(entry), desc.fileEncoding.getCharset)
               case FileAttributeType.BIG_OBJECT =>
                // For big objects, create a big object pointer from the input stream
-                // Get execution ID from thread-local context
+                // Get execution ID and operator ID from thread-local context
                 val executionId = ExecutionContext.getExecutionId
                   .getOrElse(
                     throw new IllegalStateException("Execution ID not set in 
ExecutionContext")
                   )
-                BigObjectManager.create(executionId, entry)
+                val operatorId = ExecutionContext.getOperatorId
+                  .getOrElse(
+                    throw new IllegalStateException("Operator ID not set in 
ExecutionContext")
+                  )
+                BigObjectManager.create(executionId, operatorId, entry)
              case _ => parseField(toByteArray(entry), desc.attributeType.getType)
             })
             TupleLike(fields.toSeq: _*)
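The three-argument call above implies that BigObjectManager.create now also accepts the operator ID. The Scala BigObjectManager itself is not part of this diff, so the following signature is only inferred from the call site (the entry type and the BigObjectPointer return type are assumptions):

    // Hypothetical signature inferred from the call site; not from this commit.
    def create(executionId: Int, operatorId: String, entry: java.io.InputStream): BigObjectPointer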
diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql
index 188f4b8e98..a7db9ebe15 100644
--- a/sql/texera_ddl.sql
+++ b/sql/texera_ddl.sql
@@ -446,6 +446,7 @@ END $$;
 
 CREATE TABLE big_object (
   execution_id INT NOT NULL,
+  operator_id VARCHAR(100) NOT NULL,
   uri TEXT NOT NULL UNIQUE,
   creation_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  FOREIGN KEY (execution_id) REFERENCES workflow_executions(eid) ON DELETE CASCADE
diff --git a/sql/updates/big_object.sql b/sql/updates/big_object.sql
index 0ffaabcc2c..bc762bb346 100644
--- a/sql/updates/big_object.sql
+++ b/sql/updates/big_object.sql
@@ -27,6 +27,7 @@ SET search_path TO texera_db;
 -- ============================================
 CREATE TABLE big_object (
   execution_id INT NOT NULL,
+  operator_id VARCHAR(100) NOT NULL,
   uri TEXT NOT NULL UNIQUE,
   creation_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  FOREIGN KEY (execution_id) REFERENCES workflow_executions(eid) ON DELETE CASCADE
