I've got an apache beam pipeline running on flink (1.9.1).

I've been attempting to read a RocksDB savepoint taken from this
beam-on-flink pipeline, using the state processor api, however it seems to
have some incompatibilities around namespaces.  Beam for instance uses a
String namespace, while the KeyedStateInputFormat uses the VoidNamespace.
This manifests as an exception:

Caused by: org.apache.flink.util.StateMigrationException: The new
namespace serializer must be compatible.
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:524)
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
        at 
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
        at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)

Is there any way to let the namespace type (and value) be specified by the
user?

Reply via email to