This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 08c4334335 feat: Add BigObject Support for Handling Data Larger Than 
2GB in Java (#4067)
08c4334335 is described below

commit 08c43343353aa88e572bd7796d5086c36ee8aa19
Author: Chris <[email protected]>
AuthorDate: Tue Nov 25 11:46:00 2025 -0800

    feat: Add BigObject Support for Handling Data Larger Than 2GB in Java 
(#4067)
    
    <!--
    Thanks for sending a pull request (PR)! Here are some tips for you:
    1. If this is your first time, please read our contributor guidelines:
    [Contributing to
    Texera](https://github.com/apache/texera/blob/main/CONTRIBUTING.md)
      2. Ensure you have added or run the appropriate tests for your PR
      3. If the PR is work in progress, mark it a draft on GitHub.
      4. Please write your PR title to summarize what this PR proposes, we
        are following Conventional Commits style for PR titles as well.
      5. Be sure to keep the PR description updated to reflect all changes.
    -->
    
    ### What changes were proposed in this PR?
    <!--
    Please clarify what changes you are proposing. The purpose of this
    section
    is to outline the changes. Here are some tips for you:
      1. If you propose a new API, clarify the use case for a new API.
      2. If you fix a bug, you can clarify why it is a bug.
      3. If it is a refactoring, clarify what has been changed.
      3. It would be helpful to include a before-and-after comparison using
         screenshots or GIFs.
      4. Please consider writing useful notes for better and faster reviews.
    -->
    This PR introduces a new attribute type, `big_object`, that lets Java
    operators pass data larger than 2 GB to downstream operators. Instead of
    storing large data directly in the tuple, the data is uploaded to MinIO,
    and the tuple stores a pointer to that object. Future PRs will add
    support for Python and R UDF operators.
    
    #### Main changes:
    1. MinIO
    - Added a new bucket: `texera-big-objects`.
    - Implemented multipart upload (separate from LakeFS) to efficiently
    handle large uploads
    2. BigObjectManager (Internal Java API)
    - `create()` → Generates a unique S3 URI, registers it in the database,
    and returns the URI string
    - `deleteAllObjects()` → Deletes all big objects from S3 (Please check
    the Note section below)
    3. Streaming I/O Classes
    - `BigObjectOutputStream`: Streams data to S3 using background multipart
    upload
    - `BigObjectInputStream`: Lazily streams data from S3 when reading
    4. Iceberg Integration
    - BigObject pointers are stored as strings in Iceberg
    - A magic suffix is added to attribute names to differentiate them from
    normal strings
    
    ####  User API
    ##### Creating and Writing a BigObject:
    ```java
    // In an OperatorExecutor
    BigObject bigObject = new BigObject();
    try (BigObjectOutputStream out = new BigObjectOutputStream(bigObject)) {
        out.write(myLargeDataBytes);
        // or: out.write(byteArray, offset, length);
    }
    // bigObject is now ready to be added to tuples
    ```
    
    ##### Reading a BigObject:
    ```java
    // Option 1: Read all data at once
    try (BigObjectInputStream in = new BigObjectInputStream(bigObject)) {
        byte[] allData = in.readAllBytes();
        // ... process data
    }
    
    // Option 2: Read a specific amount
    try (BigObjectInputStream in = new BigObjectInputStream(bigObject)) {
        byte[] chunk = in.readNBytes(1024); // Read 1KB
        // ... process chunk
    }
    
    // Option 3: Use as a standard InputStream
    try (BigObjectInputStream in = new BigObjectInputStream(bigObject)) {
        int bytesRead = in.read(buffer, offset, length);
        // ... process data
    }
    ```
    
    #### Note
    This PR does NOT handle lifecycle management for big objects. For now,
    when a workflow or workflow execution is deleted, all related big
    objects in S3 are deleted immediately. We will add proper lifecycle
    management in a future update.
    
    #### System Diagram
    <img width="3444" height="2684" alt="BigObject-Page-1 drawio (4)"
    
src="https://github.com/user-attachments/assets/98eded06-03b2-41be-b50b-0520a654ddca";
    />
    
    
    ### Any related issues, documentation, discussions?
    <!--
    Please use this section to link other resources if not mentioned
    already.
    1. If this PR fixes an issue, please include `Fixes #1234`, `Resolves
    #1234`
    or `Closes #1234`. If it is only related, simply mention the issue
    number.
      4. If there is design documentation, please add the link.
      8. If there is a discussion in the mailing list, please add the link.
    -->
    Related to #3787.
    
    
    ### How was this PR tested?
    <!--
    If tests were added, say they were added here. Or simply mention that if
    the PR
    is tested with existing test cases. Make sure to include/update test
    cases that
    check the changes thoroughly including negative and positive cases if
    possible.
    If it was tested in a way different from regular unit tests, please
    clarify how
    you tested step by step, ideally copy and paste-able, so that other
    reviewers can
    test and check, and descendants can verify in the future. If tests were
    not added,
    please describe why they were not added and/or why it was difficult to
    add.
    -->
    
    Tested by running this workflow multiple times and check MinIO dashboard
    to see whether three big objects are created and deleted. Specify the
    file scan operator's property to use any file bigger than 2GB.
    [Big Object Java
    
UDF.json](https://github.com/user-attachments/files/23666312/Big.Object.Java.UDF.json)
    
    
    ### Was this PR authored or co-authored using generative AI tooling?
    <!--
    If generative AI tooling has been used in the process of authoring this
    PR,
    please include the phrase: 'Generated-by: ' followed by the name of the
    tool
    and its version. If no, write 'No'.
    Please refer to the [ASF Generative Tooling
    Guidance](https://www.apache.org/legal/generative-tooling.html) for
    details.
    -->
    Yes.
    
    ---------
    
    Signed-off-by: Chris <[email protected]>
---
 .../dashboard/user/workflow/WorkflowResource.scala |   3 +
 .../texera/web/service/WorkflowService.scala       |   5 +-
 common/workflow-core/build.sbt                     |  13 +-
 .../org/apache/amber/core/tuple/AttributeType.java |   3 +
 .../amber/core/tuple/AttributeTypeUtils.scala      |  21 +-
 .../org/apache/amber/core/tuple/BigObject.java     | 109 +++++
 .../scala/org/apache/amber/util/IcebergUtil.scala  |  89 +++-
 .../texera/service/util/BigObjectInputStream.scala |  82 ++++
 .../texera/service/util/BigObjectManager.scala     |  66 +++
 .../service/util/BigObjectOutputStream.scala       | 121 ++++++
 .../texera/service/util/S3StorageClient.scala      | 120 ++++++
 .../amber/core/tuple/AttributeTypeUtilsSpec.scala  |  21 +
 .../org/apache/amber/util/IcebergUtilSpec.scala    | 100 ++++-
 .../service/util/BigObjectInputStreamSpec.scala    | 352 +++++++++++++++
 .../texera/service/util/BigObjectManagerSpec.scala | 471 +++++++++++++++++++++
 .../service/util/BigObjectOutputStreamSpec.scala   | 252 +++++++++++
 .../texera/service/util/S3StorageClientSpec.scala  | 337 +++++++++++++++
 .../texera/service/util/S3StorageTestBase.scala    |  69 +++
 .../operator/source/scan/FileAttributeType.java    |   5 +-
 .../source/scan/FileScanSourceOpExec.scala         |  18 +-
 .../source/scan/FileScanSourceOpExecSpec.scala     | 162 +++++++
 file-service/build.sbt                             |   3 -
 22 files changed, 2391 insertions(+), 31 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
index 01ae898a66..c6917d9390 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
@@ -36,6 +36,7 @@ import org.apache.texera.dao.jooq.generated.tables.daos.{
   WorkflowUserAccessDao
 }
 import org.apache.texera.dao.jooq.generated.tables.pojos._
+import org.apache.texera.service.util.BigObjectManager
 import org.apache.texera.web.resource.dashboard.hub.EntityType
 import 
org.apache.texera.web.resource.dashboard.hub.HubResource.recordCloneAction
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowAccessResource.hasReadAccess
@@ -600,6 +601,8 @@ class WorkflowResource extends LazyLogging {
         .asScala
         .toList
 
+      BigObjectManager.deleteAllObjects()
+
       // Collect all URIs related to executions for cleanup
       val uris = eids.flatMap { eid =>
         val executionId = ExecutionIdentity(eid.longValue())
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index 01c66fb458..c9f1bee346 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -46,6 +46,7 @@ import 
org.apache.amber.engine.architecture.worker.WorkflowWorker.{
 }
 import org.apache.amber.error.ErrorUtils.{getOperatorFromActorIdOpt, 
getStackTraceWithAllCauses}
 import org.apache.texera.dao.jooq.generated.tables.pojos.User
+import org.apache.texera.service.util.BigObjectManager
 import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
 import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
@@ -307,6 +308,7 @@ class WorkflowService(
     *  2. Clears URI references from the execution registry
     *  3. Safely clears all result and console message documents
     *  4. Expires Iceberg snapshots for runtime statistics
+    *  5. Deletes big objects from MinIO
     *
     * @param eid The execution identity to clean up resources for
     */
@@ -343,6 +345,7 @@ class WorkflowService(
           logger.debug(s"Error processing document at $uri: 
${error.getMessage}")
       }
     }
+    // Delete big objects
+    BigObjectManager.deleteAllObjects()
   }
-
 }
diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt
index 82a79e8e04..ab6b8f27c6 100644
--- a/common/workflow-core/build.sbt
+++ b/common/workflow-core/build.sbt
@@ -83,11 +83,15 @@ Test / PB.protoSources += PB.externalSourcePath.value
 // Test-related Dependencies
 /////////////////////////////////////////////////////////////////////////////
 
+val testcontainersVersion = "0.43.0"
+
 libraryDependencies ++= Seq(
   "org.scalamock" %% "scalamock" % "5.2.0" % Test,                  // 
ScalaMock
   "org.scalatest" %% "scalatest" % "3.2.15" % Test,                 // 
ScalaTest
   "junit" % "junit" % "4.13.2" % Test,                              // JUnit
-  "com.novocode" % "junit-interface" % "0.11" % Test                // SBT 
interface for JUnit
+  "com.novocode" % "junit-interface" % "0.11" % Test,               // SBT 
interface for JUnit
+  "com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersVersion % 
Test,   // Testcontainers ScalaTest integration
+  "com.dimafeng" %% "testcontainers-scala-minio" % testcontainersVersion % 
Test        // MinIO Testcontainer Scala integration
 )
 
 
@@ -183,5 +187,10 @@ libraryDependencies ++= Seq(
   "org.apache.commons" % "commons-vfs2" % "2.9.0",                     // for 
FileResolver throw VFS-related exceptions
   "io.lakefs" % "sdk" % "1.51.0",                                     // for 
lakeFS api calls
   "com.typesafe" % "config" % "1.4.3",                                 // 
config reader
-  "org.apache.commons" % "commons-jcs3-core" % "3.2"                  // 
Apache Commons JCS
+  "org.apache.commons" % "commons-jcs3-core" % "3.2",                 // 
Apache Commons JCS
+  "software.amazon.awssdk" % "s3" % "2.29.51" excludeAll(
+    ExclusionRule(organization = "io.netty")
+  ),
+  "software.amazon.awssdk" % "auth" % "2.29.51",
+  "software.amazon.awssdk" % "regions" % "2.29.51",
 )
\ No newline at end of file
diff --git 
a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java
 
b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java
index 472679f527..64fa921d46 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java
+++ 
b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java
@@ -70,6 +70,7 @@ public enum AttributeType implements Serializable {
     BOOLEAN("boolean", Boolean.class),
     TIMESTAMP("timestamp", Timestamp.class),
     BINARY("binary", byte[].class),
+    BIG_OBJECT("big_object", BigObject.class),
     ANY("ANY", Object.class);
 
     private final String name;
@@ -109,6 +110,8 @@ public enum AttributeType implements Serializable {
             return TIMESTAMP;
         } else if (fieldClass.equals(byte[].class)) {
             return BINARY;
+        } else if (fieldClass.equals(BigObject.class)) {
+            return BIG_OBJECT;
         } else {
             return ANY;
         }
diff --git 
a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala
 
b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala
index 7cbfb27179..b959c38981 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala
@@ -121,14 +121,15 @@ object AttributeTypeUtils extends Serializable {
   ): Any = {
     if (field == null) return null
     attributeType match {
-      case AttributeType.INTEGER   => parseInteger(field, force)
-      case AttributeType.LONG      => parseLong(field, force)
-      case AttributeType.DOUBLE    => parseDouble(field)
-      case AttributeType.BOOLEAN   => parseBoolean(field)
-      case AttributeType.TIMESTAMP => parseTimestamp(field)
-      case AttributeType.STRING    => field.toString
-      case AttributeType.BINARY    => field
-      case AttributeType.ANY | _   => field
+      case AttributeType.INTEGER    => parseInteger(field, force)
+      case AttributeType.LONG       => parseLong(field, force)
+      case AttributeType.DOUBLE     => parseDouble(field)
+      case AttributeType.BOOLEAN    => parseBoolean(field)
+      case AttributeType.TIMESTAMP  => parseTimestamp(field)
+      case AttributeType.STRING     => field.toString
+      case AttributeType.BINARY     => field
+      case AttributeType.BIG_OBJECT => new BigObject(field.toString)
+      case AttributeType.ANY | _    => field
     }
   }
 
@@ -383,7 +384,9 @@ object AttributeTypeUtils extends Serializable {
       case AttributeType.INTEGER   => tryParseInteger(fieldValue)
       case AttributeType.TIMESTAMP => tryParseTimestamp(fieldValue)
       case AttributeType.BINARY    => tryParseString()
-      case _                       => tryParseString()
+      case AttributeType.BIG_OBJECT =>
+        AttributeType.BIG_OBJECT // Big objects are never inferred from data
+      case _ => tryParseString()
     }
   }
 
diff --git 
a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObject.java
 
b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObject.java
new file mode 100644
index 0000000000..2be14dc167
--- /dev/null
+++ 
b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObject.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.core.tuple;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.amber.core.executor.OperatorExecutor;
+import org.apache.texera.service.util.BigObjectManager;
+
+import java.net.URI;
+import java.util.Objects;
+
+/**
+ * BigObject represents a reference to a large object stored in S3.
+ * 
+ * Each BigObject is identified by an S3 URI (s3://bucket/path/to/object).
+ * BigObjects are automatically tracked and cleaned up when the workflow 
execution completes.
+ */
+public class BigObject {
+    
+    private final String uri;
+    
+    /**
+     * Creates a BigObject from an existing S3 URI.
+     * Used primarily for deserialization from JSON.
+     * 
+     * @param uri S3 URI in the format s3://bucket/path/to/object
+     * @throws IllegalArgumentException if URI is null or doesn't start with 
"s3://"
+     */
+    @JsonCreator
+    public BigObject(@JsonProperty("uri") String uri) {
+        if (uri == null) {
+            throw new IllegalArgumentException("BigObject URI cannot be null");
+        }
+        if (!uri.startsWith("s3://")) {
+            throw new IllegalArgumentException(
+                "BigObject URI must start with 's3://', got: " + uri
+            );
+        }
+        this.uri = uri;
+    }
+    
+    /**
+     * Creates a new BigObject for writing data.
+     * Generates a unique S3 URI.
+     * 
+     * Usage example:
+     * 
+     *   BigObject bigObject = new BigObject();
+     *   try (BigObjectOutputStream out = new 
BigObjectOutputStream(bigObject)) {
+     *     out.write(data);
+     *   }
+     *   // bigObject is now ready to be added to tuples
+     * 
+     */
+    public BigObject() {
+        this(BigObjectManager.create());
+    }
+    
+    @JsonValue
+    public String getUri() {
+        return uri;
+    }
+    
+    public String getBucketName() {
+        return URI.create(uri).getHost();
+    }
+    
+    public String getObjectKey() {
+        String path = URI.create(uri).getPath();
+        return path.startsWith("/") ? path.substring(1) : path;
+    }
+    
+    @Override
+    public String toString() {
+        return uri;
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (!(obj instanceof BigObject)) return false;
+        BigObject that = (BigObject) obj;
+        return Objects.equals(uri, that.uri);
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hash(uri);
+    }
+}
diff --git 
a/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala 
b/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala
index bc17139641..7216530a91 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala
@@ -20,7 +20,7 @@
 package org.apache.amber.util
 
 import org.apache.amber.config.StorageConfig
-import org.apache.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.amber.core.tuple.{Attribute, AttributeType, BigObject, 
Schema, Tuple}
 import org.apache.hadoop.conf.Configuration
 import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
 import org.apache.iceberg.data.parquet.GenericParquetReaders
@@ -52,6 +52,9 @@ import scala.jdk.CollectionConverters._
   */
 object IcebergUtil {
 
+  // Unique suffix for BIG_OBJECT field encoding
+  private val BIG_OBJECT_FIELD_SUFFIX = "__texera_big_obj_ptr"
+
   /**
     * Creates and initializes a HadoopCatalog with the given parameters.
     * - Uses an empty Hadoop `Configuration`, meaning the local file system 
(or `file:/`) will be used by default
@@ -200,6 +203,7 @@ object IcebergUtil {
 
   /**
     * Converts a custom Amber `Schema` to an Iceberg `Schema`.
+    * Field names are encoded to preserve BIG_OBJECT type information.
     *
     * @param amberSchema The custom Amber Schema.
     * @return An Iceberg Schema.
@@ -207,13 +211,16 @@ object IcebergUtil {
   def toIcebergSchema(amberSchema: Schema): IcebergSchema = {
     val icebergFields = amberSchema.getAttributes.zipWithIndex.map {
       case (attribute, index) =>
-        Types.NestedField.optional(index + 1, attribute.getName, 
toIcebergType(attribute.getType))
+        val encodedName = encodeBigObjectFieldName(attribute.getName, 
attribute.getType)
+        val icebergType = toIcebergType(attribute.getType)
+        Types.NestedField.optional(index + 1, encodedName, icebergType)
     }
     new IcebergSchema(icebergFields.asJava)
   }
 
   /**
     * Converts a custom Amber `AttributeType` to an Iceberg `Type`.
+    * Note: BIG_OBJECT is stored as StringType; field name encoding is used to 
distinguish it.
     *
     * @param attributeType The custom Amber AttributeType.
     * @return The corresponding Iceberg Type.
@@ -227,6 +234,8 @@ object IcebergUtil {
       case AttributeType.BOOLEAN   => Types.BooleanType.get()
       case AttributeType.TIMESTAMP => Types.TimestampType.withoutZone()
       case AttributeType.BINARY    => Types.BinaryType.get()
+      case AttributeType.BIG_OBJECT =>
+        Types.StringType.get() // Store BigObjectPointer URI as string
       case AttributeType.ANY =>
         throw new IllegalArgumentException("ANY type is not supported in 
Iceberg")
     }
@@ -243,13 +252,15 @@ object IcebergUtil {
 
     tuple.schema.getAttributes.zipWithIndex.foreach {
       case (attribute, index) =>
+        val fieldName = encodeBigObjectFieldName(attribute.getName, 
attribute.getType)
         val value = tuple.getField[AnyRef](index) match {
-          case null               => null
-          case ts: Timestamp      => 
ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
-          case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
-          case other              => other
+          case null                 => null
+          case ts: Timestamp        => 
ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
+          case bytes: Array[Byte]   => ByteBuffer.wrap(bytes)
+          case bigObjPtr: BigObject => bigObjPtr.getUri
+          case other                => other
         }
-        record.setField(attribute.getName, value)
+        record.setField(fieldName, value)
     }
 
     record
@@ -264,23 +275,69 @@ object IcebergUtil {
     */
   def fromRecord(record: Record, amberSchema: Schema): Tuple = {
     val fieldValues = amberSchema.getAttributes.map { attribute =>
-      val value = record.getField(attribute.getName) match {
+      val fieldName = encodeBigObjectFieldName(attribute.getName, 
attribute.getType)
+      val rawValue = record.getField(fieldName)
+
+      rawValue match {
         case null               => null
         case ldt: LocalDateTime => Timestamp.valueOf(ldt)
         case buffer: ByteBuffer =>
           val bytes = new Array[Byte](buffer.remaining())
           buffer.get(bytes)
           bytes
+        case uri: String if attribute.getType == AttributeType.BIG_OBJECT =>
+          new BigObject(uri)
         case other => other
       }
-      value
     }
 
     Tuple(amberSchema, fieldValues.toArray)
   }
 
+  /**
+    * Encodes a field name for BIG_OBJECT types by adding a unique system 
suffix.
+    * This ensures BIG_OBJECT fields can be identified when reading from 
Iceberg.
+    *
+    * @param fieldName The original field name
+    * @param attributeType The attribute type
+    * @return The encoded field name with a unique suffix for BIG_OBJECT types
+    */
+  private def encodeBigObjectFieldName(fieldName: String, attributeType: 
AttributeType): String = {
+    if (attributeType == AttributeType.BIG_OBJECT) {
+      s"${fieldName}${BIG_OBJECT_FIELD_SUFFIX}"
+    } else {
+      fieldName
+    }
+  }
+
+  /**
+    * Decodes a field name by removing the unique system suffix if present.
+    * This restores the original user-defined field name.
+    *
+    * @param fieldName The encoded field name
+    * @return The original field name with system suffix removed
+    */
+  private def decodeBigObjectFieldName(fieldName: String): String = {
+    if (isBigObjectField(fieldName)) {
+      fieldName.substring(0, fieldName.length - BIG_OBJECT_FIELD_SUFFIX.length)
+    } else {
+      fieldName
+    }
+  }
+
+  /**
+    * Checks if a field name indicates a BIG_OBJECT type by examining the 
unique suffix.
+    *
+    * @param fieldName The field name to check
+    * @return true if the field represents a BIG_OBJECT type, false otherwise
+    */
+  private def isBigObjectField(fieldName: String): Boolean = {
+    fieldName.endsWith(BIG_OBJECT_FIELD_SUFFIX)
+  }
+
   /**
     * Converts an Iceberg `Schema` to an Amber `Schema`.
+    * Field names are decoded to restore original names and detect BIG_OBJECT 
types.
     *
     * @param icebergSchema The Iceberg Schema.
     * @return The corresponding Amber Schema.
@@ -290,7 +347,10 @@ object IcebergUtil {
       .columns()
       .asScala
       .map { field =>
-        new Attribute(field.name(), 
fromIcebergType(field.`type`().asPrimitiveType()))
+        val fieldName = field.name()
+        val attributeType = fromIcebergType(field.`type`().asPrimitiveType(), 
fieldName)
+        val originalName = decodeBigObjectFieldName(fieldName)
+        new Attribute(originalName, attributeType)
       }
       .toList
 
@@ -301,11 +361,16 @@ object IcebergUtil {
     * Converts an Iceberg `Type` to an Amber `AttributeType`.
     *
     * @param icebergType The Iceberg Type.
+    * @param fieldName The field name (used to detect BIG_OBJECT by suffix).
     * @return The corresponding Amber AttributeType.
     */
-  def fromIcebergType(icebergType: PrimitiveType): AttributeType = {
+  def fromIcebergType(
+      icebergType: PrimitiveType,
+      fieldName: String = ""
+  ): AttributeType = {
     icebergType match {
-      case _: Types.StringType    => AttributeType.STRING
+      case _: Types.StringType =>
+        if (isBigObjectField(fieldName)) AttributeType.BIG_OBJECT else 
AttributeType.STRING
       case _: Types.IntegerType   => AttributeType.INTEGER
       case _: Types.LongType      => AttributeType.LONG
       case _: Types.DoubleType    => AttributeType.DOUBLE
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala
new file mode 100644
index 0000000000..cdc7e5b77a
--- /dev/null
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.apache.amber.core.tuple.BigObject
+
+import java.io.InputStream
+
+/**
+  * InputStream for reading BigObject data from S3.
+  *
+  * The underlying S3 download is lazily initialized on first read.
+  * The stream will fail if the S3 object doesn't exist when read is attempted.
+  *
+  * Usage:
+  * {{{
+  *   val bigObject: BigObject = ...
+  *   try (val in = new BigObjectInputStream(bigObject)) {
+  *     val bytes = in.readAllBytes()
+  *   }
+  * }}}
+  */
+class BigObjectInputStream(bigObject: BigObject) extends InputStream {
+
+  require(bigObject != null, "BigObject cannot be null")
+
+  // Lazy initialization - downloads only when first read() is called
+  private lazy val underlying: InputStream =
+    S3StorageClient.downloadObject(bigObject.getBucketName, 
bigObject.getObjectKey)
+
+  @volatile private var closed = false
+
+  override def read(): Int = whenOpen(underlying.read())
+
+  override def read(b: Array[Byte], off: Int, len: Int): Int =
+    whenOpen(underlying.read(b, off, len))
+
+  override def readAllBytes(): Array[Byte] = 
whenOpen(underlying.readAllBytes())
+
+  override def readNBytes(n: Int): Array[Byte] = 
whenOpen(underlying.readNBytes(n))
+
+  override def skip(n: Long): Long = whenOpen(underlying.skip(n))
+
+  override def available(): Int = whenOpen(underlying.available())
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      if (underlying != null) { // Only close if initialized
+        underlying.close()
+      }
+    }
+  }
+
+  override def markSupported(): Boolean = whenOpen(underlying.markSupported())
+
+  override def mark(readlimit: Int): Unit = 
whenOpen(underlying.mark(readlimit))
+
+  override def reset(): Unit = whenOpen(underlying.reset())
+
+  private def whenOpen[T](f: => T): T = {
+    if (closed) throw new java.io.IOException("Stream is closed")
+    f
+  }
+}
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
new file mode 100644
index 0000000000..a6a273eb30
--- /dev/null
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import com.typesafe.scalalogging.LazyLogging
+
+import java.util.UUID
+
+/**
+  * Manages the lifecycle of BigObjects stored in S3.
+  *
+  * Handles creation and deletion of large objects that exceed
+  * normal tuple size limits.
+  */
+object BigObjectManager extends LazyLogging {
+  private val DEFAULT_BUCKET = "texera-big-objects"
+
+  /**
+    * Creates a new BigObject reference.
+    * The actual data upload happens separately via BigObjectOutputStream.
+    *
+    * @return S3 URI string for the new BigObject (format: s3://bucket/key)
+    */
+  def create(): String = {
+    S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET)
+
+    val objectKey = 
s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}"
+    val uri = s"s3://$DEFAULT_BUCKET/$objectKey"
+
+    uri
+  }
+
+  /**
+    * Deletes all big objects from the bucket.
+    *
+    * @throws Exception if the deletion fails
+    * @return Unit
+    */
+  def deleteAllObjects(): Unit = {
+    try {
+      S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects")
+      logger.info(s"Successfully deleted all big objects from bucket: 
$DEFAULT_BUCKET")
+    } catch {
+      case e: Exception =>
+        logger.warn(s"Failed to delete big objects from bucket: 
$DEFAULT_BUCKET", e)
+    }
+  }
+
+}
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala
new file mode 100644
index 0000000000..80214a973f
--- /dev/null
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.amber.core.tuple.BigObject
+
+import java.io.{IOException, OutputStream, PipedInputStream, PipedOutputStream}
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
+/**
+  * OutputStream for streaming BigObject data to S3.
+  *
+  * Data is uploaded in the background using multipart upload as you write.
+  * Call close() to complete the upload and ensure all data is persisted.
+  *
+  * Usage:
+  * {{{
+  *   val bigObject = new BigObject()
+  *   try (val out = new BigObjectOutputStream(bigObject)) {
+  *     out.write(myBytes)
+  *   }
+  *   // bigObject is now ready to use
+  * }}}
+  *
+  * Note: Not thread-safe. Do not access from multiple threads concurrently.
+  *
+  * @param bigObject The BigObject reference to write to
+  */
+class BigObjectOutputStream(bigObject: BigObject) extends OutputStream with 
LazyLogging {
+
+  private val PIPE_BUFFER_SIZE = 64 * 1024 // 64KB
+
+  require(bigObject != null, "BigObject cannot be null")
+
+  private val bucketName: String = bigObject.getBucketName
+  private val objectKey: String = bigObject.getObjectKey
+  private implicit val ec: ExecutionContext = ExecutionContext.global
+
+  // Pipe: we write to pipedOut, and S3 reads from pipedIn
+  private val pipedIn = new PipedInputStream(PIPE_BUFFER_SIZE)
+  private val pipedOut = new PipedOutputStream(pipedIn)
+
+  @volatile private var closed = false
+  private val uploadException = new AtomicReference[Option[Throwable]](None)
+
+  // Start background upload immediately
+  private val uploadFuture: Future[Unit] = Future {
+    try {
+      S3StorageClient.createBucketIfNotExist(bucketName)
+      S3StorageClient.uploadObject(bucketName, objectKey, pipedIn)
+      logger.debug(s"Upload completed: ${bigObject.getUri}")
+    } catch {
+      case e: Exception =>
+        uploadException.set(Some(e))
+        logger.error(s"Upload failed: ${bigObject.getUri}", e)
+    } finally {
+      pipedIn.close()
+    }
+  }
+
+  override def write(b: Int): Unit = whenOpen(pipedOut.write(b))
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit =
+    whenOpen(pipedOut.write(b, off, len))
+
+  override def flush(): Unit = {
+    if (!closed) pipedOut.flush()
+  }
+
+  /**
+    * Closes the stream and completes the S3 upload.
+    * Blocks until upload is complete. Throws IOException if upload failed.
+    */
+  override def close(): Unit = {
+    if (closed) return
+
+    closed = true
+    try {
+      pipedOut.close()
+      Await.result(uploadFuture, Duration.Inf)
+      checkUploadSuccess()
+    } catch {
+      case e: IOException => throw e
+      case e: Exception =>
+        S3StorageClient.deleteObject(bucketName, objectKey)
+        throw new IOException(s"Failed to complete upload: ${e.getMessage}", e)
+    }
+  }
+
+  private def whenOpen[T](f: => T): T = {
+    if (closed) throw new IOException("Stream is closed")
+    checkUploadSuccess()
+    f
+  }
+
+  private def checkUploadSuccess(): Unit = {
+    uploadException.get().foreach { ex =>
+      throw new IOException("Background upload failed", ex)
+    }
+  }
+}
diff --git 
a/file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
similarity index 59%
rename from 
file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
rename to 
common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
index 7c157cc0ae..8c3bc2f5f3 100644
--- 
a/file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
@@ -24,7 +24,9 @@ import 
software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCrede
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.s3.model._
 import software.amazon.awssdk.services.s3.{S3Client, S3Configuration}
+import software.amazon.awssdk.core.sync.RequestBody
 
+import java.io.InputStream
 import java.security.MessageDigest
 import scala.jdk.CollectionConverters._
 
@@ -139,4 +141,122 @@ object S3StorageClient {
       s3Client.deleteObjects(deleteObjectsRequest)
     }
   }
+
+  /**
+    * Uploads an object to S3 using multipart upload.
+    * Handles streams of any size without loading into memory.
+    */
+  def uploadObject(bucketName: String, objectKey: String, inputStream: 
InputStream): String = {
+    val buffer = new Array[Byte](MINIMUM_NUM_OF_MULTIPART_S3_PART.toInt)
+
+    // Helper to read a full buffer from input stream
+    def readChunk(): Int = {
+      var offset = 0
+      var read = 0
+      while (
+        offset < buffer.length && {
+          read = inputStream.read(buffer, offset, buffer.length - offset); 
read > 0
+        }
+      ) {
+        offset += read
+      }
+      offset
+    }
+
+    // Read first chunk to check if stream is empty
+    val firstChunkSize = readChunk()
+    if (firstChunkSize == 0) {
+      return s3Client
+        .putObject(
+          PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
+          RequestBody.fromBytes(Array.empty[Byte])
+        )
+        .eTag()
+    }
+
+    val uploadId = s3Client
+      .createMultipartUpload(
+        
CreateMultipartUploadRequest.builder().bucket(bucketName).key(objectKey).build()
+      )
+      .uploadId()
+
+    var uploadSuccess = false
+    try {
+      // Upload all parts using an iterator
+      val allParts = Iterator
+        .iterate((1, firstChunkSize)) { case (partNum, _) => (partNum + 1, 
readChunk()) }
+        .takeWhile { case (_, size) => size > 0 }
+        .map {
+          case (partNumber, chunkSize) =>
+            val eTag = s3Client
+              .uploadPart(
+                UploadPartRequest
+                  .builder()
+                  .bucket(bucketName)
+                  .key(objectKey)
+                  .uploadId(uploadId)
+                  .partNumber(partNumber)
+                  .build(),
+                RequestBody.fromBytes(buffer.take(chunkSize))
+              )
+              .eTag()
+            CompletedPart.builder().partNumber(partNumber).eTag(eTag).build()
+        }
+        .toList
+
+      val result = s3Client
+        .completeMultipartUpload(
+          CompleteMultipartUploadRequest
+            .builder()
+            .bucket(bucketName)
+            .key(objectKey)
+            .uploadId(uploadId)
+            
.multipartUpload(CompletedMultipartUpload.builder().parts(allParts.asJava).build())
+            .build()
+        )
+        .eTag()
+
+      uploadSuccess = true
+      result
+
+    } finally {
+      if (!uploadSuccess) {
+        try {
+          s3Client.abortMultipartUpload(
+            AbortMultipartUploadRequest
+              .builder()
+              .bucket(bucketName)
+              .key(objectKey)
+              .uploadId(uploadId)
+              .build()
+          )
+        } catch { case _: Exception => }
+      }
+    }
+  }
+
+  /**
+    * Downloads an object from S3 as an InputStream.
+    *
+    * @param bucketName The S3 bucket name.
+    * @param objectKey The object key (path) in S3.
+    * @return An InputStream containing the object data.
+    */
+  def downloadObject(bucketName: String, objectKey: String): InputStream = {
+    s3Client.getObject(
+      GetObjectRequest.builder().bucket(bucketName).key(objectKey).build()
+    )
+  }
+
+  /**
+    * Deletes a single object from S3.
+    *
+    * @param bucketName The S3 bucket name.
+    * @param objectKey The object key (path) in S3.
+    */
+  def deleteObject(bucketName: String, objectKey: String): Unit = {
+    s3Client.deleteObject(
+      DeleteObjectRequest.builder().bucket(bucketName).key(objectKey).build()
+    )
+  }
 }
diff --git 
a/common/workflow-core/src/test/scala/org/apache/amber/core/tuple/AttributeTypeUtilsSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/amber/core/tuple/AttributeTypeUtilsSpec.scala
index 53e5f68430..492be4a388 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/amber/core/tuple/AttributeTypeUtilsSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/amber/core/tuple/AttributeTypeUtilsSpec.scala
@@ -196,6 +196,27 @@ class AttributeTypeUtilsSpec extends AnyFunSuite {
     assert(parseField("anything", AttributeType.ANY) == "anything")
   }
 
+  test("parseField correctly parses to BIG_OBJECT") {
+    // Valid S3 URI strings are converted to BigObject
+    val pointer1 = parseField("s3://bucket/path/to/object", 
AttributeType.BIG_OBJECT)
+      .asInstanceOf[BigObject]
+    assert(pointer1.getUri == "s3://bucket/path/to/object")
+    assert(pointer1.getBucketName == "bucket")
+    assert(pointer1.getObjectKey == "path/to/object")
+
+    // Null input returns null
+    assert(parseField(null, AttributeType.BIG_OBJECT) == null)
+  }
+
+  test("BIG_OBJECT type is preserved but never inferred from data") {
+    // BIG_OBJECT remains BIG_OBJECT when passed as typeSoFar
+    assert(inferField(AttributeType.BIG_OBJECT, "any-value") == 
AttributeType.BIG_OBJECT)
+    assert(inferField(AttributeType.BIG_OBJECT, null) == 
AttributeType.BIG_OBJECT)
+
+    // String data is inferred as STRING, never BIG_OBJECT
+    assert(inferField("s3://bucket/path") == AttributeType.STRING)
+  }
+
   test("compare correctly handles null values for different attribute types") {
     assert(compare(null, null, INTEGER) == 0)
     assert(compare(null, 10, INTEGER) < 0)
diff --git 
a/common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilSpec.scala
index ee7744e815..0b20d5cd97 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilSpec.scala
@@ -19,7 +19,7 @@
 
 package org.apache.amber.util
 
-import org.apache.amber.core.tuple.{AttributeType, Schema, Tuple}
+import org.apache.amber.core.tuple.{AttributeType, BigObject, Schema, Tuple}
 import org.apache.amber.util.IcebergUtil.toIcebergSchema
 import org.apache.iceberg.data.GenericRecord
 import org.apache.iceberg.types.Types
@@ -199,4 +199,102 @@ class IcebergUtilSpec extends AnyFlatSpec {
     assert(tuple.getField[String]("test-6") == "hello world")
     assert(tuple.getField[Array[Byte]]("test-7") sameElements Array[Byte](1, 
2, 3, 4))
   }
+
+  // BIG_OBJECT type tests
+
+  it should "convert BIG_OBJECT type correctly between Texera and Iceberg" in {
+    // BIG_OBJECT stored as StringType with field name suffix
+    assert(IcebergUtil.toIcebergType(AttributeType.BIG_OBJECT) == 
Types.StringType.get())
+    assert(IcebergUtil.fromIcebergType(Types.StringType.get(), "field") == 
AttributeType.STRING)
+    assert(
+      IcebergUtil.fromIcebergType(
+        Types.StringType.get(),
+        "field__texera_big_obj_ptr"
+      ) == AttributeType.BIG_OBJECT
+    )
+  }
+
+  it should "convert schemas with BIG_OBJECT fields correctly" in {
+    val texeraSchema = Schema()
+      .add("id", AttributeType.INTEGER)
+      .add("large_data", AttributeType.BIG_OBJECT)
+
+    val icebergSchema = IcebergUtil.toIcebergSchema(texeraSchema)
+
+    // BIG_OBJECT field gets encoded name with suffix
+    assert(icebergSchema.findField("large_data__texera_big_obj_ptr") != null)
+    assert(
+      icebergSchema.findField("large_data__texera_big_obj_ptr").`type`() == 
Types.StringType.get()
+    )
+
+    // Round-trip preserves schema
+    val roundTripSchema = IcebergUtil.fromIcebergSchema(icebergSchema)
+    assert(roundTripSchema.getAttribute("large_data").getType == 
AttributeType.BIG_OBJECT)
+  }
+
+  it should "convert tuples with BIG_OBJECT to records and back correctly" in {
+    val schema = Schema()
+      .add("id", AttributeType.INTEGER)
+      .add("large_data", AttributeType.BIG_OBJECT)
+
+    val tuple = Tuple
+      .builder(schema)
+      .addSequentially(Array(Int.box(42), new 
BigObject("s3://bucket/object/key.data")))
+      .build()
+
+    val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), tuple)
+
+    // BIG_OBJECT stored as URI string with encoded field name
+    assert(record.getField("id") == 42)
+    assert(record.getField("large_data__texera_big_obj_ptr") == 
"s3://bucket/object/key.data")
+
+    // Round-trip preserves data
+    val roundTripTuple = IcebergUtil.fromRecord(record, schema)
+    assert(roundTripTuple == tuple)
+
+    // BigObject properties are accessible
+    val bigObj = roundTripTuple.getField[BigObject]("large_data")
+    assert(bigObj.getUri == "s3://bucket/object/key.data")
+    assert(bigObj.getBucketName == "bucket")
+    assert(bigObj.getObjectKey == "object/key.data")
+  }
+
+  it should "handle null BIG_OBJECT values correctly" in {
+    val schema = Schema().add("data", AttributeType.BIG_OBJECT)
+
+    val tupleWithNull = 
Tuple.builder(schema).addSequentially(Array(null)).build()
+    val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), 
tupleWithNull)
+
+    assert(record.getField("data__texera_big_obj_ptr") == null)
+    assert(IcebergUtil.fromRecord(record, schema) == tupleWithNull)
+  }
+
+  it should "handle multiple BIG_OBJECT fields and mixed types correctly" in {
+    val schema = Schema()
+      .add("int_field", AttributeType.INTEGER)
+      .add("big_obj_1", AttributeType.BIG_OBJECT)
+      .add("string_field", AttributeType.STRING)
+      .add("big_obj_2", AttributeType.BIG_OBJECT)
+
+    val tuple = Tuple
+      .builder(schema)
+      .addSequentially(
+        Array(
+          Int.box(123),
+          new BigObject("s3://bucket1/file1.dat"),
+          "normal string",
+          null // null BIG_OBJECT
+        )
+      )
+      .build()
+
+    val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), tuple)
+
+    assert(record.getField("int_field") == 123)
+    assert(record.getField("big_obj_1__texera_big_obj_ptr") == 
"s3://bucket1/file1.dat")
+    assert(record.getField("string_field") == "normal string")
+    assert(record.getField("big_obj_2__texera_big_obj_ptr") == null)
+
+    assert(IcebergUtil.fromRecord(record, schema) == tuple)
+  }
 }
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala
new file mode 100644
index 0000000000..a163326b9d
--- /dev/null
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.apache.amber.core.tuple.BigObject
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.io.{ByteArrayInputStream, IOException}
+import scala.util.Random
+
+class BigObjectInputStreamSpec
+    extends AnyFunSuite
+    with S3StorageTestBase
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach {
+
+  private val testBucketName = "test-big-object-input-stream"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    S3StorageClient.createBucketIfNotExist(testBucketName)
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      S3StorageClient.deleteDirectory(testBucketName, "")
+    } catch {
+      case _: Exception => // Ignore cleanup errors
+    }
+    super.afterAll()
+  }
+
+  // Helper methods
+  private def createTestObject(key: String, data: Array[Byte]): BigObject = {
+    S3StorageClient.uploadObject(testBucketName, key, new 
ByteArrayInputStream(data))
+    new BigObject(s"s3://$testBucketName/$key")
+  }
+
+  private def createTestObject(key: String, data: String): BigObject =
+    createTestObject(key, data.getBytes)
+
+  private def generateRandomData(size: Int): Array[Byte] =
+    Array.fill[Byte](size)((Random.nextInt(256) - 128).toByte)
+
+  private def withStream[T](bigObject: BigObject)(f: BigObjectInputStream => 
T): T = {
+    val stream = new BigObjectInputStream(bigObject)
+    try {
+      f(stream)
+    } finally {
+      stream.close()
+    }
+  }
+
+  private def assertThrowsIOExceptionWhenClosed(operation: 
BigObjectInputStream => Unit): Unit = {
+    val bigObject = createTestObject(s"test/closed-${Random.nextInt()}.txt", 
"data")
+    val stream = new BigObjectInputStream(bigObject)
+    stream.close()
+    val exception = intercept[IOException](operation(stream))
+    assert(exception.getMessage.contains("Stream is closed"))
+  }
+
+  // Constructor Tests
+  test("constructor should reject null BigObject") {
+    val exception = intercept[IllegalArgumentException] {
+      new BigObjectInputStream(null)
+    }
+    assert(exception.getMessage.contains("BigObject cannot be null"))
+  }
+
+  test("constructor should accept valid BigObject") {
+    val bigObject = createTestObject("test/valid.txt", "test data")
+    withStream(bigObject) { _ => }
+  }
+
+  // read() Tests
+  test("read() should read single bytes correctly") {
+    val bigObject = createTestObject("test/single-byte.txt", "Hello")
+    withStream(bigObject) { stream =>
+      assert(stream.read() == 'H'.toByte)
+      assert(stream.read() == 'e'.toByte)
+      assert(stream.read() == 'l'.toByte)
+      assert(stream.read() == 'l'.toByte)
+      assert(stream.read() == 'o'.toByte)
+      assert(stream.read() == -1) // EOF
+    }
+  }
+
+  test("read() should return -1 for empty object") {
+    val bigObject = createTestObject("test/empty.txt", "")
+    withStream(bigObject) { stream =>
+      assert(stream.read() == -1)
+    }
+  }
+
+  // read(byte[], int, int) Tests
+  test("read(byte[], int, int) should read data into buffer") {
+    val testData = "Hello, World!"
+    val bigObject = createTestObject("test/buffer-read.txt", testData)
+    withStream(bigObject) { stream =>
+      val buffer = new Array[Byte](testData.length)
+      val bytesRead = stream.read(buffer, 0, buffer.length)
+      assert(bytesRead == testData.length)
+      assert(new String(buffer) == testData)
+    }
+  }
+
+  test("read(byte[], int, int) should handle partial reads and offsets") {
+    val testData = "Hello, World!"
+    val bigObject = createTestObject("test/partial.txt", testData)
+    withStream(bigObject) { stream =>
+      // Test partial read
+      val buffer1 = new Array[Byte](5)
+      assert(stream.read(buffer1, 0, 5) == 5)
+      assert(new String(buffer1) == "Hello")
+    }
+
+    // Test offset
+    withStream(bigObject) { stream =>
+      val buffer2 = new Array[Byte](20)
+      assert(stream.read(buffer2, 5, 10) == 10)
+      assert(new String(buffer2, 5, 10) == "Hello, Wor")
+    }
+  }
+
+  test("read(byte[], int, int) should return -1 at EOF") {
+    val bigObject = createTestObject("test/eof.txt", "test")
+    withStream(bigObject) { stream =>
+      val buffer = new Array[Byte](10)
+      stream.read(buffer, 0, 10)
+      assert(stream.read(buffer, 0, 10) == -1)
+    }
+  }
+
+  // readAllBytes() Tests
+  test("readAllBytes() should read entire object") {
+    val testData = "Hello, World! This is a test."
+    val bigObject = createTestObject("test/read-all.txt", testData)
+    withStream(bigObject) { stream =>
+      assert(new String(stream.readAllBytes()) == testData)
+    }
+  }
+
+  test("readAllBytes() should handle large objects") {
+    val largeData = generateRandomData(1024 * 1024) // 1MB
+    val bigObject = createTestObject("test/large.bin", largeData)
+    withStream(bigObject) { stream =>
+      val bytes = stream.readAllBytes()
+      assert(bytes.length == largeData.length)
+      assert(bytes.sameElements(largeData))
+    }
+  }
+
+  test("readAllBytes() should return empty array for empty object") {
+    val bigObject = createTestObject("test/empty-all.txt", "")
+    withStream(bigObject) { stream =>
+      assert(stream.readAllBytes().length == 0)
+    }
+  }
+
+  // readNBytes() Tests
+  test("readNBytes() should read exactly N bytes") {
+    val testData = "Hello, World! This is a test."
+    val bigObject = createTestObject("test/read-n.txt", testData)
+    withStream(bigObject) { stream =>
+      val bytes = stream.readNBytes(5)
+      assert(bytes.length == 5)
+      assert(new String(bytes) == "Hello")
+    }
+  }
+
+  test("readNBytes() should handle EOF and zero") {
+    val bigObject = createTestObject("test/read-n-eof.txt", "Hello")
+    withStream(bigObject) { stream =>
+      // Request more than available
+      val bytes = stream.readNBytes(100)
+      assert(bytes.length == 5)
+      assert(new String(bytes) == "Hello")
+    }
+
+    // Test n=0
+    withStream(bigObject) { stream =>
+      assert(stream.readNBytes(0).length == 0)
+    }
+  }
+
+  // skip() Tests
+  test("skip() should skip bytes correctly") {
+    val bigObject = createTestObject("test/skip.txt", "Hello, World!")
+    withStream(bigObject) { stream =>
+      assert(stream.skip(7) == 7)
+      assert(stream.read() == 'W'.toByte)
+    }
+  }
+
+  test("skip() should handle EOF and zero") {
+    val bigObject = createTestObject("test/skip-eof.txt", "Hello")
+    withStream(bigObject) { stream =>
+      assert(stream.skip(100) == 5)
+      assert(stream.read() == -1)
+    }
+
+    // Test n=0
+    withStream(bigObject) { stream =>
+      assert(stream.skip(0) == 0)
+    }
+  }
+
+  // available() Tests
+  test("available() should return non-negative value") {
+    val bigObject = createTestObject("test/available.txt", "Hello, World!")
+    withStream(bigObject) { stream =>
+      assert(stream.available() >= 0)
+    }
+  }
+
+  // close() Tests
+  test("close() should be idempotent") {
+    val bigObject = createTestObject("test/close-idempotent.txt", "data")
+    val stream = new BigObjectInputStream(bigObject)
+    stream.close()
+    stream.close() // Should not throw
+    stream.close() // Should not throw
+  }
+
+  test("close() should prevent further operations") {
+    val bigObject = createTestObject("test/close-prevents.txt", "data")
+    val stream = new BigObjectInputStream(bigObject)
+    stream.close()
+
+    intercept[IOException] { stream.read() }
+    intercept[IOException] { stream.readAllBytes() }
+    intercept[IOException] { stream.readNBytes(10) }
+    intercept[IOException] { stream.skip(10) }
+    intercept[IOException] { stream.available() }
+  }
+
+  test("close() should work without reading (lazy initialization)") {
+    val bigObject = createTestObject("test/close-lazy.txt", "data")
+    val stream = new BigObjectInputStream(bigObject)
+    stream.close() // Should not throw
+  }
+
+  // Closed stream tests - consolidated
+  test("operations should throw IOException when stream is closed") {
+    assertThrowsIOExceptionWhenClosed(_.read())
+    assertThrowsIOExceptionWhenClosed(_.read(new Array[Byte](10), 0, 10))
+    assertThrowsIOExceptionWhenClosed(_.readAllBytes())
+    assertThrowsIOExceptionWhenClosed(_.readNBytes(10))
+    assertThrowsIOExceptionWhenClosed(_.skip(10))
+    assertThrowsIOExceptionWhenClosed(_.available())
+    assertThrowsIOExceptionWhenClosed(_.mark(100))
+    assertThrowsIOExceptionWhenClosed(_.reset())
+  }
+
+  // mark/reset Tests
+  test("markSupported() should delegate to underlying stream") {
+    val bigObject = createTestObject("test/mark.txt", "data")
+    withStream(bigObject) { stream =>
+      val supported = stream.markSupported()
+      assert(!supported || supported) // Just verify it's callable
+    }
+  }
+
+  test("mark() and reset() should delegate to underlying stream") {
+    val bigObject = createTestObject("test/mark-reset.txt", "data")
+    withStream(bigObject) { stream =>
+      if (stream.markSupported()) {
+        stream.mark(100)
+        stream.read()
+        stream.reset()
+      }
+    // If not supported, methods should still be callable
+    }
+  }
+
+  // Lazy initialization Tests
+  test("lazy initialization should not download until first read") {
+    val bigObject = createTestObject("test/lazy-init.txt", "data")
+    val stream = new BigObjectInputStream(bigObject)
+    // Creating the stream should not trigger download
+    // Reading should trigger download
+    try {
+      assert(stream.read() == 'd'.toByte)
+    } finally {
+      stream.close()
+    }
+  }
+
+  // Integration Tests
+  test("should handle chunked reading of large objects") {
+    val largeData = generateRandomData(10 * 1024) // 10KB
+    val bigObject = createTestObject("test/chunked.bin", largeData)
+    withStream(bigObject) { stream =>
+      val buffer = new Array[Byte](1024)
+      val output = new java.io.ByteArrayOutputStream()
+      var bytesRead = 0
+
+      while ({
+        bytesRead = stream.read(buffer, 0, buffer.length)
+        bytesRead != -1
+      }) {
+        output.write(buffer, 0, bytesRead)
+      }
+
+      val result = output.toByteArray
+      assert(result.length == largeData.length)
+      assert(result.sameElements(largeData))
+    }
+  }
+
+  test("should handle multiple streams reading same object") {
+    val testData = "Shared data"
+    val bigObject = createTestObject("test/shared.txt", testData)
+
+    val stream1 = new BigObjectInputStream(bigObject)
+    val stream2 = new BigObjectInputStream(bigObject)
+
+    try {
+      assert(new String(stream1.readAllBytes()) == testData)
+      assert(new String(stream2.readAllBytes()) == testData)
+    } finally {
+      stream1.close()
+      stream2.close()
+    }
+  }
+
+  test("should preserve binary data integrity") {
+    val binaryData = Array[Byte](0, 1, 2, 127, -128, -1, 50, 100)
+    val bigObject = createTestObject("test/binary.bin", binaryData)
+    withStream(bigObject) { stream =>
+      assert(stream.readAllBytes().sameElements(binaryData))
+    }
+  }
+}
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala
new file mode 100644
index 0000000000..ce1d4f4e69
--- /dev/null
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.apache.amber.core.tuple.BigObject
+import org.scalatest.funsuite.AnyFunSuite
+
+class BigObjectManagerSpec extends AnyFunSuite with S3StorageTestBase {
+
+  /** Creates a big object from string data and returns it. */
+  private def createBigObject(data: String): BigObject = {
+    val bigObject = new BigObject()
+    val out = new BigObjectOutputStream(bigObject)
+    try {
+      out.write(data.getBytes)
+    } finally {
+      out.close()
+    }
+    bigObject
+  }
+
+  /** Verifies standard bucket name. */
+  private def assertStandardBucket(pointer: BigObject): Unit = {
+    assert(pointer.getBucketName == "texera-big-objects")
+    assert(pointer.getUri.startsWith("s3://texera-big-objects/"))
+  }
+
+  // ========================================
+  // BigObjectInputStream Tests (Standard Java InputStream)
+  // ========================================
+
+  test("BigObjectInputStream should read all bytes from stream") {
+    val data = "Hello, World! This is a test."
+    val bigObject = createBigObject(data)
+
+    val stream = new BigObjectInputStream(bigObject)
+    assert(stream.readAllBytes().sameElements(data.getBytes))
+    stream.close()
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectInputStream should read exact number of bytes") {
+    val bigObject = createBigObject("0123456789ABCDEF")
+
+    val stream = new BigObjectInputStream(bigObject)
+    val result = stream.readNBytes(10)
+
+    assert(result.length == 10)
+    assert(result.sameElements("0123456789".getBytes))
+    stream.close()
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectInputStream should handle reading more bytes than available") 
{
+    val data = "Short"
+    val bigObject = createBigObject(data)
+
+    val stream = new BigObjectInputStream(bigObject)
+    val result = stream.readNBytes(100)
+
+    assert(result.length == data.length)
+    assert(result.sameElements(data.getBytes))
+    stream.close()
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectInputStream should support standard single-byte read") {
+    val bigObject = createBigObject("ABC")
+
+    val stream = new BigObjectInputStream(bigObject)
+    assert(stream.read() == 65) // 'A'
+    assert(stream.read() == 66) // 'B'
+    assert(stream.read() == 67) // 'C'
+    assert(stream.read() == -1) // EOF
+    stream.close()
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectInputStream should return -1 at EOF") {
+    val bigObject = createBigObject("EOF")
+
+    val stream = new BigObjectInputStream(bigObject)
+    stream.readAllBytes() // Read all data
+    assert(stream.read() == -1)
+    stream.close()
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectInputStream should throw exception when reading from closed 
stream") {
+    val bigObject = createBigObject("test")
+
+    val stream = new BigObjectInputStream(bigObject)
+    stream.close()
+
+    assertThrows[java.io.IOException](stream.read())
+    assertThrows[java.io.IOException](stream.readAllBytes())
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectInputStream should handle multiple close calls") {
+    val bigObject = createBigObject("test")
+
+    val stream = new BigObjectInputStream(bigObject)
+    stream.close()
+    stream.close() // Should not throw
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectInputStream should read large data correctly") {
+    val largeData = Array.fill[Byte](20000)((scala.util.Random.nextInt(256) - 
128).toByte)
+    val bigObject = new BigObject()
+    val out = new BigObjectOutputStream(bigObject)
+    try {
+      out.write(largeData)
+    } finally {
+      out.close()
+    }
+
+    val stream = new BigObjectInputStream(bigObject)
+    val result = stream.readAllBytes()
+    assert(result.sameElements(largeData))
+    stream.close()
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  // ========================================
+  // BigObjectManager Tests
+  // ========================================
+
+  test("BigObjectManager should create a big object") {
+    val pointer = createBigObject("Test big object data")
+
+    assertStandardBucket(pointer)
+  }
+
+  test("BigObjectInputStream should open and read a big object") {
+    val data = "Hello from big object!"
+    val pointer = createBigObject(data)
+
+    val stream = new BigObjectInputStream(pointer)
+    val readData = stream.readAllBytes()
+    stream.close()
+
+    assert(readData.sameElements(data.getBytes))
+  }
+
+  test("BigObjectInputStream should fail to open non-existent big object") {
+    val fakeBigObject = new 
BigObject("s3://texera-big-objects/nonexistent/file")
+    val stream = new BigObjectInputStream(fakeBigObject)
+
+    try {
+      intercept[Exception] {
+        stream.read()
+      }
+    } finally {
+      try { stream.close() }
+      catch { case _: Exception => }
+    }
+  }
+
+  test("BigObjectManager should delete all big objects") {
+    val pointer1 = new BigObject()
+    val out1 = new BigObjectOutputStream(pointer1)
+    try {
+      out1.write("Object 1".getBytes)
+    } finally {
+      out1.close()
+    }
+
+    val pointer2 = new BigObject()
+    val out2 = new BigObjectOutputStream(pointer2)
+    try {
+      out2.write("Object 2".getBytes)
+    } finally {
+      out2.close()
+    }
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectManager should handle delete with no objects gracefully") {
+    BigObjectManager.deleteAllObjects() // Should not throw exception
+  }
+
+  test("BigObjectManager should delete all objects") {
+    val pointer1 = createBigObject("Test data")
+    val pointer2 = createBigObject("Test data")
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectManager should create bucket if it doesn't exist") {
+    val pointer = createBigObject("Test bucket creation")
+
+    assertStandardBucket(pointer)
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectManager should handle large objects correctly") {
+    val largeData = Array.fill[Byte](6 * 1024 * 
1024)((scala.util.Random.nextInt(256) - 128).toByte)
+    val pointer = new BigObject()
+    val out = new BigObjectOutputStream(pointer)
+    try {
+      out.write(largeData)
+    } finally {
+      out.close()
+    }
+
+    val stream = new BigObjectInputStream(pointer)
+    val readData = stream.readAllBytes()
+    stream.close()
+
+    assert(readData.sameElements(largeData))
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectManager should generate unique URIs for different objects") {
+    val testData = "Unique URI test".getBytes
+    val pointer1 = new BigObject()
+    val out1 = new BigObjectOutputStream(pointer1)
+    try {
+      out1.write(testData)
+    } finally {
+      out1.close()
+    }
+
+    val pointer2 = new BigObject()
+    val out2 = new BigObjectOutputStream(pointer2)
+    try {
+      out2.write(testData)
+    } finally {
+      out2.close()
+    }
+
+    assert(pointer1.getUri != pointer2.getUri)
+    assert(pointer1.getObjectKey != pointer2.getObjectKey)
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectInputStream should handle multiple reads from the same big 
object") {
+    val data = "Multiple reads test data"
+    val pointer = createBigObject(data)
+
+    val stream1 = new BigObjectInputStream(pointer)
+    val readData1 = stream1.readAllBytes()
+    stream1.close()
+
+    val stream2 = new BigObjectInputStream(pointer)
+    val readData2 = stream2.readAllBytes()
+    stream2.close()
+
+    assert(readData1.sameElements(data.getBytes))
+    assert(readData2.sameElements(data.getBytes))
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectManager should properly parse bucket name and object key from 
big object") {
+    val bigObject = createBigObject("URI parsing test")
+
+    assertStandardBucket(bigObject)
+    assert(bigObject.getObjectKey.nonEmpty)
+    assert(!bigObject.getObjectKey.startsWith("/"))
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  // ========================================
+  // Object-Oriented API Tests
+  // ========================================
+
+  test("BigObject with BigObjectOutputStream should create a big object") {
+    val data = "Test data for BigObject with BigObjectOutputStream"
+
+    val bigObject = new BigObject()
+    val out = new BigObjectOutputStream(bigObject)
+    try {
+      out.write(data.getBytes)
+    } finally {
+      out.close()
+    }
+
+    assertStandardBucket(bigObject)
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectInputStream constructor should read big object contents") {
+    val data = "Test data for BigObjectInputStream constructor"
+    val bigObject = createBigObject(data)
+
+    val stream = new BigObjectInputStream(bigObject)
+    val readData = stream.readAllBytes()
+    stream.close()
+
+    assert(readData.sameElements(data.getBytes))
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectOutputStream and BigObjectInputStream should work together 
end-to-end") {
+    val data = "End-to-end test data"
+
+    // Create using streaming API
+    val bigObject = new BigObject()
+    val out = new BigObjectOutputStream(bigObject)
+    try {
+      out.write(data.getBytes)
+    } finally {
+      out.close()
+    }
+
+    // Read using standard constructor
+    val stream = new BigObjectInputStream(bigObject)
+    val readData = stream.readAllBytes()
+    stream.close()
+
+    assert(readData.sameElements(data.getBytes))
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  // ========================================
+  // BigObjectOutputStream Tests (New Symmetric API)
+  // ========================================
+
+  test("BigObjectOutputStream should write and upload data to S3") {
+    val data = "Test data for BigObjectOutputStream"
+
+    val bigObject = new BigObject()
+    val outStream = new BigObjectOutputStream(bigObject)
+    outStream.write(data.getBytes)
+    outStream.close()
+
+    assertStandardBucket(bigObject)
+
+    // Verify data can be read back
+    val inStream = new BigObjectInputStream(bigObject)
+    val readData = inStream.readAllBytes()
+    inStream.close()
+
+    assert(readData.sameElements(data.getBytes))
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectOutputStream should create big object") {
+    val data = "Database registration test"
+
+    val bigObject = new BigObject()
+    val outStream = new BigObjectOutputStream(bigObject)
+    outStream.write(data.getBytes)
+    outStream.close()
+
+    assertStandardBucket(bigObject)
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectOutputStream should handle large data correctly") {
+    val largeData = Array.fill[Byte](8 * 1024 * 
1024)((scala.util.Random.nextInt(256) - 128).toByte)
+
+    val bigObject = new BigObject()
+    val outStream = new BigObjectOutputStream(bigObject)
+    outStream.write(largeData)
+    outStream.close()
+
+    // Verify data integrity
+    val inStream = new BigObjectInputStream(bigObject)
+    val readData = inStream.readAllBytes()
+    inStream.close()
+
+    assert(readData.sameElements(largeData))
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectOutputStream should handle multiple writes") {
+    val bigObject = new BigObject()
+    val outStream = new BigObjectOutputStream(bigObject)
+    outStream.write("Hello ".getBytes)
+    outStream.write("World".getBytes)
+    outStream.write("!".getBytes)
+    outStream.close()
+
+    val inStream = new BigObjectInputStream(bigObject)
+    val readData = inStream.readAllBytes()
+    inStream.close()
+
+    assert(readData.sameElements("Hello World!".getBytes))
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectOutputStream should throw exception when writing to closed 
stream") {
+    val bigObject = new BigObject()
+    val outStream = new BigObjectOutputStream(bigObject)
+    outStream.write("test".getBytes)
+    outStream.close()
+
+    assertThrows[java.io.IOException](outStream.write("more".getBytes))
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObjectOutputStream should handle close() being called multiple 
times") {
+    val bigObject = new BigObject()
+    val outStream = new BigObjectOutputStream(bigObject)
+    outStream.write("test".getBytes)
+    outStream.close()
+    outStream.close() // Should not throw
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("New BigObject() constructor should create unique URIs") {
+    val bigObject1 = new BigObject()
+    val bigObject2 = new BigObject()
+
+    assert(bigObject1.getUri != bigObject2.getUri)
+    assert(bigObject1.getObjectKey != bigObject2.getObjectKey)
+
+    BigObjectManager.deleteAllObjects()
+  }
+
+  test("BigObject() and BigObjectOutputStream API should be symmetric with 
input") {
+    val data = "Symmetric API test"
+
+    // Write using new symmetric API
+    val bigObject = new BigObject()
+    val outStream = new BigObjectOutputStream(bigObject)
+    outStream.write(data.getBytes)
+    outStream.close()
+
+    // Read using symmetric API
+    val inStream = new BigObjectInputStream(bigObject)
+    val readData = inStream.readAllBytes()
+    inStream.close()
+
+    assert(readData.sameElements(data.getBytes))
+
+    BigObjectManager.deleteAllObjects()
+  }
+}
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala
new file mode 100644
index 0000000000..14fdfa1ddb
--- /dev/null
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.apache.amber.core.tuple.BigObject
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.io.IOException
+import scala.util.Random
+
+class BigObjectOutputStreamSpec
+    extends AnyFunSuite
+    with S3StorageTestBase
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach {
+
+  private val testBucketName = "test-big-object-output-stream"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    S3StorageClient.createBucketIfNotExist(testBucketName)
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      S3StorageClient.deleteDirectory(testBucketName, "")
+    } catch {
+      case _: Exception => // Ignore cleanup errors
+    }
+    super.afterAll()
+  }
+
+  // Helper methods
+  private def createBigObject(key: String): BigObject =
+    new BigObject(s"s3://$testBucketName/$key")
+
+  private def generateRandomData(size: Int): Array[Byte] =
+    Array.fill[Byte](size)((Random.nextInt(256) - 128).toByte)
+
+  private def withStream[T](bigObject: BigObject)(f: BigObjectOutputStream => 
T): T = {
+    val stream = new BigObjectOutputStream(bigObject)
+    try f(stream)
+    finally stream.close()
+  }
+
+  private def readBack(bigObject: BigObject): Array[Byte] = {
+    val inputStream = new BigObjectInputStream(bigObject)
+    try inputStream.readAllBytes()
+    finally inputStream.close()
+  }
+
+  private def writeAndVerify(key: String, data: Array[Byte]): Unit = {
+    val bigObject = createBigObject(key)
+    withStream(bigObject)(_.write(data, 0, data.length))
+    assert(readBack(bigObject).sameElements(data))
+  }
+
+  // === Constructor Tests ===
+  test("should reject null BigObject") {
+    val exception = intercept[IllegalArgumentException](new 
BigObjectOutputStream(null))
+    assert(exception.getMessage.contains("BigObject cannot be null"))
+  }
+
+  // === Basic Write Tests ===
+  test("should write single bytes correctly") {
+    val bigObject = createBigObject("test/single-bytes.txt")
+    withStream(bigObject) { stream =>
+      "Hello".foreach(c => stream.write(c.toByte))
+    }
+    assert(new String(readBack(bigObject)) == "Hello")
+  }
+
+  test("should write byte arrays correctly") {
+    val testData = "Hello, World!".getBytes
+    writeAndVerify("test/array-write.txt", testData)
+  }
+
+  test("should handle partial writes with offset and length") {
+    val testData = "Hello, World!".getBytes
+    val bigObject = createBigObject("test/partial-write.txt")
+
+    withStream(bigObject) { stream =>
+      stream.write(testData, 0, 5) // "Hello"
+      stream.write(testData, 7, 5) // "World"
+    }
+
+    assert(new String(readBack(bigObject)) == "HelloWorld")
+  }
+
+  test("should handle multiple consecutive writes") {
+    val bigObject = createBigObject("test/multiple-writes.txt")
+    withStream(bigObject) { stream =>
+      stream.write("Hello".getBytes)
+      stream.write(", ".getBytes)
+      stream.write("World!".getBytes)
+    }
+    assert(new String(readBack(bigObject)) == "Hello, World!")
+  }
+
+  // === Stream Lifecycle Tests ===
+  test("flush should not throw") {
+    val bigObject = createBigObject("test/flush.txt")
+    withStream(bigObject) { stream =>
+      stream.write("test".getBytes)
+      stream.flush()
+      stream.write(" data".getBytes)
+    }
+    assert(new String(readBack(bigObject)) == "test data")
+  }
+
+  test("close should be idempotent") {
+    val bigObject = createBigObject("test/close-idempotent.txt")
+    val stream = new BigObjectOutputStream(bigObject)
+    stream.write("data".getBytes)
+    stream.close()
+    stream.close() // Should not throw
+    stream.flush() // Should not throw after close
+    assert(new String(readBack(bigObject)) == "data")
+  }
+
+  test("close should handle empty stream") {
+    val bigObject = createBigObject("test/empty-stream.txt")
+    val stream = new BigObjectOutputStream(bigObject)
+    stream.close()
+    assert(readBack(bigObject).length == 0)
+  }
+
+  // === Error Handling ===
+  test("write operations should throw IOException when stream is closed") {
+    val bigObject = createBigObject("test/closed-stream.txt")
+    val stream = new BigObjectOutputStream(bigObject)
+    stream.close()
+
+    val ex1 = intercept[IOException](stream.write('A'.toByte))
+    assert(ex1.getMessage.contains("Stream is closed"))
+
+    val ex2 = intercept[IOException](stream.write("test".getBytes))
+    assert(ex2.getMessage.contains("Stream is closed"))
+  }
+
+  // === Large Data Tests ===
+  test("should handle large data (1MB)") {
+    val largeData = generateRandomData(1024 * 1024)
+    writeAndVerify("test/large-1mb.bin", largeData)
+  }
+
+  test("should handle very large data (10MB)") {
+    val veryLargeData = generateRandomData(10 * 1024 * 1024)
+    writeAndVerify("test/large-10mb.bin", veryLargeData)
+  }
+
+  test("should handle chunked writes") {
+    val totalSize = 1024 * 1024 // 1MB
+    val chunkSize = 8 * 1024 // 8KB
+    val data = generateRandomData(totalSize)
+    val bigObject = createBigObject("test/chunked.bin")
+
+    withStream(bigObject) { stream =>
+      data.grouped(chunkSize).foreach(chunk => stream.write(chunk))
+    }
+
+    assert(readBack(bigObject).sameElements(data))
+  }
+
+  // === Binary Data Tests ===
+  test("should preserve all byte values (0-255)") {
+    val allBytes = (0 until 256).map(_.toByte).toArray
+    writeAndVerify("test/all-bytes.bin", allBytes)
+  }
+
+  // === Integration Tests ===
+  test("should handle concurrent writes to different objects") {
+    val streams = (1 to 3).map { i =>
+      val obj = createBigObject(s"test/concurrent-$i.txt")
+      val stream = new BigObjectOutputStream(obj)
+      (obj, stream, s"Data $i")
+    }
+
+    try {
+      streams.foreach { case (_, stream, data) => stream.write(data.getBytes) }
+    } finally {
+      streams.foreach(_._2.close())
+    }
+
+    streams.foreach {
+      case (obj, _, expected) =>
+        assert(new String(readBack(obj)) == expected)
+    }
+  }
+
+  test("should overwrite existing object") {
+    val bigObject = createBigObject("test/overwrite.txt")
+    withStream(bigObject)(_.write("original data".getBytes))
+    withStream(bigObject)(_.write("new data".getBytes))
+    assert(new String(readBack(bigObject)) == "new data")
+  }
+
+  test("should handle mixed write operations") {
+    val bigObject = createBigObject("test/mixed-writes.txt")
+    withStream(bigObject) { stream =>
+      stream.write('A'.toByte)
+      stream.write(" test ".getBytes)
+      stream.write('B'.toByte)
+      val data = "Hello, World!".getBytes
+      stream.write(data, 7, 6) // "World!"
+    }
+    assert(new String(readBack(bigObject)) == "A test BWorld!")
+  }
+
+  // === Edge Cases ===
+  test("should create bucket automatically") {
+    val newBucketName = s"new-bucket-${Random.nextInt(10000)}"
+    val bigObject = new BigObject(s"s3://$newBucketName/test/auto-create.txt")
+
+    try {
+      withStream(bigObject)(_.write("test".getBytes))
+      assert(new String(readBack(bigObject)) == "test")
+    } finally {
+      try S3StorageClient.deleteDirectory(newBucketName, "")
+      catch { case _: Exception => /* ignore */ }
+    }
+  }
+
+  test("should handle rapid open/close cycles") {
+    (1 to 10).foreach { i =>
+      
withStream(createBigObject(s"test/rapid-$i.txt"))(_.write(s"data-$i".getBytes))
+    }
+
+    (1 to 10).foreach { i =>
+      val result = readBack(createBigObject(s"test/rapid-$i.txt"))
+      assert(new String(result) == s"data-$i")
+    }
+  }
+}
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
new file mode 100644
index 0000000000..a1662cf8c3
--- /dev/null
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.io.ByteArrayInputStream
+import scala.util.Random
+
+class S3StorageClientSpec
+    extends AnyFunSuite
+    with S3StorageTestBase
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach {
+
+  private val testBucketName = "test-s3-storage-client"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    S3StorageClient.createBucketIfNotExist(testBucketName)
+  }
+
+  override def afterAll(): Unit = {
+    // Clean up test bucket
+    try {
+      S3StorageClient.deleteDirectory(testBucketName, "")
+    } catch {
+      case _: Exception => // Ignore cleanup errors
+    }
+    super.afterAll()
+  }
+
+  // Helper methods
+  private def createInputStream(data: String): ByteArrayInputStream = {
+    new ByteArrayInputStream(data.getBytes)
+  }
+
+  private def createInputStream(data: Array[Byte]): ByteArrayInputStream = {
+    new ByteArrayInputStream(data)
+  }
+
+  private def readInputStream(inputStream: java.io.InputStream): Array[Byte] = 
{
+    val buffer = new Array[Byte](8192)
+    val outputStream = new java.io.ByteArrayOutputStream()
+    var bytesRead = 0
+    while ({
+      bytesRead = inputStream.read(buffer); bytesRead != -1
+    }) {
+      outputStream.write(buffer, 0, bytesRead)
+    }
+    outputStream.toByteArray
+  }
+
+  // ========================================
+  // uploadObject Tests
+  // ========================================
+
+  test("uploadObject should upload a small object successfully") {
+    val testData = "Hello, World! This is a small test object."
+    val objectKey = "test/small-object.txt"
+
+    val eTag = S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(testData))
+
+    assert(eTag != null)
+    assert(eTag.nonEmpty)
+
+    // Clean up
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  test("uploadObject should upload an empty object") {
+    val objectKey = "test/empty-object.txt"
+
+    val eTag = S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(""))
+
+    assert(eTag != null)
+
+    // Clean up
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  test("uploadObject should upload a large object using multipart upload") {
+    // Create data larger than MINIMUM_NUM_OF_MULTIPART_S3_PART (5MB)
+    val largeData = Array.fill[Byte](6 * 1024 * 1024)((Random.nextInt(256) - 
128).toByte)
+    val objectKey = "test/large-object.bin"
+
+    val eTag = S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(largeData))
+
+    assert(eTag != null)
+    assert(eTag.nonEmpty)
+
+    // Verify the uploaded content
+    val downloadedStream = S3StorageClient.downloadObject(testBucketName, 
objectKey)
+    val downloadedData = readInputStream(downloadedStream)
+    downloadedStream.close()
+
+    assert(downloadedData.length == largeData.length)
+    assert(downloadedData.sameElements(largeData))
+
+    // Clean up
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  test("uploadObject should handle objects with special characters in key") {
+    val testData = "Testing special characters"
+    val objectKey = "test/special-chars/file with spaces & symbols!@#.txt"
+
+    val eTag = S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(testData))
+
+    assert(eTag != null)
+
+    // Clean up
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  test("uploadObject should overwrite existing object") {
+    val objectKey = "test/overwrite-test.txt"
+    val data1 = "Original data"
+    val data2 = "Updated data"
+
+    S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(data1))
+    val eTag2 = S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(data2))
+
+    assert(eTag2 != null)
+
+    val downloadedStream = S3StorageClient.downloadObject(testBucketName, 
objectKey)
+    val downloadedData = new String(readInputStream(downloadedStream))
+    downloadedStream.close()
+
+    assert(downloadedData == data2)
+
+    // Clean up
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  // ========================================
+  // downloadObject Tests
+  // ========================================
+
+  test("downloadObject should download an object successfully") {
+    val testData = "This is test data for download."
+    val objectKey = "test/download-test.txt"
+
+    S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(testData))
+
+    val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey)
+    val downloadedData = new String(readInputStream(inputStream))
+    inputStream.close()
+
+    assert(downloadedData == testData)
+
+    // Clean up
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  test("downloadObject should download large objects correctly") {
+    val largeData = Array.fill[Byte](10 * 1024 * 1024)((Random.nextInt(256) - 
128).toByte)
+    val objectKey = "test/large-download-test.bin"
+
+    S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(largeData))
+
+    val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey)
+    val downloadedData = readInputStream(inputStream)
+    inputStream.close()
+
+    assert(downloadedData.length == largeData.length)
+    assert(downloadedData.sameElements(largeData))
+
+    // Clean up
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  test("downloadObject should download empty objects") {
+    val objectKey = "test/empty-download-test.txt"
+
+    S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(""))
+
+    val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey)
+    val downloadedData = readInputStream(inputStream)
+    inputStream.close()
+
+    assert(downloadedData.isEmpty)
+
+    // Clean up
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  test("downloadObject should throw exception for non-existent object") {
+    val nonExistentKey = "test/non-existent-object.txt"
+
+    assertThrows[Exception] {
+      S3StorageClient.downloadObject(testBucketName, nonExistentKey)
+    }
+  }
+
+  test("downloadObject should handle binary data correctly") {
+    val binaryData = Array[Byte](0, 1, 2, 127, -128, -1, 64, 32, 16, 8, 4, 2, 
1)
+    val objectKey = "test/binary-data.bin"
+
+    S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(binaryData))
+
+    val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey)
+    val downloadedData = readInputStream(inputStream)
+    inputStream.close()
+
+    assert(downloadedData.sameElements(binaryData))
+
+    // Clean up
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  // ========================================
+  // deleteObject Tests
+  // ========================================
+
+  test("deleteObject should delete an existing object") {
+    val objectKey = "test/delete-test.txt"
+    S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream("delete me"))
+
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+
+    // Verify deletion by attempting to download
+    assertThrows[Exception] {
+      S3StorageClient.downloadObject(testBucketName, objectKey)
+    }
+  }
+
+  test("deleteObject should not throw exception for non-existent object") {
+    val nonExistentKey = "test/already-deleted.txt"
+
+    // Should not throw exception
+    S3StorageClient.deleteObject(testBucketName, nonExistentKey)
+  }
+
+  test("deleteObject should delete large objects") {
+    val largeData = Array.fill[Byte](7 * 1024 * 1024)((Random.nextInt(256) - 
128).toByte)
+    val objectKey = "test/large-delete-test.bin"
+
+    S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(largeData))
+
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+
+    // Verify deletion by attempting to download
+    assertThrows[Exception] {
+      S3StorageClient.downloadObject(testBucketName, objectKey)
+    }
+  }
+
+  test("deleteObject should handle multiple deletions of the same object") {
+    val objectKey = "test/multi-delete-test.txt"
+    S3StorageClient.uploadObject(
+      testBucketName,
+      objectKey,
+      createInputStream("delete multiple times")
+    )
+
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+
+    // Second delete should not throw exception
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  // ========================================
+  // Integration Tests (combining methods)
+  // ========================================
+
+  test("upload, download, and delete workflow should work correctly") {
+    val testData = "Complete workflow test data"
+    val objectKey = "test/workflow-test.txt"
+
+    // Upload
+    val eTag = S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(testData))
+    assert(eTag != null)
+
+    // Download
+    val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey)
+    val downloadedData = new String(readInputStream(inputStream))
+    inputStream.close()
+    assert(downloadedData == testData)
+
+    // Delete
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+
+  test("multiple objects can be managed independently") {
+    val objects = Map(
+      "test/object1.txt" -> "Data for object 1",
+      "test/object2.txt" -> "Data for object 2",
+      "test/object3.txt" -> "Data for object 3"
+    )
+
+    // Upload all objects
+    objects.foreach {
+      case (key, data) =>
+        S3StorageClient.uploadObject(testBucketName, key, 
createInputStream(data))
+    }
+
+    // Delete one object
+    S3StorageClient.deleteObject(testBucketName, "test/object2.txt")
+
+    // Clean up remaining objects
+    S3StorageClient.deleteObject(testBucketName, "test/object1.txt")
+    S3StorageClient.deleteObject(testBucketName, "test/object3.txt")
+  }
+
+  test("objects with nested paths should be handled correctly") {
+    val objectKey = "test/deeply/nested/path/to/object.txt"
+    val testData = "Nested path test"
+
+    S3StorageClient.uploadObject(testBucketName, objectKey, 
createInputStream(testData))
+
+    val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey)
+    val downloadedData = new String(readInputStream(inputStream))
+    inputStream.close()
+    assert(downloadedData == testData)
+
+    S3StorageClient.deleteObject(testBucketName, objectKey)
+  }
+}
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageTestBase.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageTestBase.scala
new file mode 100644
index 0000000000..ad80e6c40e
--- /dev/null
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageTestBase.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import com.dimafeng.testcontainers.MinIOContainer
+import org.apache.amber.config.StorageConfig
+import org.scalatest.{BeforeAndAfterAll, Suite}
+import org.testcontainers.utility.DockerImageName
+
+/**
+  * Base trait for tests requiring S3 storage (MinIO).
+  * Provides access to a single shared MinIO container across all test suites.
+  *
+  * Usage: Mix this trait into any test suite that needs S3 storage.
+  */
+trait S3StorageTestBase extends BeforeAndAfterAll { this: Suite =>
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // Trigger lazy initialization of shared container
+    S3StorageTestBase.ensureContainerStarted()
+  }
+}
+
+object S3StorageTestBase {
+  private lazy val container: MinIOContainer = {
+    val c = MinIOContainer(
+      dockerImageName = 
DockerImageName.parse("minio/minio:RELEASE.2025-02-28T09-55-16Z"),
+      userName = "texera_minio",
+      password = "password"
+    )
+    c.start()
+
+    val endpoint = s"http://${c.host}:${c.mappedPort(9000)}"
+    StorageConfig.s3Endpoint = endpoint
+
+    println(s"[S3Storage] Started shared MinIO at $endpoint")
+
+    sys.addShutdownHook {
+      println("[S3Storage] Stopping shared MinIO...")
+      c.stop()
+    }
+
+    c
+  }
+
+  /** Ensures the container is started (triggers lazy initialization). */
+  def ensureContainerStarted(): Unit = {
+    container // Access lazy val to trigger initialization
+    ()
+  }
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java
index 84e3de95a6..aa198a9d2d 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java
@@ -30,7 +30,8 @@ public enum FileAttributeType {
     DOUBLE("double", AttributeType.DOUBLE),
     BOOLEAN("boolean", AttributeType.BOOLEAN),
     TIMESTAMP("timestamp", AttributeType.TIMESTAMP),
-    BINARY("binary", AttributeType.BINARY);
+    BINARY("binary", AttributeType.BINARY),
+    BIG_OBJECT("big object", AttributeType.BIG_OBJECT);
 
 
     private final String name;
@@ -56,6 +57,6 @@ public enum FileAttributeType {
     }
 
     public boolean isSingle() {
-        return this == SINGLE_STRING || this == BINARY;
+        return this == SINGLE_STRING || this == BINARY || this == BIG_OBJECT;
     }
 }
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
index 2124c9da43..c039b6e2d8 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
@@ -22,8 +22,9 @@ package org.apache.amber.operator.source.scan
 import org.apache.amber.core.executor.SourceOperatorExecutor
 import org.apache.amber.core.storage.DocumentFactory
 import org.apache.amber.core.tuple.AttributeTypeUtils.parseField
-import org.apache.amber.core.tuple.TupleLike
+import org.apache.amber.core.tuple.{BigObject, TupleLike}
 import org.apache.amber.util.JSONUtils.objectMapper
+import org.apache.texera.service.util.BigObjectOutputStream
 import org.apache.commons.compress.archivers.ArchiveStreamFactory
 import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
 import org.apache.commons.io.IOUtils.toByteArray
@@ -84,6 +85,21 @@ class FileScanSourceOpExec private[scan] (
             fields.addOne(desc.attributeType match {
               case FileAttributeType.SINGLE_STRING =>
                 new String(toByteArray(entry), desc.fileEncoding.getCharset)
+              case FileAttributeType.BIG_OBJECT =>
+                // For big objects, create reference and upload via streaming
+                val bigObject = new BigObject()
+                val out = new BigObjectOutputStream(bigObject)
+                try {
+                  val buffer = new Array[Byte](8192)
+                  var bytesRead = entry.read(buffer)
+                  while (bytesRead != -1) {
+                    out.write(buffer, 0, bytesRead)
+                    bytesRead = entry.read(buffer)
+                  }
+                } finally {
+                  out.close()
+                }
+                bigObject
               case _ => parseField(toByteArray(entry), 
desc.attributeType.getType)
             })
             TupleLike(fields.toSeq: _*)
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
new file mode 100644
index 0000000000..07b09f0a26
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.source.scan
+
+import org.apache.amber.core.tuple.{AttributeType, BigObject, Schema, 
SchemaEnforceable}
+import org.apache.amber.util.JSONUtils.objectMapper
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.io.{BufferedOutputStream, FileOutputStream}
+import java.net.URI
+import java.nio.file.{Files, Path}
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+/**
+  * Unit tests for BIG_OBJECT logic in FileScanSourceOpExec.
+  * Full integration tests with S3 and database are in BigObjectManagerSpec.
+  */
+class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+  private val testDir = Path
+    .of(sys.env.getOrElse("TEXERA_HOME", "."))
+    .resolve("common/workflow-operator/src/test/resources")
+    .toRealPath()
+
+  private val testFile = testDir.resolve("test_big_object.txt")
+  private val testZip = testDir.resolve("test_big_object.zip")
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    Files.write(testFile, "Test content\nLine 2\nLine 3".getBytes)
+    createZipFile(testZip, Map("file1.txt" -> "Content 1", "file2.txt" -> 
"Content 2"))
+  }
+
+  override def afterAll(): Unit = {
+    Files.deleteIfExists(testFile)
+    Files.deleteIfExists(testZip)
+    super.afterAll()
+  }
+
+  private def createZipFile(path: Path, entries: Map[String, String]): Unit = {
+    val zipOut = new ZipOutputStream(new BufferedOutputStream(new 
FileOutputStream(path.toFile)))
+    try {
+      entries.foreach {
+        case (name, content) =>
+          zipOut.putNextEntry(new ZipEntry(name))
+          zipOut.write(content.getBytes)
+          zipOut.closeEntry()
+      }
+    } finally {
+      zipOut.close()
+    }
+  }
+
+  private def createDescriptor(
+      file: Path = testFile,
+      attributeName: String = "line"
+  ): FileScanSourceOpDesc = {
+    val desc = new FileScanSourceOpDesc()
+    desc.fileName = Some(file.toString)
+    desc.attributeType = FileAttributeType.BIG_OBJECT
+    desc.attributeName = attributeName
+    desc.fileEncoding = FileDecodingMethod.UTF_8
+    desc
+  }
+
+  private def assertSchema(schema: Schema, attributeName: String): Unit = {
+    assert(schema.getAttributes.length == 1)
+    assert(schema.getAttribute(attributeName).getType == 
AttributeType.BIG_OBJECT)
+  }
+
+  // Schema Tests
+  it should "infer BIG_OBJECT schema with default attribute name" in {
+    assertSchema(createDescriptor().sourceSchema(), "line")
+  }
+
+  it should "infer BIG_OBJECT schema with custom attribute name" in {
+    assertSchema(createDescriptor(attributeName = 
"custom_field").sourceSchema(), "custom_field")
+  }
+
+  it should "map BIG_OBJECT to correct AttributeType" in {
+    assert(FileAttributeType.BIG_OBJECT.getType == AttributeType.BIG_OBJECT)
+  }
+
+  // Type Classification Tests
+  it should "correctly classify BIG_OBJECT as isSingle type" in {
+    val isSingleTypes = List(
+      FileAttributeType.BIG_OBJECT,
+      FileAttributeType.SINGLE_STRING,
+      FileAttributeType.BINARY
+    )
+    val multiLineTypes = List(
+      FileAttributeType.STRING,
+      FileAttributeType.INTEGER,
+      FileAttributeType.LONG,
+      FileAttributeType.DOUBLE,
+      FileAttributeType.BOOLEAN,
+      FileAttributeType.TIMESTAMP
+    )
+
+    isSingleTypes.foreach(t => assert(t.isSingle, s"$t should be isSingle"))
+    multiLineTypes.foreach(t => assert(!t.isSingle, s"$t should not be 
isSingle"))
+  }
+
+  // Execution Tests
+  it should "create BigObject when reading file with BIG_OBJECT type" in {
+    val desc = createDescriptor()
+    desc.setResolvedFileName(URI.create(testFile.toUri.toString))
+
+    val executor = new 
FileScanSourceOpExec(objectMapper.writeValueAsString(desc))
+
+    try {
+      executor.open()
+      val tuples = executor.produceTuple().toSeq
+      executor.close()
+
+      assert(tuples.size == 1)
+      val field = tuples.head
+        .asInstanceOf[SchemaEnforceable]
+        .enforceSchema(desc.sourceSchema())
+        .getField[Any]("line")
+
+      assert(field.isInstanceOf[BigObject])
+      assert(field.asInstanceOf[BigObject].getUri.startsWith("s3://"))
+    } catch {
+      case e: Exception =>
+        info(s"S3 not configured: ${e.getMessage}")
+    }
+  }
+
+  // BigObject Tests
+  it should "create valid BigObject with correct URI parsing" in {
+    val pointer = new BigObject("s3://bucket/path/to/object")
+
+    assert(pointer.getUri == "s3://bucket/path/to/object")
+    assert(pointer.getBucketName == "bucket")
+    assert(pointer.getObjectKey == "path/to/object")
+  }
+
+  it should "reject invalid BigObject URIs" in {
+    assertThrows[IllegalArgumentException](new BigObject("http://invalid";))
+    assertThrows[IllegalArgumentException](new BigObject("not-a-uri"))
+    assertThrows[IllegalArgumentException](new 
BigObject(null.asInstanceOf[String]))
+  }
+}
diff --git a/file-service/build.sbt b/file-service/build.sbt
index 68ac82e6b3..34b30472e0 100644
--- a/file-service/build.sbt
+++ b/file-service/build.sbt
@@ -84,7 +84,4 @@ libraryDependencies ++= Seq(
   "jakarta.ws.rs" % "jakarta.ws.rs-api" % "3.1.0", // Ensure Jakarta JAX-RS 
API is available
   "org.bitbucket.b_c" % "jose4j" % "0.9.6",
   "org.playframework" %% "play-json" % "3.1.0-M1",
-  "software.amazon.awssdk" % "s3" % "2.29.51",
-  "software.amazon.awssdk" % "auth" % "2.29.51",
-  "software.amazon.awssdk" % "regions" % "2.29.51",
 )

Reply via email to