I'm using the Flink 1.18.1 API on Java 11, and I'm trying to use a BroadcastProcessFunction to filter a products DataStream, with a DataStream of authorized brands as the broadcast side.
My first DataStream contains products, each of which has a brand field, and my second DataStream contains only the brands that should be allowed. The problem is that when products reach processElement of the BroadcastProcessFunction, brandState has not yet been filled with all the records from the brands DataStream. For example, I have 4800 brands in the brands DataStream, but when products reach processElement, brandState contains only a few of them (around 200). This causes problems because products are rejected when their brands have not yet been loaded into brandState.

Here is my BroadcastProcessFunction:

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class GateCoProcess extends BroadcastProcessFunction<CrawlData, Brand, CrawlData> {

    private final MapStateDescriptor<String, Boolean> broadcastStateDescriptor;

    public GateCoProcess(MapStateDescriptor<String, Boolean> broadcastStateDescriptor) {
        this.broadcastStateDescriptor = broadcastStateDescriptor;
    }

    // Called for each product: emit it only if its brand is already in the broadcast state.
    @Override
    public void processElement(CrawlData value, ReadOnlyContext ctx, Collector<CrawlData> out) throws Exception {
        ReadOnlyBroadcastState<String, Boolean> brandState = ctx.getBroadcastState(broadcastStateDescriptor);
        if (brandState.contains(value.data.product.brand)) {
            out.collect(value);
        }
    }

    // Called for each brand: register active brands in the broadcast state.
    @Override
    public void processBroadcastElement(Brand brand, Context ctx, Collector<CrawlData> out) throws Exception {
        BroadcastState<String, Boolean> brandState = ctx.getBroadcastState(broadcastStateDescriptor);
        if (brand.active) {
            brandState.put(brand.getName(), true);
        }
    }
}
```

And here are my DataStreams and the call to the function:

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Brands stream, read from Kafka
DataStream<Brand> brands = env.fromSource(
        KafkaSources.brandsSource, WatermarkStrategy.noWatermarks(), "gatebrand-cdc-records");

MapStateDescriptor<String, Boolean> broadcastStateDescriptor = new MapStateDescriptor<>(
        "broadcastState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);

BroadcastStream<Brand> broadcastStream = brands.broadcast(broadcastStateDescriptor);

// integration is the products DataStream
DataStream<CrawlData> integration = ExtractData.extractProducts(env);

DataStream<CrawlData> filtered = integration
        .connect(broadcastStream)
        .process(new GateCoProcess(broadcastStateDescriptor));

env.execute("mon job de products");
```

What should I do to get around this problem? Thanks.
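
To make the ordering issue concrete, here is a minimal plain-Java sketch of what I think is happening. It does not use Flink at all: the `Event` type, the brand names, and the `filter` method are hypothetical stand-ins for the two callbacks above, and the interleaving of the two inputs is written out by hand rather than produced by a real job.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration only; no Flink classes are used.
class BroadcastOrderingSketch {

    // Hypothetical event type: either a brand update or a product carrying a brand name.
    static final class Event {
        final boolean isBrand;
        final String brand;

        Event(boolean isBrand, String brand) {
            this.isBrand = isBrand;
            this.brand = brand;
        }

        static Event brand(String name) { return new Event(true, name); }
        static Event product(String brand) { return new Event(false, brand); }
    }

    // Mimics the two callbacks: brand events fill the state, and product events
    // are emitted only if their brand is already present at that moment.
    static List<String> filter(List<Event> interleaved) {
        Set<String> brandState = new HashSet<>();
        List<String> emitted = new ArrayList<>();
        for (Event e : interleaved) {
            if (e.isBrand) {
                brandState.add(e.brand);          // role of processBroadcastElement
            } else if (brandState.contains(e.brand)) {
                emitted.add(e.brand);             // role of processElement, brand known
            }
            // A product whose brand is not yet known is silently dropped.
        }
        return emitted;
    }

    public static void main(String[] args) {
        // All brands arrive before any product: both products pass.
        List<Event> brandsFirst = List.of(
                Event.brand("acme"), Event.brand("globex"),
                Event.product("acme"), Event.product("globex"));

        // A product overtakes its brand: that product is dropped.
        List<Event> productOvertakes = List.of(
                Event.brand("acme"), Event.product("globex"),
                Event.brand("globex"), Event.product("acme"));

        System.out.println(filter(brandsFirst));      // prints [acme, globex]
        System.out.println(filter(productOvertakes)); // prints [acme]
    }
}
```

The same events give different results depending only on arrival order, which matches what I see: a product for an allowed brand is lost if it is processed before that brand has been put into the state.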