Hey Robert,

Awesome, thanks! That timeline works for me. Sounds good re: deciding on a FLIP once we have the PR, and thanks for looking into the field ids.
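In the meantime, for anyone following along, here's a minimal sketch of the decode path I described earlier in the thread (read the Confluent wire-format header, fetch the writer schema by id, deserialize via DynamicMessage). The class name and structure are illustrative only, not necessarily what the PR will look like, and the message-index portion of the wire format is glossed over:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

/** Illustrative sketch of the decode path; not the actual PR code. */
public class ProtobufConfluentDecoder {

    private final SchemaRegistryClient registryClient;

    public ProtobufConfluentDecoder(SchemaRegistryClient registryClient) {
        this.registryClient = registryClient;
    }

    public DynamicMessage decode(byte[] message) throws Exception {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));

        // Confluent wire format: magic byte 0x0, then the schema id as a
        // big-endian 4-byte int, then the payload.
        byte magic = in.readByte();
        if (magic != 0) {
            throw new IOException("Unexpected magic byte: " + magic);
        }
        int schemaId = in.readInt();

        // Look up the writer schema in the registry (the client caches by id).
        ProtobufSchema schema = (ProtobufSchema) registryClient.getSchemaById(schemaId);
        Descriptor descriptor = schema.toDescriptor();

        // Note: the Confluent Protobuf serializer also writes a varint-encoded
        // message-index list before the payload; skipped here for brevity.
        // A DynamicMessageToRowDataConverter would then map the result to
        // Flink RowData.
        return DynamicMessage.parseFrom(descriptor, in.readAllBytes());
    }
}
```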
Looking forward to it!

On Thu, Feb 29, 2024 at 5:09 AM Robert Metzger <metrob...@gmail.com> wrote:

> Hey Kevin,
>
> Thanks a lot. Then let's contribute the Confluent implementation to
> apache/flink. We can't start working on this immediately because of a team
> event next week, but within the next two weeks, we will start working on
> this.
> It probably makes sense for us to open a pull request of what we have
> already, so that you can start reviewing and maybe also contributing to
> the PR.
> I hope this timeline works for you!
>
> Let's also decide if we need a FLIP once the code is public.
> We will look into the field ids.
>
>
> On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam <kevin....@shopify.com.invalid>
> wrote:
>
> > Hey Robert,
> >
> > Thanks for your response. I have a partial implementation, just for the
> > decoding portion.
> >
> > The code I have is pretty rough and doesn't do any of the refactors I
> > mentioned, but the decoder logic does pull the schema from the schema
> > registry and use that to deserialize the DynamicMessage before
> > converting it to RowData using a DynamicMessageToRowDataConverter
> > class. For the other aspects, I would need to start from scratch for
> > the encoder.
> >
> > Would be very happy to see you drive the contribution back to open
> > source from Confluent, or collaborate on this.
> >
> > Another topic I had is Protobuf's field ids. Ideally in Flink it would
> > be nice if we are idiomatic about not renumbering them in incompatible
> > ways, similar to what's discussed on the Schema Registry issue here:
> > https://github.com/confluentinc/schema-registry/issues/2551
> >
> >
> > On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <rmetz...@apache.org>
> > wrote:
> >
> > > Hi all,
> > >
> > > +1 to support the format in Flink.
> > >
> > > @Kevin: Do you already have an implementation for this in-house that
> > > you are looking to upstream, or would you start from scratch?
> > > I'm asking because my current employer, Confluent, has a Protobuf
> > > Schema Registry implementation for Flink, and I could help drive
> > > contributing this back to open source.
> > > If you already have an implementation, let's decide which one to use :)
> > >
> > > Best,
> > > Robert
> > >
> > > On Thu, Feb 22, 2024 at 2:05 PM David Radley <david_rad...@uk.ibm.com>
> > > wrote:
> > >
> > > > Hi Kevin,
> > > > Some thoughts on this.
> > > > I suggested an Apicurio registry format in the dev list, and was
> > > > advised to raise a FLIP for this; I suggest the same would apply
> > > > here (or the alternative to FLIPs if you cannot raise one). I am
> > > > prototyping an Avro Apicurio format, prior to raising the FLIP, and
> > > > notice that the readSchema in the SchemaCoder only takes a byte
> > > > array, but I need to pass down the Kafka headers (where the Apicurio
> > > > globalId identifying the schema lives).
> > > >
> > > > I assume:
> > > >
> > > > * For the Confluent Protobuf format you would extend the Protobuf
> > > > format to drive some Schema Registry logic for Protobuf (similar to
> > > > the way Avro does it), where the magic byte + schema id can be
> > > > obtained and the schema looked up using the Confluent Schema
> > > > Registry.
> > > > * It would be good if any Protobuf format enhancements for schema
> > > > registries pass down the Kafka headers (I am thinking as a
> > > > Map<String, Object> for Avro) as well as the message payload, so
> > > > Apicurio Registry could work with this.
> > > > * It would make sense to have the Confluent schema lookup in
> > > > common code, which is part of the SchemaCoder readSchema logic.
> > > > * I assume the ProtobufSchemaCoder readSchema would return a
> > > > Protobuf Schema object.
> > > >
> > > > I also wondered whether these Kafka-only formats should be moved to
> > > > the Kafka connector repo, or whether they might in the future be
> > > > used outside Kafka – e.g. Avro/Protobuf files in a database.
> > > > Kind regards, David.
> > > >
> > > >
> > > > From: Kevin Lam <kevin....@shopify.com.INVALID>
> > > > Date: Wednesday, 21 February 2024 at 18:51
> > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf
> > > > Confluent Format
> > > >
> > > > I would love to get some feedback from the community on this JIRA
> > > > issue:
> > > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> > > >
> > > > I am looking into creating a PR and would appreciate some review on
> > > > the approach.
> > > >
> > > > In terms of design I think we can mirror the
> > > > `debezium-avro-confluent` and `avro-confluent` formats already
> > > > available in Flink:
> > > >
> > > > 1. `protobuf-confluent` format which uses DynamicMessage
> > > > <https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage>
> > > > for encoding and decoding.
> > > >    - For encoding, the Flink RowType will be used to dynamically
> > > > create a Protobuf schema and register it with the Confluent Schema
> > > > Registry. It will use the same schema to construct a DynamicMessage
> > > > and serialize it.
> > > >    - For decoding, the schema will be fetched from the registry and
> > > > used with DynamicMessage to deserialize and convert the Protobuf
> > > > object to a Flink RowData.
> > > >    - Note: there is no external .proto file here.
> > > > 2. `debezium-protobuf-confluent` format which unpacks the Debezium
> > > > envelope and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER,
> > > > INSERT, and DELETE events.
> > > >    - We may be able to refactor and reuse code from the existing
> > > > DebeziumAvroDeserializationSchema + DebeziumAvroSerializationSchema,
> > > > since the (de)serialization logic is largely delegated and these
> > > > schemas are concerned with handling the Debezium envelope.
> > > > 3. Move the Confluent Schema Registry client code to a separate
> > > > Maven module, flink-formats/flink-confluent-common, and extend it
> > > > to support ProtobufSchemaProvider
> > > > <https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>.
> > > >
> > > > Does anyone have any feedback or objections to this approach?
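P.S. To make item 1's encoding direction in my quoted proposal a bit more concrete, here's a minimal sketch of what I have in mind. RowTypeToProtobufSchema and RowDataToDynamicMessageConverter are hypothetical helpers that would need to be written as part of the PR, and the message-index portion of Confluent's Protobuf wire format is again omitted for brevity:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

/** Illustrative sketch of the encode path; helper classes are hypothetical. */
public class ProtobufConfluentEncoder {

    private final SchemaRegistryClient registryClient;

    public ProtobufConfluentEncoder(SchemaRegistryClient registryClient) {
        this.registryClient = registryClient;
    }

    public byte[] encode(RowType rowType, RowData row, String subject) throws Exception {
        // 1. Derive a Protobuf schema from the Flink RowType (no external
        //    .proto file) and register it under the given subject.
        ProtobufSchema schema = RowTypeToProtobufSchema.convert(rowType); // hypothetical
        int schemaId = registryClient.register(subject, schema);

        // 2. Build a DynamicMessage from the RowData using the same schema.
        Descriptor descriptor = schema.toDescriptor();
        DynamicMessage message =
                RowDataToDynamicMessageConverter.convert(row, descriptor); // hypothetical

        // 3. Frame it in the Confluent wire format: magic byte 0x0, 4-byte
        //    big-endian schema id, then the serialized payload.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(0);
        out.writeInt(schemaId);
        message.writeTo(out);
        out.flush();
        return bytes.toByteArray();
    }
}
```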