Hi yelun, Currently, there are no direct ways to dynamically load data and do join in Flink-SQL, as a workaround you can implement your logic with an udtf. In the udtf, you can load the data into a cache and update it according to your requirement.
Best, Hequn On Wed, Nov 14, 2018 at 10:34 AM yelun <986463...@qq.com> wrote: > hi, > > I want to use flink sql to left join static dimension table from mysql > currently, so I converted the mysql table into data stream to join with > datastream which has converted to flink table. While I found that the > real-time stream data is not joined correctly with mysql data at the > beginning, but the latter stream can be joined correctly. So I want to ask > that is there any good way to make real-time stream can join with mysql > data with table api which has loaded and supporting dynamicly loading mysql > data into memory once each hour. Thanks a lot. > > The following is the some example code: > > public static JDBCInputFormatBuilder inputBuilder = > JDBCInputFormat.buildJDBCInputFormat() > .setDrivername(DRIVER_CLASS) > .setDBUrl(DB_URL) > .setUsername(USER_NAME) > .setPassword(USER_PASS) > .setQuery(SELECT_ALL_PERSONS) > .setRowTypeInfo(ROW_TYPE_INFO); > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.createLocalEnvironment(); > StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); > > DataStream<Order> orderA = env.addSource(new OrderFunction()); > tEnv.registerDataStream("tableA", orderA, "name, product, amount"); > > DataStream<Row> mysql_table = env.createInput(inputBuilder.finish()); > String[] dim_table_fileds = {"id","name","age","address"}; > > tEnv.registerDataStream("tableB",mysql_table); > Table result = tEnv.sqlQuery("SELECT > tableA.name,tableA.amount,tableB.age,tableB.address FROM tableB join > tableA on tableA.name = tableB.name" ); > tEnv.toRetractStream(result, ROW_TYPE_INFO_OUT).print(); > env.execute(); > > Thanks a lot. >