Hello Team,
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.json.JSONObject;

public class FlinkJoinDataStream {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "myGroup");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Orders from Kafka topic "test1": (order_id, customer_id, tstamp_trans, event timestamp)
        DataStream<Tuple4<Integer, Integer, String, Long>> order_details = env
                .addSource(new FlinkKafkaConsumer010<String>("test1", new SimpleStringSchema(), props))
                .map(new Mapper1());

        // Invoices from Kafka topic "test2": (order_id, invoice_status, tstamp_trans, event timestamp)
        DataStream<Tuple4<Integer, Integer, String, Long>> invoice_details = env
                .addSource(new FlinkKafkaConsumer010<String>("test2", new SimpleStringSchema(), props))
                .map(new Mapper2());

        long maxOutOfOrderness = 550000L;

        // Watermark for invoices follows the last seen element timestamp
        DataStream<Tuple4<Integer, Integer, String, Long>> invoice_watermark = invoice_details
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Integer, Integer, String, Long>>() {
                    long currentTimestamp;

                    @Override
                    public long extractTimestamp(Tuple4<Integer, Integer, String, Long> element,
                            long previousElementTimestamp) {
                        currentTimestamp = element.f3;
                        return currentTimestamp;
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentTimestamp);
                    }
                });
        invoice_watermark.print();

        // Watermark for orders trails the last seen timestamp by maxOutOfOrderness
        DataStream<Tuple4<Integer, Integer, String, Long>> order_watermark = order_details
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Integer, Integer, String, Long>>() {
                    long currentTimestamp;

                    @Override
                    public long extractTimestamp(Tuple4<Integer, Integer, String, Long> element,
                            long previousElementTimestamp) {
                        currentTimestamp = element.f3;
                        return currentTimestamp;
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentTimestamp - maxOutOfOrderness);
                    }
                });
        order_watermark.print();

        // Join both streams on order_id (field f0) over 60-second event-time tumbling windows
        DataStream<Tuple4<Integer, Integer, String, Integer>> joinedData = order_watermark.keyBy(0)
                .join(invoice_watermark.keyBy(0))
                .where(new KeySelector<Tuple4<Integer, Integer, String, Long>, Integer>() {
                    @Override
                    public Integer getKey(Tuple4<Integer, Integer, String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .equalTo(new KeySelector<Tuple4<Integer, Integer, String, Long>, Integer>() {
                    @Override
                    public Integer getKey(Tuple4<Integer, Integer, String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .apply(new JoinFunction<Tuple4<Integer, Integer, String, Long>, Tuple4<Integer, Integer, String, Long>, Tuple4<Integer, Integer, String, Integer>>() {
                    @Override
                    public Tuple4<Integer, Integer, String, Integer> join(
                            Tuple4<Integer, Integer, String, Long> first,
                            Tuple4<Integer, Integer, String, Long> second) throws Exception {
                        return new Tuple4<Integer, Integer, String, Integer>(first.f0, first.f1, first.f2, second.f1);
                    }
                });
        joinedData.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Parses order records like {"order_id":317,"customer_id":654,"tstamp_trans":"20181130090300"}
    private static class Mapper1 implements MapFunction<String, Tuple4<Integer, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple4<Integer, Integer, String, Long> map(String value) throws Exception {
            JSONObject jsonObject = new JSONObject(value);
            final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss");
            return new Tuple4<Integer, Integer, String, Long>(
                    jsonObject.getInt("order_id"),
                    jsonObject.getInt("customer_id"),
                    jsonObject.getString("tstamp_trans"),
                    // tstamp_trans parsed to epoch seconds
                    dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);
        }
    }

    // Parses invoice records like {"order_id":317,"invoice_status":1,"tstamp_trans":"20181130090300"}
    private static class Mapper2 implements MapFunction<String, Tuple4<Integer, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple4<Integer, Integer, String, Long> map(String value) throws Exception {
            JSONObject jsonObject = new JSONObject(value);
            final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss");
            return new Tuple4<Integer, Integer, String, Long>(
                    jsonObject.getInt("order_id"),
                    jsonObject.getInt("invoice_status"),
                    jsonObject.getString("tstamp_trans"),
                    // tstamp_trans parsed to epoch seconds
                    dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);
        }
    }
}

*If I read the same data from collections, everything works fine:*

    private static List<String> createOrderRecords() {
        List<String> orderRecords = new ArrayList<>();
        orderRecords.add("{\"order_id\":312,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
        orderRecords.add("{\"order_id\":314,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
        orderRecords.add("{\"order_id\":316,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
        orderRecords.add("{\"order_id\":317,\"customer_id\":654,\"tstamp_trans\":\"20181130096300\"}");
        orderRecords.add("{\"order_id\":315,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
        orderRecords.add("{\"order_id\":318,\"customer_id\":654,\"tstamp_trans\":\"20181130099000\"}");
        return orderRecords;
    }

    private static List<String> createInvoiceRecords() {
        List<String> invoiceRecords = new ArrayList<>();
        invoiceRecords.add("{\"order_id\":312,\"invoice_status\":1,\"tstamp_trans\":\"20181130090300\"}");
        invoiceRecords.add("{\"order_id\":318,\"invoice_status\":1,\"tstamp_trans\":\"20181130099000\"}");
        invoiceRecords.add("{\"order_id\":317,\"invoice_status\":1,\"tstamp_trans\":\"20181130096300\"}");
        invoiceRecords.add("{\"order_id\":311,\"invoice_status\":1,\"tstamp_trans\":\"20181130050300\"}");
        return invoiceRecords;
    }
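For the collection-based test, the two Kafka sources are replaced with something along these lines (a minimal sketch using env.fromCollection; the mappers, watermarks, and join are unchanged):

    // Sketch: in-memory sources via fromCollection instead of the Kafka consumers
    DataStream<Tuple4<Integer, Integer, String, Long>> order_details = env
            .fromCollection(createOrderRecords())
            .map(new Mapper1());

    DataStream<Tuple4<Integer, Integer, String, Long>> invoice_details = env
            .fromCollection(createInvoiceRecords())
            .map(new Mapper2());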
If I exclude Kafka as the data source and use these collections as the source instead, everything works fine.

Thank you,
Rakesh Kumar