the-other-tim-brown commented on code in PR #18098: URL: https://github.com/apache/hudi/pull/18098#discussion_r2957165991
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala: ########## @@ -0,0 +1,732 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.HoodieSparkUtils.sparkAdapter +import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} +import org.apache.hudi.io.SeekableDataInputStream +import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StorageConfiguration, StoragePath} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Encoders, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, SpecificInternalRow} +import org.apache.spark.sql.types.{BinaryType, StructField, StructType} +import org.slf4j.LoggerFactory + +import scala.collection.mutable.ArrayBuffer + +/** + * Batched byte range reader that optimizes I/O by combining consecutive reads for out-of-line data. + * + * This reader analyzes sequences of read requests within a partition and merges + * consecutive or nearby reads into single I/O operations. 
This significantly reduces + * the number of seeks and reads when processing sorted data. + * + * <h3>Schema Requirement:</h3> + * The blob column must match the schema defined in {@link HoodieSchema.Blob}: + * <pre> + * struct { + * type: string // "inline" or "out_of_line" + * data: binary (nullable) // inline data (null for out_of_line) + * reference: struct (nullable) { // file reference (null for inline) + * external_path: string + * offset: long + * length: long + * managed: boolean + * } + * } + * </pre> + * + * <h3>Key Features:</h3> + * <ul> + * <li>Batches consecutive reads from the same file</li> + * <li>Configurable gap threshold for merging nearby reads</li> + * <li>Lookahead buffer to identify batch opportunities</li> + * <li>Preserves input row order in output</li> + * </ul> + * + * <h3>Usage Example:</h3> + * {{{ + * import org.apache.hudi.udf.BatchedByteRangeReader + * import org.apache.spark.sql.functions._ + * // Read a table with a blob column (e.g. image_data) + * val df = spark.read.format("hudi").load("/my_path").select("image_data", "record_id") + * + * // Read with batching (best when data is sorted by external_path, offset) + * val result = BatchedByteRangeReader.readBatched(df, structColName = "file_info") + * + * // Result has: image_data, record_id, data + * result.show() + * }}} + * + * <h3>Performance Tips:</h3> + * <ul> + * <li>Sort input by (blob.reference.external_path, blob.reference.offset) for maximum batching effectiveness</li> + * <li>Increase lookaheadSize for better batch detection (at cost of memory)</li> + * <li>Tune maxGapBytes based on your data access patterns</li> + * </ul> + * + * @param storage HoodieStorage instance for file I/O + * @param maxGapBytes Maximum gap between ranges to consider for batching (default: 4KB) + * @param lookaheadSize Number of rows to buffer for batch detection (default: 50) + */ +class BatchedBlobReader( + storage: HoodieStorage, + maxGapBytes: Int = 4096, + lookaheadSize: Int = 50) { + + 
  // Class-scoped logger; currently unused in the visible portion of the class.
  private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader])

  /**
   * Process a partition iterator, batching consecutive reads.
   *
   * This method consumes the input iterator lazily and produces an output iterator
   * with each row containing the original data plus a "data" column with the
   * bytes read from the file. Rows are processed in windows of up to
   * `lookaheadSize` rows; within each window, out-of-line range reads are merged
   * into batched I/O, and results are re-sorted by input position so the output
   * preserves the input row order.
   *
   * @param rows Iterator of input rows with struct column
   * @param structColIdx Index of the struct column in the row
   * @param outputSchema Schema for output rows
   * @param accessor Type class for accessing row fields
   * @param builder Type class for building output rows
   * @tparam R Row type (Row or InternalRow)
   * @return Iterator of output rows with data column added
   */
  def processPartition[R](
      rows: Iterator[R],
      structColIdx: Int,
      outputSchema: StructType)
      (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] = {

    // Create buffered iterator for lookahead
    val bufferedRows = rows.buffered

    // Result buffer to maintain order.
    // The anonymous Iterator below pulls one lookahead window at a time; it
    // owns the mutable cursor state (currentBatch, rowIndex) that the nested
    // helpers processNextBatch/collectBatch share.
    val resultIterator = new Iterator[R] {
      // Output rows of the window currently being drained.
      private var currentBatch: Iterator[R] = Iterator.empty
      // Global position of the next input row; used to restore input order
      // after batching reshuffles rows within a window.
      private var rowIndex = 0L

      override def hasNext: Boolean = {
        if (currentBatch.hasNext) {
          true
        } else if (bufferedRows.hasNext) {
          // Current window exhausted but more input remains: process next batch.
          currentBatch = processNextBatch()
          currentBatch.hasNext
        } else {
          false
        }
      }

      override def next(): R = {
        if (!hasNext) {
          throw new NoSuchElementException("No more rows")
        }
        currentBatch.next()
      }

      /**
       * Collect and process the next batch of rows.
       *
       * Splits the window into three cases — inline bytes, whole-file reads,
       * and range reads — performs the I/O for the latter two, then re-sorts
       * by original index so input order is preserved.
       */
      private def processNextBatch(): Iterator[R] = {
        // Collect up to lookaheadSize rows with their original indices
        val batch = collectBatch()

        if (batch.isEmpty) {
          Iterator.empty
        } else {
          // Partition the batch into three groups
          val (inlineRows, outOfLineRows) = batch.partition(_.inlineBytes.isDefined)
          // length < 0 is the sentinel set by collectBatch for whole-file reads.
          val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0)

          // Case 1: Inline — return bytes directly without I/O
          val inlineResults = inlineRows.map { ri =>
            RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, outputSchema), ri.index)
          }

          // Case 2: Whole-file reads
          val wholeFileResults = wholeFileRows.map(readWholeFile(_, outputSchema))

          // Case 3: Regular range reads — merge consecutive ranges and batch
          val mergedRanges = identifyConsecutiveRanges(rangeRows)
          val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, outputSchema))

          // Sort by original index to preserve input order
          (inlineResults ++ wholeFileResults ++ rangeResults).sortBy(_.index).map(_.row).iterator
        }
      }

      /**
       * Collect up to lookaheadSize rows from the input iterator.
       *
       * Each row's blob struct is decoded into a RowInfo. Dispatch is driven by
       * the storage_type field (field 0 of the blob struct, see HoodieSchema.Blob):
       * inline rows carry their bytes directly; out-of-line rows carry a file
       * reference, with a missing offset/length meaning "read the whole file"
       * (encoded here with sentinel length = -1).
       */
      private def collectBatch(): Seq[RowInfo[R]] = {
        val batch = ArrayBuffer[RowInfo[R]]()
        var collected = 0

        while (bufferedRows.hasNext && collected < lookaheadSize) {
          val row = bufferedRows.next()
          val blobStruct = accessor.getStruct(row, structColIdx, HoodieSchema.Blob.getFieldCount)

          // Dispatch based on storage_type (field 0)
          val storageType = accessor.getString(blobStruct, 0)

          if (storageType == HoodieSchema.Blob.INLINE) {
            // Case 1: Inline — bytes are in field 1; no file path applies.
            val bytes = accessor.getBytes(blobStruct, 1)
            batch += RowInfo[R](
              originalRow = row,
              filePath = "",
              offset = -1,
              length = -1,
              index = rowIndex,
              inlineBytes = Some(bytes)
            )
          } else {
            // Case 2 or 3: Out-of-line — get reference struct (field 2)
            val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount)
            val filePath = accessor.getString(referenceStruct, 0)
            val offsetIsNull = accessor.isNullAt(referenceStruct, 1)
            val lengthIsNull = accessor.isNullAt(referenceStruct, 2)

            if (offsetIsNull || lengthIsNull) {
              // Case 2: Whole-file read — no offset/length specified; sentinel length = -1
              batch += RowInfo[R](
                originalRow = row,
                filePath = filePath,
                offset = 0,
                length = -1,
                index = rowIndex
              )
            } else {
              // Case 3: Regular range read
              val offset = accessor.getLong(referenceStruct, 1)
              val length = accessor.getLong(referenceStruct, 2)
              batch += RowInfo[R](
                originalRow = row,
                filePath = filePath,
                offset = offset,
                length = length,
                index = rowIndex
              )
            }
          }

          // rowIndex tracks global input position across windows; collected
          // bounds the size of this window only.
          rowIndex += 1
          collected += 1
        }

        batch.toSeq
      }
    }

    resultIterator
  }

  /**
   * Identify consecutive ranges that can be batched together.
   *
   * This method groups rows by file path, sorts by offset, and merges
   * ranges that are consecutive or within maxGapBytes of each other.
+ * + * @param rows Sequence of row information + * @return Sequence of merged ranges + */ + private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = { + // Group by file path + val byFile = rows.groupBy(_.filePath) + + val allRanges = ArrayBuffer[MergedRange[R]]() + + byFile.foreach { case (filePath, fileRows) => + // Sort by offset + val sorted = fileRows.sortBy(_.offset) + + // Merge consecutive ranges + val merged = mergeRanges(sorted, maxGapBytes) + allRanges ++= merged + } + + allRanges.toSeq + } + + /** + * Merge consecutive ranges within the gap threshold. + * + * @param rows Sorted rows from the same file + * @param maxGap Maximum gap to consider for merging + * @return Sequence of merged ranges + */ + private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = { + + val result = ArrayBuffer[MergedRange[R]]() + var current: MergedRange[R] = null + + rows.foreach { row => + if (current == null) { + // Start first range + current = MergedRange[R]( + filePath = row.filePath, + startOffset = row.offset, + endOffset = row.offset + row.length, + rows = Seq(row) + ) + } else { + val gap = row.offset - current.endOffset + + if (gap >= 0 && gap <= maxGap) { + // Merge into current range + current = current.merge(row) + } else { + // Save current range and start new one + result += current + current = MergedRange[R]( + filePath = row.filePath, + startOffset = row.offset, + endOffset = row.offset + row.length, + rows = Seq(row) + ) + } + } + } + + // Add final range + if (current != null) { + result += current + } + + result.toSeq + } + + /** + * Read an entire file and return it as a single row result. + * + * Used for whole-file out-of-line blobs where no offset or length is specified. 
+ * + * @param rowInfo Row information with the file path + * @param outputSchema Schema for output rows + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Sequence containing a single row result + */ + private def readWholeFile[R]( + rowInfo: RowInfo[R], + outputSchema: StructType) + (implicit builder: RowBuilder[R]): RowResult[R] = { + + var inputStream: SeekableDataInputStream = null + try { + val path = new StoragePath(rowInfo.filePath) + val fileLength = storage.getPathInfo(path).getLength.toInt Review Comment: any suggestions for how to fix it? the reader requires int -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
