Dear All,    I have a question about  TableSource.
    I defined a TableSource By StreamTableSource,then register a table and 
execute a query.the sql as "select f0 from myTable". final,turn the result 
table to DataStream.
The following error occurred in execution and how to solve?
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: 
Incompatible types of expression and result type.
        at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:966)
        at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:964)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

    MyCode as follows:
    ======================================
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = 
StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnvironment = 
TableEnvironment.getTableEnvironment(environment);

    tableEnvironment.registerTableSource("myTable",new MyTableSource());
    String sql = "select f0 from myTable";
    Table sqlResult = tableEnvironment.sql(sql);

    DataStream<Tuple2<Boolean,String>> result = 
tableEnvironment.toRetractStream(sqlResult,String.class);
    result.print();

    environment.execute();
}package com.xiaoju.manhattan.fbi.data.calc.source;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class MyTableSource implements StreamTableSource<Row>{
    @Override
    public TypeInformation<Row> getReturnType() {
        TypeInformation<Row> typeInformation = new 
RowTypeInfo(Types.STRING,Types.STRING,Types.STRING);//Types.STRING,Types.STRING,Types.STRING
        return typeInformation;
    }

    @Override
    public String explainSource() {
        return "";
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {

        DataStream<Row> dataStream = execEnv.addSource(new 
SourceFunction<Row>() {
            private long count = 0L;
            private volatile boolean isRunning = true;
            private String str = "{\"ak\":\"av\",\"bk\":\"bv\",\"ck\":\"cv\"}";
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (isRunning && count < 10){
                    synchronized (ctx.getCheckpointLock()){
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode jsonNode = objectMapper.readTree(str);
                        Row row = new Row(jsonNode.size()-1);
                        for(int i=0;i<jsonNode.size();i++){
                            row.setField(i,jsonNode.get(i));
                        }
                        ctx.collect(row);
                        count++;
                    }
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        return dataStream;
    }
}

Reply via email to