voonhous commented on code in PR #18328:
URL: https://github.com/apache/hudi/pull/18328#discussion_r2980185030
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -398,30 +423,78 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
     }
   }
+  private def detectVectorColumns(schema: StructType): Map[Int, HoodieSchema.Vector] =
+    SparkFileFormatInternalRowReaderContext.detectVectorColumnsFromMetadata(schema)
+
+  private def replaceVectorFieldsWithBinary(schema: StructType, vectorCols: Map[Int, HoodieSchema.Vector]): StructType =
+    SparkFileFormatInternalRowReaderContext.replaceVectorColumnsWithBinary(schema, vectorCols)
+
+  /**
+   * Wraps an iterator to convert binary VECTOR columns back to typed arrays.
+   * The read schema has BinaryType for vector columns; the target schema has ArrayType.
+   */
+  private def wrapWithVectorConversion(iter: Iterator[InternalRow],
+                                       readSchema: StructType,
+                                       targetSchema: StructType,
+                                       vectorCols: Map[Int, HoodieSchema.Vector]): Iterator[InternalRow] = {
+    val vectorProjection = UnsafeProjection.create(targetSchema)
+    val javaVectorCols: java.util.Map[Integer, HoodieSchema.Vector] =
+      vectorCols.map { case (k, v) => (Integer.valueOf(k), v) }.asJava
+    val mapper = VectorConversionUtils.buildRowMapper(readSchema, javaVectorCols, vectorProjection.apply(_))
+    iter.map(mapper.apply(_))
+  }
+
   // executor
   private def readBaseFile(file: PartitionedFile, parquetFileReader: SparkColumnarFileReader, requestedSchema: StructType,
                            remainingPartitionSchema: StructType, fixedPartitionIndexes: Set[Int], requiredSchema: StructType,
                            partitionSchema: StructType, outputSchema: StructType, filters: Seq[Filter],
                            storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
Review Comment:
Possible to reduce the boilerplate in this function to lower its complexity? There are 3 separate `detectVectorColumns` + `replaceVectorFieldsWithBinary` call pairs. We can add:
```
private def withVectorRewrite(schema: StructType): (StructType, Map[Int, HoodieSchema.Vector]) = {
  val vecs = detectVectorColumns(schema)
  if (vecs.nonEmpty) (replaceVectorFieldsWithBinary(schema, vecs), vecs)
  else (schema, vecs)
  ...
}
```
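For illustration, the suggested "detect once, rewrite once, return both" shape can be sketched self-contained in Java, using a plain `Map<String, String>` as a hypothetical stand-in for `StructType` (these are not the actual Hudi/Spark classes):
```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.Map;

public class VectorRewriteSketch {
  // Hypothetical stand-in for StructType: field name -> type name.
  static Map<String, String> detectVectorColumns(Map<String, String> schema) {
    Map<String, String> vecs = new LinkedHashMap<>();
    schema.forEach((name, type) -> {
      if (type.startsWith("vector")) {
        vecs.put(name, type);
      }
    });
    return vecs;
  }

  static Map<String, String> replaceVectorFieldsWithBinary(Map<String, String> schema,
                                                           Map<String, String> vecs) {
    Map<String, String> out = new LinkedHashMap<>(schema);
    vecs.keySet().forEach(name -> out.put(name, "binary"));
    return out;
  }

  // The suggested helper: detect and rewrite happen in one place, and both
  // the rewritten schema and the detected vector columns are returned together.
  static SimpleEntry<Map<String, String>, Map<String, String>> withVectorRewrite(
      Map<String, String> schema) {
    Map<String, String> vecs = detectVectorColumns(schema);
    return vecs.isEmpty()
        ? new SimpleEntry<>(schema, vecs)
        : new SimpleEntry<>(replaceVectorFieldsWithBinary(schema, vecs), vecs);
  }

  public static void main(String[] args) {
    Map<String, String> schema = new LinkedHashMap<>();
    schema.put("id", "long");
    schema.put("embedding", "vector<float,128>");
    SimpleEntry<Map<String, String>, Map<String, String>> result = withVectorRewrite(schema);
    System.out.println(result.getKey().get("embedding"));          // binary
    System.out.println(result.getValue().containsKey("embedding")); // true
  }
}
```
Each of the three detect-and-replace call sites would then collapse to a single `withVectorRewrite(schema)` call.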
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java:
##########
@@ -73,7 +73,8 @@ enum TypeID {
TIME_MILLIS(Integer.class),
TIMESTAMP_MILLIS(Long.class),
LOCAL_TIMESTAMP_MILLIS(Long.class),
- LOCAL_TIMESTAMP_MICROS(Long.class);
+ LOCAL_TIMESTAMP_MICROS(Long.class),
+ VECTOR(ByteBuffer.class);
Review Comment:
Don't quite understand this: vectors aren't accessed as `ByteBuffer` through the InternalSchema API; we are using `byte[].class`, right?
Possible to add a comment here explaining the choice of `ByteBuffer`?
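To make the concern concrete, here is a minimal hypothetical enum (not the real `Type.TypeID`) showing how a declared `ByteBuffer.class` diverges from values that are actually handed around as `byte[]`:
```java
import java.nio.ByteBuffer;

public class TypeIdSketch {
  // Hypothetical mini version of a TypeID-style enum that records a javaClass
  // per logical type. The VECTOR entry declares ByteBuffer, which is why a
  // clarifying comment at the declaration site matters if callers use byte[].
  enum MiniTypeId {
    LONG(Long.class),
    VECTOR(ByteBuffer.class);

    private final Class<?> javaClass;

    MiniTypeId(Class<?> javaClass) {
      this.javaClass = javaClass;
    }

    Class<?> javaClass() {
      return javaClass;
    }
  }

  public static void main(String[] args) {
    byte[] raw = new byte[] {1, 2, 3};
    // A byte[] is not a ByteBuffer, so the declared javaClass does not
    // describe raw byte-array values; only wrapped buffers match.
    System.out.println(MiniTypeId.VECTOR.javaClass().isInstance(raw));                  // false
    System.out.println(MiniTypeId.VECTOR.javaClass().isInstance(ByteBuffer.wrap(raw))); // true
  }
}
```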
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -538,6 +544,13 @@ private static HoodieSchema visitInternalPrimitiveToBuildHoodiePrimitiveType(Typ
         return HoodieSchema.createFixed(name, null, null, fixed.getFixedSize());
       }
+      case VECTOR: {
+        Types.VectorType vector = (Types.VectorType) primitive;
+        return HoodieSchema.createVector(
+            vector.getDimension(),
+            HoodieSchema.Vector.VectorElementType.fromString(vector.getElementType()));
+      }
Review Comment:
The `StorageBacking` is lost in the InternalSchema round-trip here, IIUC. `Types.VectorType` stores `storageBacking` and `VectorType.get()` accepts it, but this conversion back to `HoodieSchema` doesn't pass it through.
This is fine for now since only `PARQUET_FIXED_LEN_BYTE_ARRAY` exists, but it'll silently lose data when new backing types are added. Maybe we should pass it through, or add a comment noting the assumption?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]