Now I'm able to run my code but there's something I don't understand: what is the difference between the following two?
//common code final CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE); tableEnv.registerTableSink("out", dsFields, myInputformat.getFieldTypes(), outSink); - tableEnv.from("MySourceDataset").executeInsert("out"); --> this works - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset"); --> this does not work The second option fails with the following exception: Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Non-query expression encountered in illegal context at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678) Best, Flavio On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com> wrote: > hi Flavio, > > `BatchTableSource` can only be used for old planner. > if you want to use Blink planner to run batch job, > your table source should implement `StreamTableSource` > and `isBounded` method return true. > > Best, > Godfrey > > > > Flavio Pompermaier <pomperma...@okkam.it> 于2020年7月10日周五 下午10:32写道: > >> Is it correct to do something like this? >> >> TableSource<Row> myTableSource = new BatchTableSource<Row>() { >> @Override >> public TableSchema getTableSchema() { >> return new TableSchema(dsFields, ft); >> } >> @Override >> public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) { >> return execEnv.createInput(myInputformat); >> } >> }; >> >> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <pomperma...@okkam.it> >> wrote: >> >>> How can you reuse InputFormat to write a TableSource? I think that at >>> least initially this could be the simplest way to test the migration..then >>> I could try yo implement the new Table Source interface >>> >>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote: >>> >>>> hi Flavio, >>>> Only old planner supports BatchTableEnvironment (which can convert >>>> to/from DataSet), >>>> while Blink planner in batch mode only support TableEnvironment. >>>> Because Blink planner >>>> convert the batch queries to Transformation (corresponding to >>>> DataStream), instead of DataSet. >>>> >>>> one approach is you can migrate them to TableSource instead >>>> (InputFormat can be reused), >>>> but TableSource will be deprecated later. you can try new table >>>> source[1] >>>> >>>> [1] >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html >>>> >>>> Best, >>>> Godfrey >>>> >>>> Flavio Pompermaier <pomperma...@okkam.it> 于2020年7月10日周五 下午8:54写道: >>>> >>>>> Thanks but I still can't understand how to migrate my legacy code. The >>>>> main problem is that I can't create a BatchTableEnv anymore so I can't >>>>> call createInput. >>>>> >>>>> Is there a way to reuse InputFormats? Should I migrate them to >>>>> TableSource instead? >>>>> >>>>> public static void main(String[] args) throws Exception { >>>>> ExecutionEnvironment env = >>>>> ExecutionEnvironment.getExecutionEnvironment(); >>>>> BatchTableEnvironment btEnv = >>>>> TableEnvironment.getTableEnvironment(env); >>>>> MyInputFormat myInputformat = new MyInputFormat(dsFields, >>>>> ft).finish(); >>>>> DataSet<Row> rows = env.createInput(myInputformat); >>>>> Table table = btEnv.fromDataSet(rows, String.join(",", dsFields)); >>>>> CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", >>>>> "\t", 1, WriteMode.OVERWRITE); >>>>> btEnv.registerTableSink("out", dsFields, ft, outSink); >>>>> btEnv.insertInto(table, "out", btEnv.queryConfig()); >>>>> env.execute(); >>>>> } >>>>> >>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz < >>>>> dwysakow...@apache.org> wrote: >>>>> >>>>>> You should be good with using the TableEnvironment. The >>>>>> StreamTableEnvironment is needed only if you want to convert to >>>>>> DataStream. We do not support converting batch Table programs to >>>>>> DataStream yet. >>>>>> >>>>>> A following code should work: >>>>>> >>>>>> EnvironmentSettings settings = >>>>>> EnvironmentSettings.newInstance().inBatchMode().build(); >>>>>> >>>>>> TableEnvironment.create(settings); >>>>>> >>>>>> Best, >>>>>> >>>>>> Dawid >>>>>> >>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote: >>>>>> > Hi to all, >>>>>> > I was trying to update my legacy code to Flink 1.11. Before I was >>>>>> > using a BatchTableEnv and now I've tried to use the following: >>>>>> > >>>>>> > EnvironmentSettings settings = >>>>>> > EnvironmentSettings.newInstance().inBatchMode().build(); >>>>>> > >>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's : >>>>>> > >>>>>> > if (!settings.isStreamingMode()) { >>>>>> > throw new TableException( >>>>>> > "StreamTableEnvironment can not run in batch mode for now, please >>>>>> use >>>>>> > TableEnvironment."); >>>>>> > } >>>>>> > >>>>>> > What should I do here? >>>>>> > >>>>>> > Thanks in advance, >>>>>> > Flavio >>>>>> >>>>>> >>>>>