Hi, according to flink doc, it seems that you need to pass at least one argument into the table function.
On Fri, Feb 23, 2018 at 12:35 AM 叶振宝 <827295...@qq.com> wrote: > Hey, I am new to flink and I have a question and want to see if anyone can > help here. > > How to use Dimension table in Flink TableAPI with > StreamExecutionEnvironment ? > > I use TableFuncion to deal this question, but it have some problem in debug > like this: > LogicalProject(col_1=[$0]) > LogicalJoin(condition=[true], joinType=[left]) > LogicalTableScan(table=[[test]]) > LogicalTableFunctionScan(invocation=[dim_test()], > rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)], > elementType=[class [Ljava.lang.Object;]) > > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274) > at > org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730) > at > org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216) > at > org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692) > at > com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64) > > SQL : select t.col_1 from test t left join lateral table(dim_test()) b on > true > > Main Code: > public static void main(String[] args) throws Exception { > String sql = "select t.col_1 from test t left join lateral > table(dim_test()) b on true"; > StreamExecutionEnvironment streamEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment stEnv = > TableEnvironment.getTableEnvironment(streamEnv); > Properties kafkaProps = new Properties(); > kafkaProps.setProperty("zookeeper.connect", "localhost:2181"); > kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); > kafkaProps.setProperty("group.id", "test"); > Kafka010JsonTableSource tableSource = > Kafka010JsonTableSource.builder() > .forTopic("test") > .withKafkaProperties(kafkaProps) > .withSchema(TableSchema.builder() > .field("col_1", Types.STRING) > .field("col_2",Types.STRING).build()) > .build(); > stEnv.registerTableSource("test", tableSource); > String[] columns = {"col","name"}; > TypeInformation[] typeInformations = > {TypeInformation.of(String.class),TypeInformation.of(String.class)}; > TableSchema tableSchema = new > TableSchema(columns,typeInformations); > Map<String,Object> context = new HashMap<>(); > context.put("mysql.url","jdbc:mysql://localhost:3306/test"); > context.put("mysql.driver","com.mysql.jdbc.Driver"); > context.put("mysql.user","test"); > context.put("mysql.password","test"); > context.put("mysql.table","dim_test"); > StreamSqlDim dim = new > MySqlDimFactory().getInstance(tableSchema,new StreamSqlContext(context)); > stEnv.registerFunction("dim_test",dim); > > String[] outColumns = {"col"}; > TypeInformation[] outType = {TypeInformation.of(String.class)}; > TableSink tableSink = new > Kafka010JsonTableSink("test_out",kafkaProps); > stEnv.registerTableSink("test_out",outColumns,outType,tableSink); > Table t = stEnv.sql(sql); > stEnv.insertInto(t,"test_out",stEnv.queryConfig()); > streamEnv.execute(); > } > > MySqlDim is extends TableFunction ,and the method eval() is empty,like > this: > public void eval(){ > > } > > > > -- Liu, Renjie Software Engineer, MVAD