And what about the env.registerTypeWithKryoSerializer?
Now to create the table environment I don't use the ExecutionEnvironment
anymore..how can I register those serializers?
For example I used to run
env.registerTypeWithKryoSerializer(DateTime.class,
JodaDateTimeSerializer.class);

Best,
Flavio

On Mon, Jul 13, 2020 at 3:16 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Flavio,
>
> tableEnv.registerTableSource is deprecated in order to migrate to use DDL
> and the new connector interface (i.e. FLIP-95 [1]).
> You may need to implement a `ScanTableSource` that uses
> `InputFormatProvider` as the `ScanTableSource#getScanRuntimeProvider`.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
>
> On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Ok..just one last thing: to use my TableSource I use the deprecated
>> API registerTableSource:
>>
>> tableEnv.registerTableSource("MySourceDataset", tableSource);
>>
>> The javadoc says to use executeSql but this requires some extra steps
>> (that are not mentioned in the documentation).
>> Do I have to create a TableFactory, right? How do I register it? Is there
>> an example somewhere?
>>
>> On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> I agree with you @Flavio Pompermaier <pomperma...@okkam.it> , the
>>> exception message definitely should be improved.
>>> We created a similar issue a long time before
>>> https://issues.apache.org/jira/browse/CALCITE-3038, but the fixing
>>> might be complicated.
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> You're right Jark..sorry I didn't see the typo. The backticks are also
>>>> mandatory.
>>>> Maybe the exception message could be more meaningful and specify the
>>>> token that caused the error instead of a general "SQL parse failed.
>>>> Non-query expression encountered in illegal context".
>>>>
>>>> Thanks a lot for the support,
>>>> Flavio
>>>>
>>>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> A typo of "INSERTO"? Try this?
>>>>>
>>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <pomperma...@okkam.it>
>>>>> wrote:
>>>>>
>>>>>> Now I'm able to run my code but there's something I don't understand:
>>>>>> what is the difference between the following two?
>>>>>>
>>>>>>      //common code
>>>>>>      final CsvTableSink outSink = new
>>>>>> CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>      tableEnv.registerTableSink("out", dsFields,
>>>>>> myInputformat.getFieldTypes(), outSink);
>>>>>>
>>>>>>    - tableEnv.from("MySourceDataset").executeInsert("out");
>>>>>>                                    --> this works
>>>>>>    - tableEnv.executeSql("INSERTO INTO out SELECT * FROM
>>>>>>    MySourceDataset");   --> this does not work
>>>>>>
>>>>>> The second option fails with the following exception:
>>>>>>
>>>>>> Exception in thread "main"
>>>>>> org.apache.flink.table.api.SqlParserException: SQL parse failed. 
>>>>>> Non-query
>>>>>> expression encountered in illegal context
>>>>>> at
>>>>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>>> at
>>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>>> at
>>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> hi Flavio,
>>>>>>>
>>>>>>> `BatchTableSource` can only be used for old planner.
>>>>>>> if you want to use Blink planner to run batch job,
>>>>>>> your table source should implement `StreamTableSource`
>>>>>>> and `isBounded` method return true.
>>>>>>>
>>>>>>> Best,
>>>>>>> Godfrey
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Flavio Pompermaier <pomperma...@okkam.it> 于2020年7月10日周五 下午10:32写道:
>>>>>>>
>>>>>>>> Is it correct to do something like this?
>>>>>>>>
>>>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>>>       @Override
>>>>>>>>       public TableSchema getTableSchema() {
>>>>>>>>         return new TableSchema(dsFields, ft);
>>>>>>>>       }
>>>>>>>>       @Override
>>>>>>>>       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>>>>>>         return execEnv.createInput(myInputformat);
>>>>>>>>       }
>>>>>>>>     };
>>>>>>>>
>>>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <
>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> How can you reuse InputFormat to write a TableSource? I think that
>>>>>>>>> at least initially this could be the simplest way to test the
>>>>>>>>> migration..then I could try yo implement the new Table Source 
>>>>>>>>> interface
>>>>>>>>>
>>>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> hi Flavio,
>>>>>>>>>> Only old planner supports BatchTableEnvironment (which can
>>>>>>>>>> convert to/from DataSet),
>>>>>>>>>> while Blink planner in batch mode only support TableEnvironment.
>>>>>>>>>> Because Blink planner
>>>>>>>>>> convert the batch queries to Transformation (corresponding to
>>>>>>>>>> DataStream), instead of DataSet.
>>>>>>>>>>
>>>>>>>>>> one approach is you can migrate them to TableSource instead
>>>>>>>>>> (InputFormat can be reused),
>>>>>>>>>> but TableSource will be deprecated later. you can try new table
>>>>>>>>>> source[1]
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Godfrey
>>>>>>>>>>
>>>>>>>>>> Flavio Pompermaier <pomperma...@okkam.it> 于2020年7月10日周五 下午8:54写道:
>>>>>>>>>>
>>>>>>>>>>> Thanks but I still can't understand how to migrate my legacy
>>>>>>>>>>> code. The main problem is that I can't create a BatchTableEnv 
>>>>>>>>>>> anymore so I
>>>>>>>>>>> can't call createInput.
>>>>>>>>>>>
>>>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>>>> TableSource instead?
>>>>>>>>>>>
>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>     ExecutionEnvironment env =
>>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>     BatchTableEnvironment btEnv =
>>>>>>>>>>> TableEnvironment.getTableEnvironment(env);
>>>>>>>>>>>     MyInputFormat myInputformat =  new MyInputFormat(dsFields,
>>>>>>>>>>> ft).finish();
>>>>>>>>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>>>     Table table = btEnv.fromDataSet(rows, String.join(",",
>>>>>>>>>>> dsFields));
>>>>>>>>>>>     CsvTableSink outSink = new
>>>>>>>>>>> CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>>>     env.execute();
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>>>>>>>>> dwysakow...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>>>>>>> DataStream. We do not support converting batch Table programs to
>>>>>>>>>>>> DataStream yet.
>>>>>>>>>>>>
>>>>>>>>>>>> A following code should work:
>>>>>>>>>>>>
>>>>>>>>>>>> EnvironmentSettings settings =
>>>>>>>>>>>> EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>
>>>>>>>>>>>> TableEnvironment.create(settings);
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>>
>>>>>>>>>>>> Dawid
>>>>>>>>>>>>
>>>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>>>> > Hi to all,
>>>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11. Before I
>>>>>>>>>>>> was
>>>>>>>>>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>>>>>>>>>> >
>>>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>> >
>>>>>>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's :
>>>>>>>>>>>> >
>>>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>>>> >     throw new TableException(
>>>>>>>>>>>> > "StreamTableEnvironment can not run in batch mode for now,
>>>>>>>>>>>> please use
>>>>>>>>>>>> > TableEnvironment.");
>>>>>>>>>>>> > }
>>>>>>>>>>>> >
>>>>>>>>>>>> > What should I do here?
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks in advance,
>>>>>>>>>>>> > Flavio
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>
>>

Reply via email to