Hi Nick,

I do not think you can update `myState` in `processBroadcastElement`. Keyed state can only be updated while a key is in scope, and there is no current key in `processBroadcastElement`.

Best,
Guowei

On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner <buggi...@gmail.com> wrote:

> Hi Guowei,
> I am not using a keyed broadcast function; I use [1]. My question is: can
> non-broadcast state, for instance value state or map state, be updated
> whenever I get a broadcast event in *processBroadcastElement*? That way
> the state updates are consistent, since each instance of the task gets the
> same broadcast element.
>
> ```
> private MapState<String, MyState> myState;
>
> @Override
> public void processElement(InputType value, ReadOnlyContext ctx,
>         Collector<OutputType> out) throws Exception {
>     // Iterate over map state.
>     for (Map.Entry<String, MyState> entry : myState.entries()) {
>         // Business logic
>     }
>     // Do things
> }
>
> @Override
> public void processBroadcastElement(BroadcastedStateType value, Context ctx,
>         Collector<OutputType> out) throws Exception {
>     // Update map state, which is not a broadcast state.
>     // Same update in every sub-operator.
>     myState.put(value.ID(), value.state()); // Update the map state with the value from the broadcast
> }
>
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     // called when it's time to save state
>     myState.clear();
>     // Update myState with current application state
> }
>
> @Override
> public void initializeState(FunctionInitializationContext context) throws Exception {
>     // called when things start up, possibly recovering from an error
>     descriptor = new MapStateDescriptor<>("state", Types.STRING,
>             Types.POJO(MyState.class));
>     myState = context.getKeyedStateStore().getMapState(descriptor);
>     if (context.isRestored()) {
>         // restore application state from myState
>     }
> }
> ```
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html
>
> Best,
> Nick.
>
> On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi Nick,
>> Normally you cannot iterate over all the keyed state, but
>> `BroadcastState` and `applyToKeyedState` let you do that.
>> For example, before you receive any broadcast-side elements, you might
>> choose to cache the non-broadcast elements in keyed state. After the
>> broadcast elements arrive, you can use `applyToKeyedState` [1] to iterate
>> over all the elements you "cached" in keyed state and run your business
>> logic.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>>
>> Best,
>> Guowei
>>
>> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <buggi...@gmail.com> wrote:
>>
>>> Thanks Guowei. Another question I have: what is the use of broadcast
>>> state when I can update a map state or value state inside the
>>> process-broadcast-element method and use that state for a lookup in the
>>> process-element method, as in this example?
>>>
>>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>>
>>> Best,
>>> Nick
>>>
>>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi Nick,
>>>> You might need to handle it yourself if you have to process an
>>>> element only after you get the broadcast state.
>>>> For example, you could "cache" the element in state and handle it
>>>> when the elements from the broadcast side arrive. In particular, if
>>>> you are using `KeyedBroadcastProcessFunction`, you can use
>>>> `applyToKeyedState` to access the elements you cached earlier.
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <buggi...@gmail.com> wrote:
>>>>
>>>>> Hi guys,
>>>>> What is the way to initialize broadcast state (say, with default
>>>>> values) before the first element shows up in the broadcasting stream?
>>>>> I do a lookup on the broadcast state to process transactions which
>>>>> come from another stream. The problem is that the broadcast state is
>>>>> empty until the first element shows up.
>>>>>
>>>>> Best,
>>>>> Nick.
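
The "cache, then flush when the broadcast arrives" idea from this thread can be sketched in plain Java. This is only a model of the control flow, not Flink code: `BufferUntilBroadcast`, `onElement`, `onBroadcast`, and the `"rule"` entry are hypothetical names, and the two maps merely stand in for broadcast state and per-key state. It assumes a single rule and string-valued elements.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model only: none of these types or methods are Flink APIs.
class BufferUntilBroadcast {
    // Stand-in for broadcast state: the same content on every parallel instance.
    private final Map<String, String> broadcastState = new HashMap<>();
    // Stand-in for keyed state: one buffer per key (TreeMap keeps iteration order stable).
    private final Map<String, List<String>> bufferedByKey = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    // Models the element side: a current key exists, so that key's buffer is reachable.
    void onElement(String key, String value) {
        String rule = broadcastState.get("rule");
        if (rule == null) {
            // No broadcast element seen yet: cache the element under its key.
            bufferedByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } else {
            output.add(key + ":" + value + ":" + rule);
        }
    }

    // Models the broadcast side: there is no current key here, so every key is
    // visited explicitly. That explicit visit is the role applyToKeyedState plays.
    void onBroadcast(String rule) {
        broadcastState.put("rule", rule);
        for (Map.Entry<String, List<String>> entry : bufferedByKey.entrySet()) {
            for (String value : entry.getValue()) {
                output.add(entry.getKey() + ":" + value + ":" + rule);
            }
        }
        bufferedByKey.clear();
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BufferUntilBroadcast op = new BufferUntilBroadcast();
        op.onElement("user-b", "tx1"); // buffered: no rule yet
        op.onElement("user-a", "tx2"); // buffered: no rule yet
        op.onBroadcast("limit=100");   // flushes both buffered elements
        op.onElement("user-a", "tx3"); // processed immediately
        System.out.println(op.output());
    }
}
```

In the model, elements that arrive before the first broadcast are held back rather than processed against empty state, which is the situation described in the original question. A real job would need to keep the buffer in checkpointed keyed state rather than in a field.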