Hi,

+1 to what Stefan is suggesting , we have been using similar logic for a
while:

@Override

public void snapshotState(StateSnapshotContext context) throws Exception {

    updateBroadcastState();

    super.snapshotState(context);

}


@Override

public void initializeState(StateInitializationContext context) throws
Exception {

    super.initializeState(context);

    initBroadcastState();

}


Gyula

Stefan Richter <s.rich...@data-artisans.com> ezt írta (időpont: 2017. júl.
4., K, 15:19):

> What I mean is that you could obtain such a state in
>
> initializeState(FunctionInitializationContext context) {
> context.getOperatorStateStore().getUnionListState(…);
> }
>
> and in snapshotState(…), you will just insert the state in only one of
> the parallel instances. Which instance can be based on the subtask index
> (e.g. only add to the list state if your subtask index is == 0). You can
> obtain the subtask index by getRuntimeContext().getIndexOfThisSubtask().
>
> UnionListState will replicate all submitted states to all parallel
> instances, so what was checkpointed on one operator instance will be
> replicated to all in restore.
>
> Best,
> Stefan
>
>
> Am 04.07.2017 um 13:56 schrieb gerardg <ger...@talaia.io>:
>
> Thanks Fabian, I'll keep an eye to that JIRA.
>
> I'm not sure I follow you Stefan. You mean that I could implement my own
> OperatorStateStore and override its methods (e.g. snapshot and restore) to
> achieve this functionality? I think I don't have enough knowledge about
> Flink's internals to implement this easily.
>
> Gerard
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Managed-operator-state-treating-state-of-all-parallel-operators-as-the-same-tp14102p14111.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>
>
>

Reply via email to