Hey Robert,

Thanks for your response. I have a partial implementation, but only for the
decoding portion. The code I have is pretty rough and doesn't include any of
the refactors I mentioned, but the decoder logic does pull the schema from the
Schema Registry and uses it to deserialize a DynamicMessage before converting
that to RowData via a DynamicMessageToRowDataConverter class.
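For reference, here is roughly the shape of the decode path. This is a
simplified sketch: DynamicMessageToRowDataConverter is my WIP class, so its
convert() signature is assumed, and handling of the Confluent protobuf message
indexes is elided.

```java
import java.nio.ByteBuffer;

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

import org.apache.flink.table.data.RowData;

public class ProtobufConfluentDecoder {

    private final SchemaRegistryClient client;
    private final DynamicMessageToRowDataConverter converter; // WIP class, convert() assumed

    public ProtobufConfluentDecoder(
            SchemaRegistryClient client, DynamicMessageToRowDataConverter converter) {
        this.client = client;
        this.converter = converter;
    }

    public RowData deserialize(byte[] message) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap(message);

        // Confluent wire format: magic byte 0x0, then a 4-byte schema id.
        if (buffer.get() != 0x0) {
            throw new IllegalStateException("Unknown magic byte");
        }
        int schemaId = buffer.getInt();

        // Fetch the writer schema from the Schema Registry (cached by the client).
        ProtobufSchema schema = (ProtobufSchema) client.getSchemaById(schemaId);
        Descriptor descriptor = schema.toDescriptor();

        // A real implementation must also consume the varint-encoded message
        // indexes that follow the schema id before the payload starts.
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);

        DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, payload);
        return converter.convert(dynamicMessage);
    }
}
```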
For the encoder and the other aspects, I would need to start from scratch. I
would be very happy to see you drive the contribution back to open source from
Confluent, or to collaborate on this.

Another topic I had in mind is Protobuf's field ids. Ideally, Flink would be
idiomatic about not renumbering them in incompatible ways, similar to what's
discussed in this Schema Registry issue:
https://github.com/confluentinc/schema-registry/issues/2551
The sketch below shows how a naive, purely positional derivation runs into this.
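To illustrate the concern, here is a hypothetical (not from my WIP) positional
derivation of a Protobuf message descriptor from a Flink RowType. It renumbers
fields whenever a column is added or dropped in the middle, which is
wire-incompatible for existing readers.

```java
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;

import org.apache.flink.table.types.logical.RowType;

public final class NaiveSchemaDerivation {

    private NaiveSchemaDerivation() {}

    // Assigns field numbers purely by column position. ROW<a, b, c> yields
    // numbers 1, 2, 3; dropping column `b` silently renumbers `c` from 3 to 2.
    public static DescriptorProto fromRowType(RowType rowType) {
        DescriptorProto.Builder message = DescriptorProto.newBuilder().setName("Row");
        int fieldNumber = 1;
        for (String fieldName : rowType.getFieldNames()) {
            message.addField(
                    FieldDescriptorProto.newBuilder()
                            .setName(fieldName)
                            .setNumber(fieldNumber++)); // type mapping elided
        }
        return message.build();
    }
}
```

A stable scheme would need to derive field numbers independently of column
position, e.g. by diffing against the previously registered schema.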
On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hi all,
>
> +1 to support the format in Flink.
>
> @Kevin: Do you already have an implementation for this in-house that you
> are looking to upstream, or would you start from scratch?
> I'm asking because my current employer, Confluent, has a Protobuf Schema
> Registry implementation for Flink, and I could help drive contributing it
> back to open source.
> If you already have an implementation, let's decide which one to use :)
>
> Best,
> Robert
>
> On Thu, Feb 22, 2024 at 2:05 PM David Radley <david_rad...@uk.ibm.com>
> wrote:
>
> > Hi Kevin,
> > Some thoughts on this.
> > I suggested an Apicurio registry format on the dev list and was advised
> > to raise a FLIP for it; I suggest the same would apply here (or the
> > alternative to FLIPs if you cannot raise one). I am prototyping an Avro
> > Apicurio format prior to raising the FLIP, and I notice that readSchema
> > in the SchemaCoder only takes a byte array, but I need to pass down the
> > Kafka headers (where the Apicurio globalId identifying the schema lives).
> >
> > I assume:
> >
> > * For the Confluent Protobuf format, you would extend the Protobuf
> > format to drive some Schema Registry logic for Protobuf (similar to the
> > way Avro does it), where the magic byte + schema id can be obtained and
> > the schema looked up using the Confluent Schema Registry.
> > * It would be good if any Protobuf format enhancements for schema
> > registries passed down the Kafka headers (I am thinking of a Map<String,
> > Object> for Avro) as well as the message payload, so that Apicurio
> > Registry could work with this.
> > * It would make sense to have the Confluent schema lookup in common
> > code, as part of the SchemaCoder readSchema logic.
> > * I assume the ProtobufSchemaCoder readSchema would return a Protobuf
> > Schema object.
> >
> > I also wondered whether these Kafka-only formats should be moved to the
> > Kafka connector repo, or whether they might in the future be used
> > outside Kafka – e.g. Avro/Protobuf files in a database.
> > Kind regards, David.
> >
> > From: Kevin Lam <kevin....@shopify.com.INVALID>
> > Date: Wednesday, 21 February 2024 at 18:51
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf
> > Confluent Format
> > I would love to get some feedback from the community on this JIRA issue:
> > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> >
> > I am looking into creating a PR and would appreciate some review on the
> > approach.
> >
> > In terms of design, I think we can mirror the `debezium-avro-confluent`
> > and `avro-confluent` formats already available in Flink:
> >
> > 1. A `protobuf-confluent` format that uses DynamicMessage
> > <https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage>
> > for encoding and decoding.
> >    - For encoding, the Flink RowType will be used to dynamically create
> >    a Protobuf schema and register it with the Confluent Schema Registry.
> >    The same schema will be used to construct a DynamicMessage and
> >    serialize it.
> >    - For decoding, the schema will be fetched from the registry, and
> >    DynamicMessage will be used to deserialize and convert the Protobuf
> >    object to a Flink RowData.
> >    - Note: there is no external .proto file here.
> > 2. A `debezium-protobuf-confluent` format that unpacks the Debezium
> > envelope and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER,
> > INSERT, and DELETE events.
> >    - We may be able to refactor and reuse code from the existing
> >    DebeziumAvroDeserializationSchema + DebeziumAvroSerializationSchema,
> >    since the deser logic is largely delegated elsewhere and these
> >    Schemas are concerned with handling the Debezium envelope.
> > 3. Move the Confluent Schema Registry client code to a separate Maven
> > module, flink-formats/flink-confluent-common, and extend it to support
> > ProtobufSchemaProvider
> > <https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>.
> >
> > Does anyone have any feedback or objections to this approach?
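To make the encode path in item 1 above concrete, here is a rough sketch of
what I have in mind. Assumptions: deriveSchema and toDynamicMessage are
hypothetical helpers, and schema/id caching as well as the Confluent protobuf
message indexes are elided.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

public class ProtobufConfluentEncoder {

    private final SchemaRegistryClient client;
    private final String subject; // e.g. "<topic>-value"
    private final RowType rowType;

    public ProtobufConfluentEncoder(SchemaRegistryClient client, String subject, RowType rowType) {
        this.client = client;
        this.subject = subject;
        this.rowType = rowType;
    }

    public byte[] serialize(RowData row) throws Exception {
        // Derive a Protobuf schema from the Flink RowType and register it with
        // the Confluent Schema Registry (both steps should be cached in practice).
        ProtobufSchema schema = deriveSchema(rowType);
        int schemaId = client.register(subject, schema);

        // Use the same schema to build and serialize a DynamicMessage,
        // prefixed with the Confluent wire format header.
        DynamicMessage message = toDynamicMessage(row, schema.toDescriptor());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x0); // magic byte
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); // 4-byte schema id
        message.writeTo(out);
        return out.toByteArray();
    }

    // Hypothetical helpers: RowType -> ProtobufSchema and RowData -> DynamicMessage.
    private ProtobufSchema deriveSchema(RowType rowType) {
        throw new UnsupportedOperationException("schema derivation not sketched");
    }

    private DynamicMessage toDynamicMessage(RowData row, Descriptor descriptor) {
        throw new UnsupportedOperationException("row conversion not sketched");
    }
}
```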