Hi Yi
Glad to know you have already resolved it. The State Processor API is expected
to move from the DataSet API to the DataStream API in the future [1].
Besides, you could also follow the guide in "The Broadcast State Pattern" [2]:
// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> stateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<Rule>() {}));
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> broadcastStream = ruleStream
        .broadcast(stateDescriptor);
colorPartitionedStream
        .connect(broadcastStream)
        .process(
                new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                    // my matching logic
                }
        ).uid("your-uid");
Make sure the uid and the state name are the same as those in your savepoint;
the CoBroadcastWithKeyedOperator initializes the broadcast state when it
opens [3].
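To illustrate why both names have to match, here is a toy model in plain Java.
It is not Flink code: ToySavepoint, write and restore are hypothetical names
made up for this sketch, and it only mimics the idea that restored state is
looked up by operator uid and then by state name. On a mismatch the toy model
simply returns empty state; it does not reproduce Flink's actual restore logic
or error handling.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy model only: none of the types below are Flink APIs. */
public class SavepointLookupSketch {

    /** Hypothetical stand-in for a savepoint: operator uid -> state name -> entries. */
    static final class ToySavepoint {
        private final Map<String, Map<String, Map<String, String>>> states = new HashMap<>();

        /** Stores one broadcast-state entry under the given uid and state name. */
        void write(String uid, String stateName, String key, String value) {
            states.computeIfAbsent(uid, u -> new HashMap<>())
                  .computeIfAbsent(stateName, n -> new HashMap<>())
                  .put(key, value);
        }

        /** Returns the stored entries, or an empty map if uid or state name differs. */
        Map<String, String> restore(String uid, String stateName) {
            return states
                    .getOrDefault(uid, Collections.emptyMap())
                    .getOrDefault(stateName, Collections.emptyMap());
        }
    }

    public static void main(String[] args) {
        ToySavepoint savepoint = new ToySavepoint();
        savepoint.write("your-uid", "RulesBroadcastState", "rule-1", "red-then-blue");

        // Same uid and same state name: the entry is found.
        System.out.println(savepoint.restore("your-uid", "RulesBroadcastState")); // {rule-1=red-then-blue}
        // State name differs only in case: nothing is found.
        System.out.println(savepoint.restore("your-uid", "rulesBroadcastState")); // {}
        // Different uid: nothing is found.
        System.out.println(savepoint.restore("other-uid", "RulesBroadcastState")); // {}
    }
}
```

In the real job, the corresponding names are the string passed to uid(...) and
the name given to the MapStateDescriptor.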
[1]
https://flink.apache.org/feature/2019/09/13/state-processor-api.html#why-dataset-api
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
[3]
https://github.com/apache/flink/blob/53f956fb57dd5601d2e3ca9289207d21796cdc4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101
Best
Yun Tang
________________________________
From: Jin Yi <[email protected]>
Sent: Monday, January 27, 2020 14:50
To: Yun Tang <[email protected]>
Cc: user <[email protected]>; [email protected]
<[email protected]>
Subject: Re: [State Processor API] how to convert savepoint back to broadcast
state
Hi Yun,
After searching around in the documentation, I tried extending
BroadcastProcessFunction and implementing CheckpointedFunction. I initialize
the broadcast state in the public void
initializeState(FunctionInitializationContext context) method, and it seems to
be working fine.
Here is the doc I followed:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction
Thanks a lot for your help!
Eleanore
On Sun, Jan 26, 2020 at 6:53 PM Jin Yi <[email protected]> wrote:
Hi Yun,
Thanks for the response. I have checked the official documentation and followed
this example to write the broadcast state to a savepoint.
My question is: I can use the State Processor API to read the savepoint back
into a DataSet, but how can I use that DataSet as the initial value for the
broadcast state in the BroadcastProcessFunction?
Thanks a lot!
Eleanore
On Sun, Jan 26, 2020 at 8:53 AM Yun Tang <[email protected]> wrote:
Hi Yi
Does the official documentation on writing broadcast state [1] satisfy your
request?
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1
Best
Yun Tang
________________________________
From: Jin Yi <[email protected]>
Sent: Thursday, January 23, 2020 8:12
To: user <[email protected]>; [email protected] <[email protected]>
Subject: [State Processor API] how to convert savepoint back to broadcast state
Hi there,
I would like to read a savepoint (containing broadcast state) back into
broadcast state. How should I do it?
// load the existing savepoint
ExistingSavepoint existingSavepoint = Savepoint.load(
        environment, "file:///tmp/new_savepoints", new MemoryStateBackend());
// read state from the existing savepoint
DataSet<Tuple2<String, String>> dataSet = existingSavepoint.readBroadcastState(
        OPERATOR_UID,
        "largeKeySetStateDescription",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);
// TODO: in BroadcastProcessFunction, how can I put the savepoint DataSet back
// into BroadcastState?
Thanks!
Eleanore