Hi,
I am trying to get a simple streaming job running in pyFlink and understand
the new 1.11 API. I just want to read from and write to kafka topics.
Previously I was using t_env.execute("jobname"), register_table_source()
and register_table_sink() but in 1.11 all 3 were deprecated and replaced by
execute_sql() in the deprecation warning.

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(exec_env,

environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
                                      )

ddl_source = f"""
    CREATE TABLE {INPUT_TABLE} (
        `monitorId` VARCHAR,
        `time_st` TIMESTAMP(3),
        WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
        `data` DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{INPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'properties.group.id' = 'myGroup',
        'format' = 'json'
    )
"""

ddl_sink = f"""
    CREATE TABLE {OUTPUT_TABLE} (
        `monitorId` VARCHAR,
        `max` DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{OUTPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'format' = 'json'
    )
"""
t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)

t_env.execute_sql(f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT monitorId, data
    FROM {INPUT_TABLE}
""")


This gives me the error :
: java.lang.IllegalStateException: No operators defined in streaming
topology. Cannot generate StreamGraph.

I am aware this is lazily evaluated, so is there some equivalent SQL
statement for t_env.execute() that I should be calling?

Thanks,
Manas

Reply via email to