Only keyed state can be used as queryable state, so you cannot query operator state. AFAIK, it should not be a problem for a single operator to have both operator state and queryable keyed state.
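
For intuition about the union list state discussed in the quoted thread below, here is a toy model in plain Java. It is not Flink code and does not use any Flink API. It only mimics the two restore behaviours described there: a regular operator list state splits the checkpointed entries across subtasks, while a union list state hands the complete list to every subtask. The class name, the method names, and the round-robin split are assumptions made for illustration; Flink's real redistribution logic may differ in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: plain Java lists stand in for checkpointed operator state.
public class UnionRestoreSketch {

    // Only subtask 0 writes the shared (static) data, mirroring the
    // "getIndexOfThisSubtask == 0" idea from the thread.
    public static List<List<String>> snapshot(List<String> sharedConfig, int parallelism) {
        List<List<String>> checkpointed = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            checkpointed.add(i == 0 ? new ArrayList<>(sharedConfig) : new ArrayList<String>());
        }
        return checkpointed;
    }

    // Regular list state: entries are split across the new subtasks
    // (round-robin here, as an assumed stand-in for the real strategy).
    public static List<List<String>> restoreSplit(List<List<String>> checkpointed, int newParallelism) {
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<String>());
        }
        int next = 0;
        for (List<String> perSubtask : checkpointed) {
            for (String entry : perSubtask) {
                restored.get(next % newParallelism).add(entry);
                next++;
            }
        }
        return restored;
    }

    // Union list state: every new subtask receives the complete list.
    public static List<List<String>> restoreUnion(List<List<String>> checkpointed, int newParallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> perSubtask : checkpointed) {
            union.addAll(perSubtask);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    public static void main(String[] args) {
        // Checkpoint taken at parallelism 3, restore at parallelism 4 (rescaling).
        List<List<String>> cp = snapshot(Arrays.asList("configA", "configB"), 3);
        System.out.println("split restore: " + restoreSplit(cp, 4));
        System.out.println("union restore: " + restoreUnion(cp, 4));
    }
}
```

In this model the split restore leaves some subtasks without the shared data after rescaling, whereas the union restore gives every subtask the full list, which is the property the shared static state in this thread needs.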

2017-09-07 17:01 GMT+02:00 Navneeth Krishnan <reachnavnee...@gmail.com>:

> Will I be able to use both queryable MapState and union list state while
> implementing the CheckpointedFunction interface? Because one of my major
> requirement on that operator is to provide a queryable state and in order
> to compute that state we need the common static state across all parallel
> operator instances.
>
> Thanks.
>
> On Thu, Sep 7, 2017 at 12:44 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Navneeth,
>>
>> there's a lower level state interface that should address your
>> requirements: OperatorStateStore.getUnionListState()
>>
>> This union list state is similar to the regular operator list state, but
>> instead of splitting the list for recovery and giving out splits to
>> operator instance, it restores the complete list on each operator instance.
>> So it basically does a broadcast restore. If all operator have the same
>> state, only one instance checkpoints its state and this state is restored
>> to all other instances in case of a failure. This should also work with
>> rescaling.
>> The operator instance to checkpoint can be identified by
>> (RuntimeContext.getIndexOfThisSubtask == 0).
>>
>> The OperatorStateStore is a bit hidden. You have to implement the
>> CheckpointedFunction interface. When
>> CheckpointedFunction.initializeState(FunctionInitializationContext context)
>> is called context has a method getOperatorStateStore().
>>
>> I'd recommend to have a look at the detailed JavaDocs of all involved
>> classes and methods.
>>
>> Hope this helps,
>> Fabian
>>
>>
>> 2017-09-05 19:35 GMT+02:00 Navneeth Krishnan <reachnavnee...@gmail.com>:
>>
>>> Thanks Gordon for your response. I have around 80 parallel flatmap
>>> operator instances and each instance requires 3 states. Out of which one is
>>> user state in which each operator will have unique user's data and I need
>>> this data to be queryable. The other two states are kind of static states
>>> which are only modified when there an update message in config stream. This
>>> static data could easily be around 2GB and in my previous approach I used
>>> operator state where the data is retrieved inside open method across all
>>> operator instances whereas checkpointed only inside one of the operator
>>> instance.
>>>
>>> One of the issue that I have is if I change the operator parallelism how
>>> would it affect the internal state?
>>>
>>>
>>> On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>> wrote:
>>>
>>>> Hi Navneeth,
>>>>
>>>> Answering your three questions separately:
>>>>
>>>> 1. Yes. Your MapState will be backed by RocksDB, so when removing an
>>>> entry from the map state, the state will be removed from the local
>>>> RocksDB as well.
>>>>
>>>> 2. If state classes are not POJOs, they will be serialized by Kryo,
>>>> unless a custom serializer is specifically specified otherwise. You can
>>>> take a look at this document on how to do that [1].
>>>>
>>>> 3. I might need to know more information to be able to suggest properly
>>>> for this one. How are you using the "huge state values"? From what you
>>>> described, it seems like you only need it on one of the parallel
>>>> instances, so I'm a bit curious on what they are actually used for. Are
>>>> they needed when processing your records?
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/