Thanks a lot Guowei, that makes sense. I will go with the caching approach.
Can you point me to an example that shows the most efficient way to cache
elements?
Thanks a ton for your help.

Best,
Nick

On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi Nick,
> I do not think you can update `myState` in `processBroadcastElement`.
> Updating keyed state requires a current key, and there is no key in
> `processBroadcastElement`.
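To illustrate the point above, here is a framework-free model in plain Java. It is not Flink's implementation; `ToyKeyedMapState` and its methods are hypothetical. Keyed state behaves like one map per key, and every read or write is routed through the key that the runtime has set for the current record. A keyed `processElement` runs with such a key; `processBroadcastElement` does not, which the model makes explicit by throwing when no key is set.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of keyed map state; not Flink's implementation.
class ToyKeyedMapState<K, UK, UV> {
    // One user map per key, like keyed MapState partitioned by key.
    private final Map<K, Map<UK, UV>> perKey = new HashMap<>();
    // Set by the "runtime" before each keyed callback; null means no key in scope.
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    private Map<UK, UV> slotForCurrentKey() {
        if (currentKey == null) {
            // Models processBroadcastElement: there is no key to route the access to.
            throw new IllegalStateException("no current key; keyed state is not accessible");
        }
        return perKey.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    void put(UK userKey, UV value) {
        slotForCurrentKey().put(userKey, value);
    }

    UV get(UK userKey) {
        return slotForCurrentKey().get(userKey);
    }
}
```

In the real API the key is managed by the runtime, so user code cannot set it before writing; this is why the thread points to `applyToKeyedState`, which visits every key's state on the runtime's behalf.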
> Best,
> Guowei
>
>
> On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner <buggi...@gmail.com> wrote:
>
>> Hi Guowei,
>> I am not using a keyed broadcast function; I use [1]. My question is:
>> can a non-broadcast state, for instance a value state or map state, be
>> updated whenever I get a broadcast event in *processBroadcastElement*? This
>> way the state updates are consistent, since each instance of the task gets
>> the same broadcast element.
>>
>> ```
>> private transient MapState<String, MyState> myState;
>>
>> @Override
>> public void processElement(InputType value, ReadOnlyContext ctx,
>>         Collector<OutputType> out) throws Exception {
>>     // Iterate over the map state.
>>     for (Map.Entry<String, MyState> entry : myState.entries()) {
>>         // Business logic
>>     }
>>     // Do things
>> }
>>
>> @Override
>> public void processBroadcastElement(BroadcastedStateType value,
>>         Context ctx, Collector<OutputType> out) throws Exception {
>>     // Update the map state, which is not a broadcast state, with the value
>>     // from the broadcast. Every parallel subtask applies the same update.
>>     myState.put(value.ID(), value.state());
>> }
>>
>> @Override
>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>     // Called when it's time to save state.
>>     myState.clear();
>>     // Update myState with the current application state.
>> }
>>
>> @Override
>> public void initializeState(FunctionInitializationContext context)
>>         throws Exception {
>>     // Called when things start up, possibly recovering from an error.
>>     MapStateDescriptor<String, MyState> descriptor = new MapStateDescriptor<>(
>>             "state", Types.STRING, Types.POJO(MyState.class));
>>     myState = context.getKeyedStateStore().getMapState(descriptor);
>>     if (context.isRestored()) {
>>         // Restore the application state from myState.
>>     }
>> }
>> ```
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html
>>
>>
>> Best,
>> Nick.
>>
>> On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi Nick,
>>> Normally you cannot iterate over all the keyed states, but the broadcast
>>> state pattern (`BroadcastState` together with `applyToKeyedState`) lets you
>>> do that.
>>> For example, until the broadcast-side elements arrive, you might cache the
>>> non-broadcast elements in keyed state. Once the broadcast elements arrive,
>>> use `applyToKeyedState`[1] to iterate over all the elements you "cached" in
>>> the keyed state and run your business logic.
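The caching approach described above can be modelled roughly as follows. This is a plain-Java sketch, not Flink code: `BufferUntilBroadcast`, the `"threshold"` rule, and the integer payloads are hypothetical. In a real `KeyedBroadcastProcessFunction`, `buffers` would be a keyed `ListState`, `rules` would be the broadcast state obtained from `ctx.getBroadcastState(...)`, and the loop in `processBroadcastElement` would be replaced by a call to `ctx.applyToKeyedState(...)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "cache until the broadcast side arrives".
class BufferUntilBroadcast {
    // Per-key cache of elements that arrived before any rule (keyed ListState in Flink).
    private final Map<String, List<Integer>> buffers = new HashMap<>();
    // Rules received from the broadcast side (broadcast MapState in Flink).
    private final Map<String, Integer> rules = new HashMap<>();
    final List<String> output = new ArrayList<>();

    // Non-broadcast side: process now if a rule exists, otherwise cache under the key.
    void processElement(String key, int value) {
        Integer threshold = rules.get("threshold");
        if (threshold == null) {
            buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } else {
            emit(key, value, threshold);
        }
    }

    // Broadcast side: store the rule, then drain every key's cache
    // (the role applyToKeyedState plays in KeyedBroadcastProcessFunction).
    void processBroadcastElement(String ruleName, int ruleValue) {
        rules.put(ruleName, ruleValue);
        Integer threshold = rules.get("threshold");
        if (threshold == null) {
            return; // still nothing to evaluate cached elements against
        }
        for (Map.Entry<String, List<Integer>> e : buffers.entrySet()) {
            for (int v : e.getValue()) {
                emit(e.getKey(), v, threshold);
            }
        }
        buffers.clear();
    }

    private void emit(String key, int value, int threshold) {
        output.add(key + ":" + value + (value > threshold ? ":HIGH" : ":OK"));
    }

    int bufferedCount() {
        return buffers.values().stream().mapToInt(List::size).sum();
    }
}
```

Draining inside the broadcast handler keeps the catch-up logic in one place; the trade-off is that the cache grows until the first broadcast element arrives, so a size limit or timer may be worth considering.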
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <buggi...@gmail.com> wrote:
>>>
>>>> Thanks Guowei. Another question: what is the use of a broadcast state
>>>> when I can update a map state or value state inside the
>>>> processBroadcastElement method and use that state for a lookup in the
>>>> processElement method, as in this example?
>>>>
>>>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>>>
>>>>
>>>> Best,
>>>> Nick
>>>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>>
>>>>> Hi Nick,
>>>>>   If you have to process an element only after you get the broadcast
>>>>> state, you might need to handle that yourself.
>>>>>   For example, you could "cache" the element in state and handle it when
>>>>> the elements from the broadcast side arrive. In particular, if you are
>>>>> using `KeyedBroadcastProcessFunction`, you can use `applyToKeyedState` to
>>>>> access the elements you cached earlier.
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <buggi...@gmail.com> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>> How can I initialize broadcast state (say, with default values) before
>>>>>> the first element shows up in the broadcast stream? I do a lookup on the
>>>>>> broadcast state to process transactions that come from another stream.
>>>>>> The problem is that the broadcast state is empty until the first element
>>>>>> shows up.
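One way to handle the empty-state window, sketched here as an assumption rather than something confirmed in this thread, is to treat a missing broadcast entry as "use the default" at lookup time instead of pre-populating the state. The plain-Java model below uses a hypothetical `RuleLookup` class and a made-up `fee-rate` default. In a `BroadcastProcessFunction`, the map read would be `ctx.getBroadcastState(descriptor).get(name)` inside `processElement`, which returns `null` for a key that has not been broadcast yet.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lookup helper: broadcast values win, defaults cover the gap.
class RuleLookup {
    // Defaults baked into the job (assumed values, for illustration only).
    private static final Map<String, Double> DEFAULTS = Map.of("fee-rate", 0.01);

    // Stands in for the broadcast MapState read in processElement.
    private final Map<String, Double> broadcastState = new HashMap<>();

    // Broadcast side: overwrite (or add) the value for this name.
    void onBroadcast(String name, double value) {
        broadcastState.put(name, value);
    }

    // Non-broadcast side: prefer the broadcast value, fall back to the default.
    double lookup(String name) {
        Double fromBroadcast = broadcastState.get(name);
        if (fromBroadcast != null) {
            return fromBroadcast;
        }
        Double fallback = DEFAULTS.get(name);
        if (fallback == null) {
            throw new IllegalArgumentException("no broadcast value or default for " + name);
        }
        return fallback;
    }
}
```

This keeps the broadcast state itself untouched until real data arrives, so every parallel instance agrees on the same values without having to seed the state at startup.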
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Nick.
>>>>>>
>>>>>
