I'm using JDBCInputFormat to read a snapshot of a MySQL table, and
FlinkKafkaConsumer to read the binlog, which Debezium writes to Kafka.

DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
DataStream snapshotStream = env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);

// map() converts both streams to a common type (action, fields…), where
// action is "insert", "update", or "delete". The action for snapshotStream
// is always "insert". (Roughly what I mean is sketched after this snippet.)
DataStream tableStream = binlogStream.map(…).union(snapshotStream.map(…));

tableStream.print();
env.execute("example");
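
To make the (action, fields…) conversion concrete, this is roughly what I have in mind for the two map() calls. The BinlogEvent POJO and its field names are just placeholders for however the Debezium JSON gets deserialized (they are not real Debezium classes), and I'm assuming binlogStream is typed as DataStream<BinlogEvent> and snapshotStream as DataStream<Row>:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

// Placeholder for a deserialized Debezium change event.
public class BinlogEvent {
    public String op;   // Debezium op: "c" create, "u" update, "d" delete, "r" read
    public Row before;  // row image before the change (set for updates/deletes)
    public Row after;   // row image after the change (set for creates/updates)
}

DataStream<Tuple2<String, Row>> binlogChanges = binlogStream
        .map(e -> {
            switch (e.op) {
                case "u": return Tuple2.of("update", e.after);
                case "d": return Tuple2.of("delete", e.before);
                default:  return Tuple2.of("insert", e.after);  // "c" and snapshot "r"
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.GENERIC(Row.class)));

// JDBCInputFormat already yields Row; snapshot rows are always inserts.
DataStream<Tuple2<String, Row>> snapshotChanges = snapshotStream
        .map(row -> Tuple2.of("insert", row))
        .returns(Types.TUPLE(Types.STRING, Types.GENERIC(Row.class)));

DataStream<Tuple2<String, Row>> tableStream = binlogChanges.union(snapshotChanges);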


  1.  To make sure "tableStream" doesn't miss any row, "binlogStream" must
connect to Kafka first, so that the binlog starts before the table snapshot.
I can roughly achieve this with
"myKafkaConsumer.setStartFromTimestamp(System.currentTimeMillis() - 600 * 1000)"
(the first sketch after this list shows where that call fits).
  2.  To make sure changes from "binlogStream" always overwrite
"snapshotStream", I need a way to hold "binlogStream" back until
"snapshotStream" is drained, so that all changes from "binlogStream" come
after the changes from "snapshotStream". How can I achieve this? (The best
workaround I've come up with so far is the second sketch after this list.)
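
For (1), this is how I set up the consumer; the broker address, group id, and topic name are placeholders for my real configuration:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder
props.setProperty("group.id", "table-sync");            // placeholder

FlinkKafkaConsumer<String> myKafkaConsumer = new FlinkKafkaConsumer<>(
        "dbserver.mydb.mytable",        // placeholder Debezium topic name
        new SimpleStringSchema(), props);

// Start reading ten minutes in the past so the binlog is guaranteed to
// cover every change that happens while the snapshot is being taken.
myKafkaConsumer.setStartFromTimestamp(System.currentTimeMillis() - 600 * 1000);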
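
For (2), the only workaround I can think of avoids holding the stream back at all: if both streams are keyed by the table's primary key, a stateful CoFlatMap can drop any snapshot row that arrives after a binlog change for the same key, so the binlog effectively always wins. A minimal sketch, using the (action, row) tuples from above and assuming the primary key is the first column:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class BinlogWinsFunction extends
        RichCoFlatMapFunction<Tuple2<String, Row>, Tuple2<String, Row>, Tuple2<String, Row>> {

    // Per key: set once any binlog change has been seen.
    private transient ValueState<Boolean> binlogSeen;

    @Override
    public void open(Configuration parameters) {
        binlogSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("binlogSeen", Boolean.class));
    }

    // Binlog side: always emit, and remember that this key has binlog data.
    @Override
    public void flatMap1(Tuple2<String, Row> change, Collector<Tuple2<String, Row>> out)
            throws Exception {
        binlogSeen.update(true);
        out.collect(change);
    }

    // Snapshot side: emit only if no binlog change has arrived for this key,
    // so a snapshot row can never clobber a newer binlog change.
    @Override
    public void flatMap2(Tuple2<String, Row> insert, Collector<Tuple2<String, Row>> out)
            throws Exception {
        if (binlogSeen.value() == null) {
            out.collect(insert);
        }
    }
}

// Wiring: key both sides by the primary-key column (assumed to be field 0).
DataStream<Tuple2<String, Row>> tableStream = binlogChanges
        .connect(snapshotChanges)
        .keyBy(c -> c.f1.getField(0).toString(), c -> c.f1.getField(0).toString())
        .flatMap(new BinlogWinsFunction());

Downstream would still have to apply changes in arrival order, but with this nothing has to wait for the snapshot to drain. I'm not sure whether this is sound, though, which is why I'm asking about holding the stream.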

I'm considering a wrapper SourceFunction that combines FlinkKafkaConsumer and
JDBCInputFormat, but the two differ in parallelism and in checkpointing, so
I'm not sure how to get the wrapper right, or even whether it's the right
direction. (A rough skeleton of what I mean is below.)
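
This skeleton is only meant to make the question concrete; the class name is mine, it runs at parallelism 1 only, and it deliberately skips exactly the parts I don't know how to do: wiring the wrapped consumer's open()/runtime context and forwarding its CheckpointedFunction / CheckpointListener callbacks so the Kafka offsets actually get checkpointed:

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

public class SnapshotThenBinlogSource extends RichSourceFunction<Row> {

    private final RichInputFormat<Row, InputSplit> snapshot;  // e.g. JDBCInputFormat
    private final SourceFunction<Row> binlog;                 // e.g. FlinkKafkaConsumer
    private final int arity;                                  // column count of the table
    private volatile boolean running = true;

    public SnapshotThenBinlogSource(RichInputFormat<Row, InputSplit> snapshot,
                                    SourceFunction<Row> binlog, int arity) {
        this.snapshot = snapshot;
        this.binlog = binlog;
        this.arity = arity;
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        // Phase 1: the snapshot is bounded, so this loop terminates.
        snapshot.setRuntimeContext(getRuntimeContext());
        snapshot.configure(new Configuration());
        snapshot.openInputFormat();
        for (InputSplit split : snapshot.createInputSplits(1)) {
            snapshot.open(split);
            while (running && !snapshot.reachedEnd()) {
                Row row = snapshot.nextRecord(new Row(arity));
                if (row != null) {
                    ctx.collect(row);
                }
            }
            snapshot.close();
        }
        snapshot.closeInputFormat();
        // Phase 2: hand the same context to the binlog source (runs until
        // cancel). Checkpointing during and after phase 1 is the unsolved
        // part: the wrapped consumer's state callbacks are never invoked.
        if (running) {
            binlog.run(ctx);
        }
    }

    @Override
    public void cancel() {
        running = false;
        binlog.cancel();
    }
}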

Any suggestion would be much appreciated!
