Hi everyone,

I came across the issue below while experimenting with the Confluent Schema Registry and the "avro-confluent" and "debezium-avro-confluent" formats. Please let me know your thoughts on it. Should this issue be addressed?
Thanks in advance,
Fruzsina

The use case

1. Create a new topic on Confluent Cloud.
2. Create a value schema with the name "sampleRecord":

    { "type": "record", "namespace": "com.mycorp.mynamespace", "name": "sampleRecord", ... }

3. Create a table with the "avro-confluent" format:

    CREATE TABLE `newtesttopic` (
        `my_field1` INT NOT NULL,
        `my_field2` DOUBLE NOT NULL,
        `my_field3` VARCHAR(2147483647) NOT NULL
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'newtesttopic',
        'scan.startup.mode' = 'latest-offset',
        'properties.bootstrap.servers' = 'bootstrapServers',
        'properties.sasl.jaas.config' = 'saslJaasConfig',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.security.protocol' = 'SASL_SSL',
        'format' = 'avro-confluent',
        'avro-confluent.url' = 'confluentSchemaRegUrl',
        'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
        'avro-confluent.basic-auth.user-info' = 'user:pw'
    )

4. Insert data into the "newtesttopic" table.

The following error is thrown:

    io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
    Schema being registered is incompatible with an earlier schema for subject
    "newtesttopic-value", details: [Incompatibility{type:NAME_MISMATCH, location:/name,
    message:expected: com.mycorp.mynamespace.sampleRecord,
    reader:{"type":"record","name":"record",...},
    writer:{"type":"record","name":"sampleRecord",...}

This error can of course be avoided if we don't register a schema for our topic on the Confluent Cloud site before inserting data into the Kafka table, and instead just let Flink register it for us with the name "record".

The cause of the error

I found that the error is caused by the EncodingFormat<SerializationSchema<RowData>> created by RegistryAvroFormatFactory.createEncodingFormat: when creating an AvroRowDataSerializationSchema, it uses AvroSchemaConverter.convertToSchema(LogicalType schema)
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java#L100>,
which names the schema "record"
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L306>
by default. The registered schema is named "sampleRecord" in the above example, so the Confluent Schema Registry client rejects the schema Flink tries to register. (See the first sketch below.)

The problem

To resolve this I added a new option, "schema-name", to the "avro-confluent" and "debezium-avro-confluent" formats. As I was testing the "debezium-avro-confluent" format, it turned out that this solution does not solve the problem in those cases where named schemas (record, enum, and fixed types) are nested in the schema of the topic. For example, in the case of "debezium-avro-confluent" the schema created is a union of null and a Debezium-specific record schema (before, after, op). If I use the above option to provide a specific name for the schema, I get an org.apache.avro.UnresolvedUnionException, because AvroRowDataSerializationSchema
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java#L79>
converts the RowType to a record schema with the name "record", which will not be found in the union if the Debezium-specific record has a different name. (The second sketch below reproduces this.)
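First sketch: to make the naming behaviour concrete, here is a minimal, self-contained illustration of the conversion step (the class name ConverterNameDemo is made up; the row type mirrors the DDL above). It calls the one-arg AvroSchemaConverter.convertToSchema overload that the serialization path goes through, and the existing two-arg overload that a schema-name option could feed:

    import java.util.Arrays;

    import org.apache.avro.Schema;
    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.table.types.logical.DoubleType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class ConverterNameDemo {
        public static void main(String[] args) {
            // Non-nullable row type mirroring the DDL above.
            RowType rowType = new RowType(false, Arrays.asList(
                    new RowType.RowField("my_field1", new IntType(false)),
                    new RowType.RowField("my_field2", new DoubleType(false)),
                    new RowType.RowField("my_field3",
                            new VarCharType(false, VarCharType.MAX_LENGTH))));

            // The overload used on the serialization path: the record name is
            // a hardcoded default ("record" at the commit linked above), and
            // nothing in the table definition can influence it.
            Schema derived = AvroSchemaConverter.convertToSchema(rowType);
            System.out.println(derived.getFullName()); // -> record

            // The existing two-arg overload already accepts a name; this is
            // the hook a "schema-name" option could use.
            Schema named = AvroSchemaConverter.convertToSchema(
                    rowType, "com.mycorp.mynamespace.sampleRecord");
            System.out.println(named.getFullName()); // -> com.mycorp.mynamespace.sampleRecord
        }
    }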
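Second sketch: the UnresolvedUnionException can be reproduced with plain Avro, independently of Flink, because union branches are matched by full name for named types. The schema and field names below are made up for illustration; only the record name differs between the two schemas:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.UnresolvedUnionException;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class UnionNameDemo {
        public static void main(String[] args) {
            // A union of null and a named record, similar in shape to what
            // "debezium-avro-confluent" produces (names here are made up).
            Schema envelope = SchemaBuilder.record("Envelope")
                    .namespace("com.mycorp.mynamespace")
                    .fields()
                    .optionalString("op")
                    .endRecord();
            Schema union = Schema.createUnion(
                    Schema.create(Schema.Type.NULL), envelope);

            // A structurally identical record, but carrying the name
            // "record" that Flink's converter assigns by default.
            Schema flinkDerived = SchemaBuilder.record("record")
                    .fields()
                    .optionalString("op")
                    .endRecord();
            GenericRecord datum = new GenericData.Record(flinkDerived);

            try {
                // Records are matched against union branches by full name,
                // so the lookup fails despite the identical structure.
                GenericData.get().resolveUnion(union, datum);
            } catch (UnresolvedUnionException e) {
                System.out.println(e.getMessage());
            }
        }
    }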
The union type is problematic because, in the general case, if we define a union schema [schema1, schema2], meaning that the schema is either schema1 or schema2, we must somehow determine which of them we are converting the RowType to. In the case of nested named schemas, Flink creates a name based on the record name and the field name
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L400>.
The Schema Registry client will also throw an error in this case if the registered names don't match.

Possible solutions

- Look up the names of the schemas in the field comments, e.g. if there is a field of type ROW<field1 STRING, field2 STRING> with a comment "avro-name = recordname", we can use this name when converting the LogicalType to an Avro schema.
- For the schema of the topic/table itself, there could be a schema-name option, or the name of the topic/table schema could be defined in the table comment.
- Use further table options to define the schema names, e.g. 'avro-confluent.schema-name.record.nested_record' = 'nested_record_name' (where record and nested_record are field names); in this case the schema-name option is suffixed with the path to the named schema. (A hypothetical sketch of this variant follows below.)
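For the last variant, a hypothetical sketch of how such suffixed options could be collected into a path-to-name map; neither the option keys nor the SchemaNameOptions class exist in Flink, they only illustrate the proposal:

    import java.util.HashMap;
    import java.util.Map;

    public class SchemaNameOptions {
        // Proposed (non-existent) option prefix, as sketched above.
        private static final String PREFIX = "avro-confluent.schema-name";

        // Collects proposed schema-name options into a map from the field
        // path of a named schema ("" for the top-level record) to the Avro
        // name that should be used for it during conversion.
        public static Map<String, String> parse(Map<String, String> tableOptions) {
            Map<String, String> namesByPath = new HashMap<>();
            for (Map.Entry<String, String> option : tableOptions.entrySet()) {
                String key = option.getKey();
                if (key.equals(PREFIX)) {
                    namesByPath.put("", option.getValue()); // top-level schema
                } else if (key.startsWith(PREFIX + ".")) {
                    // e.g. "avro-confluent.schema-name.record.nested_record"
                    // yields the path "record.nested_record"
                    namesByPath.put(key.substring(PREFIX.length() + 1), option.getValue());
                }
            }
            return namesByPath;
        }

        public static void main(String[] args) {
            Map<String, String> options = new HashMap<>();
            options.put(PREFIX, "com.mycorp.mynamespace.sampleRecord");
            options.put(PREFIX + ".record.nested_record", "nested_record_name");

            Map<String, String> names = parse(options);
            System.out.println(names.get(""));                     // com.mycorp.mynamespace.sampleRecord
            System.out.println(names.get("record.nested_record")); // nested_record_name
        }
    }

The converter would then consult this map instead of its hardcoded default whenever it reaches a named schema, i.e. the top-level record or a nested record, enum, or fixed type.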