Hello Team,

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.json.JSONObject;

public class FlinkJoinDataStream {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        // Kafka consumer configuration
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "myGroup");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // orders from topic "test1": (order_id, customer_id, tstamp_trans, event timestamp)
        DataStream<Tuple4<Integer, Integer, String, Long>> order_details = env
                .addSource(new FlinkKafkaConsumer010<String>("test1", new SimpleStringSchema(), props))
                .map(new Mapper1());

        // invoices from topic "test2": (order_id, invoice_status, tstamp_trans, event timestamp)
        DataStream<Tuple4<Integer, Integer, String, Long>> invoice_details = env
                .addSource(new FlinkKafkaConsumer010<String>("test2", new SimpleStringSchema(), props))
                .map(new Mapper2());

        long maxOutOfOrderness = 550000L;

        // assign timestamps and watermarks on the invoice stream (watermark = latest timestamp seen)
        DataStream<Tuple4<Integer, Integer, String, Long>> invoice_watermark = invoice_details
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Integer, Integer, String, Long>>() {

                    long currentTimestamp;

                    @Override
                    public long extractTimestamp(Tuple4<Integer, Integer, String, Long> element,
                            long previousElementTimestamp) {
                        currentTimestamp = element.f3;
                        return currentTimestamp;
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentTimestamp);
                    }
                });

        invoice_watermark.print();

        // assign timestamps and watermarks on the order stream (watermark lags by maxOutOfOrderness)
        DataStream<Tuple4<Integer, Integer, String, Long>> order_watermark = order_details
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Integer, Integer, String, Long>>() {

                    long currentTimestamp;

                    @Override
                    public long extractTimestamp(Tuple4<Integer, Integer, String, Long> element,
                            long previousElementTimestamp) {
                        currentTimestamp = element.f3;
                        return currentTimestamp;
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentTimestamp - maxOutOfOrderness);
                    }
                });

        order_watermark.print();

        // join orders and invoices on order_id within 60-second event-time tumbling windows
        DataStream<Tuple4<Integer, Integer, String, Integer>> joinedData = order_watermark.keyBy(0)
                .join(invoice_watermark.keyBy(0))
                .where(new KeySelector<Tuple4<Integer, Integer, String, Long>, Integer>() {
                    @Override
                    public Integer getKey(Tuple4<Integer, Integer, String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .equalTo(new KeySelector<Tuple4<Integer, Integer, String, Long>, Integer>() {
                    @Override
                    public Integer getKey(Tuple4<Integer, Integer, String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .apply(new JoinFunction<Tuple4<Integer, Integer, String, Long>,
                        Tuple4<Integer, Integer, String, Long>,
                        Tuple4<Integer, Integer, String, Integer>>() {
                    @Override
                    public Tuple4<Integer, Integer, String, Integer> join(
                            Tuple4<Integer, Integer, String, Long> first,
                            Tuple4<Integer, Integer, String, Long> second) throws Exception {
                        // (order_id, customer_id, tstamp_trans, invoice_status)
                        return new Tuple4<Integer, Integer, String, Integer>(first.f0, first.f1, first.f2, second.f1);
                    }
                });

        joinedData.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static class Mapper1 implements MapFunction<String, Tuple4<Integer, Integer, String, Long>> {

        private static final long serialVersionUID = 1L;

        // {"order_id":317,"customer_id":654,"tstamp_trans":"20181130090300"}
        @Override
        public Tuple4<Integer, Integer, String, Long> map(String value) throws Exception {
            JSONObject jsonObject = new JSONObject(value);
            final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss");
            return new Tuple4<Integer, Integer, String, Long>(
                    jsonObject.getInt("order_id"), jsonObject.getInt("customer_id"),
                    jsonObject.getString("tstamp_trans"),
                    dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);
        }
    }

    private static class Mapper2 implements MapFunction<String, Tuple4<Integer, Integer, String, Long>> {

        private static final long serialVersionUID = 1L;

        // {"order_id":317,"invoice_status":1,"tstamp_trans":"20181130090300"}
        @Override
        public Tuple4<Integer, Integer, String, Long> map(String value) throws Exception {
            JSONObject jsonObject = new JSONObject(value);
            final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss");
            return new Tuple4<Integer, Integer, String, Long>(
                    jsonObject.getInt("order_id"), jsonObject.getInt("invoice_status"),
                    jsonObject.getString("tstamp_trans"),
                    dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);
        }
    }
}


*If I read the same data from in-memory collections instead, everything works fine:*


private static List<String> createOrderRecords() {
    List<String> orderRecords = new ArrayList<>();
    orderRecords.add("{\"order_id\":312,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
    orderRecords.add("{\"order_id\":314,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
    orderRecords.add("{\"order_id\":316,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
    orderRecords.add("{\"order_id\":317,\"customer_id\":654,\"tstamp_trans\":\"20181130096300\"}");
    orderRecords.add("{\"order_id\":315,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
    orderRecords.add("{\"order_id\":318,\"customer_id\":654,\"tstamp_trans\":\"20181130099000\"}");
    return orderRecords;
}

private static List<String> createInvoiceRecords() {
    List<String> invoiceRecords = new ArrayList<>();
    invoiceRecords.add("{\"order_id\":312,\"invoice_status\":1,\"tstamp_trans\":\"20181130090300\"}");
    invoiceRecords.add("{\"order_id\":318,\"invoice_status\":1,\"tstamp_trans\":\"20181130099000\"}");
    invoiceRecords.add("{\"order_id\":317,\"invoice_status\":1,\"tstamp_trans\":\"20181130096300\"}");
    invoiceRecords.add("{\"order_id\":311,\"invoice_status\":1,\"tstamp_trans\":\"20181130050300\"}");
    return invoiceRecords;
}


If I exclude Kafka as the data source and use these collections as the
source instead, everything works fine.
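
For reference, in the collection-based variant only the two sources change; it is wired up roughly like this, with the rest of the pipeline (watermarks, join, print) left exactly as above:

        // bounded in-memory sources in place of the two Kafka consumers
        DataStream<Tuple4<Integer, Integer, String, Long>> order_details =
                env.fromCollection(createOrderRecords()).map(new Mapper1());

        DataStream<Tuple4<Integer, Integer, String, Long>> invoice_details =
                env.fromCollection(createInvoiceRecords()).map(new Mapper2());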


Thank you,

Rakesh Kumar
