Hello Dawid,
I created a pull request based on your suggestion.
https://github.com/apache/flink/pull/21942
I'd appreciate it if you could have a look at it.
Thanks in advance.
Best regards,
Fruzsina

On Mon, Feb 6, 2023 at 11:34 AM Fruzsina Nagy <fru...@cloudera.com> wrote:

> Hi Dawid,
> Thanks for the suggestion, it’s worth a try. I’ll have a look at it.
> I assume this ’schema’ option would not be required and the current logic
> with the default name ‘record’ would be used, if the schema is not provided
> explicitly.
> Best regards,
> Fruzsina
>
> On 2023/01/27 13:14:39 Dawid Wysakowicz wrote:
> > Hi Fruzsina,
> >
> > I think this is a valid issue we should try to solve. A different
> > approach I am thinking about is that we could actually add an option to
> > provide an entire avro schema to use. Something like:
> > `avro-confluent.schema` which we would validate it maps properly to the
> > schema of the table (that is names of fields and their types match) and
> > use it instead of the generated one.
> >
> > What do you think about that approach?
> >
> > Best,
> >
> > Dawid
> >
> > On 26/01/2023 11:29, Fruzsina Nagy wrote:
> > > Hi everyone,
> > >
> > > I have come across the below issue, while experimenting with the
> Confluent registry and avro-confluent, debezium-avro-confluent formats.
> Please let me know your thoughts on it. Should this issue be addressed?
> > >
> > > Thanks in advance,
> > > Fruzsina
> > > The use case
> > >
> > > Create a new topic on Confluent Cloud
> > > Create a value schema with the name “sampleRecord”:
> > > {
> > >    "type": "record",
> > >    "namespace": "com.mycorp.mynamespace",
> > >    "name": "sampleRecord",
> > > …}
> > > Create table with “avro-confluent” format:
> > > CREATE TABLE `newtesttopic` (
> > >       `my_field1` INT NOT NULL,
> > >       `my_field2` DOUBLE NOT NULL,
> > >       `my_field3` VARCHAR(2147483647) NOT NULL,
> > >       ") WITH (
> > >       'connector' = 'kafka',
> > >       'topic' = 'newtesttopic',
> > >       'scan.startup.mode' = 'latest-offset',
> > >       'properties.bootstrap.servers' = 'bootstrapServers',
> > >       'properties.sasl.jaas.config' = 'saslJaasConfig',
> > >       'properties.sasl.mechanism' = 'PLAIN',
> > >       'properties.security.protocol' = 'SASL_SSL',
> > >       'format' = 'avro-confluent',
> > >       'avro-confluent.url' = 'confluentSchemaRegUrl',
> > >       'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
> > >       'avro-confluent.basic-auth.user-info' = 'user:pw')
> > >
> > > Insert data into the “newtesttopic”
> > > The following error is thrown:
> > >
> io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
> Schema being registered is incompatible with an earlier schema for subject
> "newtesttopic-value", details: [Incompatibility{type:NAME_MISMATCH,
> location:/name, message:expected: com.mycorp.mynamespace.sampleRecord,
> reader:{"type":"record","name":"record",...},
> writer:{"type":"record","name":"sampleRecord",...}
> > > This error of course can be avoided if we don’t register a schema for
> our topic on the Confluent Cloud site before inserting data into the kafka
> table, and we just let Flink register it for us with the name “record”.
> > >
> > > The cause of the error
> > >
> > > I found that the error is caused by the
> EncodingFormat<SerializationSchema<RowData>> created by
> RegistryAvroFormatFactory.createEncodingFormat, because when creating a
> AvroRowDataSerializationSchema, it uses
> AvroSchemaConverter.convertToSchema(LogicalType schema) <
> https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java#L100
> >
> > > which names the schema “record” <
> https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L306>
> by default.
> > >
> > > But the registered schema is named “sampleRecord” in the above
> example, so the Confluent Schema Registry client doesn’t accept it.
> > > The problem
> > >
> > > To resolve this I added a new option “schema-name” to “avro-confluent”
> and “debezium-avro-confluent” formats. And as I was testing the
> “debezium-avro-confluent” format, it turned out that this solution doesn’t
> solve the problem in those cases when there are named schemas (record,
> enum, fixed types) nested in the schema of the topic.
> > >
> > > For example:
> > > In case of “debezium-avro-confluent” the schema created is a union of
> null and a Debezium specific record schema (before, after, op). If I use
> the above option to provide a specific name for the schema, I get an
> org.apache.avro.UnresolvedUnionException, because
> AvroRowDataSerializationSchema <
> https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java#L79>
> converts the RowType to a record schema with the name “record”, which will
> not be found in the union, if the the Debezium specific record has a
> different name.
> > > Union type is problematic because in the general case, if we define a
> union schema [schema1, schema2]meaning that the schema is either schema1 or
> schema2, we must determine somehow which schema we are converting the
> RowType to.
> > >
> > > In case of nested named schemas, Flink creates a name based on the
> record name and the field name <
> https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L400>.
> Schema registry client will also throw an error in this case, if the
> registered names don’t match.
> > >
> > > Possible solutions
> > >
> > > Look up names of the schemas in the field comment, e.g. if there is a
> field of type ROW<field1 String, field2 String> with a comment “avro-name =
> recordname”, we can use this name when converting the LogicalType to avro
> schema.
> > > there could be a schema-name option for the schema of the topic /
> table or
> > > the name of the topic / table schema could be defined in the table
> comment
> > > Use further table options to define the schema names, e.g.:
> > > ‘avro-confluent.schema-name.record.nested_record’ =
> ‘nested_record_name’ (where record and nested_record are field names)
> > > in this case the schema-name option is suffixed with the path to the
> named schema
> > >
> > >
> >

Reply via email to