Hi Flavio,

`BatchTableSource` can only be used with the old planner.
If you want to use the Blink planner to run a batch job,
your table source should implement `StreamTableSource`
and its `isBounded` method should return true.
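
As a rough illustration of that advice, here is a minimal sketch of a bounded `StreamTableSource` that wraps an existing `InputFormat`. It assumes Flink 1.11 with the table API Java bridge on the classpath. The class name `BoundedRowTableSource` and its constructor parameters are hypothetical and are not taken from this thread or from Flink itself.

```java
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

/** Hypothetical wrapper that exposes an existing InputFormat as a bounded table source. */
class BoundedRowTableSource implements StreamTableSource<Row> {

    private final InputFormat<Row, ?> inputFormat;
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;

    BoundedRowTableSource(
            InputFormat<Row, ?> inputFormat,
            String[] fieldNames,
            TypeInformation<?>[] fieldTypes) {
        this.inputFormat = inputFormat;
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    /** Marks the source as finite so the Blink planner can use it in batch mode. */
    @Override
    public boolean isBounded() {
        return true;
    }

    /** Reuses the legacy InputFormat through the DataStream API instead of DataSet. */
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.createInput(inputFormat, new RowTypeInfo(fieldTypes, fieldNames));
    }

    @Override
    public TableSchema getTableSchema() {
        // Same (deprecated) constructor as in the quoted BatchTableSource snippet.
        return new TableSchema(fieldNames, fieldTypes);
    }

    /** Derives the produced type from the schema so the two cannot drift apart. */
    @Override
    public DataType getProducedDataType() {
        return getTableSchema().toRowDataType();
    }
}
```

An instance of this class would then be registered with a `TableEnvironment` created from batch-mode `EnvironmentSettings`. Flink also appears to ship an abstract `InputFormatTableSource` in `org.apache.flink.table.sources` that covers the same case; it is worth checking whether it is available in your version before writing a wrapper by hand.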

Best,
Godfrey



Flavio Pompermaier <pomperma...@okkam.it> wrote on Fri, Jul 10, 2020 at 10:32 PM:

> Is it correct to do something like this?
>
> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>       @Override
>       public TableSchema getTableSchema() {
>         return new TableSchema(dsFields, ft);
>       }
>       @Override
>       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>         return execEnv.createInput(myInputformat);
>       }
>     };
>
> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> How can you reuse InputFormat to write a TableSource? I think that, at
>> least initially, this could be the simplest way to test the migration. Then
>> I could try to implement the new table source interface.
>>
>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>> Only the old planner supports BatchTableEnvironment (which can convert
>>> to/from DataSet),
>>> while the Blink planner in batch mode only supports TableEnvironment. This is
>>> because the Blink planner
>>> converts batch queries to Transformation (corresponding to
>>> DataStream) instead of DataSet.
>>>
>>> One approach is to migrate them to TableSource (InputFormat
>>> can be reused),
>>> but TableSource will be deprecated later. You can also try the new table source [1].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>
>>> Best,
>>> Godfrey
>>>
>>> Flavio Pompermaier <pomperma...@okkam.it> wrote on Fri, Jul 10, 2020 at 8:54 PM:
>>>
>>>> Thanks but I still can't understand how to migrate my legacy code. The
>>>> main problem is that I can't create a BatchTableEnv anymore so I can't
>>>> call createInput.
>>>>
>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>> TableSource instead?
>>>>
>>>> public static void main(String[] args) throws Exception {
>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>     BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>>>>     MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>     Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>     CsvTableSink outSink =
>>>>         new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>     env.execute();
>>>> }
>>>>
>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>> dwysakow...@apache.org> wrote:
>>>>
>>>>> You should be good with using the TableEnvironment. The
>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>> DataStream. We do not support converting batch Table programs to
>>>>> DataStream yet.
>>>>>
>>>>> The following code should work:
>>>>>
>>>>> EnvironmentSettings settings =
>>>>> EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>
>>>>> TableEnvironment.create(settings);
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>> > Hi to all,
>>>>> > I was trying to update my legacy code to Flink 1.11. Before I was
>>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>>> >
>>>>> > EnvironmentSettings settings =
>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>> >
>>>>> > Unfortunately, the StreamTableEnvironmentImpl code contains:
>>>>> >
>>>>> > if (!settings.isStreamingMode()) {
>>>>> >     throw new TableException(
>>>>> >         "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
>>>>> > }
>>>>> >
>>>>> > What should I do here?
>>>>> >
>>>>> > Thanks in advance,
>>>>> > Flavio
>>>>>
>>>>>
>>>>
