rahil-c commented on code in PR #18098:
URL: https://github.com/apache/hudi/pull/18098#discussion_r2956370028


##########
hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala:
##########
@@ -128,7 +128,8 @@ class HoodieSpark3_3ExtendedSqlParser(session: 
SparkSession, delegate: ParserInt
       normalized.contains("create index") ||
       normalized.contains("drop index") ||
       normalized.contains("show indexes") ||
-      normalized.contains("refresh index")
+      normalized.contains("refresh index") ||
+      normalized.contains(" blob")

Review Comment:
   Would this match on something such as `SELECT * FROM blob_table`? If so then 
i think we should maybe match it on the `read_blob` function, or is there a 
more precise way to match here?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala:
##########
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.io.SeekableDataInputStream
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, 
StorageConfiguration, StoragePath}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Encoders, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, 
SpecificInternalRow}
+import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Batched byte range reader that optimizes I/O by combining consecutive reads 
for out-of-line data.
+ *
+ * This reader analyzes sequences of read requests within a partition and 
merges
+ * consecutive or nearby reads into single I/O operations. This significantly 
reduces
+ * the number of seeks and reads when processing sorted data.
+ *
+ * <h3>Schema Requirement:</h3>
+ * The blob column must match the schema defined in {@link HoodieSchema.Blob}:
+ * <pre>
+ * struct {
+ *   type: string                   // "inline" or "out_of_line"
+ *   data: binary (nullable)       // inline data (null for out_of_line)
+ *   reference: struct (nullable) { // file reference (null for inline)
+ *     external_path: string
+ *     offset: long
+ *     length: long
+ *     managed: boolean
+ *   }
+ * }
+ * </pre>
+ *
+ * <h3>Key Features:</h3>
+ * <ul>
+ *   <li>Batches consecutive reads from the same file</li>
+ *   <li>Configurable gap threshold for merging nearby reads</li>
+ *   <li>Lookahead buffer to identify batch opportunities</li>
+ *   <li>Preserves input row order in output</li>
+ * </ul>
+ *
+ * <h3>Usage Example:</h3>
+ * {{{
+ * import org.apache.hudi.udf.BatchedByteRangeReader
+ * import org.apache.spark.sql.functions._
+ * // Read a table with a blob column (e.g. image_data)
+ * val df = spark.read.format("hudi").load("/my_path").select("image_data", 
"record_id")
+ *
+ * // Read with batching (best when data is sorted by external_path, offset)
+ * val result = BatchedByteRangeReader.readBatched(df, structColName = 
"file_info")
+ *
+ * // Result has: image_data, record_id, data
+ * result.show()
+ * }}}
+ *
+ * <h3>Performance Tips:</h3>
+ * <ul>
+ *   <li>Sort input by (blob.reference.external_path, blob.reference.offset) 
for maximum batching effectiveness</li>
+ *   <li>Increase lookaheadSize for better batch detection (at cost of 
memory)</li>
+ *   <li>Tune maxGapBytes based on your data access patterns</li>
+ * </ul>
+ *
+ * @param storage        HoodieStorage instance for file I/O
+ * @param maxGapBytes    Maximum gap between ranges to consider for batching 
(default: 4KB)
+ * @param lookaheadSize  Number of rows to buffer for batch detection 
(default: 50)
+ */
+class BatchedBlobReader(
+    storage: HoodieStorage,
+    maxGapBytes: Int = 4096,
+    lookaheadSize: Int = 50) {
+
+  private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader])
+
+  /**
+   * Process a partition iterator, batching consecutive reads.
+   *
+   * This method consumes the input iterator and produces an output iterator
+   * with each row containing the original data plus a "data" column with the
+   * bytes read from the file.
+   *
+   * @param rows         Iterator of input rows with struct column
+   * @param structColIdx Index of the struct column in the row
+   * @param outputSchema Schema for output rows
+   * @param accessor     Type class for accessing row fields
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Iterator of output rows with data column added
+   */
+  def processPartition[R](
+      rows: Iterator[R],
+      structColIdx: Int,
+      outputSchema: StructType)
+      (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] 
= {
+
+    // Create buffered iterator for lookahead
+    val bufferedRows = rows.buffered
+
+    // Result buffer to maintain order
+    val resultIterator = new Iterator[R] {
+      private var currentBatch: Iterator[R] = Iterator.empty
+      private var rowIndex = 0L
+
+      override def hasNext: Boolean = {
+        if (currentBatch.hasNext) {
+          true
+        } else if (bufferedRows.hasNext) {
+          // Process next batch
+          currentBatch = processNextBatch()
+          currentBatch.hasNext
+        } else {
+          false
+        }
+      }
+
+      override def next(): R = {
+        if (!hasNext) {
+          throw new NoSuchElementException("No more rows")
+        }
+        currentBatch.next()
+      }
+
+      /**
+       * Collect and process the next batch of rows.
+       */
+      private def processNextBatch(): Iterator[R] = {
+        // Collect up to lookaheadSize rows with their original indices
+        val batch = collectBatch()
+
+        if (batch.isEmpty) {
+          Iterator.empty
+        } else {
+          // Partition the batch into three groups
+          val (inlineRows, outOfLineRows) = 
batch.partition(_.inlineBytes.isDefined)
+          val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 
0)
+
+          // Case 1: Inline — return bytes directly without I/O
+          val inlineResults = inlineRows.map { ri =>
+            RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, 
outputSchema), ri.index)
+          }
+
+          // Case 2: Whole-file reads
+          val wholeFileResults = wholeFileRows.map(readWholeFile(_, 
outputSchema))
+
+          // Case 3: Regular range reads — merge consecutive ranges and batch
+          val mergedRanges = identifyConsecutiveRanges(rangeRows)
+          val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, 
outputSchema))
+
+          // Sort by original index to preserve input order
+          (inlineResults ++ wholeFileResults ++ 
rangeResults).sortBy(_.index).map(_.row).iterator
+        }
+      }
+
+      /**
+       * Collect up to lookaheadSize rows from the input iterator.
+       */
+      private def collectBatch(): Seq[RowInfo[R]] = {
+        val batch = ArrayBuffer[RowInfo[R]]()
+        var collected = 0
+
+        while (bufferedRows.hasNext && collected < lookaheadSize) {
+          val row = bufferedRows.next()
+          val blobStruct = accessor.getStruct(row, structColIdx, 
HoodieSchema.Blob.getFieldCount)
+
+          // Dispatch based on storage_type (field 0)
+          val storageType = accessor.getString(blobStruct, 0)
+
+          if (storageType == HoodieSchema.Blob.INLINE) {
+            // Case 1: Inline — bytes are in field 1
+            val bytes = accessor.getBytes(blobStruct, 1)
+            batch += RowInfo[R](
+              originalRow = row,
+              filePath = "",
+              offset = -1,
+              length = -1,
+              index = rowIndex,
+              inlineBytes = Some(bytes)
+            )
+          } else {
+            // Case 2 or 3: Out-of-line — get reference struct (field 2)
+            val referenceStruct = accessor.getStruct(blobStruct, 2, 
HoodieSchema.Blob.getReferenceFieldCount)
+            val filePath = accessor.getString(referenceStruct, 0)
+            val offsetIsNull = accessor.isNullAt(referenceStruct, 1)
+            val lengthIsNull = accessor.isNullAt(referenceStruct, 2)
+
+            if (offsetIsNull || lengthIsNull) {
+              // Case 2: Whole-file read — no offset/length specified; 
sentinel length = -1
+              batch += RowInfo[R](
+                originalRow = row,
+                filePath = filePath,
+                offset = 0,
+                length = -1,
+                index = rowIndex
+              )
+            } else {
+              // Case 3: Regular range read
+              val offset = accessor.getLong(referenceStruct, 1)
+              val length = accessor.getLong(referenceStruct, 2)
+              batch += RowInfo[R](
+                originalRow = row,
+                filePath = filePath,
+                offset = offset,
+                length = length,
+                index = rowIndex
+              )
+            }
+          }
+
+          rowIndex += 1
+          collected += 1
+        }
+
+        batch.toSeq
+      }
+    }
+
+    resultIterator
+  }
+
+  /**
+   * Identify consecutive ranges that can be batched together.
+   *
+   * This method groups rows by file path, sorts by offset, and merges
+   * ranges that are consecutive or within maxGapBytes of each other.
+   *
+   * @param rows Sequence of row information
+   * @return Sequence of merged ranges
+   */
+  private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): 
Seq[MergedRange[R]] = {
+    // Group by file path
+    val byFile = rows.groupBy(_.filePath)
+
+    val allRanges = ArrayBuffer[MergedRange[R]]()
+
+    byFile.foreach { case (filePath, fileRows) =>
+      // Sort by offset
+      val sorted = fileRows.sortBy(_.offset)
+
+      // Merge consecutive ranges
+      val merged = mergeRanges(sorted, maxGapBytes)
+      allRanges ++= merged
+    }
+
+    allRanges.toSeq
+  }
+
+  /**
+   * Merge consecutive ranges within the gap threshold.
+   *
+   * @param rows   Sorted rows from the same file
+   * @param maxGap Maximum gap to consider for merging
+   * @return Sequence of merged ranges
+   */
+  private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): 
Seq[MergedRange[R]] = {
+
+    val result = ArrayBuffer[MergedRange[R]]()
+    var current: MergedRange[R] = null
+
+    rows.foreach { row =>
+      if (current == null) {
+        // Start first range
+        current = MergedRange[R](
+          filePath = row.filePath,
+          startOffset = row.offset,
+          endOffset = row.offset + row.length,
+          rows = Seq(row)
+        )
+      } else {
+        val gap = row.offset - current.endOffset
+
+        if (gap >= 0 && gap <= maxGap) {
+          // Merge into current range
+          current = current.merge(row)
+        } else {
+          // Save current range and start new one
+          result += current
+          current = MergedRange[R](
+            filePath = row.filePath,
+            startOffset = row.offset,
+            endOffset = row.offset + row.length,
+            rows = Seq(row)
+          )
+        }
+      }
+    }
+
+    // Add final range
+    if (current != null) {
+      result += current
+    }
+
+    result.toSeq
+  }
+
+  /**
+   * Read an entire file and return it as a single row result.
+   *
+   * Used for whole-file out-of-line blobs where no offset or length is 
specified.
+   *
+   * @param rowInfo      Row information with the file path
+   * @param outputSchema Schema for output rows
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Sequence containing a single row result
+   */
+  private def readWholeFile[R](
+      rowInfo: RowInfo[R],
+      outputSchema: StructType)
+      (implicit builder: RowBuilder[R]): RowResult[R] = {
+
+    var inputStream: SeekableDataInputStream = null
+    try {
+      val path = new StoragePath(rowInfo.filePath)
+      val fileLength = storage.getPathInfo(path).getLength.toInt

Review Comment:
   `claude`: Integer overflow — Long.toInt casts in readWholeFile and 
readAndSplitRange will silently wrap for files > 2GB, causing 
`NegativeArraySizeException`.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala:
##########
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.io.SeekableDataInputStream
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, 
StorageConfiguration, StoragePath}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Encoders, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, 
SpecificInternalRow}
+import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Batched byte range reader that optimizes I/O by combining consecutive reads 
for out-of-line data.
+ *
+ * This reader analyzes sequences of read requests within a partition and 
merges
+ * consecutive or nearby reads into single I/O operations. This significantly 
reduces
+ * the number of seeks and reads when processing sorted data.
+ *
+ * <h3>Schema Requirement:</h3>
+ * The blob column must match the schema defined in {@link HoodieSchema.Blob}:
+ * <pre>
+ * struct {
+ *   type: string                   // "inline" or "out_of_line"
+ *   data: binary (nullable)       // inline data (null for out_of_line)
+ *   reference: struct (nullable) { // file reference (null for inline)
+ *     external_path: string
+ *     offset: long
+ *     length: long
+ *     managed: boolean
+ *   }
+ * }
+ * </pre>
+ *
+ * <h3>Key Features:</h3>
+ * <ul>
+ *   <li>Batches consecutive reads from the same file</li>
+ *   <li>Configurable gap threshold for merging nearby reads</li>
+ *   <li>Lookahead buffer to identify batch opportunities</li>
+ *   <li>Preserves input row order in output</li>
+ * </ul>
+ *
+ * <h3>Usage Example:</h3>
+ * {{{
+ * import org.apache.hudi.udf.BatchedByteRangeReader
+ * import org.apache.spark.sql.functions._
+ * // Read a table with a blob column (e.g. image_data)
+ * val df = spark.read.format("hudi").load("/my_path").select("image_data", 
"record_id")
+ *
+ * // Read with batching (best when data is sorted by external_path, offset)
+ * val result = BatchedByteRangeReader.readBatched(df, structColName = 
"file_info")
+ *
+ * // Result has: image_data, record_id, data
+ * result.show()
+ * }}}
+ *
+ * <h3>Performance Tips:</h3>
+ * <ul>
+ *   <li>Sort input by (blob.reference.external_path, blob.reference.offset) 
for maximum batching effectiveness</li>
+ *   <li>Increase lookaheadSize for better batch detection (at cost of 
memory)</li>
+ *   <li>Tune maxGapBytes based on your data access patterns</li>
+ * </ul>
+ *
+ * @param storage        HoodieStorage instance for file I/O
+ * @param maxGapBytes    Maximum gap between ranges to consider for batching 
(default: 4KB)
+ * @param lookaheadSize  Number of rows to buffer for batch detection 
(default: 50)
+ */
+class BatchedBlobReader(
+    storage: HoodieStorage,
+    maxGapBytes: Int = 4096,
+    lookaheadSize: Int = 50) {
+
+  private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader])
+
+  /**
+   * Process a partition iterator, batching consecutive reads.
+   *
+   * This method consumes the input iterator and produces an output iterator
+   * with each row containing the original data plus a "data" column with the
+   * bytes read from the file.
+   *
+   * @param rows         Iterator of input rows with struct column
+   * @param structColIdx Index of the struct column in the row
+   * @param outputSchema Schema for output rows
+   * @param accessor     Type class for accessing row fields
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Iterator of output rows with data column added
+   */
+  def processPartition[R](
+      rows: Iterator[R],
+      structColIdx: Int,
+      outputSchema: StructType)
+      (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] 
= {
+
+    // Create buffered iterator for lookahead
+    val bufferedRows = rows.buffered
+
+    // Result buffer to maintain order
+    val resultIterator = new Iterator[R] {
+      private var currentBatch: Iterator[R] = Iterator.empty
+      private var rowIndex = 0L
+
+      override def hasNext: Boolean = {
+        if (currentBatch.hasNext) {
+          true
+        } else if (bufferedRows.hasNext) {
+          // Process next batch
+          currentBatch = processNextBatch()
+          currentBatch.hasNext
+        } else {
+          false
+        }
+      }
+
+      override def next(): R = {
+        if (!hasNext) {
+          throw new NoSuchElementException("No more rows")
+        }
+        currentBatch.next()
+      }
+
+      /**
+       * Collect and process the next batch of rows.
+       */
+      private def processNextBatch(): Iterator[R] = {
+        // Collect up to lookaheadSize rows with their original indices
+        val batch = collectBatch()
+
+        if (batch.isEmpty) {
+          Iterator.empty
+        } else {
+          // Partition the batch into three groups
+          val (inlineRows, outOfLineRows) = 
batch.partition(_.inlineBytes.isDefined)
+          val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 
0)
+
+          // Case 1: Inline — return bytes directly without I/O
+          val inlineResults = inlineRows.map { ri =>
+            RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, 
outputSchema), ri.index)
+          }
+
+          // Case 2: Whole-file reads
+          val wholeFileResults = wholeFileRows.map(readWholeFile(_, 
outputSchema))
+
+          // Case 3: Regular range reads — merge consecutive ranges and batch
+          val mergedRanges = identifyConsecutiveRanges(rangeRows)
+          val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, 
outputSchema))
+
+          // Sort by original index to preserve input order
+          (inlineResults ++ wholeFileResults ++ 
rangeResults).sortBy(_.index).map(_.row).iterator
+        }
+      }
+
+      /**
+       * Collect up to lookaheadSize rows from the input iterator.
+       */
+      private def collectBatch(): Seq[RowInfo[R]] = {
+        val batch = ArrayBuffer[RowInfo[R]]()
+        var collected = 0
+
+        while (bufferedRows.hasNext && collected < lookaheadSize) {
+          val row = bufferedRows.next()
+          val blobStruct = accessor.getStruct(row, structColIdx, 
HoodieSchema.Blob.getFieldCount)
+
+          // Dispatch based on storage_type (field 0)
+          val storageType = accessor.getString(blobStruct, 0)
+
+          if (storageType == HoodieSchema.Blob.INLINE) {
+            // Case 1: Inline — bytes are in field 1
+            val bytes = accessor.getBytes(blobStruct, 1)
+            batch += RowInfo[R](
+              originalRow = row,
+              filePath = "",
+              offset = -1,
+              length = -1,
+              index = rowIndex,
+              inlineBytes = Some(bytes)
+            )
+          } else {
+            // Case 2 or 3: Out-of-line — get reference struct (field 2)
+            val referenceStruct = accessor.getStruct(blobStruct, 2, 
HoodieSchema.Blob.getReferenceFieldCount)
+            val filePath = accessor.getString(referenceStruct, 0)
+            val offsetIsNull = accessor.isNullAt(referenceStruct, 1)
+            val lengthIsNull = accessor.isNullAt(referenceStruct, 2)
+
+            if (offsetIsNull || lengthIsNull) {
+              // Case 2: Whole-file read — no offset/length specified; 
sentinel length = -1
+              batch += RowInfo[R](
+                originalRow = row,
+                filePath = filePath,
+                offset = 0,
+                length = -1,
+                index = rowIndex
+              )
+            } else {
+              // Case 3: Regular range read
+              val offset = accessor.getLong(referenceStruct, 1)
+              val length = accessor.getLong(referenceStruct, 2)
+              batch += RowInfo[R](
+                originalRow = row,
+                filePath = filePath,
+                offset = offset,
+                length = length,
+                index = rowIndex
+              )
+            }
+          }
+
+          rowIndex += 1
+          collected += 1
+        }
+
+        batch.toSeq
+      }
+    }
+
+    resultIterator
+  }
+
+  /**
+   * Identify consecutive ranges that can be batched together.
+   *
+   * This method groups rows by file path, sorts by offset, and merges
+   * ranges that are consecutive or within maxGapBytes of each other.
+   *
+   * @param rows Sequence of row information
+   * @return Sequence of merged ranges
+   */
+  private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): 
Seq[MergedRange[R]] = {
+    // Group by file path
+    val byFile = rows.groupBy(_.filePath)

Review Comment:
   So this is the use for where multiple recordKeys refer to the same out of 
line blob and we want to just not repeat downloading the same file again?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala:
##########
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.io.SeekableDataInputStream
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, 
StorageConfiguration, StoragePath}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Encoders, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, 
SpecificInternalRow}
+import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Batched byte range reader that optimizes I/O by combining consecutive reads 
for out-of-line data.
+ *
+ * This reader analyzes sequences of read requests within a partition and 
merges
+ * consecutive or nearby reads into single I/O operations. This significantly 
reduces
+ * the number of seeks and reads when processing sorted data.
+ *
+ * <h3>Schema Requirement:</h3>
+ * The blob column must match the schema defined in {@link HoodieSchema.Blob}:
+ * <pre>
+ * struct {
+ *   type: string                   // "inline" or "out_of_line"
+ *   data: binary (nullable)       // inline data (null for out_of_line)
+ *   reference: struct (nullable) { // file reference (null for inline)
+ *     external_path: string
+ *     offset: long
+ *     length: long
+ *     managed: boolean
+ *   }
+ * }
+ * </pre>
+ *
+ * <h3>Key Features:</h3>
+ * <ul>
+ *   <li>Batches consecutive reads from the same file</li>
+ *   <li>Configurable gap threshold for merging nearby reads</li>
+ *   <li>Lookahead buffer to identify batch opportunities</li>
+ *   <li>Preserves input row order in output</li>
+ * </ul>
+ *
+ * <h3>Usage Example:</h3>
+ * {{{
+ * import org.apache.hudi.udf.BatchedByteRangeReader
+ * import org.apache.spark.sql.functions._
+ * // Read a table with a blob column (e.g. image_data)
+ * val df = spark.read.format("hudi").load("/my_path").select("image_data", 
"record_id")
+ *
+ * // Read with batching (best when data is sorted by external_path, offset)
+ * val result = BatchedByteRangeReader.readBatched(df, structColName = 
"file_info")
+ *
+ * // Result has: image_data, record_id, data
+ * result.show()
+ * }}}
+ *
+ * <h3>Performance Tips:</h3>
+ * <ul>
+ *   <li>Sort input by (blob.reference.external_path, blob.reference.offset) 
for maximum batching effectiveness</li>
+ *   <li>Increase lookaheadSize for better batch detection (at cost of 
memory)</li>
+ *   <li>Tune maxGapBytes based on your data access patterns</li>
+ * </ul>
+ *
+ * @param storage        HoodieStorage instance for file I/O
+ * @param maxGapBytes    Maximum gap between ranges to consider for batching 
(default: 4KB)
+ * @param lookaheadSize  Number of rows to buffer for batch detection 
(default: 50)
+ */
+class BatchedBlobReader(
+    storage: HoodieStorage,
+    maxGapBytes: Int = 4096,
+    lookaheadSize: Int = 50) {
+
+  private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader])
+
+  /**
+   * Process a partition iterator, batching consecutive reads.
+   *
+   * This method consumes the input iterator and produces an output iterator
+   * with each row containing the original data plus a "data" column with the
+   * bytes read from the file.
+   *
+   * @param rows         Iterator of input rows with struct column
+   * @param structColIdx Index of the struct column in the row
+   * @param outputSchema Schema for output rows
+   * @param accessor     Type class for accessing row fields
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Iterator of output rows with data column added
+   */
+  def processPartition[R](
+      rows: Iterator[R],
+      structColIdx: Int,
+      outputSchema: StructType)
+      (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] 
= {
+
+    // Create buffered iterator for lookahead
+    val bufferedRows = rows.buffered
+
+    // Result buffer to maintain order
+    val resultIterator = new Iterator[R] {
+      private var currentBatch: Iterator[R] = Iterator.empty
+      private var rowIndex = 0L
+
+      override def hasNext: Boolean = {
+        if (currentBatch.hasNext) {
+          true
+        } else if (bufferedRows.hasNext) {
+          // Process next batch
+          currentBatch = processNextBatch()
+          currentBatch.hasNext
+        } else {
+          false
+        }
+      }
+
+      override def next(): R = {
+        if (!hasNext) {
+          throw new NoSuchElementException("No more rows")
+        }
+        currentBatch.next()
+      }
+
+      /**
+       * Collect and process the next batch of rows.
+       */
+      private def processNextBatch(): Iterator[R] = {
+        // Collect up to lookaheadSize rows with their original indices
+        val batch = collectBatch()
+
+        if (batch.isEmpty) {
+          Iterator.empty
+        } else {
+          // Partition the batch into three groups
+          val (inlineRows, outOfLineRows) = 
batch.partition(_.inlineBytes.isDefined)
+          val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 
0)
+
+          // Case 1: Inline — return bytes directly without I/O
+          val inlineResults = inlineRows.map { ri =>

Review Comment:
   This is a no op for now right since we are only targeting out of line blob 
for milestone 1?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala:
##########
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.io.SeekableDataInputStream
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, 
StorageConfiguration, StoragePath}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Encoders, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, 
SpecificInternalRow}
+import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Batched byte range reader that optimizes I/O by combining consecutive reads 
for out-of-line data.
+ *
+ * This reader analyzes sequences of read requests within a partition and 
merges
+ * consecutive or nearby reads into single I/O operations. This significantly 
reduces
+ * the number of seeks and reads when processing sorted data.
+ *
+ * <h3>Schema Requirement:</h3>
+ * The blob column must match the schema defined in {@link HoodieSchema.Blob}:
+ * <pre>
+ * struct {
+ *   type: string                   // "inline" or "out_of_line"
+ *   data: binary (nullable)       // inline data (null for out_of_line)
+ *   reference: struct (nullable) { // file reference (null for inline)
+ *     external_path: string
+ *     offset: long
+ *     length: long
+ *     managed: boolean
+ *   }
+ * }
+ * </pre>
+ *
+ * <h3>Key Features:</h3>
+ * <ul>
+ *   <li>Batches consecutive reads from the same file</li>
+ *   <li>Configurable gap threshold for merging nearby reads</li>
+ *   <li>Lookahead buffer to identify batch opportunities</li>
+ *   <li>Preserves input row order in output</li>
+ * </ul>
+ *
+ * <h3>Usage Example:</h3>
+ * {{{
+ * import org.apache.hudi.udf.BatchedByteRangeReader
+ * import org.apache.spark.sql.functions._
+ * // Read a table with a blob column (e.g. image_data)
+ * val df = spark.read.format("hudi").load("/my_path").select("image_data", 
"record_id")
+ *
+ * // Read with batching (best when data is sorted by external_path, offset)
+ * val result = BatchedByteRangeReader.readBatched(df, structColName = 
"file_info")
+ *
+ * // Result has: image_data, record_id, data
+ * result.show()
+ * }}}
+ *
+ * <h3>Performance Tips:</h3>
+ * <ul>
+ *   <li>Sort input by (blob.reference.external_path, blob.reference.offset) 
for maximum batching effectiveness</li>
+ *   <li>Increase lookaheadSize for better batch detection (at cost of 
memory)</li>
+ *   <li>Tune maxGapBytes based on your data access patterns</li>
+ * </ul>
+ *
+ * @param storage        HoodieStorage instance for file I/O
+ * @param maxGapBytes    Maximum gap between ranges to consider for batching 
(default: 4KB)
+ * @param lookaheadSize  Number of rows to buffer for batch detection 
(default: 50)
+ */
+class BatchedBlobReader(
+    storage: HoodieStorage,
+    maxGapBytes: Int = 4096,
+    lookaheadSize: Int = 50) {
+
+  private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader])
+
+  /**
+   * Process a partition iterator, batching consecutive reads.
+   *
+   * This method consumes the input iterator and produces an output iterator
+   * with each row containing the original data plus a "data" column with the
+   * bytes read from the file.
+   *
+   * @param rows         Iterator of input rows with struct column
+   * @param structColIdx Index of the struct column in the row
+   * @param outputSchema Schema for output rows
+   * @param accessor     Type class for accessing row fields
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Iterator of output rows with data column added
+   */
+  def processPartition[R](
+      rows: Iterator[R],
+      structColIdx: Int,
+      outputSchema: StructType)
+      (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] 
= {
+
+    // Create buffered iterator for lookahead
+    val bufferedRows = rows.buffered
+
+    // Result buffer to maintain order
+    val resultIterator = new Iterator[R] {
+      private var currentBatch: Iterator[R] = Iterator.empty
+      private var rowIndex = 0L
+
+      override def hasNext: Boolean = {
+        if (currentBatch.hasNext) {
+          true
+        } else if (bufferedRows.hasNext) {
+          // Process next batch
+          currentBatch = processNextBatch()
+          currentBatch.hasNext
+        } else {
+          false
+        }
+      }
+
+      override def next(): R = {
+        if (!hasNext) {
+          throw new NoSuchElementException("No more rows")

Review Comment:
   Should we add more detail to this exception, such as the path we were 
reading from?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.functions._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+/**
+ * Tests for the read_blob() SQL function.
+ *
+ * This test suite verifies:
+ * <ul>
+ *   <li>Basic SQL integration with read_blob()</li>
+ *   <li>Integration with WHERE clauses, JOINs</li>
+ *   <li>Configuration parameter handling</li>
+ *   <li>Error handling for invalid inputs</li>
+ * </ul>
+ */
+class TestReadBlobSQL extends HoodieClientTestBase {
+
+  @Test
+  def testBasicReadBlobSQL(): Unit = {
+    val filePath = createTestFile(tempDir, "basic.bin", 10000)
+
+    // Create table with blob column
+    val df = sparkSession.createDataFrame(Seq(
+      (1, "record1", filePath, 0L, 100L),
+      (2, "record2", filePath, 100L, 100L),
+      (3, "record3", filePath, 200L, 100L)
+    )).toDF("id", "name", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), 
col("length")))
+      .select("id", "name", "file_info")
+
+    df.createOrReplaceTempView("test_table")
+
+    // Use SQL with read_blob
+    val result = sparkSession.sql("""
+      SELECT id, name, read_blob(file_info) as data
+      FROM test_table
+      WHERE id <= 2
+    """)
+
+    val rows = result.collect()
+    assertEquals(2, rows.length)
+
+    // Verify data is binary
+    val data1 = rows(0).getAs[Array[Byte]]("data")
+    assertEquals(100, data1.length)
+
+    // Verify content matches expected pattern
+    assertBytesContent(data1)
+
+    val data2 = rows(1).getAs[Array[Byte]]("data")
+    assertEquals(100, data2.length)
+    assertBytesContent(data2, expectedOffset = 100)
+  }
+
+  @Test
+  def testReadBlobWithJoin(): Unit = {
+    val filePath = createTestFile(tempDir, "join.bin", 10000)
+
+    // Create blob table
+    val blobDF = sparkSession.createDataFrame(Seq(
+      (1, filePath, 0L, 100L),
+      (2, filePath, 100L, 100L)
+    )).toDF("id", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), 
col("length")))
+      .select("id", "file_info")
+
+    blobDF.createOrReplaceTempView("blob_table")
+
+    // Create metadata table
+    val metaDF = sparkSession.createDataFrame(Seq(
+      (1, "Alice"),
+      (2, "Bob")
+    )).toDF("id", "name")
+
+    metaDF.createOrReplaceTempView("meta_table")
+
+    // SQL with JOIN
+    val result = sparkSession.sql("""
+      SELECT m.id, m.name, read_blob(b.file_info) as data
+      FROM meta_table m
+      JOIN blob_table b ON m.id = b.id
+      ORDER BY m.id
+    """)
+
+    val rows = result.collect()
+    assertEquals(2, rows.length)
+    assertEquals("Alice", rows(0).getAs[String]("name"))
+    assertEquals(100, rows(0).getAs[Array[Byte]]("data").length)
+    assertEquals("Bob", rows(1).getAs[String]("name"))
+    assertEquals(100, rows(1).getAs[Array[Byte]]("data").length)
+
+    // Verify data content
+    val data1 = rows(0).getAs[Array[Byte]]("data")
+    assertBytesContent(data1)
+  }
+
+  @Test
+  def testReadBlobWithOrderBy(): Unit = {
+    val filePath = createTestFile(tempDir, "order.bin", 10000)
+
+    val df = sparkSession.createDataFrame(Seq(
+      (3, filePath, 200L, 50L),
+      (1, filePath, 0L, 50L),
+      (2, filePath, 100L, 50L)
+    )).toDF("id", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), 
col("length")))
+      .select("id", "file_info")
+
+    df.createOrReplaceTempView("order_table")
+
+    // SQL with ORDER BY
+    val result = sparkSession.sql("""
+      SELECT id, read_blob(file_info) as data
+      FROM order_table
+      ORDER BY id
+    """)
+
+    val rows = result.collect()
+    assertEquals(3, rows.length)
+    assertEquals(1, rows(0).getAs[Int]("id"))
+    assertEquals(2, rows(1).getAs[Int]("id"))
+    assertEquals(3, rows(2).getAs[Int]("id"))
+
+    // Verify data content for ordered results
+    val data1 = rows(0).getAs[Array[Byte]]("data")
+    assertBytesContent(data1)
+  }
+
+  @Test
+  def testReadBlobInSubquery(): Unit = {
+    val filePath = createTestFile(tempDir, "subquery.bin", 10000)
+
+    val df = sparkSession.createDataFrame(Seq(
+      (1, "A", filePath, 0L, 100L),
+      (2, "A", filePath, 100L, 100L),
+      (3, "B", filePath, 200L, 100L)
+    )).toDF("id", "category", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), 
col("length")))
+      .select("id", "category", "file_info")
+
+    df.createOrReplaceTempView("subquery_table")
+
+    // SQL with subquery
+    val result = sparkSession.sql("""
+      SELECT * FROM (
+        SELECT id, category, read_blob(file_info) as data
+        FROM subquery_table
+      ) WHERE category = 'A'
+    """)
+
+    val rows = result.collect()
+    assertEquals(2, rows.length)
+    rows.foreach { row =>
+      assertEquals("A", row.getAs[String]("category"))
+      assertEquals(100, row.getAs[Array[Byte]]("data").length)
+    }
+  }
+
+  @Test
+  def testConfigurationParameters(): Unit = {
+    val filePath = createTestFile(tempDir, "config.bin", 50000)
+
+    val df = sparkSession.createDataFrame(Seq(
+      (1, filePath, 0L, 100L),
+      (2, filePath, 5000L, 100L),  // 4.9KB gap
+      (3, filePath, 10000L, 100L)
+    )).toDF("id", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), 
col("length")))
+      .select("id", "file_info")
+
+    df.createOrReplaceTempView("config_table")
+
+    // Use withSparkConfig to automatically manage configuration
+    withSparkConfig(sparkSession, Map(
+      "hoodie.blob.batching.max.gap.bytes" -> "10000",
+      "hoodie.blob.batching.lookahead.size" -> "100"
+    )) {
+      val result = sparkSession.sql("""
+        SELECT id, read_blob(file_info) as data
+        FROM config_table
+      """)
+
+      val rows = result.collect()
+      assertEquals(3, rows.length)
+
+      // Verify all reads completed successfully
+      rows.foreach { row =>
+        assertEquals(100, row.getAs[Array[Byte]]("data").length)
+      }
+    }
+  }
+
+  @Test
+  def testMultipleReadBlobInSameQuery(): Unit = {

Review Comment:
   @the-other-tim-brown Wanted to share something with you when I tried 
modifying this test slightly.
   
   Used the following test which basically alters the `offset2` for each row 
(to make this more distinguishable)
   ```
   def testMultipleReadBlobInSameQuery(): Unit = {
       val filePath1 = createTestFile(tempDir, "multi1.bin", 10000)
       val filePath2 = createTestFile(tempDir, "multi2.bin", 10000)
   
       // Use different offsets for file1 vs file2 so byte content is 
distinguishable.
       // createTestFile writes bytes as (i % 256), so:
       //   file1 offset=0   -> bytes (0, 1, 2, ...)
       //   file2 offset=500 -> bytes (244, 245, 246, ...)
       val df = sparkSession.createDataFrame(Seq(
         (1, filePath1, 0L, 50L, filePath2, 500L, 50L),
         (2, filePath1, 100L, 50L, filePath2, 600L, 50L)
       )).toDF("id", "external_path1", "offset1", "length1", "external_path2", 
"offset2", "length2")
         .withColumn("file_info1",
           blobStructCol("file_info1", col("external_path1"), col("offset1"), 
col("length1")))
         .withColumn("file_info2",
           blobStructCol("file_info2", col("external_path2"), col("offset2"), 
col("length2")))
         .select("id", "file_info1", "file_info2")
   
       df.createOrReplaceTempView("multi_table")
   
       // SQL with multiple read_blob calls on DIFFERENT blob columns
       val result = sparkSession.sql("""
         SELECT
           id,
           read_blob(file_info1) as data1,
           read_blob(file_info2) as data2
         FROM multi_table
         ORDER BY id
       """)
   
       val rows = result.collect()
       assertEquals(2, rows.length)
   
       // Row 1: data1 = file1 at offset 0, data2 = file2 at offset 500
       val data1_row1 = rows(0).getAs[Array[Byte]]("data1")
       val data2_row1 = rows(0).getAs[Array[Byte]]("data2")
       assertEquals(50, data1_row1.length)
       assertEquals(50, data2_row1.length)
       assertBytesContent(data1_row1, expectedOffset = 0)
       // This assertion will fail if ReadBlobRule only resolves the first blob 
column,
       // because data2 will contain file1's bytes (offset 0) instead of 
file2's bytes (offset 500)
       assertBytesContent(data2_row1, expectedOffset = 500)
   
       // Row 2: data1 = file1 at offset 100, data2 = file2 at offset 600
       val data1_row2 = rows(1).getAs[Array[Byte]]("data1")
       val data2_row2 = rows(1).getAs[Array[Byte]]("data2")
       assertBytesContent(data1_row2, expectedOffset = 100)
       assertBytesContent(data2_row2, expectedOffset = 600)
     }
   ```
   
   I think the issue i am seeing it that `data2` will contain `file_info1`'s 
bytes (offset 0) instead of  `file_info2`'s bytes (offset 500). Let me know if 
this is correctness issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to