Hi, I am trying to get a simple streaming job running in pyFlink and understand the new 1.11 API. I just want to read from and write to kafka topics. Previously I was using t_env.execute("jobname"), register_table_source() and register_table_sink() but in 1.11 all 3 were deprecated and replaced by execute_sql() in the deprecation warning.
exec_env = StreamExecutionEnvironment.get_execution_environment() exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) t_env = StreamTableEnvironment.create(exec_env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build() ) ddl_source = f""" CREATE TABLE {INPUT_TABLE} ( `monitorId` VARCHAR, `time_st` TIMESTAMP(3), WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND, `data` DOUBLE ) WITH ( 'connector' = 'kafka', 'topic' = '{INPUT_TOPIC}', 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', 'properties.group.id' = 'myGroup', 'format' = 'json' ) """ ddl_sink = f""" CREATE TABLE {OUTPUT_TABLE} ( `monitorId` VARCHAR, `max` DOUBLE ) WITH ( 'connector' = 'kafka', 'topic' = '{OUTPUT_TOPIC}', 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', 'format' = 'json' ) """ t_env.execute_sql(ddl_source) t_env.execute_sql(ddl_sink) t_env.execute_sql(f""" INSERT INTO {OUTPUT_TABLE} SELECT monitorId, data FROM {INPUT_TABLE} """) This gives me the error : : java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. I am aware this is lazily evaluated, so is there some equivalent SQL statement for t_env.execute() that I should be calling? Thanks, Manas