This is an automated email from the ASF dual-hosted git repository.
kunwp1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 60a7822bcf style: Rename BigObject to LargeBinary (#4111)
60a7822bcf is described below
commit 60a7822bcff10f30cd8bafa0d2ac2b8c1f68bc3d
Author: Chris <[email protected]>
AuthorDate: Mon Dec 8 11:12:29 2025 -0800
style: Rename BigObject to LargeBinary (#4111)
### What changes were proposed in this PR?
This PR renames the `BigObject` type to `LargeBinary`. The original
feature was introduced in #4067, but we decided to adopt the
`LargeBinary` terminology to align with naming conventions used in other
systems (e.g., Arrow).
This change is purely a renaming/terminology update and does not modify
the underlying functionality.
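For reference, a minimal sketch of the renamed write/read API, mirroring the usage examples updated in this diff (the `data` payload is illustrative):
```scala
// Minimal sketch mirroring the updated Scaladoc/Javadoc examples in this PR.
// LargeBinary, LargeBinaryOutputStream, and LargeBinaryInputStream replace the
// former BigObject, BigObjectOutputStream, and BigObjectInputStream names.
import org.apache.texera.amber.core.tuple.LargeBinary
import org.apache.texera.service.util.{LargeBinaryInputStream, LargeBinaryOutputStream}

val data: Array[Byte] = "example payload".getBytes // illustrative payload

// Write: the no-arg constructor generates a unique s3:// URI, and the
// output stream uploads the bytes to S3 as they are written.
val largeBinary = new LargeBinary()
val out = new LargeBinaryOutputStream(largeBinary)
try out.write(data)
finally out.close()

// Read: the input stream lazily downloads the object on first read.
val in = new LargeBinaryInputStream(largeBinary)
try assert(in.readAllBytes().sameElements(data))
finally in.close()
```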
### Any related issues, documentation, discussions?
https://github.com/apache/texera/pull/4100#issuecomment-3605021751
### How was this PR tested?
Run the attached workflow, verify that it completes successfully, and confirm
that three objects are created in the MinIO console.
[Java UDF.json](https://github.com/user-attachments/files/23976766/Java.UDF.json)
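As a supplement to the manual MinIO check, a hedged sketch of an equivalent programmatic end-to-end verification, modeled on the renamed `LargeBinaryManagerSpec` cases in this diff (the payload strings are illustrative, and it assumes the same S3/MinIO configuration the tests rely on):
```scala
// Write three large binaries, read each back, then clean up the bucket,
// mirroring the end-to-end cases in LargeBinaryManagerSpec.
import org.apache.texera.amber.core.tuple.LargeBinary
import org.apache.texera.service.util.{
  LargeBinaryInputStream,
  LargeBinaryManager,
  LargeBinaryOutputStream
}

val payloads = Seq("object-1", "object-2", "object-3").map(_.getBytes) // illustrative data

val binaries = payloads.map { bytes =>
  val largeBinary = new LargeBinary()
  val out = new LargeBinaryOutputStream(largeBinary)
  try out.write(bytes)
  finally out.close()
  largeBinary
}

// Each object lives under the default texera-large-binaries bucket.
binaries.zip(payloads).foreach {
  case (largeBinary, bytes) =>
    assert(largeBinary.getUri.startsWith("s3://texera-large-binaries/"))
    val in = new LargeBinaryInputStream(largeBinary)
    try assert(in.readAllBytes().sameElements(bytes))
    finally in.close()
}

// Remove everything created above, as WorkflowService does after execution.
LargeBinaryManager.deleteAllObjects()
```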
### Was this PR authored or co-authored using generative AI tooling?
No.
---------
Signed-off-by: Chris <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
.../dashboard/user/workflow/WorkflowResource.scala | 4 +-
.../texera/web/service/WorkflowService.scala | 6 +-
.../texera/amber/core/tuple/AttributeType.java | 6 +-
.../amber/core/tuple/AttributeTypeUtils.scala | 22 +-
.../tuple/{BigObject.java => LargeBinary.java} | 34 +-
.../org/apache/texera/amber/util/IcebergUtil.scala | 71 ++--
...utStream.scala => LargeBinaryInputStream.scala} | 14 +-
...bjectManager.scala => LargeBinaryManager.scala} | 18 +-
...tStream.scala => LargeBinaryOutputStream.scala} | 24 +-
.../amber/core/tuple/AttributeTypeUtilsSpec.scala | 20 +-
.../apache/texera/amber/util/IcebergUtilSpec.scala | 67 +--
.../texera/service/util/BigObjectManagerSpec.scala | 471 ---------------------
...Spec.scala => LargeBinaryInputStreamSpec.scala} | 130 +++---
.../service/util/LargeBinaryManagerSpec.scala | 471 +++++++++++++++++++++
...pec.scala => LargeBinaryOutputStreamSpec.scala} | 104 ++---
.../operator/source/scan/FileAttributeType.java | 4 +-
.../source/scan/FileScanSourceOpExec.scala | 14 +-
.../source/scan/FileScanSourceOpExecSpec.scala | 46 +-
18 files changed, 765 insertions(+), 761 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
index 0f66388c85..4248d06bdd 100644
---
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
+++
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
@@ -36,7 +36,7 @@ import org.apache.texera.dao.jooq.generated.tables.daos.{
WorkflowUserAccessDao
}
import org.apache.texera.dao.jooq.generated.tables.pojos._
-import org.apache.texera.service.util.BigObjectManager
+import org.apache.texera.service.util.LargeBinaryManager
import org.apache.texera.web.resource.dashboard.hub.EntityType
import
org.apache.texera.web.resource.dashboard.hub.HubResource.recordCloneAction
import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowAccessResource.hasReadAccess
@@ -601,7 +601,7 @@ class WorkflowResource extends LazyLogging {
.asScala
.toList
- BigObjectManager.deleteAllObjects()
+ LargeBinaryManager.deleteAllObjects()
// Collect all URIs related to executions for cleanup
val uris = eids.flatMap { eid =>
diff --git
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index ae67407a81..ee7cbe5544 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -49,7 +49,7 @@ import org.apache.texera.amber.error.ErrorUtils.{
getStackTraceWithAllCauses
}
import org.apache.texera.dao.jooq.generated.tables.pojos.User
-import org.apache.texera.service.util.BigObjectManager
+import org.apache.texera.service.util.LargeBinaryManager
import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
@@ -348,7 +348,7 @@ class WorkflowService(
logger.debug(s"Error processing document at $uri:
${error.getMessage}")
}
}
- // Delete big objects
- BigObjectManager.deleteAllObjects()
+ // Delete large binaries
+ LargeBinaryManager.deleteAllObjects()
}
}
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeType.java
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeType.java
index fa8f93ea9e..61abd741e8 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeType.java
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeType.java
@@ -70,7 +70,7 @@ public enum AttributeType implements Serializable {
BOOLEAN("boolean", Boolean.class),
TIMESTAMP("timestamp", Timestamp.class),
BINARY("binary", byte[].class),
- BIG_OBJECT("big_object", BigObject.class),
+ LARGE_BINARY("large_binary", LargeBinary.class),
ANY("ANY", Object.class);
private final String name;
@@ -110,8 +110,8 @@ public enum AttributeType implements Serializable {
return TIMESTAMP;
} else if (fieldClass.equals(byte[].class)) {
return BINARY;
- } else if (fieldClass.equals(BigObject.class)) {
- return BIG_OBJECT;
+ } else if (fieldClass.equals(LargeBinary.class)) {
+ return LARGE_BINARY;
} else {
return ANY;
}
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
index 41c04c41e8..efb119e664 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
@@ -121,15 +121,15 @@ object AttributeTypeUtils extends Serializable {
): Any = {
if (field == null) return null
attributeType match {
- case AttributeType.INTEGER => parseInteger(field, force)
- case AttributeType.LONG => parseLong(field, force)
- case AttributeType.DOUBLE => parseDouble(field)
- case AttributeType.BOOLEAN => parseBoolean(field)
- case AttributeType.TIMESTAMP => parseTimestamp(field)
- case AttributeType.STRING => field.toString
- case AttributeType.BINARY => field
- case AttributeType.BIG_OBJECT => new BigObject(field.toString)
- case AttributeType.ANY | _ => field
+ case AttributeType.INTEGER => parseInteger(field, force)
+ case AttributeType.LONG => parseLong(field, force)
+ case AttributeType.DOUBLE => parseDouble(field)
+ case AttributeType.BOOLEAN => parseBoolean(field)
+ case AttributeType.TIMESTAMP => parseTimestamp(field)
+ case AttributeType.STRING => field.toString
+ case AttributeType.BINARY => field
+ case AttributeType.LARGE_BINARY => new LargeBinary(field.toString)
+ case AttributeType.ANY | _ => field
}
}
@@ -384,8 +384,8 @@ object AttributeTypeUtils extends Serializable {
case AttributeType.INTEGER => tryParseInteger(fieldValue)
case AttributeType.TIMESTAMP => tryParseTimestamp(fieldValue)
case AttributeType.BINARY => tryParseString()
- case AttributeType.BIG_OBJECT =>
- AttributeType.BIG_OBJECT // Big objects are never inferred from data
+ case AttributeType.LARGE_BINARY =>
+ AttributeType.LARGE_BINARY // Large binaries are never inferred from
data
case _ => tryParseString()
}
}
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/BigObject.java
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/LargeBinary.java
similarity index 70%
rename from
common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/BigObject.java
rename to
common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/LargeBinary.java
index 4feb5673ab..9e8c4db870 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/BigObject.java
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/LargeBinary.java
@@ -23,56 +23,56 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.texera.amber.core.executor.OperatorExecutor;
-import org.apache.texera.service.util.BigObjectManager;
+import org.apache.texera.service.util.LargeBinaryManager;
import java.net.URI;
import java.util.Objects;
/**
- * BigObject represents a reference to a large object stored in S3.
+ * LargeBinary represents a reference to a large object stored in S3.
*
- * Each BigObject is identified by an S3 URI (s3://bucket/path/to/object).
- * BigObjects are automatically tracked and cleaned up when the workflow
execution completes.
+ * Each LargeBinary is identified by an S3 URI (s3://bucket/path/to/object).
+ * LargeBinaries are automatically tracked and cleaned up when the workflow
execution completes.
*/
-public class BigObject {
+public class LargeBinary {
private final String uri;
/**
- * Creates a BigObject from an existing S3 URI.
+ * Creates a LargeBinary from an existing S3 URI.
* Used primarily for deserialization from JSON.
*
* @param uri S3 URI in the format s3://bucket/path/to/object
* @throws IllegalArgumentException if URI is null or doesn't start with
"s3://"
*/
@JsonCreator
- public BigObject(@JsonProperty("uri") String uri) {
+ public LargeBinary(@JsonProperty("uri") String uri) {
if (uri == null) {
- throw new IllegalArgumentException("BigObject URI cannot be null");
+ throw new IllegalArgumentException("LargeBinary URI cannot be
null");
}
if (!uri.startsWith("s3://")) {
throw new IllegalArgumentException(
- "BigObject URI must start with 's3://', got: " + uri
+ "LargeBinary URI must start with 's3://', got: " + uri
);
}
this.uri = uri;
}
/**
- * Creates a new BigObject for writing data.
+ * Creates a new LargeBinary for writing data.
* Generates a unique S3 URI.
*
* Usage example:
*
- * BigObject bigObject = new BigObject();
- * try (BigObjectOutputStream out = new
BigObjectOutputStream(bigObject)) {
+ * LargeBinary largeBinary = new LargeBinary();
+ * try (LargeBinaryOutputStream out = new
LargeBinaryOutputStream(largeBinary)) {
* out.write(data);
* }
- * // bigObject is now ready to be added to tuples
+ * // largeBinary is now ready to be added to tuples
*
*/
- public BigObject() {
- this(BigObjectManager.create());
+ public LargeBinary() {
+ this(LargeBinaryManager.create());
}
@JsonValue
@@ -97,8 +97,8 @@ public class BigObject {
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
- if (!(obj instanceof BigObject)) return false;
- BigObject that = (BigObject) obj;
+ if (!(obj instanceof LargeBinary)) return false;
+ LargeBinary that = (LargeBinary) obj;
return Objects.equals(uri, that.uri);
}
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
index 19ed81efa4..ad6ac07c1f 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
@@ -20,7 +20,7 @@
package org.apache.texera.amber.util
import org.apache.texera.amber.config.StorageConfig
-import org.apache.texera.amber.core.tuple.{Attribute, AttributeType,
BigObject, Schema, Tuple}
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType,
LargeBinary, Schema, Tuple}
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
import org.apache.iceberg.data.parquet.GenericParquetReaders
@@ -52,8 +52,8 @@ import scala.jdk.CollectionConverters._
*/
object IcebergUtil {
- // Unique suffix for BIG_OBJECT field encoding
- private val BIG_OBJECT_FIELD_SUFFIX = "__texera_big_obj_ptr"
+ // Unique suffix for LARGE_BINARY field encoding
+ private val LARGE_BINARY_FIELD_SUFFIX = "__texera_large_binary_ptr"
/**
* Creates and initializes a HadoopCatalog with the given parameters.
@@ -203,7 +203,7 @@ object IcebergUtil {
/**
* Converts a custom Amber `Schema` to an Iceberg `Schema`.
- * Field names are encoded to preserve BIG_OBJECT type information.
+ * Field names are encoded to preserve LARGE_BINARY type information.
*
* @param amberSchema The custom Amber Schema.
* @return An Iceberg Schema.
@@ -211,7 +211,7 @@ object IcebergUtil {
def toIcebergSchema(amberSchema: Schema): IcebergSchema = {
val icebergFields = amberSchema.getAttributes.zipWithIndex.map {
case (attribute, index) =>
- val encodedName = encodeBigObjectFieldName(attribute.getName,
attribute.getType)
+ val encodedName = encodeLargeBinaryFieldName(attribute.getName,
attribute.getType)
val icebergType = toIcebergType(attribute.getType)
Types.NestedField.optional(index + 1, encodedName, icebergType)
}
@@ -220,7 +220,7 @@ object IcebergUtil {
/**
* Converts a custom Amber `AttributeType` to an Iceberg `Type`.
- * Note: BIG_OBJECT is stored as StringType; field name encoding is used to
distinguish it.
+ * Note: LARGE_BINARY is stored as StringType; field name encoding is used
to distinguish it.
*
* @param attributeType The custom Amber AttributeType.
* @return The corresponding Iceberg Type.
@@ -234,8 +234,8 @@ object IcebergUtil {
case AttributeType.BOOLEAN => Types.BooleanType.get()
case AttributeType.TIMESTAMP => Types.TimestampType.withoutZone()
case AttributeType.BINARY => Types.BinaryType.get()
- case AttributeType.BIG_OBJECT =>
- Types.StringType.get() // Store BigObjectPointer URI as string
+ case AttributeType.LARGE_BINARY =>
+ Types.StringType.get() // Store LargeBinary URI as string
case AttributeType.ANY =>
throw new IllegalArgumentException("ANY type is not supported in
Iceberg")
}
@@ -252,13 +252,13 @@ object IcebergUtil {
tuple.schema.getAttributes.zipWithIndex.foreach {
case (attribute, index) =>
- val fieldName = encodeBigObjectFieldName(attribute.getName,
attribute.getType)
+ val fieldName = encodeLargeBinaryFieldName(attribute.getName,
attribute.getType)
val value = tuple.getField[AnyRef](index) match {
- case null => null
- case ts: Timestamp =>
ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
- case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
- case bigObjPtr: BigObject => bigObjPtr.getUri
- case other => other
+ case null => null
+ case ts: Timestamp =>
ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
+ case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
+ case largeBinaryPtr: LargeBinary => largeBinaryPtr.getUri
+ case other => other
}
record.setField(fieldName, value)
}
@@ -275,7 +275,7 @@ object IcebergUtil {
*/
def fromRecord(record: Record, amberSchema: Schema): Tuple = {
val fieldValues = amberSchema.getAttributes.map { attribute =>
- val fieldName = encodeBigObjectFieldName(attribute.getName,
attribute.getType)
+ val fieldName = encodeLargeBinaryFieldName(attribute.getName,
attribute.getType)
val rawValue = record.getField(fieldName)
rawValue match {
@@ -285,8 +285,8 @@ object IcebergUtil {
val bytes = new Array[Byte](buffer.remaining())
buffer.get(bytes)
bytes
- case uri: String if attribute.getType == AttributeType.BIG_OBJECT =>
- new BigObject(uri)
+ case uri: String if attribute.getType == AttributeType.LARGE_BINARY =>
+ new LargeBinary(uri)
case other => other
}
}
@@ -295,16 +295,19 @@ object IcebergUtil {
}
/**
- * Encodes a field name for BIG_OBJECT types by adding a unique system
suffix.
- * This ensures BIG_OBJECT fields can be identified when reading from
Iceberg.
+ * Encodes a field name for LARGE_BINARY types by adding a unique system
suffix.
+ * This ensures LARGE_BINARY fields can be identified when reading from
Iceberg.
*
* @param fieldName The original field name
* @param attributeType The attribute type
- * @return The encoded field name with a unique suffix for BIG_OBJECT types
+ * @return The encoded field name with a unique suffix for LARGE_BINARY
types
*/
- private def encodeBigObjectFieldName(fieldName: String, attributeType:
AttributeType): String = {
- if (attributeType == AttributeType.BIG_OBJECT) {
- s"${fieldName}${BIG_OBJECT_FIELD_SUFFIX}"
+ private def encodeLargeBinaryFieldName(
+ fieldName: String,
+ attributeType: AttributeType
+ ): String = {
+ if (attributeType == AttributeType.LARGE_BINARY) {
+ s"${fieldName}${LARGE_BINARY_FIELD_SUFFIX}"
} else {
fieldName
}
@@ -317,27 +320,27 @@ object IcebergUtil {
* @param fieldName The encoded field name
* @return The original field name with system suffix removed
*/
- private def decodeBigObjectFieldName(fieldName: String): String = {
- if (isBigObjectField(fieldName)) {
- fieldName.substring(0, fieldName.length - BIG_OBJECT_FIELD_SUFFIX.length)
+ private def decodeLargeBinaryFieldName(fieldName: String): String = {
+ if (isLargeBinaryField(fieldName)) {
+ fieldName.substring(0, fieldName.length -
LARGE_BINARY_FIELD_SUFFIX.length)
} else {
fieldName
}
}
/**
- * Checks if a field name indicates a BIG_OBJECT type by examining the
unique suffix.
+ * Checks if a field name indicates a LARGE_BINARY type by examining the
unique suffix.
*
* @param fieldName The field name to check
- * @return true if the field represents a BIG_OBJECT type, false otherwise
+ * @return true if the field represents a LARGE_BINARY type, false otherwise
*/
- private def isBigObjectField(fieldName: String): Boolean = {
- fieldName.endsWith(BIG_OBJECT_FIELD_SUFFIX)
+ private def isLargeBinaryField(fieldName: String): Boolean = {
+ fieldName.endsWith(LARGE_BINARY_FIELD_SUFFIX)
}
/**
* Converts an Iceberg `Schema` to an Amber `Schema`.
- * Field names are decoded to restore original names and detect BIG_OBJECT
types.
+ * Field names are decoded to restore original names and detect
LARGE_BINARY types.
*
* @param icebergSchema The Iceberg Schema.
* @return The corresponding Amber Schema.
@@ -349,7 +352,7 @@ object IcebergUtil {
.map { field =>
val fieldName = field.name()
val attributeType = fromIcebergType(field.`type`().asPrimitiveType(),
fieldName)
- val originalName = decodeBigObjectFieldName(fieldName)
+ val originalName = decodeLargeBinaryFieldName(fieldName)
new Attribute(originalName, attributeType)
}
.toList
@@ -361,7 +364,7 @@ object IcebergUtil {
* Converts an Iceberg `Type` to an Amber `AttributeType`.
*
* @param icebergType The Iceberg Type.
- * @param fieldName The field name (used to detect BIG_OBJECT by suffix).
+ * @param fieldName The field name (used to detect LARGE_BINARY by suffix).
* @return The corresponding Amber AttributeType.
*/
def fromIcebergType(
@@ -370,7 +373,7 @@ object IcebergUtil {
): AttributeType = {
icebergType match {
case _: Types.StringType =>
- if (isBigObjectField(fieldName)) AttributeType.BIG_OBJECT else
AttributeType.STRING
+ if (isLargeBinaryField(fieldName)) AttributeType.LARGE_BINARY else
AttributeType.STRING
case _: Types.IntegerType => AttributeType.INTEGER
case _: Types.LongType => AttributeType.LONG
case _: Types.DoubleType => AttributeType.DOUBLE
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryInputStream.scala
similarity index 83%
rename from
common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala
rename to
common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryInputStream.scala
index 8a3da1d6b7..e2af371e77 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryInputStream.scala
@@ -19,31 +19,31 @@
package org.apache.texera.service.util
-import org.apache.texera.amber.core.tuple.BigObject
+import org.apache.texera.amber.core.tuple.LargeBinary
import java.io.InputStream
/**
- * InputStream for reading BigObject data from S3.
+ * InputStream for reading LargeBinary data from S3.
*
* The underlying S3 download is lazily initialized on first read.
* The stream will fail if the S3 object doesn't exist when read is attempted.
*
* Usage:
* {{{
- * val bigObject: BigObject = ...
- * try (val in = new BigObjectInputStream(bigObject)) {
+ * val largeBinary: LargeBinary = ...
+ * try (val in = new LargeBinaryInputStream(largeBinary)) {
* val bytes = in.readAllBytes()
* }
* }}}
*/
-class BigObjectInputStream(bigObject: BigObject) extends InputStream {
+class LargeBinaryInputStream(largeBinary: LargeBinary) extends InputStream {
- require(bigObject != null, "BigObject cannot be null")
+ require(largeBinary != null, "LargeBinary cannot be null")
// Lazy initialization - downloads only when first read() is called
private lazy val underlying: InputStream =
- S3StorageClient.downloadObject(bigObject.getBucketName,
bigObject.getObjectKey)
+ S3StorageClient.downloadObject(largeBinary.getBucketName,
largeBinary.getObjectKey)
@volatile private var closed = false
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
similarity index 72%
rename from
common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
rename to
common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
index a6a273eb30..211d7d3b75 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
@@ -24,19 +24,19 @@ import com.typesafe.scalalogging.LazyLogging
import java.util.UUID
/**
- * Manages the lifecycle of BigObjects stored in S3.
+ * Manages the lifecycle of LargeBinaries stored in S3.
*
* Handles creation and deletion of large objects that exceed
* normal tuple size limits.
*/
-object BigObjectManager extends LazyLogging {
- private val DEFAULT_BUCKET = "texera-big-objects"
+object LargeBinaryManager extends LazyLogging {
+ private val DEFAULT_BUCKET = "texera-large-binaries"
/**
- * Creates a new BigObject reference.
- * The actual data upload happens separately via BigObjectOutputStream.
+ * Creates a new LargeBinary reference.
+ * The actual data upload happens separately via LargeBinaryOutputStream.
*
- * @return S3 URI string for the new BigObject (format: s3://bucket/key)
+ * @return S3 URI string for the new LargeBinary (format: s3://bucket/key)
*/
def create(): String = {
S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET)
@@ -48,7 +48,7 @@ object BigObjectManager extends LazyLogging {
}
/**
- * Deletes all big objects from the bucket.
+ * Deletes all large binaries from the bucket.
*
* @throws Exception if the deletion fails
* @return Unit
@@ -56,10 +56,10 @@ object BigObjectManager extends LazyLogging {
def deleteAllObjects(): Unit = {
try {
S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects")
- logger.info(s"Successfully deleted all big objects from bucket:
$DEFAULT_BUCKET")
+ logger.info(s"Successfully deleted all large binaries from bucket:
$DEFAULT_BUCKET")
} catch {
case e: Exception =>
- logger.warn(s"Failed to delete big objects from bucket:
$DEFAULT_BUCKET", e)
+ logger.warn(s"Failed to delete large binaries from bucket:
$DEFAULT_BUCKET", e)
}
}
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryOutputStream.scala
similarity index 82%
rename from
common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala
rename to
common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryOutputStream.scala
index e0526107fa..ac6025146c 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryOutputStream.scala
@@ -20,7 +20,7 @@
package org.apache.texera.service.util
import com.typesafe.scalalogging.LazyLogging
-import org.apache.texera.amber.core.tuple.BigObject
+import org.apache.texera.amber.core.tuple.LargeBinary
import java.io.{IOException, OutputStream, PipedInputStream, PipedOutputStream}
import java.util.concurrent.atomic.AtomicReference
@@ -28,32 +28,32 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
/**
- * OutputStream for streaming BigObject data to S3.
+ * OutputStream for streaming LargeBinary data to S3.
*
* Data is uploaded in the background using multipart upload as you write.
* Call close() to complete the upload and ensure all data is persisted.
*
* Usage:
* {{{
- * val bigObject = new BigObject()
- * try (val out = new BigObjectOutputStream(bigObject)) {
+ * val largeBinary = new LargeBinary()
+ * try (val out = new LargeBinaryOutputStream(largeBinary)) {
* out.write(myBytes)
* }
- * // bigObject is now ready to use
+ * // largeBinary is now ready to use
* }}}
*
* Note: Not thread-safe. Do not access from multiple threads concurrently.
*
- * @param bigObject The BigObject reference to write to
+ * @param largeBinary The LargeBinary reference to write to
*/
-class BigObjectOutputStream(bigObject: BigObject) extends OutputStream with
LazyLogging {
+class LargeBinaryOutputStream(largeBinary: LargeBinary) extends OutputStream
with LazyLogging {
private val PIPE_BUFFER_SIZE = 64 * 1024 // 64KB
- require(bigObject != null, "BigObject cannot be null")
+ require(largeBinary != null, "LargeBinary cannot be null")
- private val bucketName: String = bigObject.getBucketName
- private val objectKey: String = bigObject.getObjectKey
+ private val bucketName: String = largeBinary.getBucketName
+ private val objectKey: String = largeBinary.getObjectKey
private implicit val ec: ExecutionContext = ExecutionContext.global
// Pipe: we write to pipedOut, and S3 reads from pipedIn
@@ -68,11 +68,11 @@ class BigObjectOutputStream(bigObject: BigObject) extends
OutputStream with Lazy
try {
S3StorageClient.createBucketIfNotExist(bucketName)
S3StorageClient.uploadObject(bucketName, objectKey, pipedIn)
- logger.debug(s"Upload completed: ${bigObject.getUri}")
+ logger.debug(s"Upload completed: ${largeBinary.getUri}")
} catch {
case e: Exception =>
uploadException.set(Some(e))
- logger.error(s"Upload failed: ${bigObject.getUri}", e)
+ logger.error(s"Upload failed: ${largeBinary.getUri}", e)
} finally {
pipedIn.close()
}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
index e99c54d5d6..08b9774607 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
@@ -196,24 +196,24 @@ class AttributeTypeUtilsSpec extends AnyFunSuite {
assert(parseField("anything", AttributeType.ANY) == "anything")
}
- test("parseField correctly parses to BIG_OBJECT") {
- // Valid S3 URI strings are converted to BigObject
- val pointer1 = parseField("s3://bucket/path/to/object",
AttributeType.BIG_OBJECT)
- .asInstanceOf[BigObject]
+ test("parseField correctly parses to LARGE_BINARY") {
+ // Valid S3 URI strings are converted to LargeBinary
+ val pointer1 = parseField("s3://bucket/path/to/object",
AttributeType.LARGE_BINARY)
+ .asInstanceOf[LargeBinary]
assert(pointer1.getUri == "s3://bucket/path/to/object")
assert(pointer1.getBucketName == "bucket")
assert(pointer1.getObjectKey == "path/to/object")
// Null input returns null
- assert(parseField(null, AttributeType.BIG_OBJECT) == null)
+ assert(parseField(null, AttributeType.LARGE_BINARY) == null)
}
- test("BIG_OBJECT type is preserved but never inferred from data") {
- // BIG_OBJECT remains BIG_OBJECT when passed as typeSoFar
- assert(inferField(AttributeType.BIG_OBJECT, "any-value") ==
AttributeType.BIG_OBJECT)
- assert(inferField(AttributeType.BIG_OBJECT, null) ==
AttributeType.BIG_OBJECT)
+ test("LARGE_BINARY type is preserved but never inferred from data") {
+ // LARGE_BINARY remains LARGE_BINARY when passed as typeSoFar
+ assert(inferField(AttributeType.LARGE_BINARY, "any-value") ==
AttributeType.LARGE_BINARY)
+ assert(inferField(AttributeType.LARGE_BINARY, null) ==
AttributeType.LARGE_BINARY)
- // String data is inferred as STRING, never BIG_OBJECT
+ // String data is inferred as STRING, never LARGE_BINARY
assert(inferField("s3://bucket/path") == AttributeType.STRING)
}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/IcebergUtilSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/IcebergUtilSpec.scala
index 59a04ad668..da15a8060d 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/IcebergUtilSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/IcebergUtilSpec.scala
@@ -19,7 +19,7 @@
package org.apache.texera.amber.util
-import org.apache.texera.amber.core.tuple.{AttributeType, BigObject, Schema,
Tuple}
+import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema,
Tuple}
import org.apache.texera.amber.util.IcebergUtil.toIcebergSchema
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.types.Types
@@ -200,90 +200,91 @@ class IcebergUtilSpec extends AnyFlatSpec {
assert(tuple.getField[Array[Byte]]("test-7") sameElements Array[Byte](1,
2, 3, 4))
}
- // BIG_OBJECT type tests
+ // LARGE_BINARY type tests
- it should "convert BIG_OBJECT type correctly between Texera and Iceberg" in {
- // BIG_OBJECT stored as StringType with field name suffix
- assert(IcebergUtil.toIcebergType(AttributeType.BIG_OBJECT) ==
Types.StringType.get())
+ it should "convert LARGE_BINARY type correctly between Texera and Iceberg"
in {
+ // LARGE_BINARY stored as StringType with field name suffix
+ assert(IcebergUtil.toIcebergType(AttributeType.LARGE_BINARY) ==
Types.StringType.get())
assert(IcebergUtil.fromIcebergType(Types.StringType.get(), "field") ==
AttributeType.STRING)
assert(
IcebergUtil.fromIcebergType(
Types.StringType.get(),
- "field__texera_big_obj_ptr"
- ) == AttributeType.BIG_OBJECT
+ "field__texera_large_binary_ptr"
+ ) == AttributeType.LARGE_BINARY
)
}
- it should "convert schemas with BIG_OBJECT fields correctly" in {
+ it should "convert schemas with LARGE_BINARY fields correctly" in {
val texeraSchema = Schema()
.add("id", AttributeType.INTEGER)
- .add("large_data", AttributeType.BIG_OBJECT)
+ .add("large_data", AttributeType.LARGE_BINARY)
val icebergSchema = IcebergUtil.toIcebergSchema(texeraSchema)
- // BIG_OBJECT field gets encoded name with suffix
- assert(icebergSchema.findField("large_data__texera_big_obj_ptr") != null)
+ // LARGE_BINARY field gets encoded name with suffix
+ assert(icebergSchema.findField("large_data__texera_large_binary_ptr") !=
null)
assert(
- icebergSchema.findField("large_data__texera_big_obj_ptr").`type`() ==
Types.StringType.get()
+ icebergSchema.findField("large_data__texera_large_binary_ptr").`type`()
== Types.StringType
+ .get()
)
// Round-trip preserves schema
val roundTripSchema = IcebergUtil.fromIcebergSchema(icebergSchema)
- assert(roundTripSchema.getAttribute("large_data").getType ==
AttributeType.BIG_OBJECT)
+ assert(roundTripSchema.getAttribute("large_data").getType ==
AttributeType.LARGE_BINARY)
}
- it should "convert tuples with BIG_OBJECT to records and back correctly" in {
+ it should "convert tuples with LARGE_BINARY to records and back correctly"
in {
val schema = Schema()
.add("id", AttributeType.INTEGER)
- .add("large_data", AttributeType.BIG_OBJECT)
+ .add("large_data", AttributeType.LARGE_BINARY)
val tuple = Tuple
.builder(schema)
- .addSequentially(Array(Int.box(42), new
BigObject("s3://bucket/object/key.data")))
+ .addSequentially(Array(Int.box(42), new
LargeBinary("s3://bucket/object/key.data")))
.build()
val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), tuple)
- // BIG_OBJECT stored as URI string with encoded field name
+ // LARGE_BINARY stored as URI string with encoded field name
assert(record.getField("id") == 42)
- assert(record.getField("large_data__texera_big_obj_ptr") ==
"s3://bucket/object/key.data")
+ assert(record.getField("large_data__texera_large_binary_ptr") ==
"s3://bucket/object/key.data")
// Round-trip preserves data
val roundTripTuple = IcebergUtil.fromRecord(record, schema)
assert(roundTripTuple == tuple)
- // BigObject properties are accessible
- val bigObj = roundTripTuple.getField[BigObject]("large_data")
- assert(bigObj.getUri == "s3://bucket/object/key.data")
- assert(bigObj.getBucketName == "bucket")
- assert(bigObj.getObjectKey == "object/key.data")
+ // LargeBinary properties are accessible
+ val largeBinary = roundTripTuple.getField[LargeBinary]("large_data")
+ assert(largeBinary.getUri == "s3://bucket/object/key.data")
+ assert(largeBinary.getBucketName == "bucket")
+ assert(largeBinary.getObjectKey == "object/key.data")
}
- it should "handle null BIG_OBJECT values correctly" in {
- val schema = Schema().add("data", AttributeType.BIG_OBJECT)
+ it should "handle null LARGE_BINARY values correctly" in {
+ val schema = Schema().add("data", AttributeType.LARGE_BINARY)
val tupleWithNull =
Tuple.builder(schema).addSequentially(Array(null)).build()
val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema),
tupleWithNull)
- assert(record.getField("data__texera_big_obj_ptr") == null)
+ assert(record.getField("data__texera_large_binary_ptr") == null)
assert(IcebergUtil.fromRecord(record, schema) == tupleWithNull)
}
- it should "handle multiple BIG_OBJECT fields and mixed types correctly" in {
+ it should "handle multiple LARGE_BINARY fields and mixed types correctly" in
{
val schema = Schema()
.add("int_field", AttributeType.INTEGER)
- .add("big_obj_1", AttributeType.BIG_OBJECT)
+ .add("large_binary_1", AttributeType.LARGE_BINARY)
.add("string_field", AttributeType.STRING)
- .add("big_obj_2", AttributeType.BIG_OBJECT)
+ .add("large_binary_2", AttributeType.LARGE_BINARY)
val tuple = Tuple
.builder(schema)
.addSequentially(
Array(
Int.box(123),
- new BigObject("s3://bucket1/file1.dat"),
+ new LargeBinary("s3://bucket1/file1.dat"),
"normal string",
- null // null BIG_OBJECT
+ null // null LARGE_BINARY
)
)
.build()
@@ -291,9 +292,9 @@ class IcebergUtilSpec extends AnyFlatSpec {
val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), tuple)
assert(record.getField("int_field") == 123)
- assert(record.getField("big_obj_1__texera_big_obj_ptr") ==
"s3://bucket1/file1.dat")
+ assert(record.getField("large_binary_1__texera_large_binary_ptr") ==
"s3://bucket1/file1.dat")
assert(record.getField("string_field") == "normal string")
- assert(record.getField("big_obj_2__texera_big_obj_ptr") == null)
+ assert(record.getField("large_binary_2__texera_large_binary_ptr") == null)
assert(IcebergUtil.fromRecord(record, schema) == tuple)
}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala
deleted file mode 100644
index c40e87a4a8..0000000000
---
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.service.util
-
-import org.apache.texera.amber.core.tuple.BigObject
-import org.scalatest.funsuite.AnyFunSuite
-
-class BigObjectManagerSpec extends AnyFunSuite with S3StorageTestBase {
-
- /** Creates a big object from string data and returns it. */
- private def createBigObject(data: String): BigObject = {
- val bigObject = new BigObject()
- val out = new BigObjectOutputStream(bigObject)
- try {
- out.write(data.getBytes)
- } finally {
- out.close()
- }
- bigObject
- }
-
- /** Verifies standard bucket name. */
- private def assertStandardBucket(pointer: BigObject): Unit = {
- assert(pointer.getBucketName == "texera-big-objects")
- assert(pointer.getUri.startsWith("s3://texera-big-objects/"))
- }
-
- // ========================================
- // BigObjectInputStream Tests (Standard Java InputStream)
- // ========================================
-
- test("BigObjectInputStream should read all bytes from stream") {
- val data = "Hello, World! This is a test."
- val bigObject = createBigObject(data)
-
- val stream = new BigObjectInputStream(bigObject)
- assert(stream.readAllBytes().sameElements(data.getBytes))
- stream.close()
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectInputStream should read exact number of bytes") {
- val bigObject = createBigObject("0123456789ABCDEF")
-
- val stream = new BigObjectInputStream(bigObject)
- val result = stream.readNBytes(10)
-
- assert(result.length == 10)
- assert(result.sameElements("0123456789".getBytes))
- stream.close()
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectInputStream should handle reading more bytes than available")
{
- val data = "Short"
- val bigObject = createBigObject(data)
-
- val stream = new BigObjectInputStream(bigObject)
- val result = stream.readNBytes(100)
-
- assert(result.length == data.length)
- assert(result.sameElements(data.getBytes))
- stream.close()
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectInputStream should support standard single-byte read") {
- val bigObject = createBigObject("ABC")
-
- val stream = new BigObjectInputStream(bigObject)
- assert(stream.read() == 65) // 'A'
- assert(stream.read() == 66) // 'B'
- assert(stream.read() == 67) // 'C'
- assert(stream.read() == -1) // EOF
- stream.close()
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectInputStream should return -1 at EOF") {
- val bigObject = createBigObject("EOF")
-
- val stream = new BigObjectInputStream(bigObject)
- stream.readAllBytes() // Read all data
- assert(stream.read() == -1)
- stream.close()
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectInputStream should throw exception when reading from closed
stream") {
- val bigObject = createBigObject("test")
-
- val stream = new BigObjectInputStream(bigObject)
- stream.close()
-
- assertThrows[java.io.IOException](stream.read())
- assertThrows[java.io.IOException](stream.readAllBytes())
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectInputStream should handle multiple close calls") {
- val bigObject = createBigObject("test")
-
- val stream = new BigObjectInputStream(bigObject)
- stream.close()
- stream.close() // Should not throw
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectInputStream should read large data correctly") {
- val largeData = Array.fill[Byte](20000)((scala.util.Random.nextInt(256) -
128).toByte)
- val bigObject = new BigObject()
- val out = new BigObjectOutputStream(bigObject)
- try {
- out.write(largeData)
- } finally {
- out.close()
- }
-
- val stream = new BigObjectInputStream(bigObject)
- val result = stream.readAllBytes()
- assert(result.sameElements(largeData))
- stream.close()
-
- BigObjectManager.deleteAllObjects()
- }
-
- // ========================================
- // BigObjectManager Tests
- // ========================================
-
- test("BigObjectManager should create a big object") {
- val pointer = createBigObject("Test big object data")
-
- assertStandardBucket(pointer)
- }
-
- test("BigObjectInputStream should open and read a big object") {
- val data = "Hello from big object!"
- val pointer = createBigObject(data)
-
- val stream = new BigObjectInputStream(pointer)
- val readData = stream.readAllBytes()
- stream.close()
-
- assert(readData.sameElements(data.getBytes))
- }
-
- test("BigObjectInputStream should fail to open non-existent big object") {
- val fakeBigObject = new
BigObject("s3://texera-big-objects/nonexistent/file")
- val stream = new BigObjectInputStream(fakeBigObject)
-
- try {
- intercept[Exception] {
- stream.read()
- }
- } finally {
- try { stream.close() }
- catch { case _: Exception => }
- }
- }
-
- test("BigObjectManager should delete all big objects") {
- val pointer1 = new BigObject()
- val out1 = new BigObjectOutputStream(pointer1)
- try {
- out1.write("Object 1".getBytes)
- } finally {
- out1.close()
- }
-
- val pointer2 = new BigObject()
- val out2 = new BigObjectOutputStream(pointer2)
- try {
- out2.write("Object 2".getBytes)
- } finally {
- out2.close()
- }
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectManager should handle delete with no objects gracefully") {
- BigObjectManager.deleteAllObjects() // Should not throw exception
- }
-
- test("BigObjectManager should delete all objects") {
- val pointer1 = createBigObject("Test data")
- val pointer2 = createBigObject("Test data")
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectManager should create bucket if it doesn't exist") {
- val pointer = createBigObject("Test bucket creation")
-
- assertStandardBucket(pointer)
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectManager should handle large objects correctly") {
- val largeData = Array.fill[Byte](6 * 1024 *
1024)((scala.util.Random.nextInt(256) - 128).toByte)
- val pointer = new BigObject()
- val out = new BigObjectOutputStream(pointer)
- try {
- out.write(largeData)
- } finally {
- out.close()
- }
-
- val stream = new BigObjectInputStream(pointer)
- val readData = stream.readAllBytes()
- stream.close()
-
- assert(readData.sameElements(largeData))
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectManager should generate unique URIs for different objects") {
- val testData = "Unique URI test".getBytes
- val pointer1 = new BigObject()
- val out1 = new BigObjectOutputStream(pointer1)
- try {
- out1.write(testData)
- } finally {
- out1.close()
- }
-
- val pointer2 = new BigObject()
- val out2 = new BigObjectOutputStream(pointer2)
- try {
- out2.write(testData)
- } finally {
- out2.close()
- }
-
- assert(pointer1.getUri != pointer2.getUri)
- assert(pointer1.getObjectKey != pointer2.getObjectKey)
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectInputStream should handle multiple reads from the same big
object") {
- val data = "Multiple reads test data"
- val pointer = createBigObject(data)
-
- val stream1 = new BigObjectInputStream(pointer)
- val readData1 = stream1.readAllBytes()
- stream1.close()
-
- val stream2 = new BigObjectInputStream(pointer)
- val readData2 = stream2.readAllBytes()
- stream2.close()
-
- assert(readData1.sameElements(data.getBytes))
- assert(readData2.sameElements(data.getBytes))
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectManager should properly parse bucket name and object key from
big object") {
- val bigObject = createBigObject("URI parsing test")
-
- assertStandardBucket(bigObject)
- assert(bigObject.getObjectKey.nonEmpty)
- assert(!bigObject.getObjectKey.startsWith("/"))
-
- BigObjectManager.deleteAllObjects()
- }
-
- // ========================================
- // Object-Oriented API Tests
- // ========================================
-
- test("BigObject with BigObjectOutputStream should create a big object") {
- val data = "Test data for BigObject with BigObjectOutputStream"
-
- val bigObject = new BigObject()
- val out = new BigObjectOutputStream(bigObject)
- try {
- out.write(data.getBytes)
- } finally {
- out.close()
- }
-
- assertStandardBucket(bigObject)
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectInputStream constructor should read big object contents") {
- val data = "Test data for BigObjectInputStream constructor"
- val bigObject = createBigObject(data)
-
- val stream = new BigObjectInputStream(bigObject)
- val readData = stream.readAllBytes()
- stream.close()
-
- assert(readData.sameElements(data.getBytes))
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectOutputStream and BigObjectInputStream should work together
end-to-end") {
- val data = "End-to-end test data"
-
- // Create using streaming API
- val bigObject = new BigObject()
- val out = new BigObjectOutputStream(bigObject)
- try {
- out.write(data.getBytes)
- } finally {
- out.close()
- }
-
- // Read using standard constructor
- val stream = new BigObjectInputStream(bigObject)
- val readData = stream.readAllBytes()
- stream.close()
-
- assert(readData.sameElements(data.getBytes))
-
- BigObjectManager.deleteAllObjects()
- }
-
- // ========================================
- // BigObjectOutputStream Tests (New Symmetric API)
- // ========================================
-
- test("BigObjectOutputStream should write and upload data to S3") {
- val data = "Test data for BigObjectOutputStream"
-
- val bigObject = new BigObject()
- val outStream = new BigObjectOutputStream(bigObject)
- outStream.write(data.getBytes)
- outStream.close()
-
- assertStandardBucket(bigObject)
-
- // Verify data can be read back
- val inStream = new BigObjectInputStream(bigObject)
- val readData = inStream.readAllBytes()
- inStream.close()
-
- assert(readData.sameElements(data.getBytes))
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectOutputStream should create big object") {
- val data = "Database registration test"
-
- val bigObject = new BigObject()
- val outStream = new BigObjectOutputStream(bigObject)
- outStream.write(data.getBytes)
- outStream.close()
-
- assertStandardBucket(bigObject)
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectOutputStream should handle large data correctly") {
- val largeData = Array.fill[Byte](8 * 1024 *
1024)((scala.util.Random.nextInt(256) - 128).toByte)
-
- val bigObject = new BigObject()
- val outStream = new BigObjectOutputStream(bigObject)
- outStream.write(largeData)
- outStream.close()
-
- // Verify data integrity
- val inStream = new BigObjectInputStream(bigObject)
- val readData = inStream.readAllBytes()
- inStream.close()
-
- assert(readData.sameElements(largeData))
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectOutputStream should handle multiple writes") {
- val bigObject = new BigObject()
- val outStream = new BigObjectOutputStream(bigObject)
- outStream.write("Hello ".getBytes)
- outStream.write("World".getBytes)
- outStream.write("!".getBytes)
- outStream.close()
-
- val inStream = new BigObjectInputStream(bigObject)
- val readData = inStream.readAllBytes()
- inStream.close()
-
- assert(readData.sameElements("Hello World!".getBytes))
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectOutputStream should throw exception when writing to closed
stream") {
- val bigObject = new BigObject()
- val outStream = new BigObjectOutputStream(bigObject)
- outStream.write("test".getBytes)
- outStream.close()
-
- assertThrows[java.io.IOException](outStream.write("more".getBytes))
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObjectOutputStream should handle close() being called multiple
times") {
- val bigObject = new BigObject()
- val outStream = new BigObjectOutputStream(bigObject)
- outStream.write("test".getBytes)
- outStream.close()
- outStream.close() // Should not throw
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("New BigObject() constructor should create unique URIs") {
- val bigObject1 = new BigObject()
- val bigObject2 = new BigObject()
-
- assert(bigObject1.getUri != bigObject2.getUri)
- assert(bigObject1.getObjectKey != bigObject2.getObjectKey)
-
- BigObjectManager.deleteAllObjects()
- }
-
- test("BigObject() and BigObjectOutputStream API should be symmetric with
input") {
- val data = "Symmetric API test"
-
- // Write using new symmetric API
- val bigObject = new BigObject()
- val outStream = new BigObjectOutputStream(bigObject)
- outStream.write(data.getBytes)
- outStream.close()
-
- // Read using symmetric API
- val inStream = new BigObjectInputStream(bigObject)
- val readData = inStream.readAllBytes()
- inStream.close()
-
- assert(readData.sameElements(data.getBytes))
-
- BigObjectManager.deleteAllObjects()
- }
-}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryInputStreamSpec.scala
similarity index 68%
rename from
common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala
rename to
common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryInputStreamSpec.scala
index e964d0a1cc..0e307d8e42 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryInputStreamSpec.scala
@@ -19,20 +19,20 @@
package org.apache.texera.service.util
-import org.apache.texera.amber.core.tuple.BigObject
+import org.apache.texera.amber.core.tuple.LargeBinary
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite
import java.io.{ByteArrayInputStream, IOException}
import scala.util.Random
-class BigObjectInputStreamSpec
+class LargeBinaryInputStreamSpec
extends AnyFunSuite
with S3StorageTestBase
with BeforeAndAfterAll
with BeforeAndAfterEach {
- private val testBucketName = "test-big-object-input-stream"
+ private val testBucketName = "test-large-binary-input-stream"
override def beforeAll(): Unit = {
super.beforeAll()
@@ -49,19 +49,19 @@ class BigObjectInputStreamSpec
}
// Helper methods
- private def createTestObject(key: String, data: Array[Byte]): BigObject = {
+ private def createTestObject(key: String, data: Array[Byte]): LargeBinary = {
S3StorageClient.uploadObject(testBucketName, key, new
ByteArrayInputStream(data))
- new BigObject(s"s3://$testBucketName/$key")
+ new LargeBinary(s"s3://$testBucketName/$key")
}
- private def createTestObject(key: String, data: String): BigObject =
+ private def createTestObject(key: String, data: String): LargeBinary =
createTestObject(key, data.getBytes)
private def generateRandomData(size: Int): Array[Byte] =
Array.fill[Byte](size)((Random.nextInt(256) - 128).toByte)
- private def withStream[T](bigObject: BigObject)(f: BigObjectInputStream =>
T): T = {
- val stream = new BigObjectInputStream(bigObject)
+ private def withStream[T](largeBinary: LargeBinary)(f:
LargeBinaryInputStream => T): T = {
+ val stream = new LargeBinaryInputStream(largeBinary)
try {
f(stream)
} finally {
@@ -69,31 +69,31 @@ class BigObjectInputStreamSpec
}
}
- private def assertThrowsIOExceptionWhenClosed(operation:
BigObjectInputStream => Unit): Unit = {
- val bigObject = createTestObject(s"test/closed-${Random.nextInt()}.txt",
"data")
- val stream = new BigObjectInputStream(bigObject)
+ private def assertThrowsIOExceptionWhenClosed(operation:
LargeBinaryInputStream => Unit): Unit = {
+ val largeBinary = createTestObject(s"test/closed-${Random.nextInt()}.txt",
"data")
+ val stream = new LargeBinaryInputStream(largeBinary)
stream.close()
val exception = intercept[IOException](operation(stream))
assert(exception.getMessage.contains("Stream is closed"))
}
// Constructor Tests
- test("constructor should reject null BigObject") {
+ test("constructor should reject null LargeBinary") {
val exception = intercept[IllegalArgumentException] {
- new BigObjectInputStream(null)
+ new LargeBinaryInputStream(null)
}
- assert(exception.getMessage.contains("BigObject cannot be null"))
+ assert(exception.getMessage.contains("LargeBinary cannot be null"))
}
- test("constructor should accept valid BigObject") {
- val bigObject = createTestObject("test/valid.txt", "test data")
- withStream(bigObject) { _ => }
+ test("constructor should accept valid LargeBinary") {
+ val largeBinary = createTestObject("test/valid.txt", "test data")
+ withStream(largeBinary) { _ => }
}
// read() Tests
test("read() should read single bytes correctly") {
- val bigObject = createTestObject("test/single-byte.txt", "Hello")
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/single-byte.txt", "Hello")
+ withStream(largeBinary) { stream =>
assert(stream.read() == 'H'.toByte)
assert(stream.read() == 'e'.toByte)
assert(stream.read() == 'l'.toByte)
@@ -104,8 +104,8 @@ class BigObjectInputStreamSpec
}
test("read() should return -1 for empty object") {
- val bigObject = createTestObject("test/empty.txt", "")
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/empty.txt", "")
+ withStream(largeBinary) { stream =>
assert(stream.read() == -1)
}
}
@@ -113,8 +113,8 @@ class BigObjectInputStreamSpec
// read(byte[], int, int) Tests
test("read(byte[], int, int) should read data into buffer") {
val testData = "Hello, World!"
- val bigObject = createTestObject("test/buffer-read.txt", testData)
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/buffer-read.txt", testData)
+ withStream(largeBinary) { stream =>
val buffer = new Array[Byte](testData.length)
val bytesRead = stream.read(buffer, 0, buffer.length)
assert(bytesRead == testData.length)
@@ -124,8 +124,8 @@ class BigObjectInputStreamSpec
test("read(byte[], int, int) should handle partial reads and offsets") {
val testData = "Hello, World!"
- val bigObject = createTestObject("test/partial.txt", testData)
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/partial.txt", testData)
+ withStream(largeBinary) { stream =>
// Test partial read
val buffer1 = new Array[Byte](5)
assert(stream.read(buffer1, 0, 5) == 5)
@@ -133,7 +133,7 @@ class BigObjectInputStreamSpec
}
// Test offset
- withStream(bigObject) { stream =>
+ withStream(largeBinary) { stream =>
val buffer2 = new Array[Byte](20)
assert(stream.read(buffer2, 5, 10) == 10)
assert(new String(buffer2, 5, 10) == "Hello, Wor")
@@ -141,8 +141,8 @@ class BigObjectInputStreamSpec
}
test("read(byte[], int, int) should return -1 at EOF") {
- val bigObject = createTestObject("test/eof.txt", "test")
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/eof.txt", "test")
+ withStream(largeBinary) { stream =>
val buffer = new Array[Byte](10)
stream.read(buffer, 0, 10)
assert(stream.read(buffer, 0, 10) == -1)
@@ -152,16 +152,16 @@ class BigObjectInputStreamSpec
// readAllBytes() Tests
test("readAllBytes() should read entire object") {
val testData = "Hello, World! This is a test."
- val bigObject = createTestObject("test/read-all.txt", testData)
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/read-all.txt", testData)
+ withStream(largeBinary) { stream =>
assert(new String(stream.readAllBytes()) == testData)
}
}
test("readAllBytes() should handle large objects") {
val largeData = generateRandomData(1024 * 1024) // 1MB
- val bigObject = createTestObject("test/large.bin", largeData)
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/large.bin", largeData)
+ withStream(largeBinary) { stream =>
val bytes = stream.readAllBytes()
assert(bytes.length == largeData.length)
assert(bytes.sameElements(largeData))
@@ -169,8 +169,8 @@ class BigObjectInputStreamSpec
}
test("readAllBytes() should return empty array for empty object") {
- val bigObject = createTestObject("test/empty-all.txt", "")
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/empty-all.txt", "")
+ withStream(largeBinary) { stream =>
assert(stream.readAllBytes().length == 0)
}
}
@@ -178,8 +178,8 @@ class BigObjectInputStreamSpec
// readNBytes() Tests
test("readNBytes() should read exactly N bytes") {
val testData = "Hello, World! This is a test."
- val bigObject = createTestObject("test/read-n.txt", testData)
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/read-n.txt", testData)
+ withStream(largeBinary) { stream =>
val bytes = stream.readNBytes(5)
assert(bytes.length == 5)
assert(new String(bytes) == "Hello")
@@ -187,8 +187,8 @@ class BigObjectInputStreamSpec
}
test("readNBytes() should handle EOF and zero") {
- val bigObject = createTestObject("test/read-n-eof.txt", "Hello")
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/read-n-eof.txt", "Hello")
+ withStream(largeBinary) { stream =>
// Request more than available
val bytes = stream.readNBytes(100)
assert(bytes.length == 5)
@@ -196,53 +196,53 @@ class BigObjectInputStreamSpec
}
// Test n=0
- withStream(bigObject) { stream =>
+ withStream(largeBinary) { stream =>
assert(stream.readNBytes(0).length == 0)
}
}
// skip() Tests
test("skip() should skip bytes correctly") {
- val bigObject = createTestObject("test/skip.txt", "Hello, World!")
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/skip.txt", "Hello, World!")
+ withStream(largeBinary) { stream =>
assert(stream.skip(7) == 7)
assert(stream.read() == 'W'.toByte)
}
}
test("skip() should handle EOF and zero") {
- val bigObject = createTestObject("test/skip-eof.txt", "Hello")
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/skip-eof.txt", "Hello")
+ withStream(largeBinary) { stream =>
assert(stream.skip(100) == 5)
assert(stream.read() == -1)
}
// Test n=0
- withStream(bigObject) { stream =>
+ withStream(largeBinary) { stream =>
assert(stream.skip(0) == 0)
}
}
// available() Tests
test("available() should return non-negative value") {
- val bigObject = createTestObject("test/available.txt", "Hello, World!")
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/available.txt", "Hello, World!")
+ withStream(largeBinary) { stream =>
assert(stream.available() >= 0)
}
}
// close() Tests
test("close() should be idempotent") {
- val bigObject = createTestObject("test/close-idempotent.txt", "data")
- val stream = new BigObjectInputStream(bigObject)
+ val largeBinary = createTestObject("test/close-idempotent.txt", "data")
+ val stream = new LargeBinaryInputStream(largeBinary)
stream.close()
stream.close() // Should not throw
stream.close() // Should not throw
}
test("close() should prevent further operations") {
- val bigObject = createTestObject("test/close-prevents.txt", "data")
- val stream = new BigObjectInputStream(bigObject)
+ val largeBinary = createTestObject("test/close-prevents.txt", "data")
+ val stream = new LargeBinaryInputStream(largeBinary)
stream.close()
intercept[IOException] { stream.read() }
@@ -253,8 +253,8 @@ class BigObjectInputStreamSpec
}
test("close() should work without reading (lazy initialization)") {
- val bigObject = createTestObject("test/close-lazy.txt", "data")
- val stream = new BigObjectInputStream(bigObject)
+ val largeBinary = createTestObject("test/close-lazy.txt", "data")
+ val stream = new LargeBinaryInputStream(largeBinary)
stream.close() // Should not throw
}
@@ -272,16 +272,16 @@ class BigObjectInputStreamSpec
// mark/reset Tests
test("markSupported() should delegate to underlying stream") {
- val bigObject = createTestObject("test/mark.txt", "data")
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/mark.txt", "data")
+ withStream(largeBinary) { stream =>
val supported = stream.markSupported()
assert(!supported || supported) // Just verify it's callable
}
}
test("mark() and reset() should delegate to underlying stream") {
- val bigObject = createTestObject("test/mark-reset.txt", "data")
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/mark-reset.txt", "data")
+ withStream(largeBinary) { stream =>
if (stream.markSupported()) {
stream.mark(100)
stream.read()
@@ -293,8 +293,8 @@ class BigObjectInputStreamSpec
// Lazy initialization Tests
test("lazy initialization should not download until first read") {
- val bigObject = createTestObject("test/lazy-init.txt", "data")
- val stream = new BigObjectInputStream(bigObject)
+ val largeBinary = createTestObject("test/lazy-init.txt", "data")
+ val stream = new LargeBinaryInputStream(largeBinary)
// Creating the stream should not trigger download
// Reading should trigger download
try {
@@ -307,8 +307,8 @@ class BigObjectInputStreamSpec
// Integration Tests
test("should handle chunked reading of large objects") {
val largeData = generateRandomData(10 * 1024) // 10KB
- val bigObject = createTestObject("test/chunked.bin", largeData)
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/chunked.bin", largeData)
+ withStream(largeBinary) { stream =>
val buffer = new Array[Byte](1024)
val output = new java.io.ByteArrayOutputStream()
var bytesRead = 0
@@ -328,10 +328,10 @@ class BigObjectInputStreamSpec
test("should handle multiple streams reading same object") {
val testData = "Shared data"
- val bigObject = createTestObject("test/shared.txt", testData)
+ val largeBinary = createTestObject("test/shared.txt", testData)
- val stream1 = new BigObjectInputStream(bigObject)
- val stream2 = new BigObjectInputStream(bigObject)
+ val stream1 = new LargeBinaryInputStream(largeBinary)
+ val stream2 = new LargeBinaryInputStream(largeBinary)
try {
assert(new String(stream1.readAllBytes()) == testData)
@@ -344,8 +344,8 @@ class BigObjectInputStreamSpec
test("should preserve binary data integrity") {
val binaryData = Array[Byte](0, 1, 2, 127, -128, -1, 50, 100)
- val bigObject = createTestObject("test/binary.bin", binaryData)
- withStream(bigObject) { stream =>
+ val largeBinary = createTestObject("test/binary.bin", binaryData)
+ withStream(largeBinary) { stream =>
assert(stream.readAllBytes().sameElements(binaryData))
}
}
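
For readers skimming the renamed spec above, a minimal sketch of the read-side API as these tests exercise it; the bucket and key below are illustrative only and not part of this diff:

import org.apache.texera.amber.core.tuple.LargeBinary
import org.apache.texera.service.util.LargeBinaryInputStream

// Point at an existing S3 object (illustrative URI) and stream its bytes back.
val binary = new LargeBinary("s3://example-bucket/test/hello.txt")
val in = new LargeBinaryInputStream(binary) // lazy: nothing is downloaded until the first read
try {
  val bytes = in.readAllBytes()
  println(new String(bytes))
} finally {
  in.close() // idempotent, per the close() tests above
}
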
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
new file mode 100644
index 0000000000..77d142efee
--- /dev/null
+++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.apache.texera.amber.core.tuple.LargeBinary
+import org.scalatest.funsuite.AnyFunSuite
+
+class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase {
+
+ /** Creates a large binary from string data and returns it. */
+ private def createLargeBinary(data: String): LargeBinary = {
+ val largeBinary = new LargeBinary()
+ val out = new LargeBinaryOutputStream(largeBinary)
+ try {
+ out.write(data.getBytes)
+ } finally {
+ out.close()
+ }
+ largeBinary
+ }
+
+ /** Verifies standard bucket name. */
+ private def assertStandardBucket(pointer: LargeBinary): Unit = {
+ assert(pointer.getBucketName == "texera-large-binaries")
+ assert(pointer.getUri.startsWith("s3://texera-large-binaries/"))
+ }
+
+ // ========================================
+ // LargeBinaryInputStream Tests (Standard Java InputStream)
+ // ========================================
+
+ test("LargeBinaryInputStream should read all bytes from stream") {
+ val data = "Hello, World! This is a test."
+ val largeBinary = createLargeBinary(data)
+
+ val stream = new LargeBinaryInputStream(largeBinary)
+ assert(stream.readAllBytes().sameElements(data.getBytes))
+ stream.close()
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryInputStream should read exact number of bytes") {
+ val largeBinary = createLargeBinary("0123456789ABCDEF")
+
+ val stream = new LargeBinaryInputStream(largeBinary)
+ val result = stream.readNBytes(10)
+
+ assert(result.length == 10)
+ assert(result.sameElements("0123456789".getBytes))
+ stream.close()
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryInputStream should handle reading more bytes than
available") {
+ val data = "Short"
+ val largeBinary = createLargeBinary(data)
+
+ val stream = new LargeBinaryInputStream(largeBinary)
+ val result = stream.readNBytes(100)
+
+ assert(result.length == data.length)
+ assert(result.sameElements(data.getBytes))
+ stream.close()
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryInputStream should support standard single-byte read") {
+ val largeBinary = createLargeBinary("ABC")
+
+ val stream = new LargeBinaryInputStream(largeBinary)
+ assert(stream.read() == 65) // 'A'
+ assert(stream.read() == 66) // 'B'
+ assert(stream.read() == 67) // 'C'
+ assert(stream.read() == -1) // EOF
+ stream.close()
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryInputStream should return -1 at EOF") {
+ val largeBinary = createLargeBinary("EOF")
+
+ val stream = new LargeBinaryInputStream(largeBinary)
+ stream.readAllBytes() // Read all data
+ assert(stream.read() == -1)
+ stream.close()
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryInputStream should throw exception when reading from closed
stream") {
+ val largeBinary = createLargeBinary("test")
+
+ val stream = new LargeBinaryInputStream(largeBinary)
+ stream.close()
+
+ assertThrows[java.io.IOException](stream.read())
+ assertThrows[java.io.IOException](stream.readAllBytes())
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryInputStream should handle multiple close calls") {
+ val largeBinary = createLargeBinary("test")
+
+ val stream = new LargeBinaryInputStream(largeBinary)
+ stream.close()
+ stream.close() // Should not throw
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryInputStream should read large data correctly") {
+ val largeData = Array.fill[Byte](20000)((scala.util.Random.nextInt(256) - 128).toByte)
+ val largeBinary = new LargeBinary()
+ val out = new LargeBinaryOutputStream(largeBinary)
+ try {
+ out.write(largeData)
+ } finally {
+ out.close()
+ }
+
+ val stream = new LargeBinaryInputStream(largeBinary)
+ val result = stream.readAllBytes()
+ assert(result.sameElements(largeData))
+ stream.close()
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ // ========================================
+ // LargeBinaryManager Tests
+ // ========================================
+
+ test("LargeBinaryManager should create a large binary") {
+ val pointer = createLargeBinary("Test large binary data")
+
+ assertStandardBucket(pointer)
+ }
+
+ test("LargeBinaryInputStream should open and read a large binary") {
+ val data = "Hello from large binary!"
+ val pointer = createLargeBinary(data)
+
+ val stream = new LargeBinaryInputStream(pointer)
+ val readData = stream.readAllBytes()
+ stream.close()
+
+ assert(readData.sameElements(data.getBytes))
+ }
+
+ test("LargeBinaryInputStream should fail to open non-existent large binary")
{
+ val fakeLargeBinary = new
LargeBinary("s3://texera-large-binaries/nonexistent/file")
+ val stream = new LargeBinaryInputStream(fakeLargeBinary)
+
+ try {
+ intercept[Exception] {
+ stream.read()
+ }
+ } finally {
+ try { stream.close() }
+ catch { case _: Exception => }
+ }
+ }
+
+ test("LargeBinaryManager should delete all large binaries") {
+ val pointer1 = new LargeBinary()
+ val out1 = new LargeBinaryOutputStream(pointer1)
+ try {
+ out1.write("Object 1".getBytes)
+ } finally {
+ out1.close()
+ }
+
+ val pointer2 = new LargeBinary()
+ val out2 = new LargeBinaryOutputStream(pointer2)
+ try {
+ out2.write("Object 2".getBytes)
+ } finally {
+ out2.close()
+ }
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryManager should handle delete with no objects gracefully") {
+ LargeBinaryManager.deleteAllObjects() // Should not throw exception
+ }
+
+ test("LargeBinaryManager should delete all objects") {
+ val pointer1 = createLargeBinary("Test data")
+ val pointer2 = createLargeBinary("Test data")
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryManager should create bucket if it doesn't exist") {
+ val pointer = createLargeBinary("Test bucket creation")
+
+ assertStandardBucket(pointer)
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryManager should handle large objects correctly") {
+ val largeData = Array.fill[Byte](6 * 1024 * 1024)((scala.util.Random.nextInt(256) - 128).toByte)
+ val pointer = new LargeBinary()
+ val out = new LargeBinaryOutputStream(pointer)
+ try {
+ out.write(largeData)
+ } finally {
+ out.close()
+ }
+
+ val stream = new LargeBinaryInputStream(pointer)
+ val readData = stream.readAllBytes()
+ stream.close()
+
+ assert(readData.sameElements(largeData))
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryManager should generate unique URIs for different objects")
{
+ val testData = "Unique URI test".getBytes
+ val pointer1 = new LargeBinary()
+ val out1 = new LargeBinaryOutputStream(pointer1)
+ try {
+ out1.write(testData)
+ } finally {
+ out1.close()
+ }
+
+ val pointer2 = new LargeBinary()
+ val out2 = new LargeBinaryOutputStream(pointer2)
+ try {
+ out2.write(testData)
+ } finally {
+ out2.close()
+ }
+
+ assert(pointer1.getUri != pointer2.getUri)
+ assert(pointer1.getObjectKey != pointer2.getObjectKey)
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryInputStream should handle multiple reads from the same
large binary") {
+ val data = "Multiple reads test data"
+ val pointer = createLargeBinary(data)
+
+ val stream1 = new LargeBinaryInputStream(pointer)
+ val readData1 = stream1.readAllBytes()
+ stream1.close()
+
+ val stream2 = new LargeBinaryInputStream(pointer)
+ val readData2 = stream2.readAllBytes()
+ stream2.close()
+
+ assert(readData1.sameElements(data.getBytes))
+ assert(readData2.sameElements(data.getBytes))
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryManager should properly parse bucket name and object key
from large binary") {
+ val largeBinary = createLargeBinary("URI parsing test")
+
+ assertStandardBucket(largeBinary)
+ assert(largeBinary.getObjectKey.nonEmpty)
+ assert(!largeBinary.getObjectKey.startsWith("/"))
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ // ========================================
+ // Object-Oriented API Tests
+ // ========================================
+
+ test("LargeBinary with LargeBinaryOutputStream should create a large
binary") {
+ val data = "Test data for LargeBinary with LargeBinaryOutputStream"
+
+ val largeBinary = new LargeBinary()
+ val out = new LargeBinaryOutputStream(largeBinary)
+ try {
+ out.write(data.getBytes)
+ } finally {
+ out.close()
+ }
+
+ assertStandardBucket(largeBinary)
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryInputStream constructor should read large binary contents")
{
+ val data = "Test data for LargeBinaryInputStream constructor"
+ val largeBinary = createLargeBinary(data)
+
+ val stream = new LargeBinaryInputStream(largeBinary)
+ val readData = stream.readAllBytes()
+ stream.close()
+
+ assert(readData.sameElements(data.getBytes))
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryOutputStream and LargeBinaryInputStream should work
together end-to-end") {
+ val data = "End-to-end test data"
+
+ // Create using streaming API
+ val largeBinary = new LargeBinary()
+ val out = new LargeBinaryOutputStream(largeBinary)
+ try {
+ out.write(data.getBytes)
+ } finally {
+ out.close()
+ }
+
+ // Read using standard constructor
+ val stream = new LargeBinaryInputStream(largeBinary)
+ val readData = stream.readAllBytes()
+ stream.close()
+
+ assert(readData.sameElements(data.getBytes))
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ // ========================================
+ // LargeBinaryOutputStream Tests (New Symmetric API)
+ // ========================================
+
+ test("LargeBinaryOutputStream should write and upload data to S3") {
+ val data = "Test data for LargeBinaryOutputStream"
+
+ val largeBinary = new LargeBinary()
+ val outStream = new LargeBinaryOutputStream(largeBinary)
+ outStream.write(data.getBytes)
+ outStream.close()
+
+ assertStandardBucket(largeBinary)
+
+ // Verify data can be read back
+ val inStream = new LargeBinaryInputStream(largeBinary)
+ val readData = inStream.readAllBytes()
+ inStream.close()
+
+ assert(readData.sameElements(data.getBytes))
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryOutputStream should create large binary") {
+ val data = "Database registration test"
+
+ val largeBinary = new LargeBinary()
+ val outStream = new LargeBinaryOutputStream(largeBinary)
+ outStream.write(data.getBytes)
+ outStream.close()
+
+ assertStandardBucket(largeBinary)
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryOutputStream should handle large data correctly") {
+ val largeData = Array.fill[Byte](8 * 1024 * 1024)((scala.util.Random.nextInt(256) - 128).toByte)
+
+ val largeBinary = new LargeBinary()
+ val outStream = new LargeBinaryOutputStream(largeBinary)
+ outStream.write(largeData)
+ outStream.close()
+
+ // Verify data integrity
+ val inStream = new LargeBinaryInputStream(largeBinary)
+ val readData = inStream.readAllBytes()
+ inStream.close()
+
+ assert(readData.sameElements(largeData))
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryOutputStream should handle multiple writes") {
+ val largeBinary = new LargeBinary()
+ val outStream = new LargeBinaryOutputStream(largeBinary)
+ outStream.write("Hello ".getBytes)
+ outStream.write("World".getBytes)
+ outStream.write("!".getBytes)
+ outStream.close()
+
+ val inStream = new LargeBinaryInputStream(largeBinary)
+ val readData = inStream.readAllBytes()
+ inStream.close()
+
+ assert(readData.sameElements("Hello World!".getBytes))
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryOutputStream should throw exception when writing to closed
stream") {
+ val largeBinary = new LargeBinary()
+ val outStream = new LargeBinaryOutputStream(largeBinary)
+ outStream.write("test".getBytes)
+ outStream.close()
+
+ assertThrows[java.io.IOException](outStream.write("more".getBytes))
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinaryOutputStream should handle close() being called multiple
times") {
+ val largeBinary = new LargeBinary()
+ val outStream = new LargeBinaryOutputStream(largeBinary)
+ outStream.write("test".getBytes)
+ outStream.close()
+ outStream.close() // Should not throw
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("New LargeBinary() constructor should create unique URIs") {
+ val largeBinary1 = new LargeBinary()
+ val largeBinary2 = new LargeBinary()
+
+ assert(largeBinary1.getUri != largeBinary2.getUri)
+ assert(largeBinary1.getObjectKey != largeBinary2.getObjectKey)
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+
+ test("LargeBinary() and LargeBinaryOutputStream API should be symmetric with
input") {
+ val data = "Symmetric API test"
+
+ // Write using new symmetric API
+ val largeBinary = new LargeBinary()
+ val outStream = new LargeBinaryOutputStream(largeBinary)
+ outStream.write(data.getBytes)
+ outStream.close()
+
+ // Read using symmetric API
+ val inStream = new LargeBinaryInputStream(largeBinary)
+ val readData = inStream.readAllBytes()
+ inStream.close()
+
+ assert(readData.sameElements(data.getBytes))
+
+ LargeBinaryManager.deleteAllObjects()
+ }
+}
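
Taken together, the new spec above describes a symmetric write/read API. A condensed sketch of the round trip it exercises, assuming only the constructors and stream classes shown in this diff (the no-arg LargeBinary() generates a unique s3:// URI):

import org.apache.texera.amber.core.tuple.LargeBinary
import org.apache.texera.service.util.{LargeBinaryInputStream, LargeBinaryOutputStream}

val binary = new LargeBinary()                 // auto-generated bucket/key
val out = new LargeBinaryOutputStream(binary)
try out.write("payload".getBytes)
finally out.close()                            // upload is finalized on close

val in = new LargeBinaryInputStream(binary)
try assert(new String(in.readAllBytes()) == "payload")
finally in.close()
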
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryOutputStreamSpec.scala
similarity index 63%
rename from common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala
rename to common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryOutputStreamSpec.scala
index b4c106d619..ed2a2fbace 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryOutputStreamSpec.scala
@@ -19,20 +19,20 @@
package org.apache.texera.service.util
-import org.apache.texera.amber.core.tuple.BigObject
+import org.apache.texera.amber.core.tuple.LargeBinary
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite
import java.io.IOException
import scala.util.Random
-class BigObjectOutputStreamSpec
+class LargeBinaryOutputStreamSpec
extends AnyFunSuite
with S3StorageTestBase
with BeforeAndAfterAll
with BeforeAndAfterEach {
- private val testBucketName = "test-big-object-output-stream"
+ private val testBucketName = "test-large-binary-output-stream"
override def beforeAll(): Unit = {
super.beforeAll()
@@ -49,43 +49,43 @@ class BigObjectOutputStreamSpec
}
// Helper methods
- private def createBigObject(key: String): BigObject =
- new BigObject(s"s3://$testBucketName/$key")
+ private def createLargeBinary(key: String): LargeBinary =
+ new LargeBinary(s"s3://$testBucketName/$key")
private def generateRandomData(size: Int): Array[Byte] =
Array.fill[Byte](size)((Random.nextInt(256) - 128).toByte)
- private def withStream[T](bigObject: BigObject)(f: BigObjectOutputStream => T): T = {
- val stream = new BigObjectOutputStream(bigObject)
+ private def withStream[T](largeBinary: LargeBinary)(f: LargeBinaryOutputStream => T): T = {
+ val stream = new LargeBinaryOutputStream(largeBinary)
try f(stream)
finally stream.close()
}
- private def readBack(bigObject: BigObject): Array[Byte] = {
- val inputStream = new BigObjectInputStream(bigObject)
+ private def readBack(largeBinary: LargeBinary): Array[Byte] = {
+ val inputStream = new LargeBinaryInputStream(largeBinary)
try inputStream.readAllBytes()
finally inputStream.close()
}
private def writeAndVerify(key: String, data: Array[Byte]): Unit = {
- val bigObject = createBigObject(key)
- withStream(bigObject)(_.write(data, 0, data.length))
- assert(readBack(bigObject).sameElements(data))
+ val largeBinary = createLargeBinary(key)
+ withStream(largeBinary)(_.write(data, 0, data.length))
+ assert(readBack(largeBinary).sameElements(data))
}
// === Constructor Tests ===
- test("should reject null BigObject") {
- val exception = intercept[IllegalArgumentException](new BigObjectOutputStream(null))
- assert(exception.getMessage.contains("BigObject cannot be null"))
+ test("should reject null LargeBinary") {
+ val exception = intercept[IllegalArgumentException](new LargeBinaryOutputStream(null))
+ assert(exception.getMessage.contains("LargeBinary cannot be null"))
}
// === Basic Write Tests ===
test("should write single bytes correctly") {
- val bigObject = createBigObject("test/single-bytes.txt")
- withStream(bigObject) { stream =>
+ val largeBinary = createLargeBinary("test/single-bytes.txt")
+ withStream(largeBinary) { stream =>
"Hello".foreach(c => stream.write(c.toByte))
}
- assert(new String(readBack(bigObject)) == "Hello")
+ assert(new String(readBack(largeBinary)) == "Hello")
}
test("should write byte arrays correctly") {
@@ -95,58 +95,58 @@ class BigObjectOutputStreamSpec
test("should handle partial writes with offset and length") {
val testData = "Hello, World!".getBytes
- val bigObject = createBigObject("test/partial-write.txt")
+ val largeBinary = createLargeBinary("test/partial-write.txt")
- withStream(bigObject) { stream =>
+ withStream(largeBinary) { stream =>
stream.write(testData, 0, 5) // "Hello"
stream.write(testData, 7, 5) // "World"
}
- assert(new String(readBack(bigObject)) == "HelloWorld")
+ assert(new String(readBack(largeBinary)) == "HelloWorld")
}
test("should handle multiple consecutive writes") {
- val bigObject = createBigObject("test/multiple-writes.txt")
- withStream(bigObject) { stream =>
+ val largeBinary = createLargeBinary("test/multiple-writes.txt")
+ withStream(largeBinary) { stream =>
stream.write("Hello".getBytes)
stream.write(", ".getBytes)
stream.write("World!".getBytes)
}
- assert(new String(readBack(bigObject)) == "Hello, World!")
+ assert(new String(readBack(largeBinary)) == "Hello, World!")
}
// === Stream Lifecycle Tests ===
test("flush should not throw") {
- val bigObject = createBigObject("test/flush.txt")
- withStream(bigObject) { stream =>
+ val largeBinary = createLargeBinary("test/flush.txt")
+ withStream(largeBinary) { stream =>
stream.write("test".getBytes)
stream.flush()
stream.write(" data".getBytes)
}
- assert(new String(readBack(bigObject)) == "test data")
+ assert(new String(readBack(largeBinary)) == "test data")
}
test("close should be idempotent") {
- val bigObject = createBigObject("test/close-idempotent.txt")
- val stream = new BigObjectOutputStream(bigObject)
+ val largeBinary = createLargeBinary("test/close-idempotent.txt")
+ val stream = new LargeBinaryOutputStream(largeBinary)
stream.write("data".getBytes)
stream.close()
stream.close() // Should not throw
stream.flush() // Should not throw after close
- assert(new String(readBack(bigObject)) == "data")
+ assert(new String(readBack(largeBinary)) == "data")
}
test("close should handle empty stream") {
- val bigObject = createBigObject("test/empty-stream.txt")
- val stream = new BigObjectOutputStream(bigObject)
+ val largeBinary = createLargeBinary("test/empty-stream.txt")
+ val stream = new LargeBinaryOutputStream(largeBinary)
stream.close()
- assert(readBack(bigObject).length == 0)
+ assert(readBack(largeBinary).length == 0)
}
// === Error Handling ===
test("write operations should throw IOException when stream is closed") {
- val bigObject = createBigObject("test/closed-stream.txt")
- val stream = new BigObjectOutputStream(bigObject)
+ val largeBinary = createLargeBinary("test/closed-stream.txt")
+ val stream = new LargeBinaryOutputStream(largeBinary)
stream.close()
val ex1 = intercept[IOException](stream.write('A'.toByte))
@@ -171,13 +171,13 @@ class BigObjectOutputStreamSpec
val totalSize = 1024 * 1024 // 1MB
val chunkSize = 8 * 1024 // 8KB
val data = generateRandomData(totalSize)
- val bigObject = createBigObject("test/chunked.bin")
+ val largeBinary = createLargeBinary("test/chunked.bin")
- withStream(bigObject) { stream =>
+ withStream(largeBinary) { stream =>
data.grouped(chunkSize).foreach(chunk => stream.write(chunk))
}
- assert(readBack(bigObject).sameElements(data))
+ assert(readBack(largeBinary).sameElements(data))
}
// === Binary Data Tests ===
@@ -189,8 +189,8 @@ class BigObjectOutputStreamSpec
// === Integration Tests ===
test("should handle concurrent writes to different objects") {
val streams = (1 to 3).map { i =>
- val obj = createBigObject(s"test/concurrent-$i.txt")
- val stream = new BigObjectOutputStream(obj)
+ val obj = createLargeBinary(s"test/concurrent-$i.txt")
+ val stream = new LargeBinaryOutputStream(obj)
(obj, stream, s"Data $i")
}
@@ -207,32 +207,32 @@ class BigObjectOutputStreamSpec
}
test("should overwrite existing object") {
- val bigObject = createBigObject("test/overwrite.txt")
- withStream(bigObject)(_.write("original data".getBytes))
- withStream(bigObject)(_.write("new data".getBytes))
- assert(new String(readBack(bigObject)) == "new data")
+ val largeBinary = createLargeBinary("test/overwrite.txt")
+ withStream(largeBinary)(_.write("original data".getBytes))
+ withStream(largeBinary)(_.write("new data".getBytes))
+ assert(new String(readBack(largeBinary)) == "new data")
}
test("should handle mixed write operations") {
- val bigObject = createBigObject("test/mixed-writes.txt")
- withStream(bigObject) { stream =>
+ val largeBinary = createLargeBinary("test/mixed-writes.txt")
+ withStream(largeBinary) { stream =>
stream.write('A'.toByte)
stream.write(" test ".getBytes)
stream.write('B'.toByte)
val data = "Hello, World!".getBytes
stream.write(data, 7, 6) // "World!"
}
- assert(new String(readBack(bigObject)) == "A test BWorld!")
+ assert(new String(readBack(largeBinary)) == "A test BWorld!")
}
// === Edge Cases ===
test("should create bucket automatically") {
val newBucketName = s"new-bucket-${Random.nextInt(10000)}"
- val bigObject = new BigObject(s"s3://$newBucketName/test/auto-create.txt")
+ val largeBinary = new LargeBinary(s"s3://$newBucketName/test/auto-create.txt")
try {
- withStream(bigObject)(_.write("test".getBytes))
- assert(new String(readBack(bigObject)) == "test")
+ withStream(largeBinary)(_.write("test".getBytes))
+ assert(new String(readBack(largeBinary)) == "test")
} finally {
try S3StorageClient.deleteDirectory(newBucketName, "")
catch { case _: Exception => /* ignore */ }
@@ -241,11 +241,11 @@ class BigObjectOutputStreamSpec
test("should handle rapid open/close cycles") {
(1 to 10).foreach { i =>
- withStream(createBigObject(s"test/rapid-$i.txt"))(_.write(s"data-$i".getBytes))
+ withStream(createLargeBinary(s"test/rapid-$i.txt"))(_.write(s"data-$i".getBytes))
}
(1 to 10).foreach { i =>
- val result = readBack(createBigObject(s"test/rapid-$i.txt"))
+ val result = readBack(createLargeBinary(s"test/rapid-$i.txt"))
assert(new String(result) == s"data-$i")
}
}
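
As the output-stream spec above shows, a LargeBinary can also be constructed from an explicit s3:// URI, and writing to the same URI twice overwrites the object. A brief sketch of that usage; the bucket name is illustrative:

import org.apache.texera.amber.core.tuple.LargeBinary
import org.apache.texera.service.util.{LargeBinaryInputStream, LargeBinaryOutputStream}

val binary = new LargeBinary("s3://example-bucket/test/overwrite.txt")

def writeAll(data: String): Unit = {
  val out = new LargeBinaryOutputStream(binary)
  try out.write(data.getBytes) finally out.close()
}

writeAll("original data")
writeAll("new data") // the second write replaces the first, per the overwrite test above

val in = new LargeBinaryInputStream(binary)
try assert(new String(in.readAllBytes()) == "new data") finally in.close()
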
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileAttributeType.java b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileAttributeType.java
index 9e1dfc2b8c..87d6c38c64 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileAttributeType.java
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileAttributeType.java
@@ -31,7 +31,7 @@ public enum FileAttributeType {
BOOLEAN("boolean", AttributeType.BOOLEAN),
TIMESTAMP("timestamp", AttributeType.TIMESTAMP),
BINARY("binary", AttributeType.BINARY),
- BIG_OBJECT("big object", AttributeType.BIG_OBJECT);
+ LARGE_BINARY("large binary", AttributeType.LARGE_BINARY);
private final String name;
@@ -57,6 +57,6 @@ public enum FileAttributeType {
}
public boolean isSingle() {
- return this == SINGLE_STRING || this == BINARY || this == BIG_OBJECT;
+ return this == SINGLE_STRING || this == BINARY || this == LARGE_BINARY;
}
}
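
A quick sketch of what the renamed enum constant exposes, based only on the accessors visible in this diff (getType and isSingle):

import org.apache.texera.amber.operator.source.scan.FileAttributeType
import org.apache.texera.amber.core.tuple.AttributeType

// LARGE_BINARY maps to the renamed attribute type and, like SINGLE_STRING and
// BINARY, is treated as a whole-file ("single") attribute rather than a per-line one.
assert(FileAttributeType.LARGE_BINARY.getType == AttributeType.LARGE_BINARY)
assert(FileAttributeType.LARGE_BINARY.isSingle)
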
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
index 11bfd60cf0..91c817240c 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
@@ -22,9 +22,9 @@ package org.apache.texera.amber.operator.source.scan
import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
-import org.apache.texera.amber.core.tuple.{BigObject, TupleLike}
+import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
import org.apache.texera.amber.util.JSONUtils.objectMapper
-import org.apache.texera.service.util.BigObjectOutputStream
+import org.apache.texera.service.util.LargeBinaryOutputStream
import org.apache.commons.compress.archivers.ArchiveStreamFactory
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
import org.apache.commons.io.IOUtils.toByteArray
@@ -85,10 +85,10 @@ class FileScanSourceOpExec private[scan] (
fields.addOne(desc.attributeType match {
case FileAttributeType.SINGLE_STRING =>
new String(toByteArray(entry), desc.fileEncoding.getCharset)
- case FileAttributeType.BIG_OBJECT =>
- // For big objects, create reference and upload via streaming
- val bigObject = new BigObject()
- val out = new BigObjectOutputStream(bigObject)
+ case FileAttributeType.LARGE_BINARY =>
+ // For large binaries, create reference and upload via streaming
+ val largeBinary = new LargeBinary()
+ val out = new LargeBinaryOutputStream(largeBinary)
try {
val buffer = new Array[Byte](8192)
var bytesRead = entry.read(buffer)
@@ -99,7 +99,7 @@ class FileScanSourceOpExec private[scan] (
} finally {
out.close()
}
- bigObject
+ largeBinary
case _ => parseField(toByteArray(entry),
desc.attributeType.getType)
})
TupleLike(fields.toSeq: _*)
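
The hunk above keeps the operator's streaming-upload path intact under the new names. A standalone sketch of that copy loop, with the 8 KB buffer mirroring the operator code and the InputStream source left abstract:

import java.io.InputStream
import org.apache.texera.amber.core.tuple.LargeBinary
import org.apache.texera.service.util.LargeBinaryOutputStream

def uploadAsLargeBinary(entry: InputStream): LargeBinary = {
  val largeBinary = new LargeBinary()
  val out = new LargeBinaryOutputStream(largeBinary)
  try {
    val buffer = new Array[Byte](8192)
    var bytesRead = entry.read(buffer)
    while (bytesRead != -1) {       // copy until EOF without buffering the whole file
      out.write(buffer, 0, bytesRead)
      bytesRead = entry.read(buffer)
    }
  } finally {
    out.close()
  }
  largeBinary                       // the tuple field holds only this S3 reference
}
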
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
index 9a23b3f1d6..ab3383ca0a 100644
--- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
@@ -19,7 +19,7 @@
package org.apache.texera.amber.operator.source.scan
-import org.apache.texera.amber.core.tuple.{AttributeType, BigObject, Schema, SchemaEnforceable}
+import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema, SchemaEnforceable}
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
@@ -30,8 +30,8 @@ import java.nio.file.{Files, Path}
import java.util.zip.{ZipEntry, ZipOutputStream}
/**
- * Unit tests for BIG_OBJECT logic in FileScanSourceOpExec.
- * Full integration tests with S3 and database are in BigObjectManagerSpec.
+ * Unit tests for LARGE_BINARY logic in FileScanSourceOpExec.
+ * Full integration tests with S3 and database are in LargeBinaryManagerSpec.
*/
class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll {
@@ -40,8 +40,8 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll {
.resolve("common/workflow-operator/src/test/resources")
.toRealPath()
- private val testFile = testDir.resolve("test_big_object.txt")
- private val testZip = testDir.resolve("test_big_object.zip")
+ private val testFile = testDir.resolve("test_large_binary.txt")
+ private val testZip = testDir.resolve("test_large_binary.zip")
override def beforeAll(): Unit = {
super.beforeAll()
@@ -75,7 +75,7 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll {
): FileScanSourceOpDesc = {
val desc = new FileScanSourceOpDesc()
desc.fileName = Some(file.toString)
- desc.attributeType = FileAttributeType.BIG_OBJECT
+ desc.attributeType = FileAttributeType.LARGE_BINARY
desc.attributeName = attributeName
desc.fileEncoding = FileDecodingMethod.UTF_8
desc
@@ -83,26 +83,26 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll {
private def assertSchema(schema: Schema, attributeName: String): Unit = {
assert(schema.getAttributes.length == 1)
- assert(schema.getAttribute(attributeName).getType == AttributeType.BIG_OBJECT)
+ assert(schema.getAttribute(attributeName).getType == AttributeType.LARGE_BINARY)
}
// Schema Tests
- it should "infer BIG_OBJECT schema with default attribute name" in {
+ it should "infer LARGE_BINARY schema with default attribute name" in {
assertSchema(createDescriptor().sourceSchema(), "line")
}
- it should "infer BIG_OBJECT schema with custom attribute name" in {
+ it should "infer LARGE_BINARY schema with custom attribute name" in {
assertSchema(createDescriptor(attributeName = "custom_field").sourceSchema(), "custom_field")
}
- it should "map BIG_OBJECT to correct AttributeType" in {
- assert(FileAttributeType.BIG_OBJECT.getType == AttributeType.BIG_OBJECT)
+ it should "map LARGE_BINARY to correct AttributeType" in {
+ assert(FileAttributeType.LARGE_BINARY.getType == AttributeType.LARGE_BINARY)
}
// Type Classification Tests
- it should "correctly classify BIG_OBJECT as isSingle type" in {
+ it should "correctly classify LARGE_BINARY as isSingle type" in {
val isSingleTypes = List(
- FileAttributeType.BIG_OBJECT,
+ FileAttributeType.LARGE_BINARY,
FileAttributeType.SINGLE_STRING,
FileAttributeType.BINARY
)
@@ -120,7 +120,7 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll {
}
// Execution Tests
- it should "create BigObject when reading file with BIG_OBJECT type" in {
+ it should "create LargeBinary when reading file with LARGE_BINARY type" in {
val desc = createDescriptor()
desc.setResolvedFileName(URI.create(testFile.toUri.toString))
@@ -137,26 +137,26 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll {
.enforceSchema(desc.sourceSchema())
.getField[Any]("line")
- assert(field.isInstanceOf[BigObject])
- assert(field.asInstanceOf[BigObject].getUri.startsWith("s3://"))
+ assert(field.isInstanceOf[LargeBinary])
+ assert(field.asInstanceOf[LargeBinary].getUri.startsWith("s3://"))
} catch {
case e: Exception =>
info(s"S3 not configured: ${e.getMessage}")
}
}
- // BigObject Tests
- it should "create valid BigObject with correct URI parsing" in {
- val pointer = new BigObject("s3://bucket/path/to/object")
+ // LargeBinary Tests
+ it should "create valid LargeBinary with correct URI parsing" in {
+ val pointer = new LargeBinary("s3://bucket/path/to/object")
assert(pointer.getUri == "s3://bucket/path/to/object")
assert(pointer.getBucketName == "bucket")
assert(pointer.getObjectKey == "path/to/object")
}
- it should "reject invalid BigObject URIs" in {
- assertThrows[IllegalArgumentException](new BigObject("http://invalid"))
- assertThrows[IllegalArgumentException](new BigObject("not-a-uri"))
- assertThrows[IllegalArgumentException](new BigObject(null.asInstanceOf[String]))
+ it should "reject invalid LargeBinary URIs" in {
+ assertThrows[IllegalArgumentException](new LargeBinary("http://invalid"))
+ assertThrows[IllegalArgumentException](new LargeBinary("not-a-uri"))
+ assertThrows[IllegalArgumentException](new LargeBinary(null.asInstanceOf[String]))
}
}
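
Finally, the URI-handling tests at the end pin down the LargeBinary constructor contract. A compact summary of what they assert:

import org.apache.texera.amber.core.tuple.LargeBinary

val pointer = new LargeBinary("s3://bucket/path/to/object")
assert(pointer.getUri == "s3://bucket/path/to/object")
assert(pointer.getBucketName == "bucket")
assert(pointer.getObjectKey == "path/to/object")

// Malformed inputs fail fast with IllegalArgumentException, per the spec above:
//   new LargeBinary("http://invalid")
//   new LargeBinary("not-a-uri")
//   new LargeBinary(null.asInstanceOf[String])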