Hi Arvid,

Did you check out the most recent AvroSerializer code? https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L185

I think this does what you're suggesting.
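Roughly, the idea is what this little sketch shows (not the actual AvroSerializer code, just Avro's plain schema-resolution mechanism): keep the schema the state was written with in the serializer snapshot, and on restore hand it to the DatumReader as the writer schema while the current job's schema acts as the reader schema, so Avro itself performs the evolution:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

import java.io.IOException;

public class AvroSchemaResolutionSketch {

    // bytes were written with writerSchema (the schema restored from the snapshot);
    // readerSchema is the schema of the currently running job.
    public static GenericRecord readWithResolution(
            byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {
        GenericDatumReader<GenericRecord> datumReader =
                new GenericDatumReader<>(writerSchema, readerSchema);
        return datumReader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
    }
}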
Regarding the integration tests: if this is in fact the case it is not good, and I would be very happy about a Jira issue/PR there. (A rough sketch of the problem as I understand it is below the quoted thread.)

Regarding your last point, I think that the RocksDB backend stores the metadata, which includes the type serializer snapshot, once and not for all keys or key groups.

Best,
Aljoscha

> On 20. Feb 2018, at 11:40, Arvid Heise <arvid.he...@gmail.com> wrote:
>
> Hi guys,
>
> I just wanted to write about that topic myself.
>
> Tzu-Li's Flink Forward talk also gave me the impression that by just using the AvroSerializer, we get some kind of state evolution for free:
> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>
> However, I discovered two issues on 1.3.2:
>
> 1. The AvroSerializer does not use a read/write schema. The snapshot stores type information instead of the more plausible schema information.
> However, the actual type should not matter as long as a compatible type is used for state restoration.
> I have rewritten the AvroSerializer to store the schema in the snapshot config and to actually use it as a read schema when initializing the DatumReader.
>
> 2. During integration tests, it turned out that the current implementation of the StateDescriptor always returns copies of the serializer through #getSerializer. So #ensureCompatibility is invoked on a different serializer instance than the one that performs the actual #deserialize. Although my AvroSerializer sets the correct read schema, it is not used, since it is set on the wrong instance.
> I propose to make sure that #ensureCompatibility is invoked on the original serializer in the state descriptor. Otherwise all adjustments to the serializer are lost.
>
> I can provide tests and patches if needed.
>
> One related question:
>
> If I do an incremental snapshot with the RocksDB keyed state backend, is the snapshot config attached to all keys? Would the following work:
> * Write (key1, value1) and (key2, value2) with schema1. Cancel with snapshot.
> * Read (key1, value1) with schema1->schema2 and write (key1, value1). Cancel with snapshot.
> <Now we have two different schemas in the snapshots>
> * Read (key1, value1) with schema2 and read (key2, value2) with schema1->schema2.
>
> Thanks for any feedback
>
> Arvid
>
> On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen <nielsdenis...@gmail.com> wrote:
>> Hi Till,
>>
>> Thanks for the quick reply, I'm using 1.3.2 atm.
>>
>> Cheers,
>> Niels
>>
>> On Feb 19, 2018 19:10, "Till Rohrmann" <trohrm...@apache.org> wrote:
>>>
>>> Hi Niels,
>>>
>>> which version of Flink are you using? Currently, Flink does not support upgrading the TypeSerializer itself, if I'm not mistaken. As you've described, it will try to use the old serializer stored in the checkpoint stream to restore state.
>>>
>>> I've pulled Gordon into the conversation who can tell you a little bit more about the current capabilities and limitations of state evolution.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Feb 19, 2018 at 4:14 PM, Niels <[hidden email]> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> I'm currently trying to use Avro in order to evolve our data present in Flink's Managed State. I've extended the TypeSerializer class successfully for this purpose, but still have issues using Schema Evolution.
>>>>
>>>> *The problem:*
>>>> When we try to read data (deserialize from a savepoint) with a new serializer and a new schema, Flink seems to use the old schema of the old serializer (written to the savepoint). This results in an old GenericRecord that doesn't adhere to the new Avro schema.
>>>>
>>>> *What seems to be happening is the following* (say we evolve from dataV1 to dataV2):
>>>> - State containing dataV1 is serialized with Avro schema V1 to a check/savepoint. Along with the data, the serializer itself is written.
>>>> - Upon restore, the old serializer is retrieved from the data (and therefore needs to be on the classpath). Data is restored using this old serializer. The new serializer provided is only used for writes.
>>>>
>>>> If this is indeed the case, it explains our aforementioned problem. Any pointers as to whether this is true and what a possible solution would be would be very much appreciated!
>>>>
>>>> Thanks!
>>>> Niels
>>>>
>>>> --
>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
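PS: On your point 2, this is the rough sketch I mentioned above (simplified, not the actual Flink restore path; the parameter names and the generic MyRecord-style types are just placeholders) of why a read schema set during #ensureCompatibility can get lost when it is applied to a different duplicate than the one that later deserializes:

import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;

import java.io.IOException;

public class SerializerCopySketch {

    // Simplified illustration: the compatibility check runs against one copy of
    // the registered serializer, while a different copy later performs the reads.
    // Any read schema that ensureCompatibility() sets on the first copy never
    // reaches the second one.
    public static <T> T restoreThenRead(
            TypeSerializer<T> registeredSerializer,
            TypeSerializerConfigSnapshot snapshotFromSavepoint,
            DataInputView input) throws IOException {

        // Copy #1: reconciled against the snapshot from the savepoint.
        TypeSerializer<T> checkedCopy = registeredSerializer.duplicate();
        CompatibilityResult<T> result = checkedCopy.ensureCompatibility(snapshotFromSavepoint);

        // Copy #2: obtained independently (e.g. via StateDescriptor#getSerializer),
        // so it never saw the snapshot and has no read schema configured.
        TypeSerializer<T> readingCopy = registeredSerializer.duplicate();
        return readingCopy.deserialize(input);
    }
}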