I would love to get some feedback from the community on this JIRA issue: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
I am looking into creating a PR and would appreciate some review of the approach. In terms of design, I think we can mirror the `debezium-avro-confluent` and `avro-confluent` formats already available in Flink:

1. A `protobuf-confluent` format that uses DynamicMessage <https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage> for encoding and decoding.
   - For encoding, the Flink RowType will be used to dynamically create a Protobuf schema and register it with the Confluent Schema Registry. The same schema will then be used to construct a DynamicMessage and serialize the row.
   - For decoding, the schema will be fetched from the registry, and DynamicMessage will be used to deserialize the bytes and convert the Protobuf object to a Flink RowData.
   - Note: there is no external .proto file here.
2. A `debezium-protobuf-confluent` format that unpacks the Debezium envelope and emits the appropriate UPDATE_BEFORE, UPDATE_AFTER, INSERT, and DELETE events.
   - We may be able to refactor and reuse code from the existing DebeziumAvroDeserializationSchema and DebeziumAvroSerializationSchema, since the (de)serialization logic is largely delegated to the inner format and these schemas are mainly concerned with handling the Debezium envelope.
3. Move the Confluent Schema Registry client code to a separate Maven module, flink-formats/flink-confluent-common, and extend it to support ProtobufSchemaProvider <https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>.

Does anyone have any feedback or objections to this approach? To keep the discussion concrete, I have included a few rough sketches below.
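For (1), here is a minimal sketch of the encode/decode paths, assuming a naive RowType-to-.proto translation that only covers a few primitive types. All class and method names are placeholders, and the Confluent wire-format framing (magic byte, schema id, message indexes) is omitted:

```java
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class ProtobufConfluentSketch {

    /** Naive RowType -> .proto translation covering a few primitive types. */
    static ProtobufSchema toProtobufSchema(RowType rowType, String messageName) {
        StringBuilder proto = new StringBuilder("syntax = \"proto3\";\n")
                .append("message ").append(messageName).append(" {\n");
        int tag = 1;
        for (RowType.RowField field : rowType.getFields()) {
            proto.append("  ").append(protoType(field.getType()))
                 .append(' ').append(field.getName())
                 .append(" = ").append(tag++).append(";\n");
        }
        return new ProtobufSchema(proto.append("}\n").toString());
    }

    static String protoType(LogicalType type) {
        switch (type.getTypeRoot()) {
            case INTEGER: return "int32";
            case BIGINT:  return "int64";
            case DOUBLE:  return "double";
            case BOOLEAN: return "bool";
            case VARCHAR: return "string";
            default: throw new UnsupportedOperationException(type.toString());
        }
    }

    /** Encode: register the derived schema, then serialize via DynamicMessage. */
    static byte[] encode(RowData row, RowType rowType,
                         SchemaRegistryClient client, String subject) throws Exception {
        ProtobufSchema schema = toProtobufSchema(rowType, "Row");
        client.register(subject, schema);
        Descriptor descriptor = schema.toDescriptor();
        DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            FieldDescriptor fd = descriptor.getFields().get(i);
            builder.setField(fd, readValue(row, i, rowType.getTypeAt(i)));
        }
        return builder.build().toByteArray();
    }

    /** Decode: fetch the writer schema by id, then convert back to RowData. */
    static RowData decode(byte[] payload, int schemaId, RowType rowType,
                          SchemaRegistryClient client) throws Exception {
        ProtobufSchema schema = (ProtobufSchema) client.getSchemaById(schemaId);
        Descriptor descriptor = schema.toDescriptor();
        DynamicMessage message = DynamicMessage.parseFrom(descriptor, payload);
        GenericRowData row = new GenericRowData(rowType.getFieldCount());
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            Object value = message.getField(descriptor.getFields().get(i));
            row.setField(i, value instanceof String
                    ? StringData.fromString((String) value) : value);
        }
        return row;
    }

    static Object readValue(RowData row, int pos, LogicalType type) {
        switch (type.getTypeRoot()) {
            case INTEGER: return row.getInt(pos);
            case BIGINT:  return row.getLong(pos);
            case DOUBLE:  return row.getDouble(pos);
            case BOOLEAN: return row.getBoolean(pos);
            case VARCHAR: return row.getString(pos).toString();
            default: throw new UnsupportedOperationException(type.toString());
        }
    }
}
```

The real implementation would of course need to handle nested rows, arrays, maps, nullability, and the wire-format framing, but the shape of the DynamicMessage round trip should stay the same.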
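For (2), the envelope handling mirrors what DebeziumAvroDeserializationSchema already does: the inner format decodes the before/after rows, and the outer schema maps the Debezium op code onto Flink RowKinds. A rough sketch (the op codes and before/after fields follow the standard Debezium envelope; the method itself is hypothetical):

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

public class DebeziumEnvelopeSketch {

    /**
     * Unpacks one decoded Debezium envelope into changelog rows.
     *
     * @param before decoded "before" row (null for inserts/snapshot reads)
     * @param after  decoded "after" row (null for deletes)
     * @param op     Debezium op code: "c" (create), "r" (snapshot read),
     *               "u" (update), "d" (delete)
     */
    static void emit(RowData before, RowData after, String op, Collector<RowData> out) {
        switch (op) {
            case "c":
            case "r":
                after.setRowKind(RowKind.INSERT);
                out.collect(after);
                break;
            case "u":
                before.setRowKind(RowKind.UPDATE_BEFORE);
                out.collect(before);
                after.setRowKind(RowKind.UPDATE_AFTER);
                out.collect(after);
                break;
            case "d":
                before.setRowKind(RowKind.DELETE);
                out.collect(before);
                break;
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + op);
        }
    }
}
```

Since this logic is independent of the inner format, it looks like a good candidate for the refactoring mentioned above.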
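For (3), once the client code lives in flink-confluent-common, supporting protobuf should mostly be a matter of passing the provider when the CachedSchemaRegistryClient is constructed. A sketch of what I'd expect the shared factory to do (RegistryClientFactory is a placeholder name):

```java
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class RegistryClientFactory {

    /** Builds a client that can parse protobuf schemas in addition to Avro. */
    static SchemaRegistryClient create(List<String> urls, int cacheCapacity,
                                       Map<String, ?> clientConfig) {
        List<SchemaProvider> providers =
                Collections.singletonList(new ProtobufSchemaProvider());
        return new CachedSchemaRegistryClient(urls, cacheCapacity, providers, clientConfig);
    }
}
```

This would pull in the protobuf-provider dependency in the new module, which is another reason to keep it separate from the Avro-only formats.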