voonhous commented on code in PR #18098:
URL: https://github.com/apache/hudi/pull/18098#discussion_r3021303052


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Expression, ExprId, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Transforms queries with `read_blob()` to use lazy batched I/O.
+ *
+ * Replaces [[ReadBlobExpression]] markers with [[BatchedBlobRead]] nodes
+ * that read blob data during physical execution.
+ *
+ * Example: `SELECT id, read_blob(image_data) FROM table`
+ *
+ * @param spark SparkSession for accessing configuration
+ */
+case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp 
{
+    case Filter(condition, child)
+      if containsReadBlobInExpression(condition) =>

Review Comment:
   The `Filter` branch has no `!child.isInstanceOf[BatchedBlobRead]` guard like the `Project` branch does.
   
   Could this cause infinite rule application if the optimizer re-applies rules?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Expression, ExprId, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Transforms queries with `read_blob()` to use lazy batched I/O.
+ *
+ * Replaces [[ReadBlobExpression]] markers with [[BatchedBlobRead]] nodes
+ * that read blob data during physical execution.
+ *
+ * Example: `SELECT id, read_blob(image_data) FROM table`
+ *
+ * @param spark SparkSession for accessing configuration
+ */
+case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp 
{

Review Comment:
   The rule only matches the `Filter` and `Project` patterns.
   
   IIUC, if `read_blob()` appears under other operators, such as an aggregate or a sort, errors might be thrown, right? Should we be more defensive about this?
   
   A catch-all case might be good here: one that throws a clear `AnalysisException("read_blob() is only supported in SELECT and WHERE clauses")` for all other unsupported patterns.
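
   To make the suggestion concrete, here is a minimal sketch of the catch-all idea on a toy plan model. The node types (`ScanNode`, `FilterNode`, `ProjectNode`, `AggregateNode`, `SortNode`), the `usesReadBlob` flag, and `ReadBlobCheck.validate` are all hypothetical stand-ins, not Catalyst classes, and it throws `IllegalArgumentException` only to stay free of Spark dependencies; the real rule would throw `AnalysisException`.

```scala
// Toy plan model (hypothetical); these are NOT Spark's Catalyst classes.
sealed trait PlanNode {
  def usesReadBlob: Boolean   // does this operator's own expression call read_blob()?
  def children: Seq[PlanNode]
}
final case class ScanNode(table: String) extends PlanNode {
  def usesReadBlob: Boolean = false
  def children: Seq[PlanNode] = Nil
}
final case class FilterNode(usesReadBlob: Boolean, child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}
final case class ProjectNode(usesReadBlob: Boolean, child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}
final case class AggregateNode(usesReadBlob: Boolean, child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}
final case class SortNode(usesReadBlob: Boolean, child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}

object ReadBlobCheck {
  /** Walk the whole plan and fail fast on read_blob() outside Filter/Project. */
  def validate(plan: PlanNode): Unit = {
    plan match {
      case _: FilterNode | _: ProjectNode => () // the two supported operators
      case other if other.usesReadBlob =>
        // Catch-all: any other operator carrying read_blob() gets a clear error.
        throw new IllegalArgumentException(
          "read_blob() is only supported in SELECT and WHERE clauses")
      case _ => ()
    }
    plan.children.foreach(validate)
  }
}
```

   With this shape, an unsupported query fails with one clear message rather than whatever error surfaces further down the pipeline.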
   
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala:
##########
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.io.SeekableDataInputStream
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, 
StorageConfiguration, StoragePath}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Encoders, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, 
SpecificInternalRow}
+import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Batched byte range reader that optimizes I/O by combining consecutive reads 
for out-of-line data.
+ *
+ * This reader analyzes sequences of read requests within a partition and 
merges
+ * consecutive or nearby reads into single I/O operations. This significantly 
reduces
+ * the number of seeks and reads when processing sorted data.
+ *
+ * <h3>Schema Requirement:</h3>
+ * The blob column must match the schema defined in {@link HoodieSchema.Blob}:
+ * <pre>
+ * struct {
+ *   type: string                   // "inline" or "out_of_line"
+ *   data: binary (nullable)       // inline data (null for out_of_line)
+ *   reference: struct (nullable) { // file reference (null for inline)
+ *     external_path: string
+ *     offset: long
+ *     length: long
+ *     managed: boolean
+ *   }
+ * }
+ * </pre>
+ *
+ * <h3>Key Features:</h3>
+ * <ul>
+ *   <li>Batches consecutive reads from the same file</li>
+ *   <li>Configurable gap threshold for merging nearby reads</li>
+ *   <li>Lookahead buffer to identify batch opportunities</li>
+ *   <li>Preserves input row order in output</li>
+ * </ul>
+ *
+ * <h3>Usage Example:</h3>
+ * {{{
+ * import org.apache.hudi.udf.BatchedByteRangeReader
+ * import org.apache.spark.sql.functions._
+ * // Read a table with a blob column (e.g. image_data)
+ * val df = spark.read.format("hudi").load("/my_path").select("image_data", 
"record_id")
+ *
+ * // Read with batching (best when data is sorted by external_path, offset)
+ * val result = BatchedByteRangeReader.readBatched(df, structColName = 
"file_info")
+ *
+ * // Result has: image_data, record_id, data
+ * result.show()
+ * }}}
+ *
+ * <h3>Performance Tips:</h3>
+ * <ul>
+ *   <li>Sort input by (blob.reference.external_path, blob.reference.offset) 
for maximum batching effectiveness</li>
+ *   <li>Increase lookaheadSize for better batch detection (at cost of 
memory)</li>
+ *   <li>Tune maxGapBytes based on your data access patterns</li>
+ * </ul>
+ *
+ * @param storage        HoodieStorage instance for file I/O
+ * @param maxGapBytes    Maximum gap between ranges to consider for batching 
(default: 4KB)
+ * @param lookaheadSize  Number of rows to buffer for batch detection 
(default: 50)
+ */
+class BatchedBlobReader(
+    storage: HoodieStorage,
+    maxGapBytes: Int = 4096,
+    lookaheadSize: Int = 50) {
+
+  private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader])
+
+  /**
+   * Process a partition iterator, batching consecutive reads.
+   *
+   * This method consumes the input iterator and produces an output iterator
+   * with each row containing the original data plus a "data" column with the
+   * bytes read from the file.
+   *
+   * @param rows         Iterator of input rows with struct column
+   * @param structColIdx Index of the struct column in the row
+   * @param outputSchema Schema for output rows
+   * @param accessor     Type class for accessing row fields
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Iterator of output rows with data column added
+   */
+  def processPartition[R](
+      rows: Iterator[R],
+      structColIdx: Int,
+      outputSchema: StructType)
+      (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] 
= {
+
+    // Create buffered iterator for lookahead
+    val bufferedRows = rows.buffered
+
+    // Result buffer to maintain order
+    val resultIterator = new Iterator[R] {
+      private var currentBatch: Iterator[R] = Iterator.empty
+      private var rowIndex = 0L
+
+      override def hasNext: Boolean = {
+        if (currentBatch.hasNext) {
+          true
+        } else if (bufferedRows.hasNext) {
+          // Process next batch
+          currentBatch = processNextBatch()
+          currentBatch.hasNext
+        } else {
+          false
+        }
+      }
+
+      override def next(): R = {
+        if (!hasNext) {
+          throw new NoSuchElementException("No more rows")
+        }
+        currentBatch.next()
+      }
+
+      /**
+       * Collect and process the next batch of rows.
+       */
+      private def processNextBatch(): Iterator[R] = {
+        // Collect up to lookaheadSize rows with their original indices
+        val batch = collectBatch()
+
+        if (batch.isEmpty) {
+          Iterator.empty
+        } else {
+          // Partition the batch into three groups
+          val (inlineRows, outOfLineRows) = 
batch.partition(_.inlineBytes.isDefined)
+          val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 
0)
+
+          // Case 1: Inline — return bytes directly without I/O
+          val inlineResults = inlineRows.map { ri =>
+            RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, 
outputSchema), ri.index)
+          }
+
+          // Case 2: Whole-file reads
+          val wholeFileResults = wholeFileRows.map(readWholeFile(_, 
outputSchema))
+
+          // Case 3: Regular range reads — merge consecutive ranges and batch
+          val mergedRanges = identifyConsecutiveRanges(rangeRows)
+          val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, 
outputSchema))
+
+          // Sort by original index to preserve input order
+          (inlineResults ++ wholeFileResults ++ 
rangeResults).sortBy(_.index).map(_.row).iterator
+        }
+      }
+
+      /**
+       * Collect up to lookaheadSize rows from the input iterator.
+       */
+      private def collectBatch(): Seq[RowInfo[R]] = {
+        val batch = ArrayBuffer[RowInfo[R]]()
+        var collected = 0
+
+        while (bufferedRows.hasNext && collected < lookaheadSize) {
+          val row = bufferedRows.next()
+          val blobStruct = accessor.getStruct(row, structColIdx, 
HoodieSchema.Blob.getFieldCount)
+
+          // Dispatch based on storage_type (field 0)
+          val storageType = accessor.getString(blobStruct, 0)
+
+          if (storageType == HoodieSchema.Blob.INLINE) {
+            // Case 1: Inline — bytes are in field 1
+            val bytes = accessor.getBytes(blobStruct, 1)
+            batch += RowInfo[R](
+              originalRow = row,
+              filePath = "",
+              offset = -1,
+              length = -1,
+              index = rowIndex,
+              inlineBytes = Some(bytes)
+            )
+          } else {
+            // Case 2 or 3: Out-of-line — get reference struct (field 2)
+            val referenceStruct = accessor.getStruct(blobStruct, 2, 
HoodieSchema.Blob.getReferenceFieldCount)
+            val filePath = accessor.getString(referenceStruct, 0)
+            val offsetIsNull = accessor.isNullAt(referenceStruct, 1)
+            val lengthIsNull = accessor.isNullAt(referenceStruct, 2)
+
+            if (offsetIsNull || lengthIsNull) {
+              // Case 2: Whole-file read — no offset/length specified; 
sentinel length = -1
+              batch += RowInfo[R](
+                originalRow = row,
+                filePath = filePath,
+                offset = 0,
+                length = -1,
+                index = rowIndex
+              )
+            } else {
+              // Case 3: Regular range read
+              val offset = accessor.getLong(referenceStruct, 1)
+              val length = accessor.getLong(referenceStruct, 2)
+              batch += RowInfo[R](
+                originalRow = row,
+                filePath = filePath,
+                offset = offset,
+                length = length,
+                index = rowIndex
+              )
+            }
+          }
+
+          rowIndex += 1
+          collected += 1
+        }
+
+        batch.toSeq
+      }
+    }
+
+    resultIterator
+  }
+
+  /**
+   * Identify consecutive ranges that can be batched together.
+   *
+   * This method groups rows by file path, sorts by offset, and merges
+   * ranges that are consecutive or within maxGapBytes of each other.
+   *
+   * @param rows Sequence of row information
+   * @return Sequence of merged ranges
+   */
+  private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): 
Seq[MergedRange[R]] = {
+    // Group by file path
+    val byFile = rows.groupBy(_.filePath)
+
+    val allRanges = ArrayBuffer[MergedRange[R]]()
+
+    byFile.foreach { case (filePath, fileRows) =>
+      // Sort by offset
+      val sorted = fileRows.sortBy(_.offset)
+
+      // Merge consecutive ranges
+      val merged = mergeRanges(sorted, maxGapBytes)
+      allRanges ++= merged
+    }
+
+    allRanges.toSeq
+  }
+
+  /**
+   * Merge consecutive ranges within the gap threshold.
+   *
+   * @param rows   Sorted rows from the same file
+   * @param maxGap Maximum gap to consider for merging
+   * @return Sequence of merged ranges
+   */
+  private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): 
Seq[MergedRange[R]] = {
+
+    val result = ArrayBuffer[MergedRange[R]]()
+    var current: MergedRange[R] = null
+
+    rows.foreach { row =>
+      if (current == null) {
+        // Start first range
+        current = MergedRange[R](
+          filePath = row.filePath,
+          startOffset = row.offset,
+          endOffset = row.offset + row.length,
+          rows = Seq(row)
+        )
+      } else {
+        val gap = row.offset - current.endOffset
+
+        if (gap >= 0 && gap <= maxGap) {
+          // Merge into current range
+          current = current.merge(row)
+        } else {
+          // Save current range and start new one
+          result += current
+          current = MergedRange[R](
+            filePath = row.filePath,
+            startOffset = row.offset,
+            endOffset = row.offset + row.length,
+            rows = Seq(row)
+          )
+        }
+      }
+    }
+
+    // Add final range
+    if (current != null) {
+      result += current
+    }
+
+    result.toSeq
+  }

Review Comment:
   IIUC, the code here merges ranges that are within the gap threshold so that a single read can cover nearby ranges.
   
   What about ranges that overlap each other?
   
   For example, if row A covers [0, 100) and row B covers [80, 150):
   - gap = 80 - 100 = -20
   - The condition `gap >= 0 && gap <= maxGap` is false because the gap is negative
   - So it starts a new range at [80, 150) instead of merging into [0, 150)
   
   If we're intentionally ignoring overlaps, we should call this out in comments.
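
   If we do want to merge overlaps, a minimal sketch of the idea follows. `ByteRange`, `Merged`, and `OverlapMerge` are simplified, hypothetical stand-ins for `RowInfo` and `MergedRange` (no row payload), and the sketch assumes the later split step slices each row by its own offset, so shared bytes are harmless. The only changes in logic are dropping the `gap >= 0` check and taking the max of the two end offsets.

```scala
// Simplified stand-ins for the PR's RowInfo / MergedRange (hypothetical; row payload omitted).
final case class ByteRange(offset: Long, length: Long) {
  def end: Long = offset + length
}
final case class Merged(start: Long, end: Long, rows: Vector[ByteRange])

object OverlapMerge {
  /** Merge ranges that overlap or sit within maxGap bytes of each other. */
  def mergeRanges(rows: Seq[ByteRange], maxGap: Long): Vector[Merged] =
    rows.sortBy(_.offset).foldLeft(Vector.empty[Merged]) { (acc, row) =>
      acc.lastOption match {
        // A negative gap means overlap, so only a gap larger than maxGap splits.
        case Some(cur) if row.offset - cur.end <= maxGap =>
          // max() keeps the end correct when row lies entirely inside cur.
          acc.init :+ cur.copy(end = math.max(cur.end, row.end), rows = cur.rows :+ row)
        case _ =>
          acc :+ Merged(row.offset, row.end, Vector(row))
      }
    }
}
```

   For the example above, `OverlapMerge.mergeRanges(Seq(ByteRange(0, 100), ByteRange(80, 70)), 0)` yields a single range covering [0, 150).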



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala:
##########
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.functions._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+/**
+ * Tests for the read_blob() SQL function.
+ *
+ * This test suite verifies:
+ * <ul>
+ *   <li>Basic SQL integration with read_blob()</li>
+ *   <li>Integration with WHERE clauses, JOINs</li>
+ *   <li>Configuration parameter handling</li>
+ *   <li>Error handling for invalid inputs</li>
+ * </ul>
+ */
+class TestReadBlobSQL extends HoodieClientTestBase {
+
+  @Test
+  def testBasicReadBlobSQL(): Unit = {
+    val filePath = createTestFile(tempDir, "basic.bin", 10000)
+
+    // Create table with blob column
+    val df = sparkSession.createDataFrame(Seq(
+      (1, "record1", filePath, 0L, 100L),
+      (2, "record2", filePath, 100L, 100L),
+      (3, "record3", filePath, 200L, 100L)

Review Comment:
   What happens with null blob columns?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala:
##########
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.functions._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+/**
+ * Tests for the read_blob() SQL function.
+ *
+ * This test suite verifies:
+ * <ul>
+ *   <li>Basic SQL integration with read_blob()</li>
+ *   <li>Integration with WHERE clauses, JOINs</li>
+ *   <li>Configuration parameter handling</li>
+ *   <li>Error handling for invalid inputs</li>
+ * </ul>
+ */
+class TestReadBlobSQL extends HoodieClientTestBase {
+
+  @Test
+  def testBasicReadBlobSQL(): Unit = {
+    val filePath = createTestFile(tempDir, "basic.bin", 10000)
+
+    // Create table with blob column
+    val df = sparkSession.createDataFrame(Seq(
+      (1, "record1", filePath, 0L, 100L),
+      (2, "record2", filePath, 100L, 100L),
+      (3, "record3", filePath, 200L, 100L)
+    )).toDF("id", "name", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), 
col("length")))
+      .select("id", "name", "file_info")
+
+    df.createOrReplaceTempView("test_table")
+
+    // Use SQL with read_blob
+    val result = sparkSession.sql("""
+      SELECT id, name, read_blob(file_info) as data
+      FROM test_table
+      WHERE id <= 2
+    """)
+
+    val rows = result.collect()
+    assertEquals(2, rows.length)
+
+    // Verify data is binary
+    val data1 = rows(0).getAs[Array[Byte]]("data")
+    assertEquals(100, data1.length)
+
+    // Verify content matches expected pattern
+    assertBytesContent(data1)
+
+    val data2 = rows(1).getAs[Array[Byte]]("data")
+    assertEquals(100, data2.length)
+    assertBytesContent(data2, expectedOffset = 100)
+  }
+
+  @Test
+  def testReadBlobWithJoin(): Unit = {
+    val filePath = createTestFile(tempDir, "join.bin", 10000)
+
+    // Create blob table
+    val blobDF = sparkSession.createDataFrame(Seq(
+      (1, filePath, 0L, 100L),
+      (2, filePath, 100L, 100L)

Review Comment:
   What happens when a `filePath` does not exist?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestBatchedBlobReader.scala:
##########
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hudi.blob.BatchedBlobReader
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+/**
+ * Tests for BatchedByteRangeReader.
+ *
+ * These tests verify the batching behavior and effectiveness of the
+ * BatchedByteRangeReader compared to non-batched approaches.
+ */
+class TestBatchedBlobReader extends HoodieClientTestBase {
+
+  @Test
+  def testBasicBatchedRead(): Unit = {
+    val filePath = createTestFile(tempDir, "basic.bin", 10000)
+
+    // Create input with struct column
+    val inputDF = sparkSession.createDataFrame(Seq(
+      (filePath, 0L, 100L),
+      (filePath, 100L, 100L),
+      (filePath, 200L, 100L)
+    )).toDF("external_path", "offset", "length")
+      .withColumn("data", blobStructCol("data", col("external_path"), 
col("offset"), col("length")))
+      .select("data")
+
+    // Read with batching
+    val resultDF = BatchedBlobReader.readBatched(inputDF, storageConf)
+
+    // Verify schema
+    assertTrue(resultDF.columns.contains("data"))
+    assertEquals(1, resultDF.columns.length) // data
+
+    // Verify results
+    val results = resultDF.collect()
+    assertEquals(3, results.length)
+
+    // Check data content
+    results.zipWithIndex.foreach { case (row, i) =>
+      val data = row.getAs[Array[Byte]]("data")
+      assertEquals(100, data.length)
+
+      // Verify content matches expected pattern
+      assertBytesContent(data, expectedOffset = i * 100)
+    }
+  }
+
+  @Test
+  def testNoBatchingDifferentFiles(): Unit = {
+    // Create different files
+    val file1 = createTestFile(tempDir, "file1.bin", 5000)
+    val file2 = createTestFile(tempDir, "file2.bin", 5000)
+    val file3 = createTestFile(tempDir, "file3.bin", 5000)
+
+    // Reads from different files (no batching possible)
+    val inputDF = sparkSession.createDataFrame(Seq(
+      (file1, 0L, 100L),
+      (file2, 0L, 100L),
+      (file3, 0L, 100L)
+    )).toDF("external_path", "offset", "length")
+      .withColumn("data", blobStructCol("data", col("external_path"), 
col("offset"), col("length")))
+      .select("data")
+
+    val resultDF = BatchedBlobReader.readBatched(inputDF, storageConf)
+
+    val results = resultDF.collect()
+    assertEquals(3, results.length)
+
+    // Verify all reads succeeded
+    results.foreach { row =>
+      val data = row.getAs[Array[Byte]]("data")
+      assertEquals(100, data.length)
+    }
+  }
+
+  @Test
+  def testGapThresholdSmallGaps(): Unit = {
+
+    val filePath = createTestFile(tempDir, "small-gaps.bin", 10000)
+
+    // Reads with small gaps (should batch with default threshold of 4KB)
+    val inputDF = sparkSession.createDataFrame(Seq(
+      (filePath, 0L, 100L),
+      (filePath, 120L, 100L),    // 20 byte gap
+      (filePath, 240L, 100L),    // 20 byte gap
+      (filePath, 360L, 100L)     // 20 byte gap
+    )).toDF("external_path", "offset", "length")
+      .withColumn("data", blobStructCol("data", col("external_path"), 
col("offset"), col("length")))
+      .select("data")
+
+    // Use default maxGapBytes=4096 which should batch these
+    val resultDF = BatchedBlobReader.readBatched(inputDF, storageConf, 
maxGapBytes = 4096)
+
+    val results = resultDF.collect()
+    assertEquals(4, results.length)
+
+    results.foreach { row =>
+      val data = row.getAs[Array[Byte]]("data")
+      assertEquals(100, data.length)
+    }
+  }
+
+  @Test

Review Comment:
   Would it be possible to add a test for overlapping ranges?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Expression, ExprId, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Transforms queries with `read_blob()` to use lazy batched I/O.
+ *
+ * Replaces [[ReadBlobExpression]] markers with [[BatchedBlobRead]] nodes
+ * that read blob data during physical execution.
+ *
+ * Example: `SELECT id, read_blob(image_data) FROM table`
+ *
+ * @param spark SparkSession for accessing configuration
+ */
+case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp 
{
+    case Filter(condition, child)
+      if containsReadBlobInExpression(condition) =>
+
+      val blobColumns = extractBlobColumnsFromExpression(condition)
+      val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child)
+      val newCondition = replaceReadBlobExpression(condition, blobToDataAttr)
+      Project(child.output, Filter(newCondition, wrappedPlan))
+
+    case Project(projectList, child)
+      if containsReadBlobExpression(projectList)
+        && !child.isInstanceOf[BatchedBlobRead] =>
+
+      val blobColumns = extractAllBlobColumns(projectList)
+      val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child)
+      val newProjectList = transformNamedExpressions(projectList, 
blobToDataAttr)
+      Project(newProjectList, wrappedPlan)
+  }
+
+  private def wrapWithBlobReads(
+      blobColumns: Seq[AttributeReference],
+      child: LogicalPlan): (LogicalPlan, Map[ExprId, Attribute]) = {
+    if (blobColumns.isEmpty) {
+      throw new IllegalStateException("read_blob() found but no valid blob 
column reference extracted.")
+    }
+    blobColumns.foldLeft((child: LogicalPlan, Map.empty[ExprId, Attribute])) {
+      case ((currentPlan, mapping), blobAttr) =>
+        val blobRead = BatchedBlobRead(currentPlan, blobAttr)
+        (blobRead, mapping + (blobAttr.exprId -> blobRead.dataAttr))
+    }
+  }
+
+  private def extractBlobColumnsFromExpression(expr: Expression): 
Seq[AttributeReference] = {
+    val seen = mutable.LinkedHashSet.empty[ExprId]
+    val result = ArrayBuffer.empty[AttributeReference]
+    collectBlobColumns(expr, seen, result)
+    result.toSeq
+  }
+
+  /**
+   * Check if any expression in the project list contains a ReadBlobExpression.
+   */
+  private def containsReadBlobExpression(projectList: Seq[Expression]): 
Boolean = {
+    projectList.exists(expr => containsReadBlobInExpression(expr))
+  }
+
+  private def containsReadBlobInExpression(expr: Expression): Boolean = {
+    expr match {
+      case _: ReadBlobExpression => true
+      case other => other.children.exists(containsReadBlobInExpression)
+    }
+  }
+
+  private def extractAllBlobColumns(expressions: Seq[Expression]): 
Seq[AttributeReference] = {
+    val seen = mutable.LinkedHashSet.empty[ExprId]
+    val result = ArrayBuffer.empty[AttributeReference]
+    expressions.foreach(collectBlobColumns(_, seen, result))
+    result.toSeq
+  }
+
+  private def collectBlobColumns(
+      expr: Expression,
+      seen: mutable.Set[ExprId],
+      result: ArrayBuffer[AttributeReference]): Unit = expr match {
+    case ReadBlobExpression(attr: AttributeReference) =>
+      if (seen.add(attr.exprId)) result += attr
+    case other =>
+      other.children.foreach(collectBlobColumns(_, seen, result))
+  }
+
+  private def transformNamedExpressions(
+      expressions: Seq[NamedExpression],
+      blobToDataAttr: Map[ExprId, Attribute]): Seq[NamedExpression] = {
+    expressions.map {
+      case alias @ Alias(childExpr, name) =>
+        val rewritten = replaceReadBlobExpression(childExpr, blobToDataAttr)
+        Alias(rewritten, name)(alias.exprId, alias.qualifier, 
alias.explicitMetadata)
+      case attr: AttributeReference => attr
+      case other =>
+        replaceReadBlobExpression(other, 
blobToDataAttr).asInstanceOf[NamedExpression]
+    }
+  }
+
+  private def replaceReadBlobExpression(
+      expr: Expression,
+      blobToDataAttr: Map[ExprId, Attribute]): Expression = expr match {
+    case ReadBlobExpression(attr: AttributeReference) =>
+      blobToDataAttr(attr.exprId)

Review Comment:
   Let's add a test for applying `read_blob()` on a non-blob column.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala:
##########
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.io.SeekableDataInputStream
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StorageConfiguration, StoragePath}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Encoders, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, SpecificInternalRow}
+import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Batched byte range reader that optimizes I/O by combining consecutive reads for out-of-line data.
+ *
+ * This reader analyzes sequences of read requests within a partition and merges
+ * consecutive or nearby reads into single I/O operations. This significantly reduces
+ * the number of seeks and reads when processing sorted data.
+ *
+ * <h3>Schema Requirement:</h3>
+ * The blob column must match the schema defined in {@link HoodieSchema.Blob}:
+ * <pre>
+ * struct {
+ *   type: string                   // "inline" or "out_of_line"
+ *   data: binary (nullable)       // inline data (null for out_of_line)
+ *   reference: struct (nullable) { // file reference (null for inline)
+ *     external_path: string
+ *     offset: long
+ *     length: long
+ *     managed: boolean
+ *   }
+ * }
+ * </pre>
+ *
+ * <h3>Key Features:</h3>
+ * <ul>
+ *   <li>Batches consecutive reads from the same file</li>
+ *   <li>Configurable gap threshold for merging nearby reads</li>
+ *   <li>Lookahead buffer to identify batch opportunities</li>
+ *   <li>Preserves input row order in output</li>
+ * </ul>
+ *
+ * <h3>Usage Example:</h3>
+ * {{{
+ * import org.apache.hudi.udf.BatchedByteRangeReader
+ * import org.apache.spark.sql.functions._
+ * // Read a table with a blob column (e.g. image_data)
+ * val df = spark.read.format("hudi").load("/my_path").select("image_data", "record_id")
+ *
+ * // Read with batching (best when data is sorted by external_path, offset)
+ * val result = BatchedByteRangeReader.readBatched(df, structColName = "file_info")
+ *
+ * // Result has: image_data, record_id, data
+ * result.show()
+ * }}}
+ *
+ * <h3>Performance Tips:</h3>
+ * <ul>
+ *   <li>Sort input by (blob.reference.external_path, blob.reference.offset) for maximum batching effectiveness</li>
+ *   <li>Increase lookaheadSize for better batch detection (at cost of memory)</li>
+ *   <li>Tune maxGapBytes based on your data access patterns</li>
+ * </ul>
+ *
+ * @param storage        HoodieStorage instance for file I/O
+ * @param maxGapBytes    Maximum gap between ranges to consider for batching (default: 4KB)
+ * @param lookaheadSize  Number of rows to buffer for batch detection (default: 50)
+ */
+class BatchedBlobReader(
+    storage: HoodieStorage,
+    maxGapBytes: Int = 4096,
+    lookaheadSize: Int = 50) {
+
+  private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader])
+
+  /**
+   * Process a partition iterator, batching consecutive reads.
+   *
+   * This method consumes the input iterator and produces an output iterator
+   * with each row containing the original data plus a "data" column with the
+   * bytes read from the file.
+   *
+   * @param rows         Iterator of input rows with struct column
+   * @param structColIdx Index of the struct column in the row
+   * @param outputSchema Schema for output rows
+   * @param accessor     Type class for accessing row fields
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Iterator of output rows with data column added
+   */
+  def processPartition[R](
+      rows: Iterator[R],
+      structColIdx: Int,
+      outputSchema: StructType)
+      (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] = {
+
+    // Create buffered iterator for lookahead
+    val bufferedRows = rows.buffered
+
+    // Result buffer to maintain order
+    val resultIterator = new Iterator[R] {
+      private var currentBatch: Iterator[R] = Iterator.empty
+      private var rowIndex = 0L
+
+      override def hasNext: Boolean = {
+        if (currentBatch.hasNext) {
+          true
+        } else if (bufferedRows.hasNext) {
+          // Process next batch
+          currentBatch = processNextBatch()
+          currentBatch.hasNext
+        } else {
+          false
+        }
+      }
+
+      override def next(): R = {
+        if (!hasNext) {
+          throw new NoSuchElementException("No more rows")
+        }
+        currentBatch.next()
+      }
+
+      /**
+       * Collect and process the next batch of rows.
+       */
+      private def processNextBatch(): Iterator[R] = {
+        // Collect up to lookaheadSize rows with their original indices
+        val batch = collectBatch()
+
+        if (batch.isEmpty) {
+          Iterator.empty
+        } else {
+          // Partition the batch into three groups
+          val (inlineRows, outOfLineRows) = batch.partition(_.inlineBytes.isDefined)
+          val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0)
+
+          // Case 1: Inline — return bytes directly without I/O
+          val inlineResults = inlineRows.map { ri =>
+            RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, outputSchema), ri.index)
+          }
+
+          // Case 2: Whole-file reads
+          val wholeFileResults = wholeFileRows.map(readWholeFile(_, outputSchema))
+
+          // Case 3: Regular range reads — merge consecutive ranges and batch
+          val mergedRanges = identifyConsecutiveRanges(rangeRows)
+          val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, outputSchema))
+
+          // Sort by original index to preserve input order
+          (inlineResults ++ wholeFileResults ++ rangeResults).sortBy(_.index).map(_.row).iterator
+        }
+      }
+
+      /**
+       * Collect up to lookaheadSize rows from the input iterator.
+       */
+      private def collectBatch(): Seq[RowInfo[R]] = {
+        val batch = ArrayBuffer[RowInfo[R]]()
+        var collected = 0
+
+        while (bufferedRows.hasNext && collected < lookaheadSize) {
+          val row = bufferedRows.next()
+          val blobStruct = accessor.getStruct(row, structColIdx, HoodieSchema.Blob.getFieldCount)
+
+          // Dispatch based on storage_type (field 0)
+          val storageType = accessor.getString(blobStruct, 0)
+
+          if (storageType == HoodieSchema.Blob.INLINE) {
+            // Case 1: Inline — bytes are in field 1
+            val bytes = accessor.getBytes(blobStruct, 1)
+            batch += RowInfo[R](
+              originalRow = row,
+              filePath = "",
+              offset = -1,
+              length = -1,
+              index = rowIndex,
+              inlineBytes = Some(bytes)
+            )
+          } else {
+            // Case 2 or 3: Out-of-line — get reference struct (field 2)
+            val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount)
+            val filePath = accessor.getString(referenceStruct, 0)
+            val offsetIsNull = accessor.isNullAt(referenceStruct, 1)
+            val lengthIsNull = accessor.isNullAt(referenceStruct, 2)
+
+            if (offsetIsNull || lengthIsNull) {
+              // Case 2: Whole-file read — no offset/length specified; sentinel length = -1
+              batch += RowInfo[R](
+                originalRow = row,
+                filePath = filePath,
+                offset = 0,
+                length = -1,
+                index = rowIndex
+              )
+            } else {
+              // Case 3: Regular range read
+              val offset = accessor.getLong(referenceStruct, 1)
+              val length = accessor.getLong(referenceStruct, 2)
+              batch += RowInfo[R](
+                originalRow = row,
+                filePath = filePath,
+                offset = offset,
+                length = length,
+                index = rowIndex
+              )
+            }
+          }
+
+          rowIndex += 1
+          collected += 1
+        }
+
+        batch.toSeq
+      }
+    }
+
+    resultIterator
+  }
+
+  /**
+   * Identify consecutive ranges that can be batched together.
+   *
+   * This method groups rows by file path, sorts by offset, and merges
+   * ranges that are consecutive or within maxGapBytes of each other.
+   *
+   * @param rows Sequence of row information
+   * @return Sequence of merged ranges
+   */
+  private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = {
+    // Group by file path
+    val byFile = rows.groupBy(_.filePath)
+
+    val allRanges = ArrayBuffer[MergedRange[R]]()
+
+    byFile.foreach { case (filePath, fileRows) =>
+      // Sort by offset
+      val sorted = fileRows.sortBy(_.offset)
+
+      // Merge consecutive ranges
+      val merged = mergeRanges(sorted, maxGapBytes)
+      allRanges ++= merged
+    }
+
+    allRanges.toSeq
+  }
+
+  /**
+   * Merge consecutive ranges within the gap threshold.
+   *
+   * @param rows   Sorted rows from the same file
+   * @param maxGap Maximum gap to consider for merging
+   * @return Sequence of merged ranges
+   */
+  private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = {
+
+    val result = ArrayBuffer[MergedRange[R]]()
+    var current: MergedRange[R] = null
+
+    rows.foreach { row =>
+      if (current == null) {
+        // Start first range
+        current = MergedRange[R](
+          filePath = row.filePath,
+          startOffset = row.offset,
+          endOffset = row.offset + row.length,
+          rows = Seq(row)
+        )
+      } else {
+        val gap = row.offset - current.endOffset
+
+        if (gap >= 0 && gap <= maxGap) {
+          // Merge into current range
+          current = current.merge(row)
+        } else {
+          // Save current range and start new one
+          result += current
+          current = MergedRange[R](
+            filePath = row.filePath,
+            startOffset = row.offset,
+            endOffset = row.offset + row.length,
+            rows = Seq(row)
+          )
+        }
+      }
+    }
+
+    // Add final range
+    if (current != null) {
+      result += current
+    }
+
+    result.toSeq
+  }
+
+  /**
+   * Read an entire file and return it as a single row result.
+   *
+   * Used for whole-file out-of-line blobs where no offset or length is specified.
+   *
+   * @param rowInfo      Row information with the file path
+   * @param outputSchema Schema for output rows
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Sequence containing a single row result
+   */
+  private def readWholeFile[R](
+      rowInfo: RowInfo[R],
+      outputSchema: StructType)
+      (implicit builder: RowBuilder[R]): RowResult[R] = {
+
+    var inputStream: SeekableDataInputStream = null
+    try {
+      val path = new StoragePath(rowInfo.filePath)
+      val fileLength = storage.getPathInfo(path).getLength.toInt

Review Comment:
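   Is it possible to throw an exception here, rather than letting `.toInt` silently truncate lengths above `Integer.MAX_VALUE`? For example: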
   ```
   if (totalLength > Integer.MAX_VALUE) {
       throw new UnsupportedOperationException(s"Blob range of $totalLength bytes exceeds 2GB JVM array limit")
   }
   ```
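
   A minimal sketch of how such a guard could be factored out, assuming the length is available as a `Long` before narrowing. `BlobLengthGuard` and `checkedArrayLength` are hypothetical names used only for illustration and are not part of this PR:

   ```scala
   // Hypothetical helper (not in the PR): narrow a Long byte count to Int,
   // failing loudly instead of silently truncating.
   object BlobLengthGuard {
     def checkedArrayLength(length: Long, description: String): Int = {
       if (length > Int.MaxValue) {
         // Same condition as the check suggested above: the value cannot be
         // used as the size of a single JVM byte array.
         throw new UnsupportedOperationException(
           s"$description of $length bytes exceeds 2GB JVM array limit")
       }
       length.toInt
     }
   }
   ```

   The call site in `readWholeFile` could then pass `storage.getPathInfo(path).getLength` through this helper instead of calling `.toInt` directly, assuming `getLength` returns a `Long`. The same helper might also cover merged range reads.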


