So I was trying to have something like this: PipelineConfigOperator pipelineConfigOperator = new PipelineConfigOperator();
messageStream .connect(pipelineConfigStream) .process(*pipelineConfigOperator*) .keyBy(tradeKeySelector) .window(ProcessingTimeSessionWindows.withDynamicGap(*pipelineConfigOperator*)) .process(new CumulativeTransactionOperator()) .name("Aggregate Transaction Builder"); where: PipelineConfigOperator extends BroadcastProcessFunction<EnrichedMessage, String, EnrichedMessage> implements SessionWindowTimeGapExtractor<EnrichedMessage> BroadcastStream<String> pipelineConfigStream = configRulesStream .broadcast(pipelineConfigStateDescriptor); MapStateDescriptor<String, String> pipelineConfigStateDescriptor = new MapStateDescriptor<>( "PipelineConfigBroadcastState", Types.STRING, TypeInformation.of(new TypeHint<String>() { })); SingleOutputStreamOperator<String> configRulesStream = env.addSource(new FlinkKafkaConsumer<>("pipeline-config", new SimpleStringSchema(), properties)) .name("Pipeline config stream"); PipelineConfigOperator keeps config in Broadcast state and its copy in local, transient HashMap. Whenever processBroadcastElement is called, Broadcast state and HashMap are updated. The problem is that when "extract" method is called the hashMap is null even thou it was initialized in open method. I was implementing Broadcastr state Pattern to standard operators like it is presented in documentation so Im familair with this concept. I assumed I can reuse it here. The bottom line is that I will not want to fetch state every time, only after config update -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/