Hey all,

Thanks for the ping, Kevin! I put up a PR for the alternative implementation here: https://github.com/apache/flink/pull/25114, and would love to get this group's feedback!
TIA!

On Fri, Jul 19, 2024 at 2:17 PM Kevin Lam <kevin....@shopify.com> wrote:

> Hi all,
>
> I had chimed in <https://github.com/apache/flink/pull/24482#issuecomment-2226279775> on Anupam's PR that we might have some time to look at this issue FLINK-34440.
>
> Adding @David Mariassy <david.maria...@shopify.com> to this thread, as he's started to work <https://github.com/apache/flink/pull/24482#issuecomment-2239843309> on our implementation of FLINK-34440: https://github.com/Shopify/flink/tree/protobuf-confluent-dynamic-deser
>
> On Fri, Apr 19, 2024 at 12:41 AM Anupam Aggarwal <anupam.aggar...@gmail.com> wrote:
>
>> Thanks David. That's a great idea. For deserialization the external schema ID will be used to obtain a dynamic message, so in a way it has to be in line with the writer schema. We could limit it to serialization and rename it according to your suggestion.
>>
>> Thanks
>> Anupam
>>
>> On Tue, Apr 16, 2024 at 3:38 PM David Radley <david_rad...@uk.ibm.com> wrote:
>>
>>> Hi Anupam,
>>> Thanks for your response. I was wondering about the schema ID and had some thoughts:
>>>
>>> I assume that for Confluent Avro, specifying the schema is not normally done, but could be useful to force a particular shape.
>>>
>>> If you specify a schema ID in the format configuration:
>>> - For deserialization: does this mean the schema ID in the payload has to match it? If so, we lose the ability to have multiple versions of the schema on a topic. For me, a schemaId option makes less sense for deserialization, as the existing mechanism used by the Avro / Confluent Avro formats is working well.
>>> - I can see it makes sense for serialization where there is an existing schema in the registry you want to target.
>>>
>>> I suggest the schemaId be called something like schemaIdForSink or schemaIdForSerialization, to prevent confusion with the deserialization case.
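For concreteness, the "schema ID in the payload" referred to above is the Confluent wire-format header: a zero magic byte followed by a 4-byte big-endian schema ID, ahead of the format-specific payload. A minimal pure-JDK sketch of reading it (illustrative only, not Flink or Confluent code) shows why deserialization can always use the writer's ID from the record rather than a configured one:

```java
import java.nio.ByteBuffer;

public class ConfluentHeader {
    // Confluent wire format: magic byte 0x0, then a 4-byte big-endian schema ID,
    // then the format-specific payload (for Protobuf, message indexes + message).
    static int readSchemaId(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt(); // the writer's schema ID, assigned by Schema Registry
    }

    public static void main(String[] args) {
        // A record whose writer schema ID is 7: 0x00, then 0x00000007, then payload.
        byte[] record = {0, 0, 0, 0, 7, /* payload */ 42};
        System.out.println(readSchemaId(record)); // prints 7
    }
}
```

Since every record carries its writer's ID, multiple schema versions can coexist on one topic, which is the property a configured deserialization-side schema ID would break.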
>>> We could have the schema as you suggest so we are compatible with the Confluent Avro format.
>>>
>>> WDYT?
>>> Kind regards, David.
>>>
>>> From: Anupam Aggarwal <anupam.aggar...@gmail.com>
>>> Date: Saturday, 13 April 2024 at 16:08
>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>> Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format
>>>
>>> Hi David,
>>>
>>> Thank you for the suggestion. IIUC, you are proposing using an explicit schema string instead of the schemaID. This makes sense, as it would make the behavior consistent with Avro, although a bit more verbose from a config standpoint.
>>>
>>> If we go via the schema string route, the user would have to ensure that the input schema string corresponds to an existing schemaID. This, however, might end up registering a new ID (based on https://github.com/confluentinc/schema-registry/issues/878#issuecomment-437510493).
>>>
>>> How about adding both options (explicit schema string / schemaID)? If a schema string is specified we register a new schemaID; if the user specifies an explicit schemaID we just use it directly.
>>>
>>> Thanks
>>> Anupam
>>>
>>> On Wed, Apr 10, 2024 at 2:27 PM David Radley <david_rad...@uk.ibm.com> wrote:
>>>
>>>> Hi,
>>>> I notice in the draft PR that there is a schema ID in the format config. I was wondering why? In the Confluent Avro and existing Debezium formats, there is no schema ID in the config, but there is the ability to specify a complete schema. In the Protobuf format there is no schema ID.
>>>>
>>>> I assume the schema ID would be used during serialization in the case where there is already an existing registered schema and you have its ID. I see in the docs https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html there is a serialize example where 2 schemas are registered.
>>>>
>>>> I would suggest aiming to copy what the Confluent SerDe libraries do rather than having a schema ID hard-coded in the config.
>>>>
>>>> WDYT?
>>>> Kind regards, David.
>>>>
>>>> From: Kevin Lam <kevin....@shopify.com.INVALID>
>>>> Date: Tuesday, 26 March 2024 at 20:06
>>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>>> Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format
>>>>
>>>> Thanks Anupam! Looking forward to it.
>>>>
>>>> On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal <anupam.aggar...@gmail.com> wrote:
>>>>
>>>>> Hi Kevin,
>>>>>
>>>>> Thanks, these are some great points. Just to clarify, I do agree that the subject should be an option (like in the case of RegistryAvroFormatFactory). We could fall back to subject and auto-register schemas if a schema ID is not provided explicitly. In general, I think it would be good to be more explicit about the schemas used (https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration). This would also help prevent us from overriding the IDs in incompatible ways.
>>>>>
>>>>> Under the current implementation of FlinkToProtoSchemaConverter we might end up overwriting the field IDs. If we are able to locate a prior schema, the approach you outlined makes a lot of sense.
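The field-ID concern being discussed (not overwriting field numbers when re-deriving a Protobuf schema from a Flink row type) can be sketched as follows. This is a hypothetical illustration, not the FlinkToProtoSchemaConverter implementation: names already present in the latest registered schema version keep their published numbers, and only genuinely new fields get fresh ones.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: preserve Protobuf field numbers across schema evolution.
// priorNumbers would come from the subject's latest registered schema version.
public class FieldIdAssigner {
    static LinkedHashMap<String, Integer> assignFieldNumbers(
            List<String> flinkFieldNames, Map<String, Integer> priorNumbers) {
        LinkedHashMap<String, Integer> result = new LinkedHashMap<>();
        // New fields start numbering after the highest number ever published.
        int next = priorNumbers.values().stream().max(Integer::compare).orElse(0) + 1;
        for (String name : flinkFieldNames) {
            Integer prior = priorNumbers.get(name);
            if (prior != null) {
                result.put(name, prior);  // keep the wire-compatible number
            } else {
                result.put(name, next++); // fresh number for a new field
            }
        }
        return result;
    }
}
```

For example, with a prior schema `{id=1, name=2}` and Flink fields `[id, email, name]`, the result is `{id=1, email=3, name=2}`: renaming-by-renumbering never happens, which is exactly the incompatibility the Schema Registry issue warns about.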
>>>>> Let me explore this a bit further and get back (in terms of feasibility).
>>>>>
>>>>> Thanks again!
>>>>> - Anupam
>>>>>
>>>>> On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam <kevin....@shopify.com.invalid> wrote:
>>>>>
>>>>>> Hi Anupam,
>>>>>>
>>>>>> Thanks again for your work on contributing this feature back.
>>>>>>
>>>>>> Sounds good re: the refactoring/re-organizing.
>>>>>>
>>>>>> Regarding the schema ID, in my opinion this should NOT be a configuration option on the format. We should be able to deterministically map the Flink type to the Proto schema and register that with the Schema Registry.
>>>>>>
>>>>>> I think it can make sense to provide the `subject` as a parameter, so that the serialization format can look up existing schemas. This would be used in something related I mentioned above:
>>>>>>
>>>>>>> Another topic I had is Protobuf's field ids. Ideally in Flink it would be nice if we are idiomatic about not renumbering them in incompatible ways, similar to what's discussed on the Schema Registry issue here: https://github.com/confluentinc/schema-registry/issues/2551
>>>>>>
>>>>>> When we construct the ProtobufSchema from the Flink LogicalType, we shouldn't renumber the field ids in an incompatible way, so one approach would be to use the subject to look up the most recent version, and use that to evolve the field ids correctly.
>>>>>>
>>>>>> On Tue, Mar 12, 2024 at 2:33 AM Anupam Aggarwal <anupam.aggar...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Kevin,
>>>>>>>
>>>>>>> Thanks for starting the discussion on this. I will be working on contributing this feature back (this was developed by Dawid Wysakowicz and others at Confluent).
>>>>>>>
>>>>>>> I have opened a (very initial) draft PR here https://github.com/apache/flink/pull/24482 with our current implementation. Thanks for the feedback on the PR; I haven't gotten around to re-organizing/refactoring the classes yet, but it would be in line with some of your comments.
>>>>>>>
>>>>>>> On the overall approach there are some slight variations from the initial proposal. Our implementation relies on an explicit schema ID being passed through the config. This could help in cases where one Flink type could potentially map to multiple proto types. We could make the schema ID optional and fall back to deriving it from the rowType (during serialization) if not present?
>>>>>>>
>>>>>>> The message index handling is still TBD. I am thinking of replicating the logic in AbstractKafkaProtobufSerializer <https://github.com/confluentinc/schema-registry/blob/342c8a9d3854d4253d785214f5dcfb1b6cc59a06/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufSerializer.java#L157> (and the corresponding deserializer).
>>>>>>>
>>>>>>> Please let me know if this makes sense, or in case you have any other feedback.
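The "explicit schema ID, falling back to deriving from the rowType" proposal above amounts to a small piece of option-resolution logic. A hypothetical sketch, with all names (SchemaRegistry, resolveWriterSchemaId) being illustrative rather than actual Flink or Confluent APIs:

```java
import java.util.Optional;

// Hypothetical sketch of the serialization-side resolution being discussed:
// prefer an explicitly configured schema ID; otherwise auto-register the
// schema derived from the Flink row type under the given subject.
public class SchemaIdResolution {
    interface SchemaRegistry {
        int register(String subject, String schema); // returns the assigned ID
    }

    static int resolveWriterSchemaId(
            Optional<Integer> configuredSchemaId,
            String subject,
            String schemaDerivedFromRowType,
            SchemaRegistry registry) {
        // Explicit ID wins: serialize against the exact registered schema,
        // useful when one Flink type could map to multiple proto types.
        if (configuredSchemaId.isPresent()) {
            return configuredSchemaId.get();
        }
        // Fallback: register the derived schema and use the assigned ID.
        return registry.register(subject, schemaDerivedFromRowType);
    }
}
```

The design choice is visible in the signature: the subject and derived schema are only consulted on the fallback path, so an explicitly pinned ID never triggers auto-registration.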
>>>>>>> Thanks
>>>>>>> Anupam
>>>>>>>
>>>>>>> On Thu, Feb 29, 2024 at 8:54 PM Kevin Lam <kevin....@shopify.com.invalid> wrote:
>>>>>>>
>>>>>>>> Hey Robert,
>>>>>>>>
>>>>>>>> Awesome, thanks, that timeline works for me. Sounds good re: deciding on a FLIP once we have the PR, and thanks for looking into the field ids.
>>>>>>>>
>>>>>>>> Looking forward to it!
>>>>>>>>
>>>>>>>> On Thu, Feb 29, 2024 at 5:09 AM Robert Metzger <metrob...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey Kevin,
>>>>>>>>>
>>>>>>>>> Thanks a lot. Then let's contribute the Confluent implementation to apache/flink. We can't start working on this immediately because of a team event next week, but within the next two weeks we will start working on this. It probably makes sense for us to open a pull request of what we have already, so that you can start reviewing and maybe also contributing to the PR. I hope this timeline works for you!
>>>>>>>>>
>>>>>>>>> Let's also decide if we need a FLIP once the code is public. We will look into the field ids.
>>>>>>>>>
>>>>>>>>> On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam <kevin....@shopify.com.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Robert,
>>>>>>>>>>
>>>>>>>>>> Thanks for your response. I have a partial implementation, just for the decoding portion. The code I have is pretty rough and doesn't do any of the refactors I mentioned, but the decoder logic does pull the schema from the schema registry and use that to deserialize the DynamicMessage before converting it to RowData using a DynamicMessageToRowDataConverter class. For the other aspects, I would need to start from scratch for the encoder.
>>>>>>>>>>
>>>>>>>>>> Would be very happy to see you drive the contribution back to open source from Confluent, or collaborate on this.
>>>>>>>>>>
>>>>>>>>>> Another topic I had is Protobuf's field ids. Ideally in Flink it would be nice if we are idiomatic about not renumbering them in incompatible ways, similar to what's discussed on the Schema Registry issue here: https://github.com/confluentinc/schema-registry/issues/2551
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> +1 to support the format in Flink.
>>>>>>>>>>>
>>>>>>>>>>> @Kevin: Do you already have an implementation for this in-house that you are looking to upstream, or would you start from scratch? I'm asking because my current employer, Confluent, has a Protobuf Schema Registry implementation for Flink, and I could help drive contributing this back to open source. If you already have an implementation, let's decide which one to use :)
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Robert
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 22, 2024 at 2:05 PM David Radley <david_rad...@uk.ibm.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Kevin,
>>>>>>>>>>>> Some thoughts on this.
>>>>>>>>>>>> I suggested an Apicurio registry format on the dev list, and was advised to raise a FLIP for this; I suggest the same would apply here (or the alternative to FLIPs if you cannot raise one). I am prototyping an Avro Apicurio format prior to raising the FLIP, and notice that the readSchema in the SchemaCoder only takes a byte array, but I need to pass down the Kafka headers (where the Apicurio globalId identifying the schema lives).
>>>>>>>>>>>>
>>>>>>>>>>>> I assume:
>>>>>>>>>>>>
>>>>>>>>>>>> * For the Confluent Protobuf format you would extend the Protobuf format to drive some Schema Registry logic for Protobuf (similar to the way Avro does it), where the magic byte + schema ID can be obtained and the schema looked up using the Confluent Schema Registry.
>>>>>>>>>>>> * It would be good if any Protobuf format enhancements for schema registries pass down the Kafka headers (I am thinking as a Map<String, Object> for Avro) as well as the message payload, so Apicurio registry could work with this.
>>>>>>>>>>>> * It would make sense to have the Confluent schema lookup in common code, as part of the SchemaCoder readSchema logic.
>>>>>>>>>>>> * I assume the ProtobufSchemaCoder readSchema would return a Protobuf Schema object.
>>>>>>>>>>>>
>>>>>>>>>>>> I also wondered whether these Kafka-only formats should be moved to the Kafka connector repo, or whether they might in the future be used outside Kafka – e.g. Avro/Protobuf files in a database.
>>>>>>>>>>>> Kind regards, David.
>>>>>>>>>>>>
>>>>>>>>>>>> From: Kevin Lam <kevin....@shopify.com.INVALID>
>>>>>>>>>>>> Date: Wednesday, 21 February 2024 at 18:51
>>>>>>>>>>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>>>>>>>>>>> Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format
>>>>>>>>>>>>
>>>>>>>>>>>> I would love to get some feedback from the community on this JIRA issue: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
>>>>>>>>>>>>
>>>>>>>>>>>> I am looking into creating a PR and would appreciate some review on the approach.
>>>>>>>>>>>>
>>>>>>>>>>>> In terms of design I think we can mirror the `debezium-avro-confluent` and `avro-confluent` formats already available in Flink:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. A `protobuf-confluent` format which uses DynamicMessage <https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage> for encoding and decoding.
>>>>>>>>>>>>    - For encoding, the Flink RowType will be used to dynamically create a Protobuf schema and register it with the Confluent Schema Registry. It will use the same schema to construct a DynamicMessage and serialize it.
>>>>>>>>>>>>    - For decoding, the schema will be fetched from the registry, and DynamicMessage will be used to deserialize and convert the Protobuf object to a Flink RowData.
>>>>>>>>>>>>    - Note: here there is no external .proto file.
>>>>>>>>>>>> 2. A `debezium-protobuf-confluent` format which unpacks the Debezium envelope and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER, INSERT, DELETE events.
>>>>>>>>>>>>    - We may be able to refactor and reuse code from the existing DebeziumAvroDeserializationSchema + DebeziumAvroSerializationSchema, since the deser logic is largely delegated and these schemas are concerned with handling the Debezium envelope.
>>>>>>>>>>>> 3. Move the Confluent Schema Registry client code to a separate Maven module, flink-formats/flink-confluent-common, and extend it to support ProtobufSchemaProvider <https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>.
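The envelope unpacking in item 2 boils down to mapping Debezium's `op` field onto Flink changelog row kinds. A sketch of that mapping (the RowKind names mirror org.apache.flink.types.RowKind, and the c/r/u/d codes are Debezium's documented operation values; the update-emits-both-images behavior follows the existing debezium-avro-confluent format):

```java
import java.util.List;

// Sketch: which changelog rows a Debezium envelope produces, keyed by its "op" field.
public class DebeziumOps {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static List<RowKind> rowKindsFor(String op) {
        switch (op) {
            case "c": // create
            case "r": // snapshot read
                return List.of(RowKind.INSERT);
            case "u": // update: emit the "before" image, then the "after" image
                return List.of(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER);
            case "d": // delete: emit the "before" image as a retraction
                return List.of(RowKind.DELETE);
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + op);
        }
    }
}
```

Because this mapping is independent of the payload encoding, the Avro and Protobuf variants of the format can share it, which is what the reuse suggestion in item 2 is getting at.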
>>>>>>>>>>>> Does anyone have any feedback or objections to this approach?
>>>>>>>>>>>>
>>>>>>>>>>>> Unless otherwise stated above:
>>>>>>>>>>>>
>>>>>>>>>>>> IBM United Kingdom Limited
>>>>>>>>>>>> Registered in England and Wales with number 741598
>>>>>>>>>>>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

--
David Mariassy
Staff Data Developer
Shopify <https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>