This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new 3065e8dbd GH-2961: Cycle detection in AvroSchemaConverter to prevent infinite recursion (#3272)
3065e8dbd is described below

commit 3065e8dbd61f55280893b3e5fc44548d670a3461
Author: Arnav Balyan <[email protected]>
AuthorDate: Tue Sep 2 11:12:54 2025 +0530

    GH-2961: Cycle detection in AvroSchemaConverter to prevent infinite recursion (#3272)
---
 .../apache/parquet/avro/AvroSchemaConverter.java   | 85 +++++++++++++++++++---
 .../parquet/avro/TestAvroSchemaConverter.java      | 80 ++++++++++++++++++++
 2 files changed, 153 insertions(+), 12 deletions(-)

diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 8e5a58df8..58e9c2e19 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -51,6 +51,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -150,16 +151,23 @@ public class AvroSchemaConverter {
     if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
       throw new IllegalArgumentException("Avro schema must be a record.");
     }
-    return new MessageType(avroSchema.getFullName(), 
convertFields(avroSchema.getFields(), ""));
+    return new MessageType(
+        avroSchema.getFullName(),
+        convertFields(avroSchema.getFields(), "", new IdentityHashMap<Schema, 
Void>()));
   }
 
   private List<Type> convertFields(List<Schema.Field> fields, String 
schemaPath) {
+    return convertFields(fields, schemaPath, new IdentityHashMap<Schema, 
Void>());
+  }
+
+  private List<Type> convertFields(
+      List<Schema.Field> fields, String schemaPath, IdentityHashMap<Schema, 
Void> seenSchemas) {
     List<Type> types = new ArrayList<Type>();
     for (Schema.Field field : fields) {
       if (field.schema().getType().equals(Schema.Type.NULL)) {
         continue; // Avro nulls are not encoded, unless they are null unions
       }
-      types.add(convertField(field, appendPath(schemaPath, field.name())));
+      types.add(convertField(field, appendPath(schemaPath, field.name()), 
seenSchemas));
     }
     return types;
   }
@@ -168,11 +176,37 @@ public class AvroSchemaConverter {
     return convertField(fieldName, schema, Type.Repetition.REQUIRED, 
schemaPath);
   }
 
+  private Type convertField(
+      String fieldName, Schema schema, String schemaPath, 
IdentityHashMap<Schema, Void> seenSchemas) {
+    return convertField(fieldName, schema, Type.Repetition.REQUIRED, 
schemaPath, seenSchemas);
+  }
+
   @SuppressWarnings("deprecation")
   private Type convertField(String fieldName, Schema schema, Type.Repetition 
repetition, String schemaPath) {
-    Types.PrimitiveBuilder<PrimitiveType> builder;
+    return convertField(fieldName, schema, repetition, schemaPath, new 
IdentityHashMap<Schema, Void>());
+  }
+
+  @SuppressWarnings("deprecation")
+  private Type convertField(
+      String fieldName,
+      Schema schema,
+      Type.Repetition repetition,
+      String schemaPath,
+      IdentityHashMap<Schema, Void> seenSchemas) {
     Schema.Type type = schema.getType();
     LogicalType logicalType = schema.getLogicalType();
+
+    if (type.equals(Schema.Type.RECORD) || type.equals(Schema.Type.ENUM) || 
type.equals(Schema.Type.FIXED)) {
+      // If this schema has already been seen in the current branch, we have a 
recursion loop
+      if (seenSchemas.containsKey(schema)) {
+        throw new UnsupportedOperationException(
+            "Recursive Avro schemas are not supported by parquet-avro: " + 
schema.getFullName());
+      }
+      seenSchemas = new IdentityHashMap<>(seenSchemas);
+      seenSchemas.put(schema, null);
+    }
+
+    Types.PrimitiveBuilder<PrimitiveType> builder;
     if (type.equals(Schema.Type.BOOLEAN)) {
       builder = Types.primitive(BOOLEAN, repetition);
     } else if (type.equals(Schema.Type.INT)) {
@@ -195,21 +229,24 @@ public class AvroSchemaConverter {
         builder = Types.primitive(BINARY, repetition).as(stringType());
       }
     } else if (type.equals(Schema.Type.RECORD)) {
-      return new GroupType(repetition, fieldName, 
convertFields(schema.getFields(), schemaPath));
+      return new GroupType(repetition, fieldName, 
convertFields(schema.getFields(), schemaPath, seenSchemas));
     } else if (type.equals(Schema.Type.ENUM)) {
       builder = Types.primitive(BINARY, repetition).as(enumType());
     } else if (type.equals(Schema.Type.ARRAY)) {
       if (writeOldListStructure) {
         return ConversionPatterns.listType(
-            repetition, fieldName, convertField("array", 
schema.getElementType(), REPEATED, schemaPath));
+            repetition,
+            fieldName,
+            convertField("array", schema.getElementType(), REPEATED, 
schemaPath, seenSchemas));
       } else {
         return ConversionPatterns.listOfElements(
             repetition,
             fieldName,
-            convertField(AvroWriteSupport.LIST_ELEMENT_NAME, 
schema.getElementType(), schemaPath));
+            convertField(
+                AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(), 
schemaPath, seenSchemas));
       }
     } else if (type.equals(Schema.Type.MAP)) {
-      Type valType = convertField("value", schema.getValueType(), schemaPath);
+      Type valType = convertField("value", schema.getValueType(), schemaPath, 
seenSchemas);
       // avro map key type is always string
       return ConversionPatterns.stringKeyMapType(repetition, fieldName, 
valType);
     } else if (type.equals(Schema.Type.FIXED)) {
@@ -223,7 +260,7 @@ public class AvroSchemaConverter {
         builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, 
repetition).length(schema.getFixedSize());
       }
     } else if (type.equals(Schema.Type.UNION)) {
-      return convertUnion(fieldName, schema, repetition, schemaPath);
+      return convertUnion(fieldName, schema, repetition, schemaPath, 
seenSchemas);
     } else {
       throw new UnsupportedOperationException("Cannot convert Avro type " + 
type);
     }
@@ -246,6 +283,15 @@ public class AvroSchemaConverter {
   }
 
   private Type convertUnion(String fieldName, Schema schema, Type.Repetition 
repetition, String schemaPath) {
+    return convertUnion(fieldName, schema, repetition, schemaPath, new 
IdentityHashMap<Schema, Void>());
+  }
+
+  private Type convertUnion(
+      String fieldName,
+      Schema schema,
+      Type.Repetition repetition,
+      String schemaPath,
+      IdentityHashMap<Schema, Void> seenSchemas) {
     List<Schema> nonNullSchemas = new 
ArrayList<Schema>(schema.getTypes().size());
     // Found any schemas in the union? Required for the edge case, where the 
union contains only a single type.
     boolean foundNullSchema = false;
@@ -267,20 +313,31 @@ public class AvroSchemaConverter {
 
       case 1:
         return foundNullSchema
-            ? convertField(fieldName, nonNullSchemas.get(0), repetition, 
schemaPath)
-            : convertUnionToGroupType(fieldName, repetition, nonNullSchemas, 
schemaPath);
+            ? convertField(fieldName, nonNullSchemas.get(0), repetition, 
schemaPath, seenSchemas)
+            : convertUnionToGroupType(fieldName, repetition, nonNullSchemas, 
schemaPath, seenSchemas);
 
       default: // complex union type
-        return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, 
schemaPath);
+        return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, 
schemaPath, seenSchemas);
     }
   }
 
   private Type convertUnionToGroupType(
       String fieldName, Type.Repetition repetition, List<Schema> 
nonNullSchemas, String schemaPath) {
+    return convertUnionToGroupType(
+        fieldName, repetition, nonNullSchemas, schemaPath, new 
IdentityHashMap<Schema, Void>());
+  }
+
+  private Type convertUnionToGroupType(
+      String fieldName,
+      Type.Repetition repetition,
+      List<Schema> nonNullSchemas,
+      String schemaPath,
+      IdentityHashMap<Schema, Void> seenSchemas) {
     List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
     int index = 0;
     for (Schema childSchema : nonNullSchemas) {
-      unionTypes.add(convertField("member" + index++, childSchema, 
Type.Repetition.OPTIONAL, schemaPath));
+      unionTypes.add(
+          convertField("member" + index++, childSchema, 
Type.Repetition.OPTIONAL, schemaPath, seenSchemas));
     }
     return new GroupType(repetition, fieldName, unionTypes);
   }
@@ -289,6 +346,10 @@ public class AvroSchemaConverter {
     return convertField(field.name(), field.schema(), schemaPath);
   }
 
+  private Type convertField(Schema.Field field, String schemaPath, 
IdentityHashMap<Schema, Void> seenSchemas) {
+    return convertField(field.name(), field.schema(), schemaPath, seenSchemas);
+  }
+
   public Schema convert(MessageType parquetSchema) {
     return convertFields(parquetSchema.getName(), parquetSchema.getFields(), 
new HashMap<>());
   }
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 077e9cecd..d54cd4310 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -965,6 +965,86 @@ public class TestAvroSchemaConverter {
         () -> new AvroSchemaConverter(conf).convert(schema));
   }
 
+  @Test
+  public void testRecursiveSchemaThrowsException() {
+    String recursiveSchemaJson = "{"
+        + "\"type\": \"record\", \"name\": \"Node\", \"fields\": ["
+        + "  {\"name\": \"value\", \"type\": \"int\"},"
+        + "  {\"name\": \"children\", \"type\": ["
+        + "    \"null\", {"
+        + "      \"type\": \"array\", \"items\": [\"null\", \"Node\"]"
+        + "    }"
+        + "  ], \"default\": null}"
+        + "]}";
+
+    Schema recursiveSchema = new Schema.Parser().parse(recursiveSchemaJson);
+
+    assertThrows(
+        "Recursive Avro schema should throw UnsupportedOperationException for 
cycles",
+        UnsupportedOperationException.class,
+        () -> new AvroSchemaConverter().convert(recursiveSchema));
+  }
+
+  @Test
+  public void testRecursiveSchemaFromGitHubIssue() {
+    String issueSchemaJson = "{"
+        + "\"type\": \"record\", \"name\": \"ObjXX\", \"fields\": ["
+        + "  {\"name\": \"id\", \"type\": [\"null\", \"long\"], \"default\": 
null},"
+        + "  {\"name\": \"struct_add_list\", \"type\": [\"null\", {"
+        + "    \"type\": \"array\", \"items\": [\"null\", {"
+        + "      \"type\": \"record\", \"name\": \"ObjStructAdd\", \"fields\": 
["
+        + "        {\"name\": \"name\", \"type\": [\"null\", \"string\"], 
\"default\": null},"
+        + "        {\"name\": \"fld_list\", \"type\": [\"null\", {"
+        + "          \"type\": \"array\", \"items\": [\"null\", {"
+        + "            \"type\": \"record\", \"name\": \"ObjStructAddFld\", 
\"fields\": ["
+        + "              {\"name\": \"name\", \"type\": [\"null\", 
\"string\"], \"default\": null},"
+        + "              {\"name\": \"ref_val\", \"type\": [\"null\", 
\"ObjStructAdd\"], \"default\": null}"
+        + "            ]"
+        + "          }]"
+        + "        }], \"default\": null}"
+        + "      ]"
+        + "    }]"
+        + "  }], \"default\": null},"
+        + "  {\"name\": \"kafka_timestamp\", \"type\": {\"type\": \"long\", 
\"logicalType\": \"timestamp-millis\"}}"
+        + "]}";
+
+    Schema issueSchema = new Schema.Parser().parse(issueSchemaJson);
+
+    assertThrows(
+        "Schema hould throw UnsupportedOperationException for cycles",
+        UnsupportedOperationException.class,
+        () -> new AvroSchemaConverter().convert(issueSchema));
+  }
+
+  @Test
+  public void testRecursiveSchemaErrorMessage() {
+    String recursiveSchemaJson = "{"
+        + "\"type\": \"record\", \"name\": \"TestRecord\", \"fields\": ["
+        + "  {\"name\": \"self\", \"type\": [\"null\", \"TestRecord\"], 
\"default\": null}"
+        + "]}";
+
+    Schema recursiveSchema = new Schema.Parser().parse(recursiveSchemaJson);
+
+    // With our cycle detection fix, this should throw 
UnsupportedOperationException
+    assertThrows(
+        "Recursive schema should throw UnsupportedOperationException with 
clear error message",
+        UnsupportedOperationException.class,
+        () -> new AvroSchemaConverter().convert(recursiveSchema));
+  }
+
+  @Test
+  public void testDeeplyNestedNonRecursiveSchema() {
+    Schema level3 = record("Level3", field("value", primitive(STRING)));
+    Schema level2 = record("Level2", field("level3", level3));
+    Schema level1 = record("Level1", field("level2", level2));
+    Schema rootSchema = record("Root", field("level1", level1));
+
+    AvroSchemaConverter converter = new AvroSchemaConverter();
+    MessageType result = converter.convert(rootSchema);
+    Assert.assertNotNull("Non-recursive deep schema should convert 
successfully", result);
+    Assert.assertEquals("Root schema name should be preserved", "Root", 
result.getName());
+  }
+
   public static Schema optional(Schema original) {
     return 
Schema.createUnion(Lists.newArrayList(Schema.create(Schema.Type.NULL), 
original));
   }

Reply via email to