Ok..just one last thing: to use my TableSource I use the deprecated API registerTableSource:
tableEnv.registerTableSource("MySourceDataset", tableSource);

The javadoc says to use executeSql, but this requires some extra steps (that are not mentioned in the documentation).
I have to create a TableFactory, right? How do I register it? Is there an example somewhere?

On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <imj...@gmail.com> wrote:

> I agree with you @Flavio Pompermaier <pomperma...@okkam.it> , the
> exception message definitely should be improved.
> We created a similar issue a long time ago,
> https://issues.apache.org/jira/browse/CALCITE-3038, but the fix might
> be complicated.
>
> Best,
> Jark
>
> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> You're right Jark..sorry I didn't see the typo. The backticks are also
>> mandatory.
>> Maybe the exception message could be more meaningful and specify the
>> token that caused the error instead of a general "SQL parse failed.
>> Non-query expression encountered in illegal context".
>>
>> Thanks a lot for the support,
>> Flavio
>>
>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> A typo of "INSERTO"? Try this?
>>>
>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Now I'm able to run my code but there's something I don't understand:
>>>> what is the difference between the following two?
>>>>
>>>> //common code
>>>> final CsvTableSink outSink =
>>>>     new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>> tableEnv.registerTableSink("out", dsFields, myInputformat.getFieldTypes(), outSink);
>>>>
>>>> - tableEnv.from("MySourceDataset").executeInsert("out");
>>>>   --> this works
>>>> - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");
>>>>   --> this does not work
>>>>
>>>> The second option fails with the following exception:
>>>>
>>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Non-query expression encountered in illegal context
>>>>     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com> wrote:
>>>>
>>>>> hi Flavio,
>>>>>
>>>>> `BatchTableSource` can only be used with the old planner.
>>>>> If you want to use the Blink planner to run a batch job,
>>>>> your table source should implement `StreamTableSource`,
>>>>> and its `isBounded` method should return true.
>>>>>
>>>>> Best,
>>>>> Godfrey
>>>>>
>>>>> On Fri, Jul 10, 2020 at 10:32 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Is it correct to do something like this?
>>>>>>
>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>   @Override
>>>>>>   public TableSchema getTableSchema() {
>>>>>>     return new TableSchema(dsFields, ft);
>>>>>>   }
>>>>>>   @Override
>>>>>>   public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>>>>     return execEnv.createInput(myInputformat);
>>>>>>   }
>>>>>> };
>>>>>>
>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> How can you reuse InputFormat to write a TableSource? I think that
>>>>>>> at least initially this could be the simplest way to test the
>>>>>>> migration..then I could try to implement the new Table Source interface
>>>>>>>
>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote:
>>>>>>>
>>>>>>>> hi Flavio,
>>>>>>>> Only the old planner supports BatchTableEnvironment (which can convert
>>>>>>>> to/from DataSet), while the Blink planner in batch mode only supports
>>>>>>>> TableEnvironment, because the Blink planner converts batch queries to
>>>>>>>> Transformation (corresponding to DataStream) instead of DataSet.
>>>>>>>>
>>>>>>>> One approach is to migrate them to TableSource instead
>>>>>>>> (InputFormat can be reused), but TableSource will be deprecated later.
>>>>>>>> You can try the new table source [1].
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Godfrey
>>>>>>>>
>>>>>>>> On Fri, Jul 10, 2020 at 8:54 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Thanks but I still can't understand how to migrate my legacy code.
>>>>>>>>> The main problem is that I can't create a BatchTableEnv anymore so I
>>>>>>>>> can't call createInput.
>>>>>>>>>
>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>> TableSource instead?
>>>>>>>>>
>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>   BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>>>>>>>>>   MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
>>>>>>>>>   DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>   Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>>>>>>   CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>   btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>   btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>   env.execute();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>>>>> DataStream. We do not support converting batch Table programs to
>>>>>>>>>> DataStream yet.
>>>>>>>>>>
>>>>>>>>>> The following code should work:
>>>>>>>>>>
>>>>>>>>>> EnvironmentSettings settings =
>>>>>>>>>>     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>
>>>>>>>>>> TableEnvironment.create(settings);
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Dawid
>>>>>>>>>>
>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>> > Hi to all,
>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11.
>>>>>>>>>> > Before I was using a BatchTableEnv and now I've tried to use the following:
>>>>>>>>>> >
>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>> >     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>> >
>>>>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's:
>>>>>>>>>> >
>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>> >     throw new TableException(
>>>>>>>>>> >         "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
>>>>>>>>>> > }
>>>>>>>>>> >
>>>>>>>>>> > What should I do here?
>>>>>>>>>> >
>>>>>>>>>> > Thanks in advance,
>>>>>>>>>> > Flavio
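
For reference, the backtick quoting discussed in this thread (the sink name `out` had to be written as a quoted identifier) can be sketched with a small helper that builds the INSERT statement. This is only an illustration: `SqlIdentifiers`, `quote` and `insertAllFrom` are hypothetical names, not part of Flink, and doubling an embedded backtick is assumed to be the escape rule of the SQL dialect in use.

```java
// Hypothetical helper, not part of Flink: builds SQL text with backtick-quoted identifiers.
final class SqlIdentifiers {

    private SqlIdentifiers() {
    }

    /**
     * Wraps the identifier in backticks.
     * Assumption: an embedded backtick is escaped by doubling it.
     */
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    /** Builds an INSERT INTO ... SELECT * FROM ... statement with both table names quoted. */
    static String insertAllFrom(String sinkTable, String sourceTable) {
        return "INSERT INTO " + quote(sinkTable) + " SELECT * FROM " + quote(sourceTable);
    }

    public static void main(String[] args) {
        // Prints the statement text for the sink and source names used in the thread.
        System.out.println(insertAllFrom("out", "MySourceDataset"));
    }
}
```

The resulting string could then be passed to tableEnv.executeSql(...), as in Jark's reply. Quoting every identifier, rather than only the reserved ones, avoids having to keep a list of keywords.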