Hi Danny,
Did you have a chance to have a look at my responses to your feedback? I am hoping to keep the momentum going on this one.
Kind regards, David.
From: David Radley <david_rad...@uk.ibm.com>
Date: Tuesday, 14 May 2024 at 17:21
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] [DISCUSS] FLIP-XXX Apicurio-avro format

Hi Danny,
Thank you very much for the feedback and your support. I have copied your feedback from the VOTE thread to this discussion thread, so we can continue our discussions off the VOTE thread.

Your feedback:
Thanks for driving this David. I am +1 for adding support for the new format, however I have some questions/suggestions on the details.
1. Passing around Map<String, Object> additionalInputProperties feels a bit dirty. It looks like this is mainly for the Kafka connector. This connector already has a de/serialization schema extension to access record headers, KafkaRecordDeserializationSchema [1]; can we use this instead?
2. Can you elaborate on why we need to change the SchemaCoder interface? Again, I am not a fan of adding these Map parameters.
3. I assume this integration will go into the core Flink repo under flink-formats [2], and not be a separate repository like the connectors?

My response:
Addressing 1 and 2: I agree that sending maps around is a bit dirty, and if we can see a better way that would be great. I was looking for a way to pass this Kafka header information around in a non-Kafka way, and the most obvious way I could think of was as a map. Here are the main considerations I saw; if I have missed anything or could improve something, I would be grateful for any further feedback.
* I see KafkaRecordDeserializationSchema is a Kafka interface that works at the Kafka record level (so it includes the headers). We need a mechanism to send the headers from the Kafka record over to Flink.
* Flink core is not aware of Kafka headers, and I did not want to add a Kafka dependency to core Flink.
* The formats are stateless, so it did not appear to be in keeping with the Flink architecture to pass header information through for the format to stash as state until deserialize is subsequently called to pick it up.
* We could have used thread-local storage to stash the header content, but that would be extra state to manage and would seem like an obtrusive change.
* The SchemaCoder deserialize is where the Confluent Avro format gets the schema id from the payload so that it can look up the schema. In line with this approach, it made sense to extend deserialize so it has the header contents, allowing the Apicurio Avro format to look up the schema (see the first sketch below).
* I did not want to have Apicurio-specific logic in the Kafka connector; if we did, we could pull out the appropriate headers and only send over the schema ids.
* For deserialize, the schema id we are interested in is the one in the Kafka headers on the message; it identifies the writer schema (an Avro format concept), which the confluent-avro format currently uses in deserialize.
* For serialize, the schema ids need to be obtained from Apicurio and then passed through to Kafka.
* For serialize, there is existing logic around handling the metadata, which includes passing the headers, but the presence of the metadata would imply we have a metadata column. A change to the metadata mechanism might have allowed us to pass the headers without creating a metadata column; instead I pass the additional headers through in a map to be appended (see the second sketch below).

3. Yes, this integration will go into the core Flink repo under flink-formats and sit next to the confluent-avro format. The Avro format has the concept of a Registry and drives the confluent-avro format; the Apicurio Avro format will use the same approach.
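To make points 1 and 2 more concrete, here is a rough sketch of the sort of SchemaCoder change I have in mind for the deserialize direction. The overload, its default implementation and the parameter name are placeholders for discussion rather than an agreed API, and the write side of the interface is omitted here.

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.apache.avro.Schema;

public interface SchemaCoder {

    // Existing behaviour: the Confluent coder reads the schema id from the payload
    // and looks the writer schema up in the registry.
    Schema readSchema(InputStream in) throws IOException;

    // Possible new variant: the Kafka connector supplies the header values it read
    // from the record as additionalInputProperties, so a coder such as the Apicurio
    // one can take the schema id from a header instead of from the payload.
    // The default delegates to the existing method, so current coders are unaffected.
    default Schema readSchema(InputStream in, Map<String, Object> additionalInputProperties)
            throws IOException {
        return readSchema(in);
    }
}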
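And for the serialize direction, a sketch of what I mean by passing the additional headers through in a map to be appended: the format would hand back a map (for example containing the schema id it obtained from Apicurio, keyed by header name), and the Kafka connector would simply append each entry as a record header. The helper class, the method name and the assumption that the value is a long are all illustrative, not part of the proposal text.

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;

public class AppendHeadersSketch {

    // Illustrative only: the connector appends whatever entries the format handed
    // back in the map as Kafka record headers, so the connector itself stays free
    // of any Apicurio-specific logic.
    public static void appendAdditionalHeaders(
            ProducerRecord<byte[], byte[]> record,
            Map<String, Object> additionalOutputProperties) {
        for (Map.Entry<String, Object> entry : additionalOutputProperties.entrySet()) {
            // Assumes the value is the schema id as a Long; encode it as bytes for the header.
            byte[] value = ByteBuffer.allocate(Long.BYTES).putLong((Long) entry.getValue()).array();
            record.headers().add(entry.getKey(), value);
        }
    }
}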
Unless otherwise stated above: IBM United Kingdom Limited Registered in England and Wales with number 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU