Hi Flavio, There is 2 main entry points in Table API, one is `TableEnvironment`, another is `StreamTableEnvironment`. - `TableEnvironment` is used for pure Table API & SQL programs. - `StreamTableEnvironment` can be used to convert from/to DataStream.
These two interface will be kept in the future, but `BatchTableEnvironment` will be removed in the future. Currently, it's true that blink planner batch mode can't convert into DataStream, because `StreamTableEnvironment#create` only accepts streaming mode. But this is on the roadmap, once DataStream support a set of batch operations, I think we can make `StreamTableEnvironment` support batch mode. If you want to conver into DataStream in blink planner batch mode, there is a tricky workaround that create a `StreamTableEnvironment` via the `StreamTableEnvironmentImpl` constructor. SQL CLI does in this way, you can take as an example [1]. Best, Jark [1]: https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java#L413 On Thu, 13 Feb 2020 at 00:15, Flavio Pompermaier <pomperma...@okkam.it> wrote: > Hi to all, > I was trying to use the new Table API using the new Blink planner but I > figured out that they do not use exactly the same APIs..for example I can't > go back and forth from Tables to Dataset/Datastream anymore (using > tableEnv.toDataset for example). > Is this a temporary behaviour or this functionality will be removed sooner > or later (since from what I understood the current Flink planner will be > replaced by the blink one)? > I like the idea of creating sourcing just using properties but then I > can't transform it into a Dataset/Datastream. > Also the documentation seems to be quite messy about this transition > phase... > > This is my test program: > > public static void main(String[] args) throws Exception { > // set up the batch execution environment > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); > > > String createTableSql = "CREATE TABLE civici (\n" // > + "\t`X` DOUBLE,\n" // > + "\t`Y` DOUBLE\n" // > + ") WITH (\n"// > + "\t'connector.type' = 'filesystem',\n"// > + "\t'connector.path' = 'file:///tmp/test.csv',\n"// > + "\t'format.type' = 'csv',\n" // > + "\t'format.fields.0.name' = 'X',\n"// > + "\t'format.fields.0.data-type' = 'DOUBLE',\n" // > + "\t'format.fields.1.name' = 'Y',\n"// > + "\t'format.fields.1.data-type' = 'DOUBLE',\n"// > + "\t'format.field-delimiter' = ',',\n"// > // + "\t'format.line-delimiter' = '\n',\n"// > + "\t'format.quote-character' = '\"',\n"// > // +"\t'format.allow-comments' = 'true',"// > // +"\t'format.ignore-parse-errors' = 'true',"// > + "\t'format.ignore-first-line' = 'true'\n"// > // +"\t'format.array-element-delimiter' = '|', "// > // +"\t'format.escape-character' = '\\',"// > // +"\t'format.null-literal' = 'n/a'"// > + ")"; > System.out.println(createTableSql); > bbTableEnv.sqlUpdate(createTableSql); > > Table table = bbTableEnv.sqlQuery("SELECT * FROM civici"); > List<Row> rows = TableUtils.collectToList(table); > > for (Row row : rows) { > System.out.println(row); > } > } > > > Best, > Flavio >