Hey all,

Thanks for the ping, Kevin! I put up a PR for the alternative implementation
here: https://github.com/apache/flink/pull/25114, and would love to get
this group's feedback!

TIA!

On Fri, Jul 19, 2024 at 2:17 PM Kevin Lam <kevin....@shopify.com> wrote:

> Hi all,
>
> I had chimed
> <https://github.com/apache/flink/pull/24482#issuecomment-2226279775> in
> on Anupam's PR that we might have some time to look at this issue
> FLINK-34440.
>
> Adding @David Mariassy <david.maria...@shopify.com> to this thread, as he's
> started to work
> <https://github.com/apache/flink/pull/24482#issuecomment-2239843309> on
> our implementation of FLINK-34440:
> https://github.com/Shopify/flink/tree/protobuf-confluent-dynamic-deser
>
> On Fri, Apr 19, 2024 at 12:41 AM Anupam Aggarwal <
> anupam.aggar...@gmail.com> wrote:
>
>> Thanks David.
>> That's a great idea. For deserialization the external schema Id will be
>> used to obtain a dynamic message, so in a way it has to be in line with the
>> writer schema.
>> We could limit it to serialization and rename it according to your
>> suggestion.
>>
>> Thanks
>> Anupam
>> On Tue, Apr 16, 2024 at 3:38 PM David Radley <david_rad...@uk.ibm.com>
>> wrote:
>>
>> > Hi Anupam,
>> > Thanks for your response. I was wondering around the schema id and had
>> > some thoughts:
>> >
>> > I assume that for Confluent Avro, specifying the schema is not normally
>> > done, but could be useful to force a particular shape.
>> >
>> > If you specify a schema id in the format configuration:
>> > - for deserialization: does this mean the schema id in the payload has to
>> > match it? If so, we lose the ability to have multiple versions of the schema
>> > on a topic. For me schemaId makes less sense for deserialization, as the
>> > existing mechanism used by Avro / confluent avro formats is working well.
>> >
>> > - I can see it makes sense for the serialization where there is an
>> > existing schema in the registry you want to target.
>> >
>> > I suggest the schemaId be called something like schemaIdForSink or
>> > schemaIdForSerialization; to prevent confusion with the deserialization
>> > case. We could have the schema as you suggest so we are compatible with
>> the
>> > confluent avro format.
>> >
>> >
>> > WDYT?
>> >     Kind regards, David.
>> >
>> >
>> > From: Anupam Aggarwal <anupam.aggar...@gmail.com>
>> > Date: Saturday, 13 April 2024 at 16:08
>> > To: dev@flink.apache.org <dev@flink.apache.org>
>> > Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf
>> > Confluent Format
>> > Hi David,
>> >
>> > Thank you for the suggestion.
>> > IIUC, you are proposing using an explicit schema string, instead of the
>> > schemaID.
>> > This makes sense, as it would make the behavior consistent with Avro,
>> > although a bit more verbose from a config standpoint.
>> >
>> > If we go via the schema string route, the user would have to ensure that
>> > the input schema string corresponds to an existing schemaID.
>> > This however, might end up registering a new id (based on
>> >
>> >
>> https://github.com/confluentinc/schema-registry/issues/878#issuecomment-437510493
>> > ).
>> >
>> > How about adding both the options (explicit schema string/ schemaID).
>> > If a schema string is specified we register a new schemaID, if the user
>> > specifies an explicit schemaID we just use it directly?
>> >
>> > Thanks
>> > Anupam
>> >
>> > On Wed, Apr 10, 2024 at 2:27 PM David Radley <david_rad...@uk.ibm.com>
>> > wrote:
>> >
>> > > Hi,
>> > > I notice in the draft pr that there is a schema id in the format
>> config.
>> > I
>> > > was wondering why? In the confluent avro and existing debezium
>> formats,
>> > > there is no schema id in the config, but there is the ability to
>> specify
>> > a
>> > > complete schema. In the protobuf format there is no schema id.
>> > >
>> > > I assume the schema id would be used during serialize in the case
>> there
>> > is
>> > > already an existing registered schema and you have its id. I see in
>> the
>> > > docs
>> > >
>> >
>> https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
>> > > there is a serialize example where 2 schemas are registered.
>> > >
>> > > I would suggest aiming to copy what the confluent DeSer libraries do
>> > > rather than having a schema id hard coded in the config.
>> > >
>> > > WDYT?
>> > >     Kind regards, David.
>> > >
>> > > From: Kevin Lam <kevin....@shopify.com.INVALID>
>> > > Date: Tuesday, 26 March 2024 at 20:06
>> > > To: dev@flink.apache.org <dev@flink.apache.org>
>> > > Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium
>> Protobuf
>> > > Confluent Format
>> > > Thanks Anupam! Looking forward to it.
>> > >
>> > > On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal <
>> > anupam.aggar...@gmail.com
>> > > >
>> > > wrote:
>> > >
>> > > > Hi Kevin,
>> > > >
>> > > > Thanks, these are some great points.
>> > > > Just to clarify, I do agree that the subject should be an option
>> (like
>> > in
>> > > > the case of RegistryAvroFormatFactory).
>> > > > We could fall back to subject and auto-register schemas if schema-Id is
>> > > > not provided explicitly.
>> > > > In general, I think it would be good to be more explicit about the
>> > > schemas
>> > > > used (
>> > > > https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
>> > > > ).
>> > > > This would also help prevent us from overriding the ids in
>> incompatible
>> > > > ways.
>> > > >
>> > > > Under the current implementation of FlinkToProtoSchemaConverter we
>> > might
>> > > > end up overwriting the field-Ids.
>> > > > If we are able to locate a prior schema, the approach you outlined
>> > makes
>> > > a
>> > > > lot of sense.
>> > > > Let me explore this a bit further and get back (in terms of
>> > > > feasibility).
>> > > >
>> > > > Thanks again!
>> > > > - Anupam
>> > > >
>> > > > On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam
>> > <kevin....@shopify.com.invalid
>> > > >
>> > > > wrote:
>> > > >
>> > > > > Hi Anupam,
>> > > > >
>> > > > > Thanks again for your work on contributing this feature back.
>> > > > >
>> > > > > Sounds good re: the refactoring/re-organizing.
>> > > > >
>> > > > > Regarding the schema-id, in my opinion this should NOT be a
>> > > configuration
>> > > > > option on the format. We should be able to deterministically map
>> the
>> > > > Flink
>> > > > > type to the ProtoSchema and register that with the Schema
>> Registry.
>> > > > >
>> > > > > I think it can make sense to provide the `subject` as a
>> parameter, so
>> > > > that
>> > > > > the serialization format can look up existing schemas.
>> > > > >
>> > > > > This would be used for something related that I mentioned above:
>> > > > >
>> > > > > > Another topic I had is Protobuf's field ids. Ideally in Flink it
>> > > would
>> > > > be
>> > > > > > nice if we are idiomatic about not renumbering them in
>> incompatible
>> > > > ways,
>> > > > > > similar to what's discussed on the Schema Registry issue here:
>> > > > > > https://github.com/confluentinc/schema-registry/issues/2551
>> > > > >
>> > > > >
>> > > > > When we construct the ProtobufSchema from the Flink LogicalType,
>> we
>> > > > > shouldn't renumber the field ids in an incompatible way, so one
>> > > approach
>> > > > > would be to use the subject to look up the most recent version,
>> and
>> > use
>> > > > > that to evolve the field ids correctly.
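
Editor's sketch of the field-id idea above, under stated assumptions: the previous schema version is represented as a plain name-to-number map (in practice it would come from the latest version registered under the subject), and new fields receive numbers above the highest number seen so far. The class and method names are hypothetical and are not taken from either PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldNumberSketch {

    /**
     * Assigns Protobuf field numbers to the current fields.
     * Fields that existed in the previous schema keep their number;
     * new fields get numbers above the previous maximum, so no number
     * present in the previous schema is reused for a different field.
     */
    static Map<String, Integer> assignFieldNumbers(
            Map<String, Integer> previous, List<String> currentFields) {
        int next = previous.values().stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0) + 1;
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String name : currentFields) {
            Integer existing = previous.get(name);
            if (existing != null) {
                result.put(name, existing);   // unchanged field: keep its number
            } else {
                result.put(name, next++);     // new field: fresh number
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical previous version: id=1, name=2, email=3.
        Map<String, Integer> previous = new LinkedHashMap<>();
        previous.put("id", 1);
        previous.put("name", 2);
        previous.put("email", 3);

        // The new row type drops "name" and adds "created_at".
        Map<String, Integer> evolved =
                assignFieldNumbers(previous, List.of("id", "email", "created_at"));
        System.out.println(evolved); // {id=1, email=3, created_at=4}
    }
}
```

This only protects numbers that are still visible in the previous version; numbers of fields removed in older versions would need to be tracked separately (for example as reserved numbers), which the sketch does not attempt.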
>> > > > >
>> > > > >
>> > > > > On Tue, Mar 12, 2024 at 2:33 AM Anupam Aggarwal <
>> > > > anupam.aggar...@gmail.com
>> > > > > >
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Kevin,
>> > > > > >
>> > > > > > Thanks for starting the discussion on this.
>> > > > > > I will be working on contributing this feature back (This was
>> > > developed
>> > > > > by
>> > > > > > Dawid Wysakowicz and others at Confluent).
>> > > > > >
>> > > > > > I have opened a (very initial) draft PR here
>> > > > > > https://github.com/apache/flink/pull/24482  with our current
>> > > > > > implementation.
>> > > > > > Thanks for the feedback on the PR. I haven’t gotten around to
>> > > > > > re-organizing/refactoring the classes yet, but it would be in line
>> > > > > > with some of your comments.
>> > > > > >
>> > > > > > On the overall approach there are some slight variations from
>> the
>> > > > initial
>> > > > > > proposal.
>> > > > > > Our implementation relies on an explicit schema-id being passed
>> > > through
>> > > > > the
>> > > > > > config. This could help in cases where one Flink type could
>> > > potentially
>> > > > > map
>> > > > > > to multiple proto types.
>> > > > > > We could make the schema-Id optional and fall back to deriving
>> it
>> > > from
>> > > > > the
>> > > > > > rowType (during serialization) if not present?
>> > > > > >
>> > > > > > The message index handling is still TBD. I am thinking of
>> > replicating
>> > > > > logic
>> > > > > > in AbstractKafkaProtobufSerializer.java
>> > > > > > <
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/confluentinc/schema-registry/blob/342c8a9d3854d4253d785214f5dcfb1b6cc59a06/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufSerializer.java#L157
>> > > > > > >
>> > > > > >  (|Deserializer).
>> > > > > > Please let me know if this makes sense / or in case you have any
>> > > other
>> > > > > > feedback.
>> > > > > >
>> > > > > > Thanks
>> > > > > > Anupam
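
Editor's sketch of the message-index handling mentioned above, assuming the layout described in the Confluent wire-format documentation: the indexes are written as a zigzag-varint count followed by zigzag-varint values, and the common case of a single index 0 is shortened to one 0 byte. The class and method names are hypothetical; this is not the code in AbstractKafkaProtobufSerializer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class MessageIndexSketch {

    /** Writes a signed int as a zigzag-encoded varint. */
    static void writeVarint(int value, ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31); // zigzag
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);     // low 7 bits plus continuation bit
            v >>>= 7;
        }
        out.write(v);
    }

    /** Reads a zigzag-encoded varint and returns the signed int. */
    static int readVarint(ByteBuffer in) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            if (shift > 28) {
                throw new IllegalArgumentException("varint too long");
            }
            b = in.get();
            raw |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);      // undo zigzag
    }

    /** Encodes the message-index path; [0] becomes a single 0 byte. */
    static byte[] encode(List<Integer> indexes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (indexes.size() == 1 && indexes.get(0) == 0) {
            out.write(0);
            return out.toByteArray();
        }
        writeVarint(indexes.size(), out);
        for (int index : indexes) {
            writeVarint(index, out);
        }
        return out.toByteArray();
    }

    /** Decodes the message-index path; a count of 0 means [0]. */
    static List<Integer> decode(ByteBuffer in) {
        int size = readVarint(in);
        List<Integer> result = new ArrayList<>();
        if (size == 0) {
            result.add(0);
            return result;
        }
        for (int i = 0; i < size; i++) {
            result.add(readVarint(in));
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] first = encode(List.of(0));     // first top-level message type
        byte[] nested = encode(List.of(1, 0)); // first nested type of the second message
        System.out.println(first.length);                      // 1
        System.out.println(decode(ByteBuffer.wrap(nested)));   // [1, 0]
    }
}
```

In the full payload these bytes would sit between the schema id and the serialized Protobuf message.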
>> > > > > >
>> > > > > > On Thu, Feb 29, 2024 at 8:54 PM Kevin Lam
>> > > > <kevin....@shopify.com.invalid
>> > > > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hey Robert,
>> > > > > > >
>> > > > > > > Awesome thanks, that timeline works for me. Sounds good re:
>> > > deciding
>> > > > on
>> > > > > > > FLIP once we have the PR, and thanks for looking into the
>> field
>> > > ids.
>> > > > > > >
>> > > > > > > Looking forward to it!
>> > > > > > >
>> > > > > > > On Thu, Feb 29, 2024 at 5:09 AM Robert Metzger <
>> > > metrob...@gmail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hey Kevin,
>> > > > > > > >
>> > > > > > > > Thanks a lot. Then let's contribute the Confluent
>> > implementation
>> > > to
>> > > > > > > > apache/flink. We can't start working on this immediately
>> > because
>> > > > of a
>> > > > > > > team
>> > > > > > > > event next week, but within the next two weeks, we will
>> start
>> > > > working
>> > > > > > on
>> > > > > > > > this.
>> > > > > > > > It probably makes sense for us to open a pull request of
>> what
>> > we
>> > > > have
>> > > > > > > > already, so that you can start reviewing and maybe also
>> > > > contributing
>> > > > > to
>> > > > > > > the
>> > > > > > > > PR.
>> > > > > > > > I hope this timeline works for you!
>> > > > > > > >
>> > > > > > > > Let's also decide if we need a FLIP once the code is public.
>> > > > > > > > We will look into the field ids.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam
>> > > > > > <kevin....@shopify.com.invalid
>> > > > > > > >
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hey Robert,
>> > > > > > > > >
>> > > > > > > > > Thanks for your response. I have a partial implementation,
>> > just
>> > > > for
>> > > > > > the
>> > > > > > > > > decoding portion.
>> > > > > > > > >
>> > > > > > > > > The code I have is pretty rough and doesn't do any of the
>> > > > > refactors I
>> > > > > > > > > mentioned, but the decoder logic does pull the schema from
>> > the
>> > > > > schema
>> > > > > > > > > registry and use that to deserialize the DynamicMessage
>> > before
>> > > > > > > converting
>> > > > > > > > > it to RowData using a DynamicMessageToRowDataConverter
>> class.
>> > > For
>> > > > > the
>> > > > > > > > other
>> > > > > > > > > aspects, I would need to start from scratch for the
>> encoder.
>> > > > > > > > >
>> > > > > > > > > Would be very happy to see you drive the contribution
>> back to
>> > > > open
>> > > > > > > source
>> > > > > > > > > from Confluent, or collaborate on this.
>> > > > > > > > >
>> > > > > > > > > Another topic I had is Protobuf's field ids. Ideally in
>> Flink
>> > > it
>> > > > > > would
>> > > > > > > be
>> > > > > > > > > nice if we are idiomatic about not renumbering them in
>> > > > incompatible
>> > > > > > > ways,
>> > > > > > > > > similar to what's discussed on the Schema Registry issue
>> > here:
>> > > > > > > > >
>> https://github.com/confluentinc/schema-registry/issues/2551
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <
>> > > > > rmetz...@apache.org>
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi all,
>> > > > > > > > > >
>> > > > > > > > > > +1 to support the format in Flink.
>> > > > > > > > > >
>> > > > > > > > > > @Kevin: Do you already have an implementation for this
>> > > inhouse
>> > > > > that
>> > > > > > > you
>> > > > > > > > > are
>> > > > > > > > > > looking to upstream, or would you start from scratch?
>> > > > > > > > > > I'm asking because my current employer, Confluent, has a
>> > > > Protobuf
>> > > > > > > > Schema
>> > > > > > > > > > registry implementation for Flink, and I could help
>> drive
>> > > > > > > contributing
>> > > > > > > > > this
>> > > > > > > > > > back to open source.
>> > > > > > > > > > If you already have an implementation, let's decide
>> which
>> > one
>> > > > to
>> > > > > > use
>> > > > > > > :)
>> > > > > > > > > >
>> > > > > > > > > > Best,
>> > > > > > > > > > Robert
>> > > > > > > > > >
>> > > > > > > > > > On Thu, Feb 22, 2024 at 2:05 PM David Radley <
>> > > > > > > david_rad...@uk.ibm.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Kevin,
>> > > > > > > > > > > Some thoughts on this.
>> > > > > > > > > > > I suggested an Apicurio registry format on the dev list and was
>> > > > > > > > > > > advised to raise a FLIP for it; I suggest the same would apply here
>> > > > > > > > > > > (or the alternative to FLIPs if you cannot raise one). I am
>> > > > > > > > > > > prototyping an Avro Apicurio format prior to raising the FLIP, and
>> > > > > > > > > > > notice that the readSchema in the SchemaCoder only takes a byte
>> > > > > > > > > > > array, but I need to pass down the Kafka headers (where the Apicurio
>> > > > > > > > > > > globalId identifying the schema lives).
>> > > > > > > > > > >
>> > > > > > > > > > > I assume:
>> > > > > > > > > > >
>> > > > > > > > > > >   *   for the confluent Protobuf format you would
>> extend
>> > > the
>> > > > > > > Protobuf
>> > > > > > > > > > > format to drive some Schema Registry logic for
>> Protobuf
>> > > > > (similar
>> > > > > > to
>> > > > > > > > the
>> > > > > > > > > > way
>> > > > > > > > > > > Avro does it) where the magic byte + schema id can be
>> > > > obtained
>> > > > > > and
>> > > > > > > > the
>> > > > > > > > > > > schema looked up using the Confluent Schema registry.
>> > > > > > > > > > >   *   It would be good if any protobuf format
>> > enhancements
>> > > > for
>> > > > > > > Schema
>> > > > > > > > > > > registries pass down the Kafka headers (I am thinking
>> as
>> > a
>> > > > > > > > Map<String,
>> > > > > > > > > > > Object> for Avro) as well as the message payload so
>> > > Apicurio
>> > > > > > > registry
>> > > > > > > > > > could
>> > > > > > > > > > > work with this.
>> > > > > > > > > > >   *   It would make sense to have the Confluent schema
>> > > lookup
>> > > > > in
>> > > > > > > > common
>> > > > > > > > > > > code, which is part of the SchemaCoder readSchema
>> logic.
>> > > > > > > > > > >   *   I assume the ProtobufSchemaCoder readSchema
>> would
>> > > > return
>> > > > > a
>> > > > > > > > > Protobuf
>> > > > > > > > > > > Schema object.
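
Editor's sketch of the "magic byte + schema id" step from the first bullet above, assuming the Confluent wire format: one magic byte with value 0 followed by a 4-byte big-endian schema id. The registry lookup itself is omitted, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

public class ConfluentHeaderSketch {

    static final byte MAGIC_BYTE = 0x0;

    /**
     * Validates the magic byte and returns the schema id that follows it.
     * On return the buffer is positioned just after the 5-byte header.
     */
    static int readSchemaId(ByteBuffer payload) {
        if (payload.remaining() < 5) {
            throw new IllegalArgumentException("Payload too short for header");
        }
        byte magic = payload.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return payload.getInt(); // ByteBuffer reads big-endian by default
    }

    public static void main(String[] args) {
        // Hypothetical record value: header for schema id 42, then three body bytes.
        byte[] value = {0x0, 0x0, 0x0, 0x0, 0x2A, 0x0, 0x08, 0x01};
        ByteBuffer buffer = ByteBuffer.wrap(value);
        System.out.println(readSchemaId(buffer)); // 42
        System.out.println(buffer.remaining());   // 3
    }
}
```

A format that also needs the Kafka headers, as suggested for Apicurio, would have to receive them alongside this byte payload; the sketch covers only the payload side.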
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > I also wondered whether these Kafka only formats
>> should
>> > be
>> > > > > moved
>> > > > > > to
>> > > > > > > > the
>> > > > > > > > > > > Kafka connector repo, or whether they might in the
>> future
>> > > be
>> > > > > used
>> > > > > > > > > outside
>> > > > > > > > > > > Kafka – e.g. Avro/Protobuf files in a database.
>> > > > > > > > > > >    Kind regards, David.
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > From: Kevin Lam <kevin....@shopify.com.INVALID>
>> > > > > > > > > > > Date: Wednesday, 21 February 2024 at 18:51
>> > > > > > > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
>> > > > > > > > > > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support
>> > Debezium
>> > > > > > Protobuf
>> > > > > > > > > > > Confluent Format
>> > > > > > > > > > > I would love to get some feedback from the community
>> on
>> > > this
>> > > > > JIRA
>> > > > > > > > > issue:
>> > > > > > > > > > >
>> > > > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
>> > > > > > > > > > >
>> > > > > > > > > > > I am looking into creating a PR and would appreciate
>> some
>> > > > > review
>> > > > > > on
>> > > > > > > > the
>> > > > > > > > > > > approach.
>> > > > > > > > > > >
>> > > > > > > > > > > In terms of design I think we can mirror the
>> > > > > > > > `debezium-avro-confluent`
>> > > > > > > > > > and
>> > > > > > > > > > > `avro-confluent` formats already available in Flink:
>> > > > > > > > > > >
>> > > > > > > > > > >    1. `protobuf-confluent` format which uses
>> > DynamicMessage
>> > > > > > > > > > >    <
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage
>> > > > > > > > > > > >
>> > > > > > > > > > >    for encoding and decoding.
>> > > > > > > > > > >       - For encoding the Flink RowType will be used to
>> > > > > > dynamically
>> > > > > > > > > > create a
>> > > > > > > > > > >       Protobuf Schema and register it with the
>> Confluent
>> > > > Schema
>> > > > > > > > > > > Registry. It will
>> > > > > > > > > > >       use the same schema to construct a
>> DynamicMessage
>> > and
>> > > > > > > serialize
>> > > > > > > > > it.
>> > > > > > > > > > >       - For decoding, the schema will be fetched from the registry,
>> > > > > > > > > > >       and DynamicMessage will be used to deserialize and convert the
>> > > > > > > > > > >       Protobuf object to a Flink RowData.
>> > > > > > > > > > >       - Note: here there is no external .proto file
>> > > > > > > > > > >    2. `debezium-protobuf-confluent` format which unpacks
>> the
>> > > > > Debezium
>> > > > > > > > > > Envelope
>> > > > > > > > > > >    and collects the appropriate UPDATE_BEFORE,
>> > > UPDATE_AFTER,
>> > > > > > > INSERT,
>> > > > > > > > > > DELETE
>> > > > > > > > > > >    events.
>> > > > > > > > > > >       - We may be able to refactor and reuse code from
>> > the
>> > > > > > existing
>> > > > > > > > > > >       DebeziumAvroDeserializationSchema +
>> > > > > > > > > DebeziumAvroSerializationSchema
>> > > > > > > > > > > since
>> > > > > > > > > > >       the deser logic is largely delegated to and
>> these
>> > > > Schemas
>> > > > > > are
>> > > > > > > > > > > concerned
>> > > > > > > > > > >       with handling the Debezium envelope.
>> > > > > > > > > > >    3. Move the Confluent Schema Registry Client code
>> to a
>> > > > > > separate
>> > > > > > > > > maven
>> > > > > > > > > > >    module, flink-formats/flink-confluent-common, and
>> > extend
>> > > > it
>> > > > > to
>> > > > > > > > > support
>> > > > > > > > > > >    ProtobufSchemaProvider
>> > > > > > > > > > >    <
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26
>> > > > > > > > > > > >
>> > > > > > > > > > >    .
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > Does anyone have any feedback or objections to this
>> > > approach?
>> > > > > > > > > > >
>> > > > > > > > > > > Unless otherwise stated above:
>> > > > > > > > > > >
>> > > > > > > > > > > IBM United Kingdom Limited
>> > > > > > > > > > > Registered in England and Wales with number 741598
>> > > > > > > > > > > Registered office: PO Box 41, North Harbour,
>> Portsmouth,
>> > > > Hants.
>> > > > > > PO6
>> > > > > > > > 3AU
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> > >
>> >
>> >
>>
>

-- 
David Mariassy
Staff Data Developer
