Hi Flavio,

`BatchTableSource` can only be used with the old planner. If you want to use the Blink planner to run a batch job, your table source should implement `StreamTableSource`, and its `isBounded` method should return true.
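A minimal sketch of such a source, assuming Flink 1.11 with the legacy `TableSource` interfaces on the classpath. The class name `BoundedInputFormatTableSource` and its constructor parameters are hypothetical, chosen only for illustration; it has not been run against the job in this thread.

```java
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

// Hypothetical wrapper: exposes an existing InputFormat as a bounded table source.
public class BoundedInputFormatTableSource implements StreamTableSource<Row> {

    private final InputFormat<Row, ?> inputFormat;   // the reused legacy InputFormat
    private final String[] fieldNames;               // e.g. dsFields from the thread
    private final TypeInformation<?>[] fieldTypes;   // e.g. ft from the thread

    public BoundedInputFormatTableSource(
            InputFormat<Row, ?> inputFormat,
            String[] fieldNames,
            TypeInformation<?>[] fieldTypes) {
        this.inputFormat = inputFormat;
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    // Declares the source as bounded; the interface default is false (unbounded).
    @Override
    public boolean isBounded() {
        return true;
    }

    // Reads the InputFormat through the DataStream API instead of DataSet.
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.createInput(inputFormat, new RowTypeInfo(fieldTypes, fieldNames));
    }

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(fieldNames, fieldTypes);
    }

    // Derives the produced row type from the schema so the two stay consistent.
    @Override
    public DataType getProducedDataType() {
        return getTableSchema().toRowDataType();
    }
}
```

The only batch-specific part is `isBounded()` returning true; the rest just forwards the existing `InputFormat` and schema.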

Best,
Godfrey

Flavio Pompermaier <pomperma...@okkam.it> wrote on Fri, Jul 10, 2020 at 10:32 PM:

> Is it correct to do something like this?
>
> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>   @Override
>   public TableSchema getTableSchema() {
>     return new TableSchema(dsFields, ft);
>   }
>   @Override
>   public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>     return execEnv.createInput(myInputformat);
>   }
> };
>
> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> How can you reuse InputFormat to write a TableSource? I think that, at
>> least initially, this could be the simplest way to test the migration.
>> Then I could try to implement the new table source interface.
>>
>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> Only the old planner supports BatchTableEnvironment (which can convert
>>> to/from DataSet), while the Blink planner in batch mode only supports
>>> TableEnvironment. This is because the Blink planner converts batch
>>> queries to Transformation (corresponding to DataStream) instead of
>>> DataSet.
>>>
>>> One approach is to migrate them to TableSource instead (InputFormat
>>> can be reused), but TableSource will be deprecated later. You can try
>>> the new table source [1].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>
>>> Best,
>>> Godfrey
>>>
>>> Flavio Pompermaier <pomperma...@okkam.it> wrote on Fri, Jul 10, 2020 at 8:54 PM:
>>>
>>>> Thanks, but I still can't understand how to migrate my legacy code. The
>>>> main problem is that I can't create a BatchTableEnv anymore, so I can't
>>>> call createInput.
>>>>
>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>> TableSource instead?
>>>>
>>>> public static void main(String[] args) throws Exception {
>>>>   ExecutionEnvironment env =
>>>>       ExecutionEnvironment.getExecutionEnvironment();
>>>>   BatchTableEnvironment btEnv =
>>>>       TableEnvironment.getTableEnvironment(env);
>>>>   MyInputFormat myInputformat = new MyInputFormat(dsFields,
>>>>       ft).finish();
>>>>   DataSet<Row> rows = env.createInput(myInputformat);
>>>>   Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>   CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t",
>>>>       1, WriteMode.OVERWRITE);
>>>>   btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>   btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>   env.execute();
>>>> }
>>>>
>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>> dwysakow...@apache.org> wrote:
>>>>
>>>>> You should be good with using the TableEnvironment. The
>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>> DataStream. We do not support converting batch Table programs to
>>>>> DataStream yet.
>>>>>
>>>>> The following code should work:
>>>>>
>>>>> EnvironmentSettings settings =
>>>>>     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>
>>>>> TableEnvironment.create(settings);
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>> > Hi to all,
>>>>> > I was trying to update my legacy code to Flink 1.11. Before I was
>>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>>> >
>>>>> > EnvironmentSettings settings =
>>>>> >     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>> >
>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's:
>>>>> >
>>>>> > if (!settings.isStreamingMode()) {
>>>>> >   throw new TableException(
>>>>> >       "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
>>>>> > }
>>>>> >
>>>>> > What should I do here?
>>>>> >
>>>>> > Thanks in advance,
>>>>> > Flavio