yihua commented on code in PR #18098: URL: https://github.com/apache/hudi/pull/18098#discussion_r3041923568
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReaderStrategy.scala: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.HoodieSparkConfUtils +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.conf +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} + +/** + * Spark strategy that converts [[BatchedBlobRead]] logical nodes to [[BatchedBlobReadExec]] physical nodes. + * + * Reads configuration for batching parameters and creates physical plan with appropriate settings. 
+ * + * @param sparkSession SparkSession for accessing configuration + */ +case class BatchedBlobReaderStrategy(sparkSession: SparkSession) extends SparkStrategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case read @ BatchedBlobRead(child, _, _) => + // TODO find proper way to access these configs + val maxGapBytes = HoodieSparkConfUtils.getConfigValue( + Map.empty, sparkSession.sessionState.conf, + BatchedBlobReader.MAX_GAP_BYTES_CONF, + String.valueOf(BatchedBlobReader.DEFAULT_MAX_GAP_BYTES)).toInt + + val lookaheadSize = HoodieSparkConfUtils.getConfigValue( + Map.empty, sparkSession.sessionState.conf, + BatchedBlobReader.LOOKAHEAD_SIZE_CONF, + String.valueOf(BatchedBlobReader.DEFAULT_LOOKAHEAD_SIZE)).toInt + + val storageConf = new HadoopStorageConfiguration(sparkSession.sparkContext.hadoopConfiguration) Review Comment: 🤖 This wraps the driver's live, shared `SparkContext.hadoopConfiguration` object itself rather than a copy. That configuration is mutable, so changes made to it elsewhere on the driver can leak into this plan before the `StorageConfiguration` is serialized into the physical plan and sent to executors. Mutating it while it is being serialized is also unsafe. Could you take a defensive copy instead, e.g. `new HadoopStorageConfiguration(new Configuration(sparkSession.sparkContext.hadoopConfiguration))`? Note that wrapping it in `SerializableConfiguration` alone would not help here: `SerializableConfiguration.value` returns the same `Configuration` instance it was constructed with, not a copy. ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala: ########## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.HoodieSparkUtils.sparkAdapter +import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} +import org.apache.hudi.io.SeekableDataInputStream +import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StorageConfiguration, StoragePath} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, SpecificInternalRow} +import org.apache.spark.sql.types.{BinaryType, BlobType, DataType, StructField, StructType} +import org.slf4j.LoggerFactory + +import java.io.InputStream + +import scala.collection.mutable.ArrayBuffer + +/** + * Batched byte range reader that optimizes I/O by combining consecutive reads for out-of-line data. + * + * This reader analyzes sequences of read requests within a partition and merges + * consecutive or nearby reads into single I/O operations. This significantly reduces + * the number of seeks and reads when processing sorted data. 
+ * + * <h3>Schema Requirement:</h3> + * The blob column must match the schema defined in {@link HoodieSchema.Blob}: + * <pre> + * struct { + * type: string // "inline" or "out_of_line" + * data: binary (nullable) // inline data (null for out_of_line) + * reference: struct (nullable) { // file reference (null for inline) + * external_path: string + * offset: long + * length: long + * managed: boolean + * } + * } + * </pre> + * + * <h3>Key Features:</h3> + * <ul> + * <li>Batches consecutive reads from the same file</li> + * <li>Configurable gap threshold for merging nearby reads</li> + * <li>Lookahead buffer to identify batch opportunities</li> + * <li>Preserves input row order in output</li> + * </ul> + * + * <h3>Usage Example:</h3> + * {{{ + * import org.apache.hudi.udf.BatchedByteRangeReader + * import org.apache.spark.sql.functions._ + * // Read a table with a blob column (e.g. image_data) + * val df = spark.read.format("hudi").load("/my_path").select("image_data", "record_id") + * + * // Read with batching (best when data is sorted by external_path, offset) + * val result = BatchedByteRangeReader.readBatched(df, structColName = "file_info") + * + * // Result has: image_data, record_id, data + * result.show() + * }}} + * + * <h3>Performance Tips:</h3> + * <ul> + * <li>Sort input by (blob.reference.external_path, blob.reference.offset) for maximum batching effectiveness</li> + * <li>Increase lookaheadSize for better batch detection (at cost of memory)</li> + * <li>Tune maxGapBytes based on your data access patterns</li> + * </ul> + * + * @param storage HoodieStorage instance for file I/O + * @param maxGapBytes Maximum gap between ranges to consider for batching (default: 4KB) + * @param lookaheadSize Number of rows to buffer for batch detection (default: 50) + */ +class BatchedBlobReader( + storage: HoodieStorage, + maxGapBytes: Int = 4096, + lookaheadSize: Int = 50) { + + private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader]) + + /** + * 
Process a partition iterator, batching consecutive reads. + * + * This method consumes the input iterator and produces an output iterator + * with each row containing the original data plus a "data" column with the + * bytes read from the file. + * + * @param rows Iterator of input rows with struct column + * @param structColIdx Index of the struct column in the row + * @param outputSchema Schema for output rows + * @param accessor Type class for accessing row fields + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Iterator of output rows with data column added + */ + def processPartition[R]( + rows: Iterator[R], + structColIdx: Int, + outputSchema: StructType) + (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] = { + + // Create buffered iterator for lookahead + val bufferedRows = rows.buffered + + // Result buffer to maintain order + val resultIterator = new Iterator[R] { + private var currentBatch: Iterator[R] = Iterator.empty + private var rowIndex = 0L + + override def hasNext: Boolean = { + if (currentBatch.hasNext) { + true + } else if (bufferedRows.hasNext) { + // Process next batch + currentBatch = processNextBatch() + currentBatch.hasNext + } else { + false + } + } + + override def next(): R = { + if (!hasNext) { + throw new NoSuchElementException("No more rows") + } + currentBatch.next() + } + + /** + * Collect and process the next batch of rows. 
+ */ + private def processNextBatch(): Iterator[R] = { + // Collect up to lookaheadSize rows with their original indices + val batch = collectBatch() + + if (batch.isEmpty) { + Iterator.empty + } else { + // Partition the batch into three groups + val (inlineRows, outOfLineRows) = batch.partition(_.inlineBytes.isDefined) + val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0) + + // Case 1: Inline — return bytes directly without I/O + val inlineResults = inlineRows.map { ri => + RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, outputSchema), ri.index) + } + + // Case 2: Whole-file reads + val wholeFileResults = wholeFileRows.map(readWholeFile(_, outputSchema)) + + // Case 3: Regular range reads — merge consecutive ranges and batch + val mergedRanges = identifyConsecutiveRanges(rangeRows) + val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, outputSchema)) + + // Sort by original index to preserve input order + (inlineResults ++ wholeFileResults ++ rangeResults).sortBy(_.index).map(_.row).iterator + } + } + + /** + * Collect up to lookaheadSize rows from the input iterator. 
+ */ + private def collectBatch(): Seq[RowInfo[R]] = { + val batch = ArrayBuffer[RowInfo[R]]() + var collected = 0 + + while (bufferedRows.hasNext && collected < lookaheadSize) { + val row = bufferedRows.next() + // Handle null struct column (null blob) + if (accessor.isNullAt(row, structColIdx)) { + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(null) + ) + rowIndex += 1 + collected += 1 + } else { + val blobStruct = accessor.getStruct(row, structColIdx, HoodieSchema.Blob.getFieldCount) + // Dispatch based on storage_type (field 0) + val storageType = accessor.getString(blobStruct, 0) + if (storageType == HoodieSchema.Blob.INLINE) { + // Case 1: Inline — bytes are in field 1 + val bytes = accessor.getBytes(blobStruct, 1) + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(bytes) + ) + } else { + // Case 2 or 3: Out-of-line — get reference struct (field 2) + val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount) + val filePath = accessor.getString(referenceStruct, 0) + val offsetIsNull = accessor.isNullAt(referenceStruct, 1) + val lengthIsNull = accessor.isNullAt(referenceStruct, 2) + if (offsetIsNull || lengthIsNull) { + // Case 2: Whole-file read — no offset/length specified; sentinel length = -1 + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = 0, + length = -1, + index = rowIndex + ) + } else { + // Case 3: Regular range read + val offset = accessor.getLong(referenceStruct, 1) + val length = accessor.getLong(referenceStruct, 2) + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = offset, + length = length, + index = rowIndex + ) + } + } + rowIndex += 1 + collected += 1 + } + } + batch.toSeq + } + } + + resultIterator + } + + /** + * Identify consecutive ranges that can be batched together. 
+ * + * This method groups rows by file path, sorts by offset, and merges + * ranges that are consecutive or within maxGapBytes of each other. + * + * @param rows Sequence of row information + * @return Sequence of merged ranges + */ + private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = { + // Group by file path + val byFile = rows.groupBy(_.filePath) + + val allRanges = ArrayBuffer[MergedRange[R]]() + + byFile.foreach { case (filePath, fileRows) => + // Sort by offset + val sorted = fileRows.sortBy(_.offset) + + // Merge consecutive ranges + val merged = mergeRanges(sorted, maxGapBytes) + allRanges ++= merged + } + + allRanges.toSeq + } + + /** + * Merge consecutive ranges within the gap threshold. + * + * @param rows Sorted rows from the same file + * @param maxGap Maximum gap to consider for merging + * @return Sequence of merged ranges + */ + private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = { + + val result = ArrayBuffer[MergedRange[R]]() + var current: MergedRange[R] = null + + rows.foreach { row => + if (current == null) { + // Start first range + current = MergedRange[R]( + filePath = row.filePath, + startOffset = row.offset, + endOffset = row.offset + row.length, + rows = Seq(row) + ) + } else { + val gap = row.offset - current.endOffset + + if (gap >= 0 && gap <= maxGap) { + // Merge into current range + current = current.merge(row) + } else { + // Save current range and start new one + result += current + current = MergedRange[R]( + filePath = row.filePath, + startOffset = row.offset, + endOffset = row.offset + row.length, + rows = Seq(row) + ) + } + } + } + + // Add final range + if (current != null) { + result += current + } + + result.toSeq + } + + /** + * Read an entire file and return it as a single row result. + * + * Used for whole-file out-of-line blobs where no offset or length is specified. 
+ * + * @param rowInfo Row information with the file path + * @param outputSchema Schema for output rows + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) Review Comment: 🤖 The `totalLength` cast to `Int` will silently overflow once a merged range exceeds `Int.MaxValue` bytes (about 2 GiB). Each individual gap is bounded by `maxGapBytes`. However, a merged range (`endOffset - startOffset`) spans every blob length plus every gap across up to `lookaheadSize` rows, and blob lengths are read as `Long`s, so the total can still exceed that limit. Could you add a guard like `require(totalLength <= Int.MaxValue, ...)` before the cast, or switch to reading in chunks? ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReadExec.scala: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.storage.StorageConfiguration + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} + +/** + * Physical plan node that executes batched blob reads.
+ * + * Reads blob data from storage using [[BatchedBlobReader]] to batch + * reads efficiently when data is sorted by file and position. + * + * @param child Child physical plan + * @param maxGapBytes Maximum gap between reads to batch (from config) + * @param storageConf Storage configuration for file access + * @param lookaheadSize Number of rows to buffer for batch detection + * @param logical The logical plan node this was created from + */ +case class BatchedBlobReadExec(child: SparkPlan, + maxGapBytes: Int, + storageConf: StorageConfiguration[_], + lookaheadSize: Int, + logical: BatchedBlobRead) + extends UnaryExecNode { + + override def output: Seq[Attribute] = logical.output + + override protected def doExecute(): RDD[InternalRow] = { + val childRDD = child.execute() Review Comment: 🤖 The `storageConf` is captured in the closure of `doExecute()` and serialized per-task instead of being broadcast. For large clusters this could add significant overhead to task serialization. Consider broadcasting it similar to what `readBatched` does. ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, ExprId, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{DataType, StructType} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +/** + * Transforms queries with `read_blob()` to use lazy batched I/O. + * + * Replaces [[ReadBlobExpression]] markers with [[BatchedBlobRead]] nodes + * that read blob data during physical execution. + * + * Example: `SELECT id, read_blob(image_data) FROM table` + * + * @param spark SparkSession for accessing configuration + */ +case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { + case Filter(condition, child) + if containsReadBlobInExpression(condition) + && !child.isInstanceOf[BatchedBlobRead] => + + val blobColumns = extractBlobColumnsFromExpression(condition) + val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child) + val newCondition = replaceReadBlobExpression(condition, blobToDataAttr) + Project(child.output, Filter(newCondition, wrappedPlan)) + + case Project(projectList, child) + if containsReadBlobExpression(projectList) + && !child.isInstanceOf[BatchedBlobRead] => + + val blobColumns = extractAllBlobColumns(projectList) + val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child) + val newProjectList = transformNamedExpressions(projectList, blobToDataAttr) + Project(newProjectList, wrappedPlan) + } + + private def wrapWithBlobReads( + blobColumns: Seq[AttributeReference], + child: LogicalPlan): (LogicalPlan, Map[ExprId, Attribute]) 
= { + if (blobColumns.isEmpty) { + throw new IllegalStateException("read_blob() found but no valid blob column reference extracted.") + } + blobColumns.foldLeft((child: LogicalPlan, Map.empty[ExprId, Attribute])) { + case ((currentPlan, mapping), blobAttr) => + // Type compatibility check (early fail for non-struct columns) + blobAttr.dataType match { + case struct: StructType if DataType.equalsIgnoreCaseAndNullability(struct, org.apache.spark.sql.types.BlobType.dataType) => + // Valid blob column + case _ => + throw new IllegalArgumentException( + s"Blob column '${blobAttr.name}' must be compatible with BlobType (type, data, reference struct), found: ${blobAttr.dataType}") + } + val blobRead = BatchedBlobRead(currentPlan, blobAttr) + (blobRead, mapping + (blobAttr.exprId -> blobRead.dataAttr)) + } + } + + private def extractBlobColumnsFromExpression(expr: Expression): Seq[AttributeReference] = { + val seen = mutable.LinkedHashSet.empty[ExprId] + val result = ArrayBuffer.empty[AttributeReference] + collectBlobColumns(expr, seen, result) + result.toSeq + } + + /** + * Check if any expression in the project list contains a ReadBlobExpression. 
+ */ + private def containsReadBlobExpression(projectList: Seq[Expression]): Boolean = { + projectList.exists(expr => containsReadBlobInExpression(expr)) + } + + private def containsReadBlobInExpression(expr: Expression): Boolean = { + expr match { + case _: ReadBlobExpression => true + case other => other.children.exists(containsReadBlobInExpression) + } + } + + private def extractAllBlobColumns(expressions: Seq[Expression]): Seq[AttributeReference] = { + val seen = mutable.LinkedHashSet.empty[ExprId] + val result = ArrayBuffer.empty[AttributeReference] + expressions.foreach(collectBlobColumns(_, seen, result)) + result.toSeq + } + + private def collectBlobColumns( + expr: Expression, + seen: mutable.Set[ExprId], + result: ArrayBuffer[AttributeReference]): Unit = expr match { + case ReadBlobExpression(attr: AttributeReference) => + if (seen.add(attr.exprId)) result += attr + case other => + other.children.foreach(collectBlobColumns(_, seen, result)) + } + + private def transformNamedExpressions( + expressions: Seq[NamedExpression], + blobToDataAttr: Map[ExprId, Attribute]): Seq[NamedExpression] = { + expressions.map { + case alias @ Alias(childExpr, name) => + val rewritten = replaceReadBlobExpression(childExpr, blobToDataAttr) + Alias(rewritten, name)(alias.exprId, alias.qualifier, alias.explicitMetadata) + case attr: AttributeReference => attr + case other => + replaceReadBlobExpression(other, blobToDataAttr).asInstanceOf[NamedExpression] + } + } Review Comment: 🤖 If `attr.exprId` is not found in `blobToDataAttr`, this will throw a `NoSuchElementException` with no useful context. This could happen if `read_blob()` wraps an `AttributeReference` that wasn't collected by `extractBlobColumnsFromExpression` / `extractAllBlobColumns` (e.g., because the expression tree structure doesn't match the pattern in `collectBlobColumns`). Could you use `blobToDataAttr.getOrElse(attr.exprId, throw new AnalysisException(...))` with a descriptive message? 
########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala: ########## @@ -1996,4 +1997,105 @@ class TestCreateTable extends HoodieSparkSqlTestBase { HoodieSparkSqlTestBase.enableComplexKeygenValidation(spark, tableName) checkAnswer(query)(expectedRowsAfter: _*) } + + test("test create table with BLOB column") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |CREATE TABLE $tableName ( + | id BIGINT, + | video BLOB COMMENT 'Product demonstration video' + |) USING hudi + |LOCATION '${tmp.getCanonicalPath}' + |TBLPROPERTIES ( + | primaryKey = 'id' + |) + """.stripMargin) + + // Verify schema has hudi_blob metadata + val schema = spark.table(tableName).schema + val videoField = schema.find(_.name == "video").get + assertTrue(videoField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD)) + assertEquals(HoodieSchemaType.BLOB.name(), videoField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + assertEquals("Product demonstration video", videoField.metadata.getString("comment")) + + // Verify structure matches blob schema + assertTrue(videoField.dataType.isInstanceOf[StructType]) + assertEquals(BlobType(), videoField.dataType) + } + } + + test("test create table with multiple BLOB columns") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |CREATE TABLE $tableName ( + | id BIGINT, + | video BLOB, + | thumbnail blob, + | metadata MAP<STRING, STRING>, + | audio BLOB NOT NULL + |) USING hudi + |LOCATION '${tmp.getCanonicalPath}' + |TBLPROPERTIES ( + | primaryKey = 'id' + |) + """.stripMargin) + + val schema = spark.table(tableName).schema + + // Verify all BLOB columns have the metadata + val blobColumns = Seq("video", "thumbnail", "audio") + blobColumns.foreach { colName => + val field = schema.find(_.name == colName).get + assertTrue(field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD)) + assertEquals(HoodieSchemaType.BLOB.name(), 
field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + assertTrue(field.dataType.isInstanceOf[StructType]) + + if (colName == "audio") { + assertFalse(field.nullable) + } else { + assertTrue(field.nullable) + } + + val blobStruct = field.dataType.asInstanceOf[StructType] + assertEquals(BlobType(), blobStruct) + } + } + } + + test("test BLOB in nested struct") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |CREATE TABLE $tableName ( + | id BIGINT, + | media STRUCT<title: STRING, content: BLOB> + |) USING hudi + |LOCATION '${tmp.getCanonicalPath}' + |TBLPROPERTIES ( + | primaryKey = 'id' + |) + """.stripMargin) + + val schema = spark.table(tableName).schema + val mediaField = schema.find(_.name == "media").get + assertTrue(mediaField.dataType.isInstanceOf[StructType]) + + val mediaStruct = mediaField.dataType.asInstanceOf[StructType] + val contentField = mediaStruct.find(_.name == "content").get + + // Verify nested BLOB has metadata + assertTrue(contentField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD)) + assertEquals(HoodieSchemaType.BLOB.name(), contentField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + + // Verify structure + assertTrue(contentField.dataType.isInstanceOf[StructType]) + val blobStruct = contentField.dataType.asInstanceOf[StructType] + assertEquals(BlobType(), blobStruct) + } + } Review Comment: 🤖 Good catch. The same issue would occur with HAVING, GROUP BY expressions, or ORDER BY — anywhere `ReadBlobExpression` ends up in an operator other than a `Project` or `Filter` node (e.g. an `Aggregate` or a `Sort`). `ReadBlobRule` only matches `Project` and `Filter`. In any other node type the `Unevaluable` `ReadBlobExpression` marker is left in the plan un-rewritten, so the query only fails later with the opaque codegen error. A validation pass would help a lot here: after the rule runs, it would detect any remaining `ReadBlobExpression` nodes and throw a clear error naming the unsupported usage.
########## hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala: ########## @@ -76,6 +76,17 @@ trait SparkAdapter extends Serializable { */ def injectTableFunctions(extensions: SparkSessionExtensions): Unit = {} + /** + * Inject scalar functions into Spark SQL function registry. + * These functions can be used in SQL SELECT clauses. + */ Review Comment: 🤖 `SparkAdapter` is a trait implemented by version-specific adapters (Spark 3.3, 3.4, 3.5, etc.). If the methods being added here are abstract (i.e. unlike `injectTableFunctions` above, they have no default `{}` body), have all implementations been updated? If any implementation is missing these methods, compilation will fail for that Spark version. I see only 3.3.x and 3.4.x in the PR description — is 3.5.x also covered? Alternatively, giving the new methods a no-op default body, as `injectTableFunctions` has, would avoid breaking adapters that don't override them. ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala: ########## @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.hudi.blob + +import org.apache.hudi.blob.BlobTestHelpers._ +import org.apache.hudi.testutils.HoodieClientTestBase + +import org.apache.spark.sql.functions._ +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +/** + * Tests for the read_blob() SQL function. + * + * This test suite verifies: + * <ul> + * <li>Basic SQL integration with read_blob()</li> + * <li>Integration with WHERE clauses, JOINs</li> + * <li>Configuration parameter handling</li> + * <li>Error handling for invalid inputs</li> + * </ul> + */ +class TestReadBlobSQL extends HoodieClientTestBase { + + @Test + def testBasicReadBlobSQL(): Unit = { + val filePath = createTestFile(tempDir, "basic.bin", 10000) + + // Create table with blob column + val df = sparkSession.createDataFrame(Seq( + (1, "record1", filePath, 0L, 100L), + (2, "record2", filePath, 100L, 100L), + (3, "record3", filePath, 200L, 100L) + )).toDF("id", "name", "external_path", "offset", "length") + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "name", "file_info") + + df.createOrReplaceTempView("test_table") + + // Use SQL with read_blob + val result = sparkSession.sql(""" + SELECT id, name, read_blob(file_info) as data + FROM test_table + WHERE id <= 2 + """) + + val rows = result.collect() + assertEquals(2, rows.length) + + // Verify data is binary + val data1 = rows(0).getAs[Array[Byte]]("data") + assertEquals(100, data1.length) + + // Verify content matches expected pattern + assertBytesContent(data1) + + val data2 = rows(1).getAs[Array[Byte]]("data") + assertEquals(100, data2.length) + assertBytesContent(data2, expectedOffset = 100) + } + + @Test + def testReadBlobWithJoin(): Unit = { + val filePath = createTestFile(tempDir, "join.bin", 10000) + + // Create blob table + val blobDF = sparkSession.createDataFrame(Seq( + (1, filePath, 0L, 100L), + (2, filePath, 100L, 100L) + )).toDF("id", "external_path", 
"offset", "length") + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "file_info") + + blobDF.createOrReplaceTempView("blob_table") + + // Create metadata table + val metaDF = sparkSession.createDataFrame(Seq( + (1, "Alice"), + (2, "Bob") + )).toDF("id", "name") + + metaDF.createOrReplaceTempView("meta_table") + + // SQL with JOIN + val result = sparkSession.sql(""" + SELECT m.id, m.name, read_blob(b.file_info) as data + FROM meta_table m + JOIN blob_table b ON m.id = b.id + ORDER BY m.id + """) + + val rows = result.collect() + assertEquals(2, rows.length) + assertEquals("Alice", rows(0).getAs[String]("name")) + assertEquals(100, rows(0).getAs[Array[Byte]]("data").length) + assertEquals("Bob", rows(1).getAs[String]("name")) + assertEquals(100, rows(1).getAs[Array[Byte]]("data").length) + + // Verify data content + val data1 = rows(0).getAs[Array[Byte]]("data") + assertBytesContent(data1) + } + + @Test + def testReadBlobWithOrderBy(): Unit = { + val filePath = createTestFile(tempDir, "order.bin", 10000) + + val df = sparkSession.createDataFrame(Seq( + (3, filePath, 200L, 50L), + (1, filePath, 0L, 50L), + (2, filePath, 100L, 50L) + )).toDF("id", "external_path", "offset", "length") + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "file_info") + + df.createOrReplaceTempView("order_table") + + // SQL with ORDER BY + val result = sparkSession.sql(""" + SELECT id, read_blob(file_info) as data + FROM order_table + ORDER BY id + """) + + val rows = result.collect() + assertEquals(3, rows.length) + assertEquals(1, rows(0).getAs[Int]("id")) + assertEquals(2, rows(1).getAs[Int]("id")) + assertEquals(3, rows(2).getAs[Int]("id")) + + // Verify data content for ordered results + val data1 = rows(0).getAs[Array[Byte]]("data") + assertBytesContent(data1) + } + + @Test + def testReadBlobInSubquery(): Unit = { + val filePath = 
createTestFile(tempDir, "subquery.bin", 10000) + + val df = sparkSession.createDataFrame(Seq( + (1, "A", filePath, 0L, 100L), + (2, "A", filePath, 100L, 100L), + (3, "B", filePath, 200L, 100L) + )).toDF("id", "category", "external_path", "offset", "length") + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "category", "file_info") + + df.createOrReplaceTempView("subquery_table") + + // SQL with subquery + val result = sparkSession.sql(""" + SELECT * FROM ( + SELECT id, category, read_blob(file_info) as data + FROM subquery_table + ) WHERE category = 'A' + """) + + val rows = result.collect() + assertEquals(2, rows.length) + rows.foreach { row => + assertEquals("A", row.getAs[String]("category")) + assertEquals(100, row.getAs[Array[Byte]]("data").length) + } + } + + @Test + def testConfigurationParameters(): Unit = { + val filePath = createTestFile(tempDir, "config.bin", 50000) + + val df = sparkSession.createDataFrame(Seq( + (1, filePath, 0L, 100L), + (2, filePath, 5000L, 100L), // 4.9KB gap + (3, filePath, 10000L, 100L) + )).toDF("id", "external_path", "offset", "length") + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "file_info") + + df.createOrReplaceTempView("config_table") + + // Use withSparkConfig to automatically manage configuration + withSparkConfig(sparkSession, Map( + "hoodie.blob.batching.max.gap.bytes" -> "10000", + "hoodie.blob.batching.lookahead.size" -> "100" + )) { + val result = sparkSession.sql(""" + SELECT id, read_blob(file_info) as data + FROM config_table + """) + + val rows = result.collect() + assertEquals(3, rows.length) + + // Verify all reads completed successfully + rows.foreach { row => + assertEquals(100, row.getAs[Array[Byte]]("data").length) + } + } + } + + @Test + def testMultipleReadBlobInSameQuery(): Unit = { Review Comment: 🤖 I traced through the code and I think this is a real 
correctness issue. When `ReadBlobRule.wrapWithBlobReads` handles two blob columns, it creates nested `BatchedBlobRead` nodes: `BatchedBlobRead(BatchedBlobRead(child, file_info1), file_info2)`. Each `BatchedBlobReadExec.doExecute()` appends a column named `__temp__data` (the fixed `DATA_COL` constant). The outer exec's `processRDD` then creates an `outputSchema` with two columns both named `__temp__data`. Catalyst attribute resolution uses `ExprId`, so the logical plan is fine. The physical execution, however, uses `schema.fieldIndex(columnName)`, which is name-based. When duplicate names exist, `fieldIndex` can return only one of the matching indices (Spark's `StructType` resolves names through a map, so in practice it is the last match rather than the first), and lookups for either data column resolve to the same slot. I'd suggest either giving each `BatchedBlobRead.dataAttr` a unique name, or restructuring to resolve one blob at a time in separate passes. For the unique name, `__temp__data_${blobAttr.name}` works in simple cases; `__temp__data_${blobAttr.exprId.id}` also stays unique when two blob columns share a name, e.g. in a self-join. ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala: ########## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.HoodieSparkUtils.sparkAdapter +import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} +import org.apache.hudi.io.SeekableDataInputStream +import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StorageConfiguration, StoragePath} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, SpecificInternalRow} +import org.apache.spark.sql.types.{BinaryType, BlobType, DataType, StructField, StructType} +import org.slf4j.LoggerFactory + +import java.io.InputStream + +import scala.collection.mutable.ArrayBuffer + +/** + * Batched byte range reader that optimizes I/O by combining consecutive reads for out-of-line data. + * + * This reader analyzes sequences of read requests within a partition and merges + * consecutive or nearby reads into single I/O operations. This significantly reduces + * the number of seeks and reads when processing sorted data. + * + * <h3>Schema Requirement:</h3> + * The blob column must match the schema defined in {@link HoodieSchema.Blob}: + * <pre> + * struct { + * type: string // "inline" or "out_of_line" + * data: binary (nullable) // inline data (null for out_of_line) + * reference: struct (nullable) { // file reference (null for inline) + * external_path: string + * offset: long + * length: long + * managed: boolean + * } + * } + * </pre> + * + * <h3>Key Features:</h3> + * <ul> + * <li>Batches consecutive reads from the same file</li> + * <li>Configurable gap threshold for merging nearby reads</li> + * <li>Lookahead buffer to identify batch opportunities</li> + * <li>Preserves input row order in output</li> + * </ul> + * + * <h3>Usage Example:</h3> + * {{{ + * import org.apache.hudi.udf.BatchedByteRangeReader + * import org.apache.spark.sql.functions._ + * // Read a table with a blob column (e.g. 
image_data) + * val df = spark.read.format("hudi").load("/my_path").select("image_data", "record_id") + * + * // Read with batching (best when data is sorted by external_path, offset) + * val result = BatchedByteRangeReader.readBatched(df, structColName = "file_info") + * + * // Result has: image_data, record_id, data + * result.show() + * }}} + * + * <h3>Performance Tips:</h3> + * <ul> + * <li>Sort input by (blob.reference.external_path, blob.reference.offset) for maximum batching effectiveness</li> + * <li>Increase lookaheadSize for better batch detection (at cost of memory)</li> + * <li>Tune maxGapBytes based on your data access patterns</li> + * </ul> + * + * @param storage HoodieStorage instance for file I/O + * @param maxGapBytes Maximum gap between ranges to consider for batching (default: 4KB) + * @param lookaheadSize Number of rows to buffer for batch detection (default: 50) + */ +class BatchedBlobReader( + storage: HoodieStorage, + maxGapBytes: Int = 4096, + lookaheadSize: Int = 50) { + + private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader]) + + /** + * Process a partition iterator, batching consecutive reads. + * + * This method consumes the input iterator and produces an output iterator + * with each row containing the original data plus a "data" column with the + * bytes read from the file. 
+ * + * @param rows Iterator of input rows with struct column + * @param structColIdx Index of the struct column in the row + * @param outputSchema Schema for output rows + * @param accessor Type class for accessing row fields + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Iterator of output rows with data column added + */ + def processPartition[R]( + rows: Iterator[R], + structColIdx: Int, + outputSchema: StructType) + (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] = { + + // Create buffered iterator for lookahead + val bufferedRows = rows.buffered + + // Result buffer to maintain order + val resultIterator = new Iterator[R] { + private var currentBatch: Iterator[R] = Iterator.empty + private var rowIndex = 0L + + override def hasNext: Boolean = { + if (currentBatch.hasNext) { + true + } else if (bufferedRows.hasNext) { + // Process next batch + currentBatch = processNextBatch() + currentBatch.hasNext + } else { + false + } + } + + override def next(): R = { + if (!hasNext) { + throw new NoSuchElementException("No more rows") + } + currentBatch.next() + } + + /** + * Collect and process the next batch of rows. 
+ */ + private def processNextBatch(): Iterator[R] = { + // Collect up to lookaheadSize rows with their original indices + val batch = collectBatch() + + if (batch.isEmpty) { + Iterator.empty + } else { + // Partition the batch into three groups + val (inlineRows, outOfLineRows) = batch.partition(_.inlineBytes.isDefined) + val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0) + + // Case 1: Inline — return bytes directly without I/O + val inlineResults = inlineRows.map { ri => + RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, outputSchema), ri.index) + } + + // Case 2: Whole-file reads + val wholeFileResults = wholeFileRows.map(readWholeFile(_, outputSchema)) + + // Case 3: Regular range reads — merge consecutive ranges and batch + val mergedRanges = identifyConsecutiveRanges(rangeRows) + val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, outputSchema)) + + // Sort by original index to preserve input order + (inlineResults ++ wholeFileResults ++ rangeResults).sortBy(_.index).map(_.row).iterator + } + } + + /** + * Collect up to lookaheadSize rows from the input iterator. 
+ */ + private def collectBatch(): Seq[RowInfo[R]] = { + val batch = ArrayBuffer[RowInfo[R]]() + var collected = 0 + + while (bufferedRows.hasNext && collected < lookaheadSize) { + val row = bufferedRows.next() + // Handle null struct column (null blob) + if (accessor.isNullAt(row, structColIdx)) { + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(null) + ) + rowIndex += 1 + collected += 1 + } else { + val blobStruct = accessor.getStruct(row, structColIdx, HoodieSchema.Blob.getFieldCount) + // Dispatch based on storage_type (field 0) + val storageType = accessor.getString(blobStruct, 0) + if (storageType == HoodieSchema.Blob.INLINE) { + // Case 1: Inline — bytes are in field 1 + val bytes = accessor.getBytes(blobStruct, 1) + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(bytes) + ) + } else { + // Case 2 or 3: Out-of-line — get reference struct (field 2) + val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount) + val filePath = accessor.getString(referenceStruct, 0) + val offsetIsNull = accessor.isNullAt(referenceStruct, 1) + val lengthIsNull = accessor.isNullAt(referenceStruct, 2) + if (offsetIsNull || lengthIsNull) { + // Case 2: Whole-file read — no offset/length specified; sentinel length = -1 + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = 0, + length = -1, + index = rowIndex + ) + } else { + // Case 3: Regular range read + val offset = accessor.getLong(referenceStruct, 1) + val length = accessor.getLong(referenceStruct, 2) + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = offset, + length = length, + index = rowIndex + ) + } + } + rowIndex += 1 + collected += 1 + } + } + batch.toSeq + } + } + + resultIterator + } + + /** + * Identify consecutive ranges that can be batched together. 
+ * + * This method groups rows by file path, sorts by offset, and merges + * ranges that are consecutive or within maxGapBytes of each other. + * + * @param rows Sequence of row information + * @return Sequence of merged ranges + */ + private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = { + // Group by file path + val byFile = rows.groupBy(_.filePath) + + val allRanges = ArrayBuffer[MergedRange[R]]() + + byFile.foreach { case (filePath, fileRows) => + // Sort by offset + val sorted = fileRows.sortBy(_.offset) + + // Merge consecutive ranges + val merged = mergeRanges(sorted, maxGapBytes) + allRanges ++= merged + } + + allRanges.toSeq + } + + /** + * Merge consecutive ranges within the gap threshold. + * + * @param rows Sorted rows from the same file + * @param maxGap Maximum gap to consider for merging + * @return Sequence of merged ranges + */ + private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = { + + val result = ArrayBuffer[MergedRange[R]]() Review Comment: 🤖 When `mergeRanges` encounters a row with `gap < 0` (overlapping ranges), it starts a new range instead of merging. `identifyConsecutiveRanges` only groups by filePath and sorts by offset. So if two rows share an offset but have different lengths, or partially overlap, the later range's `startOffset` can be less than `current.endOffset`, producing a negative gap. The new range then re-reads the overlapping bytes, which is wasteful but functionally correct. Note that this is not necessarily an unexpected case: two rows referencing the same blob (same offset and length) always yield a negative gap. Should we log a warning here, or simply extend `current` (taking the max of the end offsets) when `gap < 0` so overlapping bytes are read only once? ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala: ########## @@ -0,0 +1,732 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.HoodieSparkUtils.sparkAdapter +import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} +import org.apache.hudi.io.SeekableDataInputStream +import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StorageConfiguration, StoragePath} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Encoders, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, SpecificInternalRow} +import org.apache.spark.sql.types.{BinaryType, StructField, StructType} +import org.slf4j.LoggerFactory + +import scala.collection.mutable.ArrayBuffer + +/** + * Batched byte range reader that optimizes I/O by combining consecutive reads for out-of-line data. + * + * This reader analyzes sequences of read requests within a partition and merges + * consecutive or nearby reads into single I/O operations. This significantly reduces + * the number of seeks and reads when processing sorted data. 
+ * + * <h3>Schema Requirement:</h3> + * The blob column must match the schema defined in {@link HoodieSchema.Blob}: + * <pre> + * struct { + * type: string // "inline" or "out_of_line" + * data: binary (nullable) // inline data (null for out_of_line) + * reference: struct (nullable) { // file reference (null for inline) + * external_path: string + * offset: long + * length: long + * managed: boolean + * } + * } + * </pre> + * + * <h3>Key Features:</h3> + * <ul> + * <li>Batches consecutive reads from the same file</li> + * <li>Configurable gap threshold for merging nearby reads</li> + * <li>Lookahead buffer to identify batch opportunities</li> + * <li>Preserves input row order in output</li> + * </ul> + * + * <h3>Usage Example:</h3> + * {{{ + * import org.apache.hudi.udf.BatchedByteRangeReader + * import org.apache.spark.sql.functions._ + * // Read a table with a blob column (e.g. image_data) + * val df = spark.read.format("hudi").load("/my_path").select("image_data", "record_id") + * + * // Read with batching (best when data is sorted by external_path, offset) + * val result = BatchedByteRangeReader.readBatched(df, structColName = "file_info") + * + * // Result has: image_data, record_id, data + * result.show() + * }}} + * + * <h3>Performance Tips:</h3> + * <ul> + * <li>Sort input by (blob.reference.external_path, blob.reference.offset) for maximum batching effectiveness</li> + * <li>Increase lookaheadSize for better batch detection (at cost of memory)</li> + * <li>Tune maxGapBytes based on your data access patterns</li> + * </ul> + * + * @param storage HoodieStorage instance for file I/O + * @param maxGapBytes Maximum gap between ranges to consider for batching (default: 4KB) + * @param lookaheadSize Number of rows to buffer for batch detection (default: 50) + */ +class BatchedBlobReader( + storage: HoodieStorage, + maxGapBytes: Int = 4096, + lookaheadSize: Int = 50) { + + private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader]) + + /** + * 
Process a partition iterator, batching consecutive reads. + * + * This method consumes the input iterator and produces an output iterator + * with each row containing the original data plus a "data" column with the + * bytes read from the file. + * + * @param rows Iterator of input rows with struct column + * @param structColIdx Index of the struct column in the row + * @param outputSchema Schema for output rows + * @param accessor Type class for accessing row fields + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Iterator of output rows with data column added + */ + def processPartition[R]( + rows: Iterator[R], + structColIdx: Int, + outputSchema: StructType) + (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] = { + + // Create buffered iterator for lookahead + val bufferedRows = rows.buffered + + // Result buffer to maintain order + val resultIterator = new Iterator[R] { + private var currentBatch: Iterator[R] = Iterator.empty + private var rowIndex = 0L + + override def hasNext: Boolean = { + if (currentBatch.hasNext) { + true + } else if (bufferedRows.hasNext) { + // Process next batch + currentBatch = processNextBatch() + currentBatch.hasNext + } else { + false + } + } + + override def next(): R = { + if (!hasNext) { + throw new NoSuchElementException("No more rows") + } + currentBatch.next() + } + + /** + * Collect and process the next batch of rows. 
+ */ + private def processNextBatch(): Iterator[R] = { + // Collect up to lookaheadSize rows with their original indices + val batch = collectBatch() + + if (batch.isEmpty) { + Iterator.empty + } else { + // Partition the batch into three groups + val (inlineRows, outOfLineRows) = batch.partition(_.inlineBytes.isDefined) + val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0) + + // Case 1: Inline — return bytes directly without I/O + val inlineResults = inlineRows.map { ri => + RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, outputSchema), ri.index) + } + + // Case 2: Whole-file reads + val wholeFileResults = wholeFileRows.map(readWholeFile(_, outputSchema)) + + // Case 3: Regular range reads — merge consecutive ranges and batch + val mergedRanges = identifyConsecutiveRanges(rangeRows) + val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, outputSchema)) + + // Sort by original index to preserve input order + (inlineResults ++ wholeFileResults ++ rangeResults).sortBy(_.index).map(_.row).iterator + } + } + + /** + * Collect up to lookaheadSize rows from the input iterator. 
+ */ + private def collectBatch(): Seq[RowInfo[R]] = { + val batch = ArrayBuffer[RowInfo[R]]() + var collected = 0 + + while (bufferedRows.hasNext && collected < lookaheadSize) { + val row = bufferedRows.next() + val blobStruct = accessor.getStruct(row, structColIdx, HoodieSchema.Blob.getFieldCount) + + // Dispatch based on storage_type (field 0) + val storageType = accessor.getString(blobStruct, 0) + + if (storageType == HoodieSchema.Blob.INLINE) { + // Case 1: Inline — bytes are in field 1 + val bytes = accessor.getBytes(blobStruct, 1) + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(bytes) + ) + } else { + // Case 2 or 3: Out-of-line — get reference struct (field 2) + val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount) + val filePath = accessor.getString(referenceStruct, 0) + val offsetIsNull = accessor.isNullAt(referenceStruct, 1) + val lengthIsNull = accessor.isNullAt(referenceStruct, 2) + + if (offsetIsNull || lengthIsNull) { + // Case 2: Whole-file read — no offset/length specified; sentinel length = -1 + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = 0, + length = -1, + index = rowIndex + ) + } else { + // Case 3: Regular range read + val offset = accessor.getLong(referenceStruct, 1) + val length = accessor.getLong(referenceStruct, 2) + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = offset, + length = length, + index = rowIndex + ) + } + } + + rowIndex += 1 + collected += 1 + } + + batch.toSeq + } + } + + resultIterator + } + + /** + * Identify consecutive ranges that can be batched together. + * + * This method groups rows by file path, sorts by offset, and merges + * ranges that are consecutive or within maxGapBytes of each other. 
+ * + * @param rows Sequence of row information + * @return Sequence of merged ranges + */ + private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = { + // Group by file path + val byFile = rows.groupBy(_.filePath) + + val allRanges = ArrayBuffer[MergedRange[R]]() + + byFile.foreach { case (filePath, fileRows) => + // Sort by offset + val sorted = fileRows.sortBy(_.offset) + + // Merge consecutive ranges + val merged = mergeRanges(sorted, maxGapBytes) + allRanges ++= merged + } + + allRanges.toSeq + } + + /** + * Merge consecutive ranges within the gap threshold. + * + * @param rows Sorted rows from the same file + * @param maxGap Maximum gap to consider for merging + * @return Sequence of merged ranges + */ + private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = { + + val result = ArrayBuffer[MergedRange[R]]() + var current: MergedRange[R] = null + + rows.foreach { row => + if (current == null) { + // Start first range + current = MergedRange[R]( + filePath = row.filePath, + startOffset = row.offset, + endOffset = row.offset + row.length, + rows = Seq(row) + ) + } else { + val gap = row.offset - current.endOffset + + if (gap >= 0 && gap <= maxGap) { + // Merge into current range + current = current.merge(row) + } else { + // Save current range and start new one + result += current + current = MergedRange[R]( + filePath = row.filePath, + startOffset = row.offset, + endOffset = row.offset + row.length, + rows = Seq(row) + ) + } + } + } + + // Add final range + if (current != null) { + result += current + } + + result.toSeq + } + + /** + * Read an entire file and return it as a single row result. + * + * Used for whole-file out-of-line blobs where no offset or length is specified. 
+ * + * @param rowInfo Row information with the file path + * @param outputSchema Schema for output rows + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Sequence containing a single row result + */ + private def readWholeFile[R]( + rowInfo: RowInfo[R], + outputSchema: StructType) + (implicit builder: RowBuilder[R]): RowResult[R] = { + + var inputStream: SeekableDataInputStream = null + try { + val path = new StoragePath(rowInfo.filePath) + val fileLength = storage.getPathInfo(path).getLength.toInt Review Comment: 🤖 Agreed. There are three related problems:
1. The `Long.toInt` in `readAndSplitRange`.
2. This `getLength.toInt` cast, which silently wraps for files larger than `Int.MaxValue` bytes (~2 GiB).
3. `readWholeFile` also uses `readAllBytes()`, which internally buffers into an `ArrayList<byte[]>` and cannot return more than `Integer.MAX_VALUE` bytes.

For out-of-line blobs larger than 2 GiB, neither path works correctly. At minimum, an explicit size check with a clear error message (e.g. "blob size exceeds 2GB limit") before the `.toInt` cast would turn silent corruption into an explicit failure. ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala: ########## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.HoodieSparkUtils.sparkAdapter +import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} +import org.apache.hudi.io.SeekableDataInputStream +import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StorageConfiguration, StoragePath} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, SpecificInternalRow} +import org.apache.spark.sql.types.{BinaryType, BlobType, DataType, StructField, StructType} +import org.slf4j.LoggerFactory + +import java.io.InputStream + +import scala.collection.mutable.ArrayBuffer + +/** + * Batched byte range reader that optimizes I/O by combining consecutive reads for out-of-line data. + * + * This reader analyzes sequences of read requests within a partition and merges + * consecutive or nearby reads into single I/O operations. This significantly reduces + * the number of seeks and reads when processing sorted data. + * + * <h3>Schema Requirement:</h3> + * The blob column must match the schema defined in {@link HoodieSchema.Blob}: + * <pre> + * struct { + * type: string // "inline" or "out_of_line" + * data: binary (nullable) // inline data (null for out_of_line) + * reference: struct (nullable) { // file reference (null for inline) + * external_path: string + * offset: long + * length: long + * managed: boolean + * } + * } + * </pre> + * + * <h3>Key Features:</h3> + * <ul> + * <li>Batches consecutive reads from the same file</li> + * <li>Configurable gap threshold for merging nearby reads</li> + * <li>Lookahead buffer to identify batch opportunities</li> + * <li>Preserves input row order in output</li> + * </ul> + * + * <h3>Usage Example:</h3> + * {{{ + * import org.apache.hudi.udf.BatchedByteRangeReader + * import org.apache.spark.sql.functions._ + * // Read a table with a blob column (e.g. 
image_data) + * val df = spark.read.format("hudi").load("/my_path").select("image_data", "record_id") + * + * // Read with batching (best when data is sorted by external_path, offset) + * val result = BatchedByteRangeReader.readBatched(df, structColName = "file_info") + * + * // Result has: image_data, record_id, data + * result.show() + * }}} + * + * <h3>Performance Tips:</h3> + * <ul> + * <li>Sort input by (blob.reference.external_path, blob.reference.offset) for maximum batching effectiveness</li> + * <li>Increase lookaheadSize for better batch detection (at cost of memory)</li> + * <li>Tune maxGapBytes based on your data access patterns</li> + * </ul> + * + * @param storage HoodieStorage instance for file I/O + * @param maxGapBytes Maximum gap between ranges to consider for batching (default: 4KB) + * @param lookaheadSize Number of rows to buffer for batch detection (default: 50) + */ +class BatchedBlobReader( + storage: HoodieStorage, + maxGapBytes: Int = 4096, + lookaheadSize: Int = 50) { + + private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader]) + + /** + * Process a partition iterator, batching consecutive reads. + * + * This method consumes the input iterator and produces an output iterator + * with each row containing the original data plus a "data" column with the + * bytes read from the file. 
+ * + * @param rows Iterator of input rows with struct column + * @param structColIdx Index of the struct column in the row + * @param outputSchema Schema for output rows + * @param accessor Type class for accessing row fields + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Iterator of output rows with data column added + */ + def processPartition[R]( + rows: Iterator[R], + structColIdx: Int, + outputSchema: StructType) + (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] = { + + // Create buffered iterator for lookahead + val bufferedRows = rows.buffered + + // Result buffer to maintain order + val resultIterator = new Iterator[R] { + private var currentBatch: Iterator[R] = Iterator.empty + private var rowIndex = 0L + + override def hasNext: Boolean = { + if (currentBatch.hasNext) { + true + } else if (bufferedRows.hasNext) { + // Process next batch + currentBatch = processNextBatch() + currentBatch.hasNext + } else { + false + } + } + + override def next(): R = { + if (!hasNext) { + throw new NoSuchElementException("No more rows") + } + currentBatch.next() + } + + /** + * Collect and process the next batch of rows. 
+ */ + private def processNextBatch(): Iterator[R] = { + // Collect up to lookaheadSize rows with their original indices + val batch = collectBatch() + + if (batch.isEmpty) { + Iterator.empty + } else { + // Partition the batch into three groups + val (inlineRows, outOfLineRows) = batch.partition(_.inlineBytes.isDefined) + val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0) + + // Case 1: Inline — return bytes directly without I/O + val inlineResults = inlineRows.map { ri => + RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, outputSchema), ri.index) + } + + // Case 2: Whole-file reads + val wholeFileResults = wholeFileRows.map(readWholeFile(_, outputSchema)) + + // Case 3: Regular range reads — merge consecutive ranges and batch + val mergedRanges = identifyConsecutiveRanges(rangeRows) + val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, outputSchema)) + + // Sort by original index to preserve input order + (inlineResults ++ wholeFileResults ++ rangeResults).sortBy(_.index).map(_.row).iterator + } + } + + /** + * Collect up to lookaheadSize rows from the input iterator. 
+ */ + private def collectBatch(): Seq[RowInfo[R]] = { + val batch = ArrayBuffer[RowInfo[R]]() + var collected = 0 + + while (bufferedRows.hasNext && collected < lookaheadSize) { + val row = bufferedRows.next() + // Handle null struct column (null blob) + if (accessor.isNullAt(row, structColIdx)) { + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(null) + ) + rowIndex += 1 + collected += 1 + } else { + val blobStruct = accessor.getStruct(row, structColIdx, HoodieSchema.Blob.getFieldCount) + // Dispatch based on storage_type (field 0) + val storageType = accessor.getString(blobStruct, 0) + if (storageType == HoodieSchema.Blob.INLINE) { + // Case 1: Inline — bytes are in field 1 + val bytes = accessor.getBytes(blobStruct, 1) + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(bytes) + ) + } else { + // Case 2 or 3: Out-of-line — get reference struct (field 2) + val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount) + val filePath = accessor.getString(referenceStruct, 0) + val offsetIsNull = accessor.isNullAt(referenceStruct, 1) + val lengthIsNull = accessor.isNullAt(referenceStruct, 2) + if (offsetIsNull || lengthIsNull) { + // Case 2: Whole-file read — no offset/length specified; sentinel length = -1 + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = 0, + length = -1, + index = rowIndex + ) + } else { + // Case 3: Regular range read + val offset = accessor.getLong(referenceStruct, 1) + val length = accessor.getLong(referenceStruct, 2) + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = offset, + length = length, + index = rowIndex + ) + } + } + rowIndex += 1 + collected += 1 + } + } + batch.toSeq + } + } + + resultIterator + } + + /** + * Identify consecutive ranges that can be batched together. 
+   *
+   * Rows are bucketed by file path; within each file they are ordered by offset and
+   * coalesced by [[mergeRanges]] using the configured `maxGapBytes`. The order of the
+   * returned ranges across different files is unspecified (it follows the iteration
+   * order of the grouped map).
+   *
+   * @param rows rows that need a byte-range read
+   * @tparam R row type
+   * @return coalesced ranges, each covering one or more rows from a single file
+   */
+  private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] =
+    rows
+      .groupBy(_.filePath)
+      .values
+      .toVector
+      .flatMap(sameFile => mergeRanges(sameFile.sortBy(_.offset), maxGapBytes))
+
+  /**
+   * Coalesce offset-sorted rows from one file into read ranges.
+   *
+   * A row joins the currently open range when it starts at or after that range's end and
+   * the gap between the two is at most `maxGap` bytes; otherwise the open range is closed
+   * and a new one is started at the row. A row that overlaps the open range (negative gap)
+   * therefore starts a new range instead of being merged.
+   *
+   * @param rows rows from a single file, sorted by offset
+   * @param maxGap largest gap, in bytes, that may be bridged when merging
+   * @tparam R row type
+   * @return merged ranges in the order of `rows`; empty when `rows` is empty
+   */
+  private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = {
+    // Starts a new range that initially contains only `first`.
+    def open(first: RowInfo[R]): MergedRange[R] =
+      MergedRange[R](
+        filePath = first.filePath,
+        startOffset = first.offset,
+        endOffset = first.offset + first.length,
+        rows = Seq(first))
+
+    rows.headOption match {
+      case None => Seq.empty
+      case Some(first) =>
+        val (closed, active) =
+          rows.tail.foldLeft((Vector.empty[MergedRange[R]], open(first))) {
+            case ((done, current), next) =>
+              val gap = next.offset - current.endOffset
+              if (gap < 0 || gap > maxGap) (done :+ current, open(next))
+              else (done, current.merge(next))
+          }
+        closed :+ active
+    }
+  }
+
+  /**
+   * Read an entire file and return it as a single row result.
+   *
+   * Used for whole-file out-of-line blobs where no offset or length is specified.
+ * + * @param rowInfo Row information with the file path + * @param outputSchema Schema for output rows + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Sequence containing a single row result + */ + private def readWholeFile[R]( + rowInfo: RowInfo[R], + outputSchema: StructType) + (implicit builder: RowBuilder[R]): RowResult[R] = { + Review Comment: 🤖 Same `Long.toInt` truncation risk here: `rowInfo.length.toInt` will silently wrap for individual blobs > 2GB. Worth adding validation when constructing `RowInfo` or before this slice operation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
