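Hi Rakesh Kumar,

I think you can use an interval join, e.g.:

    orderStream
        .keyBy(<KeySelector>)
        .intervalJoin(invoiceStream.keyBy(<KeySelector>))
        .between(Time.minutes(-5), Time.minutes(5))
        .process(<ProcessJoinFunction>);

Both streams should be keyed on order_id. The interval join only supports event time, so the timestamps and watermarks you already assign are needed. The joined pairs are emitted from the ProcessJoinFunction, and you can then write them to your output Kafka topic.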
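To make the semantics of between() concrete, here is a minimal in-memory sketch in plain Java (no Flink involved). An order with timestamp t joins every invoice with the same key whose timestamp lies in [t - 5 min, t + 5 min], with both bounds inclusive, which is Flink's default. The `Event` class and the `intervalJoin` helper are hypothetical and only for illustration. Flink's real operator buffers elements in keyed state and uses watermarks to discard them, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalJoinSketch {

    // Hypothetical event: a join key (e.g. order_id), an event-time timestamp in ms, and a payload.
    static final class Event {
        final String key;
        final long timestampMs;
        final String payload;

        Event(String key, long timestampMs, String payload) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    static final long MINUTE_MS = 60_000L;

    // Emits "left|right" for every pair with the same key where
    // left.ts + lowerMs <= right.ts <= left.ts + upperMs (both bounds inclusive).
    static List<String> intervalJoin(List<Event> left, List<Event> right, long lowerMs, long upperMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inInterval = r.timestampMs >= l.timestampMs + lowerMs
                        && r.timestampMs <= l.timestampMs + upperMs;
                if (sameKey && inInterval) {
                    out.add(l.payload + "|" + r.payload);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orders = List.of(
                new Event("o1", 0L, "order-o1"),
                new Event("o2", 0L, "order-o2"));
        List<Event> invoices = List.of(
                new Event("o1", 3 * MINUTE_MS, "invoice-o1"),  // 3 min after its order: joined
                new Event("o2", 7 * MINUTE_MS, "invoice-o2")); // 7 min after its order: not joined
        // Corresponds to .between(Time.minutes(-5), Time.minutes(5)); prints [order-o1|invoice-o1]
        System.out.println(intervalJoin(orders, invoices, -5 * MINUTE_MS, 5 * MINUTE_MS));
    }
}
```

If invoices always arrive after their orders, a lower bound of zero (e.g. Time.minutes(0)) would restrict matches to invoices in the five minutes following each order.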
The semantics of the interval join and a detailed usage description can be found here: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/joining.html#interval-join

Hope this helps, and any feedback is welcome!

Best,
Jincheng

Rakesh Kumar <rakkukumar2...@gmail.com> wrote on Thursday, December 6, 2018 at 7:10 PM:

> Hi,
> I have two data sources, one for order data and another for invoice data. I am pushing both into Kafka in JSON form. I want to delay the order data by 5 minutes because invoice data only arrives after the order data is generated. For that, I have written a Flink program that reads both streams from Kafka, applies watermarks, and delays the order data by 5 minutes. After applying the watermarks, I want to join the two streams on order_id, which is present in both the order and the invoice data. After joining, I want to push the result to a different Kafka topic.
>
> But I am not able to join these streams with the 5-minute delay, and I can't figure out why.
>
> I am attaching my Flink program and its dependencies below.