Hi, +1 to what Stefan is suggesting , we have been using similar logic for a while:
@Override public void snapshotState(StateSnapshotContext context) throws Exception { updateBroadcastState(); super.snapshotState(context); } @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); initBroadcastState(); } Gyula Stefan Richter <s.rich...@data-artisans.com> ezt írta (időpont: 2017. júl. 4., K, 15:19): > What I mean is that you could obtain such a state in > > initializeState(FunctionInitializationContext context) { > context.getOperatorStateStore().getUnionListState(…); > } > > and in snapshotState(…), you will just insert the state in only one of > the parallel instances. Which instance can be based on the subtask index > (e.g. only add to the list state if your subtask index is == 0). You can > obtain the subtask index by getRuntimeContext().getIndexOfThisSubtask(). > > UnionListState will replicate all submitted states to all parallel > instances, so what was checkpointed on one operator instance will be > replicated to all in restore. > > Best, > Stefan > > > Am 04.07.2017 um 13:56 schrieb gerardg <ger...@talaia.io>: > > Thanks Fabian, I'll keep an eye to that JIRA. > > I'm not sure I follow you Stefan. You mean that I could implement my own > OperatorStateStore and override its methods (e.g. snapshot and restore) to > achieve this functionality? I think I don't have enough knowledge about > Flink's internals to implement this easily. > > Gerard > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Managed-operator-state-treating-state-of-all-parallel-operators-as-the-same-tp14102p14111.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. > > >