Hi everyone,

I came across the issue below while experimenting with the Confluent Schema 
Registry and the avro-confluent and debezium-avro-confluent formats. Please 
let me know your thoughts on it. Should this issue be addressed?

Thanks in advance,
Fruzsina

The use case

Create a new topic on Confluent Cloud
Create a value schema with the name “sampleRecord”:
{
  "type": "record",
  "namespace": "com.mycorp.mynamespace",
  "name": "sampleRecord",
…}
Create a table with the “avro-confluent” format:
CREATE TABLE `newtesttopic` (
     `my_field1` INT NOT NULL,
     `my_field2` DOUBLE NOT NULL,
     `my_field3` VARCHAR(2147483647) NOT NULL
     ) WITH (
     'connector' = 'kafka',
     'topic' = 'newtesttopic',
     'scan.startup.mode' = 'latest-offset',
     'properties.bootstrap.servers' = 'bootstrapServers',
     'properties.sasl.jaas.config' = 'saslJaasConfig',
     'properties.sasl.mechanism' = 'PLAIN',
     'properties.security.protocol' = 'SASL_SSL',
     'format' = 'avro-confluent',
     'avro-confluent.url' = 'confluentSchemaRegUrl',
     'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
     'avro-confluent.basic-auth.user-info' = 'user:pw')

Insert data into the “newtesttopic”
The following error is thrown:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
Schema being registered is incompatible with an earlier schema for subject 
"newtesttopic-value", details: [Incompatibility{type:NAME_MISMATCH, 
location:/name, message:expected: com.mycorp.mynamespace.sampleRecord, 
reader:{"type":"record","name":"record",...}, 
writer:{"type":"record","name":"sampleRecord",...}
This error can of course be avoided if we don’t register a schema for the topic 
on Confluent Cloud before inserting data into the Kafka table, and instead 
let Flink register it for us with the name “record”.

The cause of the error

I found that the error is caused by the 
EncodingFormat<SerializationSchema<RowData>> created by 
RegistryAvroFormatFactory.createEncodingFormat, because when creating an 
AvroRowDataSerializationSchema, it uses 
AvroSchemaConverter.convertToSchema(LogicalType schema) 
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java#L100>
which names the schema “record” 
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L306>
 by default.

But the registered schema is named “sampleRecord” in the above example, so the 
Confluent Schema Registry client doesn’t accept it.
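
To illustrate the mismatch, here is a minimal Python sketch of how Avro derives the full name of a named schema (namespace plus name). It is not Flink or Schema Registry code; `avro_fullname` is a hypothetical helper, and the generated schema is written without a namespace only because the abbreviated error message above shows none.

```python
def avro_fullname(schema: dict) -> str:
    """Return the Avro fullname of a named schema.

    If the name already contains a dot, or no namespace is set,
    the name is used as-is; otherwise it is namespace + "." + name.
    """
    name = schema["name"]
    namespace = schema.get("namespace", "")
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


# Schema registered by hand for the subject "newtesttopic-value".
registered = {
    "type": "record",
    "namespace": "com.mycorp.mynamespace",
    "name": "sampleRecord",
}

# Schema generated from the table definition (namespace omitted here,
# as in the abbreviated error message).
generated = {"type": "record", "name": "record"}

# The names differ, which is what NAME_MISMATCH reports.
names_match = avro_fullname(registered) == avro_fullname(generated)
```

Whatever the field lists look like, the comparison already fails on the name, so the generated schema cannot be registered under the same subject.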

The problem

To resolve this, I added a new option “schema-name” to the “avro-confluent” and 
“debezium-avro-confluent” formats. However, while testing the 
“debezium-avro-confluent” format, it turned out that this does not solve the 
problem when named schemas (record, enum, fixed types) are nested in the 
schema of the topic.

For example:
In the case of “debezium-avro-confluent”, the schema created is a union of null 
and a Debezium-specific record schema (before, after, op). If I use the above 
option to provide a specific name for the schema, I get an 
org.apache.avro.UnresolvedUnionException, because 
AvroRowDataSerializationSchema 
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java#L79>
 converts the RowType to a record schema with the name “record”, which will not 
be found in the union if the Debezium-specific record has a different name.

The union type is problematic because, in the general case, if we define a 
union schema [schema1, schema2], meaning that the schema is either schema1 or 
schema2, we must somehow determine which schema we are converting the RowType to.
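
The union lookup can be modelled roughly as follows. This is a simplified Python sketch, not the Avro implementation: `resolve_union` and `UnresolvedUnionError` are hypothetical stand-ins, and “my_envelope” is an assumed value for the new schema-name option.

```python
class UnresolvedUnionError(Exception):
    """Stand-in for org.apache.avro.UnresolvedUnionException."""


def resolve_union(branch_names, datum_schema_name):
    """Return the index of the union branch named like the datum's schema."""
    for index, name in enumerate(branch_names):
        if name == datum_schema_name:
            return index
    raise UnresolvedUnionError(
        f"Not in union {branch_names}: {datum_schema_name}"
    )


# Union built with a user-supplied name for the Debezium record.
union = ["null", "my_envelope"]

# A datum whose schema carries the same name resolves to branch 1.
matching_index = resolve_union(union, "my_envelope")

# A datum converted with the default name "record" matches no branch.
try:
    resolve_union(union, "record")
    default_name_resolves = True
except UnresolvedUnionError:
    default_name_resolves = False
```

The sketch only shows the name lookup; it says nothing about how the correct branch should be chosen when a union contains several record schemas.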

In the case of nested named schemas, Flink creates a name based on the record 
name and the field name 
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L400>.
 The Schema Registry client will also throw an error in this case if the 
registered names don’t match.
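
As a rough illustration of how such generated names accumulate with nesting depth, here is a small Python sketch. It is not the Flink converter: `generated_names` is a hypothetical helper, the underscore separator is an assumption, and the example table is made up.

```python
def generated_names(fields, row_name="record", path=()):
    """Map each row's field path to the schema name derived for it.

    `fields` maps a field name to either a type string or, for a nested
    ROW, another dict of fields. The top-level row is keyed as "<top>".
    """
    names = {".".join(path) or "<top>": row_name}
    for field_name, field_type in fields.items():
        if isinstance(field_type, dict):  # a nested ROW
            nested_name = f"{row_name}_{field_name}"  # assumed separator
            names.update(
                generated_names(field_type, nested_name, path + (field_name,))
            )
    return names


# Hypothetical table with two levels of nested rows.
table = {
    "id": "INT",
    "address": {
        "street": "STRING",
        "geo": {"lat": "DOUBLE", "lon": "DOUBLE"},
    },
}

names = generated_names(table)
```

Every nested name depends on the top-level name, so renaming only the top-level schema changes all derived names but still does not let them match arbitrary registered names.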

Possible solutions

1. Look up the names of the schemas in the field comment, e.g. if there is a 
   field of type ROW<field1 String, field2 String> with the comment 
   “avro-name = recordname”, we can use this name when converting the 
   LogicalType to an Avro schema.
   - There could be a schema-name option for the schema of the topic / table, or
   - the name of the topic / table schema could be defined in the table comment.
2. Use further table options to define the schema names, e.g.:
   'avro-confluent.schema-name.record.nested_record' = 'nested_record_name' 
   (where record and nested_record are field names).
   - In this case the schema-name option is suffixed with the path to the 
     named schema.
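
To make the path-suffixed option idea more concrete, here is a minimal Python sketch of how such options might be interpreted. This is only an illustration of the proposal, not existing Flink behaviour: the option prefix `avro-confluent.schema-name` and the helper `schema_names_from_options` are assumptions.

```python
PREFIX = "avro-confluent.schema-name"  # assumed option prefix


def schema_names_from_options(options):
    """Map field paths to schema names.

    The empty path () stands for the top-level schema; any other key is
    the tuple of field names that follows the prefix.
    """
    names = {}
    for key, value in options.items():
        if key == PREFIX:
            names[()] = value
        elif key.startswith(PREFIX + "."):
            path = key[len(PREFIX) + 1:].split(".")
            names[tuple(path)] = value
    return names


options = {
    "format": "avro-confluent",
    "avro-confluent.schema-name": "sampleRecord",
    "avro-confluent.schema-name.record.nested_record": "nested_record_name",
}

names = schema_names_from_options(options)
```

One open point with this encoding is that field names containing a dot would make the path ambiguous, so some escaping rule would be needed.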
