Hi kristoffSC,

>> I've noticed that all methods are called by the same thread. Would it be always the case, or could those methods be called by different threads?

No. The open/processXXX/close methods are called at different stages of a task thread's life cycle, and the framework must preserve the call order.

>> The second thing I've noticed is that "open" method was executed only before the first "fast stream" element was received (before execution of processElement method). That means that if I received the control stream element first (the broadcast stream element) then method open would not be called and I will not initialize my processing rule descriptor - I will lose the event.

I joined a similar question that you can consider [1]. There is also another similar question from StackOverflow [2].

Best,
Vino

[1]: http://mail-archives.apache.org/mod_mbox/flink-user/201911.mbox/%3CCAArWwf4jmbaFeizO_YBZVBAMyiuvV95DetoVCkj4rJi4PYorpQ%40mail.gmail.com%3E
[2]: https://stackoverflow.com/questions/54748158/how-could-flink-broadcast-state-be-initialized

KristoffSC <krzysiek.chmielew...@gmail.com> wrote on Wed, Dec 11, 2019 at 5:56 AM:

> Hi,
> I was playing around with BroadcastProcessFunction and I've observed a
> specific behavior.
>
> My setup:
>
> MapStateDescriptor<Void, ProcessingRule> ruleStateDescriptor = new
>     MapStateDescriptor<>(
>         "RulesBroadcastState",
>         Types.VOID,
>         TypeInformation.of(new TypeHint<ProcessingRule>() {
>         }));
>
> BroadcastStream<ProcessingRule> processingRulesBroadcastStream =
>     processingRulesStream
>         .broadcast(ruleStateDescriptor);
>
> SingleOutputStreamOperator<EvaluatedTransaction> evaluatedTrades =
>     enrichedTransactionStream
>         .connect(processingRulesBroadcastStream)
>         .process(new DriveEngineRuleOperator())
>         .name("Drive Rule Evaluation");
>
> Where DriveEngineRuleOperator extends BroadcastProcessFunction and
> implements open, processElement and processBroadcastElement methods.
>
> I was following Flink's tutorials about broadcast state pattern and my
> "open" method looks like this:
>
> @Override
> public void open(Configuration parameters) throws Exception {
>     super.open(parameters);
>     processingRulesDesc = new MapStateDescriptor<>(
>         "RulesBroadcastState",
>         Types.VOID,
>         TypeInformation.of(new TypeHint<RuleParams>() {
>         }));
> }
>
> I've noticed that all methods are called by the same thread. Would it be
> always the case, or could those methods be called by different threads?
>
> The second thing I've noticed is that "open" method was executed only
> before the first "fast stream" element was received (before execution of
> processElement method). That means that if I received the control stream
> element first (the broadcast stream element) then method open would not be
> called and I will not initialize my processing rule descriptor - I will
> lose the event.
>
> What are the good practices in this case?
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
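
To make the call order from the reply concrete, here is a toy model in plain Java. It is not Flink code and does not use any Flink class: LifecycleDemo and ToyRuleFunction are hypothetical stand-ins, and the sketch simply assumes the contract described above (open runs once before any processXXX call, close runs last). Under that assumption, the descriptor set in open is available no matter which stream delivers its first element.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the task thread that drives a user function.
final class LifecycleDemo {

    // Calls the callbacks in the assumed order and returns the call log.
    static List<String> run(boolean broadcastFirst) {
        ToyRuleFunction fn = new ToyRuleFunction();
        fn.open(); // assumed to run before any element is processed
        if (broadcastFirst) {
            fn.processBroadcastElement("rule-1");
            fn.processElement("tx-1");
        } else {
            fn.processElement("tx-1");
            fn.processBroadcastElement("rule-1");
        }
        fn.close(); // assumed to run after the last element
        return fn.log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}

// Hypothetical user function; it mirrors only the method names from the thread.
final class ToyRuleFunction {
    final List<String> log = new ArrayList<>();
    private String descriptorName; // stands in for the descriptor built in open()
    private String currentRule;    // stands in for the broadcast state

    void open() {
        descriptorName = "RulesBroadcastState";
        log.add("open");
    }

    void processBroadcastElement(String rule) {
        requireOpened();
        currentRule = rule;
        log.add("broadcast:" + rule);
    }

    void processElement(String tx) {
        requireOpened();
        log.add("element:" + tx + " rule=" + currentRule);
    }

    void close() {
        log.add("close");
    }

    // Fails loudly if a process call ever arrived before open().
    private void requireOpened() {
        if (descriptorName == null) {
            throw new IllegalStateException("open() was not called");
        }
    }
}
```

In this toy model the only thing that depends on arrival order is whether a rule is already present when the first "fast stream" element is processed, so a real function would still need to decide what to do with elements that arrive before any rule.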