Hi Yi,

Glad to know you have already resolved it. The State Processor API is expected to move from the DataSet API to the DataStream API in the future [1].
Besides, you could also follow the guide in "The Broadcast State Pattern" [2]:

    // a map descriptor to store the name of the rule (string) and the rule itself.
    MapStateDescriptor<String, Rule> stateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<Rule>() {}));

    // broadcast the rules and create the broadcast state
    BroadcastStream<Rule> broadcastStream = ruleStream
        .broadcast(stateDescriptor);

    colorPartitionedStream
        .connect(broadcastStream)
        .process(
            new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                // my matching logic
            }
        ).uid("your-uid");

Make sure the uid and the state name are the same as those in your savepoint; the CoBroadcastWithKeyedOperator will then initialize the broadcast state when it opens [3].

[1] https://flink.apache.org/feature/2019/09/13/state-processor-api.html#why-dataset-api
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
[3] https://github.com/apache/flink/blob/53f956fb57dd5601d2e3ca9289207d21796cdc4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101

Best
Yun Tang

________________________________
From: Jin Yi <eleanore....@gmail.com>
Sent: Monday, January 27, 2020 14:50
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>; user...@flink.apache.org <user...@flink.apache.org>
Subject: Re: [State Processor API] how to convert savepoint back to broadcast state

Hi Yun,

After searching the documentation, I tried a class that extends BroadcastProcessFunction and implements CheckpointedFunction. I initialize the broadcast state in the public void initializeState(FunctionInitializationContext context) method, and it seems to be working fine.

Here is the doc I followed: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction

Thanks a lot for your help!
Eleanore

On Sun, Jan 26, 2020 at 6:53 PM Jin Yi <eleanore....@gmail.com> wrote:

Hi Yun,

Thanks for the response. I have checked the official documentation and referred to this example to write the broadcast state to a savepoint.

My question is: I can use the State Processor API to read the savepoint back into a DataSet, but how can I use that DataSet as the initial value for the broadcast state in the BroadcastProcessFunction?

Thanks a lot!
Eleanore

On Sun, Jan 26, 2020 at 8:53 AM Yun Tang <myas...@live.com> wrote:

Hi Yi,

Does the official doc on writing broadcast state [1] satisfy your request?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1

Best
Yun Tang

________________________________
From: Jin Yi <eleanore....@gmail.com>
Sent: Thursday, January 23, 2020 8:12
To: user <user@flink.apache.org>; user...@flink.apache.org <user...@flink.apache.org>
Subject: [State Processor API] how to convert savepoint back to broadcast state

Hi there,

I would like to read the savepoints (for broadcast state) back into the broadcast state. How should I do it?

    // load the existing savepoint
    ExistingSavepoint existingSavepoint = Savepoint.load(environment, "file:///tmp/new_savepoints", new MemoryStateBackend());

    // read state from the existing savepoint
    dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, "largeKeySetStateDescription", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // TODO: in BroadcastProcessFunction, how can I put the savepoint DataSet back into BroadcastState?

Thanks!
Eleanore
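
Both the readBroadcastState call in the original question and the advice at the top of this thread address broadcast state by two keys: the operator uid and the state name. The following is a rough mental model of that two-key lookup in plain Java. It uses no Flink classes; the class SavepointLookupSketch, its methods, and the sample rule entry are all hypothetical, and it does not claim to reflect how Flink stores savepoints internally. What Flink actually does when a uid or name does not match (for example, failing the restore or starting with empty state) is not modeled here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model only: not Flink code and not Flink's savepoint format. */
public class SavepointLookupSketch {

    // operator uid -> (state name -> broadcast entries); hypothetical layout
    private final Map<String, Map<String, Map<String, String>>> statesByUid = new HashMap<>();

    /** Records broadcast entries under an operator uid and a state name. */
    public void write(String uid, String stateName, Map<String, String> entries) {
        statesByUid
            .computeIfAbsent(uid, k -> new HashMap<>())
            .put(stateName, new HashMap<>(entries));
    }

    /** Returns the entries only if both the uid and the state name match. */
    public Optional<Map<String, String>> find(String uid, String stateName) {
        return Optional.ofNullable(statesByUid.get(uid))
            .map(byName -> byName.get(stateName));
    }

    public static void main(String[] args) {
        SavepointLookupSketch savepoint = new SavepointLookupSketch();
        savepoint.write("your-uid", "RulesBroadcastState",
            Collections.singletonMap("rule-1", "red-then-blue"));

        // Same uid and same state name: the entries are found.
        System.out.println(savepoint.find("your-uid", "RulesBroadcastState").isPresent());
        // Same uid, different state name: nothing is found.
        System.out.println(savepoint.find("your-uid", "largeKeySetStateDescription").isPresent());
        // Different uid, same state name: nothing is found.
        System.out.println(savepoint.find("another-uid", "RulesBroadcastState").isPresent());
    }
}
```

In this model, a job that registers its descriptor under a different name, or sets a different uid on the operator, simply never reaches the stored entries, which is why both values need to be copied exactly from the job that produced the savepoint.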