Is it correct to do something like this?

TableSource<Row> myTableSource = new BatchTableSource<Row>() {
      @Override
      public TableSchema getTableSchema() {
        return new TableSchema(dsFields, ft);
      }
      @Override
      public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
        return execEnv.createInput(myInputformat);
      }
    };

On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> How can you reuse InputFormat to write a TableSource? I think that at
> least initially this could be the simplest way to test the migration..then
> I could try yo implement the new Table Source interface
>
> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote:
>
>> hi Flavio,
>> Only old planner supports BatchTableEnvironment (which can convert
>> to/from DataSet),
>> while Blink planner in batch mode only support TableEnvironment. Because
>> Blink planner
>> convert the batch queries to Transformation (corresponding to
>> DataStream), instead of DataSet.
>>
>> one approach is you can migrate them to TableSource instead (InputFormat
>> can be reused),
>> but TableSource will be deprecated later. you can try new table source[1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>
>> Best,
>> Godfrey
>>
>> Flavio Pompermaier <pomperma...@okkam.it> 于2020年7月10日周五 下午8:54写道:
>>
>>> Thanks but I still can't understand how to migrate my legacy code. The
>>> main problem is that I can't create a BatchTableEnv anymore so I can't
>>> call createInput.
>>>
>>> Is there a way to reuse InputFormats? Should I migrate them to
>>> TableSource instead?
>>>
>>> public static void main(String[] args) throws Exception {
>>>     ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>>     BatchTableEnvironment btEnv =
>>> TableEnvironment.getTableEnvironment(env);
>>>     MyInputFormat myInputformat =  new MyInputFormat(dsFields,
>>> ft).finish();
>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>     Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>     CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t",
>>> 1, WriteMode.OVERWRITE);
>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>     env.execute();
>>>   }
>>>
>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>> dwysakow...@apache.org> wrote:
>>>
>>>> You should be good with using the TableEnvironment. The
>>>> StreamTableEnvironment is needed only if you want to convert to
>>>> DataStream. We do not support converting batch Table programs to
>>>> DataStream yet.
>>>>
>>>> A following code should work:
>>>>
>>>> EnvironmentSettings settings =
>>>> EnvironmentSettings.newInstance().inBatchMode().build();
>>>>
>>>> TableEnvironment.create(settings);
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>> > Hi to all,
>>>> > I was trying to update my legacy code to Flink 1.11. Before I was
>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>> >
>>>> > EnvironmentSettings settings =
>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>> >
>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's :
>>>> >
>>>> > if (!settings.isStreamingMode()) {
>>>> >     throw new TableException(
>>>> > "StreamTableEnvironment can not run in batch mode for now, please use
>>>> > TableEnvironment.");
>>>> > }
>>>> >
>>>> > What should I do here?
>>>> >
>>>> > Thanks in advance,
>>>> > Flavio
>>>>
>>>>
>>>

Reply via email to