yihua commented on code in PR #18098: URL: https://github.com/apache/hudi/pull/18098#discussion_r3042097433
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReaderStrategy.scala: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.HoodieSparkConfUtils +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.conf

Review Comment: _⚠️ Potential issue_ | _🟡 Minor_ **Remove unused import `SimpleAnalyzer.conf`.** This import is not used anywhere in the file and should be removed.

<details> <summary>🧹 Proposed fix</summary>

```diff
-import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.conf
```

</details>

*CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096047)) (source:comment#3042096047)

########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, ExprId, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{DataType, StructType} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +/** + * Transforms queries with `read_blob()` to use lazy batched I/O. + * + * Replaces [[ReadBlobExpression]] markers with [[BatchedBlobRead]] nodes + * that read blob data during physical execution. + * + * Example: `SELECT id, read_blob(image_data) FROM table` + * + * @param spark SparkSession for accessing configuration + */ +case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { + case Filter(condition, child) + if containsReadBlobInExpression(condition) + && !child.isInstanceOf[BatchedBlobRead] => + + val blobColumns = extractBlobColumnsFromExpression(condition) + val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child) + val newCondition = replaceReadBlobExpression(condition, blobToDataAttr) + Project(child.output, Filter(newCondition, wrappedPlan)) + + case Project(projectList, child) + if containsReadBlobExpression(projectList) + && !child.isInstanceOf[BatchedBlobRead] => + + val blobColumns = extractAllBlobColumns(projectList) + val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child) + val newProjectList = transformNamedExpressions(projectList, blobToDataAttr) + Project(newProjectList, wrappedPlan) + }

Review Comment: _⚠️ Potential issue_ | _🟠 Major_ **Avoid double blob reads when `read_blob()` appears in both WHERE and SELECT.** With the current independent `Filter` and `Project` rewrites, line 51 drops `dataAttr` via `Project(child.output,
...)`, and the parent `Project` rule wraps again. As a result, the same blob column can be fetched twice in one query.

<details> <summary>⚡ Suggested rewrite shape to share one wrapped plan</summary>

```diff
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+    case Project(projectList, Filter(condition, child))
+      if containsReadBlobExpression(projectList)
+        && containsReadBlobInExpression(condition)
+        && !child.isInstanceOf[BatchedBlobRead] =>
+
+      val projectBlobCols = extractAllBlobColumns(projectList)
+      val filterBlobCols = extractBlobColumnsFromExpression(condition)
+      val blobColumns = (projectBlobCols ++ filterBlobCols)
+        .foldLeft((mutable.LinkedHashSet.empty[ExprId], ArrayBuffer.empty[AttributeReference])) {
+          case ((seen, acc), a) if seen.add(a.exprId) => (seen, acc += a)
+          case ((seen, acc), _) => (seen, acc)
+        }._2.toSeq
+
+      val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child)
+      val newCondition = replaceReadBlobExpression(condition, blobToDataAttr)
+      val newProjectList = transformNamedExpressions(projectList, blobToDataAttr)
+      Project(newProjectList, Filter(newCondition, wrappedPlan))
+
     case Filter(condition, child)
       if containsReadBlobInExpression(condition)
         && !child.isInstanceOf[BatchedBlobRead] =>
```

</details>

Keep the existing `Filter` and `Project` cases after the combined case as fallbacks.

Note that `resolveOperatorsUp` applies the rule to a node's children before the node itself, so the inner `Filter` is likely already rewritten to `Project(child.output, Filter(...))` by the time the outer `Project` is visited, and the combined case would then never match. The combined case probably needs a top-down traversal (for example `resolveOperatorsDown`) or an equivalent guard; a test asserting that the analyzed plan contains a single `BatchedBlobRead` for the shared blob column would confirm the fix.

*CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096051)) (source:comment#3042096051)

########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} + +/** + * Registry of scalar functions for Hudi SQL integration. + * + * These functions are registered as built-in functions that can be used + * in SQL queries. They integrate with Spark's function registry and are + * available in both SQL and DataFrame API contexts. + * + * <h3>Function Registration:</h3> + * Functions are registered via [[SparkAdapter.injectScalarFunctions]] which is + * called during [[HoodieSparkSessionExtension]] initialization. + * + * <h3>Adding New Functions:</h3> + * To add a new scalar function: + * <ol> + * <li>Create a marker expression class (extends Unevaluable)</li> + * <li>Add function definition tuple to [[funcs]] below</li> + * <li>Create a logical plan rule to handle the expression</li> + * <li>Register the rule in [[HoodieAnalysis.customPostHocResolutionRules]]</li> + * </ol> + */ +object ScalarFunctions { + + private val READ_BLOB_FUNC_NAME = "read_blob" + + /** + * Function definitions as tuples of: + * <ul> + * <li>FunctionIdentifier - function name</li> + * <li>ExpressionInfo - metadata for DESCRIBE FUNCTION</li> + * <li>Builder function - (Seq[Expression] => Expression)</li> + * </ul> + */ + val funcs: Seq[(FunctionIdentifier, ExpressionInfo, Seq[Expression] => Expression)] = Seq( + ( + FunctionIdentifier(READ_BLOB_FUNC_NAME), + new ExpressionInfo( + classOf[ReadBlobExpression].getCanonicalName, + READ_BLOB_FUNC_NAME, + """ + |Usage: read_blob(blob_column) - Reads blob data from storage + | + |Reads byte ranges from files referenced in a blob column. The column must have + |metadata hudi_blob=true. 
Review Comment: _⚠️ Potential issue_ | _🟡 Minor_ **Function docs overstate the metadata requirement.** Lines 66-67 state that the column must have `hudi_blob=true` metadata, but the current rewrite/validation flow checks structural compatibility with `BlobType`, not that flag. This can mislead users about what actually causes a failure.

<details> <summary>Proposed doc-only fix</summary>

```diff
-      |Reads byte ranges from files referenced in a blob column. The column must have
-      |metadata hudi_blob=true.
+      |Reads byte ranges from files referenced in a blob-compatible column.
+      |The column must be structurally compatible with HoodieSchema.Blob.
```

</details>

*CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096059)) (source:comment#3042096059)

########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestBatchedBlobReader.scala: ########## @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.hudi.blob + +import org.apache.hudi.blob.BlobTestHelpers._ +import org.apache.hudi.common.schema.HoodieSchema +import org.apache.hudi.testutils.HoodieClientTestBase + +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.hudi.blob.BatchedBlobReader +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import java.util.Collections + +/** + * Tests for BatchedByteRangeReader. + * + * These tests verify the batching behavior and effectiveness of the + * BatchedByteRangeReader compared to non-batched approaches. + */ +class TestBatchedBlobReader extends HoodieClientTestBase { + + @Test + def testBasicBatchedRead(): Unit = { + val filePath = createTestFile(tempDir, "basic.bin", 10000) + + // Create input with struct column + val inputDF = sparkSession.createDataFrame(Seq( + (filePath, 0L, 100L), + (filePath, 100L, 100L), + (filePath, 200L, 100L) + )).toDF("external_path", "offset", "length") + .withColumn("data", blobStructCol("data", col("external_path"), col("offset"), col("length"))) + .select("data") + + // Read with batching + val resultDF = BatchedBlobReader.readBatched(inputDF, storageConf) + + // Verify schema + assertTrue(resultDF.columns.contains("data")) + assertEquals(1, resultDF.columns.length) // data + + // Verify results + val results = resultDF.collect() + assertEquals(3, results.length) + + // Check data content + results.zipWithIndex.foreach { case (row, i) => + val data = row.getAs[Array[Byte]]("data") + assertEquals(100, data.length) + + // Verify content matches expected pattern + assertBytesContent(data, expectedOffset = i * 100) + }

Review Comment: _⚠️ Potential issue_ | _🟡 Minor_ **Make this assertion order-deterministic to avoid flaky tests.** `collect()` followed by `zipWithIndex` assumes a stable row order, which Spark does not guarantee without an explicit sort, so the expected-offset checks can fail intermittently.
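A minimal, Spark-free sketch of the idea behind the fix: sort on a stable key before pairing rows with their index, so the expected-offset check no longer depends on the order in which `collect()` returns rows. `ReadResult`, `expectedBytes`, and its byte pattern are hypothetical stand-ins, not the real `BlobTestHelpers`; in the actual test, `resultDF.orderBy("offset")` plays the role of `sortBy(_.offset)`.

```scala
import java.util.Arrays

object DeterministicOrderSketch {
  // Hypothetical stand-in for one collected result row: the blob's offset and the bytes read.
  final case class ReadResult(offset: Long, data: Array[Byte])

  // Hypothetical file content: the byte at absolute position p is (p % 256).
  // The real createTestFile/assertBytesContent pattern may differ.
  def expectedBytes(offset: Long, length: Int): Array[Byte] =
    Array.tabulate(length)(i => ((offset + i) % 256).toByte)

  // Sort by offset before zipWithIndex so index i always corresponds to
  // offset i * stride, whatever order the rows were collected in.
  def checkInOrder(results: Seq[ReadResult], stride: Long): Boolean =
    results.sortBy(_.offset).zipWithIndex.forall { case (r, i) =>
      val expectedOffset = i * stride
      r.offset == expectedOffset &&
        Arrays.equals(r.data, expectedBytes(expectedOffset, r.data.length))
    }

  // The fragile variant: assumes rows already arrive in offset order.
  def checkAssumingOrder(results: Seq[ReadResult], stride: Long): Boolean =
    results.zipWithIndex.forall { case (r, i) => r.offset == i * stride }

  def main(args: Array[String]): Unit = {
    // Rows in an arbitrary order, as collect() is free to return them.
    val shuffled = Seq(200L, 0L, 100L).map(o => ReadResult(o, expectedBytes(o, 100)))
    println(checkInOrder(shuffled, stride = 100))       // true
    println(checkAssumingOrder(shuffled, stride = 100)) // false
  }
}
```

The sorted check passes for any permutation of the same rows, while the unsorted one passes only when the rows happen to arrive in offset order.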
<details> <summary>Deterministic test adjustment</summary>

```diff
     val inputDF = sparkSession.createDataFrame(Seq(
       (filePath, 0L, 100L),
       (filePath, 100L, 100L),
       (filePath, 200L, 100L)
     )).toDF("external_path", "offset", "length")
       .withColumn("data", blobStructCol("data", col("external_path"), col("offset"), col("length")))
-      .select("data")
+      .select("offset", "data")
@@
     // Verify schema
     assertTrue(resultDF.columns.contains("data"))
-    assertEquals(1, resultDF.columns.length) // data
+    assertEquals(2, resultDF.columns.length) // offset, data
@@
     // Verify results
-    val results = resultDF.collect()
+    val results = resultDF.orderBy("offset").collect()
```

</details>

Keeping `offset` in the input also changes the result schema (assuming `readBatched` passes non-blob columns through, as its Scaladoc describes), so the column-count assertion has to change with it. The `assertBytesContent(data, expectedOffset = i * 100)` check stays unchanged.

*CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096065)) (source:comment#3042096065)

########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala: ########## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.HoodieSparkUtils.sparkAdapter +import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} +import org.apache.hudi.io.SeekableDataInputStream +import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StorageConfiguration, StoragePath} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, SpecificInternalRow} +import org.apache.spark.sql.types.{BinaryType, BlobType, DataType, StructField, StructType} +import org.slf4j.LoggerFactory + +import java.io.InputStream + +import scala.collection.mutable.ArrayBuffer + +/** + * Batched byte range reader that optimizes I/O by combining consecutive reads for out-of-line data. + * + * This reader analyzes sequences of read requests within a partition and merges + * consecutive or nearby reads into single I/O operations. This significantly reduces + * the number of seeks and reads when processing sorted data. + * + * <h3>Schema Requirement:</h3> + * The blob column must match the schema defined in {@link HoodieSchema.Blob}: + * <pre> + * struct { + * type: string // "inline" or "out_of_line" + * data: binary (nullable) // inline data (null for out_of_line) + * reference: struct (nullable) { // file reference (null for inline) + * external_path: string + * offset: long + * length: long + * managed: boolean + * } + * } + * </pre> + * + * <h3>Key Features:</h3> + * <ul> + * <li>Batches consecutive reads from the same file</li> + * <li>Configurable gap threshold for merging nearby reads</li> + * <li>Lookahead buffer to identify batch opportunities</li> + * <li>Preserves input row order in output</li> + * </ul> + * + * <h3>Usage Example:</h3> + * {{{ + * import org.apache.hudi.udf.BatchedByteRangeReader + * import org.apache.spark.sql.functions._ + * // Read a table with a blob column (e.g. 
image_data) + * val df = spark.read.format("hudi").load("/my_path").select("image_data", "record_id") + * + * // Read with batching (best when data is sorted by external_path, offset) + * val result = BatchedByteRangeReader.readBatched(df, structColName = "file_info") + * + * // Result has: image_data, record_id, data + * result.show() + * }}} + * + * <h3>Performance Tips:</h3> + * <ul> + * <li>Sort input by (blob.reference.external_path, blob.reference.offset) for maximum batching effectiveness</li> + * <li>Increase lookaheadSize for better batch detection (at cost of memory)</li> + * <li>Tune maxGapBytes based on your data access patterns</li> + * </ul> + * + * @param storage HoodieStorage instance for file I/O + * @param maxGapBytes Maximum gap between ranges to consider for batching (default: 4KB) + * @param lookaheadSize Number of rows to buffer for batch detection (default: 50) + */ +class BatchedBlobReader( + storage: HoodieStorage, + maxGapBytes: Int = 4096, + lookaheadSize: Int = 50) { + + private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader]) + + /** + * Process a partition iterator, batching consecutive reads. + * + * This method consumes the input iterator and produces an output iterator + * with each row containing the original data plus a "data" column with the + * bytes read from the file. 
+ * + * @param rows Iterator of input rows with struct column + * @param structColIdx Index of the struct column in the row + * @param outputSchema Schema for output rows + * @param accessor Type class for accessing row fields + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Iterator of output rows with data column added + */ + def processPartition[R]( + rows: Iterator[R], + structColIdx: Int, + outputSchema: StructType) + (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] = { + + // Create buffered iterator for lookahead + val bufferedRows = rows.buffered + + // Result buffer to maintain order + val resultIterator = new Iterator[R] { + private var currentBatch: Iterator[R] = Iterator.empty + private var rowIndex = 0L + + override def hasNext: Boolean = { + if (currentBatch.hasNext) { + true + } else if (bufferedRows.hasNext) { + // Process next batch + currentBatch = processNextBatch() + currentBatch.hasNext + } else { + false + } + } + + override def next(): R = { + if (!hasNext) { + throw new NoSuchElementException("No more rows") + } + currentBatch.next() + } + + /** + * Collect and process the next batch of rows. 
+ */ + private def processNextBatch(): Iterator[R] = { + // Collect up to lookaheadSize rows with their original indices + val batch = collectBatch() + + if (batch.isEmpty) { + Iterator.empty + } else { + // Partition the batch into three groups + val (inlineRows, outOfLineRows) = batch.partition(_.inlineBytes.isDefined) + val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0) + + // Case 1: Inline - return bytes directly without I/O + val inlineResults = inlineRows.map { ri => + RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, outputSchema), ri.index) + } + + // Case 2: Whole-file reads + val wholeFileResults = wholeFileRows.map(readWholeFile(_, outputSchema)) + + // Case 3: Regular range reads - merge consecutive ranges and batch + val mergedRanges = identifyConsecutiveRanges(rangeRows) + val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, outputSchema)) + + // Sort by original index to preserve input order + (inlineResults ++ wholeFileResults ++ rangeResults).sortBy(_.index).map(_.row).iterator + } + } + + /** + * Collect up to lookaheadSize rows from the input iterator.
+ */ + private def collectBatch(): Seq[RowInfo[R]] = { + val batch = ArrayBuffer[RowInfo[R]]() + var collected = 0 + + while (bufferedRows.hasNext && collected < lookaheadSize) { + val row = bufferedRows.next() + // Handle null struct column (null blob) + if (accessor.isNullAt(row, structColIdx)) { + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(null) + ) + rowIndex += 1 + collected += 1 + } else { + val blobStruct = accessor.getStruct(row, structColIdx, HoodieSchema.Blob.getFieldCount) + // Dispatch based on storage_type (field 0) + val storageType = accessor.getString(blobStruct, 0) + if (storageType == HoodieSchema.Blob.INLINE) { + // Case 1: Inline - bytes are in field 1 + val bytes = accessor.getBytes(blobStruct, 1) + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(bytes) + ) + } else { + // Case 2 or 3: Out-of-line - get reference struct (field 2) + val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount) + val filePath = accessor.getString(referenceStruct, 0) + val offsetIsNull = accessor.isNullAt(referenceStruct, 1) + val lengthIsNull = accessor.isNullAt(referenceStruct, 2) + if (offsetIsNull || lengthIsNull) { + // Case 2: Whole-file read - no offset/length specified; sentinel length = -1 + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = 0, + length = -1, + index = rowIndex + ) + } else { + // Case 3: Regular range read + val offset = accessor.getLong(referenceStruct, 1) + val length = accessor.getLong(referenceStruct, 2) + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = offset, + length = length, + index = rowIndex + ) + } + } + rowIndex += 1 + collected += 1 + } + } + batch.toSeq + } + } + + resultIterator + } + + /** + * Identify consecutive ranges that can be batched together.
+ * + * This method groups rows by file path, sorts by offset, and merges + * ranges that are consecutive or within maxGapBytes of each other. + * + * @param rows Sequence of row information + * @return Sequence of merged ranges + */ + private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = { + // Group by file path + val byFile = rows.groupBy(_.filePath) + + val allRanges = ArrayBuffer[MergedRange[R]]() + + byFile.foreach { case (filePath, fileRows) => + // Sort by offset + val sorted = fileRows.sortBy(_.offset) + + // Merge consecutive ranges + val merged = mergeRanges(sorted, maxGapBytes) + allRanges ++= merged + } + + allRanges.toSeq + } + + /** + * Merge consecutive ranges within the gap threshold. + * + * @param rows Sorted rows from the same file + * @param maxGap Maximum gap to consider for merging + * @return Sequence of merged ranges + */ + private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = { + + val result = ArrayBuffer[MergedRange[R]]() + var current: MergedRange[R] = null + + rows.foreach { row => + if (current == null) { + // Start first range + current = MergedRange[R]( + filePath = row.filePath, + startOffset = row.offset, + endOffset = row.offset + row.length, + rows = Seq(row) + ) + } else { + val gap = row.offset - current.endOffset + + if (gap >= 0 && gap <= maxGap) { + // Merge into current range + current = current.merge(row) + } else { + // Save current range and start new one + result += current + current = MergedRange[R]( + filePath = row.filePath, + startOffset = row.offset, + endOffset = row.offset + row.length, + rows = Seq(row) + ) + } + } + } + + // Add final range + if (current != null) { + result += current + } + + result.toSeq + } + + /** + * Read an entire file and return it as a single row result. + * + * Used for whole-file out-of-line blobs where no offset or length is specified. 
+ * + * @param rowInfo Row information with the file path + * @param outputSchema Schema for output rows + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Sequence containing a single row result + */ + private def readWholeFile[R]( + rowInfo: RowInfo[R], + outputSchema: StructType) + (implicit builder: RowBuilder[R]): RowResult[R] = { + + var inputStream: InputStream = null + try { + val path = new StoragePath(rowInfo.filePath) + inputStream = storage.open(path) + val buffer = inputStream.readAllBytes() + + logger.debug(s"Read entire file ${rowInfo.filePath} (${buffer.length} bytes)") + + RowResult(builder.buildRow(rowInfo.originalRow, buffer, outputSchema), rowInfo.index) + } finally { + if (inputStream != null) { + try { + inputStream.close() + } catch { + case e: Exception => + logger.warn(s"Error closing stream for ${rowInfo.filePath}", e) + } + } + } + } + + /** + * Read a merged range and split it back into individual row results. + * + * This method performs a single I/O operation to read the entire merged + * range, then splits the buffer into individual results for each original + * row. 
+ * + * @param range The merged range to read + * @param outputSchema Schema for output rows + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Sequence of row results with original indices + */ + private def readAndSplitRange[R]( + range: MergedRange[R], + outputSchema: StructType) + (implicit builder: RowBuilder[R]): Seq[RowResult[R]] = { + + var inputStream: SeekableDataInputStream = null + try { + // Get or open file handle + inputStream = storage.openSeekable(new StoragePath(range.filePath), false) + + // Seek to start offset + inputStream.seek(range.startOffset) + + // Read the entire merged range + val totalLength = (range.endOffset - range.startOffset).toInt + val buffer = new Array[Byte](totalLength) + inputStream.readFully(buffer, 0, totalLength)

Review Comment: _⚠️ Potential issue_ | _🟡 Minor_ **Potential integer overflow for ranges exceeding 2 GiB.** `(range.endOffset - range.startOffset).toInt` silently truncates lengths above `Int.MaxValue` (about 2 GiB). Such sizes are unlikely for typical blobs, but a merged range can span several blobs. Depending on the value, the truncated length is either negative, which throws `NegativeArraySizeException`, or a smaller non-negative number, which silently reads too few bytes. Because truncation can yield a non-negative value, a `totalLength >= 0` check after `.toInt` is not sufficient: validate the `Long` length before converting (or, alternatively, read the range in chunks).

<details> <summary>Proposed fix</summary>

```diff
       // Read the entire merged range
-      val totalLength = (range.endOffset - range.startOffset).toInt
+      val totalLengthLong = range.endOffset - range.startOffset
+      require(totalLengthLong >= 0 && totalLengthLong <= Int.MaxValue,
+        s"Invalid range length $totalLengthLong for ${range.filePath} [${range.startOffset}, ${range.endOffset}); must be in [0, Int.MaxValue]")
+      val totalLength = totalLengthLong.toInt
       val buffer = new Array[Byte](totalLength)
       inputStream.readFully(buffer, 0, totalLength)
```

</details>

*CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096037)) (source:comment#3042096037)

########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala: ########## @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.blob + +import org.apache.hudi.blob.BlobTestHelpers._ +import org.apache.hudi.exception.HoodieIOException +import org.apache.hudi.testutils.HoodieClientTestBase + +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import java.util.Collections + +/** + * Tests for the read_blob() SQL function. 
+ * + * This test suite verifies: + * <ul> + * <li>Basic SQL integration with read_blob()</li> + * <li>Integration with WHERE clauses, JOINs</li> + * <li>Configuration parameter handling</li> + * <li>Error handling for invalid inputs</li> + * </ul> + */ +class TestReadBlobSQL extends HoodieClientTestBase {

Review Comment:

_⚠️ Potential issue_ | _🔴 Critical_

<details>
<summary>🧩 Analysis chain</summary>

🏁 Script executed:

```shell
#!/bin/bash
# Verify that HoodieClientTestBase does not override getSparkSessionExtensionsInjector
ast-grep --pattern $'class HoodieClientTestBase {
  $$$
  getSparkSessionExtensionsInjector($$$) {
    $$$
  }
  $$$
}'

# Check if the parent harness returns Option.empty()
rg -n "getSparkSessionExtensionsInjector" --type java -A 3
```

Repository: yihua/hudi

Length of output: 1831

---

🏁 Script executed:

```shell
# Find HoodieClientTestBase definition
fd -type f -name "*.scala" -o -name "*.java" | xargs grep -l "class HoodieClientTestBase"

# Check TestReadBlobSQL class hierarchy
cat -n hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala | head -50
```

Repository: yihua/hudi

Length of output: 2262

---

🏁 Script executed:

```shell
# Find HoodieClientTestBase in testutils package
fd "HoodieClientTestBase" hudi-spark-datasource/hudi-spark/src/test/

# Also check in broader testutils
fd "HoodieClientTestBase" | grep -E "\.(scala|java)$"
```

Repository: yihua/hudi

Length of output: 149

---

🏁 Script executed:

```shell
# Read HoodieClientTestBase to understand its implementation and parent
cat -n hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
```

Repository: yihua/hudi

Length of output: 48365

---

🏁 Script executed:

```shell
# Find HoodieSparkSessionExtension and check if it registers read_blob
fd -type f "HoodieSparkSessionExtension" | grep -E "\.(scala|java)$"
```

Repository: yihua/hudi

Length of output: 224

---

🏁 Script executed:

```shell
# Find HoodieSparkSessionExtension
find . -name "*HoodieSparkSessionExtension*"
```

Repository: yihua/hudi

Length of output: 163

---

🏁 Script executed:

```shell
# Read HoodieSparkSessionExtension to see what functions it registers
cat -n hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
```

Repository: yihua/hudi

Length of output: 2610

---

🏁 Script executed:

```shell
# Read TestReadBlobSQL to see what SQL functions it actually calls
cat -n hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala | head -200
```

Repository: yihua/hudi

Length of output: 8075

---

🏁 Script executed:

```shell
# Check how sparkSessionExtensionsInjector is used in HoodieSparkClientTestHarness
rg -n "sparkSessionExtensionsInjector\|getSparkSessionExtensionsInjector" hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java -A 5 -B 2
```

Repository: yihua/hudi

Length of output: 36

---

🏁 Script executed:

```shell
# Read the relevant section of HoodieSparkClientTestHarness around extension injection
sed -n '170,210p' hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
```

Repository: yihua/hudi

Length of output: 1732

---

</details>

**Tests will fail: `read_blob()` function is not registered.**

The test class extends `HoodieClientTestBase`, which extends `HoodieSparkClientTestHarness` (the parent harness's `getSparkSessionExtensionsInjector()` returns `Option.empty()` by default). The `read_blob()` SQL function is registered via `HoodieSparkSessionExtension`, so all SQL queries using `read_blob()` will fail with an "undefined function" error.

Override `getSparkSessionExtensionsInjector()` to inject the extension. The harness is Java, so its return type is presumably Hudi's `org.apache.hudi.common.util.Option` rather than `scala.Option`. An unqualified `Option` in the Scala test would resolve to `scala.Option`, and the override would not compile, so import the Hudi type under an alias:

<details>
<summary>Proposed fix</summary>

```diff
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import java.util.function.Consumer
 ...
 class TestReadBlobSQL extends HoodieClientTestBase {
+
+  override protected def getSparkSessionExtensionsInjector(): HOption[Consumer[SparkSessionExtensions]] = {
+    HOption.of(new Consumer[SparkSessionExtensions] {
+      override def accept(extensions: SparkSessionExtensions): Unit = {
+        new HoodieSparkSessionExtension().apply(extensions)
+      }
+    })
+  }
```

</details>

<details>
<summary>📝 Committable suggestion</summary>

> ‼️ **IMPORTANT**
> Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```suggestion
class TestReadBlobSQL extends HoodieClientTestBase {

  override protected def getSparkSessionExtensionsInjector(): HOption[Consumer[SparkSessionExtensions]] = {
    HOption.of(new Consumer[SparkSessionExtensions] {
      override def accept(extensions: SparkSessionExtensions): Unit = {
        new HoodieSparkSessionExtension().apply(extensions)
      }
    })
  }
```

</details>

<details>
<summary>🤖 Prompt for AI Agents</summary>

```
Verify each finding against the current code and only fix it if needed.

In `@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala` at line 45, the test class TestReadBlobSQL fails because read_blob() isn't registered; override getSparkSessionExtensionsInjector() in TestReadBlobSQL (inheriting from HoodieClientTestBase/HoodieSparkClientTestHarness) to return a non-empty Option that injects HoodieSparkSessionExtension so the read_blob SQL function is registered before tests run; implement the override to return a non-empty Hudi Option (org.apache.hudi.common.util.Option) wrapping a Consumer[SparkSessionExtensions] whose accept method calls new HoodieSparkSessionExtension().apply(extensions), or an equivalent that registers the extension.
```

</details>

*CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096068)) (source:comment#3042096068)

########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala: ########## @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.blob + +import org.apache.hudi.blob.BlobTestHelpers._ +import org.apache.hudi.exception.HoodieIOException +import org.apache.hudi.testutils.HoodieClientTestBase + +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import java.util.Collections + +/** + * Tests for the read_blob() SQL function.
+ * + * This test suite verifies: + * <ul> + * <li>Basic SQL integration with read_blob()</li> + * <li>Integration with WHERE clauses, JOINs</li> + * <li>Configuration parameter handling</li> + * <li>Error handling for invalid inputs</li> + * </ul> + */ +class TestReadBlobSQL extends HoodieClientTestBase { + + @Test + def testBasicReadBlobSQL(): Unit = { + val filePath = createTestFile(tempDir, "basic.bin", 10000) + + // Main DataFrame with blobStructCol + val df = sparkSession.createDataFrame(Seq( + (1, "record1", filePath, 0L, 100L), + (3, "record3", filePath, 100L, 100L), + (4, "record4", filePath, 200L, 100L) + )).toDF("id", "name", "external_path", "offset", "length") + .withColumn("file_info", blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "name", "file_info") + + // Ensure file_info is nullable in the schema + val schema = StructType(df.schema.map { + case StructField("file_info", dt, _, md) => StructField("file_info", dt, nullable = true, md) + case other => other + }) + val dfWithNullable = sparkSession.createDataFrame(df.rdd, schema) + + // DataFrame with a null blob value + val nullRow = Row(2, "record2", null) + val nullDf = sparkSession.createDataFrame(Collections.singletonList(nullRow), schema) + + // Union the null row + val fullDf = dfWithNullable.unionByName(nullDf) + fullDf.createOrReplaceTempView("test_table") + + // Use SQL with read_blob + val result = sparkSession.sql(""" + SELECT id, name, read_blob(file_info) as data + FROM test_table + WHERE id <= 3 + ORDER BY id + """) + + val rows = result.collect() + assertEquals(3, rows.length) + + // Verify data is binary for non-null rows + val data1 = rows(0).getAs[Array[Byte]]("data") + assertEquals(100, data1.length) + assertBytesContent(data1) + + // The null_blob row should have null data + assertTrue(rows(1).isNullAt(2)) + + val data3 = rows(2).getAs[Array[Byte]]("data") + assertEquals(100, data3.length) + assertBytesContent(data3, expectedOffset 
= 100) + } + + @Test + def testReadBlobWithJoin(): Unit = { + val filePath = createTestFile(tempDir, "join.bin", 10000) + + // Create blob table + val blobDF = sparkSession.createDataFrame(Seq( + (1, filePath, 0L, 100L), + (2, filePath, 100L, 100L) + )).toDF("id", "external_path", "offset", "length") + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "file_info") + + blobDF.createOrReplaceTempView("blob_table") + + // Create metadata table + val metaDF = sparkSession.createDataFrame(Seq( + (1, "Alice"), + (2, "Bob") + )).toDF("id", "name") + + metaDF.createOrReplaceTempView("meta_table") + + // SQL with JOIN + val result = sparkSession.sql(""" + SELECT m.id, m.name, read_blob(b.file_info) as data + FROM meta_table m + JOIN blob_table b ON m.id = b.id + ORDER BY m.id + """) + + val rows = result.collect() + assertEquals(2, rows.length) + assertEquals("Alice", rows(0).getAs[String]("name")) + assertEquals(100, rows(0).getAs[Array[Byte]]("data").length) + assertEquals("Bob", rows(1).getAs[String]("name")) + assertEquals(100, rows(1).getAs[Array[Byte]]("data").length) + + // Verify data content + val data1 = rows(0).getAs[Array[Byte]]("data") + assertBytesContent(data1) + } + + @Test + def testReadBlobInSubquery(): Unit = { + val filePath = createTestFile(tempDir, "subquery.bin", 10000) + + val df = sparkSession.createDataFrame(Seq( + (1, "A", filePath, 0L, 100L), + (2, "A", filePath, 100L, 100L), + (3, "B", filePath, 200L, 100L) + )).toDF("id", "category", "external_path", "offset", "length") + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "category", "file_info") + + df.createOrReplaceTempView("subquery_table") + + // SQL with subquery + val result = sparkSession.sql(""" + SELECT * FROM ( + SELECT id, category, read_blob(file_info) as data + FROM subquery_table + ) WHERE category = 'A' + """) + + val rows = 
result.collect() + assertEquals(2, rows.length) + rows.foreach { row => + assertEquals("A", row.getAs[String]("category")) + assertEquals(100, row.getAs[Array[Byte]]("data").length) + } + } + + @Test + def testConfigurationParameters(): Unit = { + val filePath = createTestFile(tempDir, "config.bin", 50000) + + val df = sparkSession.createDataFrame(Seq( + (1, filePath, 0L, 100L), + (2, filePath, 5000L, 100L), // 4.9KB gap + (3, filePath, 10000L, 100L) + )).toDF("id", "external_path", "offset", "length") + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "file_info") + + df.createOrReplaceTempView("config_table") + + // Use withSparkConfig to automatically manage configuration + withSparkConfig(sparkSession, Map( + "hoodie.blob.batching.max.gap.bytes" -> "10000", + "hoodie.blob.batching.lookahead.size" -> "100" + )) { + val result = sparkSession.sql(""" + SELECT id, read_blob(file_info) as data + FROM config_table + """) + + val rows = result.collect() + assertEquals(3, rows.length) + + // Verify all reads completed successfully + rows.foreach { row => + assertEquals(100, row.getAs[Array[Byte]]("data").length) + } + } + } + + @Test + def testMultipleReadBlobInSameQuery(): Unit = { + val filePath1 = createTestFile(tempDir, "multi1.bin", 10000) + val filePath2 = createTestFile(tempDir, "multi2.bin", 10000) + + val df = sparkSession.createDataFrame(Seq( + (1, filePath1, 0L, 50L, filePath2, 500L, 50L), + (2, filePath1, 100L, 50L, filePath2, 600L, 50L) + )).toDF("id", "external_path1", "offset1", "length1", "external_path2", "offset2", "length2") + .withColumn("file_info1", + blobStructCol("file_info1", col("external_path1"), col("offset1"), col("length1"))) + .withColumn("file_info2", + blobStructCol("file_info2", col("external_path2"), col("offset2"), col("length2"))) + .select("id", "file_info1", "file_info2") + + df.createOrReplaceTempView("multi_table") + + // SQL with multiple read_blob calls 
+ val result = sparkSession.sql(""" + SELECT + id, + read_blob(file_info1) as data1, + read_blob(file_info2) as data2 + FROM multi_table + """) + + val rows = result.collect() + assertEquals(2, rows.length) + + // Row 1: data1 = file1 at offset 0, data2 = file2 at offset 500 + val data1_row1 = rows(0).getAs[Array[Byte]]("data1") + val data2_row1 = rows(0).getAs[Array[Byte]]("data2") + assertEquals(50, data1_row1.length) + assertEquals(50, data2_row1.length) + assertBytesContent(data1_row1, expectedOffset = 0) + assertBytesContent(data2_row1, expectedOffset = 500) + + // Row 2: data1 = file1 at offset 100, data2 = file2 at offset 600 + val data1_row2 = rows(1).getAs[Array[Byte]]("data1") + val data2_row2 = rows(1).getAs[Array[Byte]]("data2") + assertBytesContent(data1_row2, expectedOffset = 100) + assertBytesContent(data2_row2, expectedOffset = 600) + } + + @Test + def testReadBlobWithEmptyResult(): Unit = { + val filePath = createTestFile(tempDir, "empty.bin", 10000) + + val df = sparkSession.createDataFrame(Seq( + (1, filePath, 0L, 100L), + (2, filePath, 100L, 100L) + )).toDF("id", "external_path", "offset", "length") + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "file_info") + + df.createOrReplaceTempView("empty_table") + + // SQL that returns no rows + val result = sparkSession.sql(""" + SELECT id, read_blob(file_info) as data + FROM empty_table + WHERE id > 100 + """) + + val rows = result.collect() + assertEquals(0, rows.length) + } + + @Test + def testReadBlobMultipleFiles(): Unit = { + val filePath1 = createTestFile(tempDir, "file1.bin", 10000) + val filePath2 = createTestFile(tempDir, "file2.bin", 10000) + + val df = sparkSession.createDataFrame(Seq( + (1, filePath1, 0L, 100L), + (2, filePath2, 0L, 100L), + (3, filePath1, 100L, 100L), + (4, filePath2, 100L, 100L) + )).toDF("id", "external_path", "offset", "length") + .withColumn("file_info", + blobStructCol("file_info", 
col("external_path"), col("offset"), col("length"))) + .select("id", "file_info") + + df.createOrReplaceTempView("multi_file_table") + + // SQL reading from multiple files + val result = sparkSession.sql(""" + SELECT id, read_blob(file_info) as data + FROM multi_file_table + ORDER BY id + """) + + val rows = result.collect() + assertEquals(4, rows.length) + + // Verify all data was read correctly + rows.foreach { row => + assertEquals(100, row.getAs[Array[Byte]]("data").length) + } + } + + @Test + def testReadBlobInWhereClause(): Unit = { + val filePath = createTestFile(tempDir, "where.bin", 10000) + val df = sparkSession.createDataFrame(Seq( + (1, filePath, 0L, 50L), // 50 bytes -> filtered out + (2, filePath, 100L, 100L), // 100 bytes -> passes filter + (3, filePath, 200L, 200L) // 200 bytes -> passes filter + )).toDF("id", "external_path", "offset", "length") + .withColumn("file_info", blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "file_info") + df.createOrReplaceTempView("where_table") + + val result = sparkSession.sql(""" + SELECT id, read_blob(file_info) AS data + FROM where_table + WHERE length(read_blob(file_info)) > 50 + ORDER BY id + """) + + val rows = result.collect() + assertEquals(2, rows.length) + + // validate that rows with IDs 2 and 3 are returned + assertEquals(2, rows(0).getInt(0)) + assertEquals(3, rows(1).getInt(0)) + } + + @Test + def testReadBlobWithCaseWhen(): Unit = { + val filePath = createTestFile(tempDir, "case.bin", 10000) + + val df = sparkSession.createDataFrame(Seq( + (1, true, filePath, 0L, 100L), + (2, false, filePath, 100L, 100L), + (3, true, filePath, 200L, 100L) + )).toDF("id", "should_resolve", "external_path", "offset", "length") + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "should_resolve", "file_info") + + df.createOrReplaceTempView("case_table") + + // SQL with CASE WHEN - note: this tests that the
expression is handled + // even in conditional contexts + val result = sparkSession.sql(""" + SELECT + id, + should_resolve, + CASE + WHEN should_resolve THEN read_blob(file_info) + ELSE NULL + END as data + FROM case_table + """) + + val rows = result.collect() + assertEquals(3, rows.length) + + // Row 1 should have data + assertTrue(rows(0).getAs[Boolean]("should_resolve")) + assertNotNull(rows(0).get(2)) + + // Row 2 should have null + assertFalse(rows(1).getAs[Boolean]("should_resolve")) + assertTrue(rows(1).isNullAt(2)) + + // Row 3 should have data + assertTrue(rows(2).getAs[Boolean]("should_resolve")) + assertNotNull(rows(2).get(2)) + } + + @Test + def testReadBlobWithMissingFile(): Unit = { + val missingPath = tempDir.resolve("does_not_exist.bin").toString + val df = sparkSession.createDataFrame(Seq( + (1, missingPath, 0L, 10L) + )).toDF("id", "external_path", "offset", "length") + .withColumn("file_info", blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "file_info") + df.createOrReplaceTempView("missing_file_table") + val thrown = assertThrows(classOf[Exception], () => { + sparkSession.sql("SELECT id, read_blob(file_info) as data FROM missing_file_table").collect() + }) + assertTrue(thrown.getCause.isInstanceOf[HoodieIOException]) + } + + @Test + def testReadBlobOnNonBlobColumn(): Unit = { + val df = sparkSession.createDataFrame(Seq( + (1, "not_a_blob") + )).toDF("id", "not_blob") + df.createOrReplaceTempView("non_blob_table") + val thrown = assertThrows(classOf[Exception], () => { + sparkSession.sql("SELECT id, read_blob(not_blob) as data FROM non_blob_table").collect() + }) + assertTrue(thrown.isInstanceOf[IllegalArgumentException]) + assertTrue(thrown.getMessage.contains("must be compatible with BlobType"))

Review Comment:

_⚠️ Potential issue_ | _🟡 Minor_

**Inconsistent exception unwrapping pattern.**

This test checks `thrown.isInstanceOf[IllegalArgumentException]` directly, while `testReadBlobWithMissingFile` (lines 390-393) checks `thrown.getCause.isInstanceOf[HoodieIOException]`. Exceptions raised inside tasks during `collect()` typically reach the driver wrapped in a `SparkException`. If the `IllegalArgumentException` is thrown at execution time rather than during analysis, this assertion will fail. Use consistent exception checking:

<details>
<summary>Proposed fix</summary>

```diff
 val thrown = assertThrows(classOf[Exception], () => {
   sparkSession.sql("SELECT id, read_blob(not_blob) as data FROM non_blob_table").collect()
 })
-assertTrue(thrown.isInstanceOf[IllegalArgumentException])
-assertTrue(thrown.getMessage.contains("must be compatible with BlobType"))
+val cause = if (thrown.isInstanceOf[IllegalArgumentException]) thrown else thrown.getCause
+assertTrue(cause.isInstanceOf[IllegalArgumentException])
+assertTrue(cause.getMessage.contains("must be compatible with BlobType"))
```

</details>

<details>
<summary>📝 Committable suggestion</summary>

> ‼️ **IMPORTANT**
> Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```suggestion
    val thrown = assertThrows(classOf[Exception], () => {
      sparkSession.sql("SELECT id, read_blob(not_blob) as data FROM non_blob_table").collect()
    })
    val cause = if (thrown.isInstanceOf[IllegalArgumentException]) thrown else thrown.getCause
    assertTrue(cause.isInstanceOf[IllegalArgumentException])
    assertTrue(cause.getMessage.contains("must be compatible with BlobType"))
```

</details>

<details>
<summary>🤖 Prompt for AI Agents</summary>

```
Verify each finding against the current code and only fix it if needed.

In `@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala` around lines 402 - 406, the test's assertion on the exception type is brittle because Spark may wrap the IllegalArgumentException in a SparkException; update the check around the "thrown" from assertThrows in TestReadBlobSQL.scala to mirror the other test's pattern by inspecting both the thrown exception and its cause: if thrown is a SparkException, assert that thrown.getCause is an instance of IllegalArgumentException, otherwise assert thrown is an instance of IllegalArgumentException (use the SparkException and IllegalArgumentException class names and the "thrown" variable from this test to locate where to change the assertions).
```

</details>

*CodeRabbit* ([original](https://github.com/yihua/hudi/pull/23#discussion_r3042096074)) (source:comment#3042096074)
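If the wrapping depth can vary, for example between Spark versions or between analysis-time and execution-time failures, another option is to search the whole cause chain instead of unwrapping exactly one level. The sketch below uses a hypothetical helper: `ExceptionCauses.findCause` is not an existing Hudi, Spark, or JUnit API. It returns the first throwable in the chain, starting with the exception itself, that is an instance of the requested type.

```scala
import scala.annotation.tailrec
import scala.reflect.ClassTag

// Hypothetical test helper: not part of Hudi, Spark, or JUnit.
object ExceptionCauses {

  /** Returns the first throwable in `t`'s cause chain (starting with `t` itself)
    * that is an instance of `T`, or None if there is none. */
  def findCause[T <: Throwable](t: Throwable)(implicit tag: ClassTag[T]): Option[T] = {
    @tailrec
    def loop(current: Throwable, seen: Set[Throwable]): Option[T] = current match {
      case null => None
      // Guard against cyclic cause chains built by custom exceptions.
      case _ if seen.contains(current) => None
      // The implicit ClassTag makes this type test work at runtime despite erasure.
      case matched: T => Some(matched)
      case other => loop(other.getCause, seen + other)
    }
    loop(t, Set.empty)
  }
}
```

In `testReadBlobOnNonBlobColumn`, `assertTrue(ExceptionCauses.findCause[IllegalArgumentException](thrown).exists(_.getMessage.contains("must be compatible with BlobType")))` could replace the one-level `getCause` check. The same call with `HoodieIOException` would fit `testReadBlobWithMissingFile`.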

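For the integer-overflow comment on `readAndSplitRange`, the underlying issue is that `Long.toInt` keeps only the low 32 bits. A sign check on the converted value therefore misses lengths that wrap around to a small positive number. The standalone sketch below validates the length as a `Long` before narrowing it. `RangeLength` and `toBufferLength` are hypothetical names, not existing Hudi APIs.

```scala
object RangeLength {

  /** Converts the byte range [startOffset, endOffset) into an Int buffer length.
    * The bounds check runs on the Long difference, before any narrowing. */
  def toBufferLength(startOffset: Long, endOffset: Long): Int = {
    val length = endOffset - startOffset
    require(length >= 0L && length <= Int.MaxValue,
      s"Invalid range [$startOffset, $endOffset): length $length does not fit in an Int")
    length.toInt
  }

  def main(args: Array[String]): Unit = {
    val fourGbPlus100 = 4L * 1024 * 1024 * 1024 + 100
    val threeGb = 3L * 1024 * 1024 * 1024
    // Narrowing keeps the low 32 bits: the first value wraps to a small positive Int,
    // the second to a negative one, so only the second would fail a `>= 0` check.
    println(fourGbPlus100.toInt)      // 100
    println(threeGb.toInt)            // -1073741824
    println(toBufferLength(0L, 100L)) // 100
  }
}
```

Even with this check, allocating a single array close to `Int.MaxValue` bytes can fail with an `OutOfMemoryError`. A lower, configurable cap or chunked reads may therefore be preferable in practice.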