Hi Robert, thanks for looping me in.

I have looked at the branch and the FLIP. Apicurio looks like a promising
alternative to Confluent SR and I'm certain that it's a good addition to
Flink.

However, in its current form it looks heavily overengineered. I suspect
that comes from the attempt to do some generic implementation in
flink-core and later moving it into the kafka connector. I'm concerned for
two reasons: there is a lot of code involved for a relatively minor change
that needs to be maintained, and it looks like performance is going to
suffer from using SPI on the hot path. I also don't fully understand why we
need to decide per record on which deserializer is the most suitable; I'd
expect one topic to either use Apicurio or not.

My proposal is to take a step back and ask what is really needed and
desired. I think from a high level, we need
1. a way for the user to specify that it's Apicurio and provide all the
config options
2. we need to modify the KafkaDynamicSource/Sink to use the respective
Apicurio (de)serializer
3. we need to implement the (de)serializer

Let's look at solutions
Re 1: This is straightforward. I haven't seen this explicitly in the FLIP
and it might be worthwhile to call it out. It probably looks like this

CREATE TABLE user_created (
 ...
) WITH (

 'connector' = 'kafka',
 'topic' = 'user',
 'properties.bootstrap.servers' = '<broker>:9092',

 'key.format' = 'avro-apicurio',
 'key.avro-apicurio.url' = 'http://<apicurio>:8082',
 ...

 'value.format' = 'avro-apicurio',
 'value.avro-apicurio.url' = 'http://<apicurio>:8082',
 ...
)

Re 2: This is the hardest part as the KafkaDynamicSource/Sink basically
uses (De)serializationSchema which just knows bytes and can't use
ConsumerRecord. I see how you came to some conclusions.
However, I think there is still some easier way to achieve what you want:
We need to use the generified KafkaRecord(De)serializationSchema instead.
Let's check what needs to be done for KafkaDynamicSource (sink should be
similar):
* Replace all DeserializationSchema with KafkaRecordDeserializationSchema
in KafkaDynamicSource and DynamicKafkaDeserializationSchema
* Introduce a KafkaRecordDeserializationFormatFactory with an
implementation that delegates to the existing factory and
uses KafkaRecordDeserializationSchema#valueOnly under the hood.
* Look in KafkaDynamicTableFactory for the old factory and wrap it into the
new factory. Optionally also look for the new factory (see below).
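
To make the delegation idea in Re 2 concrete, here is a minimal sketch. It
deliberately uses simplified stand-in types instead of the real Flink and
Kafka classes: SimpleRecord, ValueSchema, RecordSchema and the header name
"schema-id" are all hypothetical and only mirror the shape of
ConsumerRecord, DeserializationSchema and KafkaRecordDeserializationSchema.
It shows an existing value-only format being adapted to a record-level
interface, next to a header-aware format that implements that interface
directly.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Stand-in for Kafka's ConsumerRecord<byte[], byte[]>: payload plus headers.
final class SimpleRecord {
    final byte[] value;
    final Map<String, byte[]> headers;

    SimpleRecord(byte[] value, Map<String, byte[]> headers) {
        this.value = value;
        this.headers = headers;
    }
}

// Stand-in for DeserializationSchema<T>: only ever sees the payload bytes.
interface ValueSchema<T> {
    T deserialize(byte[] payload);
}

// Stand-in for KafkaRecordDeserializationSchema<T>: sees the whole record.
interface RecordSchema<T> {
    void deserialize(SimpleRecord rec, Consumer<T> out);

    // Plays the role of valueOnly: adapt an existing value-only schema.
    static <V> RecordSchema<V> valueOnly(ValueSchema<V> delegate) {
        return (rec, out) -> out.accept(delegate.deserialize(rec.value));
    }
}

final class WrapperSketch {
    // An existing format keeps working unchanged through the adapter.
    static List<String> decodeWithLegacyFormat(SimpleRecord rec) {
        ValueSchema<String> legacy = bytes -> new String(bytes, StandardCharsets.UTF_8);
        List<String> results = new ArrayList<>();
        RecordSchema.valueOnly(legacy).deserialize(rec, results::add);
        return results;
    }

    // A header-aware format implements the record-level interface directly.
    static List<String> decodeWithHeaderAwareFormat(SimpleRecord rec) {
        RecordSchema<String> headerAware = (r, sink) -> {
            byte[] id = r.headers.get("schema-id"); // hypothetical header name
            String prefix = id == null ? "?" : new String(id, StandardCharsets.UTF_8);
            sink.accept(prefix + ":" + new String(r.value, StandardCharsets.UTF_8));
        };
        List<String> results = new ArrayList<>();
        headerAware.deserialize(rec, results::add);
        return results;
    }
}
```

The point of the sketch is that the source only ever talks to the
record-level interface, so no per-record decision about which deserializer
to use is needed.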

Re 3:
Implement Apicurio using KafkaRecordDeserializationSchema. This would be
directly usable in DataStream API.
For table API, you can implement the new
KafkaRecordDeserializationFormatFactory and dynamically find it. However,
as of now it seems like everything is in the same jar already. If it's
never really separated, I'm also fine to hardwire the Apicurio
implementations inside the KafkaDynamicTableFactory (so simply add it
statically to the SPI formats).

So as you can see, there is no need to introduce any kind of new SPI lookup
or did I miss something obvious?



Btw if we want to be fancy, an alternative/extended solution would be to
generify (De)serializationSchema in flink-core. A
RecordDeserializationSchema<R, T> would have a

void deserialize(R record, Collector<T> out) throws IOException;

and then you could plug in different kinds of records.
The existing DeserializationSchema would become

public interface DeserializationSchema<T> extends
RecordDeserializationSchema<byte[], T>

and the KafkaRecordDeserializationSchema would become

public interface KafkaRecordDeserializationSchema<T> extends
RecordDeserializationSchema<ConsumerRecord<byte[], byte[]>, T>

However, that's a FLIP on its own, especially to maintain backwards
compatibility.
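
A minimal, self-contained sketch of how that hierarchy could fit together.
Everything here is an assumption for illustration: Out stands in for
Flink's Collector, KafkaLikeRecord for ConsumerRecord<byte[], byte[]>, and
BytesDeserializationSchema / KafkaLikeDeserializationSchema for the two
existing interfaces. None of this is existing Flink API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Collector<T>.
interface Out<T> {
    void collect(T element);
}

// The proposed generic base: R is the raw record type, T the produced type.
interface RecordDeserializationSchema<R, T> {
    void deserialize(R raw, Out<T> out) throws IOException;
}

// The byte-oriented schema becomes a specialisation for R = byte[].
interface BytesDeserializationSchema<T>
        extends RecordDeserializationSchema<byte[], T> {}

// Stand-in for ConsumerRecord<byte[], byte[]>.
final class KafkaLikeRecord {
    final byte[] key;
    final byte[] value;

    KafkaLikeRecord(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// The Kafka-specific schema becomes a specialisation for whole records.
interface KafkaLikeDeserializationSchema<T>
        extends RecordDeserializationSchema<KafkaLikeRecord, T> {}

final class GenericSchemaSketch {
    // One generic driver works for any record type R.
    static <R, T> List<T> runAll(RecordDeserializationSchema<R, T> schema, List<R> records) {
        List<T> results = new ArrayList<>();
        for (R raw : records) {
            try {
                schema.deserialize(raw, results::add);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return results;
    }
}
```

With this shape, code that drives deserialization can be written once
against RecordDeserializationSchema<R, T>, while existing byte[]-based
formats remain valid implementations.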

Best.

Arvid

PS: Don't hesitate to reach out on Slack for quicker feedback. We should
still primarily use the ML to keep everyone in the loop, but it's usually
much faster to ping me there if you post an answer here.

On Tue, Sep 24, 2024 at 7:50 PM Robert Metzger <rmetz...@apache.org> wrote:

> @Arvid: Could you take a look at this.
>
> Thanks a lot.
>
> On Wed, Aug 14, 2024 at 6:19 PM David Radley <david_rad...@uk.ibm.com>
> wrote:
>
>> Hi Danny,
>> Thank you for your feedback. I have brought a fork up to date with one
>> commit so you can
>> see the code; I need to test more scenarios and add more unit tests, but
>> it shows the basic idea of the design that Chesnay and I agreed on.
>>
>>  Kind regards, David.
>>
>>
>> From: Danny Cranmer <dannycran...@apache.org>
>> Date: Thursday, 25 July 2024 at 16:46
>> To: dev@flink.apache.org <dev@flink.apache.org>
>> Cc: David Radley <david_rad...@uk.ibm.com>
>> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
>> Hey,
>>
>> Thanks for the updated FLIP.
>>
>> > 1.20 or beyond
>>
>> Given this is intended to go in the Kafka connector we can target the
>> next Kafka connector version (v3.3.0) for Flink 1.18 and 1.19 (or 1.19 and
>> 1.20, if 1.20 is released first).
>>
>>
>> >> Agreed
>>
>>  > RecordBasedSerialization::public boolean canProcess(byte[]
>> customSerialisation);
>>
>> Not sure what this is for, since we are passing a byte array and asking
>> if it can be serialized? Can you elaborate how the Kafka connector
>> interacts with these interfaces?
>> >> we are asking whether it can be processed. The canProcess has a default and
>> also can be overridden. The idea is that if we are in another format and
>> the apicurio jar is there then we might pick up the apicurio jar in the
>> discovery. So we check that what we have discovered really can process the
>> message. During testing I had the key as raw and the value as apicurio,
>> and saw the issue that necessitates this method.
>>
>>
>> > byte[] customSerialisation
>>
>> There are a lot of byte arrays being passed around, I assume we are not
>> doing heavy conversions multiple times? Is there scope to introduce a new
>> intermediate type to prevent duplicate processing cycles?
>> Or even better, do you have a fork with these changes you can share?
>> >> https://github.com/davidradl/flink-connector-kafka/tree/FLINK-35311-3
>>
>> > Format 'avro-apicurio'
>>
>> Since this is Kafka only I would be tempted to include Kafka in this tag,
>> just in case we decide to add a Kafka agnostic format in the future.
>> Additionally this makes it clearer to the user it is Kafka only.
>> >> I was keeping this in line with the avro-confluent naming which is
>> also Kafka only. I do not have strong views on this – so I can change if we
>> think it is better.
>>
>> Thanks,
>> Danny
>>
>>
>>
>> On Thu, Jul 18, 2024 at 6:32 PM Martijn Visser <martijnvis...@apache.org
>> <mailto:martijnvis...@apache.org>> wrote:
>> Hi David,
>>
>> The FLIP is updated.
>>
>> Cheers, Martijn
>>
>>
>> On Tue, Jul 16, 2024 at 6:33 PM David Radley <david_rad...@uk.ibm.com
>> <mailto:david_rad...@uk.ibm.com>>
>> wrote:
>>
>> > Hello all,
>> >
>> >
>> >
>> > I have prototyped the new design and confirmed it works. I have
>> documented
>> > the design in google doc
>> >
>> https://docs.google.com/document/d/1J1E-cE-X2H3-kw4rNjLn71OGPQk_Yl1iGX4-eCHWLgE/edit
>> >
>> >
>> >
>> > By copy Martijn: please could you move this content over to replace the
>> > content of
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format
>> >
>> >
>> >
>> > As the design is significantly different, there will be a new period of
>> > discussion before going to vote.
>> >
>> >
>> >
>> > Fyi I will be on vacation after this week until the 13th of August.
>> >
>> >
>> >
>> > Kind regards, David.
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > *From: *David Radley <david_rad...@uk.ibm.com<mailto:
>> david_rad...@uk.ibm.com>>
>> > *Date: *Tuesday, 9 July 2024 at 11:17
>> > *To: *dev@flink.apache.org<mailto:dev@flink.apache.org> <
>> dev@flink.apache.org<mailto:dev@flink.apache.org>>
>> > *Subject: *[EXTERNAL] RE: [DISCUSS] FLIP-XXX Apicurio-avro format
>> >
>> > Hi Kevin,
>> > I have agreed a design with Chesnay and Danny. I am implementing a
>> > prototype, to prove it works,  then will update the Flip text with the
>> new
>> > design. Initial testing is showing it working.
>> >
>> > Here is a quick history so you can understand our current thinking.
>> >
>> >   1.  Initially we passed maps for header information from the kafka
>> > connector to Flink for deserialization. Similar for serialize. This was
>> not
>> > great, because maps are not ideal and it was a big change as it needed
>> core
>> > Flink interface changes
>> >   2.  We then moved the Avro Apicurio format to the Kafka connector and
>> > looked to discover a new record based de/serialization interface. So we
>> > could pass down the record (containing the headers) rather than the
>> > payload. This did not work, because there is a dependence on the Avro
>> > connector that is not  aware of the new interface.
>> >   3.  We considered using Thread local storage to pass the headers, we
>> did
>> > not like this as there was a risk of memory leaks if we did not manage
>> the
>> > thread well, also the contract is hidden.
>> >   4.  We then came up with the current design that augments the
>> > deserialization in the Kafka connector in a new discovered record based
>> > deserialization, it then takes the headers out in the schema coder,
>> leaving
>> > the message as it was. Similar for serialization.
>> >
>> >
>> > One piece I need to work out the details of, is how to work when there
>> are
>> > 2 implementations that can be discovered, probably using an augmented
>> > format name as a factory identifier,
>> >
>> > I hope to put up a new design in the Flip by the end of next week, for
>> > wider review,
>> >     Kind regards, David.
>> >
>> >
>> > From: Kevin Lam <kevin....@shopify.com.INVALID>
>> > Date: Monday, 8 July 2024 at 21:16
>> > To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
>> dev@flink.apache.org<mailto:dev@flink.apache.org>>
>> > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
>> > Hi David,
>> >
>> > Any updates on the Kafka Message Header support? I am also interested in
>> > supporting headers with the Flink SQL Formats:
>> > https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6
>> >
>> > On Fri, Jun 14, 2024 at 6:10 AM David Radley <david_rad...@uk.ibm.com
>> <mailto:david_rad...@uk.ibm.com>>
>> > wrote:
>> >
>> > > Hi everyone,
>> > > I have talked with Chesnay and Danny offline. Danny and I were not
>> very
>> > > happy with the passing Maps around, and were looking for a neater
>> design.
>> > > Chesnay suggested that we could move the new format to the Kafka
>> > connector,
>> > > then pass the Kafka record down to the deserialize logic so it can
>> make
>> > use
>> > > of the headers during deserialization and serialisation.
>> > >
>> > > I think this is a neat idea. This would mean:
>> > > - the Kafka connector code would need to be updated to pass down the
>> > Kafka
>> > > record
>> > > - there would be the Avro Apicurio format and SQL in the kafka
>> > repository.
>> > > We feel it is unlikely to want to use the Apicurio registry with
>> files,
>> > as
>> > > the Avro format could be used.
>> > >
>> > > Unfortunately I have found that this is not so straightforward to
>> > > implement as the Avro Apicurio format uses the Avro format, which is
>> tied
>> > > to the DeserializationSchema. We were hoping to have a new decoding
>> > > implementation that would pass down the Kafka record rather than the
>> > > payload. This does not appear possible without an Avro format change.
>> > >
>> > >
>> > > Inspired by this idea, I notice that
>> > > KafkaValueOnlyRecordDeserializerWrapper<T> extends
>> > > KafkaValueOnlyDeserializerWrapper
>> > >
>> > > Does
>> > >
>> > > deserializer.deserialize(record.topic(),record.value())
>> > >
>> > >
>> > >
>> > > I am investigating if I can add a factory/reflection to provide an
>> > > alternative
>> > > implementation that will pass the record based (the kafka record is
>> not
>> > > serializable so I will pick what we need and deserialize) as a byte
>> > array.
>> > >
>> > > I would need to do this 4 times (value and key, for deserialisation and
>> > > serialisation). To do this I would need to convert the record into a
>> byte
>> > > array, so it fits into the existing interface
>> (DeserializationSchema).  I
>> > > think this could be a way through, to avoid using maps and avoid
>> changing
>> > > the existing Avro format and avoid changing any core Flink interfaces.
>> > >
>> > > I am going to prototype this idea. WDYT?
>> > >
>> > > My thanks go to Chesnay and Danny for their support and insight around
>> > > this Flip,
>> > >    Kind regards, David.
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > From: David Radley <david_rad...@uk.ibm.com<mailto:
>> david_rad...@uk.ibm.com>>
>> > > Date: Wednesday, 29 May 2024 at 11:39
>> > > To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
>> dev@flink.apache.org<mailto:dev@flink.apache.org>>
>> > > Subject: [EXTERNAL] RE: [DISCUSS] FLIP-XXX Apicurio-avro format
>> > > Hi Danny,
>> > > Thank you for your feedback on this.
>> > >
>> > > I agree that using maps has pros and cons. The maps are flexible, but
>> do
>> > > require the sender and receiver to know what is in the map.
>> > >
>> > > When you say “That sounds like it would fit in better, I assume we
>> cannot
>> > > just take that approach?” The motivation behind this Flip is to
>> support
>> > the
>> > > headers which is the usual way that Apicurio runs. We will support the
>> > > “schema id in the payload” as well.
>> > >
>> > > I agree with you when you say “ I am not 100% happy with the solution
>> > but I
>> > > cannot offer a better option.” – this is a pragmatic way we have
>> found to
>> > > solve this issue. I am open to any suggestions to improve this as
>> well.
>> > >
>> > > If we are going with the maps design (which is the best we have at the
>> > > moment) ; it would be good to have the Flink core changes in base
>> Flink
>> > > version 2.0 as this would mean we do not need to use reflection in a
>> > Flink
>> > > Kafka version 2 connector to work out if the runtime Flink has the new
>> > > methods.
>> > >
>> > > At this stage we only have one committer (yourself) backing this. Do
>> you
>> > > know of two other committers who would support this Flip?
>> > >
>> > >      Kind regards, David.
>> > >
>> > >
>> > >
>> > > From: Danny Cranmer <dannycran...@apache.org<mailto:
>> dannycran...@apache.org>>
>> > > Date: Friday, 24 May 2024 at 19:32
>> > > To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
>> dev@flink.apache.org<mailto:dev@flink.apache.org>>
>> > > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
>> > > Hello,
>> > >
>> > > > I am curious what you mean by abused.
>> > >
>> > > I just meant we will end up adding more and more fields to this map
>> over
>> > > time, and it may be hard to undo.
>> > >
>> > > > For Apicurio it can be sent at the start of the payload like
>> Confluent
>> > > Avro does. Confluent Avro have a magic byte followed by 4 bytes of
>> schema
>> > > id, at the start of the payload. Apicurio clients and SerDe libraries
>> can
>> > > be configured to not put the schema id in the headers in which case
>> there
>> > > is a magic byte followed by an 8 byte schema at the start of the
>> payload.
>> > > In the deserialization case, we would not need to look at the headers
>> –
>> > > though the encoding is also in the headers.
>> > >
>> > > That sounds like it would fit in better, I assume we cannot just take
>> > that
>> > > approach?
>> > >
>> > > Thanks for the discussion. I am not 100% happy with the solution but I
>> > > cannot offer a better option. I would be interested to hear if others
>> > have
>> > > any suggestions. Playing devil's advocate against myself, we pass maps
>> > > around to configure connectors so it is not too far away from that.
>> > >
>> > > Thanks,
>> > > Danny
>> > >
>> > >
>> > > On Fri, May 24, 2024 at 2:23 PM David Radley <david_rad...@uk.ibm.com
>> <mailto:david_rad...@uk.ibm.com>>
>> > > wrote:
>> > >
>> > > > Hi Danny,
>> > > > No worries, thanks for replying. I have working prototype code that
>> is
>> > > > being reviewed. It needs some cleaning up and more complete testing
>> > > before
>> > > > it is ready, but will give you the general idea [1][2] to help to
>> > assess
>> > > > this approach.
>> > > >
>> > > >
>> > > > I am curious what you mean by abused. I guess the choice is
>> > between
>> > > > a generic map mechanism and more particular, more granular things
>> being
>> > > > passed that might be used by another connector.
>> > > >
>> > > > Your first question:
>> > > > “how would this work if the schema ID is not in the Kafka headers,
>> as
>> > > > hinted to in the FLIP "usually the global ID in a Kafka header"?
>> > > >
>> > > > For Apicurio it can be sent at the start of the payload like
>> Confluent
>> > > > Avro does. Confluent Avro have a magic byte followed by 4 bytes of
>> > schema
>> > > > id, at the start of the payload. Apicurio clients and SerDe
>> libraries
>> > can
>> > > > be configured to not put the schema id in the headers in which case
>> > there
>> > > > is a magic byte followed by an 8 byte schema at the start of the
>> > payload.
>> > > > In the deserialization case, we would not need to look at the
>> headers –
>> > > > though the encoding is also in the headers.
>> > > >
>> > > > Your second question:
>> > > > “I am wondering if there are any other instances where the source
>> would
>> > > be
>> > > > aware of the schema ID and pass it through in this way?
>> > > > ”
>> > > > The examples I can think of are:
>> > > > - Avro can send the complete schema in a header, this is not
>> > recommended
>> > > > but in theory fits the need for a message payload to require
>> something
>> > > else
>> > > > to get the structure.
>> > > > - I see [2] that Apicurio Protobuf uses headers.
>> > > > - it might be that other message queuing projects like Rabbit MQ
>> would
>> > > > need this to be able to support Apicurio Avro & protobuf.
>> > > >
>> > > > Kind regards, David,
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > [1] https://github.com/apache/flink/pull/24715
>> > > > [2] https://github.com/apache/flink-connector-kafka/pull/99
>> > > > [3]
>> > > >
>> > >
>> >
>> https://www.apicur.io/registry/docs/apicurio-registry/2.5.x/getting-started/assembly-configuring-kafka-client-serdes.html#registry-serdes-types-json_registry
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > From: Danny Cranmer <dannycran...@apache.org<mailto:
>> dannycran...@apache.org>>
>> > > > Date: Friday, 24 May 2024 at 12:22
>> > > > To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
>> dev@flink.apache.org<mailto:dev@flink.apache.org>>
>> > > > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
>> > > > Hello,
>> > > >
>> > > > Apologies, I am on vacation and have limited access to email.
>> > > >
>> > > > I can see the logic here and why you ended up where you did. I can
>> also
>> > > see
>> > > > there are other useful metadata fields that we might want to pass
>> > > through,
>> > > > which might result in this Map being abused (Kafka Topic, Kinesis
>> > Shard,
>> > > > etc).
>> > > >
>> > > > I have a follow up question, how would this work if the schema ID is
>> > not
>> > > in
>> > > > the Kafka headers, as hinted to in the FLIP "usually the global ID
>> in a
>> > > > Kafka header"? I am wondering if there are any other instances where
>> > the
>> > > > source would be aware of the schema ID and pass it through in this
>> way?
>> > > >
>> > > > Thanks,
>> > > > Danny
>> > > >
>> > > >
>> > > >
>> > > > On Wed, May 22, 2024 at 3:43 PM David Radley <
>> david_rad...@uk.ibm.com<mailto:david_rad...@uk.ibm.com>>
>> > > > wrote:
>> > > >
>> > > > > Hi Danny,
>> > > > > Did you have a chance to have a look at my responses to your
>> > > feedback? I
>> > > > > am hoping to keep the momentum going on this one,   kind regards,
>> > > David.
>> > > > >
>> > > > >
>> > > > > From: David Radley <david_rad...@uk.ibm.com<mailto:
>> david_rad...@uk.ibm.com>>
>> > > > > Date: Tuesday, 14 May 2024 at 17:21
>> > > > > To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
>> dev@flink.apache.org<mailto:dev@flink.apache.org>>
>> > > > > Subject: [EXTERNAL] [DISCUSS] FLIP-XXX Apicurio-avro format
>> > > > > Hi Danny,
>> > > > >
>> > > > > Thank you very much for the feedback and your support. I have
>> copied
>> > > your
>> > > > > feedback from the VOTE thread to this discussion thread, so we can
>> > > > continue
>> > > > > our discussions off the VOTE thread.
>> > > > >
>> > > > >
>> > > > >
>> > > > > Your feedback:
>> > > > >
>> > > > > Thanks for Driving this David. I am +1 for adding support for the
>> new
>> > > > >
>> > > > > format, however have some questions/suggestions on the details.
>> > > > >
>> > > > >
>> > > > >
>> > > > > 1. Passing around Map<String, Object> additionalInputProperties
>> > feels a
>> > > > bit
>> > > > >
>> > > > > dirty. It looks like this is mainly for the Kafka connector. This
>> > > > connector
>> > > > >
>> > > > > already has a de/serialization schema extension to access record
>> > > > >
>> > > > > headers, KafkaRecordDeserializationSchema [1], can we use this
>> > instead?
>> > > > >
>> > > > > 2. Can you elaborate why we need to change the SchemaCoder
>> interface?
>> > > > Again
>> > > > >
>> > > > > I am not a fan of adding these Map parameters
>> > > > >
>> > > > > 3. I assume this integration will go into the core Flink repo
>> under
>> > > > >
>> > > > > flink-formats [2], and not be a separate repository like the
>> > > connectors?
>> > > > >
>> > > > >
>> > > > >
>> > > > > My response:
>> > > > >
>> > > > > Addressing 1. and 2.
>> > > > >
>> > > > > I agree that sending maps around is a bit dirty. If we can see a
>> > better
>> > > > > way that would be great. I was looking for a way to pass this
>> kafka
>> > > > header
>> > > > > information in a non-Kafka way - the most obvious way I could
>> think
>> > was
>> > > > as
>> > > > > a map. Here are the main considerations I saw, if I have missed
>> > > anything
>> > > > or
>> > > > > could improve something I would be grateful for any further
>> feedback.
>> > > > >
>> > > > >
>> > > > >
>> > > > >   *   I see KafkaRecordDeserializationSchema is a Kafka interface
>> > that
>> > > > > works at the Kafka record level (so includes the headers). We
>> need a
>> > > > > mechanism to send over the headers from the Kafka record to Flink
>> > > > >   *   Flink core is not aware of Kafka headers, and I did not
>> want to
>> > > add
>> > > > > a Kafka dependency to core Flink.
>> > > > >   *   The formats are stateless so it did not appear to be in
>> fitting
>> > > > with
>> > > > > the Flink architecture to pass through header information to
>> stash in
>> > > > state
>> > > > > in the format waiting for the deserialise to be subsequently
>> called
>> > to
>> > > > pick
>> > > > > up the header information.
>> > > > >   *   We could have used Thread local storage to stash the header
>> > > > content,
>> > > > > but this would be extra state to manage; and this would seem like
>> an
>> > > > > obtrusive change.
>> > > > >   *   The SchemaCoder deserialise is where Confluent Avro gets the
>> > > schema
>> > > > > id from the payload, so it can lookup the schema. In line with
>> this
>> > > > > approach it made sense to extend the deserialise so it had the
>> header
>> > > > > contents so the Apicurio Avro format could lookup the schema.
>> > > > >   *   I did not want to have Apicurio specific logic in the Kafka
>> > > > > connector, if we did we could pull out the appropriate headers and
>> > only
>> > > > > send over the schema ids.
>> > > > >   *   For deserialise, the schema id we are interested in is the
>> one
>> > in
>> > > > > the Kafka headers on the message and is for the writer schema (an
>> > Avro
>> > > > > format concept) currently used by the confluent-avro format in
>> > > > deserialize.
>> > > > >   *   For serialise the schema ids need to be obtained from
>> apicurio
>> > > then
>> > > > > passed through to Kafka.
>> > > > >   *   For serialise there is existing logic around handling the
>> > > metadata
>> > > > > which includes passing the headers. But the presence of the
>> metadata
>> > > > would
>> > > > > imply we have a metadata column. Maybe a change to the metadata
>> > > mechanism
>> > > > > would have allowed us to pass the headers without creating a
>> > metadata
>> > > > > column; instead I pass through the additional headers in a map to
>> be
>> > > > > appended.
>> > > > >
>> > > > >
>> > > > >
>> > > > > 3.
>> > > > >
>> > > > > Yes this integration will go into the core Flink repo under
>> > > > >
>> > > > > flink-formats and sit next to the confluent-avro format. The Avro
>> > > format
>> > > > > has the concept of a Registry and drives the confluent-avro
>> format.
>> > The
>> > > > > Apicurio Avro format will use the same approach.
>> > > > >
>> > > > > Unless otherwise stated above:
>> > > > >
>> > > > > IBM United Kingdom Limited
>> > > > > Registered in England and Wales with number 741598
>> > > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
>> PO6
>> > 3AU
>> > > > >
>>
>
