Hi Guowei,
I am not using a keyed broadcast function; I use BroadcastProcessFunction [1]. My
question is: can non-broadcast state, for instance value state or map state,
be updated whenever I get a broadcast event in *processBroadcastElement*? This
way the state updates would be consistent, since each instance of the task gets
the same broadcast element.

```
private MapState<String, MyState> myState;

@Override
public void processElement(InputType value, ReadOnlyContext ctx,
        Collector<OutputType> out) throws Exception {
    // Iterate over the map state.
    for (Map.Entry<String, MyState> entry : myState.entries()) {
        // Business logic
    }

    // Do things
}

@Override
public void processBroadcastElement(BroadcastedStateType value, Context ctx,
        Collector<OutputType> out) throws Exception {
    // Update the map state, which is not a broadcast state.
    // Every parallel sub-operator applies the same update.
    myState.put(value.ID(), value.state());
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Called when it's time to save state.
    myState.clear();

    // Update myState with current application state.
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Called when things start up, possibly recovering from an error.
    MapStateDescriptor<String, MyState> descriptor =
            new MapStateDescriptor<>("state", Types.STRING, Types.POJO(MyState.class));

    myState = context.getKeyedStateStore().getMapState(descriptor);

    if (context.isRestored()) {
        // Restore application state from myState.
    }
}
```
[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html


Best,
Nick.

On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Nick
> Normally you cannot iterate over all the keyed state, but
> `BroadcastState` and `applyToKeyedState` let you do that.
> For example, before the broadcast-side elements arrive you might choose
> to cache the non-broadcast elements in keyed state. After the broadcast
> elements arrive, you can use `applyToKeyedState` [1] to iterate over all the
> elements you "cached" in the keyed state and run your business logic.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
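
The caching approach described above could be modeled roughly as follows. This
is a plain-Java sketch of the control flow only, not Flink code: the class
`BufferUntilBroadcast`, the field names, and the string-based "enrichment" are
all hypothetical, and ordinary collections stand in for keyed state and
broadcast state. In a real job the flush loop would be the part played by
`applyToKeyedState` inside `processBroadcastElement`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "cache until the broadcast side arrives". Not Flink API.
class BufferUntilBroadcast {
    // Stands in for broadcast state; null until the first broadcast element.
    private String latestConfig;
    // Stands in for keyed state: elements that arrived too early, grouped by key.
    private final Map<String, List<String>> pendingByKey = new LinkedHashMap<>();
    // Stands in for the Collector: everything emitted downstream.
    private final List<String> output = new ArrayList<>();

    // Non-broadcast side: emit if a config exists, otherwise cache under the key.
    public void processElement(String key, String element) {
        if (latestConfig == null) {
            pendingByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        } else {
            output.add(enrich(key, element));
        }
    }

    // Broadcast side: store the config, then visit every key's cache and flush it.
    public void processBroadcastElement(String config) {
        latestConfig = config;
        for (Map.Entry<String, List<String>> entry : pendingByKey.entrySet()) {
            for (String element : entry.getValue()) {
                output.add(enrich(entry.getKey(), element));
            }
        }
        pendingByKey.clear();
    }

    // Hypothetical business logic: tag the element with the current config.
    private String enrich(String key, String element) {
        return key + "/" + element + "@" + latestConfig;
    }

    public List<String> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        BufferUntilBroadcast op = new BufferUntilBroadcast();
        op.processElement("acct-1", "tx-1");   // cached, no config yet
        op.processElement("acct-2", "tx-2");   // cached, no config yet
        op.processBroadcastElement("v1");      // flushes both cached elements
        op.processElement("acct-1", "tx-3");   // emitted immediately
        System.out.println(op.getOutput());
        // prints [acct-1/tx-1@v1, acct-2/tx-2@v1, acct-1/tx-3@v1]
    }
}
```

The point of the model is that nothing is emitted until the first broadcast
element shows up, and the cached elements are not lost in the meantime.
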
>
> Best,
> Guowei
>
>
> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <buggi...@gmail.com> wrote:
>
>> Thanks Guowei. Another question I have: what is the use of broadcast
>> state when I can update a map state or value state inside the
>> processBroadcastElement method and use that state for a lookup in the
>> processElement method, as in this example?
>>
>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>
>>
>> Best,
>> Nick
>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi, Nick
>>>   You might need to handle it yourself if you have to process an element
>>> only after you get the broadcast state.
>>>   For example, you could “cache” the element in state and handle it
>>> when the elements from the broadcast side arrive. In particular, if
>>> you are using `KeyedBroadcastProcessFunction`, you could use
>>> `applyToKeyedState` to access the elements you cached earlier.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <buggi...@gmail.com>
>>> wrote:
>>>
>>>> Hi guys,
>>>> What is the way to initialize broadcast state (say, with default values)
>>>> before the first element shows up in the broadcasting stream? I do a lookup
>>>> on the broadcast state to process transactions which come from another
>>>> stream. The problem is the broadcast state is empty until the first element
>>>> shows up.
>>>>
>>>>
>>>> Best,
>>>> Nick.
>>>>
>>>
