C0urante commented on code in PR #16161: URL: https://github.com/apache/kafka/pull/16161#discussion_r1626470099
########## connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java: ########## @@ -330,4 +330,127 @@ public void testEmptyStruct() { new Struct(emptyStruct); } + private void assertInvalidValueForSchema(String fieldName, Schema schema, Object value, String message) { + Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, schema, value)); + assertEquals(message, e.getMessage()); + } + + @Test + public void testValidateFieldWithInvalidValueType() { + String fieldName = "field"; + assertInvalidValueForSchema(fieldName, new FakeSchema(), new Object(), + "Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\""); + assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(), + "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\""); + assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(), + "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\""); Review Comment: These two lines are identical. I think in the original test case in `StructTest` one of the invocations tested for the variant of `ConnectSchema::validate` that didn't accept a field name. ########## connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java: ########## @@ -251,19 +251,30 @@ public static void validateValue(String name, Schema schema, Object value) { break; case ARRAY: List<?> array = (List<?>) value; - for (Object entry : array) - validateValue(schema.valueSchema(), entry); + Schema arrayValueSchema = assertSchemaNotNull(name, "elements", schema.valueSchema()); + for (Object entry : array) { + validateValue("entry", arrayValueSchema, entry); Review Comment: The field name doesn't seem like it'll be very useful here; can we just use the nameless variant? 
Something like "array element" could possibly work too, but it'd still lack the name of the field, which is IMO the only really useful piece of info. It also seems a little strange with the error messages implying that the problematic value is a top-level value for the field, instead of a sub-value of it: `Invalid Java object for schema with type STRING: class java.lang.Boolean for field: "entry"`. Something like `Invalid Java object for schema with type STRING: class java.lang.Boolean for element of array field: "field"` could be less ambiguous. ########## connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java: ########## @@ -251,19 +251,30 @@ public static void validateValue(String name, Schema schema, Object value) { break; case ARRAY: List<?> array = (List<?>) value; - for (Object entry : array) - validateValue(schema.valueSchema(), entry); + Schema arrayValueSchema = assertSchemaNotNull(name, "elements", schema.valueSchema()); + for (Object entry : array) { + validateValue("entry", arrayValueSchema, entry); + } break; case MAP: Map<?, ?> map = (Map<?, ?>) value; + Schema mapKeySchema = assertSchemaNotNull(name, "keys", schema.keySchema()); + Schema mapValueSchema = assertSchemaNotNull(name, "values", schema.valueSchema()); for (Map.Entry<?, ?> entry : map.entrySet()) { - validateValue(schema.keySchema(), entry.getKey()); - validateValue(schema.valueSchema(), entry.getValue()); + validateValue("key", mapKeySchema, entry.getKey()); + validateValue("value", mapValueSchema, entry.getValue()); Review Comment: Same thoughts RE names, with "map key" and "map value" as possible alternatives. 
########## connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java: ########## @@ -330,4 +330,127 @@ public void testEmptyStruct() { new Struct(emptyStruct); } + private void assertInvalidValueForSchema(String fieldName, Schema schema, Object value, String message) { + Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, schema, value)); + assertEquals(message, e.getMessage()); + } + + @Test + public void testValidateFieldWithInvalidValueType() { + String fieldName = "field"; + assertInvalidValueForSchema(fieldName, new FakeSchema(), new Object(), + "Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\""); + assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(), + "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\""); + assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(), + "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\""); + } + + @Test + public void testValidateFieldWithInvalidValueMismatchTimestamp() { + long longValue = 1000L; + String fieldName = "field"; + + ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue); + + assertInvalidValueForSchema(fieldName, Timestamp.SCHEMA, longValue, + "Invalid Java object for schema \"org.apache.kafka.connect.data.Timestamp\" " + + "with type INT64: class java.lang.Long for field: \"field\""); + } + + @Test + public void testValidateList() { + String fieldName = "field"; + + // Optional element schema + Schema optionalStrings = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.emptyList()); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonList("hello")); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonList(null)); + 
ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", "world")); + ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", null)); + ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList(null, "world")); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"entry\""); + + // Required element schema + Schema requiredStrings = SchemaBuilder.array(Schema.STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyList()); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonList("hello")); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonList(null), + "Invalid value: null used for required field: \"entry\", schema type: STRING"); + ConnectSchema.validateValue(fieldName, requiredStrings, Arrays.asList("hello", "world")); + assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList("hello", null), + "Invalid value: null used for required field: \"entry\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList(null, "world"), + "Invalid value: null used for required field: \"entry\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"entry\""); + + // Null element schema + Schema nullElements = SchemaBuilder.type(Schema.Type.ARRAY); + assertInvalidValueForSchema(fieldName, nullElements, Collections.emptyList(), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList("hello"), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(null), + 
"No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", "world"), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", null), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList(null, "world"), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(true), + "No schema defined for elements of field: \"field\""); + } + + @Test + public void testValidateMap() { + String fieldName = "field"; + + // Optional element schema + Schema optionalStrings = SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.emptyMap()); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap("key", "value")); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap("key", null)); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, "value")); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, null)); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap("key", true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"value\""); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap(true, "value"), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"key\""); + + // Required element schema + Schema requiredStrings = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyMap()); + ConnectSchema.validateValue(fieldName, requiredStrings, 
Collections.singletonMap("key", "value")); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", null), + "Invalid value: null used for required field: \"value\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, "value"), + "Invalid value: null used for required field: \"key\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, null), + "Invalid value: null used for required field: \"key\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"value\""); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(true, "value"), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"key\""); + + // Null element schema + Schema nullElements = SchemaBuilder.type(Schema.Type.MAP); + assertInvalidValueForSchema(fieldName, nullElements, Collections.emptyMap(), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", "value"), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", null), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(null, "value"), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(null, null), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", true), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, 
Collections.singletonMap(true, "value"), + "No schema defined for keys of field: \"field\""); Review Comment: If it's not too much work, it could also be nice to exercise the code path where there's a key schema defined but no value schema. But only if it's a super easy addition, otherwise not worth the work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org