Hi Fruzsina,

I think this is a valid issue we should try to solve. A different approach I am thinking about is that we could add an option to provide an entire Avro schema to use, something like `avro-confluent.schema`. We would validate that it maps properly to the schema of the table (i.e. the names and types of the fields match) and use it instead of the generated one.
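For illustration, a table using such an option could look roughly like this (the option name `avro-confluent.schema` is only the proposal above, not an existing Flink option; the topic, field and connection properties are made up):

```sql
CREATE TABLE `sampleTopic` (
      `my_field1` INT NOT NULL,
      `my_field2` DOUBLE NOT NULL
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'sampleTopic',
      'format' = 'avro-confluent',
      'avro-confluent.url' = 'confluentSchemaRegUrl',
      -- hypothetical option: the full Avro schema to use, validated
      -- against the table schema instead of being generated from it
      'avro-confluent.schema' = '{
          "type": "record",
          "namespace": "com.mycorp.mynamespace",
          "name": "sampleRecord",
          "fields": [
              {"name": "my_field1", "type": "int"},
              {"name": "my_field2", "type": "double"}
          ]
      }')
```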

What do you think about that approach?

Best,

Dawid

On 26/01/2023 11:29, Fruzsina Nagy wrote:
Hi everyone,

I have come across the issue below while experimenting with the Confluent 
Schema Registry and the avro-confluent and debezium-avro-confluent formats. 
Please let me know your thoughts on it. Should this issue be addressed?

Thanks in advance,
Fruzsina
The use case

Create a new topic on Confluent Cloud
Create a value schema with the name “sampleRecord”:
{
   "type": "record",
   "namespace": "com.mycorp.mynamespace",
   "name": "sampleRecord",
…}
Create table with “avro-confluent” format:
CREATE TABLE `newtesttopic` (
      `my_field1` INT NOT NULL,
      `my_field2` DOUBLE NOT NULL,
      `my_field3` VARCHAR(2147483647) NOT NULL
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'newtesttopic',
      'scan.startup.mode' = 'latest-offset',
      'properties.bootstrap.servers' = 'bootstrapServers',
      'properties.sasl.jaas.config' = 'saslJaasConfig',
      'properties.sasl.mechanism' = 'PLAIN',
      'properties.security.protocol' = 'SASL_SSL',
      'format' = 'avro-confluent',
      'avro-confluent.url' = 'confluentSchemaRegUrl',
      'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
      'avro-confluent.basic-auth.user-info' = 'user:pw')

Insert data into the “newtesttopic”
The following error is thrown:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject 
"newtesttopic-value", details: [Incompatibility{type:NAME_MISMATCH, location:/name, message:expected: com.mycorp.mynamespace.sampleRecord, 
reader:{"type":"record","name":"record",...}, 
writer:{"type":"record","name":"sampleRecord",...}
This error can of course be avoided if we don’t register a schema for our topic 
on the Confluent Cloud site before inserting data into the Kafka table, and 
just let Flink register one for us with the name “record”.

The cause of the error

I found that the error is caused by the EncodingFormat<SerializationSchema<RowData>> 
created by RegistryAvroFormatFactory.createEncodingFormat: when creating an 
AvroRowDataSerializationSchema, it uses AvroSchemaConverter.convertToSchema(LogicalType schema) 
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java#L100>
which names the schema “record” 
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L306>
by default.

But the registered schema is named “sampleRecord” in the above example, so the 
Confluent Schema Registry client doesn’t accept it.
The problem

To resolve this I added a new option “schema-name” to the “avro-confluent” and 
“debezium-avro-confluent” formats. But as I was testing the 
“debezium-avro-confluent” format, it turned out that this solution doesn’t 
solve the problem in cases where named schemas (record, enum and fixed types) 
are nested in the schema of the topic.

For example, in the case of “debezium-avro-confluent” the schema created is a 
union of null and a Debezium-specific record schema (before, after, op). If I 
use the above option to provide a specific name for the schema, I get an 
org.apache.avro.UnresolvedUnionException, because AvroRowDataSerializationSchema 
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java#L79>
 converts the RowType to a record schema with the name “record”, which will not be 
found in the union if the Debezium-specific record has a different name.
The union type is problematic because in the general case, if we define a union 
schema [schema1, schema2], meaning that the schema is either schema1 or schema2, 
we must somehow determine which schema we are converting the RowType to.

In the case of nested named schemas, Flink creates a name based on the record 
name and the field name 
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L400>.
 The Schema Registry client will throw an error in this case too, if the 
registered names don’t match.
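To illustrate the nested case: if I read the linked converter code correctly, for a table like the following (the table and field names are made up) the nested row gets a generated name derived from the outer record name and the field name, so a pre-registered schema with different nested names is rejected:

```sql
-- hypothetical table for illustration
CREATE TABLE `t` (
      `nested_record` ROW<`field1` STRING, `field2` STRING>
      ) WITH (
      'connector' = 'kafka',
      'topic' = 't',
      'format' = 'avro-confluent',
      'avro-confluent.url' = 'confluentSchemaRegUrl')
-- The generated Avro schema is a record named "record" whose
-- "nested_record" field is itself a record with a derived name
-- (e.g. "record_nested_record"), with no way to override either name.
```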

Possible solutions

1. Look up the names of the schemas in the field comments, e.g. if there is a 
   field of type ROW<field1 STRING, field2 STRING> with a comment 
   “avro-name = recordname”, we can use this name when converting the 
   LogicalType to an Avro schema. For the schema of the topic / table itself,
   - there could be a schema-name option, or
   - the name of the topic / table schema could be defined in the table comment.
2. Use further table options to define the schema names, e.g. 
   ‘avro-confluent.schema-name.record.nested_record’ = ‘nested_record_name’ 
   (where record and nested_record are field names); in this case the 
   schema-name option is suffixed with the path to the named schema.
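A table using the second approach might look roughly like this (both option names are only proposals from this thread, not existing Flink options; the table, field and schema names are made up):

```sql
CREATE TABLE `t` (
      `record` ROW<`nested_record` ROW<`field1` STRING>>
      ) WITH (
      'connector' = 'kafka',
      'topic' = 't',
      'format' = 'avro-confluent',
      'avro-confluent.url' = 'confluentSchemaRegUrl',
      -- proposed: name for the top-level schema of the topic / table
      'avro-confluent.schema-name' = 'sampleRecord',
      -- proposed: name for a nested named schema, addressed by
      -- suffixing the option with the path of field names
      'avro-confluent.schema-name.record.nested_record' = 'nested_record_name')
```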

