Hello,

Apologies, I am on vacation and have limited access to email.
I can see the logic here and why you ended up where you did. I can also see
that there are other useful metadata fields we might want to pass through,
which might result in this Map being abused (Kafka topic, Kinesis shard,
etc.).

I have a follow-up question: how would this work if the schema ID is not in
the Kafka headers, as hinted at in the FLIP ("usually the global ID in a
Kafka header")? I am also wondering whether there are any other instances
where the source would be aware of the schema ID and pass it through in
this way.

Thanks,
Danny

On Wed, May 22, 2024 at 3:43 PM David Radley <david_rad...@uk.ibm.com> wrote:

> Hi Danny,
>
> Did you have a chance to have a look at my responses to your feedback? I
> am hoping to keep the momentum going on this one.
>
> Kind regards, David.
>
> From: David Radley <david_rad...@uk.ibm.com>
> Date: Tuesday, 14 May 2024 at 17:21
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] [DISCUSS] FLIP-XXX Apicurio-avro format
>
> Hi Danny,
>
> Thank you very much for the feedback and your support. I have copied your
> feedback from the VOTE thread to this discussion thread, so we can
> continue our discussions off the VOTE thread.
>
> Your feedback:
>
> > Thanks for driving this, David. I am +1 for adding support for the new
> > format; however, I have some questions/suggestions on the details.
> >
> > 1. Passing around Map<String, Object> additionalInputProperties feels a
> > bit dirty. It looks like this is mainly for the Kafka connector. This
> > connector already has a de/serialization schema extension to access
> > record headers, KafkaRecordDeserializationSchema [1]; can we use this
> > instead?
> > 2. Can you elaborate on why we need to change the SchemaCoder
> > interface? Again, I am not a fan of adding these Map parameters.
> > 3. I assume this integration will go into the core Flink repo under
> > flink-formats [2], and not be a separate repository like the
> > connectors?
>
> My response:
>
> Addressing 1. and 2.
> I agree that sending maps around is a bit dirty. If we can see a better
> way, that would be great. I was looking for a way to pass this Kafka
> header information in a non-Kafka way - the most obvious way I could
> think of was a map. Here are the main considerations I saw; if I have
> missed anything or could improve something, I would be grateful for any
> further feedback.
>
> * I see KafkaRecordDeserializationSchema is a Kafka interface that works
>   at the Kafka record level (so includes the headers). We need a
>   mechanism to send the headers from the Kafka record over to Flink.
> * Flink core is not aware of Kafka headers, and I did not want to add a
>   Kafka dependency to core Flink.
> * The formats are stateless, so it did not appear to be in keeping with
>   the Flink architecture to pass header information through to stash in
>   state in the format, waiting for deserialize to be subsequently called
>   to pick up the header information.
> * We could have used thread-local storage to stash the header content,
>   but this would be extra state to manage, and it would seem like an
>   obtrusive change.
> * SchemaCoder deserialize is where the Confluent Avro format gets the
>   schema ID from the payload, so it can look up the schema. In line with
>   this approach, it made sense to extend deserialize so it had the
>   header contents, letting the Apicurio Avro format look up the schema.
> * I did not want to have Apicurio-specific logic in the Kafka connector;
>   if we did, we could pull out the appropriate headers and only send
>   over the schema IDs.
> * For deserialize, the schema ID we are interested in is the one in the
>   Kafka headers on the message; it is for the writer schema (an Avro
>   format concept) currently used by the confluent-avro format in
>   deserialize.
> * For serialize, the schema IDs need to be obtained from Apicurio and
>   then passed through to Kafka.
> * For serialize, there is existing logic around handling the metadata,
>   which includes passing the headers.
>   But the presence of the metadata would imply we have a metadata
>   column. Maybe a change to the metadata mechanism would have allowed us
>   to pass the headers without creating a metadata column; instead, I
>   pass the additional headers through in a map to be appended.
>
> Addressing 3.
>
> Yes, this integration will go into the core Flink repo under
> flink-formats and sit next to the confluent-avro format. The Avro format
> has the concept of a registry and drives the confluent-avro format. The
> Apicurio Avro format will use the same approach.
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
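To make the header-passing design above concrete, here is a rough sketch of how the Apicurio Avro format could recover the writer-schema global ID from the header map the connector forwards. This is illustrative only, not code from the FLIP: the header key `apicurio.value.globalId` and the 8-byte big-endian encoding are assumptions based on Apicurio's Kafka serializer defaults, and `globalIdFrom` is a hypothetical helper.

```java
import java.nio.ByteBuffer;
import java.util.Map;

public class HeaderSchemaId {

    // Assumed header key written by Apicurio's Kafka value serializer;
    // check the Apicurio registry serde docs for your version.
    static final String GLOBAL_ID_HEADER = "apicurio.value.globalId";

    /**
     * Returns the global schema ID carried in the forwarded headers, or
     * null if the header is absent (Danny's question above: in that case
     * the format would have to fall back to an ID embedded in the
     * payload, as the Confluent format does).
     */
    static Long globalIdFrom(Map<String, Object> additionalInputProperties) {
        Object raw = additionalInputProperties.get(GLOBAL_ID_HEADER);
        if (raw == null) {
            return null; // no header: caller falls back to payload lookup
        }
        // Apicurio encodes the global ID as an 8-byte big-endian long.
        return ByteBuffer.wrap((byte[]) raw).getLong();
    }

    public static void main(String[] args) {
        byte[] encoded = ByteBuffer.allocate(8).putLong(42L).array();
        System.out.println(globalIdFrom(Map.of(GLOBAL_ID_HEADER, encoded)));
    }
}
```

The null return path is where the schema-ID-not-in-headers case would be handled: Apicurio can also place the ID in the message body (a magic byte followed by the ID), so deserialize could try the header map first and fall back to the payload, keeping the map purely advisory.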