Is "INSERTO" a typo? Try this:

tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
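As a side note on the statement above: it differs from the failing one in two ways, the keyword is spelled INSERT, and the sink name is wrapped in backticks (presumably because OUT is a reserved word in Flink SQL). The sketch below builds such a statement with quoted table names. `InsertStatementSketch`, `quoteIdentifier` and `insertAll` are hypothetical names that are not part of Flink, and doubling an embedded backtick as the escape is an assumption here. It uses only the Java standard library.

```java
import java.util.Objects;

/** Hypothetical helper, not part of Flink: builds an INSERT statement with quoted table names. */
final class InsertStatementSketch {

    private InsertStatementSketch() {}

    /**
     * Wraps an identifier in backticks so that a name such as "out" is not
     * read as a keyword. Assumption: an embedded backtick is escaped by doubling it.
     */
    static String quoteIdentifier(String identifier) {
        Objects.requireNonNull(identifier, "identifier");
        if (identifier.isEmpty()) {
            throw new IllegalArgumentException("identifier must not be empty");
        }
        return "`" + identifier.replace("`", "``") + "`";
    }

    /** Builds: INSERT INTO `sinkTable` SELECT * FROM `sourceTable` */
    static String insertAll(String sinkTable, String sourceTable) {
        return "INSERT INTO " + quoteIdentifier(sinkTable)
                + " SELECT * FROM " + quoteIdentifier(sourceTable);
    }

    public static void main(String[] args) {
        // The resulting string is what would be handed to tableEnv.executeSql(...).
        System.out.println(insertAll("out", "MySourceDataset"));
    }
}
```

Quoting the source table as well is harmless and keeps the helper uniform. The string it returns would then be passed to `tableEnv.executeSql(...)` as in the line above.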
Best,
Jark

On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Now I'm able to run my code but there's something I don't understand: what
> is the difference between the following two?
>
>     //common code
>     final CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>     tableEnv.registerTableSink("out", dsFields, myInputformat.getFieldTypes(), outSink);
>
>    - tableEnv.from("MySourceDataset").executeInsert("out");
>      --> this works
>    - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");
>      --> this does not work
>
> The second option fails with the following exception:
>
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Non-query expression encountered in illegal context
>     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>
> Best,
> Flavio
>
> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com> wrote:
>
>> hi Flavio,
>>
>> `BatchTableSource` can only be used with the old planner.
>> If you want to use the Blink planner to run a batch job,
>> your table source should implement `StreamTableSource`
>> and its `isBounded` method should return true.
>>
>> Best,
>> Godfrey
>>
>> On Fri, Jul 10, 2020 at 10:32 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Is it correct to do something like this?
>>>
>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>       @Override
>>>       public TableSchema getTableSchema() {
>>>         return new TableSchema(dsFields, ft);
>>>       }
>>>       @Override
>>>       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>         return execEnv.createInput(myInputformat);
>>>       }
>>>     };
>>>
>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> How can you reuse InputFormat to write a TableSource? I think that, at
>>>> least initially, this could be the simplest way to test the migration.
>>>> Then I could try to implement the new Table Source interface.
>>>>
>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote:
>>>>
>>>>> hi Flavio,
>>>>> Only the old planner supports BatchTableEnvironment (which can convert
>>>>> to/from DataSet), while the Blink planner in batch mode only supports
>>>>> TableEnvironment, because the Blink planner converts batch queries to
>>>>> Transformation (corresponding to DataStream) instead of DataSet.
>>>>>
>>>>> One approach is to migrate them to TableSource instead
>>>>> (InputFormat can be reused), but TableSource will be deprecated later.
>>>>> You can try the new table source [1].
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>
>>>>> Best,
>>>>> Godfrey
>>>>>
>>>>> On Fri, Jul 10, 2020 at 8:54 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Thanks but I still can't understand how to migrate my legacy code.
>>>>>> The main problem is that I can't create a BatchTableEnv anymore so I
>>>>>> can't call createInput.
>>>>>>
>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>> TableSource instead?
>>>>>>
>>>>>> public static void main(String[] args) throws Exception {
>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>     BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>>>>>>     MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
>>>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>     Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>>>     CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>     env.execute();
>>>>>> }
>>>>>>
>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>>
>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>> DataStream. We do not support converting batch Table programs to
>>>>>>> DataStream yet.
>>>>>>>
>>>>>>> The following code should work:
>>>>>>>
>>>>>>> EnvironmentSettings settings =
>>>>>>>     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>
>>>>>>> TableEnvironment.create(settings);
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Dawid
>>>>>>>
>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>> > Hi to all,
>>>>>>> > I was trying to update my legacy code to Flink 1.11.
>>>>>>> > Before I was
>>>>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>>>>> >
>>>>>>> > EnvironmentSettings settings =
>>>>>>> >     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>> >
>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's :
>>>>>>> >
>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>> >     throw new TableException(
>>>>>>> >         "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
>>>>>>> > }
>>>>>>> >
>>>>>>> > What should I do here?
>>>>>>> >
>>>>>>> > Thanks in advance,
>>>>>>> > Flavio