Hi,

The UDTF's arguments must not be empty; otherwise Calcite fails to convert the join into a correlate.
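
As a rough sketch of what that could look like for the query in this thread: pass a column of the left table to the function, so that the lateral join actually refers to the left side. This assumes that `eval` in the `TableFunction` is changed to take a matching parameter, for example `public void eval(String key)`, and that `col_1` is the lookup key. Both are assumptions for illustration, not something stated in the original message.

```sql
-- dim_test now receives t.col_1 (assumed to be the lookup key),
-- so the function call is no longer argument-free
select t.col_1
from test t
left join lateral table(dim_test(t.col_1)) b on true
```

Inside `eval(String key)`, the function would then look up the dimension row for `key` and emit it with `collect(...)`.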
Best,
Hequn

2018-02-23 0:34 GMT+08:00 叶振宝 <827295...@qq.com>:

> Hey, I am new to Flink and I have a question; I want to see if anyone can help here.
>
> How do I use a dimension table in the Flink Table API with StreamExecutionEnvironment?
>
> I use a TableFunction to handle this, but I run into the following problem when debugging:
>
> LogicalProject(col_1=[$0])
>   LogicalJoin(condition=[true], joinType=[left])
>     LogicalTableScan(table=[[test]])
>     LogicalTableFunctionScan(invocation=[dim_test()], rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)], elementType=[class [Ljava.lang.Object;])
>
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL features.
>     at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
>     at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
>     at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
>     at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216)
>     at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692)
>     at com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64)
>
> SQL: select t.col_1 from test t left join lateral table(dim_test()) b on true
>
> Main code:
>
> public static void main(String[] args) throws Exception {
>     String sql = "select t.col_1 from test t left join lateral table(dim_test()) b on true";
>     StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(streamEnv);
>     Properties kafkaProps = new Properties();
>     kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>     kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>     kafkaProps.setProperty("group.id", "test");
>     Kafka010JsonTableSource tableSource = Kafka010JsonTableSource.builder()
>         .forTopic("test")
>         .withKafkaProperties(kafkaProps)
>         .withSchema(TableSchema.builder()
>             .field("col_1", Types.STRING)
>             .field("col_2", Types.STRING).build())
>         .build();
>     stEnv.registerTableSource("test", tableSource);
>     String[] columns = {"col", "name"};
>     TypeInformation[] typeInformations = {TypeInformation.of(String.class), TypeInformation.of(String.class)};
>     TableSchema tableSchema = new TableSchema(columns, typeInformations);
>     Map<String, Object> context = new HashMap<>();
>     context.put("mysql.url", "jdbc:mysql://localhost:3306/test");
>     context.put("mysql.driver", "com.mysql.jdbc.Driver");
>     context.put("mysql.user", "test");
>     context.put("mysql.password", "test");
>     context.put("mysql.table", "dim_test");
>     StreamSqlDim dim = new MySqlDimFactory().getInstance(tableSchema, new StreamSqlContext(context));
>     stEnv.registerFunction("dim_test", dim);
>
>     String[] outColumns = {"col"};
>     TypeInformation[] outType = {TypeInformation.of(String.class)};
>     TableSink tableSink = new Kafka010JsonTableSink("test_out", kafkaProps);
>     stEnv.registerTableSink("test_out", outColumns, outType, tableSink);
>     Table t = stEnv.sql(sql);
>     stEnv.insertInto(t, "test_out", stEnv.queryConfig());
>     streamEnv.execute();
> }
>
> MySqlDim extends TableFunction, and the method eval() is empty, like this:
>
> public void eval() {
>
> }