C0urante commented on code in PR #16161:
URL: https://github.com/apache/kafka/pull/16161#discussion_r1626470099


##########
connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java:
##########
@@ -330,4 +330,127 @@ public void testEmptyStruct() {
         new Struct(emptyStruct);
     }
 
+    private void assertInvalidValueForSchema(String fieldName, Schema schema, Object value, String message) {
+        Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, schema, value));
+        assertEquals(message, e.getMessage());
+    }
+
+    @Test
+    public void testValidateFieldWithInvalidValueType() {
+        String fieldName = "field";
+        assertInvalidValueForSchema(fieldName, new FakeSchema(), new Object(),
+                "Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\"");
+        assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(),
+                "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\"");
+        assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(),
+                "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\"");

Review Comment:
   These two invocations are identical. I think in the original test case in 
`StructTest`, one of them tested the variant of `ConnectSchema::validateValue` 
that doesn't accept a field name.



##########
connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java:
##########
@@ -251,19 +251,30 @@ public static void validateValue(String name, Schema schema, Object value) {
                 break;
             case ARRAY:
                 List<?> array = (List<?>) value;
-                for (Object entry : array)
-                    validateValue(schema.valueSchema(), entry);
+                Schema arrayValueSchema = assertSchemaNotNull(name, "elements", schema.valueSchema());
+                for (Object entry : array) {
+                    validateValue("entry", arrayValueSchema, entry);

Review Comment:
   The field name doesn't seem like it'll be very useful here; can we just use 
the nameless variant? Something like "array element" could possibly work too, 
but it'd still lack the name of the field, which is IMO the only really 
useful piece of info.
   
   It also seems a little strange for the error message to imply that the 
problematic value is a top-level value for the field, instead of a sub-value of 
it: `Invalid Java object for schema with type STRING: class java.lang.Boolean 
for field: "entry"`. Something like `Invalid Java object for schema with type 
STRING: class java.lang.Boolean for element of array field: "field"` could be 
less ambiguous.
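
   As a rough illustration of the less ambiguous wording, here is a minimal, 
self-contained sketch. It does not use the Kafka Connect classes; the class 
`ElementContextSketch`, the `Location` enum and its labels, and both methods 
are hypothetical names invented for this example, and 
`IllegalArgumentException` merely stands in for `DataException`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch only; this is not the ConnectSchema implementation.
final class ElementContextSketch {

    // Where the offending value sits relative to the named field (labels are illustrative).
    enum Location {
        FIELD("field"),
        ARRAY_ELEMENT("element of array field"),
        MAP_KEY("key of map field"),
        MAP_VALUE("value of map field");

        final String label;

        Location(String label) {
            this.label = label;
        }
    }

    // Builds the message; the field suffix is omitted when no field name is known.
    static String invalidTypeMessage(String schemaType, Object value, Location location, String fieldName) {
        StringBuilder sb = new StringBuilder("Invalid Java object for schema with type ")
                .append(schemaType).append(": ").append(value.getClass());
        if (fieldName != null) {
            sb.append(" for ").append(location.label).append(": \"").append(fieldName).append("\"");
        }
        return sb.toString();
    }

    // Checks that every non-null element is a String, reporting the enclosing field's name.
    static void validateStringElements(String fieldName, List<?> values) {
        for (Object entry : values) {
            if (entry != null && !(entry instanceof String)) {
                throw new IllegalArgumentException(
                        invalidTypeMessage("STRING", entry, Location.ARRAY_ELEMENT, fieldName));
            }
        }
    }

    public static void main(String[] args) {
        try {
            validateStringElements("field", Arrays.asList("hello", true));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   The point of the sketch is only that the name of the enclosing field is 
passed down to the element check, so the message can name both the field and 
the kind of sub-value that failed.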



##########
connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java:
##########
@@ -251,19 +251,30 @@ public static void validateValue(String name, Schema schema, Object value) {
                 break;
             case ARRAY:
                 List<?> array = (List<?>) value;
-                for (Object entry : array)
-                    validateValue(schema.valueSchema(), entry);
+                Schema arrayValueSchema = assertSchemaNotNull(name, "elements", schema.valueSchema());
+                for (Object entry : array) {
+                    validateValue("entry", arrayValueSchema, entry);
+                }
                 break;
             case MAP:
                 Map<?, ?> map = (Map<?, ?>) value;
+                Schema mapKeySchema = assertSchemaNotNull(name, "keys", schema.keySchema());
+                Schema mapValueSchema = assertSchemaNotNull(name, "values", schema.valueSchema());
                 for (Map.Entry<?, ?> entry : map.entrySet()) {
-                    validateValue(schema.keySchema(), entry.getKey());
-                    validateValue(schema.valueSchema(), entry.getValue());
+                    validateValue("key", mapKeySchema, entry.getKey());
+                    validateValue("value", mapValueSchema, entry.getValue());

Review Comment:
   Same thoughts RE names, with "map key" and "map value" as possible 
alternatives.



##########
connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java:
##########
@@ -330,4 +330,127 @@ public void testEmptyStruct() {
         new Struct(emptyStruct);
     }
 
+    private void assertInvalidValueForSchema(String fieldName, Schema schema, Object value, String message) {
+        Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, schema, value));
+        assertEquals(message, e.getMessage());
+    }
+
+    @Test
+    public void testValidateFieldWithInvalidValueType() {
+        String fieldName = "field";
+        assertInvalidValueForSchema(fieldName, new FakeSchema(), new Object(),
+                "Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\"");
+        assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(),
+                "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\"");
+        assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(),
+                "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\"");
+    }
+
+    @Test
+    public void testValidateFieldWithInvalidValueMismatchTimestamp() {
+        long longValue = 1000L;
+        String fieldName = "field";
+
+        ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue);
+
+        assertInvalidValueForSchema(fieldName, Timestamp.SCHEMA, longValue,
+                "Invalid Java object for schema \"org.apache.kafka.connect.data.Timestamp\" " +
+                        "with type INT64: class java.lang.Long for field: \"field\"");
+    }
+
+    @Test
+    public void testValidateList() {
+        String fieldName = "field";
+
+        // Optional element schema
+        Schema optionalStrings = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA);
+        ConnectSchema.validateValue(fieldName, optionalStrings, Collections.emptyList());
+        ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonList("hello"));
+        ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonList(null));
+        ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", "world"));
+        ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", null));
+        ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList(null, "world"));
+        assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true),
+                "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"entry\"");
+
+        // Required element schema
+        Schema requiredStrings = SchemaBuilder.array(Schema.STRING_SCHEMA);
+        ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyList());
+        ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonList("hello"));
+        assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonList(null),
+                "Invalid value: null used for required field: \"entry\", schema type: STRING");
+        ConnectSchema.validateValue(fieldName, requiredStrings, Arrays.asList("hello", "world"));
+        assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList("hello", null),
+                "Invalid value: null used for required field: \"entry\", schema type: STRING");
+        assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList(null, "world"),
+                "Invalid value: null used for required field: \"entry\", schema type: STRING");
+        assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true),
+                "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"entry\"");
+
+        // Null element schema
+        Schema nullElements = SchemaBuilder.type(Schema.Type.ARRAY);
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.emptyList(),
+                "No schema defined for elements of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList("hello"),
+                "No schema defined for elements of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(null),
+                "No schema defined for elements of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", "world"),
+                "No schema defined for elements of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", null),
+                "No schema defined for elements of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList(null, "world"),
+                "No schema defined for elements of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(true),
+                "No schema defined for elements of field: \"field\"");
+    }
+
+    @Test
+    public void testValidateMap() {
+        String fieldName = "field";
+
+        // Optional element schema
+        Schema optionalStrings = SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+        ConnectSchema.validateValue(fieldName, optionalStrings, Collections.emptyMap());
+        ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap("key", "value"));
+        ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap("key", null));
+        ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, "value"));
+        ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, null));
+        assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap("key", true),
+                "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"value\"");
+        assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap(true, "value"),
+                "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"key\"");
+
+        // Required element schema
+        Schema requiredStrings = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA);
+        ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyMap());
+        ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonMap("key", "value"));
+        assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", null),
+                "Invalid value: null used for required field: \"value\", schema type: STRING");
+        assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, "value"),
+                "Invalid value: null used for required field: \"key\", schema type: STRING");
+        assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, null),
+                "Invalid value: null used for required field: \"key\", schema type: STRING");
+        assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", true),
+                "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"value\"");
+        assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(true, "value"),
+                "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"key\"");
+
+        // Null element schema
+        Schema nullElements = SchemaBuilder.type(Schema.Type.MAP);
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.emptyMap(),
+                "No schema defined for keys of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", "value"),
+                "No schema defined for keys of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", null),
+                "No schema defined for keys of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(null, "value"),
+                "No schema defined for keys of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(null, null),
+                "No schema defined for keys of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", true),
+                "No schema defined for keys of field: \"field\"");
+        assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(true, "value"),
+                "No schema defined for keys of field: \"field\"");

Review Comment:
   If it's not too much work, it could also be nice to exercise the code path 
where there's a key schema defined but no value schema. But only if it's a 
super easy addition, otherwise not worth the work.
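
   To make the untested path concrete, here is a minimal stand-in sketch of 
the check order shown in the `MAP` case of the diff: keys first, then values. 
It is self-contained and does not touch the Kafka Connect classes. 
`MapSchemaCheckSketch` and `checkMapSchemas` are hypothetical names, plain 
`Object` stands in for `Schema`, `IllegalStateException` stands in for 
`DataException`, and the "values" message is an assumption extrapolated from 
the "keys" message asserted in the test above. How to build a real `MAP` 
schema with a key schema but no value schema is left open here.

```java
// Hypothetical stand-in for the null-schema checks; not the ConnectSchema code.
final class MapSchemaCheckSketch {

    // Mirrors the shape of the assertSchemaNotNull(name, "keys"/"values", schema) calls in the diff.
    static Object assertSchemaNotNull(String fieldName, String what, Object schema) {
        if (schema == null) {
            // Assumed message format, modelled on "No schema defined for keys of field: ..."
            throw new IllegalStateException(
                    "No schema defined for " + what + " of field: \"" + fieldName + "\"");
        }
        return schema;
    }

    // The key schema is checked before the value schema, as in the MAP case of the diff.
    static void checkMapSchemas(String fieldName, Object keySchema, Object valueSchema) {
        assertSchemaNotNull(fieldName, "keys", keySchema);
        assertSchemaNotNull(fieldName, "values", valueSchema);
    }

    public static void main(String[] args) {
        try {
            // Key schema present, value schema missing: the second check is the one that fails.
            checkMapSchemas("field", "key-schema-placeholder", null);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Because the key check runs first, a schema with neither key nor value 
schema never reaches the second check, which is why the existing assertions 
only ever see the "keys" message.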



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

