Re: Initializing broadcast state

2021-01-28 Thread Guowei Ma
Hi Nick, The following is an example (not runnable as-is, just to explain the idea). I use the `KeyedBroadcastProcessFunction` because I saw that your code uses keyed state. private static class StatefulFunctionWithKeyedStateAccessedOnBroadcast extends KeyedBroadcastProcessFunction { private sta

Re: Initializing broadcast state

2021-01-27 Thread Wei Jiang
Hi guys, I ran into the same question, but I use a different way to initialize: ``` val list = ... // I use JDBC to get the init data val dimensionInitStream = env.fromCollection(list) // the main stream and the `dimensionStream` is a stream from Flink CDC val dimension = dimensionStream.union(dimensionInit

Re: Initializing broadcast state

2021-01-26 Thread Jaffe, Julian
:31 PM To: Guowei Ma Cc: user Subject: Re: Initializing broadcast state Thanks a lot Guowei, that makes sense. I will go with the caching approach. Can you point me to an example that shows the most efficient way to cache elements? Thanks a ton for your help. Best, Nick On Mon, Jan

Re: Initializing broadcast state

2021-01-26 Thread Nick Bendtner
Thanks a lot Guowei, that makes sense. I will go with the caching approach. Can you point me to an example that shows the most efficient way to cache elements? Thanks a ton for your help. Best, Nick On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma wrote: > Hi, Nick > I do not think you could

Re: Initializing broadcast state

2021-01-25 Thread Guowei Ma
Hi Nick, I do not think you can update `myState` in `processBroadcastElement`, because updating keyed state requires a current key, and there is no key in `processBroadcastElement`. Best, Guowei On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner wrote: > Hi Guowei, > I am not usi

Re: Initializing broadcast state

2021-01-25 Thread Nick Bendtner
Hi Guowei, I am not using a keyed broadcast function, I use [1]. My question is: can a non-broadcast state, for instance value state or map state, be updated whenever I get a broadcast event in *processBroadcastElement*? This way the state updates are consistent since each instance of the task gets th

Re: Initializing broadcast state

2021-01-24 Thread Guowei Ma
Hi Nick, Normally you cannot iterate over all the keyed states, but `BroadcastState` and `applyToKeyedState` let you do that. For example, before the broadcast-side elements arrive, you might choose to cache the non-broadcast elements in keyed state. After the broadcast elements arrive you need to
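The idea of caching per key and then visiting every key once the broadcast side arrives can be modelled without Flink. The following is only a plain-Java sketch under stated assumptions: `KeyedCacheSketch`, `applyToAllKeys`, and the string-based records are hypothetical names made up for illustration, the `TreeMap` merely stands in for keyed state, and the single `rule` field stands in for broadcast state. In a real job the iteration would go through `applyToKeyedState` on the context passed to `processBroadcastElement`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Plain-Java model only; no Flink types are used here.
class KeyedCacheSketch {
    // Stands in for keyed state: one list of pending elements per key.
    private final Map<String, List<String>> pendingByKey = new TreeMap<>();
    // Stands in for broadcast state; null until the first broadcast element.
    private String rule;
    private final List<String> output = new ArrayList<>();

    // Non-broadcast side: a key is always available here.
    void processElement(String key, String value) {
        if (rule == null) {
            pendingByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } else {
            output.add(key + ":" + value + ":" + rule);
        }
    }

    // Broadcast side: there is no current key, so all keys are visited.
    void processBroadcastElement(String newRule) {
        rule = newRule;
        applyToAllKeys((key, pending) -> {
            for (String value : pending) {
                output.add(key + ":" + value + ":" + rule);
            }
            pending.clear();
        });
    }

    // Hypothetical helper that plays the role of applyToKeyedState.
    private void applyToAllKeys(BiConsumer<String, List<String>> fn) {
        pendingByKey.forEach(fn);
    }

    List<String> output() {
        return output;
    }
}
```

Elements that arrive before any broadcast element are held per key and emitted in key order when the first broadcast element shows up; later elements are processed immediately.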

Re: Initializing broadcast state

2021-01-24 Thread Nick Bendtner
Thanks Guowei. Another question I have is, what is the use of a broadcast state when I can update a map state or value state inside of the process broadcast element method and use that state to do a lookup in the process element method like this example https://stackoverflow.com/questions/58307154/

Re: Initializing broadcast state

2021-01-24 Thread Guowei Ma
Hi Nick, You might need to handle it yourself if you have to process an element only after you get the broadcast state. For example, you could “cache” the element in state and handle it once the broadcast-side elements have arrived. Especially if you are using the `KeyedBroad
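As a rough illustration of the "cache, then handle once the broadcast side has arrived" suggestion, here is a minimal plain-Java sketch. It deliberately uses no Flink classes: `BufferUntilBroadcastSketch`, its `rules` map, and the `enrich` helper are hypothetical names, and the in-memory list only stands in for whatever Flink state would hold the cached elements in a real job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model only; in a real job the buffer would live in Flink state.
class BufferUntilBroadcastSketch {
    private final Map<String, String> rules = new HashMap<>(); // stands in for broadcast state
    private final List<String> pending = new ArrayList<>();    // stands in for the element cache
    private final List<String> output = new ArrayList<>();

    // Called for each element of the non-broadcast stream.
    void processElement(String txn) {
        if (rules.isEmpty()) {
            pending.add(txn); // nothing to look up yet, so cache the element
        } else {
            output.add(enrich(txn));
        }
    }

    // Called for each element of the broadcast stream.
    void processBroadcastElement(String key, String value) {
        rules.put(key, value);
        // Drain everything that was waiting for the broadcast side.
        for (String txn : pending) {
            output.add(enrich(txn));
        }
        pending.clear();
    }

    // Hypothetical lookup: append the matching rule, or "unknown" if none.
    private String enrich(String txn) {
        return txn + "->" + rules.getOrDefault(txn, "unknown");
    }

    List<String> output() {
        return output;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

This sketch flushes the whole cache on the first broadcast element; a job that needs a specific entry before processing would have to keep unmatched elements cached instead.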

Initializing broadcast state

2021-01-24 Thread Nick Bendtner
Hi guys, What is the way to initialize broadcast state (say, with default values) before the first element shows up in the broadcasting stream? I do a lookup on the broadcast state to process transactions which come from another stream. The problem is the broadcast state is empty until the first elem