Hi Radu,

I don't fully understand your question, but I guess the answer is NO.

The broadcast state pattern just provides you with automatic data 
broadcasting and a bunch of map states to cache the "low-throughput" patterns. 
Also, to keep consistency, it forbids `processElement()` from modifying the 
states. But this API does not really broadcast the states: every parallel 
instance keeps its own copy. A plain field such as `test` is local to one 
instance, so a pattern created from it in `processBroadcastElement()` would 
only exist on that instance. You should keep the logic for 
`processBroadcastElement()` deterministic. Maybe the equation below could make 
the pattern clear.

<identical input> + <deterministic logic> = <identical states> = <broadcast state>
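To illustrate the equation, here is a plain-Java simulation, not Flink code: `Instance`, `runDeterministic` and `runWithLocalFlag` are hypothetical names for this sketch only. Each simulated parallel instance keeps its own copy of the map, so the copies stay equal only while the update is deterministic; once it also depends on an instance-local field like `test`, they diverge.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink involved): each simulated parallel instance
// holds its OWN copy of the "broadcast" map. Nothing copies state between
// instances; the copies agree only if every instance sees the same broadcast
// elements and updates its copy deterministically.
public class BroadcastStateSimulation {

    static final class Instance {
        final Map<String, String> state = new HashMap<>(); // local copy of the "broadcast state"
        boolean test = false; // plain field, mirrors `test` in Radu's example

        // Deterministic: the update depends only on the broadcast element.
        void onBroadcastDeterministic(String pattern) {
            state.put("pattern", pattern);
        }

        // Not deterministic across instances: the update also depends on a
        // field that only some instances have set in processElement().
        void onBroadcastWithLocalFlag(String pattern) {
            state.put("pattern", test ? pattern + "-special" : pattern);
        }
    }

    // Sends the same broadcast element to every instance (deterministic update).
    static List<Map<String, String>> runDeterministic(int parallelism, String pattern) {
        List<Map<String, String>> states = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Instance inst = new Instance();
            inst.onBroadcastDeterministic(pattern);
            states.add(inst.state);
        }
        return states;
    }

    // Same broadcast element, but instance `flagged` has set its local field first.
    static List<Map<String, String>> runWithLocalFlag(int parallelism, int flagged, String pattern) {
        List<Map<String, String>> states = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Instance inst = new Instance();
            inst.test = (i == flagged); // only one instance saw the "special" context
            inst.onBroadcastWithLocalFlag(pattern);
            states.add(inst.state);
        }
        return states;
    }

    public static void main(String[] args) {
        System.out.println(runDeterministic(3, "p1"));    // [{pattern=p1}, {pattern=p1}, {pattern=p1}]
        System.out.println(runWithLocalFlag(3, 0, "p1")); // [{pattern=p1-special}, {pattern=p1}, {pattern=p1}]
    }
}
```

In the second case only instance 0 ends up with the special pattern; the other instances never learn about it.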

Best,
Xingcan

> On Aug 27, 2018, at 10:23 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:
> 
> Hi Fabian,
>  
> Thanks for the blog post about broadcast state. I have a question with 
> respect to the update capabilities of the broadcast state:
>  
> Assume you do some processing logic in the main processElement function, 
> and at a given context you 1) change a local field marker, to 2) signal that 
> the next time the broadcast function is triggered, a special pattern should 
> be created and broadcast.
>  
> My question is: is such behavior allowed? Would the new special Pattern 
> that originates in one operator instance be shared across the other 
> instances of the KeyedBroadcastProcessFunction?
>  
>  
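> import org.apache.flink.api.common.state.BroadcastState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
> import org.apache.flink.util.Collector;
> 
> // Nested in the job class; Action and Pattern are the user's own types.
> public static class PatternEvaluator
>     extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {
> 
>   // Plain instance field: local to this parallel instance, neither
>   // broadcast nor checkpointed by Flink.
>   public boolean test = false;
> 
>   @Override
>   public void processElement(
>       Action action,
>       ReadOnlyContext ctx,
>       Collector<Tuple2<Long, Pattern>> out) throws Exception {
> 
>     // ...logic
> 
>     if (isSpecialContext(action)) {
>       test = true;
>     }
>   }
> 
>   @Override
>   public void processBroadcastElement(
>       Pattern pattern,
>       Context ctx,
>       Collector<Tuple2<Long, Pattern>> out) throws Exception {
>     // store the new pattern by updating the broadcast state
>     BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(
>         new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));
>     // storing in MapState with null as VOID default value
>     bcState.put(null, pattern);
> 
>     // Depends on the instance-local `test` field, so instances that never
>     // set it store a different pattern than the one that did.
>     if (test) {
>       bcState.put(null, new Pattern(test)); // assumes a Pattern(boolean) constructor
>     }
>   }
> 
>   // Hypothetical placeholder for the "..whatever context" condition.
>   private boolean isSpecialContext(Action action) {
>     return false;
>   }
> }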
>  
>  
> Dr. Radu Tudoran
> Staff Research Engineer - Big Data Expert
> IT R&D Division
>  
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> German Research Center
> Munich Office
> Riesstrasse 25, 80992 München
>  
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>  
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Monday, August 20, 2018 9:40 AM
> To: Paul Lam <paullin3...@gmail.com>
> Cc: Rong Rong <walter...@gmail.com>; Hequn Cheng <chenghe...@gmail.com>; 
> user <user@flink.apache.org>
> Subject: Re: What's the advantage of using BroadcastState?
>  
> Hi,
>  
> I've recently published a blog post about Broadcast State [1].
>  
> Cheers,
> Fabian
>  
> [1] https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
>  
> 2018-08-20 3:58 GMT+02:00 Paul Lam <paullin3...@gmail.com>:
> Hi Rong, Hequn
>  
> Your answers are very helpful! Thank you!
>  
> Best Regards,
> Paul Lam
> 
> 
> On Aug 19, 2018, at 23:30, Rong Rong <walter...@gmail.com> wrote:
>  
> Hi Paul,
>  
> To add to Hequn's answer: broadcast state is typically used for "a 
> low-throughput stream containing a set of rules which we want to evaluate 
> against all elements coming from another stream" [1].
> So one more item for the list of differences: whether the control data is 
> "broadcast" across all keys when processing a keyed stream. This typically 
> matters when it is not possible to derive the same key field with a 
> KeySelector in a connected (co-)stream.
> Another difference is performance: the broadcast state is "stored locally 
> and is used to process all incoming elements on the other stream", so its 
> size needs to be managed carefully.
>  
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
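A rough sketch of the routing difference Rong describes, assuming a simplified `hashCode() mod parallelism` partitioner (Flink actually assigns keys through key groups, so the exact owning instance differs). All names here are hypothetical. A keyed control element reaches only the instance that owns its key, a broadcast element reaches every instance, and since every instance then holds a full copy, the total footprint is roughly the rule-set size times the parallelism.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified routing model, NOT Flink's real key-group assignment:
// a keyed control element reaches only the one instance that owns its key,
// whereas a broadcast element reaches every instance.
public class KeyedVsBroadcastRouting {

    // Hypothetical partitioner used only for this illustration.
    static int ownerOf(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Instances that receive a control element sent on a keyed stream.
    static List<Integer> keyedTargets(Object key, int parallelism) {
        List<Integer> targets = new ArrayList<>();
        targets.add(ownerOf(key, parallelism));
        return targets;
    }

    // Instances that receive a control element sent on a broadcast stream.
    static List<Integer> broadcastTargets(int parallelism) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            targets.add(i);
        }
        return targets;
    }

    // Every instance stores the full rule set, so memory grows with parallelism.
    static long totalBroadcastStateBytes(long ruleSetBytes, int parallelism) {
        return ruleSetBytes * parallelism;
    }

    public static void main(String[] args) {
        System.out.println(keyedTargets("rule-42", 4));                // a single instance index
        System.out.println(broadcastTargets(4));                       // [0, 1, 2, 3]
        System.out.println(totalBroadcastStateBytes(10_000_000L, 4));  // 40000000
    }
}
```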
>  
> On Sun, Aug 19, 2018 at 1:40 AM Hequn Cheng <chenghe...@gmail.com> wrote:
> Hi Paul,
>  
> There are some differences:
> 1. The BroadcastStream broadcasts data for you, i.e., data is sent to all 
> downstream tasks automatically.
> 2. To guarantee that the contents of the Broadcast State are the same across 
> all parallel instances of our operator, read-write access is only given to 
> the broadcast side.
> 3. For BroadcastState, Flink guarantees that upon restoring/rescaling there 
> will be no duplicates and no missing data. In case of recovery with the same 
> or smaller parallelism, each task reads its checkpointed state. Upon scaling 
> up, each task reads its own state, and the remaining tasks (p_new - p_old) 
> read checkpoints of previous tasks in a round-robin manner. MapState doesn't 
> have such abilities.
>  
> Best, Hequn
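Hequn's third point can be sketched as follows, assuming new task i reads the checkpoint of old task `i % p_old`. This is one assignment consistent with the description above, not a copy of Flink's internal repartitioning code; `checkpointAssignment` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the redistribution rule described above, assuming new subtask i
// reads the checkpoint of old subtask (i % pOld):
//  - i < pOld: the task reads its own checkpointed copy;
//  - i >= pOld (scale-up only): the extra pNew - pOld tasks cycle over the
//    old checkpoints in round-robin order.
public class BroadcastStateRescaling {

    static List<Integer> checkpointAssignment(int pOld, int pNew) {
        if (pOld <= 0 || pNew <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<Integer> assignment = new ArrayList<>();
        for (int i = 0; i < pNew; i++) {
            assignment.add(i % pOld); // index of the old checkpoint task i restores
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(checkpointAssignment(3, 5)); // [0, 1, 2, 0, 1]  (scale up)
        System.out.println(checkpointAssignment(4, 2)); // [0, 1]           (scale down)
    }
}
```

Since all checkpointed copies are identical, each new task restoring exactly one complete copy is what yields "no duplicates and no missing data".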
>  
> On Sun, Aug 19, 2018 at 11:18 AM, Paul Lam <paullin3...@gmail.com> wrote:
> Hi, 
> 
> AFAIK, the difference between a BroadcastStream and a normal DataStream is 
> that the BroadcastStream comes with a BroadcastState. But it seems that the 
> functionality of BroadcastState can also be achieved with a MapState in a 
> CoMapFunction or similar, since the control stream can still be broadcast 
> without being turned into a BroadcastStream. So I'm wondering: what's the 
> advantage of using BroadcastState? Thanks a lot!
> 
> Best Regards,
> Paul Lam
