[ https://issues.apache.org/jira/browse/FLINK-7852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske closed FLINK-7852. -------------------------------- Resolution: Invalid Not a bug report or feature request. > An input of GenericTypeInfo<Row> cannot be converted to Table > ------------------------------------------------------------- > > Key: FLINK-7852 > URL: https://issues.apache.org/jira/browse/FLINK-7852 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.3.2 > Reporter: hanningning > > Dear All: > I'm starting to learn about Flink,and I have a question about Table > API&SQL as follows. It will be much appreciated to get your help ASAP. > I tried to convert a stream into a table. The initial data type of this > stream is String, and I converted the String type to Row through the map > method, then converted this Row type DataStream to a Table, but I got a > error, the error details is following: > =================The error msg======================================= > Exception in thread "main" org.apache.flink.table.api.TableException: An > input of GenericTypeInfo<Row> cannot be converted to Table. Please specify > the type of the input with a RowTypeInfo. > at > org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:620) > at > org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398) > at > org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88) > at > com.xiaoju.manhattan.fbi.data.calc.test.TableDynamicRowTest.main(TableDynamicRowTest.java:85) > In addition, My code as below: > ========================My Code================================== > {code} > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment environment = > StreamExecutionEnvironment.getExecutionEnvironment(); > environment.getConfig().disableSysoutLogging(); > StreamTableEnvironment tableEnvironment = > TableEnvironment.getTableEnvironment(environment); > DataStream<String> dataStream = environment.addSource(new > SourceFunction<String>() { > private String str1 = > "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}"; > private long count = 0L; > private volatile boolean isRunning = true; > @Override > public void run(SourceContext<String> ctx) throws Exception { > while (isRunning && count<2){ > synchronized (ctx.getCheckpointLock()){ > ctx.collect(str1); > count++; > } > } > } > @Override > public void cancel() { > isRunning = false; > } > }); > DataStream<JsonNode> dataStreamJson = dataStream.map(new > MapFunction<String, JsonNode>() { > @Override > public JsonNode map(String s) throws Exception { > ObjectMapper objectMapper = new ObjectMapper(); > JsonNode node = objectMapper.readTree(s); > return node; > } > }); > DataStream<Row> dataStreamRow = dataStreamJson.map(new > MapFunction<JsonNode, Row>() { > @Override > public Row map(JsonNode jsonNode) throws Exception { > int pos = 0; > Row row = new Row(jsonNode.size()); > Iterator<String> iterator = jsonNode.fieldNames(); > while (iterator.hasNext()){ > String key = iterator.next(); > row.setField(pos,jsonNode.get(key).asText()); > pos++; > } > return row; > } > }); > dataStreamRow.addSink(new SinkFunction<Row>() { > @Override > public void invoke(Row value) throws Exception { > System.out.println(value.getField(0)); > } > }); > Table myTable = tableEnvironment.fromDataStream(dataStreamRow); > Table result = myTable.select("f0"); > DataStream<String> dataStreamResult = > tableEnvironment.toAppendStream(result,String.class); > dataStreamResult.print(); > environment.execute(); > } > {code} > Waiting for your earlier reply, thanks. > Best Wishes, > Han -- This message was sent by Atlassian JIRA (v6.4.14#64029)