I've got an apache beam pipeline running on flink (1.9.1).
I've been attempting to read a RocksDB savepoint taken from this
beam-on-flink pipeline, using the state processor api, however it seems to
have some incompatibilities around namespaces. Beam for instance uses a
String namespace, while the KeyedStateInputFormat uses the VoidNamespace.
This manifests as an exception:
Caused by: org.apache.flink.util.StateMigrationException: The new
namespace serializer must be compatible.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:524)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
Is there any way to let the namespace type (and value) be specified by the
user?