Hello,

I am trying to implement a broadcast join of two streams in Flink using the broadcast functionality. In my use case, a large stream is enriched with a much smaller stream. To test my approach first, I adapted the Taxi Ride exercise from the official training repository (https://github.com/apache/flink-training/blob/release-1.13/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala), where the two streams are joined using .connect().
Instead, I have adapted my code as follows:

    // The main function has been abbreviated for ease of reading
    def main(): Unit = {
      // Main stream
      val rides = env
        .addSource(rideSourceOrTest(new TaxiRideGenerator()))
        .filter { ride => ride.isStart }
        // .keyBy { ride => ride.rideId }

      // Small stream
      val fares = env
        .addSource(fareSourceOrTest(new TaxiFareGenerator()))

      val broadcastStateDescriptor =
        new MapStateDescriptor[Long, TaxiFare]("fares_broadcast", classOf[Long], classOf[TaxiFare])

      val faresBroadcast: BroadcastStream[TaxiFare] = fares
        .broadcast(broadcastStateDescriptor)

      val result: DataStream[(TaxiRide, TaxiFare)] = rides
        .connect(faresBroadcast)
        .process(new BroadcastJoin())
    }

    // Type parameters are IN1, IN2, OUT: the non-broadcast stream type,
    // the broadcast stream type, and the output stream type.
    class BroadcastJoin()
        extends BroadcastProcessFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)] {

      // Broadcast state descriptor
      private lazy val broadcastStateDescriptor =
        new MapStateDescriptor[Long, TaxiFare]("fares_broadcast", classOf[Long], classOf[TaxiFare])

      // Processes an element of the broadcast stream. value is the broadcast
      // element passed in; the modifiable broadcast state is obtained through ctx.
      override def processBroadcastElement(
          value: TaxiFare,
          ctx: BroadcastProcessFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)]#Context,
          out: Collector[(TaxiRide, TaxiFare)]): Unit = {
        val broadcast_status: BroadcastState[Long, TaxiFare] =
          ctx.getBroadcastState(broadcastStateDescriptor)
        // Add the broadcast element to the broadcast state, which is kept in local memory
        broadcast_status.put(value.rideId, value)
      }

      // Processes an element of the non-broadcast stream. value is the non-broadcast
      // element passed in; only a read-only broadcast state is available through ctx.
      override def processElement(
          value: TaxiRide,
          ctx: BroadcastProcessFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)]#ReadOnlyContext,
          out: Collector[(TaxiRide, TaxiFare)]): Unit = {
        // Read the broadcast state
        val broadcast_status: ReadOnlyBroadcastState[Long, TaxiFare] =
          ctx.getBroadcastState(broadcastStateDescriptor)
        if (broadcast_status.contains(value.rideId)) {
          val foundMatch = broadcast_status.get(value.rideId)
          out.collect((value, foundMatch)) // Emit the joined result
        }
      }
    }

I have limited the TaxiFare generator to produce only 20 samples. This approach seems to work, but I do not always get 20 joined samples (both generators emit samples starting with id=1 and increasing by one).

I did some investigating, and this is what I believe is happening: if a sample is broadcast to at least one of the nodes (I have 4) before the corresponding sample from the main stream is processed, everything is fine and the two records are joined. However, if a record from the main stream is processed before the corresponding record from the small stream has been broadcast to at least one of the 4 nodes, the join never happens: when processElement() is called, the lookup in the broadcast_status map finds nothing for that rideId.

There is clearly something wrong with this approach. If anyone has an idea of what I am doing wrong, I would very much appreciate any advice.

Thank you,
Gerald
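
P.S. To make the ordering issue concrete, here is a minimal plain-Scala sketch (no Flink involved) of what I believe happens inside one parallel task. Event, Ride, Fare, BroadcastOrderingSketch and simulate are hypothetical names made up for this illustration, and the model assumes a single task that sees events strictly in arrival order, which simplifies the real job.

```scala
// Plain-Scala model of the join logic in one parallel task (no Flink dependency).
// Event, Ride, Fare and simulate are hypothetical names used only for this illustration.
sealed trait Event
final case class Ride(rideId: Long) extends Event
final case class Fare(rideId: Long) extends Event

object BroadcastOrderingSketch {
  // Replays the events in arrival order and returns the rideIds that were joined.
  def simulate(events: Seq[Event]): List[Long] = {
    val broadcastState = scala.collection.mutable.Map.empty[Long, Fare]
    val joined = scala.collection.mutable.ArrayBuffer.empty[Long]
    events.foreach {
      case fare: Fare =>
        // Mirrors processBroadcastElement: only stores the fare.
        broadcastState.put(fare.rideId, fare)
      case ride: Ride =>
        // Mirrors processElement: emits only if the fare is already stored;
        // a ride that arrives too early is silently dropped.
        if (broadcastState.contains(ride.rideId)) joined += ride.rideId
    }
    joined.toList
  }

  def main(args: Array[String]): Unit = {
    val fareFirst = Seq[Event](Fare(1), Ride(1), Fare(2), Ride(2))
    val rideFirst = Seq[Event](Fare(1), Ride(1), Ride(2), Fare(2))
    println(simulate(fareFirst)) // prints List(1, 2)
    println(simulate(rideFirst)) // prints List(1)
  }
}
```

In the second sequence, ride 2 arrives before fare 2 and is never joined, which matches the missing results I see in the actual job.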