Only KeyedState can be made queryable, so you cannot query OperatorState
(which includes the union list state).
AFAIK, it should not be a problem for an operator to have both OperatorState
and queryable KeyedState.
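
To make the restore behaviour of the union list state (described in the
quoted mail below) more concrete, here is a minimal plain-Java simulation. It
does not use any Flink classes: the class UnionRestoreSketch, its methods, and
the sample entries "rule-a" and "rule-b" are made up for illustration, and the
round-robin split is only a simplification of how regular list state is
redistributed. Treat it as a sketch of the idea, not as Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of list-state restore; no Flink classes are involved.
class UnionRestoreSketch {

    // Snapshot in which only subtask 0 writes the shared config,
    // mirroring the (getIndexOfThisSubtask() == 0) check from the thread.
    static List<List<String>> snapshot(List<String> sharedConfig, int parallelism) {
        List<List<String>> checkpointed = emptyLists(parallelism);
        checkpointed.get(0).addAll(sharedConfig);
        return checkpointed;
    }

    // Regular list state (simplified): entries are handed out round-robin,
    // so each restored subtask sees only a part of the list.
    static List<List<String>> splitRestore(List<List<String>> checkpointed, int newParallelism) {
        List<List<String>> restored = emptyLists(newParallelism);
        int next = 0;
        for (List<String> perSubtask : checkpointed) {
            for (String entry : perSubtask) {
                restored.get(next % newParallelism).add(entry);
                next++;
            }
        }
        return restored;
    }

    // Union list state: every restored subtask receives the complete list.
    static List<List<String>> unionRestore(List<List<String>> checkpointed, int newParallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> perSubtask : checkpointed) {
            union.addAll(perSubtask);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    private static List<List<String>> emptyLists(int n) {
        List<List<String>> lists = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }

    public static void main(String[] args) {
        // Checkpoint taken at parallelism 2, restored at parallelism 3.
        List<List<String>> cp = snapshot(Arrays.asList("rule-a", "rule-b"), 2);
        System.out.println("split restore: " + splitRestore(cp, 3));
        System.out.println("union restore: " + unionRestore(cp, 3));
    }
}
```

With the union restore every subtask ends up with the full config after
rescaling, whereas the split restore leaves each subtask with only a part of
it (or nothing), which is why it does not fit state that every parallel
instance needs.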

2017-09-07 17:01 GMT+02:00 Navneeth Krishnan <reachnavnee...@gmail.com>:

> Will I be able to use both queryable MapState and union list state while
> implementing the CheckpointedFunction interface? One of my major
> requirements for that operator is to provide queryable state, and to
> compute that state we need the common static state across all parallel
> operator instances.
>
> Thanks.
>
> On Thu, Sep 7, 2017 at 12:44 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Navneeth,
>>
>> There's a lower-level state interface that should address your
>> requirements: OperatorStateStore.getUnionListState()
>>
>> This union list state is similar to the regular operator list state, but
>> instead of splitting the list on recovery and handing out the splits to the
>> operator instances, it restores the complete list on each operator instance.
>> So it basically does a broadcast restore. If all operators hold the same
>> state, it is enough for one instance to checkpoint its state; that state is
>> then restored to all instances in case of a failure. This should also work
>> with rescaling.
>> The operator instance that checkpoints can be identified by
>> (RuntimeContext.getIndexOfThisSubtask() == 0).
>>
>> The OperatorStateStore is a bit hidden. You have to implement the
>> CheckpointedFunction interface. When
>> CheckpointedFunction.initializeState(FunctionInitializationContext context)
>> is called, the context provides the method getOperatorStateStore().
>>
>> I'd recommend having a look at the detailed JavaDocs of all involved
>> classes and methods.
>>
>> Hope this helps,
>> Fabian
>>
>>
>> 2017-09-05 19:35 GMT+02:00 Navneeth Krishnan <reachnavnee...@gmail.com>:
>>
>>> Thanks Gordon for your response. I have around 80 parallel flatmap
>>> operator instances, and each instance requires three states. One of them
>>> is user state: each operator instance holds the data of a distinct set of
>>> users, and I need this data to be queryable. The other two are more or
>>> less static states that are only modified when there is an update message
>>> in the config stream. This static data could easily be around 2 GB. In my
>>> previous approach I used operator state, where the data is retrieved
>>> inside the open method on all operator instances but checkpointed by only
>>> one of them.
>>>
>>> One issue I have: if I change the operator parallelism, how would that
>>> affect the internal state?
>>>
>>>
>>> On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>> wrote:
>>>
>>>> Hi Navneeth,
>>>>
>>>> Answering your three questions separately:
>>>>
>>>> 1. Yes. Your MapState will be backed by RocksDB, so when you remove an
>>>> entry from the map state, it will be removed from the local RocksDB
>>>> instance as well.
>>>>
>>>> 2. If state classes are not POJOs, they will be serialized by Kryo unless
>>>> a custom serializer is explicitly specified. You can take a look at this
>>>> document on how to do that [1].
>>>>
>>>> 3. I would need more information to make a proper suggestion for this
>>>> one. How are you using the "huge state values"? From what you described,
>>>> it seems like you only need them on one of the parallel instances, so I'm
>>>> a bit curious about what they are actually used for. Are they needed when
>>>> processing your records?
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state
>>>>
>>>>
>>>
>>>
>>
>
