Hi, according to the Flink documentation, it seems that you need to pass at
least one argument to the table function. In your case eval() takes no
parameters and the query calls dim_test() with none.
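
For illustration, here is a minimal sketch of a table function whose eval() takes the join key as its argument, together with a query that passes t.col_1 to it. Several things here are assumptions rather than your actual code: the class names DimLookupSketch and DimTest are made up, the in-memory map stands in for the MySQL lookup, and the small TableFunction base class is only a stand-in so the sketch compiles without Flink on the classpath (in a real job you would extend org.apache.flink.table.functions.TableFunction instead). I have not verified that this change alone removes the exception.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DimLookupSketch {

    // Stand-in for org.apache.flink.table.functions.TableFunction<T>
    // (assumption: used only so this sketch compiles without Flink).
    abstract static class TableFunction<T> {
        final List<T> collected = new ArrayList<>();

        // Mirrors the role of TableFunction#collect: emit one output row.
        protected void collect(T row) {
            collected.add(row);
        }
    }

    // Hypothetical dimension function: eval() receives the join key.
    static class DimTest extends TableFunction<String[]> {
        // In-memory map as a placeholder for the MySQL dimension table.
        private final Map<String, String> dim;

        DimTest(Map<String, String> dim) {
            this.dim = dim;
        }

        // One argument (the key from the left table) instead of none.
        public void eval(String key) {
            String name = dim.get(key);
            if (name != null) {
                collect(new String[] {key, name}); // row of (col, name)
            }
        }
    }

    // The original query, changed only to pass t.col_1 into the function.
    static final String SQL =
            "select t.col_1 from test t "
            + "left join lateral table(dim_test(t.col_1)) b on true";

    public static void main(String[] args) {
        Map<String, String> dim = new HashMap<>();
        dim.put("k1", "Alice");

        DimTest fn = new DimTest(dim);
        fn.eval("k1");      // key present: one row is emitted
        fn.eval("unknown"); // key absent: nothing is emitted

        System.out.println(SQL);
        System.out.println("rows emitted: " + fn.collected.size());
    }
}
```

With a left join, input rows for which the function emits nothing are still kept, padded with nulls on the function side.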

On Fri, Feb 23, 2018 at 12:35 AM 叶振宝 <827295...@qq.com> wrote:

> Hey, I am new to Flink and have a question; I hope someone here can
> help.
>
> How can I use a dimension table in the Flink Table API with
> StreamExecutionEnvironment?
>
> I use a TableFunction for this, but I get the following error when
> debugging:
> LogicalProject(col_1=[$0])
>   LogicalJoin(condition=[true], joinType=[left])
>     LogicalTableScan(table=[[test]])
>     LogicalTableFunctionScan(invocation=[dim_test()],
> rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)],
> elementType=[class [Ljava.lang.Object;])
>
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
>         at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
>         at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
>         at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
>         at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216)
>         at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692)
>         at com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64)
>
> SQL : select t.col_1 from test t left join lateral table(dim_test()) b on
> true
>
> Main Code:
> public static void main(String[] args) throws Exception {
>         String sql = "select t.col_1 from test t left join lateral "
>                 + "table(dim_test()) b on true";
>         StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment stEnv =
> TableEnvironment.getTableEnvironment(streamEnv);
>         Properties kafkaProps = new Properties();
>         kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>         kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>         kafkaProps.setProperty("group.id", "test");
>         Kafka010JsonTableSource tableSource =
> Kafka010JsonTableSource.builder()
>                 .forTopic("test")
>                 .withKafkaProperties(kafkaProps)
>                 .withSchema(TableSchema.builder()
>                         .field("col_1", Types.STRING)
>                         .field("col_2",Types.STRING).build())
>                 .build();
>         stEnv.registerTableSource("test", tableSource);
>         String[] columns = {"col","name"};
>         TypeInformation[] typeInformations =
> {TypeInformation.of(String.class),TypeInformation.of(String.class)};
>         TableSchema tableSchema = new
> TableSchema(columns,typeInformations);
>         Map<String,Object> context = new HashMap<>();
>         context.put("mysql.url","jdbc:mysql://localhost:3306/test");
>         context.put("mysql.driver","com.mysql.jdbc.Driver");
>         context.put("mysql.user","test");
>         context.put("mysql.password","test");
>         context.put("mysql.table","dim_test");
>         StreamSqlDim dim = new
> MySqlDimFactory().getInstance(tableSchema,new StreamSqlContext(context));
>         stEnv.registerFunction("dim_test",dim);
>
>         String[] outColumns = {"col"};
>         TypeInformation[] outType = {TypeInformation.of(String.class)};
>         TableSink tableSink = new
> Kafka010JsonTableSink("test_out",kafkaProps);
>         stEnv.registerTableSink("test_out",outColumns,outType,tableSink);
>         Table t = stEnv.sql(sql);
>         stEnv.insertInto(t,"test_out",stEnv.queryConfig());
>         streamEnv.execute();
>     }
>
> MySqlDim extends TableFunction, and its eval() method is empty, like
> this:
> public void eval(){
>
> }
>
>
>
> --
Liu, Renjie
Software Engineer, MVAD
