Hi Yi

Glad to know you have already resolved it. The State Processor API will use the
DataStream API instead of the DataSet API in the future [1].

Besides, you could also follow the guide in "The Broadcast State Pattern" [2]:


// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> stateDescriptor = new MapStateDescriptor<>(
                        "RulesBroadcastState",
                        BasicTypeInfo.STRING_TYPE_INFO,
                        TypeInformation.of(new TypeHint<Rule>() {}));

// broadcast the rules and create the broadcast state
BroadcastStream<Rule> broadcastStream = ruleStream
                        .broadcast(stateDescriptor);


colorPartitionedStream
                 .connect(broadcastStream)
                 .process(
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                         // my matching logic
                     }
                 ).uid("your-uid");

Make sure the uid and the state name are the same as those in your savepoint.
The CoBroadcastWithKeyedOperator initializes the broadcast state when it
opens [3].
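
To illustrate why both identifiers have to match, here is a minimal plain-Java
sketch. It is only a toy model and not Flink code: the class
SavepointLookupSketch and its write/restore methods are hypothetical names made
up for this example. It assumes a savepoint can be pictured as a two-level
lookup, first by operator uid and then by state name. What Flink actually does
when one of them does not match is not modeled here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model only: NOT Flink code. All names here are hypothetical.
final class SavepointLookupSketch {

    // operator uid -> (state name -> broadcast state entries)
    private final Map<String, Map<String, Map<String, String>>> statesByUid = new HashMap<>();

    // Record state under an operator uid and a state name.
    public void write(String uid, String stateName, Map<String, String> entries) {
        statesByUid
                .computeIfAbsent(uid, k -> new HashMap<>())
                .put(stateName, new HashMap<>(entries));
    }

    // Look up state; the result is empty unless BOTH the uid and the state name match.
    public Optional<Map<String, String>> restore(String uid, String stateName) {
        return Optional.ofNullable(statesByUid.get(uid))
                .map(states -> states.get(stateName));
    }

    public static void main(String[] args) {
        SavepointLookupSketch savepoint = new SavepointLookupSketch();
        savepoint.write("your-uid", "RulesBroadcastState",
                Collections.singletonMap("rule-1", "red"));

        // Matching uid and state name.
        System.out.println(savepoint.restore("your-uid", "RulesBroadcastState").isPresent());
        // Different uid.
        System.out.println(savepoint.restore("other-uid", "RulesBroadcastState").isPresent());
        // Different state name.
        System.out.println(savepoint.restore("your-uid", "OtherStateName").isPresent());
    }
}
```

In the real job, the two identifiers correspond to the string passed to
.uid(...) and the name given to the MapStateDescriptor in the snippet above.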


[1] https://flink.apache.org/feature/2019/09/13/state-processor-api.html#why-dataset-api
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
[3] https://github.com/apache/flink/blob/53f956fb57dd5601d2e3ca9289207d21796cdc4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101


Best
Yun Tang

________________________________
From: Jin Yi <eleanore....@gmail.com>
Sent: Monday, January 27, 2020 14:50
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>; user...@flink.apache.org 
<user...@flink.apache.org>
Subject: Re: [State Processor API] how to convert savepoint back to broadcast 
state

Hi Yun,

After searching around in the documentation, I tried extending
BroadcastProcessFunction and implementing CheckpointedFunction. I initialize
the broadcast state in the public void
initializeState(FunctionInitializationContext context) method, and it seems to
work fine.

Here is the doc I followed: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction

Thanks a lot for your help!
Eleanore

On Sun, Jan 26, 2020 at 6:53 PM Jin Yi 
<eleanore....@gmail.com<mailto:eleanore....@gmail.com>> wrote:
Hi Yun,

Thanks for the response. I have checked the official documentation and followed
its example to write the broadcast state to a savepoint.

My question is: I can use the State Processor API to read the savepoint back
into a DataSet, but how can I use that DataSet as the initial value for the
broadcast state in the BroadcastProcessFunction?

Thanks a lot!

Eleanore

On Sun, Jan 26, 2020 at 8:53 AM Yun Tang 
<myas...@live.com<mailto:myas...@live.com>> wrote:
Hi Yi

Does the official documentation on writing broadcast state [1] satisfy your
request?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1

Best
Yun Tang
________________________________
From: Jin Yi <eleanore....@gmail.com<mailto:eleanore....@gmail.com>>
Sent: Thursday, January 23, 2020 8:12
To: user <user@flink.apache.org<mailto:user@flink.apache.org>>; 
user...@flink.apache.org<mailto:user...@flink.apache.org> 
<user...@flink.apache.org<mailto:user...@flink.apache.org>>
Subject: [State Processor API] how to convert savepoint back to broadcast state

Hi there,

I would like to read a savepoint (for broadcast state) back into the broadcast
state. How should I do it?


// load the existing savepoint
ExistingSavepoint existingSavepoint = Savepoint.load(
        environment, "file:///tmp/new_savepoints", new MemoryStateBackend());

// read the broadcast state from the existing savepoint as (key, value) tuples
DataSet<Tuple2<String, String>> dataSet = existingSavepoint.readBroadcastState(
        OPERATOR_UID,
        "largeKeySetStateDescription",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);

// TODO: in BroadcastProcessFunction, how can I put the savepoint DataSet back
// into BroadcastState?

Thanks!

Eleanore
