Now I'm able to run my code, but there's something I don't understand: what
is the difference between the following two approaches?

     // common code
     final CsvTableSink outSink =
         new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
     tableEnv.registerTableSink("out", dsFields, myInputformat.getFieldTypes(), outSink);

   - tableEnv.from("MySourceDataset").executeInsert("out");
     --> this works
   - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");
     --> this does not work

The second option fails with the following exception:

Exception in thread "main" org.apache.flink.table.api.SqlParserException:
SQL parse failed. Non-query expression encountered in illegal context
  at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
  at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)

Best,
Flavio

On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com> wrote:

> hi Flavio,
>
> `BatchTableSource` can only be used with the old planner.
> If you want to use the Blink planner to run a batch job,
> your table source should implement `StreamTableSource`
> and its `isBounded` method should return true.
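>
> A rough sketch of that, reusing myInputformat, dsFields and ft from
> your snippet below (the getProducedDataType override is my assumption,
> adapt it to your format):
>
>     StreamTableSource<Row> myTableSource = new StreamTableSource<Row>() {
>       @Override
>       public boolean isBounded() {
>         return true; // lets the Blink planner treat this source as batch
>       }
>       @Override
>       public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
>         // reuse the existing InputFormat to produce the bounded stream
>         return execEnv.createInput(myInputformat, new RowTypeInfo(ft, dsFields));
>       }
>       @Override
>       public TableSchema getTableSchema() {
>         return new TableSchema(dsFields, ft);
>       }
>       @Override
>       public DataType getProducedDataType() {
>         // explicit produced type, so the planner does not fall back to
>         // the deprecated getReturnType()
>         return getTableSchema().toRowDataType();
>       }
>     };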
>
> Best,
> Godfrey
>
>
>
> On Fri, Jul 10, 2020 at 10:32 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Is it correct to do something like this?
>>
>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>       @Override
>>       public TableSchema getTableSchema() {
>>         return new TableSchema(dsFields, ft);
>>       }
>>       @Override
>>       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>         return execEnv.createInput(myInputformat);
>>       }
>>     };
>>
>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> How can you reuse an InputFormat to write a TableSource? I think that,
>>> at least initially, this could be the simplest way to test the
>>> migration... then I could try to implement the new Table Source interface.
>>>
>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote:
>>>
>>>> Hi Flavio,
>>>> Only the old planner supports BatchTableEnvironment (which can convert
>>>> to/from DataSet), while the Blink planner in batch mode only supports
>>>> TableEnvironment. That is because the Blink planner translates batch
>>>> queries to Transformations (corresponding to DataStream) instead of
>>>> DataSet.
>>>>
>>>> One approach is to migrate them to TableSource instead (the
>>>> InputFormat can be reused), but TableSource will be deprecated later.
>>>> You can also try the new table source interface [1].
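>>>>
>>>> For the TableSource route, a rough sketch using the ready-made
>>>> InputFormatTableSource wrapper (its isBounded() already returns true;
>>>> myInputformat, dsFields and ft are the names from your earlier code):
>>>>
>>>>     TableSource<Row> myTableSource = new InputFormatTableSource<Row>() {
>>>>       @Override
>>>>       public InputFormat<Row, ?> getInputFormat() {
>>>>         return myInputformat; // the legacy InputFormat, reused as-is
>>>>       }
>>>>       @Override
>>>>       public TableSchema getTableSchema() {
>>>>         return new TableSchema(dsFields, ft);
>>>>       }
>>>>       @Override
>>>>       public DataType getProducedDataType() {
>>>>         return getTableSchema().toRowDataType();
>>>>       }
>>>>     };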
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
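>>>>
>>>> With the new interface from [1], a minimal bounded source would look
>>>> roughly like this (only a sketch; it assumes you already have an
>>>> InputFormat that produces Flink's internal RowData type):
>>>>
>>>>     public class MyDynamicSource implements ScanTableSource {
>>>>
>>>>       // hypothetical: an InputFormat producing RowData records
>>>>       private final InputFormat<RowData, ?> inputFormat;
>>>>
>>>>       public MyDynamicSource(InputFormat<RowData, ?> inputFormat) {
>>>>         this.inputFormat = inputFormat;
>>>>       }
>>>>
>>>>       @Override
>>>>       public ChangelogMode getChangelogMode() {
>>>>         return ChangelogMode.insertOnly(); // plain bounded scan, no updates
>>>>       }
>>>>
>>>>       @Override
>>>>       public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
>>>>         // wraps the reused InputFormat; InputFormatProvider is bounded
>>>>         return InputFormatProvider.of(inputFormat);
>>>>       }
>>>>
>>>>       @Override
>>>>       public DynamicTableSource copy() {
>>>>         return new MyDynamicSource(inputFormat);
>>>>       }
>>>>
>>>>       @Override
>>>>       public String asSummaryString() {
>>>>         return "MyDynamicSource";
>>>>       }
>>>>     }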
>>>>
>>>> Best,
>>>> Godfrey
>>>>
>>>> On Fri, Jul 10, 2020 at 8:54 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Thanks, but I still can't understand how to migrate my legacy code.
>>>>> The main problem is that I can't create a BatchTableEnv anymore, so I
>>>>> can't call createInput.
>>>>>
>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>> TableSource instead?
>>>>>
>>>>> public static void main(String[] args) throws Exception {
>>>>>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>   BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>>>>>   MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
>>>>>   DataSet<Row> rows = env.createInput(myInputformat);
>>>>>   Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>>   CsvTableSink outSink =
>>>>>       new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>   btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>   btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>   env.execute();
>>>>> }
>>>>>
>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>>> dwysakow...@apache.org> wrote:
>>>>>
>>>>>> You should be good with using the TableEnvironment. The
>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>> DataStream. We do not support converting batch Table programs to
>>>>>> DataStream yet.
>>>>>>
>>>>>> The following code should work:
>>>>>>
>>>>>> EnvironmentSettings settings =
>>>>>>     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>> > Hi to all,
>>>>>> > I was trying to update my legacy code to Flink 1.11. Before I was
>>>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>>>> >
>>>>>> > EnvironmentSettings settings =
>>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>> >
>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's:
>>>>>> >
>>>>>> > if (!settings.isStreamingMode()) {
>>>>>> >     throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
>>>>>> > }
>>>>>> >
>>>>>> > What should I do here?
>>>>>> >
>>>>>> > Thanks in advance,
>>>>>> > Flavio
>>>>>>
>>>>>>
>>>>>
