Dear all, I have a question about TableSource. I defined a TableSource by implementing StreamTableSource, then registered it as a table and executed a query. The SQL is "select f0 from myTable". Finally, I convert the result table to a DataStream. The following error occurs during execution. How can I solve it?
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Incompatible types of expression and result type.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:966)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:964)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
My code is as follows:
======================================
/**
 * Registers {@link MyTableSource} as "myTable", selects its first column via SQL,
 * converts the result to a retract stream and prints it.
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);

    tableEnvironment.registerTableSource("myTable", new MyTableSource());

    // The RowTypeInfo returned by MyTableSource has no explicit field names,
    // so the columns are addressed by their default names f0, f1, f2.
    String sql = "select f0 from myTable";
    Table sqlResult = tableEnvironment.sql(sql);

    DataStream<Tuple2<Boolean, String>> result =
            tableEnvironment.toRetractStream(sqlResult, String.class);
    result.print();

    environment.execute();
}

package com.xiaoju.manhattan.fbi.data.calc.source;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

/**
 * A {@link StreamTableSource} that emits ten rows of three string columns.
 * Each row holds the values of a fixed JSON object ({@code av}, {@code bv}, {@code cv}).
 */
public class MyTableSource implements StreamTableSource<Row> {

    /**
     * Returns the row type of this source: three STRING fields.
     *
     * @return the type information describing the produced rows
     */
    @Override
    public TypeInformation<Row> getReturnType() {
        return new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING);
    }

    /**
     * Returns the description of this source used when explaining a plan.
     *
     * @return an empty string
     */
    @Override
    public String explainSource() {
        return "";
    }

    /**
     * Builds the data stream backing this table.
     *
     * @param execEnv the environment the source function is added to
     * @return a stream of rows typed with {@link #getReturnType()}
     */
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        SourceFunction<Row> source = new SourceFunction<Row>() {
            private long count = 0L;
            private volatile boolean isRunning = true;
            private final String str = "{\"ak\":\"av\",\"bk\":\"bv\",\"ck\":\"cv\"}";

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                // The payload is constant, so parse it once instead of once per
                // emitted row while holding the checkpoint lock.
                JsonNode jsonNode = new ObjectMapper().readTree(str);

                while (isRunning && count < 10) {
                    synchronized (ctx.getCheckpointLock()) {
                        // One row field per JSON value; the arity must equal the
                        // number of fields declared in getReturnType().
                        Row row = new Row(jsonNode.size());
                        int fieldIndex = 0;
                        // get(int) only addresses array elements, so iterate the
                        // object's values instead and store them as plain Strings
                        // to match the declared STRING columns.
                        for (JsonNode value : jsonNode) {
                            row.setField(fieldIndex++, value.asText());
                        }
                        ctx.collect(row);
                        count++;
                    }
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        };

        // Pass the row type explicitly so the stream's type information is the
        // same RowTypeInfo that getReturnType() reports to the table planner,
        // rather than whatever is extracted from the anonymous SourceFunction.
        return execEnv.addSource(source, getReturnType());
    }
}