Hi,

Liu and Hequn are right. You need to pass at least one parameter into the table function, for example:
select t.col_1 from test t left join lateral table(dim_test(SOME_ATTRIBUTE)) b on true

Best, Fabian

2018-02-24 13:24 GMT+01:00 ZhenBao Ye <ye_zhen...@qq.com>:

> Hi, I was using 1.4.0.
>
> Yezhenbao
>
> On Feb 24, 2018, at 17:55, Renjie Liu <liurenjie2...@gmail.com> wrote:
> >
> > Hi, according to the Flink docs, it seems that you need to pass at least
> > one argument into the table function.
> >
> >> On Fri, Feb 23, 2018 at 12:35 AM 叶振宝 <827295...@qq.com> wrote:
> >>
> >> Hey, I am new to Flink and I have a question and want to see if anyone
> >> can help here.
> >>
> >> How to use a dimension table in the Flink Table API with
> >> StreamExecutionEnvironment?
> >>
> >> I use a TableFunction to handle this, but it fails in debug like this:
> >>
> >> LogicalProject(col_1=[$0])
> >>   LogicalJoin(condition=[true], joinType=[left])
> >>     LogicalTableScan(table=[[test]])
> >>     LogicalTableFunctionScan(invocation=[dim_test()],
> >>       rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)],
> >>       elementType=[class [Ljava.lang.Object;])
> >>
> >> This exception indicates that the query uses an unsupported SQL feature.
> >> Please check the documentation for the set of currently supported SQL
> >> features.
> >>   at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
> >>   at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
> >>   at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
> >>   at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216)
> >>   at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692)
> >>   at com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64)
> >>
> >> SQL: select t.col_1 from test t left join lateral table(dim_test()) b on true
> >>
> >> Main Code:
> >> public static void main(String[] args) throws Exception {
> >>     String sql = "select t.col_1 from test t left join lateral table(dim_test()) b on true";
> >>     StreamExecutionEnvironment streamEnv =
> >>         StreamExecutionEnvironment.getExecutionEnvironment();
> >>     StreamTableEnvironment stEnv =
> >>         TableEnvironment.getTableEnvironment(streamEnv);
> >>     Properties kafkaProps = new Properties();
> >>     kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
> >>     kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
> >>     kafkaProps.setProperty("group.id", "test");
> >>     Kafka010JsonTableSource tableSource =
> >>         Kafka010JsonTableSource.builder()
> >>             .forTopic("test")
> >>             .withKafkaProperties(kafkaProps)
> >>             .withSchema(TableSchema.builder()
> >>                 .field("col_1", Types.STRING)
> >>                 .field("col_2", Types.STRING).build())
> >>             .build();
> >>     stEnv.registerTableSource("test", tableSource);
> >>     String[] columns = {"col", "name"};
> >>     TypeInformation[] typeInformations =
> >>         {TypeInformation.of(String.class), TypeInformation.of(String.class)};
> >>     TableSchema tableSchema = new TableSchema(columns, typeInformations);
> >>     Map<String,Object> context = new HashMap<>();
> >>     context.put("mysql.url", "jdbc:mysql://localhost:3306/test");
> >>     context.put("mysql.driver", "com.mysql.jdbc.Driver");
> >>     context.put("mysql.user", "test");
> >>     context.put("mysql.password", "test");
> >>     context.put("mysql.table", "dim_test");
> >>     StreamSqlDim dim = new MySqlDimFactory()
> >>         .getInstance(tableSchema, new StreamSqlContext(context));
> >>     stEnv.registerFunction("dim_test", dim);
> >>
> >>     String[] outColumns = {"col"};
> >>     TypeInformation[] outType = {TypeInformation.of(String.class)};
> >>     TableSink tableSink = new Kafka010JsonTableSink("test_out", kafkaProps);
> >>     stEnv.registerTableSink("test_out", outColumns, outType, tableSink);
> >>     Table t = stEnv.sql(sql);
> >>     stEnv.insertInto(t, "test_out", stEnv.queryConfig());
> >>     streamEnv.execute();
> >> }
> >>
> >> MySqlDim extends TableFunction, and its eval() method is empty, like
> >> this:
> >>
> >> public void eval() {
> >>
> >> }
> >>
> > --
> > Liu, Renjie
> > Software Engineer, MVAD
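
To make the advice at the top of this thread concrete, here is a rough sketch of a dimension-lookup table function whose eval() takes one argument (the join key) instead of being empty. Everything named here is an assumption for illustration: DimLookupSketch, DimLookup, the in-memory map standing in for the MySQL table, and TableFunctionStandIn, which only mimics the collect() method of org.apache.flink.table.functions.TableFunction so that the sketch compiles without Flink on the classpath. In a real job you would extend Flink's TableFunction instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DimLookupSketch {

    // Hypothetical stand-in for Flink's TableFunction<T>: it only mimics collect(T)
    // so this file compiles and runs on its own.
    public abstract static class TableFunctionStandIn<T> {
        private final List<T> collected = new ArrayList<>();

        protected void collect(T row) {
            collected.add(row);
        }

        // Test helper (not part of Flink): returns and clears the rows emitted so far.
        public List<T> drain() {
            List<T> out = new ArrayList<>(collected);
            collected.clear();
            return out;
        }
    }

    // Hypothetical dimension lookup; the map stands in for the MySQL dim_test table.
    public static class DimLookup extends TableFunctionStandIn<String[]> {
        private final Map<String, String> dim;

        public DimLookup(Map<String, String> dim) {
            this.dim = new HashMap<>(dim);
        }

        // eval() receives the join key from the left-hand table.
        // It emits one (col, name) row per match and nothing when the key is unknown.
        public void eval(String key) {
            String name = dim.get(key);
            if (name != null) {
                collect(new String[] {key, name});
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("a", "Alice");
        DimLookup dim = new DimLookup(data);
        dim.eval("a");
        for (String[] row : dim.drain()) {
            System.out.println(row[0] + " -> " + row[1]);
        }
    }
}
```

With a function of this shape registered as dim_test, the query would pass a column of the left table as the argument, e.g. table(dim_test(t.col_1)). Because the join is a left join, rows of test for which eval() emits nothing should still appear in the result, with nulls for the dimension columns.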