Hi Fruzsina,

I think this is a valid issue that we should try to solve. A different approach I am thinking about is that we could add an option to provide an entire Avro schema to use, something like `avro-confluent.schema`. We would validate that it maps properly to the schema of the table (that is, the names of the fields and their types match) and use it instead of the generated one.
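A rough sketch of that validation, in Python purely for illustration (the helper, the simplified SQL-to-Avro type table, and the sample inputs are all hypothetical; the real conversion lives in flink-avro's AvroSchemaConverter):

```python
import json

# Hypothetical, heavily simplified mapping from Flink SQL types to Avro types
# (illustration only; the real mapping is done by AvroSchemaConverter).
SQL_TO_AVRO = {
    "INT NOT NULL": "int",
    "DOUBLE NOT NULL": "double",
    "VARCHAR(2147483647) NOT NULL": "string",
}

def validate_user_schema(avro_schema_json, table_columns):
    """Check that a user-provided Avro schema matches the table's columns
    by field name and type, as the proposed `avro-confluent.schema` option
    would require, and return it for use instead of the generated schema."""
    schema = json.loads(avro_schema_json)
    avro_fields = {f["name"]: f["type"] for f in schema["fields"]}
    for col_name, sql_type in table_columns:
        expected = SQL_TO_AVRO[sql_type]
        if avro_fields.get(col_name) != expected:
            raise ValueError(
                f"column '{col_name}' ({sql_type}) does not match "
                f"Avro field type {avro_fields.get(col_name)!r}")
    return schema

# Sample inputs matching the table in the quoted mail below.
user_schema = json.dumps({
    "type": "record",
    "namespace": "com.mycorp.mynamespace",
    "name": "sampleRecord",
    "fields": [
        {"name": "my_field1", "type": "int"},
        {"name": "my_field2", "type": "double"},
        {"name": "my_field3", "type": "string"},
    ],
})
columns = [("my_field1", "INT NOT NULL"),
           ("my_field2", "DOUBLE NOT NULL"),
           ("my_field3", "VARCHAR(2147483647) NOT NULL")]

schema = validate_user_schema(user_schema, columns)
print(schema["name"])  # the user-chosen record name survives validation
```

The point of the design is that the record name (and any nested names) come from the user's schema, so they never have to be synthesized by Flink at all.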
What do you think about that approach?

Best,
Dawid

On 26/01/2023 11:29, Fruzsina Nagy wrote:
Hi everyone,
I have come across the issue below while experimenting with the Confluent
registry and the avro-confluent and debezium-avro-confluent formats. Please let me
know your thoughts on it. Should this issue be addressed?
Thanks in advance,
Fruzsina
The use case
Create a new topic on Confluent Cloud
Create a value schema with the name “sampleRecord”:
{
"type": "record",
"namespace": "com.mycorp.mynamespace",
"name": "sampleRecord",
…}
Create a table with the “avro-confluent” format:
CREATE TABLE `newtesttopic` (
`my_field1` INT NOT NULL,
`my_field2` DOUBLE NOT NULL,
`my_field3` VARCHAR(2147483647) NOT NULL
) WITH (
'connector' = 'kafka',
'topic' = 'newtesttopic',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'bootstrapServers',
'properties.sasl.jaas.config' = 'saslJaasConfig',
'properties.sasl.mechanism' = 'PLAIN',
'properties.security.protocol' = 'SASL_SSL',
'format' = 'avro-confluent',
'avro-confluent.url' = 'confluentSchemaRegUrl',
'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
'avro-confluent.basic-auth.user-info' = 'user:pw')
Insert data into the “newtesttopic” table.
The following error is thrown:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject
"newtesttopic-value", details: [Incompatibility{type:NAME_MISMATCH, location:/name, message:expected: com.mycorp.mynamespace.sampleRecord,
reader:{"type":"record","name":"record",...},
writer:{"type":"record","name":"sampleRecord",...}
This error can, of course, be avoided if we don’t register a schema for our topic
on the Confluent Cloud site before inserting data into the Kafka table, and we
just let Flink register it for us under the name “record”.
The cause of the error
I found that the error is caused by the EncodingFormat<SerializationSchema<RowData>>
created by RegistryAvroFormatFactory.createEncodingFormat, because when creating an
AvroRowDataSerializationSchema, it uses AvroSchemaConverter.convertToSchema(LogicalType schema)
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java#L100>
which names the schema “record”
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L306>
by default.
But the registered schema is named “sampleRecord” in the above example, so the
Confluent Schema Registry client doesn’t accept the generated one.
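The mismatch can be reproduced outside Flink in a few lines of Python. This is only a sketch: the registry's real compatibility check does much more, but the NAME_MISMATCH part boils down to comparing Avro full names:

```python
def full_name(schema):
    """Avro full name: 'namespace.name' when a namespace is present."""
    ns = schema.get("namespace")
    return f"{ns}.{schema['name']}" if ns else schema["name"]

# The schema registered on Confluent Cloud in the example above.
registered = {"type": "record", "name": "sampleRecord",
              "namespace": "com.mycorp.mynamespace", "fields": []}

# What AvroSchemaConverter.convertToSchema(LogicalType) produces by default:
# a record named "record", with no namespace.
generated = {"type": "record", "name": "record", "fields": []}

# Simplified stand-in for the registry's name check (NAME_MISMATCH):
if full_name(generated) != full_name(registered):
    print(f"NAME_MISMATCH: expected {full_name(registered)}, "
          f"got {full_name(generated)}")
```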
The problem
To resolve this, I added a new “schema-name” option to the “avro-confluent” and
“debezium-avro-confluent” formats. But as I was testing the
“debezium-avro-confluent” format, it turned out that this solution doesn’t
cover the cases where named schemas (record, enum, and
fixed types) are nested inside the schema of the topic.
For example:
In the case of “debezium-avro-confluent”, the schema created is a union of null and a
Debezium-specific record schema (before, after, op). If I use the above option to
provide a specific name for the schema, I get an
org.apache.avro.UnresolvedUnionException, because AvroRowDataSerializationSchema
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java#L79>
converts the RowType to a record schema with the name “record”, which will not be
found in the union if the Debezium-specific record has a different name.
The union type is problematic because, in the general case, if we define a union
schema [schema1, schema2], meaning that the schema is either schema1 or schema2,
we must somehow determine which of them we are converting the RowType to.
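To illustrate the union problem: Avro picks a union branch for a record by its name, so a generated record named “record” is simply not found among the branches. The following is a simplified stand-in for what UnresolvedUnionException signals (the envelope record name is hypothetical):

```python
def resolve_union_branch(union, record_name):
    """Pick the union branch whose record name matches. Avro's writer does
    essentially this lookup before serializing (simplified sketch)."""
    for branch in union:
        if branch == "null":
            continue
        if branch.get("name") == record_name:
            return branch
    raise LookupError(f"no union branch named {record_name!r} "
                      "(~ org.apache.avro.UnresolvedUnionException)")

# Debezium-style value schema: a union of null and a named record.
# "myTopicEnvelope" is a made-up name standing in for the registered one.
union = ["null",
         {"type": "record", "name": "myTopicEnvelope",
          "fields": [{"name": "before", "type": "null"},
                     {"name": "after", "type": "null"},
                     {"name": "op", "type": "string"}]}]

resolve_union_branch(union, "myTopicEnvelope")  # found
try:
    resolve_union_branch(union, "record")       # Flink's default name: not found
except LookupError as e:
    print(e)
```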
In the case of nested named schemas, Flink creates a name based on the record name and
the field name
<https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L400>.
The Schema Registry client will also throw an error in this case if the registered
names don’t match.
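The derived nested names can be mimicked like this (an approximation of the linked AvroSchemaConverter behavior; the exact separator aside, the point is that the nested name is synthesized from the enclosing record and field names, not user-controlled):

```python
def derived_names(row_name, fields):
    """Approximate how Flink derives names for nested record schemas:
    enclosing record name + '_' + field name (a sketch, not Flink code)."""
    names = {}
    for field_name, field_type in fields.items():
        if field_type == "ROW":  # only nested rows get a synthesized name
            names[field_name] = f"{row_name}_{field_name}"
    return names

# A top-level schema named "record" (Flink's default) with a nested row field
# "nested_record" gets a synthesized name like "record_nested_record", which
# the registry rejects if a differently named nested schema was registered.
print(derived_names("record", {"nested_record": "ROW", "plain": "STRING"}))
# {'nested_record': 'record_nested_record'}
```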
Possible solutions
1. Look up the names of the schemas in the field comments, e.g. if there is a field of type
ROW<field1 STRING, field2 STRING> with a comment “avro-name = recordname”, we
can use this name when converting the LogicalType to an Avro schema.
   - There could be a schema-name option for the schema of the topic / table, or
   - the name of the topic / table schema could be defined in the table comment.
2. Use further table options to define the schema names, e.g.
'avro-confluent.schema-name.record.nested_record' = 'nested_record_name' (where
record and nested_record are field names).
   - In this case the schema-name option is suffixed with the path to the named
schema.
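The second option style could be parsed into a path-to-name map along these lines (the option prefix and key layout are exactly as proposed above; the parsing helper itself is hypothetical):

```python
PREFIX = "avro-confluent.schema-name."

def parse_schema_name_options(options):
    """Turn suffixed table options into a mapping from the field path of a
    named schema to the Avro name that should be used for it."""
    names = {}
    for key, value in options.items():
        if key.startswith(PREFIX):
            path = tuple(key[len(PREFIX):].split("."))
            names[path] = value
    return names

options = {
    "avro-confluent.url": "confluentSchemaRegUrl",
    "avro-confluent.schema-name.record.nested_record": "nested_record_name",
}
print(parse_schema_name_options(options))
# {('record', 'nested_record'): 'nested_record_name'}
```

The schema converter would then consult this map at each nested record it visits, falling back to the synthesized name only when no entry exists for that path.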
