Thanks, Seth. I'll look into rolling my own KeyedStateInputFormat.

On Mon, Apr 6, 2020 at 2:50 PM Seth Wiesman <sjwies...@gmail.com> wrote:

> Hi Stephen,
>
> You will need to implement a custom operator and use the `transform`
> method. It's not just that you need to specify the namespace type; you
> will also need to look into the Beam internals to see how it stores data in
> Flink state, how it translates between Beam serializers and Flink
> serializers, etc.
>
> Seth
>
> On Mon, Apr 6, 2020 at 1:02 PM Stephen Patel <merli...@gmail.com> wrote:
>
>> I've got an Apache Beam pipeline running on Flink (1.9.1).
>>
>> I've been attempting to read a RocksDB savepoint taken from this
>> Beam-on-Flink pipeline using the State Processor API; however, it seems to
>> have some incompatibilities around namespaces. Beam, for instance, uses a
>> String namespace, while the KeyedStateInputFormat uses the
>> VoidNamespace. This manifests as an exception:
>>
>> Caused by: org.apache.flink.util.StateMigrationException: The new namespace serializer must be compatible.
>>      at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:524)
>>      at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
>>      at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
>>      at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>      at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>>      at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>>      at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
>>      at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>      at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
>>
>> Is there any way to let the namespace type (and value) be specified by
>> the user?
>>
>
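The namespace mismatch described in this thread can be illustrated with a deliberately simplified, hypothetical model in plain Java. It is NOT Flink's implementation or API: `ToyBackend`, `NamespaceSerializer`, the local `VoidNamespace` enum, and the namespace value `"/global/"` are all made up for illustration. The idea it shows is that keyed state is addressed by (key, namespace), and a state name remembers the namespace serializer it was written with, so a reader that assumes `VoidNamespace` is rejected while a reader that lets the caller supply the namespace type and value can find the data.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model of a keyed state backend (not Flink code).
 * Entries are addressed by (state name, key, namespace), and each state name
 * records the namespace serializer it was first registered with.
 */
public class NamespaceModel {

    /** Stand-in for a namespace serializer; only its identity matters here. */
    enum NamespaceSerializer { VOID, STRING }

    /** Local stand-in for Flink's VoidNamespace singleton (hypothetical). */
    enum VoidNamespace { INSTANCE }

    static final class ToyBackend {
        // state name -> namespace serializer recorded at first registration
        private final Map<String, NamespaceSerializer> registered = new HashMap<>();
        // composite address "stateName|key|namespace" -> value
        private final Map<String, String> values = new HashMap<>();

        private void register(String stateName, NamespaceSerializer ns) {
            NamespaceSerializer previous = registered.putIfAbsent(stateName, ns);
            if (previous != null && previous != ns) {
                // Message borrowed from the exception in the thread; this simple
                // equality check is not Flink's real compatibility logic.
                throw new IllegalStateException(
                        "The new namespace serializer must be compatible.");
            }
        }

        void put(String stateName, NamespaceSerializer ns,
                 Object key, Object namespace, String value) {
            register(stateName, ns);
            values.put(stateName + "|" + key + "|" + namespace, value);
        }

        String get(String stateName, NamespaceSerializer ns,
                   Object key, Object namespace) {
            register(stateName, ns);
            return values.get(stateName + "|" + key + "|" + namespace);
        }
    }

    public static void main(String[] args) {
        ToyBackend backend = new ToyBackend();
        // Beam-style writer: String namespace (the value "/global/" is hypothetical)
        backend.put("myState", NamespaceSerializer.STRING, "user-1", "/global/", "42");

        // A reader hard-wired to VoidNamespace is rejected
        try {
            backend.get("myState", NamespaceSerializer.VOID, "user-1", VoidNamespace.INSTANCE);
        } catch (IllegalStateException e) {
            // prints "rejected: The new namespace serializer must be compatible."
            System.out.println("rejected: " + e.getMessage());
        }

        // A reader whose caller supplies the namespace type and value succeeds
        // prints "42"
        System.out.println(backend.get("myState", NamespaceSerializer.STRING, "user-1", "/global/"));
    }
}
```

In the real system, as Seth notes, matching the namespace type is only part of the work: a custom reader would also have to reproduce how Beam lays out its data in Flink state and how Beam serializers map onto Flink serializers.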
