yihua commented on code in PR #18098:
URL: https://github.com/apache/hudi/pull/18098#discussion_r3042097433


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReaderStrategy.scala:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.HoodieSparkConfUtils
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.conf

Review Comment:
   _⚠️ Potential issue_ | _🟡 Minor_
   
   **Remove unused import `SimpleAnalyzer.conf`.**
   
   This import is not used anywhere in the file and should be removed.
   
   
   <details>
   <summary>🧹 Proposed fix</summary>
   
   ```diff
   -import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.conf
   ```
   </details>
   
   <!-- suggestion_start -->
   
   <details>
   <summary>📝 Committable suggestion</summary>
   
   > ‼️ **IMPORTANT**
   > Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
   
   ```suggestion
   
   ```
   
   </details>
   
   <!-- suggestion_end -->
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   `@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReaderStrategy.scala`
   at line 26, remove the unused import SimpleAnalyzer.conf from
   BatchedBlobReaderStrategy.scala; locate the import statement "import
   org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.conf" in the
   BatchedBlobReaderStrategy file and delete that import line so the file no longer
   references the unused SimpleAnalyzer.conf symbol.
   ```
   
   </details>
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096047)) (source:comment#3042096047)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, ExprId, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Transforms queries with `read_blob()` to use lazy batched I/O.
+ *
+ * Replaces [[ReadBlobExpression]] markers with [[BatchedBlobRead]] nodes
+ * that read blob data during physical execution.
+ *
+ * Example: `SELECT id, read_blob(image_data) FROM table`
+ *
+ * @param spark SparkSession for accessing configuration
+ */
+case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+    case Filter(condition, child)
+      if containsReadBlobInExpression(condition)
+        && !child.isInstanceOf[BatchedBlobRead] =>
+
+      val blobColumns = extractBlobColumnsFromExpression(condition)
+      val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child)
+      val newCondition = replaceReadBlobExpression(condition, blobToDataAttr)
+      Project(child.output, Filter(newCondition, wrappedPlan))
+
+    case Project(projectList, child)
+      if containsReadBlobExpression(projectList)
+        && !child.isInstanceOf[BatchedBlobRead] =>
+
+      val blobColumns = extractAllBlobColumns(projectList)
+      val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child)
+      val newProjectList = transformNamedExpressions(projectList, blobToDataAttr)
+      Project(newProjectList, wrappedPlan)
+  }

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   **Avoid double blob reads when `read_blob()` appears in both WHERE and SELECT.**
   
   Because the `Filter` and `Project` cases rewrite independently, the `Filter` case (line 51) drops `dataAttr` via `Project(child.output, ...)`, and the parent `Project` case then wraps the plan again. The same blob column can therefore be fetched twice in one query.
   
   
   <details>
   <summary>⚡ Suggested rewrite shape to share one wrapped plan</summary>
   
   ```diff
    override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
   +  case Project(projectList, Filter(condition, child))
   +      if containsReadBlobExpression(projectList)
   +        && containsReadBlobInExpression(condition)
   +        && !child.isInstanceOf[BatchedBlobRead] =>
   +
   +    val projectBlobCols = extractAllBlobColumns(projectList)
   +    val filterBlobCols = extractBlobColumnsFromExpression(condition)
   +    val blobColumns = (projectBlobCols ++ filterBlobCols)
   +      .foldLeft((mutable.LinkedHashSet.empty[ExprId], ArrayBuffer.empty[AttributeReference])) {
   +        case ((seen, acc), a) if seen.add(a.exprId) => (seen, acc += a)
   +        case ((seen, acc), _) => (seen, acc)
   +      }._2.toSeq
   +
   +    val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child)
   +    val newCondition = replaceReadBlobExpression(condition, blobToDataAttr)
   +    val newProjectList = transformNamedExpressions(projectList, blobToDataAttr)
   +    Project(newProjectList, Filter(newCondition, wrappedPlan))
   +
      case Filter(condition, child)
        if containsReadBlobInExpression(condition)
          && !child.isInstanceOf[BatchedBlobRead] =>
   ```
   </details>
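   
   The de-duplication step in the rewrite above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical `Attr` case class that stands in for Spark's `AttributeReference` (only a name and a numeric `exprId`). It keeps the first occurrence of each `exprId` in encounter order, which is the property the combined case needs so that each blob column is wrapped once. `DedupeSketch` and `dedupeByExprId` are illustrative names, not part of the PR.
   
   ```scala
   import scala.collection.mutable

   object DedupeSketch {
     // Hypothetical stand-in for AttributeReference; not the Spark class.
     final case class Attr(name: String, exprId: Long)

     // Keep the first attribute seen for each exprId, preserving input order.
     // mutable.Set#add returns true only when the element was newly inserted.
     def dedupeByExprId(attrs: Seq[Attr]): Seq[Attr] = {
       val seen = mutable.HashSet.empty[Long]
       attrs.filter(a => seen.add(a.exprId))
     }

     def main(args: Array[String]): Unit = {
       // The same column referenced from both the projection and the filter.
       val projectCols = Seq(Attr("image_data", 1L), Attr("thumbnail", 2L))
       val filterCols = Seq(Attr("image_data", 1L))
       println(dedupeByExprId(projectCols ++ filterCols).map(_.name).mkString(", "))
     }
   }
   ```
   
   A `filter` with a `seen.add` predicate expresses the same idea as the `foldLeft` in the suggestion with less bookkeeping; whether it reads better inside `ReadBlobRule` is a style choice.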
   
   <!-- suggestion_start -->
   
   <details>
   <summary>📝 Committable suggestion</summary>
   
   > ‼️ **IMPORTANT**
   > Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
   
   ```suggestion
     override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
       case Project(projectList, Filter(condition, child))
         if containsReadBlobExpression(projectList)
           && containsReadBlobInExpression(condition)
           && !child.isInstanceOf[BatchedBlobRead] =>
   
         val projectBlobCols = extractAllBlobColumns(projectList)
         val filterBlobCols = extractBlobColumnsFromExpression(condition)
         val blobColumns = (projectBlobCols ++ filterBlobCols)
           .foldLeft((mutable.LinkedHashSet.empty[ExprId], ArrayBuffer.empty[AttributeReference])) {
             case ((seen, acc), a) if seen.add(a.exprId) => (seen, acc += a)
             case ((seen, acc), _) => (seen, acc)
           }._2.toSeq
   
         val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child)
         val newCondition = replaceReadBlobExpression(condition, blobToDataAttr)
         val newProjectList = transformNamedExpressions(projectList, blobToDataAttr)
         Project(newProjectList, Filter(newCondition, wrappedPlan))
   
       case Filter(condition, child)
         if containsReadBlobInExpression(condition)
           && !child.isInstanceOf[BatchedBlobRead] =>
   
         val blobColumns = extractBlobColumnsFromExpression(condition)
         val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child)
         val newCondition = replaceReadBlobExpression(condition, blobToDataAttr)
         Project(child.output, Filter(newCondition, wrappedPlan))
   
       case Project(projectList, child)
         if containsReadBlobExpression(projectList)
           && !child.isInstanceOf[BatchedBlobRead] =>
   
         val blobColumns = extractAllBlobColumns(projectList)
         val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child)
         val newProjectList = transformNamedExpressions(projectList, blobToDataAttr)
         Project(newProjectList, wrappedPlan)
     }
   ```
   
   </details>
   
   <!-- suggestion_end -->
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   `@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala`
   around lines 43 - 61, The current two independent rewrite cases can cause
   duplicate blob reads when read_blob() appears in both WHERE and SELECT; add a
   combined case to match Project(projectList, Filter(condition, child)) (or Filter
   inside Project) before the separate cases, compute the union of blob columns
   using extractBlobColumnsFromExpression(condition) and
   extractAllBlobColumns(projectList), call wrapWithBlobReads once to produce
   (wrappedPlan, blobToDataAttr), then build a single nested plan:
   Project(transformNamedExpressions(projectList, blobToDataAttr),
   Filter(replaceReadBlobExpression(condition, blobToDataAttr), wrappedPlan)); keep
   the existing individual cases as fallbacks.
   ```
   
   </details>
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096051)) (source:comment#3042096051)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
+
+/**
+ * Registry of scalar functions for Hudi SQL integration.
+ *
+ * These functions are registered as built-in functions that can be used
+ * in SQL queries. They integrate with Spark's function registry and are
+ * available in both SQL and DataFrame API contexts.
+ *
+ * <h3>Function Registration:</h3>
+ * Functions are registered via [[SparkAdapter.injectScalarFunctions]] which is
+ * called during [[HoodieSparkSessionExtension]] initialization.
+ *
+ * <h3>Adding New Functions:</h3>
+ * To add a new scalar function:
+ * <ol>
+ *   <li>Create a marker expression class (extends Unevaluable)</li>
+ *   <li>Add function definition tuple to [[funcs]] below</li>
+ *   <li>Create a logical plan rule to handle the expression</li>
+ *   <li>Register the rule in [[HoodieAnalysis.customPostHocResolutionRules]]</li>
+ * </ol>
+ */
+object ScalarFunctions {
+
+  private val READ_BLOB_FUNC_NAME = "read_blob"
+
+  /**
+   * Function definitions as tuples of:
+   * <ul>
+   *   <li>FunctionIdentifier - function name</li>
+   *   <li>ExpressionInfo - metadata for DESCRIBE FUNCTION</li>
+   *   <li>Builder function - (Seq[Expression] => Expression)</li>
+   * </ul>
+   */
+  val funcs: Seq[(FunctionIdentifier, ExpressionInfo, Seq[Expression] => Expression)] = Seq(
+    (
+      FunctionIdentifier(READ_BLOB_FUNC_NAME),
+      new ExpressionInfo(
+        classOf[ReadBlobExpression].getCanonicalName,
+        READ_BLOB_FUNC_NAME,
+        """
+          |Usage: read_blob(blob_column) - Reads blob data from storage
+          |
+          |Reads byte ranges from files referenced in a blob column. The column must have
+          |metadata hudi_blob=true.

Review Comment:
   _⚠️ Potential issue_ | _🟡 Minor_
   
   **Function docs overstate metadata requirement.**
   
   Lines 66-67 state that `hudi_blob=true` is required, but the current rewrite/validation flow checks BlobType structural compatibility instead. This can mislead users about what actually fails.
   
   
   <details>
   <summary>📝 Proposed doc-only fix</summary>
   
   ```diff
   -          |Reads byte ranges from files referenced in a blob column. The column must have
   -          |metadata hudi_blob=true.
   +          |Reads byte ranges from files referenced in a blob-compatible column.
   +          |The column must be structurally compatible with HoodieSchema.Blob.
   ```
   </details>
   
   <!-- suggestion_start -->
   
   <details>
   <summary>📝 Committable suggestion</summary>
   
   > ‼️ **IMPORTANT**
   > Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
   
   ```suggestion
             |Usage: read_blob(blob_column) - Reads blob data from storage
             |
             |Reads byte ranges from files referenced in a blob-compatible column.
             |The column must be structurally compatible with HoodieSchema.Blob.
   ```
   
   </details>
   
   <!-- suggestion_end -->
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   `@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala`
   around lines 64 - 67, The README-style docstring for read_blob in
   ScalarFunctions.scala incorrectly asserts that the column must have metadata
   hudi_blob=true; update that documentation to state that read_blob requires a
   column whose type is structurally compatible with BlobType (the
   validation/rewrite checks BlobType compatibility rather than a specific metadata
   flag). Edit the usage/comment block near the read_blob function in
   ScalarFunctions.scala to remove or replace the `hudi_blob=true` claim with a
   note that the column must be a BlobType-compatible column and mention that
   structural type validation is performed during rewrite/validation.
   ```
   
   </details>
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096059)) (source:comment#3042096059)



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestBatchedBlobReader.scala:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hudi.blob.BatchedBlobReader
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import java.util.Collections
+
+/**
+ * Tests for BatchedByteRangeReader.
+ *
+ * These tests verify the batching behavior and effectiveness of the
+ * BatchedByteRangeReader compared to non-batched approaches.
+ */
+class TestBatchedBlobReader extends HoodieClientTestBase {
+
+  @Test
+  def testBasicBatchedRead(): Unit = {
+    val filePath = createTestFile(tempDir, "basic.bin", 10000)
+
+    // Create input with struct column
+    val inputDF = sparkSession.createDataFrame(Seq(
+      (filePath, 0L, 100L),
+      (filePath, 100L, 100L),
+      (filePath, 200L, 100L)
+    )).toDF("external_path", "offset", "length")
+      .withColumn("data", blobStructCol("data", col("external_path"), col("offset"), col("length")))
+      .select("data")
+
+    // Read with batching
+    val resultDF = BatchedBlobReader.readBatched(inputDF, storageConf)
+
+    // Verify schema
+    assertTrue(resultDF.columns.contains("data"))
+    assertEquals(1, resultDF.columns.length) // data
+
+    // Verify results
+    val results = resultDF.collect()
+    assertEquals(3, results.length)
+
+    // Check data content
+    results.zipWithIndex.foreach { case (row, i) =>
+      val data = row.getAs[Array[Byte]]("data")
+      assertEquals(100, data.length)
+
+      // Verify content matches expected pattern
+      assertBytesContent(data, expectedOffset = i * 100)
+    }

Review Comment:
   _⚠️ Potential issue_ | _🟡 Minor_
   
   **Make this assertion order-deterministic to avoid flaky tests.**
   
   `collect()` + `zipWithIndex` assumes stable row ordering, which is not 
guaranteed. This can intermittently fail the expected-offset checks.
   
   
   <details>
   <summary>✅ Deterministic test adjustment</summary>
   
   ```diff
        val inputDF = sparkSession.createDataFrame(Seq(
          (filePath, 0L, 100L),
          (filePath, 100L, 100L),
          (filePath, 200L, 100L)
        )).toDF("external_path", "offset", "length")
          .withColumn("data", blobStructCol("data", col("external_path"), col("offset"), col("length")))
   -      .select("data")
   +      .select("offset", "data")
   @@
   -    val results = resultDF.collect()
   +    val results = resultDF.orderBy("offset").collect()
   @@
          assertBytesContent(data, expectedOffset = i * 100)
   ```
   </details>
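   
   One way to avoid the ordering dependence entirely is to carry the key with each row and derive the expectation from that key instead of from the position in the collected array. The sketch below uses plain Scala collections rather than Spark, with hypothetical `(offset, bytes)` pairs standing in for collected rows and a hypothetical `patternByte` helper standing in for the test file's content pattern (the pattern actually written by `createTestFile` is not shown in this diff, so it is an assumption here).
   
   ```scala
   object OrderIndependentCheck {
     // Assumed content pattern: the byte at absolute position p is (p % 256).
     def patternByte(position: Long): Byte = (position % 256).toByte

     // Build the bytes a correct reader would return for one range.
     def expectedBytes(offset: Long, length: Int): Array[Byte] =
       Array.tabulate(length)(i => patternByte(offset + i))

     // Each row is checked against its own offset, so arrival order is irrelevant.
     def allRowsMatch(rows: Seq[(Long, Array[Byte])], length: Int): Boolean =
       rows.forall { case (offset, data) =>
         java.util.Arrays.equals(data, expectedBytes(offset, length))
       }

     def main(args: Array[String]): Unit = {
       val inOrder = Seq(0L, 100L, 200L).map(o => (o, expectedBytes(o, 100)))
       // Reversing the rows simulates a different collection order.
       println(allRowsMatch(inOrder.reverse, 100))
     }
   }
   ```
   
   In the Spark test this would correspond to selecting `offset` alongside `data` and passing the row's own offset to `assertBytesContent`, which makes an explicit `orderBy` unnecessary for correctness.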
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   `@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestBatchedBlobReader.scala`
   around lines 64 - 74, The test assumes deterministic row order after
   resultDF.collect(); make it deterministic by ordering the DataFrame before
   collecting (e.g., use resultDF.orderBy("offset").collect()) or, if there is no
   offset column, sort the collected rows in-memory by a stable key (e.g.,
   results.sortBy(r => r.getAs[String]("fileId"))) before zipping; then run the
   assertBytesContent checks against the sorted sequence so
   assertBytesContent(data, expectedOffset = i * 100) always maps to the intended
   row. Ensure you modify the code that builds "results" (the resultDF -> results
   path) and keep the assertBytesContent usage unchanged.
   ```
   
   </details>
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096065)) (source:comment#3042096065)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala:
##########
@@ -0,0 +1,751 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.io.SeekableDataInputStream
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StorageConfiguration, StoragePath}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, SpecificInternalRow}
+import org.apache.spark.sql.types.{BinaryType, BlobType, DataType, StructField, StructType}
+import org.slf4j.LoggerFactory
+
+import java.io.InputStream
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Batched byte range reader that optimizes I/O by combining consecutive reads for out-of-line data.
+ *
+ * This reader analyzes sequences of read requests within a partition and merges
+ * consecutive or nearby reads into single I/O operations. This significantly reduces
+ * the number of seeks and reads when processing sorted data.
+ *
+ * <h3>Schema Requirement:</h3>
+ * The blob column must match the schema defined in {@link HoodieSchema.Blob}:
+ * <pre>
+ * struct {
+ *   type: string                   // "inline" or "out_of_line"
+ *   data: binary (nullable)       // inline data (null for out_of_line)
+ *   reference: struct (nullable) { // file reference (null for inline)
+ *     external_path: string
+ *     offset: long
+ *     length: long
+ *     managed: boolean
+ *   }
+ * }
+ * </pre>
+ *
+ * <h3>Key Features:</h3>
+ * <ul>
+ *   <li>Batches consecutive reads from the same file</li>
+ *   <li>Configurable gap threshold for merging nearby reads</li>
+ *   <li>Lookahead buffer to identify batch opportunities</li>
+ *   <li>Preserves input row order in output</li>
+ * </ul>
+ *
+ * <h3>Usage Example:</h3>
+ * {{{
+ * import org.apache.hudi.udf.BatchedByteRangeReader
+ * import org.apache.spark.sql.functions._
+ * // Read a table with a blob column (e.g. image_data)
+ * val df = spark.read.format("hudi").load("/my_path").select("image_data", "record_id")
+ *
+ * // Read with batching (best when data is sorted by external_path, offset)
+ * val result = BatchedByteRangeReader.readBatched(df, structColName = "file_info")
+ *
+ * // Result has: image_data, record_id, data
+ * result.show()
+ * }}}
+ *
+ * <h3>Performance Tips:</h3>
+ * <ul>
+ *   <li>Sort input by (blob.reference.external_path, blob.reference.offset) for maximum batching effectiveness</li>
+ *   <li>Increase lookaheadSize for better batch detection (at cost of memory)</li>
+ *   <li>Tune maxGapBytes based on your data access patterns</li>
+ * </ul>
+ *
+ * @param storage        HoodieStorage instance for file I/O
+ * @param maxGapBytes    Maximum gap between ranges to consider for batching (default: 4KB)
+ * @param lookaheadSize  Number of rows to buffer for batch detection (default: 50)
+ */
+class BatchedBlobReader(
+    storage: HoodieStorage,
+    maxGapBytes: Int = 4096,
+    lookaheadSize: Int = 50) {
+
+  private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader])
+
+  /**
+   * Process a partition iterator, batching consecutive reads.
+   *
+   * This method consumes the input iterator and produces an output iterator
+   * with each row containing the original data plus a "data" column with the
+   * bytes read from the file.
+   *
+   * @param rows         Iterator of input rows with struct column
+   * @param structColIdx Index of the struct column in the row
+   * @param outputSchema Schema for output rows
+   * @param accessor     Type class for accessing row fields
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Iterator of output rows with data column added
+   */
+  def processPartition[R](
+      rows: Iterator[R],
+      structColIdx: Int,
+      outputSchema: StructType)
+      (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] = {
+
+    // Create buffered iterator for lookahead
+    val bufferedRows = rows.buffered
+
+    // Result buffer to maintain order
+    val resultIterator = new Iterator[R] {
+      private var currentBatch: Iterator[R] = Iterator.empty
+      private var rowIndex = 0L
+
+      override def hasNext: Boolean = {
+        if (currentBatch.hasNext) {
+          true
+        } else if (bufferedRows.hasNext) {
+          // Process next batch
+          currentBatch = processNextBatch()
+          currentBatch.hasNext
+        } else {
+          false
+        }
+      }
+
+      override def next(): R = {
+        if (!hasNext) {
+          throw new NoSuchElementException("No more rows")
+        }
+        currentBatch.next()
+      }
+
+      /**
+       * Collect and process the next batch of rows.
+       */
+      private def processNextBatch(): Iterator[R] = {
+        // Collect up to lookaheadSize rows with their original indices
+        val batch = collectBatch()
+
+        if (batch.isEmpty) {
+          Iterator.empty
+        } else {
+          // Partition the batch into three groups
+          val (inlineRows, outOfLineRows) = batch.partition(_.inlineBytes.isDefined)
+          val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0)
+
+          // Case 1: Inline — return bytes directly without I/O
+          val inlineResults = inlineRows.map { ri =>
+            RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, outputSchema), ri.index)
+          }
+
+          // Case 2: Whole-file reads
+          val wholeFileResults = wholeFileRows.map(readWholeFile(_, outputSchema))
+
+          // Case 3: Regular range reads — merge consecutive ranges and batch
+          val mergedRanges = identifyConsecutiveRanges(rangeRows)
+          val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, outputSchema))
+
+          // Sort by original index to preserve input order
+          (inlineResults ++ wholeFileResults ++ rangeResults).sortBy(_.index).map(_.row).iterator
+        }
+      }
+
+      /**
+       * Collect up to lookaheadSize rows from the input iterator.
+       */
+      private def collectBatch(): Seq[RowInfo[R]] = {
+        val batch = ArrayBuffer[RowInfo[R]]()
+        var collected = 0
+
+        while (bufferedRows.hasNext && collected < lookaheadSize) {
+          val row = bufferedRows.next()
+          // Handle null struct column (null blob)
+          if (accessor.isNullAt(row, structColIdx)) {
+            batch += RowInfo[R](
+              originalRow = row,
+              filePath = "",
+              offset = -1,
+              length = -1,
+              index = rowIndex,
+              inlineBytes = Some(null)
+            )
+            rowIndex += 1
+            collected += 1
+          } else {
+            val blobStruct = accessor.getStruct(row, structColIdx, HoodieSchema.Blob.getFieldCount)
+            // Dispatch based on storage_type (field 0)
+            val storageType = accessor.getString(blobStruct, 0)
+            if (storageType == HoodieSchema.Blob.INLINE) {
+              // Case 1: Inline — bytes are in field 1
+              val bytes = accessor.getBytes(blobStruct, 1)
+              batch += RowInfo[R](
+                originalRow = row,
+                filePath = "",
+                offset = -1,
+                length = -1,
+                index = rowIndex,
+                inlineBytes = Some(bytes)
+              )
+            } else {
+              // Case 2 or 3: Out-of-line — get reference struct (field 2)
+              val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount)
+              val filePath = accessor.getString(referenceStruct, 0)
+              val offsetIsNull = accessor.isNullAt(referenceStruct, 1)
+              val lengthIsNull = accessor.isNullAt(referenceStruct, 2)
+              if (offsetIsNull || lengthIsNull) {
+                // Case 2: Whole-file read — no offset/length specified; sentinel length = -1
+                batch += RowInfo[R](
+                  originalRow = row,
+                  filePath = filePath,
+                  offset = 0,
+                  length = -1,
+                  index = rowIndex
+                )
+              } else {
+                // Case 3: Regular range read
+                val offset = accessor.getLong(referenceStruct, 1)
+                val length = accessor.getLong(referenceStruct, 2)
+                batch += RowInfo[R](
+                  originalRow = row,
+                  filePath = filePath,
+                  offset = offset,
+                  length = length,
+                  index = rowIndex
+                )
+              }
+            }
+            rowIndex += 1
+            collected += 1
+          }
+        }
+        batch.toSeq
+      }
+    }
+
+    resultIterator
+  }
+
+  /**
+   * Identify consecutive ranges that can be batched together.
+   *
+   * This method groups rows by file path, sorts by offset, and merges
+   * ranges that are consecutive or within maxGapBytes of each other.
+   *
+   * @param rows Sequence of row information
+   * @return Sequence of merged ranges
+   */
+  private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = {
+    // Group by file path
+    val byFile = rows.groupBy(_.filePath)
+
+    val allRanges = ArrayBuffer[MergedRange[R]]()
+
+    byFile.foreach { case (filePath, fileRows) =>
+      // Sort by offset
+      val sorted = fileRows.sortBy(_.offset)
+
+      // Merge consecutive ranges
+      val merged = mergeRanges(sorted, maxGapBytes)
+      allRanges ++= merged
+    }
+
+    allRanges.toSeq
+  }
+
+  /**
+   * Merge consecutive ranges within the gap threshold.
+   *
+   * @param rows   Sorted rows from the same file
+   * @param maxGap Maximum gap to consider for merging
+   * @return Sequence of merged ranges
+   */
+  private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = {
+
+    val result = ArrayBuffer[MergedRange[R]]()
+    var current: MergedRange[R] = null
+
+    rows.foreach { row =>
+      if (current == null) {
+        // Start first range
+        current = MergedRange[R](
+          filePath = row.filePath,
+          startOffset = row.offset,
+          endOffset = row.offset + row.length,
+          rows = Seq(row)
+        )
+      } else {
+        val gap = row.offset - current.endOffset
+
+        if (gap >= 0 && gap <= maxGap) {
+          // Merge into current range
+          current = current.merge(row)
+        } else {
+          // Save current range and start new one
+          result += current
+          current = MergedRange[R](
+            filePath = row.filePath,
+            startOffset = row.offset,
+            endOffset = row.offset + row.length,
+            rows = Seq(row)
+          )
+        }
+      }
+    }
+
+    // Add final range
+    if (current != null) {
+      result += current
+    }
+
+    result.toSeq
+  }
+
+  /**
+   * Read an entire file and return it as a single row result.
+   *
+   * Used for whole-file out-of-line blobs where no offset or length is specified.
+   *
+   * @param rowInfo      Row information with the file path
+   * @param outputSchema Schema for output rows
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return The single row result for the file
+   */
+  private def readWholeFile[R](
+      rowInfo: RowInfo[R],
+      outputSchema: StructType)
+      (implicit builder: RowBuilder[R]): RowResult[R] = {
+
+    var inputStream: InputStream = null
+    try {
+      val path = new StoragePath(rowInfo.filePath)
+      inputStream = storage.open(path)
+      val buffer = inputStream.readAllBytes()
+
+      logger.debug(s"Read entire file ${rowInfo.filePath} (${buffer.length} bytes)")
+
+      RowResult(builder.buildRow(rowInfo.originalRow, buffer, outputSchema), rowInfo.index)
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close()
+        } catch {
+          case e: Exception =>
+            logger.warn(s"Error closing stream for ${rowInfo.filePath}", e)
+        }
+      }
+    }
+  }
+
+  /**
+   * Read a merged range and split it back into individual row results.
+   *
+   * This method performs a single I/O operation to read the entire merged
+   * range, then splits the buffer into individual results for each original
+   * row.
+   *
+   * @param range        The merged range to read
+   * @param outputSchema Schema for output rows
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Sequence of row results with original indices
+   */
+  private def readAndSplitRange[R](
+      range: MergedRange[R],
+      outputSchema: StructType)
+      (implicit builder: RowBuilder[R]): Seq[RowResult[R]] = {
+
+    var inputStream: SeekableDataInputStream = null
+    try {
+      // Get or open file handle
+      inputStream = storage.openSeekable(new StoragePath(range.filePath), false)
+
+      // Seek to start offset
+      inputStream.seek(range.startOffset)
+
+      // Read the entire merged range
+      val totalLength = (range.endOffset - range.startOffset).toInt
+      val buffer = new Array[Byte](totalLength)
+      inputStream.readFully(buffer, 0, totalLength)

Review Comment:
   _⚠️ Potential issue_ | _🟡 Minor_
   
   **Potential integer overflow for large blobs exceeding 2GB.**
   
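   `(range.endOffset - range.startOffset).toInt` silently truncates when the range is longer than `Int.MaxValue` bytes (~2 GB). While unlikely for typical blob sizes, this could cause incorrect reads or a `NegativeArraySizeException` for very large files. Because the truncated value can also wrap to a positive number, a sign check after the cast does not catch every case.
   
   Consider checking the bounds on the `Long` value before narrowing to `Int`:
   
   <details>
   <summary>Proposed fix</summary>
   
   ```diff
          // Read the entire merged range
   -      val totalLength = (range.endOffset - range.startOffset).toInt
   +      val totalLengthLong = range.endOffset - range.startOffset
   +      require(totalLengthLong >= 0 && totalLengthLong <= Int.MaxValue,
   +        s"Invalid range length: $totalLengthLong bytes (must be between 0 and Int.MaxValue)")
   +      val totalLength = totalLengthLong.toInt
          val buffer = new Array[Byte](totalLength)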
   ```
   
   </details>
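   
   A minimal sketch of such a bounds check as a standalone helper, assuming only the Scala standard library. `RangeLength` and `checkedLength` are hypothetical names used for illustration; they are not part of the PR:
   
   ```scala
   object RangeLength {
     // Hypothetical helper: validates the range length as a Long before narrowing,
     // so a truncating cast can never reach the array allocation.
     def checkedLength(startOffset: Long, endOffset: Long): Int = {
       val length = endOffset - startOffset
       require(length >= 0 && length <= Int.MaxValue,
         s"Range [$startOffset, $endOffset) has length $length, outside [0, ${Int.MaxValue}]")
       length.toInt
     }
   
     def main(args: Array[String]): Unit = {
       // A truncating cast keeps only the low 32 bits, so the result can be positive.
       val wrapped = ((1L << 32) + 5L).toInt
       println(wrapped)                    // prints 5
       println(checkedLength(100L, 200L))  // prints 100
     }
   }
   ```
   
   `require` throws `IllegalArgumentException`, so an oversized or inverted range fails before any buffer is allocated.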
   
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala`
   around lines 393 - 395, The code computes totalLength as (range.endOffset -
   range.startOffset).toInt which can overflow for lengths > Int.MaxValue; update
   BatchedBlobReader to compute the length in a Long (e.g., val totalLengthLong =
   range.endOffset - range.startOffset), check if totalLengthLong > Int.MaxValue
   and either throw a clear exception (e.g., IllegalArgumentException with the
   offending range and length) or implement chunked reads, and then allocate the
   buffer using a safe Int length (or loop reading into the buffer in chunks)
   before calling inputStream.readFully; ensure you update references to
   totalLength, buffer allocation, and inputStream.readFully accordingly.
   ```
   
   </details>
   
   
   -- *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096037)) (source:comment#3042096037)



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala:
##########
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.exception.HoodieIOException
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import java.util.Collections
+
+/**
+ * Tests for the read_blob() SQL function.
+ *
+ * This test suite verifies:
+ * <ul>
+ *   <li>Basic SQL integration with read_blob()</li>
+ *   <li>Integration with WHERE clauses, JOINs</li>
+ *   <li>Configuration parameter handling</li>
+ *   <li>Error handling for invalid inputs</li>
+ * </ul>
+ */
+class TestReadBlobSQL extends HoodieClientTestBase {

Review Comment:
   _⚠️ Potential issue_ | _🔴 Critical_
   
   <details>
   <summary>🧩 Analysis chain</summary>
   
   🏁 Script executed:
   
   ```shell
   #!/bin/bash
   # Verify that HoodieClientTestBase does not override getSparkSessionExtensionsInjector
   ast-grep --pattern $'class HoodieClientTestBase {
     $$$
     getSparkSessionExtensionsInjector($$$) {
       $$$
     }
     $$$
   }'
   
   # Check if the parent harness returns Option.empty()
   rg -n "getSparkSessionExtensionsInjector" --type java -A 3
   ```
   
   Repository: yihua/hudi
   
   Length of output: 1831
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Find HoodieClientTestBase definition
   fd -type f -name "*.scala" -o -name "*.java" | xargs grep -l "class HoodieClientTestBase"
   
   # Check TestReadBlobSQL class hierarchy
   cat -n hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala | head -50
   ```
   
   Repository: yihua/hudi
   
   Length of output: 2262
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Find HoodieClientTestBase in testutils package
   fd "HoodieClientTestBase" hudi-spark-datasource/hudi-spark/src/test/
   
   # Also check in broader testutils
   fd "HoodieClientTestBase" | grep -E "\.(scala|java)$"
   ```
   
   Repository: yihua/hudi
   
   Length of output: 149
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Read HoodieClientTestBase to understand its implementation and parent
   cat -n hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 48365
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Find HoodieSparkSessionExtension and check if it registers read_blob
   fd -type f "HoodieSparkSessionExtension" | grep -E "\.(scala|java)$"
   ```
   
   Repository: yihua/hudi
   
   Length of output: 224
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Find HoodieSparkSessionExtension
   find . -name "*HoodieSparkSessionExtension*"
   ```
   
   Repository: yihua/hudi
   
   Length of output: 163
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Read HoodieSparkSessionExtension to see what functions it registers
   cat -n hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
   ```
   
   Repository: yihua/hudi
   
   Length of output: 2610
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Read TestReadBlobSQL to see what SQL functions it actually calls
   cat -n hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala | head -200
   ```
   
   Repository: yihua/hudi
   
   Length of output: 8075
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Check how sparkSessionExtensionsInjector is used in HoodieSparkClientTestHarness
   rg -n "sparkSessionExtensionsInjector\|getSparkSessionExtensionsInjector" hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java -A 5 -B 2
   ```
   
   Repository: yihua/hudi
   
   Length of output: 36
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Read the relevant section of HoodieSparkClientTestHarness around extension injection
   sed -n '170,210p' hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 1732
   
   ---
   
   
   
   </details>
   
   **Tests will fail: `read_blob()` function is not registered.**
   
   `TestReadBlobSQL` extends `HoodieClientTestBase`, which extends `HoodieSparkClientTestHarness`. The harness's `getSparkSessionExtensionsInjector()` returns `Option.empty()` by default, so no session extensions are installed. Because the `read_blob()` SQL function is registered via `HoodieSparkSessionExtension`, every SQL query that calls `read_blob()` will fail with an "undefined function" error.
   
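   Override `getSparkSessionExtensionsInjector()` to inject the extension. The harness is a Java class whose method returns Hudi's `org.apache.hudi.common.util.Option` (hence `Option.empty()`), so the override must use that type rather than `scala.Option`. It also needs imports for `java.util.function.Consumer`, `org.apache.spark.sql.SparkSessionExtensions`, and `org.apache.spark.sql.hudi.HoodieSparkSessionExtension`:
   
   <details>
   <summary>Proposed fix</summary>
   
   ```diff
    class TestReadBlobSQL extends HoodieClientTestBase {
   +
   +  override protected def getSparkSessionExtensionsInjector: org.apache.hudi.common.util.Option[Consumer[SparkSessionExtensions]] = {
   +    org.apache.hudi.common.util.Option.of(new Consumer[SparkSessionExtensions] {
   +      override def accept(extensions: SparkSessionExtensions): Unit = {
   +        new HoodieSparkSessionExtension().apply(extensions)
   +      }
   +    })
   +  }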
   ```
   
   </details>
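   
   The override pattern can be modelled without Spark or Hudi on the classpath. The sketch below is an illustration only: `FakeExtensions`, `BaseHarness`, `BlobSqlHarness`, and `extensionsInjector` are hypothetical stand-ins, and it uses `scala.Option` for brevity instead of Hudi's `Option` type:
   
   ```scala
   import java.util.function.Consumer
   import scala.collection.mutable
   
   // Hypothetical stand-in for SparkSessionExtensions: records registered function names.
   final class FakeExtensions {
     val functions: mutable.Set[String] = mutable.Set.empty
     def injectFunction(name: String): Unit = functions += name
   }
   
   // Hypothetical stand-in for the parent harness: no injector by default.
   class BaseHarness {
     protected def extensionsInjector: Option[Consumer[FakeExtensions]] = None
   
     // Applies the injector, if any, while the "session" is being built.
     def buildSession(): FakeExtensions = {
       val extensions = new FakeExtensions
       extensionsInjector.foreach(_.accept(extensions))
       extensions
     }
   }
   
   // A test class that needs read_blob overrides the hook to register it.
   class BlobSqlHarness extends BaseHarness {
     override protected def extensionsInjector: Option[Consumer[FakeExtensions]] =
       Some(new Consumer[FakeExtensions] {
         override def accept(extensions: FakeExtensions): Unit =
           extensions.injectFunction("read_blob")
       })
   }
   
   object InjectorDemo {
     def main(args: Array[String]): Unit = {
       println(new BaseHarness().buildSession().functions.contains("read_blob"))    // prints false
       println(new BlobSqlHarness().buildSession().functions.contains("read_blob")) // prints true
     }
   }
   ```
   
   The default harness never sees `read_blob`, which mirrors why the SQL tests need the override.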
   
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala`
   at line 45, The test class TestReadBlobSQL fails because read_blob() isn't
   registered; override getSparkSessionExtensionsInjector() in TestReadBlobSQL
   (inheriting from HoodieClientTestBase/HoodieSparkClientTestHarness) to return a
   non-empty Option that injects HoodieSparkSessionExtension so the read_blob SQL
   function is registered before tests run; implement the override method to return
   an Option wrapping a Consumer that applies the extension, e.g. extensions =>
   new HoodieSparkSessionExtension().apply(extensions), or equivalent.
   ```
   
   </details>
   
   
   -- *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096068)) (source:comment#3042096068)



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala:
##########
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.exception.HoodieIOException
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import java.util.Collections
+
+/**
+ * Tests for the read_blob() SQL function.
+ *
+ * This test suite verifies:
+ * <ul>
+ *   <li>Basic SQL integration with read_blob()</li>
+ *   <li>Integration with WHERE clauses, JOINs</li>
+ *   <li>Configuration parameter handling</li>
+ *   <li>Error handling for invalid inputs</li>
+ * </ul>
+ */
+class TestReadBlobSQL extends HoodieClientTestBase {
+
+  @Test
+  def testBasicReadBlobSQL(): Unit = {
+    val filePath = createTestFile(tempDir, "basic.bin", 10000)
+
+    // Main DataFrame with blobStructCol
+    val df = sparkSession.createDataFrame(Seq(
+      (1, "record1", filePath, 0L, 100L),
+      (3, "record3", filePath, 100L, 100L),
+      (4, "record4", filePath, 200L, 100L)
+    )).toDF("id", "name", "external_path", "offset", "length")
+      .withColumn("file_info", blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
+      .select("id", "name", "file_info")
+
+    // Ensure file_info is nullable in the schema
+    val schema = StructType(df.schema.map {
+      case StructField("file_info", dt, _, md) => StructField("file_info", dt, nullable = true, md)
+      case other => other
+    })
+    val dfWithNullable = sparkSession.createDataFrame(df.rdd, schema)
+
+    // DataFrame with a null blob value
+    val nullRow = Row(2, "record2", null)
+    val nullDf = sparkSession.createDataFrame(Collections.singletonList(nullRow), schema)
+
+    // Union the null row
+    val fullDf = dfWithNullable.unionByName(nullDf)
+    fullDf.createOrReplaceTempView("test_table")
+
+    // Use SQL with read_blob
+    val result = sparkSession.sql("""
+      SELECT id, name, read_blob(file_info) as data
+      FROM test_table
+      WHERE id <= 3
+      ORDER BY id
+    """)
+
+    val rows = result.collect()
+    assertEquals(3, rows.length)
+
+    // Verify data is binary for non-null rows
+    val data1 = rows(0).getAs[Array[Byte]]("data")
+    assertEquals(100, data1.length)
+    assertBytesContent(data1)
+
+    // The null_blob row should have null data
+    assertTrue(rows(1).isNullAt(2))
+
+    val data3 = rows(2).getAs[Array[Byte]]("data")
+    assertEquals(100, data3.length)
+    assertBytesContent(data3, expectedOffset = 100)
+  }
+
+  @Test
+  def testReadBlobWithJoin(): Unit = {
+    val filePath = createTestFile(tempDir, "join.bin", 10000)
+
+    // Create blob table
+    val blobDF = sparkSession.createDataFrame(Seq(
+      (1, filePath, 0L, 100L),
+      (2, filePath, 100L, 100L)
+    )).toDF("id", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
+      .select("id", "file_info")
+
+    blobDF.createOrReplaceTempView("blob_table")
+
+    // Create metadata table
+    val metaDF = sparkSession.createDataFrame(Seq(
+      (1, "Alice"),
+      (2, "Bob")
+    )).toDF("id", "name")
+
+    metaDF.createOrReplaceTempView("meta_table")
+
+    // SQL with JOIN
+    val result = sparkSession.sql("""
+      SELECT m.id, m.name, read_blob(b.file_info) as data
+      FROM meta_table m
+      JOIN blob_table b ON m.id = b.id
+      ORDER BY m.id
+    """)
+
+    val rows = result.collect()
+    assertEquals(2, rows.length)
+    assertEquals("Alice", rows(0).getAs[String]("name"))
+    assertEquals(100, rows(0).getAs[Array[Byte]]("data").length)
+    assertEquals("Bob", rows(1).getAs[String]("name"))
+    assertEquals(100, rows(1).getAs[Array[Byte]]("data").length)
+
+    // Verify data content
+    val data1 = rows(0).getAs[Array[Byte]]("data")
+    assertBytesContent(data1)
+  }
+
+  @Test
+  def testReadBlobInSubquery(): Unit = {
+    val filePath = createTestFile(tempDir, "subquery.bin", 10000)
+
+    val df = sparkSession.createDataFrame(Seq(
+      (1, "A", filePath, 0L, 100L),
+      (2, "A", filePath, 100L, 100L),
+      (3, "B", filePath, 200L, 100L)
+    )).toDF("id", "category", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
+      .select("id", "category", "file_info")
+
+    df.createOrReplaceTempView("subquery_table")
+
+    // SQL with subquery
+    val result = sparkSession.sql("""
+      SELECT * FROM (
+        SELECT id, category, read_blob(file_info) as data
+        FROM subquery_table
+      ) WHERE category = 'A'
+    """)
+
+    val rows = result.collect()
+    assertEquals(2, rows.length)
+    rows.foreach { row =>
+      assertEquals("A", row.getAs[String]("category"))
+      assertEquals(100, row.getAs[Array[Byte]]("data").length)
+    }
+  }
+
+  @Test
+  def testConfigurationParameters(): Unit = {
+    val filePath = createTestFile(tempDir, "config.bin", 50000)
+
+    val df = sparkSession.createDataFrame(Seq(
+      (1, filePath, 0L, 100L),
+      (2, filePath, 5000L, 100L),  // 4.9KB gap
+      (3, filePath, 10000L, 100L)
+    )).toDF("id", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
+      .select("id", "file_info")
+
+    df.createOrReplaceTempView("config_table")
+
+    // Use withSparkConfig to automatically manage configuration
+    withSparkConfig(sparkSession, Map(
+      "hoodie.blob.batching.max.gap.bytes" -> "10000",
+      "hoodie.blob.batching.lookahead.size" -> "100"
+    )) {
+      val result = sparkSession.sql("""
+        SELECT id, read_blob(file_info) as data
+        FROM config_table
+      """)
+
+      val rows = result.collect()
+      assertEquals(3, rows.length)
+
+      // Verify all reads completed successfully
+      rows.foreach { row =>
+        assertEquals(100, row.getAs[Array[Byte]]("data").length)
+      }
+    }
+  }
+
+  @Test
+  def testMultipleReadBlobInSameQuery(): Unit = {
+    val filePath1 = createTestFile(tempDir, "multi1.bin", 10000)
+    val filePath2 = createTestFile(tempDir, "multi2.bin", 10000)
+
+    val df = sparkSession.createDataFrame(Seq(
+      (1, filePath1, 0L, 50L, filePath2, 500L, 50L),
+      (2, filePath1, 100L, 50L, filePath2, 600L, 50L)
+    )).toDF("id", "external_path1", "offset1", "length1", "external_path2", "offset2", "length2")
+      .withColumn("file_info1",
+        blobStructCol("file_info1", col("external_path1"), col("offset1"), col("length1")))
+      .withColumn("file_info2",
+        blobStructCol("file_info2", col("external_path2"), col("offset2"), col("length2")))
+      .select("id", "file_info1", "file_info2")
+
+    df.createOrReplaceTempView("multi_table")
+
+    // SQL with multiple read_blob calls
+    val result = sparkSession.sql("""
+      SELECT
+        id,
+        read_blob(file_info1) as data1,
+        read_blob(file_info2) as data2
+      FROM multi_table
+    """)
+
+    val rows = result.collect()
+    assertEquals(2, rows.length)
+
+    // Row 1: data1 = file1 at offset 0, data2 = file2 at offset 500
+    val data1_row1 = rows(0).getAs[Array[Byte]]("data1")
+    val data2_row1 = rows(0).getAs[Array[Byte]]("data2")
+    assertEquals(50, data1_row1.length)
+    assertEquals(50, data2_row1.length)
+    assertBytesContent(data1_row1, expectedOffset = 0)
+    assertBytesContent(data2_row1, expectedOffset = 500)
+
+    // Row 2: data1 = file1 at offset 100, data2 = file2 at offset 600
+    val data1_row2 = rows(1).getAs[Array[Byte]]("data1")
+    val data2_row2 = rows(1).getAs[Array[Byte]]("data2")
+    assertBytesContent(data1_row2, expectedOffset = 100)
+    assertBytesContent(data2_row2, expectedOffset = 600)
+  }
+
+  @Test
+  def testReadBlobWithEmptyResult(): Unit = {
+    val filePath = createTestFile(tempDir, "empty.bin", 10000)
+
+    val df = sparkSession.createDataFrame(Seq(
+      (1, filePath, 0L, 100L),
+      (2, filePath, 100L, 100L)
+    )).toDF("id", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
+      .select("id", "file_info")
+
+    df.createOrReplaceTempView("empty_table")
+
+    // SQL that returns no rows
+    val result = sparkSession.sql("""
+      SELECT id, read_blob(file_info) as data
+      FROM empty_table
+      WHERE id > 100
+    """)
+
+    val rows = result.collect()
+    assertEquals(0, rows.length)
+  }
+
+  @Test
+  def testReadBlobMultipleFiles(): Unit = {
+    val filePath1 = createTestFile(tempDir, "file1.bin", 10000)
+    val filePath2 = createTestFile(tempDir, "file2.bin", 10000)
+
+    val df = sparkSession.createDataFrame(Seq(
+      (1, filePath1, 0L, 100L),
+      (2, filePath2, 0L, 100L),
+      (3, filePath1, 100L, 100L),
+      (4, filePath2, 100L, 100L)
+    )).toDF("id", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
+      .select("id", "file_info")
+
+    df.createOrReplaceTempView("multi_file_table")
+
+    // SQL reading from multiple files
+    val result = sparkSession.sql("""
+      SELECT id, read_blob(file_info) as data
+      FROM multi_file_table
+      ORDER BY id
+    """)
+
+    val rows = result.collect()
+    assertEquals(4, rows.length)
+
+    // Verify all data was read correctly
+    rows.foreach { row =>
+      assertEquals(100, row.getAs[Array[Byte]]("data").length)
+    }
+  }
+
+  @Test
+  def testReadBlobInWhereClause(): Unit = {
+    val filePath = createTestFile(tempDir, "where.bin", 10000)
+    val df = sparkSession.createDataFrame(Seq(
+      (1, filePath, 0L, 50L),   // 50 bytes - filtered out
+      (2, filePath, 100L, 100L), // 100 bytes - passes filter
+      (3, filePath, 200L, 200L)  // 200 bytes - passes filter
+    )).toDF("id", "external_path", "offset", "length")
+      .withColumn("file_info", blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
+      .select("id", "file_info")
+    df.createOrReplaceTempView("where_table")
+
+    val result = sparkSession.sql("""
+      SELECT id, read_blob(file_info) AS data
+      FROM where_table
+      WHERE length(read_blob(file_info)) > 50
+      ORDER BY id
+    """)
+
+    val rows = result.collect()
+    assertEquals(2, rows.length)
+
+    // validate that rows with IDs 2 and 3 are returned
+    assertEquals(2, rows(0).getInt(0))
+    assertEquals(3, rows(1).getInt(0))
+  }
+
+  @Test
+  def testReadBlobWithCaseWhen(): Unit = {
+    val filePath = createTestFile(tempDir, "case.bin", 10000)
+
+    val df = sparkSession.createDataFrame(Seq(
+      (1, true, filePath, 0L, 100L),
+      (2, false, filePath, 100L, 100L),
+      (3, true, filePath, 200L, 100L)
+    )).toDF("id", "should_resolve", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
+      .select("id", "should_resolve", "file_info")
+
+    df.createOrReplaceTempView("case_table")
+
+    // SQL with CASE WHEN - note: this tests that the expression is handled
+    // even in conditional contexts
+    val result = sparkSession.sql("""
+      SELECT
+        id,
+        should_resolve,
+        CASE
+          WHEN should_resolve THEN read_blob(file_info)
+          ELSE NULL
+        END as data
+      FROM case_table
+    """)
+
+    val rows = result.collect()
+    assertEquals(3, rows.length)
+
+    // Row 1 should have data
+    assertTrue(rows(0).getAs[Boolean]("should_resolve"))
+    assertNotNull(rows(0).get(2))
+
+    // Row 2 should have null
+    assertFalse(rows(1).getAs[Boolean]("should_resolve"))
+    assertTrue(rows(1).isNullAt(2))
+
+    // Row 3 should have data
+    assertTrue(rows(2).getAs[Boolean]("should_resolve"))
+    assertNotNull(rows(2).get(2))
+  }
+
+  @Test
+  def testReadBlobWithMissingFile(): Unit = {
+    val missingPath = tempDir.resolve("does_not_exist.bin").toString
+    val df = sparkSession.createDataFrame(Seq(
+      (1, missingPath, 0L, 10L)
+    )).toDF("id", "external_path", "offset", "length")
+      .withColumn("file_info", blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
+      .select("id", "file_info")
+    df.createOrReplaceTempView("missing_file_table")
+    val thrown = assertThrows(classOf[Exception], () => {
+      sparkSession.sql("SELECT id, read_blob(file_info) as data FROM missing_file_table").collect()
+    })
+    assertTrue(thrown.getCause.isInstanceOf[HoodieIOException])
+  }
+
+  @Test
+  def testReadBlobOnNonBlobColumn(): Unit = {
+    val df = sparkSession.createDataFrame(Seq(
+      (1, "not_a_blob")
+    )).toDF("id", "not_blob")
+    df.createOrReplaceTempView("non_blob_table")
+    val thrown = assertThrows(classOf[Exception], () => {
+      sparkSession.sql("SELECT id, read_blob(not_blob) as data FROM non_blob_table").collect()
+    })
+    assertTrue(thrown.isInstanceOf[IllegalArgumentException])
+    assertTrue(thrown.getMessage.contains("must be compatible with BlobType"))

Review Comment:
   _⚠️ Potential issue_ | _🟡 Minor_
   
   **Inconsistent exception unwrapping pattern.**
   
   This test checks `thrown.isInstanceOf[IllegalArgumentException]` directly, while `testReadBlobWithMissingFile` (lines 390-393) checks `thrown.getCause.isInstanceOf[HoodieIOException]`. Spark wraps exceptions thrown during task execution in `SparkException`, whereas exceptions raised on the driver during analysis propagate unwrapped. If the `IllegalArgumentException` ever surfaces wrapped, the direct type check will fail.
   
   Use consistent exception checking:
   
   
   
   <details>
   <summary>Proposed fix</summary>
   
   ```diff
        val thrown = assertThrows(classOf[Exception], () => {
          sparkSession.sql("SELECT id, read_blob(not_blob) as data FROM non_blob_table").collect()
        })
   -    assertTrue(thrown.isInstanceOf[IllegalArgumentException])
   -    assertTrue(thrown.getMessage.contains("must be compatible with BlobType"))
   +    val cause = if (thrown.isInstanceOf[IllegalArgumentException]) thrown else thrown.getCause
   +    assertTrue(cause.isInstanceOf[IllegalArgumentException])
   +    assertTrue(cause.getMessage.contains("must be compatible with BlobType"))
   ```
   
   </details>
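   
   Rather than checking one fixed nesting level, a test could search the whole cause chain. The following is a sketch under that assumption, using only the Scala standard library; `CauseChain` and `findCause` are hypothetical names, not existing Hudi or Spark helpers:
   
   ```scala
   import scala.annotation.tailrec
   import scala.reflect.ClassTag
   
   object CauseChain {
     // Hypothetical test helper: returns the first throwable of type T found by
     // following getCause links, starting with the throwable itself.
     def findCause[T <: Throwable](t: Throwable)(implicit tag: ClassTag[T]): Option[T] = {
       @tailrec
       def loop(current: Throwable, depth: Int): Option[T] =
         if (current == null || depth > 32) None // depth cap guards against cyclic chains
         else if (tag.runtimeClass.isInstance(current)) Some(current.asInstanceOf[T])
         else loop(current.getCause, depth + 1)
   
       loop(t, 0)
     }
   
     def main(args: Array[String]): Unit = {
       val direct = new IllegalArgumentException("must be compatible with BlobType")
       val wrapped = new RuntimeException("job aborted", direct)
       // Both the unwrapped and the wrapped form resolve to the same exception.
       println(findCause[IllegalArgumentException](direct).isDefined)   // prints true
       println(findCause[IllegalArgumentException](wrapped).isDefined)  // prints true
       println(findCause[IllegalStateException](wrapped).isDefined)     // prints false
     }
   }
   ```
   
   With a helper like this, both tests could assert on the expected type without depending on whether Spark wrapped the exception.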
   
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala`
   around lines 402 - 406, The test's assertion on the exception type is brittle
   because Spark may wrap the IllegalArgumentException in a SparkException; update
   the check around the "thrown" from assertThrows in TestReadBlobSQL.scala to
   mirror the other test's pattern by inspecting both the thrown exception and its
   cause: if thrown is a SparkException, assert that thrown.getCause is an instance
   of IllegalArgumentException, otherwise assert thrown is an instance of
   IllegalArgumentException (use the SparkException and IllegalArgumentException
   class names and the "thrown" variable from this test to locate where to change
   the assertions).
   ```
   
   </details>
   
   
   -- *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096074)) (source:comment#3042096074)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
