Hi, I was using 1.4.0.

Yezhenbao

> On Feb 24, 2018, at 17:55, Renjie Liu <liurenjie2...@gmail.com> wrote:
> 
> Hi, according to the Flink documentation, it seems that you need to pass at
> least one argument into the table function.
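For anyone reading this thread later: a minimal sketch of what that suggestion looks like, assuming Flink 1.4's Table API. The class name, key column, and lookup logic below are illustrative, not from the original post; the point is that eval() takes a parameter so the planner can correlate the lateral table with each input row.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableFunction;

// Illustrative dimension-lookup table function; emits (col, name) rows.
public class DimLookup extends TableFunction<Tuple2<String, String>> {

    // eval() takes at least one argument from the probe side, so the
    // planner can turn the lateral join into a correlate.
    public void eval(String key) {
        // Hypothetical lookup; a real implementation would query the
        // dimension store here and call collect() once per matching row.
        collect(new Tuple2<>(key, "name-for-" + key));
    }
}
```

Registered as dim_test, the query would then pass a column from the input table, e.g. `select t.col_1 from test t, lateral table(dim_test(t.col_1)) as b(col, name)`, or the left-join form with `on true`.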
> 
>> On Fri, Feb 23, 2018 at 12:35 AM 叶振宝 <827295...@qq.com> wrote:
>> 
>> Hey, I am new to Flink and have a question; I hope someone here can
>> help.
>> 
>> How can I use a dimension table in the Flink Table API with a
>> StreamExecutionEnvironment?
>> 
>> I use a TableFunction to handle this, but it fails at planning time
>> with the following plan and exception:
>> LogicalProject(col_1=[$0])
>>   LogicalJoin(condition=[true], joinType=[left])
>>     LogicalTableScan(table=[[test]])
>>     LogicalTableFunctionScan(invocation=[dim_test()], rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)], elementType=[class [Ljava.lang.Object;])
>> 
>> This exception indicates that the query uses an unsupported SQL feature.
>> Please check the documentation for the set of currently supported SQL
>> features.
>>     at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
>>     at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
>>     at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
>>     at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216)
>>     at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692)
>>     at com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64)
>> 
>> SQL: select t.col_1 from test t left join lateral table(dim_test()) b on true
>> 
>> Main code:
>> public static void main(String[] args) throws Exception {
>>     String sql = "select t.col_1 from test t left join lateral table(dim_test()) b on true";
>>     StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>>     StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(streamEnv);
>>     Properties kafkaProps = new Properties();
>>     kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>>     kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>>     kafkaProps.setProperty("group.id", "test");
>>     Kafka010JsonTableSource tableSource = Kafka010JsonTableSource.builder()
>>             .forTopic("test")
>>             .withKafkaProperties(kafkaProps)
>>             .withSchema(TableSchema.builder()
>>                     .field("col_1", Types.STRING)
>>                     .field("col_2", Types.STRING).build())
>>             .build();
>>     stEnv.registerTableSource("test", tableSource);
>>     String[] columns = {"col", "name"};
>>     TypeInformation[] typeInformations = {TypeInformation.of(String.class), TypeInformation.of(String.class)};
>>     TableSchema tableSchema = new TableSchema(columns, typeInformations);
>>     Map<String, Object> context = new HashMap<>();
>>     context.put("mysql.url", "jdbc:mysql://localhost:3306/test");
>>     context.put("mysql.driver", "com.mysql.jdbc.Driver");
>>     context.put("mysql.user", "test");
>>     context.put("mysql.password", "test");
>>     context.put("mysql.table", "dim_test");
>>     StreamSqlDim dim = new MySqlDimFactory().getInstance(tableSchema, new StreamSqlContext(context));
>>     stEnv.registerFunction("dim_test", dim);
>> 
>>     String[] outColumns = {"col"};
>>     TypeInformation[] outType = {TypeInformation.of(String.class)};
>>     TableSink tableSink = new Kafka010JsonTableSink("test_out", kafkaProps);
>>     stEnv.registerTableSink("test_out", outColumns, outType, tableSink);
>>     Table t = stEnv.sql(sql);
>>     stEnv.insertInto(t, "test_out", stEnv.queryConfig());
>>     streamEnv.execute();
>> }
>> 
>> MySqlDim extends TableFunction, and its eval() method is empty, like
>> this:
>> public void eval() {
>> }
>> 
>> 
>> 
>> --
> Liu, Renjie
> Software Engineer, MVAD
> 


