How can you reuse InputFormat to write a TableSource? I think that at least initially this could be the simplest way to test the migration..then I could try yo implement the new Table Source interface
On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote: > hi Flavio, > Only old planner supports BatchTableEnvironment (which can convert to/from > DataSet), > while Blink planner in batch mode only support TableEnvironment. Because > Blink planner > convert the batch queries to Transformation (corresponding to DataStream), > instead of DataSet. > > one approach is you can migrate them to TableSource instead (InputFormat > can be reused), > but TableSource will be deprecated later. you can try new table source[1] > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html > > Best, > Godfrey > > Flavio Pompermaier <pomperma...@okkam.it> 于2020年7月10日周五 下午8:54写道: > >> Thanks but I still can't understand how to migrate my legacy code. The >> main problem is that I can't create a BatchTableEnv anymore so I can't >> call createInput. >> >> Is there a way to reuse InputFormats? Should I migrate them to >> TableSource instead? >> >> public static void main(String[] args) throws Exception { >> ExecutionEnvironment env = >> ExecutionEnvironment.getExecutionEnvironment(); >> BatchTableEnvironment btEnv = >> TableEnvironment.getTableEnvironment(env); >> MyInputFormat myInputformat = new MyInputFormat(dsFields, >> ft).finish(); >> DataSet<Row> rows = env.createInput(myInputformat); >> Table table = btEnv.fromDataSet(rows, String.join(",", dsFields)); >> CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", >> 1, WriteMode.OVERWRITE); >> btEnv.registerTableSink("out", dsFields, ft, outSink); >> btEnv.insertInto(table, "out", btEnv.queryConfig()); >> env.execute(); >> } >> >> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <dwysakow...@apache.org> >> wrote: >> >>> You should be good with using the TableEnvironment. The >>> StreamTableEnvironment is needed only if you want to convert to >>> DataStream. We do not support converting batch Table programs to >>> DataStream yet. >>> >>> A following code should work: >>> >>> EnvironmentSettings settings = >>> EnvironmentSettings.newInstance().inBatchMode().build(); >>> >>> TableEnvironment.create(settings); >>> >>> Best, >>> >>> Dawid >>> >>> On 10/07/2020 11:48, Flavio Pompermaier wrote: >>> > Hi to all, >>> > I was trying to update my legacy code to Flink 1.11. Before I was >>> > using a BatchTableEnv and now I've tried to use the following: >>> > >>> > EnvironmentSettings settings = >>> > EnvironmentSettings.newInstance().inBatchMode().build(); >>> > >>> > Unfortunately in the StreamTableEnvironmentImpl code there's : >>> > >>> > if (!settings.isStreamingMode()) { >>> > throw new TableException( >>> > "StreamTableEnvironment can not run in batch mode for now, please use >>> > TableEnvironment."); >>> > } >>> > >>> > What should I do here? >>> > >>> > Thanks in advance, >>> > Flavio >>> >>> >>