And what about env.registerTypeWithKryoSerializer? Now that I no longer use the ExecutionEnvironment to create the table environment, how can I register those serializers? For example, I used to run:

env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
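An aside on the question above (a sketch, not an answer from the thread): with the unified TableEnvironment there is no ExecutionEnvironment to call registerTypeWithKryoSerializer on, but Flink 1.11 exposes the same registration through the `pipeline.default-kryo-serializers` configuration option, whose value is a semicolon-separated list of `class:<fqcn>,serializer:<fqcn>` pairs. The helper below only builds such a pair; the fully qualified serializer name is a guess and must be replaced with the Joda serializer class you actually use.

```java
public class KryoSerializerOption {

    /**
     * Builds one entry for Flink's `pipeline.default-kryo-serializers`
     * option in the documented "class:<fqcn>,serializer:<fqcn>" form.
     */
    static String entry(String targetClass, String serializerClass) {
        return "class:" + targetClass + ",serializer:" + serializerClass;
    }

    public static void main(String[] args) {
        // The serializer's package is assumed (the common de.javakaffee
        // kryo-serializers artifact); adjust to your actual class.
        String value = entry(
            "org.joda.time.DateTime",
            "de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer");
        System.out.println(value);
        // The value would then be applied to the table environment, roughly:
        // tableEnv.getConfig().getConfiguration()
        //         .setString("pipeline.default-kryo-serializers", value);
    }
}
```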
Best,
Flavio

On Mon, Jul 13, 2020 at 3:16 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Flavio,
>
> tableEnv.registerTableSource is deprecated in order to migrate to use DDL
> and the new connector interface (i.e. FLIP-95 [1]).
> You may need to implement a `ScanTableSource` that uses
> `InputFormatProvider` as the `ScanTableSource#getScanRuntimeProvider`.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
>
> On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Ok..just one last thing: to use my TableSource I use the deprecated
>> API registerTableSource:
>>
>> tableEnv.registerTableSource("MySourceDataset", tableSource);
>>
>> The javadoc says to use executeSql but this requires some extra steps
>> (that are not mentioned in the documentation).
>> Do I have to create a TableFactory? How do I register it? Is there
>> an example somewhere?
>>
>> On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> I agree with you @Flavio Pompermaier <pomperma...@okkam.it>, the
>>> exception message definitely should be improved.
>>> We created a similar issue a long time ago
>>> (https://issues.apache.org/jira/browse/CALCITE-3038), but the fix
>>> might be complicated.
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> You're right Jark..sorry I didn't see the typo. The backticks are also
>>>> mandatory.
>>>> Maybe the exception message could be more meaningful and specify the
>>>> token that caused the error instead of a general "SQL parse failed.
>>>> Non-query expression encountered in illegal context".
>>>>
>>>> Thanks a lot for the support,
>>>> Flavio
>>>>
>>>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> A typo of "INSERTO"? Try this?
>>>>>
>>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <pomperma...@okkam.it>
>>>>> wrote:
>>>>>
>>>>>> Now I'm able to run my code but there's something I don't understand:
>>>>>> what is the difference between the following two?
>>>>>>
>>>>>> // common code
>>>>>> final CsvTableSink outSink = new
>>>>>>   CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>> tableEnv.registerTableSink("out", dsFields,
>>>>>>   myInputformat.getFieldTypes(), outSink);
>>>>>>
>>>>>> - tableEnv.from("MySourceDataset").executeInsert("out");
>>>>>>   --> this works
>>>>>> - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");
>>>>>>   --> this does not work
>>>>>>
>>>>>> The second option fails with the following exception:
>>>>>>
>>>>>> Exception in thread "main"
>>>>>> org.apache.flink.table.api.SqlParserException: SQL parse failed.
>>>>>> Non-query expression encountered in illegal context
>>>>>>   at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>>>   at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>>>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> `BatchTableSource` can only be used with the old planner.
>>>>>>> If you want to use the Blink planner to run a batch job,
>>>>>>> your table source should implement `StreamTableSource`
>>>>>>> and its `isBounded` method should return true.
>>>>>>>
>>>>>>> Best,
>>>>>>> Godfrey
>>>>>>>
>>>>>>> On Fri, Jul 10, 2020 at 10:32 PM, Flavio Pompermaier
>>>>>>> <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Is it correct to do something like this?
>>>>>>>>
>>>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>>>   @Override
>>>>>>>>   public TableSchema getTableSchema() {
>>>>>>>>     return new TableSchema(dsFields, ft);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>>>>>>     return execEnv.createInput(myInputformat);
>>>>>>>>   }
>>>>>>>> };
>>>>>>>>
>>>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier
>>>>>>>> <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> How can you reuse InputFormat to write a TableSource? I think that
>>>>>>>>> at least initially this could be the simplest way to test the
>>>>>>>>> migration..then I could try to implement the new TableSource
>>>>>>>>> interface.
>>>>>>>>>
>>>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Flavio,
>>>>>>>>>> Only the old planner supports BatchTableEnvironment (which can
>>>>>>>>>> convert to/from DataSet), while the Blink planner in batch mode
>>>>>>>>>> only supports TableEnvironment, because the Blink planner
>>>>>>>>>> converts batch queries to Transformation (corresponding to
>>>>>>>>>> DataStream) instead of DataSet.
>>>>>>>>>>
>>>>>>>>>> One approach is to migrate them to TableSource instead
>>>>>>>>>> (InputFormat can be reused), but TableSource will be deprecated
>>>>>>>>>> later, so you can try the new table source [1].
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Godfrey
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 10, 2020 at 8:54 PM, Flavio Pompermaier
>>>>>>>>>> <pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks but I still can't understand how to migrate my legacy
>>>>>>>>>>> code. The main problem is that I can't create a BatchTableEnv
>>>>>>>>>>> anymore so I can't call createInput.
>>>>>>>>>>>
>>>>>>>>>>> Is there a way to reuse InputFormats?
Should I migrate them to TableSource instead?
>>>>>>>>>>>
>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>   ExecutionEnvironment env =
>>>>>>>>>>>     ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>   BatchTableEnvironment btEnv =
>>>>>>>>>>>     TableEnvironment.getTableEnvironment(env);
>>>>>>>>>>>   MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
>>>>>>>>>>>   DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>>>   Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>>>>>>>>   CsvTableSink outSink = new
>>>>>>>>>>>     CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>>>   btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>>>   btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>>>   env.execute();
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz
>>>>>>>>>>> <dwysakow...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>>>>>>> DataStream. We do not support converting batch Table programs to
>>>>>>>>>>>> DataStream yet.
>>>>>>>>>>>>
>>>>>>>>>>>> The following code should work:
>>>>>>>>>>>>
>>>>>>>>>>>> EnvironmentSettings settings =
>>>>>>>>>>>>   EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>
>>>>>>>>>>>> TableEnvironment.create(settings);
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>>
>>>>>>>>>>>> Dawid
>>>>>>>>>>>>
>>>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>>>> > Hi to all,
>>>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11.
>>>>>>>>>>>> > Before I was using a BatchTableEnv and now I've tried to use
>>>>>>>>>>>> > the following:
>>>>>>>>>>>> >
>>>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>>>> >   EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>> >
>>>>>>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's:
>>>>>>>>>>>> >
>>>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>>>> >   throw new TableException(
>>>>>>>>>>>> >     "StreamTableEnvironment can not run in batch mode for now,
>>>>>>>>>>>> >     please use TableEnvironment.");
>>>>>>>>>>>> > }
>>>>>>>>>>>> >
>>>>>>>>>>>> > What should I do here?
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks in advance,
>>>>>>>>>>>> > Flavio
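Pulling the answers in this thread together, here is a rough sketch of what the migrated program could look like on the unified TableEnvironment. Everything below is an assumption layered on the thread, not code from it: Flink 1.11 with the Blink planner in batch mode, a hypothetical FLIP-95-style connector id `my-source`, and elided column lists (`...`) that would have to be filled in with the real schema.

```java
// Sketch only; requires Flink 1.11 on the classpath and a real schema.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inBatchMode()   // batch execution on the Blink planner
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// FLIP-95 style: sources and sinks are registered via DDL instead of
// registerTableSource / registerTableSink. 'my-source' is hypothetical.
tableEnv.executeSql(
    "CREATE TABLE MySourceDataset (...) WITH ('connector' = 'my-source')");
tableEnv.executeSql(
    "CREATE TABLE `out` (...) WITH ("
    + "'connector' = 'filesystem',"
    + "'path' = 'file:/tmp/test.tsv',"
    + "'format' = 'csv')");

// Note the backticks around `out` (a reserved word) and the INSERT INTO
// spelling, as discussed earlier in the thread:
tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
```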