Hi to all, I'm trying to read and print out the content of my parquet directory with Flink 1.11 (using the bridge API). However Flink complains that there is no topology to execute..what am I doing wrong? The exception is:
java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at it.okkam.datalinks.batch.flink.ProfileTest.main(ProfileTest.java:52) This is the code: ------------------------ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().inStreamingMode().build()); tableEnv.executeSql(-----see below [1] ----); Table inputTable = tableEnv.sqlQuery("SELECT * FROM source"); tableEnv.toAppendStream(inputTable, new RowTypeInfo(inputTable.getSchema().getFieldTypes());).print() final JobExecutionResult jobRes = tableEnv.execute("test-job"); [1] ---------- CREATE TABLE `source` ( `col1` BIGINT, `col2` STRING ) WITH ( 'connector' = 'filesystem', 'format' = 'parquet', 'update-mode' = 'append', 'path' = '/tmp/parquet-test', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='success-file', 'format.parquet.compression'='snappy', 'format.parquet.enable.dictionary'='true', 'format.parquet.block.size'='0', 'sink.shuffle-by-partition.enable' = 'true' ) ----------- Thanks in advance, Flavio