yihua commented on code in PR #18431:
URL: https://github.com/apache/hudi/pull/18431#discussion_r3019177209
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -60,6 +60,24 @@ object HoodieSparkSchemaConverters extends
SparkAdapterSupport {
recordName: String = "topLevelRecord",
nameSpace: String = "",
metadata: Metadata = Metadata.empty): HoodieSchema = {
+ toHoodieTypeNested(catalystType, nullable, recordName, nameSpace,
metadata, depth = 0)
Review Comment:
Does vector type support always go through this code path, given that this
is Spark-specific? What about other engines: is there already a guard to fail
the write if a vector type is included in the schema?
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -420,6 +421,11 @@ public static HoodieSchema createNullable(HoodieSchema
schema) {
public static HoodieSchema createArray(HoodieSchema elementSchema) {
ValidationUtils.checkArgument(elementSchema != null, "Element schema
cannot be null");
+ if (elementSchema.getNonNullType().getType() == HoodieSchemaType.VECTOR) {
Review Comment:
Have you considered adding a similar guard in `createRecord` to reject a
VECTOR field nested inside a child record? e.g. `createRecord("inner", ...,
fields=[vectorField])` used as a field of an outer record would slip through.
The Spark converter already catches this, so it's not urgent, but it would make
the Java API defense-in-depth more complete.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala:
##########
@@ -991,10 +991,34 @@ class TestVectorDataSource extends
HoodieSparkClientTestBase {
}
}
+ @Test
+ def testNestedVectorWriteThrows(): Unit = {
+ // A VECTOR nested inside a struct field must be rejected at write time.
+ val meta = vectorMetadata("VECTOR(4)")
+ val nestedStruct = StructType(Seq(
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
nullable = false, metadata = meta)
+ ))
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("data", nestedStruct, nullable = false)
+ ))
+ val data = Seq(Row("key_1", Row(Seq(1.0f, 2.0f, 3.0f, 4.0f))))
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(data),
schema)
+
+ val ex = assertThrows(classOf[Exception], () => {
+ writeHudiTable(df, "nested_vector_test", basePath + "/nested_vector")
+ })
+ assertTrue(nestedVectorMessageInCauseChain(ex),
+ s"Expected nested VECTOR guard to fire, but got: ${ex.getMessage}")
+ }
+
private def assertArrayEquals(expected: Array[Byte], actual: Array[Byte],
message: String): Unit = {
assertEquals(expected.length, actual.length, s"$message: length mismatch")
expected.zip(actual).zipWithIndex.foreach { case ((e, a), idx) =>
assertEquals(e, a, s"$message: mismatch at index $idx")
}
}
+
+ private def nestedVectorMessageInCauseChain(ex: Throwable): Boolean =
+ ex != null && (Option(ex.getMessage).exists(_.contains("top-level field"))
|| nestedVectorMessageInCauseChain(ex.getCause))
Review Comment:
Similarly, the error message validation here should be stricter
##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java:
##########
@@ -2845,4 +2838,21 @@ public void testBlobTypeDescriptorRoundTrip() {
assertEquals(HoodieSchemaType.BLOB, parsed.getType());
assertInstanceOf(HoodieSchema.Blob.class, parsed);
}
+
+ @Test
+ public void testCreateArrayWithNullableVectorThrows() {
+ HoodieSchema vectorSchema =
HoodieSchema.createNullable(HoodieSchema.createVector(128));
+ HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+ () -> HoodieSchema.createArray(vectorSchema));
+ assertTrue(ex.getMessage().contains("top-level fields"));
+ }
+
+ @Test
+ public void testCreateMapWithNullableVectorThrows() {
+ HoodieSchema vectorSchema =
HoodieSchema.createNullable(HoodieSchema.createVector(128));
+ HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+ () -> HoodieSchema.createMap(vectorSchema));
+ assertTrue(ex.getMessage().contains("top-level fields"));
Review Comment:
Same here on exception messages
##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java:
##########
@@ -1045,54 +1046,46 @@ void testVectorSerialization() throws Exception {
}
@Test
- void testVectorInNestedStructures() throws Exception {
- // Create vector schema
+ void testVectorAsTopLevelRecordField() {
HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
- // Test vector in record - verify it can be used as a field
List<HoodieSchemaField> fields = Arrays.asList(
HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
HoodieSchemaField.of("embedding", vectorSchema)
);
HoodieSchema recordSchema = HoodieSchema.createRecord("TestRecord", null,
null, fields);
assertEquals(HoodieSchemaType.RECORD, recordSchema.getType());
- // Verify vector field is preserved in the Avro schema
+ // Verify the vector field is preserved in the Avro schema
Schema.Field embeddingField =
recordSchema.getAvroSchema().getField("embedding");
assertNotNull(embeddingField);
HoodieSchema embeddingSchema =
HoodieSchema.fromAvroSchema(embeddingField.schema());
assertVector(embeddingSchema, 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
- // Round-trip record with vector field through JSON
+ // Round-trip the record with the vector field through JSON
String recordJson = recordSchema.toString();
HoodieSchema parsedRecord = HoodieSchema.parse(recordJson);
assertEquals(recordSchema, parsedRecord);
Schema.Field parsedEmbeddingField =
parsedRecord.getAvroSchema().getField("embedding");
assertNotNull(parsedEmbeddingField);
HoodieSchema parsedEmbedding =
HoodieSchema.fromAvroSchema(parsedEmbeddingField.schema());
assertVector(parsedEmbedding, 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ }
- // Test vector in array
- HoodieSchema arraySchema = HoodieSchema.createArray(vectorSchema);
- assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType());
- assertVector(arraySchema.getElementType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
-
- // Round-trip array of vectors through JSON
- String arrayJson = arraySchema.toString();
- HoodieSchema parsedArray = HoodieSchema.parse(arrayJson);
- assertEquals(arraySchema, parsedArray);
- assertVector(parsedArray.getElementType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
-
- // Test vector in map
- HoodieSchema mapSchema = HoodieSchema.createMap(vectorSchema);
- assertEquals(HoodieSchemaType.MAP, mapSchema.getType());
- assertVector(mapSchema.getValueType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ @Test
+ void testVectorAsArrayElementThrows() {
+ HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+ () -> HoodieSchema.createArray(vectorSchema));
+ assertTrue(ex.getMessage().contains("top-level fields"));
Review Comment:
nit: use complete exception message for validation?
##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java:
##########
@@ -1045,54 +1046,46 @@ void testVectorSerialization() throws Exception {
}
@Test
- void testVectorInNestedStructures() throws Exception {
- // Create vector schema
+ void testVectorAsTopLevelRecordField() {
HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
- // Test vector in record - verify it can be used as a field
List<HoodieSchemaField> fields = Arrays.asList(
HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
HoodieSchemaField.of("embedding", vectorSchema)
);
HoodieSchema recordSchema = HoodieSchema.createRecord("TestRecord", null,
null, fields);
assertEquals(HoodieSchemaType.RECORD, recordSchema.getType());
- // Verify vector field is preserved in the Avro schema
+ // Verify the vector field is preserved in the Avro schema
Schema.Field embeddingField =
recordSchema.getAvroSchema().getField("embedding");
assertNotNull(embeddingField);
HoodieSchema embeddingSchema =
HoodieSchema.fromAvroSchema(embeddingField.schema());
assertVector(embeddingSchema, 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
- // Round-trip record with vector field through JSON
+ // Round-trip the record with the vector field through JSON
String recordJson = recordSchema.toString();
HoodieSchema parsedRecord = HoodieSchema.parse(recordJson);
assertEquals(recordSchema, parsedRecord);
Schema.Field parsedEmbeddingField =
parsedRecord.getAvroSchema().getField("embedding");
assertNotNull(parsedEmbeddingField);
HoodieSchema parsedEmbedding =
HoodieSchema.fromAvroSchema(parsedEmbeddingField.schema());
assertVector(parsedEmbedding, 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ }
- // Test vector in array
- HoodieSchema arraySchema = HoodieSchema.createArray(vectorSchema);
- assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType());
- assertVector(arraySchema.getElementType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
-
- // Round-trip array of vectors through JSON
- String arrayJson = arraySchema.toString();
- HoodieSchema parsedArray = HoodieSchema.parse(arrayJson);
- assertEquals(arraySchema, parsedArray);
- assertVector(parsedArray.getElementType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
-
- // Test vector in map
- HoodieSchema mapSchema = HoodieSchema.createMap(vectorSchema);
- assertEquals(HoodieSchemaType.MAP, mapSchema.getType());
- assertVector(mapSchema.getValueType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ @Test
+ void testVectorAsArrayElementThrows() {
+ HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+ () -> HoodieSchema.createArray(vectorSchema));
+ assertTrue(ex.getMessage().contains("top-level fields"));
+ }
- // Round-trip map with vector values through JSON
- String mapJson = mapSchema.toString();
- HoodieSchema parsedMap = HoodieSchema.parse(mapJson);
- assertEquals(mapSchema, parsedMap);
- assertVector(parsedMap.getValueType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ @Test
+ void testVectorAsMapValueThrows() {
+ HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+ () -> HoodieSchema.createMap(vectorSchema));
+ assertTrue(ex.getMessage().contains("top-level fields"));
Review Comment:
Similar here
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala:
##########
@@ -249,6 +249,59 @@ class TestSchemaConverters extends SparkAdapterSupport {
assertEquals(HoodieSchemaType.VARIANT,
nullableField.schema().getNonNullType.getType)
}
+ @Test
+ def testTopLevelVectorStillAllowed(): Unit = {
+ // Regression: top-level VECTOR fields must continue to work
Review Comment:
nit: remove this confusing comment
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala:
##########
@@ -249,6 +249,59 @@ class TestSchemaConverters extends SparkAdapterSupport {
assertEquals(HoodieSchemaType.VARIANT,
nullableField.schema().getNonNullType.getType)
}
+ @Test
+ def testTopLevelVectorStillAllowed(): Unit = {
+ // Regression: top-level VECTOR fields must continue to work
+ val vectorMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+ .build()
+ val sparkType = new StructType(Array[StructField](
+ StructField("id", DataTypes.LongType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
nullable = false, metadata = vectorMetadata)
+ ))
+ val hoodieSchema = HoodieSparkSchemaConverters.toHoodieType(sparkType,
recordName = "record")
+ assertEquals(HoodieSchemaType.VECTOR,
hoodieSchema.getField("embedding").get().schema().getType)
+ }
+
+ @Test
+ def testVectorInNestedStructThrows(): Unit = {
+ val vectorMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+ .build()
+ // Outer struct has a nested struct whose field is a VECTOR
+ val innerStruct = new StructType(Array[StructField](
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
nullable = false, metadata = vectorMetadata)
+ ))
+ val outerStruct = new StructType(Array[StructField](
+ StructField("id", DataTypes.LongType, nullable = false),
+ StructField("data", innerStruct, nullable = false)
+ ))
+ val exception = assertThrows(classOf[HoodieSchemaException], () => {
+ HoodieSparkSchemaConverters.toHoodieType(outerStruct, recordName =
"record")
+ })
+ assertTrue(exception.getMessage.contains("top-level field"))
+ assertTrue(exception.getMessage.contains("embedding"))
Review Comment:
Same here. Use the complete exception message for validation
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala:
##########
@@ -249,6 +249,59 @@ class TestSchemaConverters extends SparkAdapterSupport {
assertEquals(HoodieSchemaType.VARIANT,
nullableField.schema().getNonNullType.getType)
}
+ @Test
+ def testTopLevelVectorStillAllowed(): Unit = {
+ // Regression: top-level VECTOR fields must continue to work
+ val vectorMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+ .build()
+ val sparkType = new StructType(Array[StructField](
+ StructField("id", DataTypes.LongType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
nullable = false, metadata = vectorMetadata)
+ ))
+ val hoodieSchema = HoodieSparkSchemaConverters.toHoodieType(sparkType,
recordName = "record")
+ assertEquals(HoodieSchemaType.VECTOR,
hoodieSchema.getField("embedding").get().schema().getType)
+ }
+
+ @Test
+ def testVectorInNestedStructThrows(): Unit = {
+ val vectorMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+ .build()
+ // Outer struct has a nested struct whose field is a VECTOR
+ val innerStruct = new StructType(Array[StructField](
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
nullable = false, metadata = vectorMetadata)
+ ))
+ val outerStruct = new StructType(Array[StructField](
+ StructField("id", DataTypes.LongType, nullable = false),
+ StructField("data", innerStruct, nullable = false)
+ ))
+ val exception = assertThrows(classOf[HoodieSchemaException], () => {
+ HoodieSparkSchemaConverters.toHoodieType(outerStruct, recordName =
"record")
+ })
+ assertTrue(exception.getMessage.contains("top-level field"))
+ assertTrue(exception.getMessage.contains("embedding"))
+ }
+
+ @Test
+ def testVectorInsideArrayOfStructsThrows(): Unit = {
+ // VECTOR nested inside an array of structs: ARRAY<STRUCT<embedding
VECTOR(4)>>
+ val vectorMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+ .build()
+ val innerStruct = new StructType(Array[StructField](
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
nullable = false, metadata = vectorMetadata)
+ ))
+ val outerStruct = new StructType(Array[StructField](
+ StructField("items", ArrayType(innerStruct, containsNull = false),
nullable = false)
+ ))
+ val exception = assertThrows(classOf[HoodieSchemaException], () => {
+ HoodieSparkSchemaConverters.toHoodieType(outerStruct, recordName =
"record")
+ })
+ assertTrue(exception.getMessage.contains("top-level field"))
+ assertTrue(exception.getMessage.contains("embedding"))
Review Comment:
Same here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]