Will I be able to use both queryable MapState and union list state while implementing the CheckpointedFunction interface? One of my major requirements for that operator is to provide queryable state, and to compute that state we need the common static state across all parallel operator instances.
Thanks.

On Thu, Sep 7, 2017 at 12:44 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Navneeth,
>
> there's a lower-level state interface that should address your
> requirements: OperatorStateStore.getUnionListState()
>
> This union list state is similar to the regular operator list state, but
> instead of splitting the list for recovery and giving out splits to
> operator instances, it restores the complete list on each operator
> instance. So it basically does a broadcast restore. If all operator
> instances have the same state, only one instance needs to checkpoint its
> state, and this state is restored to all other instances in case of a
> failure. This should also work with rescaling.
> The operator instance to checkpoint can be identified by
> (RuntimeContext.getIndexOfThisSubtask() == 0).
>
> The OperatorStateStore is a bit hidden. You have to implement the
> CheckpointedFunction interface. When
> CheckpointedFunction.initializeState(FunctionInitializationContext
> context) is called, the context has a method getOperatorStateStore().
>
> I'd recommend having a look at the detailed JavaDocs of all involved
> classes and methods.
>
> Hope this helps,
> Fabian
>
>
> 2017-09-05 19:35 GMT+02:00 Navneeth Krishnan <reachnavnee...@gmail.com>:
>
>> Thanks, Gordon, for your response. I have around 80 parallel flatMap
>> operator instances, and each instance requires 3 states. One of them is
>> user state, in which each operator instance holds unique users' data, and
>> I need this data to be queryable. The other two states are kind of static
>> states which are only modified when there is an update message in the
>> config stream. This static data could easily be around 2GB, and in my
>> previous approach I used operator state, where the data is retrieved
>> inside the open method on all operator instances but checkpointed only in
>> one of the operator instances.
>>
>> One of the issues I have: if I change the operator parallelism, how
>> would it affect the internal state?
>>
>> On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> Hi Navneeth,
>>>
>>> Answering your three questions separately:
>>>
>>> 1. Yes. Your MapState will be backed by RocksDB, so when you remove an
>>> entry from the map state, it is removed from the local RocksDB as well.
>>>
>>> 2. If state classes are not POJOs, they will be serialized by Kryo,
>>> unless a custom serializer is specified. You can take a look at this
>>> document on how to do that [1].
>>>
>>> 3. I might need more information to be able to suggest something proper
>>> for this one. How are you using the "huge state values"? From what you
>>> described, it seems like you only need them on one of the parallel
>>> instances, so I'm a bit curious about what they are actually used for.
>>> Are they needed when processing your records?
>>>
>>> Cheers,
>>> Gordon
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
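To make the rescaling question in this thread concrete (what happens to operator state when the parallelism changes), here is a small plain-Java model of the two restore behaviors Fabian describes: splitting the list across instances versus broadcasting the complete list. It does not use Flink at all. The class UnionStateModel and its methods are hypothetical, and the even split is only an approximation of how Flink redistributes regular operator list state; the exact assignment may differ. Following Fabian's advice, only subtask 0 contributes the shared config list at checkpoint time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of operator-state redistribution on restore.
// No Flink classes are used; all names are illustrative only.
final class UnionStateModel {

    private UnionStateModel() {}

    // Checkpoint of the shared "static" config state across the old subtasks.
    // Following the thread's advice, only subtask 0 writes the list;
    // every other subtask writes an empty list.
    static List<List<String>> checkpoint(List<String> sharedState, int oldParallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < oldParallelism; i++) {
            perSubtask.add(i == 0 ? new ArrayList<String>(sharedState) : new ArrayList<String>());
        }
        return perSubtask;
    }

    // Flatten all checkpointed per-subtask lists into one list, in subtask order.
    private static List<String> concat(List<List<String>> checkpointed) {
        List<String> all = new ArrayList<>();
        for (List<String> part : checkpointed) {
            all.addAll(part);
        }
        return all;
    }

    // Regular operator list state (approximation): split the concatenated
    // elements into contiguous, roughly equal chunks, one per new subtask.
    static List<List<String>> restoreSplit(List<List<String>> checkpointed, int newParallelism) {
        List<String> all = concat(checkpointed);
        List<List<String>> restored = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int len = base + (i < extra ? 1 : 0);
            restored.add(new ArrayList<>(all.subList(start, start + len)));
            start += len;
        }
        return restored;
    }

    // Union list state: every new subtask receives the complete list.
    static List<List<String>> restoreUnion(List<List<String>> checkpointed, int newParallelism) {
        List<String> all = concat(checkpointed);
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(all));
        }
        return restored;
    }
}
```

In this model, a split restore of a 4-element config list from 3 instances onto 2 leaves each instance with only half of the list, so no instance holds the complete static state. A union restore hands every instance the full list, which is what the shared config state needs. The model also shows why only one subtask should write: if all 80 instances checkpointed an identical copy, a union restore would give each instance 80 copies.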