JimmyWang6 commented on code in PR #19449:
URL: https://github.com/apache/kafka/pull/19449#discussion_r2321450794


##########
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java:
##########
@@ -340,13 +351,16 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
             throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
         }
 
-        if (config.schemasEnabled() && (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)))
-            throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
+        if (config.schemasEnabled()) {
+            if (schema != null) {
+                return new SchemaAndValue(schema, convertToConnect(schema, jsonValue, config));

Review Comment:
   Hi @chia7712, I just noticed that if `jsonValue` contains valid schema and payload fields and `schema.content` is null, the current logic will fall through to the end of this method:
   ```java
   Schema schema = asConnectSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
           return new SchemaAndValue(
                   schema,
                   convertToConnect(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME), config)
           );
   ```
   In this case, the method uses the schema embedded in each serialized key or value, which I think may be the expected behavior. What do you think?
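   
   For reference, here is a minimal runnable sketch of the envelope fallback being discussed, assuming `schemas.enable=true` and no `schema.content` configured (the class name, topic name, and int32 payload are just illustrative):
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Map;
   
   import org.apache.kafka.connect.data.SchemaAndValue;
   import org.apache.kafka.connect.json.JsonConverter;
   
   public class EnvelopeFallbackSketch {
       public static void main(String[] args) {
           // Value-side converter with schemas.enable=true; schema.content is not set,
           // so the `schema` local in toConnectData stays null.
           JsonConverter converter = new JsonConverter();
           converter.configure(Map.of("schemas.enable", "true"), false);
   
           // An envelope record that carries its own "schema" and "payload" fields.
           byte[] envelope =
                   "{\"schema\":{\"type\":\"int32\",\"optional\":false},\"payload\":42}"
                           .getBytes(StandardCharsets.UTF_8);
   
           // With no configured schema, the converter falls through to the tail of
           // toConnectData and uses the schema embedded in the envelope itself.
           SchemaAndValue result = converter.toConnectData("my-topic", envelope);
           System.out.println(result.schema().type()); // INT32
           System.out.println(result.value());         // 42
       }
   }
   ```
   This is just to show that the embedded-schema path still produces the expected `SchemaAndValue` when `schema.content` is null.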


