@Arvid: Could you take a look at this?

Thanks a lot.

On Wed, Aug 14, 2024 at 6:19 PM David Radley <david_rad...@uk.ibm.com>
wrote:

> Hi Danny,
> Thank you for your feedback. I have brought a fork up to date with one
> commit so you can see the code. I need to test more scenarios and add more
> unit tests, but it shows the basic idea of the design that Chesnay and I
> agreed.
>
>  Kind regards, David.
>
>
> From: Danny Cranmer <dannycran...@apache.org>
> Date: Thursday, 25 July 2024 at 16:46
> To: dev@flink.apache.org <dev@flink.apache.org>
> Cc: David Radley <david_rad...@uk.ibm.com>
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
>
> Hey,
>
> Thanks for the updated FLIP.
>
> > 1.20 or beyond
>
> Given this is intended to go in the Kafka connector we can target the next
> Kafka connector version (v3.3.0) for Flink 1.18 and 1.19 (or 1.19 and 1.20,
> if 1.20 is released first).
>
>
> >> Agreed
>
>  > RecordBasedSerialization::public boolean canProcess(byte[]
> customSerialisation);
>
> Not sure what this is for, since we are passing a byte array and asking if
> it can be serialized? Can you elaborate how the Kafka connector interacts
> with these interfaces?
> >> We are asking whether it can be processed. canProcess has a default
> implementation and can also be overridden. The idea is that if we are in
> another format and the apicurio jar is there, then we might pick up the
> apicurio jar in the discovery. So we check that what we have discovered
> really can process the message. During testing I had the key as raw and the
> value as apicurio, and saw the issue that necessitates this method.
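To make the canProcess exchange above concrete, here is a minimal sketch in plain Java. It is not the code from the fork. Only the canProcess(byte[] customSerialisation) signature comes from this thread; the interface name RecordBasedDeserialization, the default returning true, the "APIC" marker and the select helper are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;

final class CanProcessSketch {

    /** Hypothetical stand-in for the record-based interface; only canProcess comes from the thread. */
    interface RecordBasedDeserialization {
        // Assumed default: claim every message unless an implementation overrides this.
        default boolean canProcess(byte[] customSerialisation) {
            return true;
        }
    }

    /** Hypothetical marker that the connector is assumed to put at the front of the byte array. */
    static final byte[] MARKER = "APIC".getBytes(StandardCharsets.US_ASCII);

    /** Overrides the default so it only claims byte arrays that start with MARKER. */
    static final class MarkerCheckingDeserialization implements RecordBasedDeserialization {
        @Override
        public boolean canProcess(byte[] customSerialisation) {
            if (customSerialisation == null || customSerialisation.length < MARKER.length) {
                return false;
            }
            for (int i = 0; i < MARKER.length; i++) {
                if (customSerialisation[i] != MARKER[i]) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Returns the first discovered implementation that says it can process the bytes. */
    static Optional<RecordBasedDeserialization> select(
            List<RecordBasedDeserialization> discovered, byte[] bytes) {
        return discovered.stream().filter(d -> d.canProcess(bytes)).findFirst();
    }
}
```

With a raw key and an apicurio value, the raw key bytes would be rejected by the overriding implementation and could fall through to the ordinary deserializer, which is the situation described above.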
>
>
> > byte[] customSerialisation
>
> There are a lot of byte arrays being passed around, I assume we are not
> doing heavy conversions multiple times? Is there scope to introduce a new
> intermediate type to prevent duplicate processing cycles?
> Or even better, do you have a fork with these changes you can share?
> >> https://github.com/davidradl/flink-connector-kafka/tree/FLINK-35311-3
>
> > Format 'avro-apicurio'
>
> Since this is Kafka only I would be tempted to include Kafka in this tag,
> just in case we decide to add a Kafka agnostic format in the future.
> Additionally this makes it clearer to the user it is Kafka only.
> >> I was keeping this in line with the avro-confluent naming which is also
> Kafka only. I do not have strong views on this – so I can change if we
> think it is better.
>
> Thanks,
> Danny
>
>
>
> On Thu, Jul 18, 2024 at 6:32 PM Martijn Visser <martijnvis...@apache.org
> <mailto:martijnvis...@apache.org>> wrote:
> Hi David,
>
> The FLIP is updated.
>
> Cheers, Martijn
>
>
> On Tue, Jul 16, 2024 at 6:33 PM David Radley <david_rad...@uk.ibm.com
> <mailto:david_rad...@uk.ibm.com>>
> wrote:
>
> > Hello all,
> >
> >
> >
> > I have prototyped the new design and confirmed it works. I have
> > documented the design in a Google doc:
> >
> https://docs.google.com/document/d/1J1E-cE-X2H3-kw4rNjLn71OGPQk_Yl1iGX4-eCHWLgE/edit
> >
> >
> >
> > Copying Martijn: please could you move this content over to replace the
> > content of
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format
> >
> >
> >
> > As the design is significantly different, there will be a new period of
> > discussion before going to vote.
> >
> >
> >
> > Fyi I will be on vacation after this week until the 13th of August.
> >
> >
> >
> > Kind regards, David.
> >
> >
> >
> >
> >
> >
> >
> > *From: *David Radley <david_rad...@uk.ibm.com<mailto:
> david_rad...@uk.ibm.com>>
> > *Date: *Tuesday, 9 July 2024 at 11:17
> > *To: *dev@flink.apache.org<mailto:dev@flink.apache.org> <
> dev@flink.apache.org<mailto:dev@flink.apache.org>>
> > *Subject: *[EXTERNAL] RE: [DISCUSS] FLIP-XXX Apicurio-avro format
> >
> > Hi Kevin,
> > I have agreed a design with Chesnay and Danny. I am implementing a
> > prototype to prove it works, then will update the Flip text with the new
> > design. Initial testing is showing it working.
> >
> > Here is a quick history so you can understand our current thinking.
> >
> >   1.  Initially we passed maps for header information from the Kafka
> > connector to Flink for deserialization, and similarly for serialization.
> > This was not great, because maps are not ideal and it was a big change,
> > as it needed core Flink interface changes.
> >   2.  We then moved the Avro Apicurio format to the Kafka connector and
> > looked to discover a new record-based de/serialization interface, so we
> > could pass down the record (containing the headers) rather than the
> > payload. This did not work, because there is a dependence on the Avro
> > connector that is not aware of the new interface.
> >   3.  We considered using thread-local storage to pass the headers. We
> > did not like this, as there was a risk of memory leaks if we did not
> > manage the thread well, and the contract is hidden.
> >   4.  We then came up with the current design, which augments the
> > deserialization in the Kafka connector with a new discovered record-based
> > deserialization. It then takes the headers out in the schema coder,
> > leaving the message as it was. Similarly for serialization.
> >
> >
> > One piece I still need to work out the details of is how to work when
> > there are 2 implementations that can be discovered, probably using an
> > augmented format name as a factory identifier.
> >
> > I hope to put up a new design in the Flip by the end of next week, for
> > wider review.
> >     Kind regards, David.
> >
> >
> > From: Kevin Lam <kevin....@shopify.com.INVALID>
> > Date: Monday, 8 July 2024 at 21:16
> > To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
> dev@flink.apache.org<mailto:dev@flink.apache.org>>
> > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> > Hi David,
> >
> > Any updates on the Kafka Message Header support? I am also interested in
> > supporting headers with the Flink SQL Formats:
> > https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6
> >
> > On Fri, Jun 14, 2024 at 6:10 AM David Radley <david_rad...@uk.ibm.com
> <mailto:david_rad...@uk.ibm.com>>
> > wrote:
> >
> > > Hi everyone,
> > > I have talked with Chesnay and Danny offline. Danny and I were not very
> > > happy with passing Maps around, and were looking for a neater design.
> > > Chesnay suggested that we could move the new format to the Kafka
> > > connector, then pass the Kafka record down to the deserialize logic so
> > > it can make use of the headers during deserialization and serialisation.
> > >
> > > I think this is a neat idea. This would mean:
> > > - the Kafka connector code would need to be updated to pass down the
> > > Kafka record
> > > - the Avro Apicurio format and SQL would live in the Kafka repository.
> > > We feel it is unlikely that anyone would want to use the Apicurio
> > > registry with files, as the Avro format could be used.
> > >
> > > Unfortunately I have found that this is not so straightforward to
> > > implement, as the Avro Apicurio format uses the Avro format, which is
> > > tied to the DeserializationSchema. We were hoping to have a new decoding
> > > implementation that would pass down the Kafka record rather than the
> > > payload. This does not appear possible without an Avro format change.
> > >
> > >
> > > Inspired by this idea, I notice that
> > > KafkaValueOnlyRecordDeserializerWrapper<T> extends
> > > KafkaValueOnlyDeserializerWrapper
> > >
> > > Does
> > >
> > > deserializer.deserialize(record.topic(),record.value())
> > >
> > >
> > >
> > > I am investigating whether I can add a factory/reflection to provide an
> > > alternative implementation that will pass the record (the Kafka record
> > > is not serializable, so I will pick what we need and deserialize) as a
> > > byte array.
> > >
> > > I would need to do this 4 times (value and key, for deserialisation and
> > > serialisation). To do this I would need to convert the record into a
> > > byte array, so it fits into the existing interface
> > > (DeserializationSchema). I think this could be a way through, to avoid
> > > using maps, avoid changing the existing Avro format and avoid changing
> > > any core Flink interfaces.
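The idea above of converting the record into a byte array so that it fits a byte[]-based interface could look roughly like the sketch below. This is an illustration only, not the prototype: the class and method names are hypothetical, the length-prefixed layout is an assumption, and it simplifies Kafka headers to one non-null value per key.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

final class RecordEnvelopeSketch {

    /** The pieces picked out of the record: the headers and the untouched value bytes. */
    static final class Unpacked {
        final Map<String, byte[]> headers;
        final byte[] value;

        Unpacked(Map<String, byte[]> headers, byte[] value) {
            this.headers = headers;
            this.value = value;
        }
    }

    /** Packs headers and value into one byte array using an assumed length-prefixed layout. */
    static byte[] pack(Map<String, byte[]> headers, byte[] value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(headers.size());
            for (Map.Entry<String, byte[]> header : headers.entrySet()) {
                out.writeUTF(header.getKey());
                out.writeInt(header.getValue().length);
                out.write(header.getValue());
            }
            out.writeInt(value.length);
            out.write(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reverses pack(): reads the headers back, then the original value bytes. */
    static Unpacked unpack(byte[] packed) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(packed))) {
            int headerCount = in.readInt();
            Map<String, byte[]> headers = new LinkedHashMap<>();
            for (int i = 0; i < headerCount; i++) {
                String key = in.readUTF();
                byte[] headerValue = new byte[in.readInt()];
                in.readFully(headerValue);
                headers.put(key, headerValue);
            }
            byte[] value = new byte[in.readInt()];
            in.readFully(value);
            return new Unpacked(headers, value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The receiving side would call unpack, read the schema id from the headers and hand the original value bytes on unchanged, so neither maps nor new core Flink interfaces are involved.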
> > >
> > > I am going to prototype this idea. WDYT?
> > >
> > > My thanks go to Chesnay and Danny for their support and insight around
> > > this Flip,
> > >    Kind regards, David.
> > >
> > >
> > >
> > >
> > >
> > >
> > > From: David Radley <david_rad...@uk.ibm.com<mailto:
> david_rad...@uk.ibm.com>>
> > > Date: Wednesday, 29 May 2024 at 11:39
> > > To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
> dev@flink.apache.org<mailto:dev@flink.apache.org>>
> > > Subject: [EXTERNAL] RE: [DISCUSS] FLIP-XXX Apicurio-avro format
> > > Hi Danny,
> > > Thank you for your feedback on this.
> > >
> > > I agree that using maps has pros and cons. The maps are flexible, but
> > > do require the sender and receiver to know what is in the map.
> > >
> > > When you say “That sounds like it would fit in better, I assume we
> > > cannot just take that approach?”: the motivation behind this Flip is to
> > > support the headers, which is the usual way that Apicurio runs. We will
> > > support the “schema id in the payload” as well.
> > >
> > > I agree with you when you say “I am not 100% happy with the solution
> > > but I cannot offer a better option.” This is a pragmatic way we have
> > > found to solve this issue. I am open to any suggestions to improve this
> > > as well.
> > >
> > > If we are going with the maps design (which is the best we have at the
> > > moment), it would be good to have the Flink core changes in base Flink
> > > version 2.0, as this would mean we do not need to use reflection in a
> > > Flink Kafka version 2 connector to work out if the runtime Flink has the
> > > new methods.
> > >
> > > At this stage we only have one committer (yourself) backing this. Do
> > > you know of 2 other committers who would support this Flip?
> > >
> > >      Kind regards, David.
> > >
> > >
> > >
> > > From: Danny Cranmer <dannycran...@apache.org<mailto:
> dannycran...@apache.org>>
> > > Date: Friday, 24 May 2024 at 19:32
> > > To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
> dev@flink.apache.org<mailto:dev@flink.apache.org>>
> > > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> > > Hello,
> > >
> > > > I am curious what you mean by abused.
> > >
> > > I just meant we will end up adding more and more fields to this map
> > > over time, and it may be hard to undo.
> > >
> > > > For Apicurio it can be sent at the start of the payload like
> > > > Confluent Avro does. Confluent Avro has a magic byte followed by 4
> > > > bytes of schema id at the start of the payload. Apicurio clients and
> > > > SerDe libraries can be configured to not put the schema id in the
> > > > headers, in which case there is a magic byte followed by an 8 byte
> > > > schema id at the start of the payload. In the deserialization case, we
> > > > would not need to look at the headers, though the encoding is also in
> > > > the headers.
> > >
> > > That sounds like it would fit in better, I assume we cannot just take
> > > that approach?
> > >
> > > Thanks for the discussion. I am not 100% happy with the solution but I
> > > cannot offer a better option. I would be interested to hear if others
> > > have any suggestions. Playing devil's advocate against myself, we pass
> > > maps around to configure connectors so it is not too far away from that.
> > >
> > > Thanks,
> > > Danny
> > >
> > >
> > > On Fri, May 24, 2024 at 2:23 PM David Radley <david_rad...@uk.ibm.com
> <mailto:david_rad...@uk.ibm.com>>
> > > wrote:
> > >
> > > > Hi Danny,
> > > > No worries, thanks for replying. I have working prototype code that
> > > > is being reviewed. It needs some cleaning up and more complete testing
> > > > before it is ready, but will give you the general idea [1][2] to help
> > > > to assess this approach.
> > > >
> > > >
> > > > I am curious what you mean by abused. I guess the choice is between a
> > > > generic map mechanism and more specific, more granular things being
> > > > passed that might be used by another connector.
> > > >
> > > > Your first question:
> > > > “how would this work if the schema ID is not in the Kafka headers, as
> > > > hinted to in the FLIP "usually the global ID in a Kafka header"?
> > > >
> > > > For Apicurio it can be sent at the start of the payload like
> > > > Confluent Avro does. Confluent Avro has a magic byte followed by 4
> > > > bytes of schema id at the start of the payload. Apicurio clients and
> > > > SerDe libraries can be configured to not put the schema id in the
> > > > headers, in which case there is a magic byte followed by an 8 byte
> > > > schema id at the start of the payload. In the deserialization case, we
> > > > would not need to look at the headers, though the encoding is also in
> > > > the headers.
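As a rough illustration of the two payload layouts just described (a magic byte followed by a 4 byte id, or by an 8 byte id), a reader might look like the sketch below. The class and method names are hypothetical, and the magic byte value of 0 and the big-endian byte order are assumptions to be checked against the Confluent and Apicurio documentation.

```java
import java.nio.ByteBuffer;

final class SchemaIdPrefixSketch {

    /** Assumed magic byte value; verify against the serializer that produced the payload. */
    static final byte MAGIC_BYTE = 0x0;

    /** Reads a 4 byte schema id that follows the magic byte (the Confluent-style layout). */
    static int readFourByteId(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
        requireMagicByte(buffer);
        return buffer.getInt();
    }

    /** Reads an 8 byte schema id that follows the magic byte (the Apicurio-style layout). */
    static long readEightByteId(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        requireMagicByte(buffer);
        return buffer.getLong();
    }

    private static void requireMagicByte(ByteBuffer buffer) {
        if (buffer.remaining() == 0 || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("payload does not start with the magic byte");
        }
    }
}
```

In both cases the Avro-encoded body starts immediately after the id, which is why no headers are needed when the id travels in the payload.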
> > > >
> > > > Your second question:
> > > > “I am wondering if there are any other instances where the source
> > > > would be aware of the schema ID and pass it through in this way?”
> > > > The examples I can think of are:
> > > > - Avro can send the complete schema in a header; this is not
> > > > recommended, but in theory fits the need for a message payload to
> > > > require something else to get the structure.
> > > > - I see [3] that Apicurio Protobuf uses headers.
> > > > - it might be that other message queuing projects like RabbitMQ would
> > > > need this to be able to support Apicurio Avro & protobuf.
> > > >
> > > > Kind regards, David,
> > > >
> > > >
> > > >
> > > >
> > > > [1] https://github.com/apache/flink/pull/24715
> > > > [2] https://github.com/apache/flink-connector-kafka/pull/99
> > > > [3]
> > > > https://www.apicur.io/registry/docs/apicurio-registry/2.5.x/getting-started/assembly-configuring-kafka-client-serdes.html#registry-serdes-types-json_registry
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > From: Danny Cranmer <dannycran...@apache.org<mailto:
> dannycran...@apache.org>>
> > > > Date: Friday, 24 May 2024 at 12:22
> > > > To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
> dev@flink.apache.org<mailto:dev@flink.apache.org>>
> > > > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> > > > Hello,
> > > >
> > > > Apologies, I am on vacation and have limited access to email.
> > > >
> > > > I can see the logic here and why you ended up where you did. I can
> > > > also see there are other useful metadata fields that we might want to
> > > > pass through, which might result in this Map being abused (Kafka Topic,
> > > > Kinesis Shard, etc).
> > > >
> > > > I have a follow up question, how would this work if the schema ID is
> > > > not in the Kafka headers, as hinted to in the FLIP "usually the global
> > > > ID in a Kafka header"? I am wondering if there are any other instances
> > > > where the source would be aware of the schema ID and pass it through in
> > > > this way?
> > > >
> > > > Thanks,
> > > > Danny
> > > >
> > > >
> > > >
> > > > On Wed, May 22, 2024 at 3:43 PM David Radley <
> david_rad...@uk.ibm.com<mailto:david_rad...@uk.ibm.com>>
> > > > wrote:
> > > >
> > > > > Hi Danny,
> > > > > Did you have a chance to have a look at my responses to your
> > > > > feedback? I am hoping to keep the momentum going on this one.
> > > > > Kind regards, David.
> > > > >
> > > > >
> > > > > From: David Radley <david_rad...@uk.ibm.com<mailto:
> david_rad...@uk.ibm.com>>
> > > > > Date: Tuesday, 14 May 2024 at 17:21
> > > > > To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
> dev@flink.apache.org<mailto:dev@flink.apache.org>>
> > > > > Subject: [EXTERNAL] [DISCUSS] FLIP-XXX Apicurio-avro format
> > > > > Hi Danny,
> > > > >
> > > > > Thank you very much for the feedback and your support. I have copied
> > > > > your feedback from the VOTE thread to this discussion thread, so we
> > > > > can continue our discussions off the VOTE thread.
> > > > >
> > > > >
> > > > >
> > > > > Your feedback:
> > > > >
> > > > > Thanks for driving this David. I am +1 for adding support for the new
> > > > > format, however have some questions/suggestions on the details.
> > > > >
> > > > > 1. Passing around Map<String, Object> additionalInputProperties feels
> > > > > a bit dirty. It looks like this is mainly for the Kafka connector.
> > > > > This connector already has a de/serialization schema extension to
> > > > > access record headers, KafkaRecordDeserializationSchema [1], can we
> > > > > use this instead?
> > > > > 2. Can you elaborate why we need to change the SchemaCoder interface?
> > > > > Again I am not a fan of adding these Map parameters
> > > > > 3. I assume this integration will go into the core Flink repo under
> > > > > flink-formats [2], and not be a separate repository like the
> > > > > connectors?
> > > > >
> > > > >
> > > > >
> > > > > My response:
> > > > >
> > > > > Addressing 1. and 2.
> > > > >
> > > > > I agree that sending maps around is a bit dirty. If we can see a
> > > > > better way that would be great. I was looking for a way to pass this
> > > > > Kafka header information in a non-Kafka way; the most obvious way I
> > > > > could think of was as a map. Here are the main considerations I saw.
> > > > > If I have missed anything or could improve something, I would be
> > > > > grateful for any further feedback.
> > > > >
> > > > >
> > > > >
> > > > >   *   I see KafkaRecordDeserializationSchema is a Kafka interface
> > > > > that works at the Kafka record level (so includes the headers). We
> > > > > need a mechanism to send over the headers from the Kafka record to
> > > > > Flink.
> > > > >   *   Flink core is not aware of Kafka headers, and I did not want
> > > > > to add a Kafka dependency to core Flink.
> > > > >   *   The formats are stateless, so it did not appear to be in
> > > > > keeping with the Flink architecture to pass through header
> > > > > information to stash in state in the format, waiting for the
> > > > > deserialise to be subsequently called to pick up the header
> > > > > information.
> > > > >   *   We could have used thread-local storage to stash the header
> > > > > content, but this would be extra state to manage, and this would
> > > > > seem like an obtrusive change.
> > > > >   *   The SchemaCoder deserialise is where Confluent Avro gets the
> > > > > schema id from the payload, so it can look up the schema. In line
> > > > > with this approach it made sense to extend the deserialise so it had
> > > > > the header contents, so the Apicurio Avro format could look up the
> > > > > schema.
> > > > >   *   I did not want to have Apicurio-specific logic in the Kafka
> > > > > connector; if we did, we could pull out the appropriate headers and
> > > > > only send over the schema ids.
> > > > >   *   For deserialise, the schema id we are interested in is the one
> > > > > in the Kafka headers on the message and is for the writer schema (an
> > > > > Avro format concept) currently used by the confluent-avro format in
> > > > > deserialize.
> > > > >   *   For serialise, the schema ids need to be obtained from Apicurio
> > > > > then passed through to Kafka.
> > > > >   *   For serialise, there is existing logic around handling the
> > > > > metadata which includes passing the headers. But the presence of the
> > > > > metadata would imply we have a metadata column. A change to the
> > > > > metadata mechanism might have allowed us to pass the headers without
> > > > > creating a metadata column; instead I pass through the additional
> > > > > headers in a map to be appended.
> > > > >
> > > > >
> > > > >
> > > > > 3.
> > > > >
> > > > > Yes, this integration will go into the core Flink repo under
> > > > > flink-formats and sit next to the confluent-avro format. The Avro
> > > > > format has the concept of a Registry and drives the confluent-avro
> > > > > format. The Apicurio Avro format will use the same approach.
> > > > >
> > > > > Unless otherwise stated above:
> > > > >
> > > > > IBM United Kingdom Limited
> > > > > Registered in England and Wales with number 741598
> > > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> > 3AU
