[ https://issues.apache.org/jira/browse/FLINK-7852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske updated FLINK-7852: --------------------------------- Description: Dear All: I'm starting to learn about Flink, and I have a question about Table API & SQL as follows. I would much appreciate your help as soon as possible. I tried to convert a stream into a table. The initial data type of this stream is String, and I converted the String type to Row through the map method, then converted this Row type DataStream to a Table, but I got an error. The error details are as follows: =================The error msg======================================= Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo. at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:620) at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398) at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88) at com.xiaoju.manhattan.fbi.data.calc.test.TableDynamicRowTest.main(TableDynamicRowTest.java:85) In addition, my code is below: ========================My Code================================== {code} public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.getConfig().disableSysoutLogging(); StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment); DataStream<String> dataStream = environment.addSource(new SourceFunction<String>() { private String str1 = "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}"; private long count = 0L; private volatile boolean isRunning = true; @Override public void run(SourceContext<String> ctx) throws Exception { while (isRunning && count<2){ synchronized (ctx.getCheckpointLock()){ ctx.collect(str1); count++; } } } @Override public void cancel() {
isRunning = false; } }); DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() { @Override public JsonNode map(String s) throws Exception { ObjectMapper objectMapper = new ObjectMapper(); JsonNode node = objectMapper.readTree(s); return node; } }); DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() { @Override public Row map(JsonNode jsonNode) throws Exception { int pos = 0; Row row = new Row(jsonNode.size()); Iterator<String> iterator = jsonNode.fieldNames(); while (iterator.hasNext()){ String key = iterator.next(); row.setField(pos,jsonNode.get(key).asText()); pos++; } return row; } }); dataStreamRow.addSink(new SinkFunction<Row>() { @Override public void invoke(Row value) throws Exception { System.out.println(value.getField(0)); } }); Table myTable = tableEnvironment.fromDataStream(dataStreamRow); Table result = myTable.select("f0"); DataStream<String> dataStreamResult = tableEnvironment.toAppendStream(result,String.class); dataStreamResult.print(); environment.execute(); } {code} Waiting for your early reply, thanks. Best Wishes, Han was: Dear All: I'm starting to learn about Flink,and I have a question about Table API&SQL as follows. It will be much appreciated to get your help ASAP. I tried to convert a stream into a table. The initial data type of this stream is String, and I converted the String type to Row through the map method, then converted this Row type DataStream to a Table, but I got a error, the error details is following: =================The error msg======================================= Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:620) at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398) at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88) at com.xiaoju.manhattan.fbi.data.calc.test.TableDynamicRowTest.main(TableDynamicRowTest.java:85) In addition, My code as below: ========================My Code================================== public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.getConfig().disableSysoutLogging(); StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment); DataStream<String> dataStream = environment.addSource(new SourceFunction<String>() { private String str1 = "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}"; private long count = 0L; private volatile boolean isRunning = true; @Override public void run(SourceContext<String> ctx) throws Exception { while (isRunning && count<2){ synchronized (ctx.getCheckpointLock()){ ctx.collect(str1); count++; } } } @Override public void cancel() { isRunning = false; } }); DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() { @Override public JsonNode map(String s) throws Exception { ObjectMapper objectMapper = new ObjectMapper(); JsonNode node = objectMapper.readTree(s); return node; } }); DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() { @Override public Row map(JsonNode jsonNode) throws Exception { int pos = 0; Row row = new Row(jsonNode.size()); Iterator<String> iterator = jsonNode.fieldNames(); while (iterator.hasNext()){ String key = iterator.next(); row.setField(pos,jsonNode.get(key).asText()); pos++; } return row; } }); dataStreamRow.addSink(new SinkFunction<Row>() { @Override public void invoke(Row value) throws 
Exception { System.out.println(value.getField(0)); } }); Table myTable = tableEnvironment.fromDataStream(dataStreamRow); Table result = myTable.select("f0"); DataStream<String> dataStreamResult = tableEnvironment.toAppendStream(result,String.class); dataStreamResult.print(); environment.execute(); } Waiting for your earlier reply, thanks. Best Wishes, Han > An input of GenericTypeInfo<Row> cannot be converted to Table > ------------------------------------------------------------- > > Key: FLINK-7852 > URL: https://issues.apache.org/jira/browse/FLINK-7852 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.3.2 > Reporter: hanningning > > Dear All: > I'm starting to learn about Flink, and I have a question about Table > API & SQL as follows. I would much appreciate your help as soon as possible. > I tried to convert a stream into a table. The initial data type of this > stream is String, and I converted the String type to Row through the map > method, then converted this Row type DataStream to a Table, but I got an > error. The error details are as follows: > =================The error msg======================================= > Exception in thread "main" org.apache.flink.table.api.TableException: An > input of GenericTypeInfo<Row> cannot be converted to Table. Please specify > the type of the input with a RowTypeInfo.
> at > org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:620) > at > org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398) > at > org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88) > at > com.xiaoju.manhattan.fbi.data.calc.test.TableDynamicRowTest.main(TableDynamicRowTest.java:85) > In addition, My code as below: > ========================My Code================================== > {code} > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment environment = > StreamExecutionEnvironment.getExecutionEnvironment(); > environment.getConfig().disableSysoutLogging(); > StreamTableEnvironment tableEnvironment = > TableEnvironment.getTableEnvironment(environment); > DataStream<String> dataStream = environment.addSource(new > SourceFunction<String>() { > private String str1 = > "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}"; > private long count = 0L; > private volatile boolean isRunning = true; > @Override > public void run(SourceContext<String> ctx) throws Exception { > while (isRunning && count<2){ > synchronized (ctx.getCheckpointLock()){ > ctx.collect(str1); > count++; > } > } > } > @Override > public void cancel() { > isRunning = false; > } > }); > DataStream<JsonNode> dataStreamJson = dataStream.map(new > MapFunction<String, JsonNode>() { > @Override > public JsonNode map(String s) throws Exception { > ObjectMapper objectMapper = new ObjectMapper(); > JsonNode node = objectMapper.readTree(s); > return node; > } > }); > DataStream<Row> dataStreamRow = dataStreamJson.map(new > MapFunction<JsonNode, Row>() { > @Override > public Row map(JsonNode jsonNode) throws Exception { > int pos = 0; > Row row = new Row(jsonNode.size()); > Iterator<String> iterator = jsonNode.fieldNames(); > while (iterator.hasNext()){ > String key = iterator.next(); > row.setField(pos,jsonNode.get(key).asText()); > 
pos++; > } > return row; > } > }); > dataStreamRow.addSink(new SinkFunction<Row>() { > @Override > public void invoke(Row value) throws Exception { > System.out.println(value.getField(0)); > } > }); > Table myTable = tableEnvironment.fromDataStream(dataStreamRow); > Table result = myTable.select("f0"); > DataStream<String> dataStreamResult = > tableEnvironment.toAppendStream(result,String.class); > dataStreamResult.print(); > environment.execute(); > } > {code} > Waiting for your early reply, thanks. > Best Wishes, > Han -- This message was sent by Atlassian JIRA (v6.4.14#64029)