Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Flavio Pompermaier
Thanks, that was definitely helpful!

On Mon, Jul 13, 2020 at 4:39 PM Jark Wu wrote:
> You can set string-based configuration on
> `tEnv.getConfig.getConfiguration.setString(..)` to replace them.
> Maybe you can try pipeline.default-kryo-serializers [1].
>
> Best,
> Jark
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Jark Wu
You can set string-based configuration on `tEnv.getConfig.getConfiguration.setString(..)` to replace them. Maybe you can try pipeline.default-kryo-serializers [1].

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers

On Mon, 13 …
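For Flavio's JodaTime case below, a minimal sketch of what that string-based option could look like, assuming `tEnv` is the TableEnvironment and assuming the serializer class comes from the de.javakaffee kryo-serializers library (the library is not named in the thread; the value format is documented under [1]):

    // Sketch: string-based replacement for env.registerTypeWithKryoSerializer.
    // The value is a semicolon-separated list of class/serializer pairs; the
    // serializer's fully qualified name here is an assumption.
    tEnv.getConfig().getConfiguration().setString(
        "pipeline.default-kryo-serializers",
        "class:org.joda.time.DateTime,"
            + "serializer:de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer");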

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Flavio Pompermaier
And what about env.registerTypeWithKryoSerializer? Now, to create the table environment, I don't use the ExecutionEnvironment anymore. How can I register those serializers? For example, I used to run:

    env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);

Best,
Flavio

On …

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Jark Wu
Hi Flavio, tableEnv.registerTableSource is deprecated in favor of DDL and the new connector interface (i.e. FLIP-95 [1]). You may need to implement a `ScanTableSource` that returns an `InputFormatProvider` from `ScanTableSource#getScanRuntimeProvider`.

Best,
Jark

[1]: https://ci.apach…
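As a rough illustration of Jark's suggestion (not code from the thread), a bounded FLIP-95 source wrapping an existing InputFormat could look roughly like this; the class name and the assumption that the InputFormat already produces RowData are hypothetical:

    import org.apache.flink.api.common.io.InputFormat;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.connector.source.InputFormatProvider;
    import org.apache.flink.table.connector.source.ScanTableSource;
    import org.apache.flink.table.data.RowData;

    // Sketch of a FLIP-95 scan source backed by a legacy InputFormat.
    public class MyInputFormatTableSource implements ScanTableSource {

        // Hypothetical: an InputFormat that already produces RowData records.
        private final InputFormat<RowData, ?> inputFormat;

        public MyInputFormatTableSource(InputFormat<RowData, ?> inputFormat) {
            this.inputFormat = inputFormat;
        }

        @Override
        public ChangelogMode getChangelogMode() {
            // A plain batch source only emits insertions.
            return ChangelogMode.insertOnly();
        }

        @Override
        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
            // The hook Jark refers to: hand the InputFormat to the planner.
            return InputFormatProvider.of(inputFormat);
        }

        @Override
        public DynamicTableSource copy() {
            return new MyInputFormatTableSource(inputFormat);
        }

        @Override
        public String asSummaryString() {
            return "MyInputFormatTableSource";
        }
    }

To make such a source visible to SQL you would additionally register a DynamicTableSourceFactory for it and create the table via DDL, which is the part the deprecated registerTableSource used to do for you.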

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Flavio Pompermaier
Ok, just one last thing: to use my TableSource I use the deprecated API registerTableSource:

    tableEnv.registerTableSource("MySourceDataset", tableSource);

The javadoc says to use executeSql, but this requires some extra steps (that are not mentioned in the documentation). Do I have to create a Tab…

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Jark Wu
I agree with you @Flavio Pompermaier, the exception message definitely should be improved. We created a similar issue a while ago (https://issues.apache.org/jira/browse/CALCITE-3038), but the fix might be complicated.

Best,
Jark

On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier wrote: …

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Flavio Pompermaier
You're right Jark, sorry, I didn't see the typo. The backticks are also mandatory. Maybe the exception message could be more meaningful and specify the token that caused the error instead of a general "SQL parse failed. Non-query expression encountered in illegal context". Thanks a lot for the support…

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Jark Wu
A typo of "INSERTO"? Try this:

    tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");

Best,
Jark

On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier wrote:
> Now I'm able to run my code but there's something I don't understand: what
> is the difference between the following two?

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Flavio Pompermaier
Now I'm able to run my code, but there's something I don't understand: what is the difference between the following two?

    // common code
    final CsvTableSink outSink =
        new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
    tableEnv.registerTableSink("out", dsFields, myInpu…
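For context, the snippet is cut off by the archive; a hedged reconstruction of the deprecated-API variant it starts (the field-types variable name is a guess, and the final line mirrors Jark's executeSql answer above):

    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.table.sinks.CsvTableSink;

    // Hedged reconstruction, not the original code: registerTableSink takes
    // the sink name, field names, field types and the TableSink instance.
    final CsvTableSink outSink =
        new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
    tableEnv.registerTableSink("out", dsFields, fieldTypes, outSink);
    // With the sink registered, the insert can be submitted as SQL:
    tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");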

Re: Table API jobs migration to Flink 1.11

2020-07-12 Thread godfrey he
Hi Flavio, `BatchTableSource` can only be used with the old planner. If you want to use the Blink planner to run a batch job, your table source should implement `StreamTableSource` and its `isBounded` method should return true.

Best,
Godfrey

On Fri, Jul 10, 2020 at 10:32 PM, Flavio Pompermaier wrote:
> Is it correct to do someth…
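A minimal sketch of Godfrey's suggestion, reusing an existing InputFormat; the class and field names are illustrative, not from the thread:

    import org.apache.flink.api.common.io.InputFormat;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.sources.StreamTableSource;
    import org.apache.flink.types.Row;

    // Sketch: a bounded StreamTableSource that the Blink planner can run in batch mode.
    public class BoundedInputFormatSource implements StreamTableSource<Row> {

        private final InputFormat<Row, ?> inputFormat; // hypothetical legacy format
        private final TableSchema schema;
        private final TypeInformation<Row> returnType;

        public BoundedInputFormatSource(
                InputFormat<Row, ?> inputFormat,
                TableSchema schema,
                TypeInformation<Row> returnType) {
            this.inputFormat = inputFormat;
            this.schema = schema;
            this.returnType = returnType;
        }

        @Override
        public boolean isBounded() {
            return true; // tells the Blink planner this source is finite (batch)
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.createInput(inputFormat, returnType);
        }

        @Override
        public TableSchema getTableSchema() {
            return schema;
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return returnType;
        }
    }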

Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread Flavio Pompermaier
Is it correct to do something like this?

    TableSource myTableSource = new BatchTableSource() {
        @Override
        public TableSchema getTableSchema() {
            return new TableSchema(dsFields, ft);
        }

        @Override
        public DataSet getDataSet(ExecutionEnvironment execEnv) {
            re…

Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread Flavio Pompermaier
How can you reuse an InputFormat to write a TableSource? I think that, at least initially, this could be the simplest way to test the migration; then I could try to implement the new TableSource interface.

On Fri, Jul 10, 2020 at 3:38 PM godfrey he wrote:
> hi Flavio,
> Only old planner supports Batc…

Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread godfrey he
Hi Flavio, only the old planner supports BatchTableEnvironment (which can convert to/from DataSet), while the Blink planner in batch mode only supports TableEnvironment. That is because the Blink planner converts batch queries to Transformations (corresponding to DataStream) instead of DataSet. One approach is you…

Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread Flavio Pompermaier
Thanks, but I still can't understand how to migrate my legacy code. The main problem is that I can't create a BatchTableEnv anymore, so I can't call createInput. Is there a way to reuse InputFormats? Should I migrate them to TableSource instead?

    public static void main(String[] args) throws Excepti…

Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread Dawid Wysakowicz
You should be good with using the TableEnvironment. The StreamTableEnvironment is needed only if you want to convert to DataStream. We do not support converting batch Table programs to DataStream yet. The following code should work:

    EnvironmentSettings settings = EnvironmentSettings.newInstance().i…
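The snippet is cut off by the archive; a hedged completion of what it most likely continues into (in Flink 1.11 the Blink planner is the default, so naming it explicitly is optional):

    // Likely continuation of the truncated snippet: a batch TableEnvironment
    // on the Blink planner, with no StreamExecutionEnvironment involved.
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);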