Hello Dawid, I created a pull request based on your suggestion. https://github.com/apache/flink/pull/21942 I'd appreciate it if you could have a look at it. Thanks in advance. Best regards, Fruzsina
On Mon, Feb 6, 2023 at 11:34 AM Fruzsina Nagy <fru...@cloudera.com> wrote:
> Hi Dawid,
> Thanks for the suggestion, it’s worth a try. I’ll have a look at it.
> I assume this ’schema’ option would not be required, and the current logic
> with the default name ‘record’ would be used if the schema is not provided
> explicitly.
> Best regards,
> Fruzsina
>
> On 2023/01/27 13:14:39 Dawid Wysakowicz wrote:
> > Hi Fruzsina,
> >
> > I think this is a valid issue we should try to solve. A different
> > approach I am thinking about is that we could actually add an option to
> > provide an entire Avro schema to use. Something like
> > `avro-confluent.schema`, which we would validate maps properly to the
> > schema of the table (that is, names of fields and their types match) and
> > use instead of the generated one.
> >
> > What do you think about that approach?
> >
> > Best,
> >
> > Dawid
> >
> > On 26/01/2023 11:29, Fruzsina Nagy wrote:
> > > Hi everyone,
> > >
> > > I have come across the issue below while experimenting with the
> > > Confluent registry and the avro-confluent and debezium-avro-confluent
> > > formats. Please let me know your thoughts on it. Should this issue be
> > > addressed?
> > > Thanks in advance,
> > > Fruzsina
> > >
> > > The use case
> > >
> > > Create a new topic on Confluent Cloud.
> > > Create a value schema with the name “sampleRecord”:
> > >
> > > {
> > >   "type": "record",
> > >   "namespace": "com.mycorp.mynamespace",
> > >   "name": "sampleRecord",
> > >   …
> > > }
> > >
> > > Create a table with the “avro-confluent” format:
> > >
> > > CREATE TABLE `newtesttopic` (
> > >   `my_field1` INT NOT NULL,
> > >   `my_field2` DOUBLE NOT NULL,
> > >   `my_field3` VARCHAR(2147483647) NOT NULL
> > > ) WITH (
> > >   'connector' = 'kafka',
> > >   'topic' = 'newtesttopic',
> > >   'scan.startup.mode' = 'latest-offset',
> > >   'properties.bootstrap.servers' = 'bootstrapServers',
> > >   'properties.sasl.jaas.config' = 'saslJaasConfig',
> > >   'properties.sasl.mechanism' = 'PLAIN',
> > >   'properties.security.protocol' = 'SASL_SSL',
> > >   'format' = 'avro-confluent',
> > >   'avro-confluent.url' = 'confluentSchemaRegUrl',
> > >   'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
> > >   'avro-confluent.basic-auth.user-info' = 'user:pw'
> > > )
> > >
> > > Insert data into the “newtesttopic” table.
> > > The following error is thrown:
> > >
> > > io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
> > > Schema being registered is incompatible with an earlier schema for subject
> > > "newtesttopic-value", details: [Incompatibility{type:NAME_MISMATCH,
> > > location:/name, message:expected: com.mycorp.mynamespace.sampleRecord,
> > > reader:{"type":"record","name":"record",...},
> > > writer:{"type":"record","name":"sampleRecord",...}]
> > >
> > > This error can of course be avoided if we don’t register a schema for
> > > our topic on the Confluent Cloud site before inserting data into the Kafka
> > > table, and just let Flink register one for us with the name “record”.
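The NAME_MISMATCH rejection above can be modelled with a small stdlib-only sketch. The helper below is hypothetical and greatly simplified; it is not Confluent Schema Registry's actual compatibility code, it only illustrates the top-level record-name comparison that fails here:

```python
import json

def record_name_mismatch(registered_json, candidate_json):
    """Report a NAME_MISMATCH-style incompatibility if the two record
    schemas have different Avro full names; return None otherwise.
    Simplified stand-in for one check the registry performs."""
    registered = json.loads(registered_json)
    candidate = json.loads(candidate_json)

    def full_name(schema):
        # An Avro full name is "<namespace>.<name>" when a namespace is set.
        ns = schema.get("namespace")
        return f'{ns}.{schema["name"]}' if ns else schema["name"]

    if registered.get("type") == "record" and candidate.get("type") == "record":
        if full_name(registered) != full_name(candidate):
            return ("Incompatibility{type:NAME_MISMATCH, location:/name, "
                    f"expected: {full_name(registered)}}}")
    return None

# The schema registered on Confluent Cloud vs. the one Flink generates:
registered = '{"type": "record", "namespace": "com.mycorp.mynamespace", "name": "sampleRecord", "fields": []}'
generated  = '{"type": "record", "name": "record", "fields": []}'
print(record_name_mismatch(registered, generated))  # reports the mismatch
```

With matching names the helper returns None, which corresponds to the case where Flink itself registered the “record”-named schema first.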
> > > The cause of the error
> > >
> > > I found that the error is caused by the
> > > EncodingFormat<SerializationSchema<RowData>> created by
> > > RegistryAvroFormatFactory.createEncodingFormat, because when creating an
> > > AvroRowDataSerializationSchema, it uses
> > > AvroSchemaConverter.convertToSchema(LogicalType schema)
> > > <https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java#L100>,
> > > which names the schema “record”
> > > <https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L306>
> > > by default.
> > >
> > > But the registered schema is named “sampleRecord” in the above
> > > example, so the Confluent Schema Registry client doesn’t accept it.
> > >
> > > The problem
> > >
> > > To resolve this I added a new option “schema-name” to the “avro-confluent”
> > > and “debezium-avro-confluent” formats. But as I was testing the
> > > “debezium-avro-confluent” format, it turned out that this solution doesn’t
> > > solve the problem in those cases where there are named schemas (record,
> > > enum, and fixed types) nested in the schema of the topic.
> > >
> > > For example, in the case of “debezium-avro-confluent”, the schema created
> > > is a union of null and a Debezium-specific record schema (before, after, op).
> > > If I use the above option to provide a specific name for the schema, I get an
> > > org.apache.avro.UnresolvedUnionException, because
> > > AvroRowDataSerializationSchema
> > > <https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java#L79>
> > > converts the RowType to a record schema with the name “record”, which will
> > > not be found in the union if the Debezium-specific record has a
> > > different name.
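The UnresolvedUnionException can be illustrated with a simplified, stdlib-only model of how Avro picks a union branch for a record datum by name (roughly what Avro's Java GenericDatumWriter does; this is not Avro's real resolution code, and the record name below is made up):

```python
def resolve_union_branch(union_schemas, datum_record_name):
    """Return the record branch of a union whose name matches the datum's
    record name; raise if none matches (the analogue of Avro's
    org.apache.avro.UnresolvedUnionException)."""
    for branch in union_schemas:
        if isinstance(branch, dict) and branch.get("type") == "record":
            if branch.get("name") == datum_record_name:
                return branch
    raise LookupError(f"unresolved union: no record branch named {datum_record_name!r}")

# A Debezium-style value schema: a union of null and a named record.
debezium_union = [
    "null",
    {"type": "record", "name": "myEnvelopeName", "fields": []},  # hypothetical name
]

resolve_union_branch(debezium_union, "myEnvelopeName")  # resolves fine

# Flink's converter still produces a record named "record", which is
# not a branch of the union, so resolution fails:
try:
    resolve_union_branch(debezium_union, "record")
except LookupError:
    pass
```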
> > > A union type is problematic because, in the general case, if we define a
> > > union schema [schema1, schema2], meaning that the schema is either schema1
> > > or schema2, we must somehow determine which schema we are converting the
> > > RowType to.
> > >
> > > In the case of nested named schemas, Flink creates a name based on the
> > > record name and the field name
> > > <https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L400>.
> > > The Schema Registry client will also throw an error in this case if the
> > > registered names don’t match.
> > >
> > > Possible solutions
> > >
> > > Look up the names of the schemas in the field comments, e.g. if there is a
> > > field of type ROW<field1 String, field2 String> with a comment “avro-name =
> > > recordname”, we can use this name when converting the LogicalType to an
> > > Avro schema. In addition:
> > > there could be a schema-name option for the schema of the topic / table, or
> > > the name of the topic / table schema could be defined in the table comment.
> > > Alternatively, use further table options to define the schema names, e.g.
> > > 'avro-confluent.schema-name.record.nested_record' = 'nested_record_name'
> > > (where record and nested_record are field names);
> > > in this case the schema-name option is suffixed with the path to the
> > > named schema.
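The last proposal can be sketched as a small option parser. Everything here is hypothetical: neither the suffixed `avro-confluent.schema-name.*` option keys nor this helper exist in Flink; the sketch only shows how such options could be turned into a field-path-to-Avro-name mapping for the converter to consult:

```python
def schema_name_overrides(options, prefix="avro-confluent.schema-name"):
    """Parse proposed table options of the form
    '<prefix>' = '<name>'                (top-level row schema) and
    '<prefix>.<field>.<subfield>' = '<name>'  (nested named schema)
    into a mapping from field path (a tuple) to Avro record name."""
    overrides = {}
    for key, value in options.items():
        if key == prefix:
            overrides[()] = value                       # top-level row schema
        elif key.startswith(prefix + "."):
            path = tuple(key[len(prefix) + 1:].split("."))
            overrides[path] = value                     # nested named schema
    return overrides

options = {
    "format": "avro-confluent",
    "avro-confluent.schema-name": "sampleRecord",
    "avro-confluent.schema-name.record.nested_record": "nested_record_name",
}
print(schema_name_overrides(options))
# {(): 'sampleRecord', ('record', 'nested_record'): 'nested_record_name'}
```

A converter could then look up each nested row's field path in this mapping and fall back to the current generated names (“record”, parent_field, …) when no override is present.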