[ https://issues.apache.org/jira/browse/FLINK-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925929#comment-15925929 ]
Stefan Richter edited comment on FLINK-6061 at 3/15/17 11:19 AM:
-----------------------------------------------------------------

Thanks for reporting this. I agree that the information from the `IllegalStateException` is helpful to the user. I will add this to the RocksDB states. However, the behaviour is correct: there is no key context during this initialization. Only when an element is processed does the key extracted from that element determine the key context of the backend.

was (Author: srichter):
Thanks for reporting this. I agree that the information from the `IllegalStateException` is helpful to the user. I will add this to the RocksDB states.

> NPE on TypeSerializer.serialize with a RocksDBStateBackend calling entries() on a keyed state in the open() function
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6061
>                 URL: https://issues.apache.org/jira/browse/FLINK-6061
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing, Streaming
>   Affects Versions: 1.3.0
>            Reporter: Vladislav Pernin
>
> With a default state (heap), the call to state.entries() "nicely fails" with an IllegalStateException:
> {noformat}
> Caused by: java.lang.IllegalStateException: No key set.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>     at org.apache.flink.runtime.state.heap.HeapMapState.entries(HeapMapState.java:188)
>     at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
>     at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> With a RocksDBStateBackend, it fails with an NPE:
> {noformat}
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:64)
>     at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:27)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKey(AbstractRocksDBState.java:181)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:148)
>     at org.apache.flink.contrib.streaming.state.RocksDBMapState.serializeCurrentKeyAndNamespace(RocksDBMapState.java:263)
>     at org.apache.flink.contrib.streaming.state.RocksDBMapState.iterator(RocksDBMapState.java:196)
>     at org.apache.flink.contrib.streaming.state.RocksDBMapState.entries(RocksDBMapState.java:143)
>     at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
>     at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The reason is that the record is null, because backend.getCurrentKey() is null (not yet set) in AbstractRocksDBState.
> This may also be the case for other RocksDBState implementations.
> You can find the reproducer, based on 1.3-SNAPSHOT (needed for the MapState), here:
> https://github.com/vpernin/flink-rocksdbstate-npe
> The reproducer is a nonsense application. There is no MapState with TTL or expiration yet, so the goal is to try to shrink or expire the state at some interval.
> This could be done by iterating over the entries of the state and removing some of them.
> This could probably not be done in the open() method of a rich function.
> I also tried to implement CheckpointListener and to access the state content in the notifyCheckpointComplete() method, but it fails too, I guess due to the asynchronous nature of the checkpoint.
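
For illustration only, the key-context behaviour discussed above can be modelled in plain Java without any Flink dependency. This is a rough sketch, not Flink's implementation: `ToyKeyedBackend`, `ToyMapState`, and `requireKey` are hypothetical names invented here, and the message text simply mirrors the heap backend's "No key set." from the first stack trace. It shows why a call to entries() before any element has set a key (as in open()) has nothing to iterate over, and how an explicit check produces an IllegalStateException where serializing a null key would otherwise give an NPE.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy model of a keyed state backend. NOT Flink code; all names are hypothetical.
class KeyContextSketch {

    // Stand-in for a keyed backend: the current key stays null until an element is processed.
    static final class ToyKeyedBackend<K> {
        private K currentKey;

        void setCurrentKey(K key) { this.currentKey = key; }

        K getCurrentKey() { return currentKey; }
    }

    // Stand-in for a map state that is scoped to the backend's current key.
    static final class ToyMapState<K, UK, UV> {
        private final ToyKeyedBackend<K> backend;
        private final Map<K, Map<UK, UV>> store = new HashMap<>();

        ToyMapState(ToyKeyedBackend<K> backend) { this.backend = backend; }

        // Fail with a descriptive message instead of passing a null key further down.
        private K requireKey() {
            K key = backend.getCurrentKey();
            if (key == null) {
                throw new IllegalStateException("No key set.");
            }
            return key;
        }

        void put(UK userKey, UV userValue) {
            store.computeIfAbsent(requireKey(), k -> new HashMap<>()).put(userKey, userValue);
        }

        Iterable<Map.Entry<UK, UV>> entries() {
            return store.getOrDefault(requireKey(), Collections.emptyMap()).entrySet();
        }
    }

    // Mimics calling entries() in open(): no element has been seen, so no key is set.
    static String entriesWithoutKey() {
        ToyKeyedBackend<Short> backend = new ToyKeyedBackend<>();
        ToyMapState<Short, String, Long> state = new ToyMapState<>(backend);
        try {
            state.entries();
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Mimics processing an element: the key is set first, then the state is accessed.
    static int entriesWithKey() {
        ToyKeyedBackend<Short> backend = new ToyKeyedBackend<>();
        ToyMapState<Short, String, Long> state = new ToyMapState<>(backend);
        backend.setCurrentKey((short) 7);
        state.put("a", 1L);
        int count = 0;
        for (Map.Entry<String, Long> ignored : state.entries()) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(entriesWithoutKey());
        System.out.println(entriesWithKey());
    }
}
```

In this toy model the state is only readable after `setCurrentKey` has been called, which corresponds to the point in the comment above that the key context exists only while an element is being processed.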