Thanks Seth, I'll look into rolling my own KeyedStateInputFormat.

On Mon, Apr 6, 2020 at 2:50 PM Seth Wiesman <sjwies...@gmail.com> wrote:
> Hi Stephen,
>
> You will need to implement a custom operator and use the `transform`
> method. It's not just that you need to specify the namespace type; you
> will also need to look into the Beam internals to see how it stores data in
> Flink state, how it translates between Beam serializers and Flink
> serializers, etc.
>
> Seth
>
> On Mon, Apr 6, 2020 at 1:02 PM Stephen Patel <merli...@gmail.com> wrote:
>
>> I've got an Apache Beam pipeline running on Flink (1.9.1).
>>
>> I've been attempting to read a RocksDB savepoint taken from this
>> Beam-on-Flink pipeline using the State Processor API; however, it seems to
>> have some incompatibilities around namespaces. Beam, for instance, uses a
>> String namespace, while the KeyedStateInputFormat uses the
>> VoidNamespace. This manifests as an exception:
>>
>> Caused by: org.apache.flink.util.StateMigrationException: The new namespace serializer must be compatible.
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:524)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
>>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
>>
>> Is there any way to let the namespace type (and value) be specified by
>> the user?
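To make the namespace issue in this thread concrete, here is a small, self-contained Java toy model. It is not Flink or Beam code: `ToyKeyedBackend`, `RestoredStateMeta`, `VoidNs`, the local `StateMigrationException`, and the `"ns-A"` namespace value are all hypothetical names made up for illustration. The compatibility check is also simplified to comparing namespace classes, whereas Flink's RocksDB backend resolves serializer schema compatibility. It shows the two points raised above: a reader must declare the same namespace type the writer used, and because the namespace is part of each entry's storage key, the reader also needs the right namespace value.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (not Flink code) of keyed state scoped by a namespace.
 * Assumption: Flink's real check resolves serializer schema compatibility;
 * here it is simplified to comparing the namespace class on restore.
 */
public class NamespaceMismatchDemo {

    /** Hypothetical stand-in for Flink's VoidNamespace singleton. */
    public enum VoidNs { INSTANCE }

    /** Hypothetical stand-in for org.apache.flink.util.StateMigrationException. */
    public static class StateMigrationException extends Exception {
        public StateMigrationException(String msg) { super(msg); }
    }

    /** Per-state metadata as recorded when the savepoint was taken. */
    public static final class RestoredStateMeta {
        final String stateName;
        final Class<?> namespaceType;

        public RestoredStateMeta(String stateName, Class<?> namespaceType) {
            this.stateName = stateName;
            this.namespaceType = namespaceType;
        }
    }

    /** Minimal backend: restored metadata plus values stored under (state, key, namespace). */
    public static final class ToyKeyedBackend {
        private final Map<String, RestoredStateMeta> restored = new HashMap<>();
        private final Map<String, String> values = new HashMap<>();

        public void restore(RestoredStateMeta meta) {
            restored.put(meta.stateName, meta);
        }

        /** Rejects a reader whose namespace type differs from the restored one. */
        public void registerState(String stateName, Class<?> namespaceType)
                throws StateMigrationException {
            RestoredStateMeta meta = restored.get(stateName);
            if (meta != null && !meta.namespaceType.equals(namespaceType)) {
                throw new StateMigrationException(
                        "The new namespace serializer must be compatible.");
            }
        }

        // The namespace is part of the storage key, so readers need its value too.
        private static String slot(String stateName, Object key, Object namespace) {
            return stateName + "|" + key + "|" + namespace;
        }

        public void put(String stateName, Object key, Object namespace, String value) {
            values.put(slot(stateName, key, namespace), value);
        }

        public String get(String stateName, Object key, Object namespace) {
            return values.get(slot(stateName, key, namespace));
        }
    }

    public static void main(String[] args) {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        // Writer side: state registered with a String namespace, as the thread says Beam does.
        backend.restore(new RestoredStateMeta("beamState", String.class));
        backend.put("beamState", "user-42", "ns-A", "hello"); // "ns-A" is a made-up namespace value

        // Reader assuming a VoidNamespace-like type: rejected, like the stack trace above.
        try {
            backend.registerState("beamState", VoidNs.class);
        } catch (StateMigrationException e) {
            System.out.println("VoidNs reader rejected: " + e.getMessage());
        }

        // Reader declaring the String namespace: accepted, but must still know the value.
        try {
            backend.registerState("beamState", String.class);
            System.out.println("String reader, ns-A: " + backend.get("beamState", "user-42", "ns-A"));
            System.out.println("String reader, ns-B: " + backend.get("beamState", "user-42", "ns-B"));
        } catch (StateMigrationException e) {
            System.out.println("unexpected: " + e.getMessage());
        }
    }
}
```

Running `main` prints the rejection message for the `VoidNs` reader, `hello` for namespace `ns-A`, and `null` for `ns-B`. A real custom KeyedStateInputFormat would additionally have to reproduce Beam's own namespace and value serializers, which is the part Seth points to in the Beam internals.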