Thanks a lot, Guowei, that makes sense. I will go with the caching approach. Can you point me to an example that shows the most efficient way to cache elements? Thanks a ton for your help.
Best,
Nick

On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi Nick,
> I do not think you can update `myState` in `processBroadcastElement`,
> because you need a key before you can update keyed state, and there is
> no key in `processBroadcastElement`.
> Best,
> Guowei
>
>
> On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner <buggi...@gmail.com> wrote:
>
>> Hi Guowei,
>> I am not using a keyed broadcast function, I use [1]. My question is,
>> can a non-broadcast state, for instance value state or map state, be updated
>> whenever I get a broadcast event in *processBroadcastElement*? This way
>> the state updates are consistent, since each instance of the task gets the
>> same broadcast element.
>>
>> ```
>> private MapState<String, MyState> myState;
>>
>> @Override
>> public void processElement(InputType value, ReadOnlyContext ctx,
>>         Collector<OutputType> out) throws Exception {
>>     // Iterate over map state.
>>     myState.iterator().forEachRemaining(entry -> { /* business logic */ });
>>
>>     // Do things
>> }
>>
>> @Override
>> public void processBroadcastElement(BroadcastedStateType value,
>>         Context ctx, Collector<OutputType> out) throws Exception {
>>     // Update map state which is not a broadcast state.
>>     // Same update in every sub-operator.
>>     myState.put(value.ID(), value.state()); // update the MapState with the value from the broadcast
>> }
>>
>> @Override
>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>     // called when it's time to save state
>>     myState.clear();
>>     // Update myState with current application state
>> }
>>
>> @Override
>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>     // called when things start up, possibly recovering from an error
>>     descriptor = new MapStateDescriptor<>("state", Types.STRING,
>>             Types.POJO(BroadcastedStateType.class));
>>     myState = context.getKeyedStateStore().getMapState(descriptor);
>>     if (context.isRestored()) {
>>         // restore application state from myState
>>     }
>> }
>> ```
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html
>>
>>
>> Best,
>> Nick.
>>
>> On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi Nick,
>>> Normally you cannot iterate over all the keyed states, but
>>> `BroadcastState` and `applyToKeyedState` let you do that.
>>> For example, before you get the broadcast-side elements you might choose
>>> to cache the non-broadcast elements in keyed state. After the broadcast
>>> elements arrive, you use `applyToKeyedState` [1] to iterate over all the
>>> elements you "cached" in the keyed state and run your business logic.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <buggi...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Guowei. Another question I have is, what is the use of a
>>>> broadcast state when I can update a map state or value state inside the
>>>> process broadcast element method and use that state to do a lookup in the
>>>> process element method, as in this example?
>>>>
>>>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>>>
>>>>
>>>> Best,
>>>> Nick
>>>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>>
>>>>> Hi Nick,
>>>>> You might need to handle it yourself if you have to process an
>>>>> element only after you get the broadcast state.
>>>>> For example, you could “cache” the element in state and handle
>>>>> it when the elements from the broadcast side arrive.
>>>>> In particular, if you are using the `KeyedBroadcastProcessFunction`
>>>>> you could use `applyToKeyedState` to access the elements you cached
>>>>> before.
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <buggi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>> What is the way to initialize broadcast state (say, with default
>>>>>> values) before the first element shows up in the broadcasting stream?
>>>>>> I do a lookup on the broadcast state to process transactions which
>>>>>> come from another stream. The problem is that the broadcast state is
>>>>>> empty until the first element shows up.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Nick.
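
For reference, the "cache until the broadcast side arrives" idea discussed in this thread can be sketched in plain Java, without any Flink types. This is only a rough model under stated assumptions, not Flink code: the class `BufferUntilBroadcast`, the `Txn` record and the currency/rate scenario are all hypothetical names made up for illustration. In a real job the `pending` map would presumably be keyed state (for example a `ListState` per key) inside a `KeyedBroadcastProcessFunction`, the `rates` map would be the broadcast state, and the loop over all buffers would go through `ctx.applyToKeyedState(...)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of buffering elements until the matching broadcast data arrives. */
final class BufferUntilBroadcast {

    /** Hypothetical input element: a transaction that needs a rate lookup. */
    static final class Txn {
        final String account;   // plays the role of the stream key
        final String currency;  // lookup key into the broadcast data
        final long amount;

        Txn(String account, String currency, long amount) {
            this.account = account;
            this.currency = currency;
            this.amount = amount;
        }
    }

    // Stand-in for broadcast state: the same content on every parallel instance.
    private final Map<String, Long> rates = new HashMap<>();
    // Stand-in for keyed state: one buffer of waiting elements per key.
    private final Map<String, List<Txn>> pending = new LinkedHashMap<>();
    // Stand-in for the Collector.
    private final List<String> output = new ArrayList<>();

    /** Counterpart of processElement: emit if the rate is known, otherwise cache. */
    void processElement(Txn txn) {
        Long rate = rates.get(txn.currency);
        if (rate == null) {
            pending.computeIfAbsent(txn.account, k -> new ArrayList<>()).add(txn);
        } else {
            emit(txn, rate);
        }
    }

    /** Counterpart of processBroadcastElement: store the rate, then flush matching buffers. */
    void processBroadcastElement(String currency, long rate) {
        rates.put(currency, rate);
        // Visiting every key's buffer is the part applyToKeyedState would cover in Flink.
        for (List<Txn> buffered : pending.values()) {
            Iterator<Txn> it = buffered.iterator();
            while (it.hasNext()) {
                Txn txn = it.next();
                if (txn.currency.equals(currency)) {
                    emit(txn, rate);
                    it.remove();
                }
            }
        }
    }

    private void emit(Txn txn, long rate) {
        output.add(txn.account + ":" + (txn.amount * rate));
    }

    List<String> output() {
        return output;
    }

    /** Number of elements still waiting for broadcast data. */
    int pendingCount() {
        int n = 0;
        for (List<Txn> buffered : pending.values()) {
            n += buffered.size();
        }
        return n;
    }
}
```

The model only removes an element from its buffer once it has been emitted, so nothing is lost if the rate for a different currency arrives first. How efficient this is in a real job would depend on the state backend and on how many elements pile up before the first broadcast element shows up.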