Hi,

I'd like to discuss a topic that appears from time to time in different contexts (e.g. [1]). I'd like to restate the problem in a slightly more generic way: "Should we have a way to completely exchange the coder of a PCollection/state of a _running_ Pipeline?"

First, my motivation for this question. Beam has an extension called beam-sdks-java-extensions-kryo, which contains a KryoCoder. This coder uses Kryo [2] to serialize virtually any Java class into a binary format. Unfortunately, this binary representation differs between Kryo versions, and it contains no way to recognize which version of Kryo was used to serialize the data. An attempt to deserialize bytes produced by an incompatible version of Kryo results in an exception. The version of Kryo currently used by the KryoCoder is more than 5 years old, and an upgrade to a newer version is needed because the current version does not work with JDK17+ [3]. Thus, the only option seems to be the creation of a different Coder (e.g. Kryo5Coder), but then we need the ability to migrate Pipelines that use the old KryoCoder to the new one. That is, we need to completely switch the coder that encodes a PCollection and/or state.

We therefore have the following options:

 1) Simply ignore this and let users rerun the Pipeline from scratch. This is always possible and should be applicable in essentially every case, but, if nothing else, reprocessing all historical data might be costly for some Pipelines.

 2) We can create the new Coder and let users convert the Pipeline in a runner-specific way. E.g. in the case of Flink, this could be done by converting a savepoint into the new format. This requires knowledge of how Beam stores state (namespaces) and is rather involved on the user side. We could probably provide runner-specific tools for this, but some runners might not allow such state manipulation at all.

 3) We can include information about a Coder update in the Pipeline, resubmit it to the runner and let the runner handle it. Upon Pipeline restart, the runner would have to convert all state and all in-flight data from the old Coder to the new one before resuming the Pipeline.
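To make option 3) a bit more concrete, here is a minimal, runner-agnostic sketch in Java of what converting one stored value from the old Coder to the new one amounts to: decode the bytes with the old coder, then re-encode the value with the new one. All names here (SimpleCoder, LegacyStringCoder, NewStringCoder, transcode) are hypothetical and made up for illustration. SimpleCoder is a simplified stand-in for Beam's Coder, not its real API, and the two string formats merely play the role of the old and new Kryo formats.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class StateTranscoding {

  // Hypothetical, simplified stand-in for Beam's Coder<T> (encode/decode on streams).
  public interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;
    T decode(InputStream in) throws IOException;
  }

  // Plays the role of the OLD format: 2-byte length + modified UTF-8 (DataOutputStream.writeUTF).
  public static final class LegacyStringCoder implements SimpleCoder<String> {
    @Override
    public void encode(String value, OutputStream out) throws IOException {
      DataOutputStream data = new DataOutputStream(out);
      data.writeUTF(value);
      data.flush();
    }

    @Override
    public String decode(InputStream in) throws IOException {
      return new DataInputStream(in).readUTF();
    }
  }

  // Plays the role of the NEW format: 4-byte big-endian length + standard UTF-8 bytes.
  public static final class NewStringCoder implements SimpleCoder<String> {
    @Override
    public void encode(String value, OutputStream out) throws IOException {
      byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
      DataOutputStream data = new DataOutputStream(out);
      data.writeInt(utf8.length);
      data.write(utf8);
      data.flush();
    }

    @Override
    public String decode(InputStream in) throws IOException {
      DataInputStream data = new DataInputStream(in);
      byte[] utf8 = new byte[data.readInt()];
      data.readFully(utf8);
      return new String(utf8, StandardCharsets.UTF_8);
    }
  }

  public static <T> byte[] encodeToBytes(SimpleCoder<T> coder, T value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    coder.encode(value, bytes);
    return bytes.toByteArray();
  }

  public static <T> T decodeFromBytes(SimpleCoder<T> coder, byte[] bytes) throws IOException {
    return coder.decode(new ByteArrayInputStream(bytes));
  }

  // The per-value step of option 3: decode with the old coder, re-encode with the new one.
  // A runner would have to apply this to every state cell and in-flight element before resuming.
  public static <T> byte[] transcode(byte[] oldBytes, SimpleCoder<T> oldCoder, SimpleCoder<T> newCoder)
      throws IOException {
    return encodeToBytes(newCoder, decodeFromBytes(oldCoder, oldBytes));
  }

  public static void main(String[] args) throws IOException {
    LegacyStringCoder oldCoder = new LegacyStringCoder();
    NewStringCoder newCoder = new NewStringCoder();
    byte[] stored = encodeToBytes(oldCoder, "state-value");
    byte[] migrated = transcode(stored, oldCoder, newCoder);
    // prints: 13 -> 15 bytes, value = state-value
    System.out.println(stored.length + " -> " + migrated.length + " bytes, value = "
        + decodeFromBytes(newCoder, migrated));
  }
}
```

Note that the new coder cannot read the legacy bytes directly: decodeFromBytes(newCoder, stored) throws an EOFException here. This mirrors the Kryo situation, where the bytes carry no version marker, so the runner has to be told explicitly which Coder wrote the existing data.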

Option 3) seems the most natural, but it requires support on the runner side.

I leave open the details of how a runner would do this; for now, I'm interested in knowing the community's position on this.

 Jan

[1] https://lists.apache.org/thread/z2m1hg4l5k2kb7nhjkv2lnwf8g4t9wps

[2] https://github.com/EsotericSoftware/kryo

[3] https://github.com/EsotericSoftware/kryo/issues/885
