One option to consider is using a `CoProcessFunction` instead of a `BroadcastProcessFunction`, and calling `.broadcast()` on the input stream that every parallel instance needs to receive. You could then follow the pattern you laid out in your sample code: initialize state in `initializeState`, update `myState` in `processElement2`, and do your business logic in `processElement1`. Because neither input would be keyed, `myState` would have to be operator state rather than keyed state. You would still need some way to initialize your state with cached values, but you would have needed that anyway with the code sample you shared.
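The `CoProcessFunction` approach above could be sketched roughly as follows. This is a minimal sketch assuming Flink 1.12 APIs. The class name `BroadcastLookupFunction`, the `String` transaction input, and the `Tuple2<String, String>` (id, value) update type are hypothetical stand-ins for your own types. Each parallel instance keeps its lookup table in a plain `HashMap` and checkpoints it through union list operator state, so that on restore every instance gets the full table back.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Hypothetical example. Input 1: transactions (here just a lookup key).
 * Input 2: (id, value) updates, which the caller is assumed to send through
 * .broadcast() so that every parallel instance sees all of them.
 */
public class BroadcastLookupFunction
        extends CoProcessFunction<String, Tuple2<String, String>, String>
        implements CheckpointedFunction {

    // Per-instance copy of the lookup table.
    private final Map<String, String> table = new HashMap<>();

    // Non-keyed (operator) state, used only to checkpoint and restore the table.
    private transient ListState<Tuple2<String, String>> checkpointed;

    @Override
    public void processElement1(String txKey, Context ctx, Collector<String> out) {
        // Business logic placeholder: emit the looked-up value.
        String value = table.get(txKey);
        out.collect(txKey + " -> " + (value == null ? "<missing>" : value));
    }

    @Override
    public void processElement2(Tuple2<String, String> update, Context ctx, Collector<String> out) {
        // Input 2 is broadcast, so every instance applies the same update.
        table.put(update.f0, update.f1);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current table contents.
        checkpointed.clear();
        for (Map.Entry<String, String> e : table.entrySet()) {
            checkpointed.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Union list state: on restore, every instance receives the entries of all instances.
        checkpointed = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("lookup",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
        if (context.isRestored()) {
            for (Tuple2<String, String> e : checkpointed.get()) {
                table.put(e.f0, e.f1); // duplicate copies simply overwrite each other
            }
        }
        // Otherwise, this is where cached or default values could be loaded (not shown).
    }
}
```

Wiring it up might look like `transactions.connect(updates.broadcast()).process(new BroadcastLookupFunction())`. Union list state stores one copy of the table per parallel instance. Merging those copies on restore is only safe if every instance had applied the same updates when the checkpoint was taken, which should hold with the default aligned checkpoints.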
From: Nick Bendtner <buggi...@gmail.com>
Date: Tuesday, January 26, 2021 at 12:31 PM
To: Guowei Ma <guowei....@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Initializing broadcast state

Thanks a lot, Guowei, that makes sense. I will go with the caching approach. Can you point me to an example that shows the most efficient way to cache elements? Thanks a ton for your help.

Best,
Nick

On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma <guowei....@gmail.com> wrote:

Hi Nick,

I do not think you can update `myState` in `processBroadcastElement`. You need a key before you can update keyed state, but there is no key in `processBroadcastElement`.

Best,
Guowei

On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner <buggi...@gmail.com> wrote:

Hi Guowei,

I am not using a keyed broadcast function; I use [1]. My question is: can non-broadcast state, for instance value state or map state, be updated whenever I get a broadcast event in `processBroadcastElement`? That way the state updates are consistent, since each instance of the task gets the same broadcast element.

```
private MapState<String, MyState> myState;

@Override
public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {
    // Iterate over map state.
    for (Map.Entry<String, MyState> entry : myState.entries()) {
        // Business logic
    }
    // Do things
}

@Override
public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {
    // Update map state which is not a broadcast state. Same update in every sub operator.
    myState.put(value.ID(), value.state()); // Update the mapState with value from broadcast
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Called when it's time to save state
    myState.clear();
    // Update myState with current application state
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Called when things start up, possibly recovering from an error
    MapStateDescriptor<String, MyState> descriptor =
            new MapStateDescriptor<>("state", Types.STRING, Types.POJO(MyState.class));
    myState = context.getKeyedStateStore().getMapState(descriptor);
    if (context.isRestored()) {
        // Restore application state from myState
    }
}
```

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html

Best,
Nick

On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <guowei....@gmail.com> wrote:

Hi Nick,

Normally you cannot iterate over all keyed state, but `BroadcastState` together with `applyToKeyedState` lets you do that. For example, before the broadcast-side elements arrive, you could cache the non-broadcast elements in keyed state. After the broadcast elements arrive, you use `applyToKeyedState`[1] to iterate over all the elements you "cached" in keyed state and run your business logic.
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html

Best,
Guowei

On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <buggi...@gmail.com> wrote:

Thanks, Guowei. Another question I have: what is the use of broadcast state when I can update a map state or value state inside the `processBroadcastElement` method and use that state for lookups in the `processElement` method, as in this example: https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate

Best,
Nick

On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <guowei....@gmail.com> wrote:

Hi Nick,

If you have to process an element only after you get the broadcast state, you might need to handle that yourself. For example, you could "cache" the element in state and handle it once the broadcast-side elements arrive. In particular, if you are using `KeyedBroadcastProcessFunction`, you can use `applyToKeyedState` to access the elements you cached earlier.

Best,
Guowei

On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <buggi...@gmail.com> wrote:

Hi guys,

How can I initialize broadcast state (say, with default values) before the first element shows up in the broadcast stream? I do a lookup on the broadcast state to process transactions that come from another stream. The problem is that the broadcast state is empty until the first element shows up.

Best,
Nick
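The caching approach Guowei describes in this thread could look roughly like the sketch below. This is a minimal sketch assuming Flink 1.12's `KeyedBroadcastProcessFunction`. Transactions that arrive while the broadcast state is still empty are buffered in keyed `ListState`. When a broadcast element arrives, `applyToKeyedState` visits every key's buffer, processes the cached elements, and clears them. The class name `BufferUntilRulesFunction`, the `String` key and transaction types, the `Tuple2<String, String>` rule type, and the `lookup` helper are all hypothetical.

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

/** Hypothetical example: buffers keyed transactions until the first broadcast rule arrives. */
public class BufferUntilRulesFunction
        extends KeyedBroadcastProcessFunction<String, String, Tuple2<String, String>, String> {

    // Broadcast state: rule id -> rule value; ids are assumed to match transaction keys.
    public static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    // Keyed state: transactions cached while the broadcast state is still empty.
    public static final ListStateDescriptor<String> PENDING =
            new ListStateDescriptor<>("pending", Types.STRING);

    @Override
    public void processElement(String tx, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        ReadOnlyBroadcastState<String, String> rules = ctx.getBroadcastState(RULES);
        if (!rules.immutableEntries().iterator().hasNext()) {
            // Nothing broadcast yet: cache the transaction under its key.
            getRuntimeContext().getListState(PENDING).add(tx);
        } else {
            out.collect(lookup(tx, rules.get(ctx.getCurrentKey())));
        }
    }

    @Override
    public void processBroadcastElement(
            Tuple2<String, String> rule, Context ctx, Collector<String> out) throws Exception {
        BroadcastState<String, String> rules = ctx.getBroadcastState(RULES);
        rules.put(rule.f0, rule.f1);
        // Visit every key that has cached transactions, process them, then clear the cache.
        ctx.applyToKeyedState(PENDING, (key, pending) -> {
            Iterable<String> cached = pending.get();
            if (cached != null) {
                for (String tx : cached) {
                    out.collect(lookup(tx, rules.get(key)));
                }
            }
            pending.clear();
        });
    }

    // Hypothetical business logic: tag the transaction with the rule found for its key.
    public static String lookup(String tx, String rule) {
        return tx + " -> " + (rule == null ? "<no rule>" : rule);
    }
}
```

It might be wired up as `transactions.keyBy(tx -> tx).connect(rules.broadcast(BufferUntilRulesFunction.RULES)).process(new BufferUntilRulesFunction())`, where `rules` is a `DataStream<Tuple2<String, String>>`. One consequence of this design is that cached elements are only emitted once the first broadcast element arrives. If it never arrives, the buffers grow without bound.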