Hi Yun,

Thanks for the suggestion!

Best
Eleanore

On Mon, Jan 27, 2020 at 1:54 AM Yun Tang <myas...@live.com> wrote:

> Hi Yi
>
> Glad to know you have already resolved it. The State Processor API will use
> the DataStream API instead of the DataSet API in the future [1].
>
> Besides, you could also follow the guide in "The Broadcast State
> Pattern" [2]:
>
> // a map descriptor to store the name of the rule (string) and the rule itself.
> MapStateDescriptor<String, Rule> stateDescriptor = new MapStateDescriptor<>(
>     "RulesBroadcastState",
>     BasicTypeInfo.STRING_TYPE_INFO,
>     TypeInformation.of(new TypeHint<Rule>() {}));
>
> // broadcast the rules and create the broadcast state
> BroadcastStream<Rule> broadcastStream = ruleStream
>     .broadcast(stateDescriptor);
>
> colorPartitionedStream
>     .connect(broadcastStream)
>     .process(
>         new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
>             // my matching logic
>         }
>     ).uid("your-uid");
>
> Make sure the uid and the state name are the same as those in your
> savepoint; the CoBroadcastWithKeyedOperator will then initialize the
> broadcast state when it opens [3].
>
> [1]
> https://flink.apache.org/feature/2019/09/13/state-processor-api.html#why-dataset-api
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
> [3]
> https://github.com/apache/flink/blob/53f956fb57dd5601d2e3ca9289207d21796cdc4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Jin Yi <eleanore....@gmail.com>
> *Sent:* Monday, January 27, 2020 14:50
> *To:* Yun Tang <myas...@live.com>
> *Cc:* user <user@flink.apache.org>; user...@flink.apache.org <user...@flink.apache.org>
> *Subject:* Re: [State Processor API] how to convert savepoint back to broadcast state
>
> Hi Yun,
>
> After searching around in the documentation, I tried extending
> BroadcastProcessFunction and implementing CheckpointedFunction.
> And I have initialized the broadcast state in the public void
> initializeState(FunctionInitializationContext context) method; it seems to
> be working fine.
>
> Here is the doc I followed:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction
>
> Thanks a lot for your help!
> Eleanore
>
> On Sun, Jan 26, 2020 at 6:53 PM Jin Yi <eleanore....@gmail.com> wrote:
>
> Hi Yun,
>
> Thanks for the response. I have checked the official documentation, and I
> referred to this example to write the broadcast state to a savepoint.
>
> My question is: I can use the State Processor API to read the savepoint back
> into a DataSet, but how can I use the DataSet as the initial value for the
> broadcast state in the BroadcastProcessFunction?
>
> Thanks a lot!
>
> Eleanore
>
> On Sun, Jan 26, 2020 at 8:53 AM Yun Tang <myas...@live.com> wrote:
>
> Hi Yi
>
> Does the official doc on writing broadcast state [1] satisfy your request?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1
>
> Best
> Yun Tang
> ------------------------------
> *From:* Jin Yi <eleanore....@gmail.com>
> *Sent:* Thursday, January 23, 2020 8:12
> *To:* user <user@flink.apache.org>; user...@flink.apache.org <user...@flink.apache.org>
> *Subject:* [State Processor API] how to convert savepoint back to broadcast state
>
> Hi there,
>
> I would like to read the savepoints (for broadcast state) back into the
> broadcast state. How should I do it?
>
> // load the existing savepoint
> ExistingSavepoint existingSavepoint = Savepoint.load(environment,
>     "file:///tmp/new_savepoints", new MemoryStateBackend());
>
> // read state from the existing savepoint
> dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID,
>     "largeKeySetStateDescription", BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.STRING_TYPE_INFO);
>
> // TODO in BroadcastProcessFunction, how can I put the savepoint dataset back
> // into BroadcastState?
>
> Thanks!
>
> Eleanore
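
For readers of the archive: Yun's advice in this thread is that broadcast state is restored only when the operator's uid and the state name match what was written to the savepoint. The sketch below is a toy model of that lookup in plain Java. It is not Flink code: `ToySavepoint`, `ToyOperator`, and their methods are hypothetical names invented for illustration, and a real job may behave differently on a mismatch (for example, it can refuse to start rather than come up with empty state).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy model only: none of these classes are part of Flink. */
public class BroadcastRestoreSketch {

    /** Hypothetical stand-in for a savepoint: operator uid -> state name -> entries. */
    static final class ToySavepoint {
        private final Map<String, Map<String, Map<String, String>>> byUid = new HashMap<>();

        /** Stores a copy of the entries under the given uid and state name. */
        void write(String uid, String stateName, Map<String, String> entries) {
            byUid.computeIfAbsent(uid, k -> new HashMap<>())
                 .put(stateName, new HashMap<>(entries));
        }

        /** Returns the stored entries, or an empty map if uid or name does not match. */
        Map<String, String> lookup(String uid, String stateName) {
            return byUid.getOrDefault(uid, Collections.emptyMap())
                        .getOrDefault(stateName, Collections.emptyMap());
        }
    }

    /** Hypothetical stand-in for an operator that owns one broadcast state. */
    static final class ToyOperator {
        private final String uid;
        private final String stateName;
        private final Map<String, String> broadcastState = new HashMap<>();

        ToyOperator(String uid, String stateName) {
            this.uid = uid;
            this.stateName = stateName;
        }

        /** On open, copy whatever the savepoint holds for this uid and state name. */
        void open(ToySavepoint savepoint) {
            broadcastState.putAll(savepoint.lookup(uid, stateName));
        }

        Map<String, String> state() {
            return broadcastState;
        }
    }

    public static void main(String[] args) {
        ToySavepoint savepoint = new ToySavepoint();
        savepoint.write("your-uid", "RulesBroadcastState",
                Collections.singletonMap("rule-1", "red then blue"));

        // Same uid and state name as in the savepoint: the state is found.
        ToyOperator matching = new ToyOperator("your-uid", "RulesBroadcastState");
        matching.open(savepoint);
        System.out.println("matching:   " + matching.state());

        // A different uid: nothing is found for this operator.
        ToyOperator wrongUid = new ToyOperator("other-uid", "RulesBroadcastState");
        wrongUid.open(savepoint);
        System.out.println("wrong uid:  " + wrongUid.state());

        // A different state name under the right uid: also nothing.
        ToyOperator wrongName = new ToyOperator("your-uid", "SomeOtherState");
        wrongName.open(savepoint);
        System.out.println("wrong name: " + wrongName.state());
    }
}
```

In this model only the first operator ends up holding the rule. The other two come up empty, which is the situation the uid and state-name check in Yun's reply is meant to avoid.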