Hi all, I had chimed in <https://github.com/apache/flink/pull/24482#issuecomment-2226279775> on Anupam's PR that we might have some time to look at this issue FLINK-34440.
Adding @David Mariassy <david.maria...@shopify.com> to this thread, as he's started to work <https://github.com/apache/flink/pull/24482#issuecomment-2239843309> on our implementation of FLINK-34440: https://github.com/Shopify/flink/tree/protobuf-confluent-dynamic-deser

On Fri, Apr 19, 2024 at 12:41 AM Anupam Aggarwal <anupam.aggar...@gmail.com> wrote:

> Thanks David.
> That's a great idea. For deserialization the external schema ID will be used to obtain a dynamic message, so in a way it has to be in line with the writer schema.
> We could limit it to serialization and rename it according to your suggestion.
>
> Thanks
> Anupam
>
> On Tue, Apr 16, 2024 at 3:38 PM David Radley <david_rad...@uk.ibm.com> wrote:
>
> > Hi Anupam,
> > Thanks for your response. I was wondering about the schema ID and had some thoughts:
> >
> > I assume that for Confluent Avro, specifying the schema is not normally done, but could be useful to force a particular shape.
> >
> > If you specify a schema ID in the format configuration:
> > - for deserialization: does this mean the schema ID in the payload has to match it? If so, we lose the ability to have multiple versions of the schema on a topic. For me a schema ID makes less sense for deserialization, as the existing mechanism used by the Avro / Confluent Avro formats is working well.
> > - I can see it makes sense for serialization, where there is an existing schema in the registry you want to target.
> >
> > I suggest the schema ID option be called something like schemaIdForSink or schemaIdForSerialization, to prevent confusion with the deserialization case. We could have the schema as you suggest, so we are compatible with the Confluent Avro format.
> >
> > WDYT?
> > Kind regards, David.
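[For illustration, the serialization-scoped schema ID option being discussed might surface in a table definition like the sketch below. This is only a sketch: the `protobuf-confluent` format name and the option keys are assumptions modeled on the existing `avro-confluent` format options, not names settled in this thread.]

```sql
CREATE TABLE orders (
  order_id BIGINT,
  amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'protobuf-confluent',
  'protobuf-confluent.url' = 'http://schema-registry:8081',
  -- hypothetical option, following the schemaIdForSerialization naming idea:
  -- only consulted on the write path; reads keep using the schema ID
  -- embedded in each message
  'protobuf-confluent.schema-id-for-serialization' = '42'
);
```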
> >
> > From: Anupam Aggarwal <anupam.aggar...@gmail.com>
> > Date: Saturday, 13 April 2024 at 16:08
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format
> >
> > Hi David,
> >
> > Thank you for the suggestion.
> > IIUC, you are proposing using an explicit schema string instead of the schema ID.
> > This makes sense, as it would make the behavior consistent with Avro, although a bit more verbose from a config standpoint.
> >
> > If we go via the schema string route, the user would have to ensure that the input schema string corresponds to an existing schema ID.
> > This, however, might end up registering a new ID (based on
> > https://github.com/confluentinc/schema-registry/issues/878#issuecomment-437510493).
> >
> > How about adding both options (explicit schema string / schema ID)?
> > If a schema string is specified we register a new schema ID; if the user specifies an explicit schema ID we just use it directly.
> >
> > Thanks
> > Anupam
> >
> > On Wed, Apr 10, 2024 at 2:27 PM David Radley <david_rad...@uk.ibm.com> wrote:
> >
> > > Hi,
> > > I notice in the draft PR that there is a schema ID in the format config. I was wondering why? In the Confluent Avro and existing Debezium formats there is no schema ID in the config, but there is the ability to specify a complete schema. In the Protobuf format there is no schema ID.
> > >
> > > I assume the schema ID would be used during serialization in the case where there is already an existing registered schema and you have its ID. I see in the docs
> > > https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
> > > there is a serialize example where 2 schemas are registered.
> > >
> > > I would suggest aiming to copy what the Confluent DeSer libraries do rather than having a schema ID hard-coded in the config.
> > >
> > > WDYT?
> > > Kind regards, David.
> > >
> > > From: Kevin Lam <kevin....@shopify.com.INVALID>
> > > Date: Tuesday, 26 March 2024 at 20:06
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format
> > >
> > > Thanks Anupam! Looking forward to it.
> > >
> > > On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal <anupam.aggar...@gmail.com> wrote:
> > >
> > > > Hi Kevin,
> > > >
> > > > Thanks, these are some great points.
> > > > Just to clarify, I do agree that the subject should be an option (like in the case of RegistryAvroFormatFactory).
> > > > We could fall back to the subject and auto-register schemas if a schema ID is not provided explicitly.
> > > > In general, I think it would be good to be more explicit about the schemas used (
> > > > https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> > > > ).
> > > > This would also help prevent us from overriding the IDs in incompatible ways.
> > > >
> > > > Under the current implementation of FlinkToProtoSchemaConverter we might end up overwriting the field IDs.
> > > > If we are able to locate a prior schema, the approach you outlined makes a lot of sense.
> > > > Let me explore this a bit further and get back (in terms of feasibility).
> > > >
> > > > Thanks again!
> > > > - Anupam
> > > >
> > > > On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam <kevin....@shopify.com.invalid> wrote:
> > > >
> > > > > Hi Anupam,
> > > > >
> > > > > Thanks again for your work on contributing this feature back.
> > > > >
> > > > > Sounds good re: the refactoring/re-organizing.
> > > > >
> > > > > Regarding the schema ID, in my opinion this should NOT be a configuration option on the format. We should be able to deterministically map the Flink type to the ProtoSchema and register that with the Schema Registry.
> > > > >
> > > > > I think it can make sense to provide the `subject` as a parameter, so that the serialization format can look up existing schemas.
> > > > >
> > > > > This would be used in something related I mentioned above:
> > > > >
> > > > > > Another topic I had is Protobuf's field ids. Ideally in Flink it would be nice if we are idiomatic about not renumbering them in incompatible ways, similar to what's discussed on the Schema Registry issue here:
> > > > > > https://github.com/confluentinc/schema-registry/issues/2551
> > > > >
> > > > > When we construct the ProtobufSchema from the Flink LogicalType, we shouldn't renumber the field ids in an incompatible way, so one approach would be to use the subject to look up the most recent version, and use that to evolve the field ids correctly.
> > > > >
> > > > > On Tue, Mar 12, 2024 at 2:33 AM Anupam Aggarwal <anupam.aggar...@gmail.com> wrote:
> > > > >
> > > > > > Hi Kevin,
> > > > > >
> > > > > > Thanks for starting the discussion on this.
> > > > > > I will be working on contributing this feature back (this was developed by Dawid Wysakowicz and others at Confluent).
> > > > > >
> > > > > > I have opened a (very initial) draft PR here
> > > > > > https://github.com/apache/flink/pull/24482 with our current implementation.
> > > > > > Thanks for the feedback on the PR. I haven't gotten around to re-organizing/refactoring the classes yet, but it would be in line with some of your comments.
> > > > > >
> > > > > > On the overall approach there are some slight variations from the initial proposal.
> > > > > > Our implementation relies on an explicit schema ID being passed through the config. This could help in cases where one Flink type could potentially map to multiple proto types.
> > > > > > We could make the schema ID optional and fall back to deriving it from the rowType (during serialization) if not present?
> > > > > >
> > > > > > The message index handling is still TBD. I am thinking of replicating the logic in AbstractKafkaProtobufSerializer.java
> > > > > > <https://github.com/confluentinc/schema-registry/blob/342c8a9d3854d4253d785214f5dcfb1b6cc59a06/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufSerializer.java#L157>
> > > > > > (|Deserializer).
> > > > > > Please let me know if this makes sense, or in case you have any other feedback.
> > > > > >
> > > > > > Thanks
> > > > > > Anupam
> > > > > >
> > > > > > On Thu, Feb 29, 2024 at 8:54 PM Kevin Lam <kevin....@shopify.com.invalid> wrote:
> > > > > >
> > > > > > > Hey Robert,
> > > > > > >
> > > > > > > Awesome, thanks, that timeline works for me. Sounds good re: deciding on a FLIP once we have the PR, and thanks for looking into the field ids.
> > > > > > >
> > > > > > > Looking forward to it!
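[For context on the message-index handling mentioned above: the Confluent Protobuf serializer frames each record with a zero magic byte, a 4-byte big-endian schema ID, and then the message indexes that locate the message type within the registered .proto file. The stdlib-only sketch below shows that framing as I read it from the schema-registry sources; the zigzag-varint encoding of the indexes and the single-zero shortcut for the common [0] case are assumptions to verify against MessageIndexes in the Confluent repo, not settled facts from this thread.]

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the assumed Confluent Protobuf wire-format header:
 * magic byte (0x00) | 4-byte big-endian schema ID | message indexes | proto payload.
 */
public final class ProtobufWireFormat {

    public static final byte MAGIC_BYTE = 0x0;

    /** Zigzag varint, as Kafka's ByteUtils.writeVarint does (assumption). */
    static void writeVarint(int value, ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31); // zigzag-encode
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    static int readVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        int b;
        do {
            b = buf.get() & 0xFF;
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (value >>> 1) ^ -(value & 1); // zigzag-decode
    }

    /** Writes the header; the serialized DynamicMessage bytes would follow it. */
    public static byte[] writeHeader(int schemaId, List<Integer> messageIndexes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(MAGIC_BYTE);
        out.write((schemaId >>> 24) & 0xFF);
        out.write((schemaId >>> 16) & 0xFF);
        out.write((schemaId >>> 8) & 0xFF);
        out.write(schemaId & 0xFF);
        // Assumed optimization: the first message in the file, indexes == [0],
        // is collapsed to a single zero byte.
        if (messageIndexes.size() == 1 && messageIndexes.get(0) == 0) {
            out.write(0);
        } else {
            writeVarint(messageIndexes.size(), out);
            for (int idx : messageIndexes) {
                writeVarint(idx, out);
            }
        }
        return out.toByteArray();
    }

    public static int readSchemaId(ByteBuffer buf) {
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt(); // ByteBuffer default order is big-endian
    }

    public static List<Integer> readMessageIndexes(ByteBuffer buf) {
        int size = readVarint(buf);
        if (size == 0) {
            return List.of(0); // the optimized single-zero encoding
        }
        List<Integer> indexes = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            indexes.add(readVarint(buf));
        }
        return indexes;
    }
}
```

[A deserializer would read this header first, fetch the writer schema for the ID, resolve the message indexes to a Descriptor, and only then hand the remaining bytes to DynamicMessage.]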
> > > > > > >
> > > > > > > On Thu, Feb 29, 2024 at 5:09 AM Robert Metzger <metrob...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hey Kevin,
> > > > > > > >
> > > > > > > > Thanks a lot. Then let's contribute the Confluent implementation to apache/flink. We can't start working on this immediately because of a team event next week, but within the next two weeks we will start working on this.
> > > > > > > > It probably makes sense for us to open a pull request of what we have already, so that you can start reviewing and maybe also contributing to the PR.
> > > > > > > > I hope this timeline works for you!
> > > > > > > >
> > > > > > > > Let's also decide if we need a FLIP once the code is public.
> > > > > > > > We will look into the field ids.
> > > > > > > >
> > > > > > > > On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam <kevin....@shopify.com.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hey Robert,
> > > > > > > > >
> > > > > > > > > Thanks for your response. I have a partial implementation, just for the decoding portion.
> > > > > > > > >
> > > > > > > > > The code I have is pretty rough and doesn't do any of the refactors I mentioned, but the decoder logic does pull the schema from the schema registry and use that to deserialize the DynamicMessage before converting it to RowData using a DynamicMessageToRowDataConverter class. For the other aspects, I would need to start from scratch for the encoder.
> > > > > > > > >
> > > > > > > > > Would be very happy to see you drive the contribution back to open source from Confluent, or collaborate on this.
> > > > > > > > >
> > > > > > > > > Another topic I had is Protobuf's field ids. Ideally in Flink it would be nice if we are idiomatic about not renumbering them in incompatible ways, similar to what's discussed on the Schema Registry issue here:
> > > > > > > > > https://github.com/confluentinc/schema-registry/issues/2551
> > > > > > > > >
> > > > > > > > > On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <rmetz...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > +1 to support the format in Flink.
> > > > > > > > > >
> > > > > > > > > > @Kevin: Do you already have an implementation for this in-house that you are looking to upstream, or would you start from scratch?
> > > > > > > > > > I'm asking because my current employer, Confluent, has a Protobuf Schema Registry implementation for Flink, and I could help drive contributing this back to open source.
> > > > > > > > > > If you already have an implementation, let's decide which one to use :)
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Robert
> > > > > > > > > >
> > > > > > > > > > On Thu, Feb 22, 2024 at 2:05 PM David Radley <david_rad...@uk.ibm.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Kevin,
> > > > > > > > > > > Some thoughts on this.
> > > > > > > > > > > I suggested an Apicurio registry format in the dev list, and was advised to raise a FLIP for this; I suggest the same would apply here (or the alternative to FLIPs if you cannot raise one). I am prototyping an Avro Apicurio format, prior to raising the FLIP, and notice that the readSchema in the SchemaCoder only takes a byte array, but I need to pass down the Kafka headers (where the Apicurio globalId identifying the schema lives).
> > > > > > > > > > >
> > > > > > > > > > > I assume:
> > > > > > > > > > >
> > > > > > > > > > > * For the Confluent Protobuf format you would extend the Protobuf format to drive some Schema Registry logic for Protobuf (similar to the way Avro does it), where the magic byte + schema ID can be obtained and the schema looked up using the Confluent Schema Registry.
> > > > > > > > > > > * It would be good if any Protobuf format enhancements for schema registries pass down the Kafka headers (I am thinking as a Map<String, Object> for Avro) as well as the message payload, so Apicurio Registry could work with this.
> > > > > > > > > > > * It would make sense to have the Confluent schema lookup in common code, which is part of the SchemaCoder readSchema logic.
> > > > > > > > > > > * I assume the ProtobufSchemaCoder readSchema would return a Protobuf Schema object.
> > > > > > > > > > >
> > > > > > > > > > > I also wondered whether these Kafka-only formats should be moved to the Kafka connector repo, or whether they might in the future be used outside Kafka – e.g. Avro/Protobuf files in a database.
> > > > > > > > > > > Kind regards, David.
> > > > > > > > > > >
> > > > > > > > > > > From: Kevin Lam <kevin....@shopify.com.INVALID>
> > > > > > > > > > > Date: Wednesday, 21 February 2024 at 18:51
> > > > > > > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > > > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format
> > > > > > > > > > >
> > > > > > > > > > > I would love to get some feedback from the community on this JIRA issue:
> > > > > > > > > > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> > > > > > > > > > >
> > > > > > > > > > > I am looking into creating a PR and would appreciate some review on the approach.
> > > > > > > > > > >
> > > > > > > > > > > In terms of design I think we can mirror the `debezium-avro-confluent` and `avro-confluent` formats already available in Flink:
> > > > > > > > > > >
> > > > > > > > > > > 1.
`protobuf-confluent` format which uses DynamicMessage
> > > > > > > > > > > <https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage>
> > > > > > > > > > > for encoding and decoding.
> > > > > > > > > > >    - For encoding, the Flink RowType will be used to dynamically create a Protobuf schema and register it with the Confluent Schema Registry. It will use the same schema to construct a DynamicMessage and serialize it.
> > > > > > > > > > >    - For decoding, the schema will be fetched from the registry, and DynamicMessage will be used to deserialize and convert the Protobuf object to a Flink RowData.
> > > > > > > > > > >    - Note: here there is no external .proto file.
> > > > > > > > > > > 2. `debezium-protobuf-confluent` format which unpacks the Debezium envelope and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER, INSERT, DELETE events.
> > > > > > > > > > >    - We may be able to refactor and reuse code from the existing DebeziumAvroDeserializationSchema + DebeziumAvroSerializationSchema, since the deser logic is largely delegated, and these schemas are concerned with handling the Debezium envelope.
> > > > > > > > > > > 3.
Move the Confluent Schema Registry client code to a separate Maven module, flink-formats/flink-confluent-common, and extend it to support ProtobufSchemaProvider
> > > > > > > > > > > <https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>.
> > > > > > > > > > >
> > > > > > > > > > > Does anyone have any feedback or objections to this approach?
> > > > > > > > > > >
> > > > > > > > > > > Unless otherwise stated above:
> > > > > > > > > > >
> > > > > > > > > > > IBM United Kingdom Limited
> > > > > > > > > > > Registered in England and Wales with number 741598
> > > > > > > > > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU