@Arvid: Could you take a look at this? Thanks a lot.
On Wed, Aug 14, 2024 at 6:19 PM David Radley <david_rad...@uk.ibm.com> wrote:

> Hi Danny,
> Thank you for your feedback. I have brought a fork up to date with one commit so you can see the code; I need to test more scenarios and add more unit tests, but it shows the basic idea of the design Chesnay and I agreed on.
>
> Kind regards, David.
>
>
> From: Danny Cranmer <dannycran...@apache.org>
> Date: Thursday, 25 July 2024 at 16:46
> To: dev@flink.apache.org <dev@flink.apache.org>
> Cc: David Radley <david_rad...@uk.ibm.com>
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
>
> Hey,
>
> Thanks for the updated FLIP.
>
> > 1.20 or beyond
>
> Given this is intended to go in the Kafka connector we can target the next Kafka connector version (v3.3.0) for Flink 1.18 and 1.19 (or 1.19 and 1.20, if 1.20 is released first).
>
> >> Agreed
>
> > RecordBasedSerialization::public boolean canProcess(byte[] customSerialisation);
>
> Not sure what this is for, since we are passing a byte array and asking if it can be serialized? Can you elaborate how the Kafka connector interacts with these interfaces?
>
> >> We are asking if it can be processed. The canProcess has a default and can also be overridden. The idea is that if we are in another format and the apicurio jar is there then we might pick up the apicurio jar in the discovery. So we check that what we have discovered really can process the message. During testing I had the key as raw and the value as apicurio, and saw the issue that necessitates this method.
>
> > byte[] customSerialisation
>
> There are a lot of byte arrays being passed around, I assume we are not doing heavy conversions multiple times?
> Is there scope to introduce a new intermediate type to prevent duplicate processing cycles? Or even better, do you have a fork with these changes you can share?
>
> >> https://github.com/davidradl/flink-connector-kafka/tree/FLINK-35311-3
>
> > Format 'avro-apicurio'
>
> Since this is Kafka only I would be tempted to include Kafka in this tag, just in case we decide to add a Kafka-agnostic format in the future. Additionally this makes it clearer to the user it is Kafka only.
>
> >> I was keeping this in line with the avro-confluent naming, which is also Kafka only. I do not have strong views on this – so I can change if we think it is better.
>
> Thanks,
> Danny
>
>
> On Thu, Jul 18, 2024 at 6:32 PM Martijn Visser <martijnvis...@apache.org> wrote:
> Hi David,
>
> The FLIP is updated.
>
> Cheers, Martijn
>
>
> On Tue, Jul 16, 2024 at 6:33 PM David Radley <david_rad...@uk.ibm.com> wrote:
>
> > Hello all,
> >
> > I have prototyped the new design and confirmed it works. I have documented the design in a google doc:
> > https://docs.google.com/document/d/1J1E-cE-X2H3-kw4rNjLn71OGPQk_Yl1iGX4-eCHWLgE/edit
> >
> > By copy Martijn: please could you move this content over to replace the content of
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format
> >
> > As the design is significantly different, there will be a new period of discussion before going to vote.
> >
> > FYI I will be on vacation after this week until the 13th of August.
> >
> > Kind regards, David.
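The canProcess guard discussed earlier in the thread could look roughly like this. The interface and method name come from the thread; everything else (the default body, the magic-byte length check, the class name) is an illustrative assumption, not the fork's actual code:

```java
import java.nio.charset.StandardCharsets;

// Sketch only: canProcess lets a discovered implementation verify it really
// can handle the bytes before being selected, e.g. when the apicurio jar is
// on the classpath but a particular key/value is in another format.
interface RecordBasedSerialization {
    // Default: claim everything; implementations override to check framing.
    default boolean canProcess(byte[] customSerialisation) {
        return true;
    }
}

class ApicurioGuard implements RecordBasedSerialization {
    private static final byte MAGIC_BYTE = 0x0;

    @Override
    public boolean canProcess(byte[] customSerialisation) {
        // Assumed framing: magic byte followed by an 8-byte global id,
        // so a processable payload is at least 9 bytes long.
        return customSerialisation != null
                && customSerialisation.length >= 9
                && customSerialisation[0] == MAGIC_BYTE;
    }

    public static void main(String[] args) {
        RecordBasedSerialization guard = new ApicurioGuard();
        // A raw (non-Apicurio) key, as in David's raw-key/apicurio-value test.
        byte[] rawKey = "some-key".getBytes(StandardCharsets.UTF_8);
        // A framed value: magic byte plus an 8-byte id.
        byte[] framedValue = new byte[9];
        System.out.println(guard.canProcess(rawKey));      // false
        System.out.println(guard.canProcess(framedValue)); // true
    }
}
```

This illustrates why the default matters: only the format that genuinely recognises its own framing claims the message during discovery.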
> > *From: *David Radley <david_rad...@uk.ibm.com>
> > *Date: *Tuesday, 9 July 2024 at 11:17
> > *To: *dev@flink.apache.org <dev@flink.apache.org>
> > *Subject: *[EXTERNAL] RE: [DISCUSS] FLIP-XXX Apicurio-avro format
> >
> > Hi Kevin,
> > I have agreed a design with Chesnay and Danny. I am implementing a prototype to prove it works, then will update the FLIP text with the new design. Initial testing is showing it working.
> >
> > Here is a quick history so you can understand our current thinking.
> >
> > 1. Initially we passed maps for header information from the Kafka connector to Flink for deserialization, and similarly for serialization. This was not great, because maps are not ideal and it was a big change, as it needed core Flink interface changes.
> > 2. We then moved the Avro Apicurio format to the Kafka connector and looked to discover a new record-based de/serialization interface, so we could pass down the record (containing the headers) rather than the payload. This did not work, because there is a dependency on the Avro format that is not aware of the new interface.
> > 3. We considered using thread-local storage to pass the headers. We did not like this, as there was a risk of memory leaks if we did not manage the thread well, and the contract is hidden.
> > 4. We then came up with the current design, which augments the deserialization in the Kafka connector with a new discovered record-based deserialization; it then takes the headers out in the schema coder, leaving the message as it was. Similarly for serialization.
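The "take the headers out, leave the message as it was" idea from step 4 can be sketched as below. The class name, the `apicurio.` header-key prefix, and the map shape are illustrative assumptions, not the actual connector code:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: lift the format-relevant headers off the Kafka record while
// handing the message bytes on unchanged to the existing
// DeserializationSchema.
class HeaderExtraction {
    static Map<String, byte[]> extractFormatHeaders(Map<String, byte[]> recordHeaders) {
        Map<String, byte[]> formatHeaders = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> entry : recordHeaders.entrySet()) {
            // Assumed prefix for the headers the Apicurio format cares about.
            if (entry.getKey().startsWith("apicurio.")) {
                formatHeaders.put(entry.getKey(), entry.getValue());
            }
        }
        return formatHeaders;
    }

    public static void main(String[] args) {
        Map<String, byte[]> recordHeaders = new HashMap<>();
        recordHeaders.put("apicurio.value.globalId", new byte[8]); // assumed key
        recordHeaders.put("traceparent", new byte[4]);             // unrelated header
        byte[] message = {1, 2, 3};

        Map<String, byte[]> extracted = extractFormatHeaders(recordHeaders);
        System.out.println(extracted.keySet()); // [apicurio.value.globalId]
        System.out.println(message.length);     // 3 -- message left as it was
    }
}
```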
> > One piece I need to work out the details of is what to do when there are 2 implementations that can be discovered, probably using an augmented format name as a factory identifier.
> >
> > I hope to put up a new design in the FLIP by the end of next week, for wider review.
> > Kind regards, David.
> >
> >
> > From: Kevin Lam <kevin....@shopify.com.INVALID>
> > Date: Monday, 8 July 2024 at 21:16
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> >
> > Hi David,
> >
> > Any updates on the Kafka Message Header support? I am also interested in supporting headers with the Flink SQL Formats:
> > https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6
> >
> > On Fri, Jun 14, 2024 at 6:10 AM David Radley <david_rad...@uk.ibm.com> wrote:
> >
> > > Hi everyone,
> > > I have talked with Chesnay and Danny offline. Danny and I were not very happy with passing Maps around, and were looking for a neater design. Chesnay suggested that we could move the new format to the Kafka connector, then pass the Kafka record down to the deserialize logic so it can make use of the headers during deserialization and serialisation.
> > >
> > > I think this is a neat idea. This would mean:
> > > - the Kafka connector code would need to be updated to pass down the Kafka record
> > > - there would be the Avro Apicurio format and SQL in the Kafka repository. We feel it is unlikely that anyone would want to use the Apicurio registry with files, as the Avro format could be used.
> > >
> > > Unfortunately I have found that this is not so straightforward to implement, as the Avro Apicurio format uses the Avro format, which is tied to the DeserializationSchema.
> > > We were hoping to have a new decoding implementation that would pass down the Kafka record rather than the payload. This does not appear possible without an Avro format change.
> > >
> > > Inspired by this idea, I notice that KafkaValueOnlyRecordDeserializerWrapper<T> extends KafkaValueOnlyDeserializerWrapper, which does deserializer.deserialize(record.topic(), record.value()).
> > >
> > > I am investigating if I can add a factory/reflection to provide an alternative implementation that will pass the record (the Kafka record is not serializable, so I will pick out what we need) as a byte array. I would need to do this 4 times (value and key, for deserialisation and serialisation). To do this I would need to convert the record into a byte array, so it fits into the existing interface (DeserializationSchema). I think this could be a way through, to avoid using maps, avoid changing the existing Avro format, and avoid changing any core Flink interfaces.
> > >
> > > I am going to prototype this idea. WDYT?
> > >
> > > My thanks go to Chesnay and Danny for their support and insight around this FLIP,
> > > Kind regards, David.
> > >
> > >
> > > From: David Radley <david_rad...@uk.ibm.com>
> > > Date: Wednesday, 29 May 2024 at 11:39
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: [EXTERNAL] RE: [DISCUSS] FLIP-XXX Apicurio-avro format
> > >
> > > Hi Danny,
> > > Thank you for your feedback on this.
> > >
> > > I agree that using maps has pros and cons. The maps are flexible, but do require the sender and receiver to know what is in the map.
> > > When you say “That sounds like it would fit in better, I assume we cannot just take that approach?”: the motivation behind this FLIP is to support the headers, which is the usual way that Apicurio runs. We will support the “schema id in the payload” as well.
> > >
> > > I agree with you when you say “I am not 100% happy with the solution but I cannot offer a better option.” – this is a pragmatic way we have found to solve this issue. I am open to any suggestions to improve this as well.
> > >
> > > If we are going with the maps design (which is the best we have at the moment), it would be good to have the Flink core changes in base Flink version 2.0, as this would mean we do not need to use reflection in a Flink Kafka version 2 connector to work out if the runtime Flink has the new methods.
> > >
> > > At this stage we only have one committer (yourself) backing this. Do you know of 2 other committers who would support this FLIP?
> > >
> > > Kind regards, David.
> > >
> > >
> > > From: Danny Cranmer <dannycran...@apache.org>
> > > Date: Friday, 24 May 2024 at 19:32
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> > >
> > > Hello,
> > >
> > > > I am curious what you mean by abused.
> > >
> > > I just meant we will end up adding more and more fields to this map over time, and it may be hard to undo.
> > >
> > > > For Apicurio it can be sent at the start of the payload like Confluent Avro does. Confluent Avro has a magic byte followed by 4 bytes of schema id, at the start of the payload.
> > > > Apicurio clients and SerDe libraries can be configured to not put the schema id in the headers, in which case there is a magic byte followed by an 8-byte schema id at the start of the payload. In the deserialization case, we would not need to look at the headers – though the encoding is also in the headers.
> > >
> > > That sounds like it would fit in better, I assume we cannot just take that approach?
> > >
> > > Thanks for the discussion. I am not 100% happy with the solution but I cannot offer a better option. I would be interested to hear if others have any suggestions. Playing devil's advocate against myself, we pass maps around to configure connectors, so it is not too far away from that.
> > >
> > > Thanks,
> > > Danny
> > >
> > >
> > > On Fri, May 24, 2024 at 2:23 PM David Radley <david_rad...@uk.ibm.com> wrote:
> > >
> > > > Hi Danny,
> > > > No worries, thanks for replying. I have working prototype code that is being reviewed. It needs some cleaning up and more complete testing before it is ready, but it will give you the general idea [1][2] to help assess this approach.
> > > >
> > > > I am curious what you mean by abused. I guess the choice is between a generic map mechanism vs more particular, more granular things being passed that might be used by another connector.
> > > >
> > > > Your first question:
> > > > “how would this work if the schema ID is not in the Kafka headers, as hinted to in the FLIP "usually the global ID in a Kafka header"?”
> > > >
> > > > For Apicurio it can be sent at the start of the payload like Confluent Avro does. Confluent Avro has a magic byte followed by 4 bytes of schema id, at the start of the payload.
> > > > Apicurio clients and SerDe libraries can be configured to not put the schema id in the headers, in which case there is a magic byte followed by an 8-byte schema id at the start of the payload. In the deserialization case, we would not need to look at the headers – though the encoding is also in the headers.
> > > >
> > > > Your second question:
> > > > “I am wondering if there are any other instances where the source would be aware of the schema ID and pass it through in this way?”
> > > > The examples I can think of are:
> > > > - Avro can send the complete schema in a header; this is not recommended, but in theory it fits the need for a message payload to require something else to get the structure.
> > > > - I see [2] that Apicurio Protobuf uses headers.
> > > > - it might be that other message queuing projects like Rabbit MQ would need this to be able to support Apicurio Avro & protobuf.
> > > >
> > > > Kind regards, David.
> > > >
> > > > [1] https://github.com/apache/flink/pull/24715
> > > > [2] https://github.com/apache/flink-connector-kafka/pull/99
> > > > [3] https://www.apicur.io/registry/docs/apicurio-registry/2.5.x/getting-started/assembly-configuring-kafka-client-serdes.html#registry-serdes-types-json_registry
> > > >
> > > >
> > > > From: Danny Cranmer <dannycran...@apache.org>
> > > > Date: Friday, 24 May 2024 at 12:22
> > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> > > >
> > > > Hello,
> > > >
> > > > Apologies, I am on vacation and have limited access to email.
> > > >
> > > > I can see the logic here and why you ended up where you did.
> > > > I can also see there are other useful metadata fields that we might want to pass through, which might result in this Map being abused (Kafka Topic, Kinesis Shard, etc).
> > > >
> > > > I have a follow up question: how would this work if the schema ID is not in the Kafka headers, as hinted to in the FLIP "usually the global ID in a Kafka header"? I am wondering if there are any other instances where the source would be aware of the schema ID and pass it through in this way?
> > > >
> > > > Thanks,
> > > > Danny
> > > >
> > > >
> > > > On Wed, May 22, 2024 at 3:43 PM David Radley <david_rad...@uk.ibm.com> wrote:
> > > >
> > > > > Hi Danny,
> > > > > Did you have a chance to have a look at my responses to your feedback? I am hoping to keep the momentum going on this one, kind regards, David.
> > > > >
> > > > >
> > > > > From: David Radley <david_rad...@uk.ibm.com>
> > > > > Date: Tuesday, 14 May 2024 at 17:21
> > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > Subject: [EXTERNAL] [DISCUSS] FLIP-XXX Apicurio-avro format
> > > > >
> > > > > Hi Danny,
> > > > >
> > > > > Thank you very much for the feedback and your support. I have copied your feedback from the VOTE thread to this discussion thread, so we can continue our discussions off the VOTE thread.
> > > > >
> > > > > Your feedback:
> > > > >
> > > > > Thanks for driving this David. I am +1 for adding support for the new format, however have some questions/suggestions on the details.
> > > > >
> > > > > 1. Passing around Map<String, Object> additionalInputProperties feels a bit
It looks like this is mainly for the Kafka connector. This > > > > connector > > > > > > > > > > already has a de/serialization schema extension to access record > > > > > > > > > > headers, KafkaRecordDeserializationSchema [1], can we use this > > instead? > > > > > > > > > > 2. Can you elaborate why we need to change the SchemaCoder > interface? > > > > Again > > > > > > > > > > I am not a fan of adding these Map parameters > > > > > > > > > > 3. I assume this integration will go into the core Flink repo under > > > > > > > > > > flink-formats [2], and not be a separate repository like the > > > connectors? > > > > > > > > > > > > > > > > > > > > My response: > > > > > > > > > > Addressing 1. and 2. > > > > > > > > > > I agree that sending maps around is a bit dirty. If we can see a > > better > > > > > way that would be great. I was looking for a way to pass this kafka > > > > header > > > > > information in a non-Kafka way - the most obvious way I could think > > was > > > > as > > > > > a map. Here are the main considerations I saw, if I have missed > > > anything > > > > or > > > > > could improve something I would be grateful for any further > feedback. > > > > > > > > > > > > > > > > > > > > * I see KafkaRecordDeserializationSchema is a Kafka interface > > that > > > > > works at the Kafka record level (so includes the headers). We need > a > > > > > mechanism to send over the headers from the Kafka record to Flink > > > > > * Flink core is not aware of Kafka headers, and I did not want > to > > > add > > > > > a Kafka dependancy to core flink. > > > > > * The formats are stateless so it did not appear to be in > fitting > > > > with > > > > > the Flink architecture to pass through header information to stash > in > > > > state > > > > > in the format waiting for the deserialise to be subsequently called > > to > > > > pick > > > > > up the header information. 
> > > > > * We could have used thread-local storage to stash the header content, but this would be extra state to manage, and this would seem like an obtrusive change.
> > > > > * The SchemaCoder deserialise is where Confluent Avro gets the schema id from the payload, so it can look up the schema. In line with this approach, it made sense to extend the deserialise so it had the header contents, so the Apicurio Avro format could look up the schema.
> > > > > * I did not want to have Apicurio-specific logic in the Kafka connector; if we did, we could pull out the appropriate headers and only send over the schema ids.
> > > > > * For deserialise, the schema id we are interested in is the one in the Kafka headers on the message; it is for the writer schema (an Avro format concept) currently used by the confluent-avro format in deserialize.
> > > > > * For serialise, the schema ids need to be obtained from Apicurio and then passed through to Kafka.
> > > > > * For serialise there is existing logic around handling the metadata, which includes passing the headers. But the presence of the metadata would imply we have a metadata column. Maybe a change to the metadata mechanism may have allowed us to pass the headers but not create a metadata column; instead I pass through the additional headers in a map to be appended.
> > > > >
> > > > > 3.
> > > > >
> > > > > Yes, this integration will go into the core Flink repo under flink-formats and sit next to the confluent-avro format. The Avro format has the concept of a Registry and drives the confluent-avro format. The Apicurio Avro format will use the same approach.
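The schema id resolution discussed in the thread (id from the Kafka headers in the usual case, otherwise the magic-byte framing at the start of the payload: 4-byte schema id for Confluent, 8-byte global id for Apicurio) can be sketched roughly as follows. The header key, big-endian encoding, and class name are assumptions, not the FLIP's actual code:

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Sketch only: prefer the id carried in the headers; fall back to the framed
// payload when the SerDe was configured to put the id in the payload instead.
class SchemaIdResolution {
    private static final byte MAGIC_BYTE = 0x0;
    // Assumed header key; the real Apicurio SerDe header names may differ.
    private static final String GLOBAL_ID_HEADER = "apicurio.value.globalId";

    static long resolveGlobalId(Map<String, byte[]> headers, byte[] payload) {
        byte[] headerId = headers.get(GLOBAL_ID_HEADER);
        if (headerId != null) {
            return ByteBuffer.wrap(headerId).getLong(); // 8-byte id from header
        }
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("no header id and no magic-byte framing");
        }
        return buf.getLong(); // 8-byte global id after the magic byte
    }

    public static void main(String[] args) {
        // Payload-framed case: magic byte then the 8-byte id.
        byte[] framed = ByteBuffer.allocate(9).put(MAGIC_BYTE).putLong(42L).array();
        System.out.println(resolveGlobalId(Map.of(), framed)); // 42

        // Header case: the payload is not consulted at all.
        byte[] idHeader = ByteBuffer.allocate(8).putLong(7L).array();
        System.out.println(resolveGlobalId(
                Map.of(GLOBAL_ID_HEADER, idHeader), new byte[0])); // 7
    }
}
```

A Confluent-style reader would differ only in reading a 4-byte id (`buf.getInt()`) after the magic byte.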
> > > > > Unless otherwise stated above:
> > > > >
> > > > > IBM United Kingdom Limited
> > > > > Registered in England and Wales with number 741598
> > > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU