Hi,Nick
I do not think you could update the `myState`  in the
`processBroadcastElement`. It is because you need a key before to update
the keyedstate. But there is no key in `processBroadcastElement` .
Best,
Guowei


On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner <buggi...@gmail.com> wrote:

> Hi Guowei,
> I am not using a keyed broadcast function, I use [1].  My question is, can
> a non broadcast state, for instance value state/map state be updated
> whenever I get a broadcast event in *processBroadcastElement*. This way
> the state updates are consistent since each instance of the task gets the
> same broadcast element.
>
> ```
> private MapState<String, MyState> myState;
>
> @Override
>    public void processElement(InputType value, ReadOnlyContext ctx,
> Collector<OutputType> out) throws Exception {
>      // Iterate over map state.
>        myState.iterator().forEach(entry -> ())// Business logic
>
>        // Do things
>    }
>
>    @Override
>    public void processBroadcastElement(BroadcastedStateType value, Context
> ctx, Collector<OutputType> out) throws Exception {
>      // update map state which is not a broadcast state. Same update in
> every sub operator
>        state.put(value.ID(), value.state());   // Update the mapState with
> value from broadcast
>    }
>
>
>    @Override
>  public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
>
>      // called when it's time to save state
>
>      myState.clear();
>
>          // Update myState with current application state
>
>  }
>
>  @Override
>  public void initializeState(FunctionInitializationContext context) throws
> Exception {
>
>      // called when things start up, possibly recovering from an error
>
>      descriptor = new MapStateDescriptor<>("state", Types.STRING,
> Types.POJO(BroadcastedStateType.class));
>
>      myState = context.getKeyedStateStore().getMapState(descriptor);
>
>      if (context.isRestored()) {
>
>          // restore application state from myState
>
>      }
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html
> .
>
>
> Best,
> Nick.
>
> On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi,Nick
>> Normally you could not iterate all the keyed states, but the
>> `BroadCastState` & `applyTokeyedState` could do that.
>> For example, before you get the broadcast side elements you might choose
>> to cache the non-broadcast element to the keyed state. After the broadcast
>> elements arrive you need to use `applyTokeyedState`[1] to iterate all the
>> elements you "cached" in the keyed state and do your business logic.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <buggi...@gmail.com>
>> wrote:
>>
>>> Thanks Guowei. Another question I have is, what is the use of a
>>> broadcast state when I can update a map state or value state inside of the
>>> process broadcast element method and use that state to do a lookup in the
>>> process element method like this example
>>>
>>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>>
>>>
>>> Best,
>>> Nick
>>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi, Nick
>>>>   You might need to handle it yourself If you have to process an
>>>> element only after you get the broadcast state.
>>>>   For example, you could “cache” the element to the state and handle it
>>>> when the element from the broadcast side elements are arrived. Specially if
>>>> you are using the `KeyedBroadcastProcessFunction` you could use the
>>>> `applyToKeyedState` to access the element you cache before.
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <buggi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi guys,
>>>>> What is the way to initialize broadcast state(say with default values)
>>>>> before the first element shows up in the broadcasting stream? I do a 
>>>>> lookup
>>>>> on the broadcast state to process transactions which come from another
>>>>> stream. The problem is the broadcast state is empty until the first 
>>>>> element
>>>>> shows up.
>>>>>
>>>>>
>>>>> Best,
>>>>> Nick.
>>>>>
>>>>

Reply via email to