Hi Aljoscha,

I opened https://issues.apache.org/jira/browse/FLINK-8715 for the RocksDB issue with pointers to the code. Let me know if you need more details.

Best,
Arvid

On Tue, Feb 20, 2018 at 1:04 PM, Arvid Heise <arvid.he...@gmail.com> wrote:
> Hi Aljoscha, hi Till,
>
> @Aljoscha, the new AvroSerializer is almost what I wanted, except that it does not use the schema of the snapshot while reading. In fact, this version will fail with the same error as before when a field is added or removed.
> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L265
> needs to use the schema from
> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L188
> as the first parameter. Accordingly, a readSchema field needs to be set in #ensureCompatibility and relayed in #duplicate.
> Should I add a ticket for that as well?
>
> @Till, concerning the poor man's migration: the doc of #ensureCompatibility in 1.3.2 states:
>
>   {@link CompatibilityResult#compatible()}: this signals Flink that this serializer is compatible, or has been reconfigured to be compatible, to continue reading previous data, and that the serialization schema remains the same. No migration needs to be performed.
>
> The important part is the reconfiguration, which is also mentioned in the main documentation. The default Avro and Kryo serializers actually try to reconfigure themselves.
>
> @Aljoscha, I will open a ticket for the RocksDB thingy. I pinned the problem down and will try to come up with an easy solution. It's a tad hard to compare the different versions (since I'm deep in the debugger), so I just might write a 1.3.2 ticket.
>
> @Till, thanks for reminding me that we are not talking about incremental checkpoints ;) That indeed makes it much easier to understand the whole state recovery with evolution.
>
> Best,
>
> Arvid
>
> On Tue, Feb 20, 2018 at 12:27 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi Arvid,
>>
>> Did you check out the most recent AvroSerializer code?
>> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L185
>> I think this does what you're suggesting.
>>
>> Regarding the integration tests: if this is in fact the case, it is not good and I would be very happy about a Jira issue/PR there.
>>
>> Regarding your last point, I think that the RocksDB backend stores the metadata, which includes the type serializer snapshot, once, and not for all keys or key groups.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 20. Feb 2018, at 11:40, Arvid Heise <arvid.he...@gmail.com> wrote:
>>
>> Hi guys,
>>
>> I just wanted to write about that topic myself.
>>
>> The Flink Forward talk by Tzu-Li also gave me the impression that by just using the AvroSerializer, we get some kind of state evolution for free.
>> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>>
>> However, I discovered two issues on 1.3.2:
>>
>> 1. The AvroSerializer does not use separate read/write schemas. The snapshot stores type information instead of the schema information, which would be more appropriate. However, the actual type should not matter as long as a compatible type is used for state restoration.
>> I have rewritten the AvroSerializer to store the schema in the snapshot config and actually use it as the read schema when initializing the DatumReader.
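>>
>> In essence, the change looks like this (a heavily simplified sketch, not my actual patch; the class, field, and method names are made up):
>>
>>   import org.apache.avro.Schema;
>>   import org.apache.avro.reflect.ReflectData;
>>   import org.apache.avro.reflect.ReflectDatumReader;
>>   import org.apache.avro.reflect.ReflectDatumWriter;
>>
>>   // Simplified sketch of the idea; only the schema-handling part is shown.
>>   class ReadSchemaSketch<T> {
>>       private final Class<T> type;
>>       // set from the config snapshot in #ensureCompatibility, relayed in #duplicate
>>       private Schema schemaFromSnapshot;
>>       private ReflectDatumReader<T> reader;
>>       private ReflectDatumWriter<T> writer;
>>
>>       ReadSchemaSketch(Class<T> type) {
>>           this.type = type;
>>       }
>>
>>       void initializeAvro() {
>>           Schema currentSchema = ReflectData.get().getSchema(type);
>>           // The restored state was written with the snapshot schema, so it has to be
>>           // the first (writer) argument; the current schema is the reader schema.
>>           // Avro's schema resolution then takes care of added/removed fields.
>>           Schema writerSchema = schemaFromSnapshot != null ? schemaFromSnapshot : currentSchema;
>>           this.reader = new ReflectDatumReader<>(writerSchema, currentSchema);
>>           this.writer = new ReflectDatumWriter<>(currentSchema);
>>       }
>>
>>       ReadSchemaSketch<T> duplicate() {
>>           ReadSchemaSketch<T> copy = new ReadSchemaSketch<>(type);
>>           copy.schemaFromSnapshot = this.schemaFromSnapshot; // keep the reconfiguration
>>           return copy;
>>       }
>>   }
>>
>> That pair of schemas is all Avro needs to resolve added or removed fields on read.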
>>
>> 2. During integration tests, it turned out that the current implementation of StateDescriptor always returns copies of the serializer through #getSerializer. So #ensureCompatibility is invoked on a different serializer instance than the one whose #deserialize is later called. Hence, although my AvroSerializer sets the correct read schema, it is not used, since it is set on the wrong instance.
>> I propose to make sure that #ensureCompatibility is invoked on the original serializer in the state descriptor. Otherwise, all adjustments to the serializer are lost.
>>
>> I can provide tests and patches if needed.
>>
>> One related question:
>>
>> If I do an incremental snapshot with the RocksDB keyed state backend, is the snapshot config attached to all keys? So would the following work:
>> * Write (key1, value1) and (key2, value2) with schema1. Cancel with snapshot.
>> * Read (key1, value1) with schema1->schema2 and write (key1, value1). Cancel with snapshot.
>> <Now we have two different schemas in the snapshots>
>> * Read (key1, value1) with schema2 and read (key2, value2) with schema1->schema2.
>>
>> Thanks for any feedback.
>>
>> Arvid
>>
>> On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen <nielsdenis...@gmail.com> wrote:
>>
>> Hi Till,
>>
>> Thanks for the quick reply, I'm using 1.3.2 atm.
>>
>> Cheers,
>> Niels
>>
>> On Feb 19, 2018 19:10, "Till Rohrmann" <trohrm...@apache.org> wrote:
>>
>> Hi Niels,
>>
>> which version of Flink are you using? Currently, Flink does not support upgrading the TypeSerializer itself, if I'm not mistaken. As you've described, it will try to use the old serializer stored in the checkpoint stream to restore state.
>>
>> I've pulled Gordon into the conversation, who can tell you a little bit more about the current capabilities and limitations of state evolution.
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 19, 2018 at 4:14 PM, Niels <[hidden email]> wrote:
>>
>> Hi all,
>>
>> I'm currently trying to use Avro in order to evolve our data present in Flink's managed state. I've extended the TypeSerializer class successfully for this purpose, but still have issues with schema evolution.
>>
>> *The problem:*
>> When we try to read data (deserialize from a savepoint) with a new serializer and a new schema, Flink seems to use the old schema of the old serializer (written to the savepoint). This results in an old GenericRecord that doesn't adhere to the new Avro schema.
>>
>> *What seems to happen is the following* (say we evolve from dataV1 to dataV2):
>> - State containing dataV1 is serialized with Avro schema V1 to a checkpoint/savepoint. Along with the data, the serializer itself is written.
>> - Upon restore, the old serializer is retrieved from the data (and therefore needs to be on the classpath). Data is restored using this old serializer. The new serializer provided is only used for writes.
>>
>> If this is indeed the case, it explains our aforementioned problem. If you have any pointers as to whether this is true and what a possible solution would be, that would be very much appreciated!
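>>
>> For completeness, plain Avro resolves this fine once it is given both schemas, which is what I would expect to happen on restore. A minimal standalone example (the schemas here are just made up for illustration):
>>
>>   import java.io.ByteArrayOutputStream;
>>   import org.apache.avro.Schema;
>>   import org.apache.avro.generic.*;
>>   import org.apache.avro.io.*;
>>
>>   public class AvroEvolutionDemo {
>>       public static void main(String[] args) throws Exception {
>>           // V1 has only "id"; V2 adds "name" with a default, as evolution requires.
>>           Schema v1 = new Schema.Parser().parse(
>>               "{\"type\":\"record\",\"name\":\"Data\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
>>           Schema v2 = new Schema.Parser().parse(
>>               "{\"type\":\"record\",\"name\":\"Data\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},"
>>               + "{\"name\":\"name\",\"type\":\"string\",\"default\":\"unknown\"}]}");
>>
>>           // Write a record with the old schema (this is what sits in the savepoint).
>>           GenericRecord oldRecord = new GenericData.Record(v1);
>>           oldRecord.put("id", 42L);
>>           ByteArrayOutputStream out = new ByteArrayOutputStream();
>>           BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>>           new GenericDatumWriter<GenericRecord>(v1).write(oldRecord, encoder);
>>           encoder.flush();
>>
>>           // Read it back with writer schema V1 and reader schema V2:
>>           // Avro fills in the default value for the added field.
>>           BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
>>           GenericRecord newRecord = new GenericDatumReader<GenericRecord>(v1, v2).read(null, decoder);
>>           System.out.println(newRecord); // {"id": 42, "name": "unknown"}
>>       }
>>   }
>>
>> So the part that seems to be missing is that Flink hands both the savepoint's schema and the new schema to the same reader.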
>>
>> Thanks!
>> Niels