So I was trying to have something like this:

PipelineConfigOperator pipelineConfigOperator = new PipelineConfigOperator();

messageStream
                .connect(pipelineConfigStream)
                .process(pipelineConfigOperator) // same instance is passed here...
                .keyBy(tradeKeySelector)
                // ...and here, as the session gap extractor
                .window(ProcessingTimeSessionWindows.withDynamicGap(pipelineConfigOperator))
                .process(new CumulativeTransactionOperator())
                .name("Aggregate Transaction Builder");

where:

PipelineConfigOperator extends BroadcastProcessFunction<EnrichedMessage, String, EnrichedMessage>
                implements SessionWindowTimeGapExtractor<EnrichedMessage>


BroadcastStream<String> pipelineConfigStream = configRulesStream
                .broadcast(pipelineConfigStateDescriptor);

MapStateDescriptor<String, String> pipelineConfigStateDescriptor = new MapStateDescriptor<>(
                "PipelineConfigBroadcastState",
                Types.STRING,
                TypeInformation.of(new TypeHint<String>() {}));

SingleOutputStreamOperator<String> configRulesStream = env
                .addSource(new FlinkKafkaConsumer<>("pipeline-config",
                        new SimpleStringSchema(), properties))
                .name("Pipeline config stream");


PipelineConfigOperator keeps the config in Broadcast state and a copy of it in a
local, transient HashMap.
Whenever processBroadcastElement is called, both the Broadcast state and the
HashMap are updated.

The problem is that when the "extract" method is called, the HashMap is null even
though it was initialized in the open method.
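
To illustrate what I suspect is happening, here is a minimal, Flink-free sketch.
It assumes that the function object is serialized when the job is shipped, so
the broadcast operator and the window assigner each end up with their own
deserialized copy, and that open() is only called on the copy running as the
process function. ConfigHolder, roundTrip and the "gap" key are hypothetical
stand-ins of mine, not Flink classes; only plain Java serialization is used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class TransientCopyDemo {

    /** Hypothetical stand-in for PipelineConfigOperator: serializable, with a transient cache. */
    static class ConfigHolder implements Serializable {
        private static final long serialVersionUID = 1L;

        // Transient fields are skipped by Java serialization and come back as null.
        private transient Map<String, String> cache;

        // Stand-in for the open() lifecycle method that initializes the cache.
        void open() { cache = new HashMap<>(); }

        // Stand-in for the update done in processBroadcastElement.
        void update(String key, String value) { cache.put(key, value); }

        boolean cacheIsNull() { return cache == null; }
    }

    /** Serializes and deserializes an object, producing an independent copy. */
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        ConfigHolder original = new ConfigHolder();

        // Assumption: each place the instance is passed to gets its own copy.
        ConfigHolder usedByProcess = roundTrip(original);
        ConfigHolder usedByWindow = roundTrip(original);

        // Only the "process function" copy is opened and updated.
        usedByProcess.open();
        usedByProcess.update("gap", "5000");

        System.out.println(usedByProcess.cacheIsNull()); // prints "false"
        System.out.println(usedByWindow.cacheIsNull());  // prints "true"
    }
}
```

If that assumption holds, the copy used by "extract" never sees open() or any
of the config updates, which would match the null HashMap I am observing.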

I have implemented the Broadcast State Pattern with standard operators as it is
presented in the documentation, so I'm familiar with the concept. I assumed I could
reuse it here. The bottom line is that I do not want to fetch the state every
time, only after a config update.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
