Hi Danny,

Thank you very much for the feedback and your support. I have copied your 
feedback from the VOTE thread to this discussion thread, so we can continue our 
discussions off the VOTE thread.

Your feedback:

Thanks for Driving this David. I am +1 for adding support for the new
format, however have some questions/suggestions on the details.

1. Passing around Map<String, Object> additionalInputProperties feels a bit
dirty. It looks like this is mainly for the Kafka connector. This connector
already has a de/serialization schema extension to access record
headers, KafkaRecordDeserializationSchema [1], can we use this instead?

2. Can you elaborate why we need to change the SchemaCoder interface? Again
I am not a fan of adding these Map parameters

3. I assume this integration will go into the core Flink repo under
flink-formats [2], and not be a separate repository like the connectors?

My response:

Addressing 1. and 2.

I agree that sending maps around is a bit dirty. If we can find a better way, 
that would be great. I was looking for a way to pass this Kafka header 
information in a non-Kafka-specific way, and the most obvious approach I could 
think of was a map. Here are the main considerations I saw. If I have missed 
anything or could improve something, I would be grateful for any further feedback.

  *   I see KafkaRecordDeserializationSchema is a Kafka interface that works at 
the Kafka record level (so includes the headers). We need a mechanism to send 
over the headers from the Kafka record to Flink.
  *   Flink core is not aware of Kafka headers, and I did not want to add a 
Kafka dependency to Flink core.
  *   The formats are stateless, so it did not appear to be in keeping with the 
Flink architecture to pass header information through and stash it as state in 
the format, waiting for the deserialise to be called subsequently to pick up 
the header information.
  *   We could have used Thread local storage to stash the header content, but 
this would be extra state to manage; and this would seem like an obtrusive 
  *   The SchemaCoder deserialise is where Confluent Avro gets the schema id 
from the payload, so it can look up the schema. In line with this approach, it 
made sense to extend the deserialise so that it has the header contents, so 
the Apicurio Avro format can look up the schema.
  *   I did not want to have Apicurio specific logic in the Kafka connector, if 
we did we could pull out the appropriate headers and only send over the schema 
  *   For deserialise, the schema id we are interested in is the one in the 
Kafka headers on the message and is for the writer schema (an Avro format 
concept) currently used by the confluent-avro format in deserialize.
  *   For serialise, the schema ids need to be obtained from Apicurio and then 
passed through to Kafka.
  *   For serialise, there is existing logic for handling the metadata, which 
includes passing the headers. But the presence of the metadata would imply we 
have a metadata column. Perhaps a change to the metadata mechanism could have 
allowed us to pass the headers without creating a metadata column; instead, I 
pass the additional headers through in a map to be appended.
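
To make the considerations above a little more concrete, here is a minimal 
sketch of the idea: the coder receives the payload plus a connector-neutral map, 
so one implementation can read the schema id from the payload (Confluent style: 
a zero magic byte followed by a 4-byte id) while another reads it from the 
headers. All type names, the method signature and the header key are 
hypothetical, chosen for illustration; this is not the actual SchemaCoder 
interface, and the 8-byte header encoding is an assumption.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: these types are hypothetical and are NOT Flink's SchemaCoder API.
final class HeaderAwareCoderSketch {

    /** Hypothetical coder contract: the payload plus connector-neutral properties. */
    interface SchemaIdReader {
        long readSchemaId(byte[] payload, Map<String, Object> additionalInputProperties);
    }

    /** Confluent-style lookup: magic byte 0, then a 4-byte big-endian id in the payload. */
    static final class PayloadIdReader implements SchemaIdReader {
        @Override
        public long readSchemaId(byte[] payload, Map<String, Object> additionalInputProperties) {
            ByteBuffer buffer = ByteBuffer.wrap(payload);
            if (buffer.get() != 0) {
                throw new IllegalArgumentException("Unknown magic byte");
            }
            return buffer.getInt();
        }
    }

    /** Header-style lookup: the id arrives in the map, so the payload is not inspected. */
    static final class HeaderIdReader implements SchemaIdReader {
        // Hypothetical header key; assumed here to carry an 8-byte big-endian long.
        static final String SCHEMA_ID_HEADER = "example.schema.id";

        @Override
        public long readSchemaId(byte[] payload, Map<String, Object> additionalInputProperties) {
            Object raw = additionalInputProperties.get(SCHEMA_ID_HEADER);
            if (!(raw instanceof byte[]) || ((byte[]) raw).length != Long.BYTES) {
                throw new IllegalArgumentException(
                        "Missing or malformed header: " + SCHEMA_ID_HEADER);
            }
            return ByteBuffer.wrap((byte[]) raw).getLong();
        }
    }

    public static void main(String[] args) {
        // Payload carrying schema id 7 in the Confluent-style prefix.
        byte[] confluentPayload = ByteBuffer.allocate(5).put((byte) 0).putInt(7).array();

        // Headers copied by the connector into a plain map, carrying schema id 42.
        Map<String, Object> headers = new HashMap<>();
        headers.put(HeaderIdReader.SCHEMA_ID_HEADER,
                ByteBuffer.allocate(Long.BYTES).putLong(42L).array());

        System.out.println(new PayloadIdReader().readSchemaId(confluentPayload, new HashMap<>()));
        System.out.println(new HeaderIdReader().readSchemaId(new byte[0], headers));
    }
}
```

The point of the sketch is that the format stays stateless and has no Kafka 
types on its signature: the connector only has to copy the record headers into 
the map before calling the coder.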


Addressing 3.

Yes, this integration will go into the core Flink repo under flink-formats and 
sit next to the confluent-avro format. The Avro format has the concept of a 
Registry and drives the confluent-avro format. The Apicurio Avro format will 
use the same approach.

