fruska opened a new pull request, #21942:

URL: https://github.com/apache/flink/pull/21942
## What is the purpose of the change

When the `avro-confluent` and `debezium-avro-confluent` formats are used with schemas that are already defined in the Confluent Schema Registry, serialization fails. This happens because Flink uses the default name `record` when converting row types to Avro schemas. If the predefined schema has a different name, the serialization schema is incompatible with the registered schema because of the name mismatch. See [this description](https://lists.apache.org/thread/5xppmnqjqwfzxqo4gvd3lzz8wzs566zp) to reproduce the error.

## Brief change log

- A new optional `schema` option can be defined for the `avro-confluent` and `debezium-avro-confluent` formats to ensure that the proper schema is used for serialization and deserialization.
- When a schema is defined, it is validated against the table schema and, if valid, used for serialization and deserialization instead of the default conversion. When no schema is defined, the schema is derived from the row type.
- The `open` method of `AvroRowDataSerializationSchema` uses the schema of the nested serialization schema if that nested schema is an `AvroSerializationSchema`.
- The `open` method of `AvroSerializationSchema` ensures that the provided schema is defined, so that it can be used by `AvroRowDataSerializationSchema`.
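The failure mode and the effect of the new option can be sketched in a few lines of Python. This is a simplified model, not Flink's code: `derive_default_schema`, `record_names_match` and `resolve_schema` are hypothetical helpers. The name check follows the Avro specification's rule that two record schemas resolve only when their unqualified names match (aliases are ignored here), and the validation only compares field names, whereas the PR validates the schema against the full table schema.

```python
import json


def derive_default_schema(table_fields):
    """Hypothetical stand-in for the default row-type conversion: the
    generated record is always named "record", as the PR describes."""
    return {
        "type": "record",
        "name": "record",
        "fields": [
            {"name": name, "type": ["null", avro_type], "default": None}
            for name, avro_type in table_fields
        ],
    }


def record_names_match(writer, reader):
    """Simplified Avro resolution rule: two record schemas resolve only if
    their unqualified names are equal (aliases are ignored in this sketch)."""
    def short_name(schema):
        return schema["name"].rsplit(".", 1)[-1]

    both_records = writer["type"] == "record" and reader["type"] == "record"
    return both_records and short_name(writer) == short_name(reader)


def resolve_schema(schema_option, table_fields):
    """Use the user-supplied schema string if present (after a field-name
    check against the table), otherwise fall back to the derived default."""
    if schema_option is None:
        return derive_default_schema(table_fields)
    schema = json.loads(schema_option)
    expected = [name for name, _ in table_fields]
    actual = [field["name"] for field in schema.get("fields", [])]
    if schema.get("type") != "record" or actual != expected:
        raise ValueError(f"schema fields {actual} do not match table fields {expected}")
    return schema


if __name__ == "__main__":
    fields = [("id", "long"), ("name", "string")]
    registered = {
        "type": "record",
        "name": "User",
        "namespace": "com.example",
        "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}],
    }
    # Default conversion: "record" vs "User", so the names do not match.
    print(record_names_match(resolve_schema(None, fields), registered))  # False
    # With a schema string supplied, the registered schema itself is used.
    print(record_names_match(resolve_schema(json.dumps(registered), fields), registered))  # True
```

The design point the sketch illustrates is that the explicit schema is preferred only when present, so existing tables without the option keep the row-type-derived schema.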
## Verifying this change

This change added tests and can be verified as follows:

- Extended `RegistryAvroFormatFactoryTest.testDeserializationSchemaWithOptionalProperties` and `RegistryAvroFormatFactoryTest.testSerializationSchemaWithOptionalProperties` with the schema option
- Added a test to `RegistryAvroFormatFactoryTest` that validates that an exception is thrown for an invalid schema
- Added tests that validate that the constructors of `DebeziumAvroSerializationSchema` and `DebeziumAvroDeserializationSchema` create the proper schema when the schema string is provided
- Extended `DebeziumAvroFormatFactoryTest.testSeDeSchemaWithSchemaOption` with the schema option
- Added a test to `DebeziumAvroFormatFactoryTest` that validates that an exception is thrown for an invalid schema
- Added a test to `AvroRowDataDeSerializationSchemaTest` that validates that a record is deserialized and serialized properly based on a nullable schema whose name is not `record`
- Added a test to `AvroSerializationSchemaTest` that validates the `open` method of `AvroSerializationSchema`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Added a description of the new `schema` option for the `avro-confluent` and `debezium-avro-confluent` formats.
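A table that uses the new option could be declared roughly like the DDL produced by the sketch below. The helper `build_kafka_ddl`, the placeholder broker address, and the option keys `avro-confluent.url` and `avro-confluent.schema` are assumptions (Flink format options are normally prefixed with the format identifier). Check the published format documentation for the exact keys.

```python
import json


def build_kafka_ddl(table, columns, topic, registry_url, avro_schema=None):
    """Hypothetical helper: build a Flink SQL CREATE TABLE statement for a
    Kafka topic using the avro-confluent format. The option keys
    'avro-confluent.url' and 'avro-confluent.schema' are assumed, following
    Flink's usual '<format>.<option>' naming."""
    options = {
        "connector": "kafka",
        "topic": topic,
        "properties.bootstrap.servers": "localhost:9092",  # placeholder broker
        "format": "avro-confluent",
        "avro-confluent.url": registry_url,
    }
    if avro_schema is not None:
        # Pass the registered schema verbatim so its record name is preserved.
        options["avro-confluent.schema"] = json.dumps(avro_schema)
    column_sql = ",\n  ".join(f"`{name}` {sql_type}" for name, sql_type in columns)
    # SQL string literals escape a single quote by doubling it.
    option_sql = ",\n  ".join(
        "'{}' = '{}'".format(key, value.replace("'", "''"))
        for key, value in options.items()
    )
    return f"CREATE TABLE {table} (\n  {column_sql}\n) WITH (\n  {option_sql}\n)"


if __name__ == "__main__":
    user_schema = {
        "type": "record",
        "name": "User",
        "namespace": "com.example",
        "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}],
    }
    print(build_kafka_ddl("users", [("id", "BIGINT NOT NULL"), ("name", "STRING NOT NULL")],
                          "users", "http://localhost:8081", user_schema))
```

The columns are declared `NOT NULL` here so that they line up with the non-nullable `long` and `string` fields of the example schema; omitting `avro_schema` yields a definition without the option, which falls back to the default conversion.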
