Hi Guowei,

I am not using a keyed broadcast function; I use `BroadcastProcessFunction` [1]. My question is: can non-broadcast state, for instance value state or map state, be updated whenever I get a broadcast event in *processBroadcastElement*? That way the state updates are consistent, since each instance of the task gets the same broadcast element.
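Flink's broadcast state documentation cautions that every parallel instance eventually receives every broadcast element, but the elements may arrive in a different order at each instance. So identical state across instances also requires that the update does not depend on arrival order. The following is a plain-Java model, not Flink code. `BroadcastUpdate`, `applyAll`, and `instancesAgree` are hypothetical names. It replays the same updates in two orders against separate local maps, mirroring a `put(value.ID(), value.state())` update in *processBroadcastElement*.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a broadcast element: an ID plus a state value.
final class BroadcastUpdate {
    final String id;
    final String state;

    BroadcastUpdate(String id, String state) {
        this.id = id;
        this.state = state;
    }
}

class BroadcastConsistencyModel {
    // Models one parallel instance: applies every broadcast update, in arrival order,
    // to its own local (non-broadcast) map.
    static Map<String, String> applyAll(List<BroadcastUpdate> arrivalOrder) {
        Map<String, String> local = new HashMap<>();
        for (BroadcastUpdate u : arrivalOrder) {
            local.put(u.id, u.state);
        }
        return local;
    }

    // Simulates two instances receiving the same updates, the second in reverse order,
    // and reports whether their local maps end up identical.
    static boolean instancesAgree(List<BroadcastUpdate> updates) {
        List<BroadcastUpdate> reversed = new ArrayList<>(updates);
        Collections.reverse(reversed);
        return applyAll(updates).equals(applyAll(reversed));
    }

    public static void main(String[] args) {
        List<BroadcastUpdate> distinctIds = List.of(
                new BroadcastUpdate("a", "1"), new BroadcastUpdate("b", "2"));
        List<BroadcastUpdate> repeatedId = List.of(
                new BroadcastUpdate("a", "1"), new BroadcastUpdate("a", "2"));
        System.out.println(instancesAgree(distinctIds)); // true: puts on distinct keys commute
        System.out.println(instancesAgree(repeatedId));  // false: last write wins, order matters
    }
}
```

With distinct IDs the two orders agree. When the same ID is updated twice, the last write wins, and two instances can diverge.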
```java
private MapState<String, MyState> myState;

@Override
public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {
    // Iterate over the map state (MapState#entries() returns an Iterable of Map.Entry).
    for (Map.Entry<String, MyState> entry : myState.entries()) {
        // Business logic
    }
    // Do things
}

@Override
public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {
    // Update the map state, which is NOT a broadcast state, with the value from the
    // broadcast element. Same update in every sub-operator.
    myState.put(value.ID(), value.state());
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Called when it's time to save state.
    myState.clear();
    // Update myState with the current application state.
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Called when things start up, possibly recovering from an error.
    MapStateDescriptor<String, MyState> descriptor =
            new MapStateDescriptor<>("state", Types.STRING, Types.POJO(MyState.class));
    // Caution: a BroadcastProcessFunction is not keyed, so getKeyedStateStore() may return
    // null here; non-keyed operator state comes from context.getOperatorStateStore().
    myState = context.getKeyedStateStore().getMapState(descriptor);
    if (context.isRestored()) {
        // Restore the application state from myState.
    }
}
```

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html

Best,
Nick.

On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi Nick,
> Normally you cannot iterate over all the keyed states, but
> `BroadcastState` and `applyToKeyedState` let you do that.
> For example, before the broadcast-side elements arrive, you might choose
> to cache the non-broadcast elements in keyed state. After the broadcast
> elements arrive, you can use `applyToKeyedState`[1] to iterate over all the
> elements you "cached" in keyed state and run your business logic.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>
> Best,
> Guowei
>
>
> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <buggi...@gmail.com> wrote:
>
>> Thanks Guowei. Another question I have: what is the use of broadcast
>> state when I can update a map state or value state inside the
>> processBroadcastElement method and use that state for lookups in the
>> processElement method, like in this example?
>>
>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>
>>
>> Best,
>> Nick
>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi Nick,
>>> If you have to process an element only after you get the broadcast state,
>>> you might need to handle that yourself.
>>> For example, you could "cache" the element in state and handle it
>>> when the elements from the broadcast side arrive. Especially if
>>> you are using `KeyedBroadcastProcessFunction`, you could use
>>> `applyToKeyedState` to access the elements you cached before.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <buggi...@gmail.com>
>>> wrote:
>>>
>>>> Hi guys,
>>>> How can I initialize broadcast state (say, with default values)
>>>> before the first element shows up in the broadcast stream? I do a lookup
>>>> on the broadcast state to process transactions that come from another
>>>> stream. The problem is that the broadcast state is empty until the first
>>>> element shows up.
>>>>
>>>>
>>>> Best,
>>>> Nick.
>>>>
>>>
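Guowei's suggestion of caching elements until the broadcast side delivers what they need, then draining the cache, can be modeled in plain Java without Flink. This is a minimal sketch, not Flink API. `BufferUntilBroadcastModel`, `onElement`, `onBroadcast`, the rule name `"limit"`, and the string-typed transactions are all hypothetical. In a real `KeyedBroadcastProcessFunction` the cache would live in keyed state and the drain would go through `applyToKeyedState`. Here, a single in-memory list plays both roles.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of "cache until the broadcast side arrives".
class BufferUntilBroadcastModel {
    // Hypothetical name of the rule that processing depends on.
    static final String RULE_NAME = "limit";

    // Stand-in for broadcast state: rule name -> rule value.
    private final Map<String, String> rules = new HashMap<>();
    // Stand-in for the cached elements that arrived before the rule.
    private final List<String> pending = new ArrayList<>();
    // Collected output, standing in for a Flink Collector.
    final List<String> out = new ArrayList<>();

    // Mirrors processElement: look up the rule, or cache the element if it is missing.
    void onElement(String transaction) {
        String rule = rules.get(RULE_NAME);
        if (rule == null) {
            pending.add(transaction);
        } else {
            out.add(transaction + " checked against " + rule);
        }
    }

    // Mirrors processBroadcastElement: store the rule, then retry every cached element.
    void onBroadcast(String name, String value) {
        rules.put(name, value);
        List<String> drained = new ArrayList<>(pending);
        pending.clear();
        for (String t : drained) {
            onElement(t); // re-caches t if the needed rule is still missing
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BufferUntilBroadcastModel m = new BufferUntilBroadcastModel();
        m.onElement("txn-1");          // no rule yet: cached
        m.onElement("txn-2");          // cached
        m.onBroadcast("limit", "100"); // rule arrives: txn-1 and txn-2 are processed
        m.onElement("txn-3");          // processed immediately
        System.out.println(m.out);
    }
}
```

The cache trades memory for correctness. If the broadcast side never emits the needed rule, the cache keeps growing, so a real job would probably want some bound on it.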