Hi,

I want to use Flink SQL to left join a real-time stream against a static
dimension table stored in MySQL. Currently I read the MySQL table as a
DataStream, register it as a table, and join it with another DataStream that
has also been converted to a Flink table. However, I found that the real-time
stream data is not joined correctly with the MySQL data at the beginning,
while later stream data is joined correctly. Is there a good way, using the
Table API, to join the real-time stream against MySQL data that has already
been fully loaded into memory, and to reload that MySQL data into memory once
every hour?
Thanks a lot.

The following is some example code:

public static JDBCInputFormatBuilder inputBuilder =
        JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(DRIVER_CLASS)
        .setDBUrl(DB_URL)
        .setUsername(USER_NAME)
        .setPassword(USER_PASS)
        .setQuery(SELECT_ALL_PERSONS)
        .setRowTypeInfo(ROW_TYPE_INFO);

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Order> orderA = env.addSource(new OrderFunction());
tEnv.registerDataStream("tableA", orderA, "name, product, amount");

DataStream<Row> mysql_table = env.createInput(inputBuilder.finish());
// Field names of the MySQL dimension table, used when registering tableB.
String dimTableFields = "id, name, age, address";

tEnv.registerDataStream("tableB", mysql_table, dimTableFields);
Table result = tEnv.sqlQuery(
        "SELECT tableA.name, tableA.amount, tableB.age, tableB.address "
        + "FROM tableB JOIN tableA ON tableA.name = tableB.name");
tEnv.toRetractStream(result, ROW_TYPE_INFO_OUT).print();
env.execute();
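
To make the requirement more concrete, here is a rough, Flink-independent
sketch of the behaviour I am after: the dimension data is loaded completely
before the first lookup, so early stream records are not missed, and it is
reloaded once the refresh interval has passed. The class name
RefreshingDimensionCache, the loader and the injectable clock are only
illustrative assumptions and not part of any Flink API. I assume something
like this could be called from inside a RichFlatMapFunction, with the loader
running the JDBC query, but I have not verified that this is the recommended
approach.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper: keeps a dimension table in memory and reloads it periodically. */
final class RefreshingDimensionCache<K, V> {

    private final Supplier<Map<K, V>> loader; // assumed to run e.g. the MySQL query
    private final long refreshIntervalMs;     // e.g. 3_600_000 for one hour
    private final LongSupplier clock;         // injectable so the refresh can be tested

    private Map<K, V> snapshot = Collections.emptyMap();
    private long lastLoadMs;
    private boolean loaded = false;

    RefreshingDimensionCache(Supplier<Map<K, V>> loader,
                             long refreshIntervalMs,
                             LongSupplier clock) {
        this.loader = loader;
        this.refreshIntervalMs = refreshIntervalMs;
        this.clock = clock;
    }

    /**
     * Returns the dimension value for the key, or null when there is no match
     * (which is what a left join would pad with).
     */
    V lookup(K key) {
        long now = clock.getAsLong();
        // Load on the very first lookup, then again once the interval has elapsed.
        if (!loaded || now - lastLoadMs >= refreshIntervalMs) {
            snapshot = new HashMap<>(loader.get());
            lastLoadMs = now;
            loaded = true;
        }
        return snapshot.get(key);
    }
}
```

With a one-hour interval, every stream record would see a complete copy of the
MySQL table that is at most one hour old, and unmatched names would simply
yield null for age and address.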

Thanks a lot.
