Hi Fabian,
Thanks for the blog post about broadcast state. I have a question about the
update capabilities of the broadcast state.
Assume the main processElement function runs some processing logic and, under
a certain condition, 1) sets a local field as a marker to 2) signal that the
next time the broadcast function is triggered, a special pattern should be
created and broadcast.
My question is: is such behavior allowed? Would the new special Pattern that
originates in one operator instance be shared across the other instances of the
KeyedBroadcastProcessFunction?
public static class PatternEvaluator
    extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {

  // local marker field, set in processElement
  public boolean test = false;

  @Override
  public void processElement(
      Action action,
      ReadOnlyContext ctx,
      Collector<Tuple2<Long, Pattern>> out) throws Exception {
    // …logic
    if (..whatever context) {
      test = true;
    }
  }

  @Override
  public void processBroadcastElement(
      Pattern pattern,
      Context ctx,
      Collector<Tuple2<Long, Pattern>> out) throws Exception {
    // store the new pattern by updating the broadcast state
    BroadcastState<Void, Pattern> bcState =
        ctx.getBroadcastState(
            new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));
    // storing in MapState with null as VOID default value
    bcState.put(null, pattern);
    // if the local marker was set, overwrite with the special pattern
    if (test) {
      bcState.put(null, new Pattern(test));
    }
  }
}
Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
From: Fabian Hueske [mailto:[email protected]]
Sent: Monday, August 20, 2018 9:40 AM
To: Paul Lam <[email protected]>
Cc: Rong Rong <[email protected]>; Hequn Cheng <[email protected]>; user
<[email protected]>
Subject: Re: What's the advantage of using BroadcastState?
Hi,
I've recently published a blog post about Broadcast State [1].
Cheers,
Fabian
[1]
https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
2018-08-20 3:58 GMT+02:00 Paul Lam <[email protected]>:
Hi Rong, Hequn
Your answers are very helpful! Thank you!
Best Regards,
Paul Lam
On Aug 19, 2018, at 23:30, Rong Rong <[email protected]> wrote:
Hi Paul,
To add to Hequn's answer: Broadcast state can typically be used as "a
low-throughput stream containing a set of rules which we want to evaluate
against all elements coming from another stream" [1].
So one more item for the list of differences is whether the state is
"broadcast" across all keys when processing a keyed stream. This is typically
needed when it is not possible to derive the same key field for both streams
using a KeySelector in a CoStream.
Another difference is performance: the BroadcastStream is "stored locally and
is used to process all incoming elements on the other stream", so its size has
to be managed carefully.
[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
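The rule-evaluation pattern described above can be illustrated with a toy model in plain Java. This is only a sketch and does not use the Flink API: the class `BroadcastRulesSketch`, its `Instance` type, and the methods `broadcastRule` and `process` are hypothetical names made up for illustration. It assumes each parallel instance keeps its own local copy of every rule, which is why the total size of the rule set matters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy model of broadcast rules; hypothetical names, not the Flink API. */
final class BroadcastRulesSketch {

    /** One parallel instance holding its own local copy of the rules. */
    static final class Instance {
        final List<Predicate<String>> rules = new ArrayList<>();

        /** Evaluates one element against every locally stored rule. */
        long countMatches(String element) {
            return rules.stream().filter(rule -> rule.test(element)).count();
        }
    }

    final List<Instance> instances = new ArrayList<>();

    BroadcastRulesSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    /** Broadcast side: every instance receives and stores the rule. */
    void broadcastRule(Predicate<String> rule) {
        for (Instance instance : instances) {
            instance.rules.add(rule);
        }
    }

    /** Keyed side: the element goes to one instance, chosen by key. */
    long process(String key, String element) {
        int index = Math.floorMod(key.hashCode(), instances.size());
        return instances.get(index).countMatches(element);
    }

    public static void main(String[] args) {
        BroadcastRulesSketch sketch = new BroadcastRulesSketch(4);
        sketch.broadcastRule(s -> s.startsWith("log"));
        sketch.broadcastRule(s -> s.length() > 5);
        System.out.println(sketch.process("user-1", "login"));  // 1
        System.out.println(sketch.process("user-2", "logout")); // 2
    }
}
```

Because every instance stores the same rules, the result does not depend on which instance a key is routed to. The cost is that the rules are held once per parallel instance.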
On Sun, Aug 19, 2018 at 1:40 AM Hequn Cheng <[email protected]> wrote:
Hi Paul,
There are some differences:
1. The BroadcastStream broadcasts data for you, i.e., data is sent to all
downstream tasks automatically.
2. To guarantee that the contents of the Broadcast State are the same across
all parallel instances of the operator, read-write access is only given to the
broadcast side.
3. For BroadcastState, Flink guarantees that upon restoring/rescaling there
will be no duplicates and no missing data. In case of recovery with the same or
smaller parallelism, each task reads its checkpointed state. Upon scaling up,
each task reads its own state, and the remaining tasks (p_new - p_old) read
checkpoints of previous tasks in a round-robin manner. MapState does not have
such abilities.
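The restore rule in point 3 can be written down as a small toy model in plain Java. This is only a sketch of the assignment as described above, not Flink's actual restore code; the class `BroadcastRestoreSketch` and the method `assignCheckpoints` are hypothetical names. It assumes tasks are numbered from 0 and that "round-robin" means the extra tasks wrap around the old task indices in order.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the restore rule; hypothetical names, not Flink internals. */
final class BroadcastRestoreSketch {

    /**
     * Returns, for each new task index, the index of the old task whose
     * checkpointed broadcast state it would read.
     */
    static List<Integer> assignCheckpoints(int oldParallelism, int newParallelism) {
        if (oldParallelism <= 0 || newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<Integer> source = new ArrayList<>();
        for (int task = 0; task < newParallelism; task++) {
            // Same or smaller parallelism: task < oldParallelism, so the task
            // reads its own state. Scaling up: the extra tasks wrap around
            // the old task indices in round-robin order.
            source.add(task % oldParallelism);
        }
        return source;
    }

    public static void main(String[] args) {
        System.out.println(assignCheckpoints(3, 2)); // [0, 1]
        System.out.println(assignCheckpoints(3, 5)); // [0, 1, 2, 0, 1]
    }
}
```

Since all old tasks hold the same broadcast state, any of them is a valid source for a new task, so no data is missing or duplicated after rescaling.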
Best, Hequn
On Sun, Aug 19, 2018 at 11:18 AM, Paul Lam <[email protected]> wrote:
Hi,
AFAIK, the difference between a BroadcastStream and a normal DataStream is that
the BroadcastStream comes with a BroadcastState, but it seems that the
functionality of BroadcastState can also be achieved with MapState in a
CoMapFunction or similar, since the control stream is still broadcast
without being turned into a BroadcastStream. So I'm wondering: what's the
advantage of using BroadcastState? Thanks a lot!
Best Regards,
Paul Lam