I built the Flink master branch and tried running this simple Flink app that 
uses a Java record:

https://github.com/kurtostfeld/flink-kryo-upgrade-demo/blob/main/flink-record-demo/src/main/java/demo/app/Main.java

It fails with the normal exception that Kryo 2.x throws when you try to 
serialize a Java record. The full stack trace is here: 
https://pastebin.com/HGhGKUWt

I tried removing this line:

https://github.com/kurtostfeld/flink-kryo-upgrade-demo/blob/main/flink-record-demo/src/main/java/demo/app/Main.java#L36

and that had no impact, I got the same error.

In the other thread, you said that the plan was to use PojoSerializer to 
serialize records rather than Kryo. Currently, the Flink code bases uses Kryo 
2.x by default for generic user data types, and that will fail when the data 
type is a record or contains records. Ultimately, if Flink wants to fully 
support Java records, it seems that it has to move off of Kryo 2.x. 
PojoSerializer is part of what is basically a custom serialization library 
internal to Flink that is an alternative to Kryo. That's one option: move off 
of Kryo to a Flink-internal serialization library. The other two options are 
upgrade to the new Kryo or use a different serialization library.

The Kryo 5.5.0 upgrade PR I submitted 
(https://github.com/apache/flink/pull/22660) with FLIP 317 
(https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0)
 works with records. The Flink app linked above that uses records works with 
the PR and that's what I posted to this mailing list a few weeks ago. I rebased 
the pull request on to the latest master branch and it's passing all tests. 
From my testing, it supports stateful upgrades, including checkpoints. If you 
can demonstrate a scenario where stateful upgrades error I can try to resolve 
that.

Reply via email to