This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 3065e8dbd GH-2961: Cycle detection in AvroSchemaConverter to prevent
infinite recursion (#3272)
3065e8dbd is described below
commit 3065e8dbd61f55280893b3e5fc44548d670a3461
Author: Arnav Balyan <[email protected]>
AuthorDate: Tue Sep 2 11:12:54 2025 +0530
GH-2961: Cycle detection in AvroSchemaConverter to prevent infinite
recursion (#3272)
---
.../apache/parquet/avro/AvroSchemaConverter.java | 85 +++++++++++++++++++---
.../parquet/avro/TestAvroSchemaConverter.java | 80 ++++++++++++++++++++
2 files changed, 153 insertions(+), 12 deletions(-)
diff --git
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 8e5a58df8..58e9c2e19 100644
---
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -51,6 +51,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -150,16 +151,23 @@ public class AvroSchemaConverter {
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
}
- return new MessageType(avroSchema.getFullName(),
convertFields(avroSchema.getFields(), ""));
+ return new MessageType(
+ avroSchema.getFullName(),
+ convertFields(avroSchema.getFields(), "", new IdentityHashMap<Schema,
Void>()));
}
private List<Type> convertFields(List<Schema.Field> fields, String
schemaPath) {
+ return convertFields(fields, schemaPath, new IdentityHashMap<Schema,
Void>());
+ }
+
+ private List<Type> convertFields(
+ List<Schema.Field> fields, String schemaPath, IdentityHashMap<Schema,
Void> seenSchemas) {
List<Type> types = new ArrayList<Type>();
for (Schema.Field field : fields) {
if (field.schema().getType().equals(Schema.Type.NULL)) {
continue; // Avro nulls are not encoded, unless they are null unions
}
- types.add(convertField(field, appendPath(schemaPath, field.name())));
+ types.add(convertField(field, appendPath(schemaPath, field.name()),
seenSchemas));
}
return types;
}
@@ -168,11 +176,37 @@ public class AvroSchemaConverter {
return convertField(fieldName, schema, Type.Repetition.REQUIRED,
schemaPath);
}
+ private Type convertField(
+ String fieldName, Schema schema, String schemaPath,
IdentityHashMap<Schema, Void> seenSchemas) {
+ return convertField(fieldName, schema, Type.Repetition.REQUIRED,
schemaPath, seenSchemas);
+ }
+
@SuppressWarnings("deprecation")
private Type convertField(String fieldName, Schema schema, Type.Repetition
repetition, String schemaPath) {
- Types.PrimitiveBuilder<PrimitiveType> builder;
+ return convertField(fieldName, schema, repetition, schemaPath, new
IdentityHashMap<Schema, Void>());
+ }
+
+ @SuppressWarnings("deprecation")
+ private Type convertField(
+ String fieldName,
+ Schema schema,
+ Type.Repetition repetition,
+ String schemaPath,
+ IdentityHashMap<Schema, Void> seenSchemas) {
Schema.Type type = schema.getType();
LogicalType logicalType = schema.getLogicalType();
+
+ if (type.equals(Schema.Type.RECORD) || type.equals(Schema.Type.ENUM) ||
type.equals(Schema.Type.FIXED)) {
+ // If this schema has already been seen in the current branch, we have a
recursion loop
+ if (seenSchemas.containsKey(schema)) {
+ throw new UnsupportedOperationException(
+ "Recursive Avro schemas are not supported by parquet-avro: " +
schema.getFullName());
+ }
+ seenSchemas = new IdentityHashMap<>(seenSchemas);
+ seenSchemas.put(schema, null);
+ }
+
+ Types.PrimitiveBuilder<PrimitiveType> builder;
if (type.equals(Schema.Type.BOOLEAN)) {
builder = Types.primitive(BOOLEAN, repetition);
} else if (type.equals(Schema.Type.INT)) {
@@ -195,21 +229,24 @@ public class AvroSchemaConverter {
builder = Types.primitive(BINARY, repetition).as(stringType());
}
} else if (type.equals(Schema.Type.RECORD)) {
- return new GroupType(repetition, fieldName,
convertFields(schema.getFields(), schemaPath));
+ return new GroupType(repetition, fieldName,
convertFields(schema.getFields(), schemaPath, seenSchemas));
} else if (type.equals(Schema.Type.ENUM)) {
builder = Types.primitive(BINARY, repetition).as(enumType());
} else if (type.equals(Schema.Type.ARRAY)) {
if (writeOldListStructure) {
return ConversionPatterns.listType(
- repetition, fieldName, convertField("array",
schema.getElementType(), REPEATED, schemaPath));
+ repetition,
+ fieldName,
+ convertField("array", schema.getElementType(), REPEATED,
schemaPath, seenSchemas));
} else {
return ConversionPatterns.listOfElements(
repetition,
fieldName,
- convertField(AvroWriteSupport.LIST_ELEMENT_NAME,
schema.getElementType(), schemaPath));
+ convertField(
+ AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(),
schemaPath, seenSchemas));
}
} else if (type.equals(Schema.Type.MAP)) {
- Type valType = convertField("value", schema.getValueType(), schemaPath);
+ Type valType = convertField("value", schema.getValueType(), schemaPath,
seenSchemas);
// avro map key type is always string
return ConversionPatterns.stringKeyMapType(repetition, fieldName,
valType);
} else if (type.equals(Schema.Type.FIXED)) {
@@ -223,7 +260,7 @@ public class AvroSchemaConverter {
builder = Types.primitive(FIXED_LEN_BYTE_ARRAY,
repetition).length(schema.getFixedSize());
}
} else if (type.equals(Schema.Type.UNION)) {
- return convertUnion(fieldName, schema, repetition, schemaPath);
+ return convertUnion(fieldName, schema, repetition, schemaPath,
seenSchemas);
} else {
throw new UnsupportedOperationException("Cannot convert Avro type " +
type);
}
@@ -246,6 +283,15 @@ public class AvroSchemaConverter {
}
private Type convertUnion(String fieldName, Schema schema, Type.Repetition
repetition, String schemaPath) {
+ return convertUnion(fieldName, schema, repetition, schemaPath, new
IdentityHashMap<Schema, Void>());
+ }
+
+ private Type convertUnion(
+ String fieldName,
+ Schema schema,
+ Type.Repetition repetition,
+ String schemaPath,
+ IdentityHashMap<Schema, Void> seenSchemas) {
List<Schema> nonNullSchemas = new
ArrayList<Schema>(schema.getTypes().size());
// Found any schemas in the union? Required for the edge case, where the
union contains only a single type.
boolean foundNullSchema = false;
@@ -267,20 +313,31 @@ public class AvroSchemaConverter {
case 1:
return foundNullSchema
- ? convertField(fieldName, nonNullSchemas.get(0), repetition,
schemaPath)
- : convertUnionToGroupType(fieldName, repetition, nonNullSchemas,
schemaPath);
+ ? convertField(fieldName, nonNullSchemas.get(0), repetition,
schemaPath, seenSchemas)
+ : convertUnionToGroupType(fieldName, repetition, nonNullSchemas,
schemaPath, seenSchemas);
default: // complex union type
- return convertUnionToGroupType(fieldName, repetition, nonNullSchemas,
schemaPath);
+ return convertUnionToGroupType(fieldName, repetition, nonNullSchemas,
schemaPath, seenSchemas);
}
}
private Type convertUnionToGroupType(
String fieldName, Type.Repetition repetition, List<Schema>
nonNullSchemas, String schemaPath) {
+ return convertUnionToGroupType(
+ fieldName, repetition, nonNullSchemas, schemaPath, new
IdentityHashMap<Schema, Void>());
+ }
+
+ private Type convertUnionToGroupType(
+ String fieldName,
+ Type.Repetition repetition,
+ List<Schema> nonNullSchemas,
+ String schemaPath,
+ IdentityHashMap<Schema, Void> seenSchemas) {
List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
int index = 0;
for (Schema childSchema : nonNullSchemas) {
- unionTypes.add(convertField("member" + index++, childSchema,
Type.Repetition.OPTIONAL, schemaPath));
+ unionTypes.add(
+ convertField("member" + index++, childSchema,
Type.Repetition.OPTIONAL, schemaPath, seenSchemas));
}
return new GroupType(repetition, fieldName, unionTypes);
}
@@ -289,6 +346,10 @@ public class AvroSchemaConverter {
return convertField(field.name(), field.schema(), schemaPath);
}
+ private Type convertField(Schema.Field field, String schemaPath,
IdentityHashMap<Schema, Void> seenSchemas) {
+ return convertField(field.name(), field.schema(), schemaPath, seenSchemas);
+ }
+
public Schema convert(MessageType parquetSchema) {
return convertFields(parquetSchema.getName(), parquetSchema.getFields(),
new HashMap<>());
}
diff --git
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 077e9cecd..d54cd4310 100644
---
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -965,6 +965,86 @@ public class TestAvroSchemaConverter {
() -> new AvroSchemaConverter(conf).convert(schema));
}
+ @Test
+ public void testRecursiveSchemaThrowsException() {
+ String recursiveSchemaJson = "{"
+ + "\"type\": \"record\", \"name\": \"Node\", \"fields\": ["
+ + " {\"name\": \"value\", \"type\": \"int\"},"
+ + " {\"name\": \"children\", \"type\": ["
+ + " \"null\", {"
+ + " \"type\": \"array\", \"items\": [\"null\", \"Node\"]"
+ + " }"
+ + " ], \"default\": null}"
+ + "]}";
+
+ Schema recursiveSchema = new Schema.Parser().parse(recursiveSchemaJson);
+
+ assertThrows(
+ "Recursive Avro schema should throw UnsupportedOperationException for
cycles",
+ UnsupportedOperationException.class,
+ () -> new AvroSchemaConverter().convert(recursiveSchema));
+ }
+
+ @Test
+ public void testRecursiveSchemaFromGitHubIssue() {
+ String issueSchemaJson = "{"
+ + "\"type\": \"record\", \"name\": \"ObjXX\", \"fields\": ["
+ + " {\"name\": \"id\", \"type\": [\"null\", \"long\"], \"default\":
null},"
+ + " {\"name\": \"struct_add_list\", \"type\": [\"null\", {"
+ + " \"type\": \"array\", \"items\": [\"null\", {"
+ + " \"type\": \"record\", \"name\": \"ObjStructAdd\", \"fields\":
["
+ + " {\"name\": \"name\", \"type\": [\"null\", \"string\"],
\"default\": null},"
+ + " {\"name\": \"fld_list\", \"type\": [\"null\", {"
+ + " \"type\": \"array\", \"items\": [\"null\", {"
+ + " \"type\": \"record\", \"name\": \"ObjStructAddFld\",
\"fields\": ["
+ + " {\"name\": \"name\", \"type\": [\"null\",
\"string\"], \"default\": null},"
+ + " {\"name\": \"ref_val\", \"type\": [\"null\",
\"ObjStructAdd\"], \"default\": null}"
+ + " ]"
+ + " }]"
+ + " }], \"default\": null}"
+ + " ]"
+ + " }]"
+ + " }], \"default\": null},"
+ + " {\"name\": \"kafka_timestamp\", \"type\": {\"type\": \"long\",
\"logicalType\": \"timestamp-millis\"}}"
+ + "]}";
+
+ Schema issueSchema = new Schema.Parser().parse(issueSchemaJson);
+
+ assertThrows(
+ "Schema with a cycle (ObjStructAdd -> ObjStructAddFld -> ObjStructAdd) should throw UnsupportedOperationException",
+ UnsupportedOperationException.class,
+ () -> new AvroSchemaConverter().convert(issueSchema));
+ }
+
+ @Test
+ public void testRecursiveSchemaErrorMessage() {
+ String recursiveSchemaJson = "{"
+ + "\"type\": \"record\", \"name\": \"TestRecord\", \"fields\": ["
+ + " {\"name\": \"self\", \"type\": [\"null\", \"TestRecord\"],
\"default\": null}"
+ + "]}";
+
+ Schema recursiveSchema = new Schema.Parser().parse(recursiveSchemaJson);
+
+ // A self-referencing record must be rejected with a message naming the offending record
+ UnsupportedOperationException e = assertThrows(
+ UnsupportedOperationException.class, () -> new AvroSchemaConverter().convert(recursiveSchema));
+ Assert.assertEquals(
+ "Recursive Avro schemas are not supported by parquet-avro: TestRecord", e.getMessage());
+ }
+
+ @Test
+ public void testDeeplyNestedNonRecursiveSchema() {
+ Schema level3 = record("Level3", field("value", primitive(STRING)));
+ Schema level2 = record("Level2", field("level3", level3));
+ Schema level1 = record("Level1", field("level2", level2));
+ Schema rootSchema = record("Root", field("level1", level1));
+
+ AvroSchemaConverter converter = new AvroSchemaConverter();
+ MessageType result = converter.convert(rootSchema);
+ Assert.assertNotNull("Non-recursive deep schema should convert
successfully", result);
+ Assert.assertEquals("Root schema name should be preserved", "Root",
result.getName());
+ }
+
public static Schema optional(Schema original) {
return
Schema.createUnion(Lists.newArrayList(Schema.create(Schema.Type.NULL),
original));
}