Hi David,

Any updates on the Kafka Message Header support? I am also interested in
supporting headers with the Flink SQL Formats:
https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6

On Fri, Jun 14, 2024 at 6:10 AM David Radley <david_rad...@uk.ibm.com>
wrote:

> Hi everyone,
> I have talked with Chesnay and Danny offline. Danny and I were not very
> happy with passing Maps around, and were looking for a neater design.
> Chesnay suggested that we could move the new format to the Kafka connector,
> then pass the Kafka record down to the deserialize logic so it can make use
> of the headers during deserialization and serialization.
>
> I think this is a neat idea. This would mean:
> - the Kafka connector code would need to be updated to pass down the Kafka
> record
> - the Avro Apicurio format and SQL would live in the Kafka repository.
> We feel it is unlikely that anyone would want to use the Apicurio registry
> with files, as the Avro format could be used.
>
> Unfortunately I have found that this is not so straightforward to
> implement, as the Avro Apicurio format uses the Avro format, which is tied
> to the DeserializationSchema. We were hoping to have a new decoding
> implementation that would pass down the Kafka record rather than the
> payload. This does not appear possible without an Avro format change.
>
>
> Inspired by this idea, I notice that
> KafkaValueOnlyRecordDeserializerWrapper<T> extends
> KafkaValueOnlyDeserializerWrapper
>
> Does
>
> deserializer.deserialize(record.topic(),record.value())
>
>
>
> I am investigating whether I can add a factory/reflection to provide an
> alternative implementation that will pass the record as a byte array (the
> Kafka record is not serializable, so I will pick what we need and
> deserialize).
>
> I would need to do this 4 times (value and key, for deserialisation and
> serialisation). To do this I would need to convert the record into a byte
> array, so it fits into the existing interface (DeserializationSchema). I
> think this could be a way through, to avoid using maps, avoid changing
> the existing Avro format and avoid changing any core Flink interfaces.
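
A minimal sketch of the byte-array idea above, in plain Java with no Flink or Kafka classes. RecordEnvelope, pack and unpack are hypothetical names, and the length-prefixed layout is an illustrative assumption, not the prototype's actual format. The sketch also ignores null values and repeated header keys, which real Kafka records allow.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical envelope: selected headers plus the value, flattened into one byte array. */
final class RecordEnvelope {
    final Map<String, byte[]> headers;
    final byte[] value;

    RecordEnvelope(Map<String, byte[]> headers, byte[] value) {
        this.headers = headers;
        this.value = value;
    }

    /** Assumed layout: header count, (key, length, bytes) per header, then length and bytes of the value. */
    static byte[] pack(Map<String, byte[]> headers, byte[] value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(headers.size());
            for (Map.Entry<String, byte[]> e : headers.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue().length);
                out.write(e.getValue());
            }
            out.writeInt(value.length);
            out.write(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reverses pack(), so a byte[]-based deserializer can recover headers and value. */
    static RecordEnvelope unpack(byte[] packed) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(packed))) {
            int count = in.readInt();
            Map<String, byte[]> headers = new LinkedHashMap<>();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                byte[] headerValue = new byte[in.readInt()];
                in.readFully(headerValue);
                headers.put(key, headerValue);
            }
            byte[] value = new byte[in.readInt()];
            in.readFully(value);
            return new RecordEnvelope(headers, value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The connector side would call pack before handing the bytes to the existing byte[]-based interface, and the format side would call unpack first, so neither the interface nor the Avro format signature changes.
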
>
> I am going to prototype this idea. WDYT?
>
> My thanks go to Chesnay and Danny for their support and insight around
> this Flip,
>    Kind regards, David.
>
>
>
>
>
>
> From: David Radley <david_rad...@uk.ibm.com>
> Date: Wednesday, 29 May 2024 at 11:39
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] RE: [DISCUSS] FLIP-XXX Apicurio-avro format
> Hi Danny,
> Thank you for your feedback on this.
>
> I agree that using maps has pros and cons. The maps are flexible, but do
> require the sender and receiver to know what is in the map.
>
> When you say “That sounds like it would fit in better, I assume we cannot
> just take that approach?” The motivation behind this Flip is to support the
> headers which is the usual way that Apicurio runs. We will support the
> “schema id in the payload” as well.
>
> I agree with you when you say “ I am not 100% happy with the solution but I
> cannot offer a better option.” – this is a pragmatic way we have found to
> solve this issue. I am open to any suggestions to improve this as well.
>
> If we are going with the maps design (which is the best we have at the
> moment), it would be good to have the Flink core changes in base Flink
> version 2.0, as this would mean we do not need to use reflection in a Flink
> Kafka version 2 connector to work out if the runtime Flink has the new
> methods.
>
> At this stage we only have one committer (yourself) backing this. Do you
> know of two other committers who would support this Flip?
>
>      Kind regards, David.
>
>
>
> From: Danny Cranmer <dannycran...@apache.org>
> Date: Friday, 24 May 2024 at 19:32
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> Hello,
>
> > I am curious what you mean by abused.
>
> I just meant we will end up adding more and more fields to this map over
> time, and it may be hard to undo.
>
> > For Apicurio it can be sent at the start of the payload like Confluent
> > Avro does. Confluent Avro has a magic byte followed by 4 bytes of schema
> > id, at the start of the payload. Apicurio clients and SerDe libraries can
> > be configured to not put the schema id in the headers, in which case there
> > is a magic byte followed by an 8 byte schema id at the start of the
> > payload. In the deserialization case, we would not need to look at the
> > headers – though the encoding is also in the headers.
>
> That sounds like it would fit in better, I assume we cannot just take that
> approach?
>
> Thanks for the discussion. I am not 100% happy with the solution but I
> cannot offer a better option. I would be interested to hear if others have
> any suggestions. Playing devil's advocate against myself, we pass maps
> around to configure connectors so it is not too far away from that.
>
> Thanks,
> Danny
>
>
> On Fri, May 24, 2024 at 2:23 PM David Radley <david_rad...@uk.ibm.com>
> wrote:
>
> > Hi Danny,
> > No worries, thanks for replying. I have working prototype code that is
> > being reviewed. It needs some cleaning up and more complete testing
> > before it is ready, but will give you the general idea [1][2] to help to
> > assess this approach.
> >
> >
> > I am curious what you mean by abused. I guess the choice is between a
> > generic map mechanism and more particular, more granular things being
> > passed that might be used by another connector.
> >
> > Your first question:
> > “how would this work if the schema ID is not in the Kafka headers, as
> > hinted to in the FLIP "usually the global ID in a Kafka header"?
> >
> > For Apicurio it can be sent at the start of the payload like Confluent
> > Avro does. Confluent Avro has a magic byte followed by 4 bytes of schema
> > id, at the start of the payload. Apicurio clients and SerDe libraries can
> > be configured to not put the schema id in the headers, in which case there
> > is a magic byte followed by an 8 byte schema id at the start of the
> > payload. In the deserialization case, we would not need to look at the
> > headers – though the encoding is also in the headers.
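
The two payload layouts described above can be sketched in plain Java as follows. SchemaIdPrefix and its methods are hypothetical names. The sketch assumes the magic byte is 0 and that the ids are big-endian, which I have not confirmed against either SerDe library here.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper that reads the schema id from the start of a payload. */
final class SchemaIdPrefix {
    // Assumption: both layouts start with a zero magic byte.
    static final byte MAGIC_BYTE = 0x0;

    /** Confluent-style layout: magic byte, then a 4 byte schema id. */
    static int readConfluentId(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload); // big-endian by default
        checkMagic(buf.get());
        return buf.getInt();
    }

    /** Apicurio-style layout when ids are not in the headers: magic byte, then an 8 byte id. */
    static long readApicurioGlobalId(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        checkMagic(buf.get());
        return buf.getLong();
    }

    private static void checkMagic(byte first) {
        if (first != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte: " + first);
        }
    }
}
```

In both cases the Avro-encoded record starts immediately after the id, so no header lookup is needed on this path.
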
> >
> > Your second question:
> > “I am wondering if there are any other instances where the source would
> > be aware of the schema ID and pass it through in this way?”
> > The examples I can think of are:
> > - Avro can send the complete schema in a header; this is not recommended,
> > but in theory fits the need for a message payload to require something
> > else to get the structure.
> > - I see [3] that Apicurio Protobuf uses headers.
> > - It might be that other message queuing projects like RabbitMQ would
> > need this to be able to support Apicurio Avro & Protobuf.
> >
> > Kind regards, David,
> >
> >
> >
> >
> > [1] https://github.com/apache/flink/pull/24715
> > [2] https://github.com/apache/flink-connector-kafka/pull/99
> > [3]
> > https://www.apicur.io/registry/docs/apicurio-registry/2.5.x/getting-started/assembly-configuring-kafka-client-serdes.html#registry-serdes-types-json_registry
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > From: Danny Cranmer <dannycran...@apache.org>
> > Date: Friday, 24 May 2024 at 12:22
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> > Hello,
> >
> > Apologies, I am on vacation and have limited access to email.
> >
> > I can see the logic here and why you ended up where you did. I can also
> > see there are other useful metadata fields that we might want to pass
> > through, which might result in this Map being abused (Kafka Topic,
> > Kinesis Shard, etc).
> >
> > I have a follow-up question: how would this work if the schema ID is not
> > in the Kafka headers, as hinted at in the FLIP "usually the global ID in
> > a Kafka header"? I am wondering if there are any other instances where
> > the source would be aware of the schema ID and pass it through in this
> > way?
> >
> > Thanks,
> > Danny
> >
> >
> >
> > On Wed, May 22, 2024 at 3:43 PM David Radley <david_rad...@uk.ibm.com>
> > wrote:
> >
> > > Hi Danny,
> > > Did you have a chance to have a look at my responses to your feedback?
> > > I am hoping to keep the momentum going on this one. Kind regards, David.
> > >
> > >
> > > From: David Radley <david_rad...@uk.ibm.com>
> > > Date: Tuesday, 14 May 2024 at 17:21
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: [EXTERNAL] [DISCUSS] FLIP-XXX Apicurio-avro format
> > > Hi Danny,
> > >
> > > Thank you very much for the feedback and your support. I have copied
> > > your feedback from the VOTE thread to this discussion thread, so we can
> > > continue our discussions off the VOTE thread.
> > >
> > >
> > >
> > > Your feedback:
> > >
> > > Thanks for driving this David. I am +1 for adding support for the new
> > > format, however have some questions/suggestions on the details.
> > >
> > > 1. Passing around Map<String, Object> additionalInputProperties feels a
> > > bit dirty. It looks like this is mainly for the Kafka connector. This
> > > connector already has a de/serialization schema extension to access
> > > record headers, KafkaRecordDeserializationSchema [1], can we use this
> > > instead?
> > > 2. Can you elaborate why we need to change the SchemaCoder interface?
> > > Again I am not a fan of adding these Map parameters.
> > > 3. I assume this integration will go into the core Flink repo under
> > > flink-formats [2], and not be a separate repository like the connectors?
> > >
> > >
> > >
> > > My response:
> > >
> > > Addressing 1. and 2.
> > >
> > > I agree that sending maps around is a bit dirty. If we can see a better
> > > way that would be great. I was looking for a way to pass this Kafka
> > > header information in a non-Kafka way - the most obvious way I could
> > > think of was as a map. Here are the main considerations I saw; if I have
> > > missed anything or could improve something I would be grateful for any
> > > further feedback.
> > >
> > >
> > >
> > >   *   I see KafkaRecordDeserializationSchema is a Kafka interface that
> > > works at the Kafka record level (so includes the headers). We need a
> > > mechanism to send over the headers from the Kafka record to Flink.
> > >   *   Flink core is not aware of Kafka headers, and I did not want to
> > > add a Kafka dependency to core Flink.
> > >   *   The formats are stateless, so it did not appear to be in keeping
> > > with the Flink architecture to pass through header information to stash
> > > in state in the format, waiting for the deserialise to be subsequently
> > > called to pick up the header information.
> > >   *   We could have used thread-local storage to stash the header
> > > content, but this would be extra state to manage, and this would seem
> > > like an obtrusive change.
> > >   *   The SchemaCoder deserialise is where Confluent Avro gets the
> > > schema id from the payload, so it can look up the schema. In line with
> > > this approach it made sense to extend the deserialise so it had the
> > > header contents so the Apicurio Avro format could look up the schema.
> > >   *   I did not want to have Apicurio-specific logic in the Kafka
> > > connector; if we did, we could pull out the appropriate headers and only
> > > send over the schema ids.
> > >   *   For deserialise, the schema id we are interested in is the one in
> > > the Kafka headers on the message and is for the writer schema (an Avro
> > > format concept) currently used by the confluent-avro format in
> > > deserialize.
> > >   *   For serialise, the schema ids need to be obtained from Apicurio
> > > then passed through to Kafka.
> > >   *   For serialise, there is existing logic around handling the
> > > metadata, which includes passing the headers. But the presence of the
> > > metadata would imply we have a metadata column. Maybe a change to the
> > > metadata mechanism could have allowed us to pass the headers without
> > > creating a metadata column; instead I pass through the additional
> > > headers in a map to be appended.
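
To make the map-based approach in the list above concrete, here is a minimal sketch in plain Java of how a format could pull a schema id out of the additional properties map. HeaderSchemaIdLookup is a hypothetical name. The header key "apicurio.value.globalId" and the 8 byte big-endian encoding are assumptions for illustration, not taken from this thread.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical lookup of a global id from headers passed along as a generic map. */
final class HeaderSchemaIdLookup {
    // Assumed header name; the real key depends on the Apicurio SerDe configuration.
    static final String GLOBAL_ID_HEADER = "apicurio.value.globalId";

    /** Returns the id if the header is present as 8 raw bytes, otherwise empty. */
    static OptionalLong globalId(Map<String, Object> additionalInputProperties) {
        Object raw = additionalInputProperties.get(GLOBAL_ID_HEADER);
        if (!(raw instanceof byte[])) {
            return OptionalLong.empty();
        }
        byte[] bytes = (byte[]) raw;
        if (bytes.length != Long.BYTES) {
            return OptionalLong.empty();
        }
        // Assumption: the id is encoded as a big-endian long.
        return OptionalLong.of(ByteBuffer.wrap(bytes).getLong());
    }
}
```

The untyped Object values show the trade-off raised in this thread: the map is flexible, but sender and receiver both have to agree on the key names and value encodings.
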
> > >
> > >
> > >
> > > 3.
> > >
> > > Yes this integration will go into the core Flink repo under
> > > flink-formats and sit next to the confluent-avro format. The Avro
> > > format has the concept of a Registry and drives the confluent-avro
> > > format. The Apicurio Avro format will use the same approach.
> > >
> > > Unless otherwise stated above:
> > >
> > > IBM United Kingdom Limited
> > > Registered in England and Wales with number 741598
> > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> > >
> > >
> >
> >
>
>
>
