Is it correct to do something like this?

TableSource<Row> myTableSource = new BatchTableSource<Row>() {
  @Override
  public TableSchema getTableSchema() {
    return new TableSchema(dsFields, ft);
  }

  @Override
  public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
    return execEnv.createInput(myInputformat);
  }
};

On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

> How can you reuse an InputFormat to write a TableSource? I think that, at
> least initially, this could be the simplest way to test the migration. Then
> I could try to implement the new Table Source interface.
>
> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote:
>
>> Hi Flavio,
>> Only the old planner supports BatchTableEnvironment (which can convert
>> to/from DataSet), while the Blink planner in batch mode only supports
>> TableEnvironment, because the Blink planner converts batch queries to
>> Transformation (corresponding to DataStream) instead of DataSet.
>>
>> One approach is to migrate them to TableSource instead (InputFormat
>> can be reused), but TableSource will be deprecated later. You can try
>> the new table source [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>
>> Best,
>> Godfrey
>>
>> Flavio Pompermaier <pomperma...@okkam.it> wrote on Fri, Jul 10, 2020 at 8:54 PM:
>>
>>> Thanks, but I still can't understand how to migrate my legacy code. The
>>> main problem is that I can't create a BatchTableEnv anymore, so I can't
>>> call createInput.
>>>
>>> Is there a way to reuse InputFormats? Should I migrate them to
>>> TableSource instead?
>>>
>>> public static void main(String[] args) throws Exception {
>>>   ExecutionEnvironment env =
>>>       ExecutionEnvironment.getExecutionEnvironment();
>>>   BatchTableEnvironment btEnv =
>>>       TableEnvironment.getTableEnvironment(env);
>>>   MyInputFormat myInputformat =
>>>       new MyInputFormat(dsFields, ft).finish();
>>>   DataSet<Row> rows = env.createInput(myInputformat);
>>>   Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>   CsvTableSink outSink =
>>>       new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>   btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>   btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>   env.execute();
>>> }
>>>
>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz
>>> <dwysakow...@apache.org> wrote:
>>>
>>>> You should be good with using the TableEnvironment. The
>>>> StreamTableEnvironment is needed only if you want to convert to
>>>> DataStream. We do not support converting batch Table programs to
>>>> DataStream yet.
>>>>
>>>> The following code should work:
>>>>
>>>> EnvironmentSettings settings =
>>>>     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>
>>>> TableEnvironment.create(settings);
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>> > Hi to all,
>>>> > I was trying to update my legacy code to Flink 1.11. Before, I was
>>>> > using a BatchTableEnv, and now I've tried to use the following:
>>>> >
>>>> > EnvironmentSettings settings =
>>>> >     EnvironmentSettings.newInstance().inBatchMode().build();
>>>> >
>>>> > Unfortunately, in the StreamTableEnvironmentImpl code there's:
>>>> >
>>>> > if (!settings.isStreamingMode()) {
>>>> >   throw new TableException(
>>>> >       "StreamTableEnvironment can not run in batch mode for now, please use
>>>> > TableEnvironment.");
>>>> > }
>>>> >
>>>> > What should I do here?
>>>> >
>>>> > Thanks in advance,
>>>> > Flavio