Hi,
I was playing around with BroadcastProcessFunction and I've observed some
specific behavior.

My setup:

MapStateDescriptor<Void, ProcessingRule> ruleStateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState",
        Types.VOID,
        TypeInformation.of(new TypeHint<ProcessingRule>() {}));

BroadcastStream<ProcessingRule> processingRulesBroadcastStream = processingRulesStream
        .broadcast(ruleStateDescriptor);

SingleOutputStreamOperator<EvaluatedTransaction> evaluatedTrades = enrichedTransactionStream
        .connect(processingRulesBroadcastStream)
        .process(new DriveEngineRuleOperator())
        .name("Drive Rule Evaluation");

Here, DriveEngineRuleOperator extends BroadcastProcessFunction and
implements the open, processElement and processBroadcastElement methods.

I followed Flink's tutorial on the broadcast state pattern, and my
"open" method looks like this:

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Must match the descriptor passed to broadcast(): same name, same key and value types.
        processingRulesDesc = new MapStateDescriptor<>(
                "RulesBroadcastState",
                Types.VOID,
                TypeInformation.of(new TypeHint<ProcessingRule>() {}));
    }
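For comparison, here is a minimal sketch of the operator described above in which the descriptor is not created in open() at all. Instead, it is defined once as a static constant that can be shared with the broadcast() call. EnrichedTransaction and the fields of ProcessingRule and EvaluatedTransaction are hypothetical stand-ins, since the real types are not shown. Storing a single rule under the null key of a Void-keyed map is an assumption about how the rules are meant to be held.

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical stand-ins for the real types; their fields are assumptions.
class EnrichedTransaction { public String id; }
class ProcessingRule { public String expression; }
class EvaluatedTransaction {
    public EnrichedTransaction transaction;
    public ProcessingRule rule;
    public EvaluatedTransaction() {}
    public EvaluatedTransaction(EnrichedTransaction t, ProcessingRule r) { transaction = t; rule = r; }
}

public class DriveEngineRuleOperator
        extends BroadcastProcessFunction<EnrichedTransaction, ProcessingRule, EvaluatedTransaction> {

    // Defined once and shared with processingRulesStream.broadcast(...),
    // so neither callback depends on open() having run to find the state.
    public static final MapStateDescriptor<Void, ProcessingRule> RULES_STATE =
            new MapStateDescriptor<>(
                    "RulesBroadcastState",
                    Types.VOID,
                    TypeInformation.of(new TypeHint<ProcessingRule>() {}));

    @Override
    public void processBroadcastElement(ProcessingRule rule, Context ctx,
                                        Collector<EvaluatedTransaction> out) throws Exception {
        // Writable view of the broadcast state; the single rule lives under the null (Void) key.
        BroadcastState<Void, ProcessingRule> rules = ctx.getBroadcastState(RULES_STATE);
        rules.put(null, rule);
    }

    @Override
    public void processElement(EnrichedTransaction tx, ReadOnlyContext ctx,
                               Collector<EvaluatedTransaction> out) throws Exception {
        // Read-only view on the non-broadcast side.
        ReadOnlyBroadcastState<Void, ProcessingRule> rules = ctx.getBroadcastState(RULES_STATE);
        ProcessingRule rule = rules.get(null);
        if (rule != null) {
            out.collect(new EvaluatedTransaction(tx, rule));
        }
        // No rule yet: whether to drop, buffer or pass through is an application decision.
    }
}
```

With this shape, the setup would pass the same constant, e.g. `processingRulesStream.broadcast(DriveEngineRuleOperator.RULES_STATE)`, so both sides agree on the state name and types by construction.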


I've noticed that all of these methods are called by the same thread. Will
that always be the case, or could they be called by different threads?
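The post does not show how the thread was observed. One way to check it is a diagnostic function that reports Thread.currentThread().getName() from each callback. Below is a minimal sketch of that approach; ThreadReportingFunction is a hypothetical name, and plain String inputs stand in for the real transaction and rule types. It only makes the behavior visible for a given run and does not by itself establish a guarantee.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical diagnostic function: each callback reports the name of the
// thread that invoked it, so threading and call order show up in the output.
public class ThreadReportingFunction extends BroadcastProcessFunction<String, String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // open() cannot emit records, so it prints to stdout instead.
        System.out.println("open on " + Thread.currentThread().getName());
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
        out.collect("processElement on " + Thread.currentThread().getName());
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out) {
        out.collect("processBroadcastElement on " + Thread.currentThread().getName());
    }
}
```

Comparing the "open on ..." line with the first emitted record also shows whether open() ran before the first element on either input.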

The second thing I've noticed is that the "open" method was executed only
right before the first "fast stream" element was received (that is, before
processElement was first executed). That means that if the control stream
element (the broadcast stream element) arrived first, open would not have
been called, my processing rule descriptor would not be initialized, and I
would lose the event.

What are the good practices in this case?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
