Hi,
I'd like to discuss a topic that appears from time to time in different
contexts (e.g. [1]). I'd like to restate the problem in a slightly more
generic way: "Should we have a way to completely exchange the coder of a
PCollection/state of a _running_ Pipeline?". First, my motivation for
this question: Beam has an extension called
beam-sdks-java-extensions-kryo, which contains a KryoCoder. This coder
uses Kryo [2] to serialize virtually any Java class into a binary format.
Unfortunately, this binary representation differs between Kryo versions,
and it contains no way to recognize which version of Kryo was used to
serialize the data. An attempt to deserialize bytes produced by an
incompatible version of Kryo results in an exception. The version of
Kryo currently used by the KryoCoder is already more than 5 years old,
and an upgrade to a newer version is needed, because the current version
does not work with JDK17+ [3]. Thus, the only option seems to be the
creation of a different Coder (e.g. Kryo5Coder), but then we need the
ability to migrate Pipelines that use the old KryoCoder to the new one.
That is, we need to completely switch the coder that encodes a
PCollection and/or state.
We therefore have the following options:
1) Simply ignore this and let users rerun the Pipeline from scratch.
This is possible and should be applicable essentially always, but, if
nothing else, for some Pipelines it might be costly to reprocess all
historical data.
2) We can create the new Coder and let users use a runner-specific way
to convert the Pipeline. E.g. in the case of Flink, this could be done
by converting a savepoint into the new format. This requires knowledge
of how Beam stores state (namespaces) and is rather involved on the user
side. We could probably provide runner-specific tools for this, but some
runners might not allow such state manipulation at all.
3) We can include information about the Coder update in the Pipeline,
resubmit it to the runner, and let the runner handle it. Upon Pipeline
restart, the runner would have to convert all state and all in-flight
data from the old Coder to the new one before resuming the Pipeline.
Option 3) seems like the most natural, but it requires support on the
runner side.
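To make the conversion in option 3) a bit more concrete, here is a
minimal sketch of what a runner would do for each persisted value:
decode the bytes with the old Coder and re-encode the resulting object
with the new one. It deliberately does not use Beam's Coder API;
SimpleCoder, OldStringCoder, NewStringCoder and transcode are
hypothetical names, the two string coders merely stand in for KryoCoder
and a Kryo5Coder with incompatible binary formats, and state is modeled
as a plain key-to-bytes map. In-flight elements would be handled the
same way.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class CoderMigration {

  /** Hypothetical minimal coder abstraction (an assumption, NOT Beam's Coder API). */
  interface SimpleCoder<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
  }

  /** Stand-in for the old coder: plain UTF-8 bytes. */
  static final class OldStringCoder implements SimpleCoder<String> {
    @Override
    public byte[] encode(String value) {
      return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
      return new String(bytes, StandardCharsets.UTF_8);
    }
  }

  /** Stand-in for the new, binary-incompatible coder: UTF-16BE bytes. */
  static final class NewStringCoder implements SimpleCoder<String> {
    @Override
    public byte[] encode(String value) {
      return value.getBytes(StandardCharsets.UTF_16BE);
    }

    @Override
    public String decode(byte[] bytes) {
      return new String(bytes, StandardCharsets.UTF_16BE);
    }
  }

  /**
   * Converts every persisted value from the old encoding to the new one by
   * decoding with the old coder and re-encoding with the new coder.
   * Keys are copied unchanged.
   */
  static <T> Map<String, byte[]> transcode(
      Map<String, byte[]> state, SimpleCoder<T> oldCoder, SimpleCoder<T> newCoder) {
    Map<String, byte[]> migrated = new LinkedHashMap<>();
    for (Map.Entry<String, byte[]> entry : state.entrySet()) {
      T value = oldCoder.decode(entry.getValue());
      migrated.put(entry.getKey(), newCoder.encode(value));
    }
    return migrated;
  }

  public static void main(String[] args) {
    SimpleCoder<String> oldCoder = new OldStringCoder();
    SimpleCoder<String> newCoder = new NewStringCoder();

    // State written by the running pipeline with the old coder.
    Map<String, byte[]> state = new LinkedHashMap<>();
    state.put("key-1", oldCoder.encode("hello"));
    state.put("key-2", oldCoder.encode("world"));

    Map<String, byte[]> migrated = transcode(state, oldCoder, newCoder);

    // Same logical values, different bytes after the migration.
    for (Map.Entry<String, byte[]> entry : migrated.entrySet()) {
      System.out.println(entry.getKey() + " -> " + newCoder.decode(entry.getValue())
          + " (old " + state.get(entry.getKey()).length + " bytes, new "
          + entry.getValue().length + " bytes)");
    }
  }
}
```

Note that this only works if the runner can still instantiate the old
Coder during the migration, i.e. both the old and the new Coder have to
be available when the Pipeline is restarted.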
I leave open the details of how a runner would do this; for now I'm
interested in knowing the community's position on this.
Jan
[1] https://lists.apache.org/thread/z2m1hg4l5k2kb7nhjkv2lnwf8g4t9wps
[2] https://github.com/EsotericSoftware/kryo
[3] https://github.com/EsotericSoftware/kryo/issues/885