Hi David,

Thanks a lot for clarification. Sounds good to me.

Regards,
Jeyhun

On Fri, Mar 22, 2024 at 10:54 AM David Radley <david_rad...@uk.ibm.com> wrote:

> Hi Jeyhun,
> Thanks for your feedback.
>
> So for outbound messages, the message includes the global ID. We register
> the schema and match on the artifact id. So if the schema then evolved,
> adding a new version, the global ID would still be unique and the same
> version would be targeted. If you wanted to change the Flink table
> definition in line with a higher version, then you could do this – the
> artifact id would need to match for it to use the same schema and a higher
> artifact version would need to be provided. I notice that Apicurio has
> rules around compatibility that you can configure, I suppose if we attempt
> to create an artifact that breaks these rules, then the register schema
> will fail and the associated operation should fail (e.g. an insert). I have
> not tried this.
>
> For inbound messages, using the global id in the header – this targets one
> version of the schema. I can create different messages on the topic built
> with different schema versions, and I can create different tables in Flink,
> as long as the reader and writer schemas are compatible as per the
> https://github.com/apache/flink/blob/779459168c46b7b4c600ef52f99a5435f81b9048/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java#L109
> Then this should work.
>
> Does this address your question?
> Kind regards, David.
>
>
> From: Jeyhun Karimov <je.kari...@gmail.com>
> Date: Thursday, 21 March 2024 at 21:06
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> Hi David,
>
> Thanks for the FLIP. +1 for it.
> I have a minor comment.
>
> Can you please elaborate more on mechanisms in place to ensure data
> consistency and integrity, particularly in the event of schema conflicts?
> Since each message includes a schema ID for inbound and outbound messages,
> can you elaborate more on message consistency in the context of schema
> evolution?
>
> Regards,
> Jeyhun
>
>
> On Wed, Mar 20, 2024 at 4:34 PM David Radley <david...@apache.org> wrote:
>
> > Thank you very much for your feedback Mark. I have made the changes in
> > the latest google document. On reflection I agree with you that the
> > globalIdPlacement format configuration should apply to the
> > deserialization as well, so it is declarative. I am also going to have a
> > new configuration option to work with content IDs as well as global IDs.
> > In line with the deser Apicurio IdHandler and headerHandlers.
> >
> > kind regards, David.
> >
> >
> > On 2024/03/20 15:18:37 Mark Nuttall wrote:
> > > +1 to this
> > >
> > > A few small comments:
> > >
> > > Currently, if users have Avro schemas in an Apicurio Registry (an open
> > > source Apache 2 licensed schema registry), then the natural way to work
> > > with those Avro flows is to use the schemas in the Apicurio Repository.
> > > 'those Avro flows' ... this is the first reference to flows.
> > >
> > > The new format will use the global Id to look up the Avro schema that
> > > the message was written during deserialization.
> > > I get the point, phrasing is awkward. Probably you're more interested
> > > in content than word polish at this point though.
> > >
> > > The Avro Schema Registry (apicurio-avro) format
> > > The Confluent format is called avro-confluent; this should be
> > > avro-apicurio
> > >
> > > How to create tables with Apicurio-avro format
> > > s/Apicurio-avro/avro-apicurio/g
> > >
> > > HEADER – globalId is put in the header
> > > LEGACY– global Id is put in the message as a long
> > > CONFLUENT - globalId is put in the message as an int.
> > > Please could we specify 'four-byte int' and 'eight-byte long' ?
> > >
> > > For a Kafka source the globalId will be looked for in this order:
> > > - In the header
> > > - After a magic byte as an int
> > > - After a magic byte as a long.
> > > but apicurio-avro.globalid-placement has a default value of HEADER :
> > > why do we have a search order as well? Isn't
> > > apicurio-avro.globalid-placement enough? Don't the two mechanisms
> > > conflict?
> > >
> > > In addition to the types listed there, Flink supports reading/writing
> > > nullable types. Flink maps nullable types to Avro union(something,
> > > null), where something is the Avro type converted from Flink type.
> > > Is that definitely the right way round? I know we've had multiple
> > > conversations about how unions work with Flink
> > >
> > > This is because the writer schema is expanded, but this could not
> > > complete if there are circularities.
> > > I understand your meaning but the sentence is awkward.
> > >
> > > The registered schema will be created or if it exists be updated.
> > > same again
> > >
> > > At some stage the lowest Flink level supported by the Kafka connector
> > > will contain the additionalProperties methods in code flink.
> > > wording
> > >
> > > There existing Kafka deserialization for the writer schema passes down
> > > the message body to be deserialised.
> > > wording
> > >
> > > @Override
> > > public void deserialize(ConsumerRecord<byte[], byte[]> message,
> > >         Collector<T> out)
> > >         throws IOException {
> > >     Map<String, Object> additionalPropertiesMap = new HashMap<>();
> > >     for (Header header : message.additionalProperties()) {
> > >         headersMap.put(header.key(), header.value());
> > >     }
> > >     deserializationSchema.deserialize(message.value(), headersMap,
> > >             out);
> > > }
> > > This fails to compile at headersMap.
> > >
> > > The input stream and additionalProperties will be sent so the Apicurio
> > > SchemaCoder which will try getting the globalId from the headers, then
> > > 4 bytes from the payload then 8 bytes from the payload.
> > > I'm still stuck on apicurio-avro.globalid-placement having a default
> > > value of HEADER . Should we try all three, or fail if this config param
> > > has a wrong value?
> > >
> > > Other considerations
> > > The implementation does not use the Apicurio deser libraries,
> > > Please can we refer to them as SerDes; this is the term used within the
> > > documentation that you link to
> > >
> > >
> > > On 2024/03/20 10:09:08 David Radley wrote:
> > > > Hi,
> > > > As per the FLIP process I would like to raise a FLIP, but do not have
> > > > authority, so have created a google doc for the Flip to introduce a
> > > > new Apicurio Avro format. The document is
> > > > https://docs.google.com/document/d/14LWZPVFQ7F9mryJPdKXb4l32n7B0iWYkcOdEd1xTC7w/edit?usp=sharing
> > > >
> > > > I have prototyped a lot of the content to prove that this approach is
> > > > feasible. I look forward to the discussion,
> > > > Kind regards, David.
> > > >
> > > > Unless otherwise stated above:
> > > >
> > > > IBM United Kingdom Limited
> > > > Registered in England and Wales with number 741598
> > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
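For reference, the three global ID placements discussed in the thread (HEADER, CONFLUENT as a four-byte int after a magic byte, LEGACY as an eight-byte long after a magic byte) could be read roughly as in the sketch below. This is only an illustration of the declarative reading, where the configured placement alone decides where to look. It is not the FLIP's implementation. The class name, the header key, the magic byte value of 0, the big-endian encoding and the eight-byte header value are all assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical sketch only; none of these names come from the FLIP or from Apicurio.
final class GlobalIdPlacementSketch {

    // Placement names as listed in the thread.
    enum Placement { HEADER, LEGACY, CONFLUENT }

    // Assumptions for illustration: header key and magic byte value.
    static final String HEADER_KEY = "apicurio.value.globalId";
    static final byte MAGIC_BYTE = 0x0;

    // Reads the global ID from the single location named by the placement.
    static long readGlobalId(Placement placement, Map<String, byte[]> headers, byte[] payload) {
        switch (placement) {
            case HEADER: {
                // Assumption: the header value is an eight-byte big-endian long.
                byte[] raw = headers.get(HEADER_KEY);
                if (raw == null || raw.length != Long.BYTES) {
                    throw new IllegalArgumentException("No eight-byte global ID header found");
                }
                return ByteBuffer.wrap(raw).getLong();
            }
            case CONFLUENT:
                // Magic byte followed by a four-byte int.
                return afterMagicByte(payload, Integer.BYTES).getInt();
            case LEGACY:
                // Magic byte followed by an eight-byte long.
                return afterMagicByte(payload, Long.BYTES).getLong();
            default:
                throw new IllegalArgumentException("Unknown placement: " + placement);
        }
    }

    // Checks the length and magic byte, then returns a buffer positioned at the ID.
    private static ByteBuffer afterMagicByte(byte[] payload, int idLength) {
        if (payload == null || payload.length < 1 + idLength) {
            throw new IllegalArgumentException("Payload too short for magic byte and ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte");
        }
        return buffer;
    }
}
```

With this shape, a wrong placement value fails fast instead of falling back to a search order, which is one way to resolve the conflict Mark raises between the default of HEADER and the three-step lookup.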