I'm using JDBCInputFormat to read a snapshot of a MySQL table, and FlinkKafkaConsumer to read the binlog, which Debezium writes to Kafka.
```java
DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
DataStream snapshotStream = env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);

// map() converts both streams to the same type: (action, fields…), where action
// is "insert", "update", or "delete". The action for snapshotStream is always "insert".
DataStream tableStream = binlogStream.map(…).union(snapshotStream.map(…));

tableStream.print();
env.execute("example");
```

My two requirements are:

1. To make sure `tableStream` doesn't miss any row, `binlogStream` must connect to Kafka first, so that the binlog starts before the table snapshot. I can roughly achieve this with `myKafkaConsumer.setStartFromTimestamp(System.currentTimeMillis() - 600 * 1000)`.
2. To make sure changes from `binlogStream` always overwrite those from `snapshotStream`, I need a way to hold back `binlogStream` until `snapshotStream` is drained, so that every change from `binlogStream` comes after the changes from `snapshotStream`. How can I achieve this?

I'm considering a wrapper SourceFunction that combines FlinkKafkaConsumer and JDBCInputFormat, but the two differ in parallelism and checkpointing, so I'm not sure how to get the wrapper right, or even whether this is the right direction; a rough sketch of what I have in mind is below. Any suggestion would be much appreciated!
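Here is the rough sketch of the wrapper. `SnapshotThenBinlogSource` and `rowArity` are names I made up; it extends the non-parallel `RichSourceFunction`, so parallelism is fixed at 1, and the Kafka consumer's checkpoint state is not wired up at all, which is exactly the part I don't know how to do:

```java
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;

// Emits the whole JDBC snapshot first, then hands the SourceContext over to
// the Kafka consumer, so all binlog changes come after the snapshot rows.
// Non-parallel, and checkpointing of the wrapped consumer is NOT handled.
public class SnapshotThenBinlogSource extends RichSourceFunction<Row> {

    private final JDBCInputFormat snapshotFormat;
    private final FlinkKafkaConsumer<Row> binlogConsumer;
    private final int rowArity; // arity of the snapshot rows, needed for the reuse Row

    private volatile boolean running = true;

    public SnapshotThenBinlogSource(JDBCInputFormat snapshotFormat,
                                    FlinkKafkaConsumer<Row> binlogConsumer,
                                    int rowArity) {
        this.snapshotFormat = snapshotFormat;
        this.binlogConsumer = binlogConsumer;
        this.rowArity = rowArity;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Pass this operator's runtime context through to the wrapped sources.
        snapshotFormat.setRuntimeContext(getRuntimeContext());
        binlogConsumer.setRuntimeContext(getRuntimeContext());
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        // Phase 1: drain the snapshot completely.
        snapshotFormat.openInputFormat();
        try {
            for (InputSplit split : snapshotFormat.createInputSplits(1)) {
                snapshotFormat.open(split);
                Row reuse = new Row(rowArity);
                while (running && !snapshotFormat.reachedEnd()) {
                    Row row = snapshotFormat.nextRecord(reuse);
                    if (row != null) {
                        ctx.collect(row);
                    }
                }
                snapshotFormat.close();
            }
        } finally {
            snapshotFormat.closeInputFormat();
        }

        // Phase 2: only now start reading the binlog from Kafka.
        if (running) {
            binlogConsumer.open(new Configuration());
            binlogConsumer.run(ctx);
        }
    }

    @Override
    public void cancel() {
        running = false;
        binlogConsumer.cancel();
    }

    @Override
    public void close() throws Exception {
        binlogConsumer.close();
    }
}
```

This would replace the two separate sources and the union, something like `env.addSource(new SnapshotThenBinlogSource(snapshotFormat, myKafkaConsumer, arity)).setParallelism(1)`, with the map() to (action, fields…) applied downstream. But since FlinkKafkaConsumer implements CheckpointedFunction and the wrapper never calls initializeState()/snapshotState() on it, its offsets wouldn't be checkpointed, and I don't see how to forward that correctly.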