voonhous commented on code in PR #18328:
URL: https://github.com/apache/hudi/pull/18328#discussion_r2993530241


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -290,4 +307,50 @@ object SparkFileFormatInternalRowReaderContext {
     field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
   }
 
+  /**
+   * Detects VECTOR columns from HoodieSchema.
+   * Delegates to [[VectorConversionUtils.detectVectorColumns]].
+   * @return Map of ordinal to Vector schema for VECTOR fields.
+   */
+  private[hudi] def detectVectorColumns(schema: HoodieSchema): Map[Int, 
HoodieSchema.Vector] = {
+    VectorConversionUtils.detectVectorColumns(schema).asScala.map { case (k, 
v) => (k.intValue(), v) }.toMap
+  }
+
+  /**
+   * Detects VECTOR columns from Spark StructType metadata.
+   * Delegates to [[VectorConversionUtils.detectVectorColumnsFromMetadata]].
+   * @return Map of ordinal to Vector schema for VECTOR fields.
+   */
+  def detectVectorColumnsFromMetadata(schema: StructType): Map[Int, 
HoodieSchema.Vector] = {
+    VectorConversionUtils.detectVectorColumnsFromMetadata(schema).asScala.map 
{ case (k, v) => (k.intValue(), v) }.toMap
+  }
+
+  /**
+   * Replaces ArrayType with BinaryType for VECTOR columns so the Parquet 
reader
+   * can read FIXED_LEN_BYTE_ARRAY data without type mismatch.
+   * Delegates to [[VectorConversionUtils.replaceVectorColumnsWithBinary]].
+   */
+  def replaceVectorColumnsWithBinary(structType: StructType, vectorColumns: 
Map[Int, HoodieSchema.Vector]): StructType = {
+    val javaMap = vectorColumns.map { case (k, v) => (Integer.valueOf(k), 
v.asInstanceOf[AnyRef]) }.asJava
+    VectorConversionUtils.replaceVectorColumnsWithBinary(structType, javaMap)
+  }
+
+  /**
+   * Wraps an iterator to convert binary VECTOR columns back to typed arrays.
+   * Unpacks bytes from FIXED_LEN_BYTE_ARRAY into GenericArrayData using the 
canonical vector byte order.
+   */
+  private[hudi] def wrapWithVectorConversion(
+      iterator: ClosableIterator[InternalRow],
+      vectorColumns: Map[Int, HoodieSchema.Vector],
+      readSchema: StructType): ClosableIterator[InternalRow] = {
+    val javaVectorCols: java.util.Map[Integer, HoodieSchema.Vector] =
+      vectorColumns.map { case (k, v) => (Integer.valueOf(k), v) }.asJava
+    val mapper = VectorConversionUtils.buildRowMapper(readSchema, 
javaVectorCols, row => row)

Review Comment:
   This looks unsafe. The callback here is an identity function, i.e. it returns the 
reused buffer row as the converted result. `buildRowMapper` reuses a single 
GenericInternalRow buffer across all calls, so if a downstream operator retains a 
reference to a previous row, it would silently see overwritten data, causing SDC 
(silent data corruption) / correctness issues.
   
   There are other call sites, e.g. HoodieSparkParquetReader.java that are 
doing this:
   
   ```java
   VectorConversionUtils.buildRowMapper(readStructSchema, vectorColumnInfo, 
vectorProjection::apply);
   ```
   
   Maybe we can do something similar and add:
   
   ```scala
   val projection = UnsafeProjection.create(readSchema)
   val mapper = VectorConversionUtils.buildRowMapper(readSchema, 
javaVectorCols, projection.apply(_))
   ```



##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -248,8 +248,20 @@ private Type convertField(String fieldName, HoodieSchema 
schema, Type.Repetition
           builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, 
repetition).length(schema.getFixedSize());
         }
         break;
+      case VECTOR:
+        // Vectors are stored as bare FIXED_LEN_BYTE_ARRAY without a Parquet 
logical type annotation.
+        // Vector semantics (dimension, element type) are resolved from 
HoodieSchema (the table's
+        // stored schema), not from the Parquet file schema. The reverse 
direction
+        // (FIXED_LEN_BYTE_ARRAY → HoodieSchema) currently maps to generic 
FIXED; this is
+        // acceptable because the read path detects vectors from the 
HoodieSchema, not from Parquet.
+        // TODO: Consider adding VectorLogicalTypeAnnotation for fully 
self-describing Parquet files.
+        HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) schema;
+        int fixedSize = vectorSchema.getDimension()
+                * vectorSchema.getVectorElementType().getElementSize();
+        builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, 
repetition).length(fixedSize);
+        break;
       case UNION:
-        return convertUnion(fieldName, schema, repetition, schemaPath);
+    return convertUnion(fieldName, schema, repetition, schemaPath);

Review Comment:
   NIT: Don't think this is a required change.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+/**
+ * Shared utility methods for vector column handling during Parquet read/write.
+ *
+ * Vectors are stored as Parquet FIXED_LEN_BYTE_ARRAY columns. On read, Spark 
maps these
+ * to BinaryType. This class provides the canonical conversion between the 
binary
+ * representation and Spark's typed ArrayData (float[], double[], byte[]).
+ *
+ */
+public final class VectorConversionUtils {
+
+  private VectorConversionUtils() {
+  }
+
+  /**
+   * Detects VECTOR columns in a HoodieSchema record and returns a map of 
field ordinal
+   * to the corresponding {@link HoodieSchema.Vector} schema.
+   *
+   * @param schema a HoodieSchema of type RECORD (or null)
+   * @return map from field index to Vector schema; empty map if schema is 
null or has no vectors
+   */
+  public static Map<Integer, HoodieSchema.Vector> 
detectVectorColumns(HoodieSchema schema) {
+    Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new HashMap<>();
+    if (schema == null) {
+      return vectorColumnInfo;
+    }
+    List<HoodieSchemaField> fields = schema.getFields();
+    for (int i = 0; i < fields.size(); i++) {
+      HoodieSchema fieldSchema = fields.get(i).schema().getNonNullType();
+      if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+        vectorColumnInfo.put(i, (HoodieSchema.Vector) fieldSchema);
+      }
+    }
+    return vectorColumnInfo;
+  }
+
+  /**
+   * Detects VECTOR columns from Spark StructType metadata annotations.
+   * Fields with metadata key {@link HoodieSchema#TYPE_METADATA_FIELD} 
starting with "VECTOR"
+   * are parsed and included.
+   *
+   * @param schema Spark StructType
+   * @return map from field index to Vector schema; empty map if no vectors 
found
+   */
+  public static Map<Integer, HoodieSchema.Vector> 
detectVectorColumnsFromMetadata(StructType schema) {
+    Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new HashMap<>();
+    if (schema == null) {
+      return vectorColumnInfo;
+    }
+    StructField[] fields = schema.fields();
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      if (field.metadata().contains(HoodieSchema.TYPE_METADATA_FIELD)) {
+        String typeStr = 
field.metadata().getString(HoodieSchema.TYPE_METADATA_FIELD);
+        if (typeStr.startsWith("VECTOR")) {
+          HoodieSchema parsed = HoodieSchema.parseTypeDescriptor(typeStr);
+          if (parsed.getType() == HoodieSchemaType.VECTOR) {
+            vectorColumnInfo.put(i, (HoodieSchema.Vector) parsed);
+          }
+        }
+      }
+    }
+    return vectorColumnInfo;
+  }
+
+  /**
+   * Replaces ArrayType with BinaryType for VECTOR columns so the Parquet 
reader
+   * can read FIXED_LEN_BYTE_ARRAY data without type mismatch.
+   *
+   * @param structType     the original Spark schema
+   * @param vectorColumns  map of ordinal to vector info (only the key set is 
used)
+   * @return a new StructType with vector columns replaced by BinaryType
+   */
+  public static StructType replaceVectorColumnsWithBinary(StructType 
structType, Map<Integer, ?> vectorColumns) {
+    StructField[] fields = structType.fields();
+    StructField[] newFields = new StructField[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      if (vectorColumns.containsKey(i)) {
+        // Preserve the original field metadata (including hudi_type) so that 
downstream code
+        // calling detectVectorColumnsFromMetadata on the modified schema 
still finds vectors.
+        newFields[i] = new StructField(fields[i].name(), BinaryType$.MODULE$, 
fields[i].nullable(), fields[i].metadata());
+      } else {
+        newFields[i] = fields[i];
+      }
+    }
+    return new StructType(newFields);
+  }
+
+  /**
+   * Converts binary bytes from a FIXED_LEN_BYTE_ARRAY Parquet column back to 
a typed array
+   * based on the vector's element type and dimension.
+   *
+   * @param bytes        raw bytes read from Parquet
+   * @param vectorSchema the vector schema describing dimension and element 
type
+   * @return an ArrayData containing the decoded float[], double[], or byte[] 
array
+   * @throws IllegalArgumentException if byte array length doesn't match 
expected size
+   */
+  public static ArrayData convertBinaryToVectorArray(byte[] bytes, 
HoodieSchema.Vector vectorSchema) {
+    return convertBinaryToVectorArray(bytes, vectorSchema.getDimension(), 
vectorSchema.getVectorElementType());
+  }
+
+  /**
+   * Converts binary bytes from a FIXED_LEN_BYTE_ARRAY Parquet column back to 
a typed array.
+   *
+   * @param bytes    raw bytes read from Parquet
+   * @param dim      vector dimension (number of elements)
+   * @param elemType element type (FLOAT, DOUBLE, or INT8)
+   * @return an ArrayData containing the decoded float[], double[], or byte[] 
array
+   * @throws IllegalArgumentException if byte array length doesn't match 
expected size
+   */
+  public static ArrayData convertBinaryToVectorArray(byte[] bytes, int dim,
+                                                     
HoodieSchema.Vector.VectorElementType elemType) {
+    int expectedSize = dim * elemType.getElementSize();
+    checkArgument(bytes.length == expectedSize,
+        "Vector byte array length mismatch: expected " + expectedSize + " but 
got " + bytes.length);
+    ByteBuffer buffer = 
ByteBuffer.wrap(bytes).order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+    switch (elemType) {
+      case FLOAT:
+        float[] floats = new float[dim];
+        for (int j = 0; j < dim; j++) {
+          floats[j] = buffer.getFloat();
+        }
+        return new GenericArrayData(floats);
+      case DOUBLE:
+        double[] doubles = new double[dim];
+        for (int j = 0; j < dim; j++) {
+          doubles[j] = buffer.getDouble();
+        }
+        return new GenericArrayData(doubles);
+      case INT8:
+        byte[] int8s = new byte[dim];
+        buffer.get(int8s);
+        // Use UnsafeArrayData to avoid boxing each byte into a Byte object.
+        // GenericArrayData(byte[]) would box every element into Object[].
+        return UnsafeArrayData.fromPrimitiveArray(int8s);
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported vector element type: " + elemType);
+    }
+  }
+
+  /**
+   * Returns a {@link Function} that converts a single {@link InternalRow} by 
converting binary
+   * vector columns back to typed arrays and then applying the given 
projection callback.
+   *
+   * <p>Ordinals in {@code vectorColumns} must be relative to {@code 
readSchema} — the schema
+   * that has {@code BinaryType} for vector columns (as produced by
+   * {@link #replaceVectorColumnsWithBinary}).
+   *
+   * <p><b>Thread safety:</b> The returned function is NOT thread-safe; it 
reuses a single
+   * {@link GenericInternalRow} buffer across calls. Each call to this factory 
creates its own
+   * buffer, so separate functions returned by separate calls are independent.
+   *
+   * @param readSchema         the Spark schema of incoming rows (BinaryType 
for vector columns)
+   * @param vectorColumns      map of ordinal → Vector schema for vector 
columns, keyed by
+   *                           ordinals relative to {@code readSchema}
+   * @param projectionCallback called with the converted {@link 
GenericInternalRow}; must copy
+   *                           any data it needs to retain (e.g. {@code 
UnsafeProjection::apply})
+   * @return a function that converts one row and returns the projected result
+   */
+  public static Function<InternalRow, InternalRow> buildRowMapper(
+      StructType readSchema,
+      Map<Integer, HoodieSchema.Vector> vectorColumns,
+      Function<InternalRow, InternalRow> projectionCallback) {
+    GenericInternalRow converted = new 
GenericInternalRow(readSchema.fields().length);
+    return row -> {
+      convertRowVectorColumns(row, converted, readSchema, vectorColumns);
+      return projectionCallback.apply(converted);
+    };
+  }
+
+  /**
+   * Delegates to {@link 
HoodieSchema#buildVectorColumnsMetadataValue(HoodieSchema)}.
+   */
+  public static String buildVectorColumnsMetadataValue(HoodieSchema schema) {
+    return HoodieSchema.buildVectorColumnsMetadataValue(schema);
+  }

Review Comment:
   Nit: Unnecessary delegation — it doesn't seem to be required. Feel free to 
ignore since this is a nit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to