Hi Yik San,

You need to set the rolling policy for the filesystem connector. See the Rolling Policy section [1] for more details.

There actually is output: if you run `ls -la /tmp/output/`, you will see several hidden files named `.part-xxx`. These are in-progress part files that have not been finalized yet. For your job, you need to set `execution.checkpointing.interval` in the configuration and `sink.rolling-policy.rollover-interval` in the properties of the filesystem connector. The job will look like the following:

```
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
# Enable checkpointing every 10 seconds.
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")

# Source: generates one random row per second.
table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
    )
""")

# Sink: CSV files under /tmp/output; part files are rolled over after 10 seconds.
table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output',
        'sink.rolling-policy.rollover-interval' = '10s'
    )
""")

table_env.execute_sql("""
    INSERT INTO print
    SELECT id, data
    FROM datagen
""").wait()
```

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#rolling-policy

> On April 21, 2021, at 7:44 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
> The question is cross-posted on Stack Overflow:
> https://stackoverflow.com/questions/67195207/flink-not-able-to-sink-a-stream-into-csv
>
> I am trying to sink a stream into filesystem in csv format using PyFlink,
> however it does not work.
>
> ```python
> # stream_to_csv.py
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> table_env.execute_sql("""
>     CREATE TABLE datagen (
>         id INT,
>         data STRING
>     ) WITH (
>         'connector' = 'datagen',
>         'rows-per-second' = '1'
>     )
> """)
>
> table_env.execute_sql("""
>     CREATE TABLE print (
>         id INT,
>         data STRING
>     ) WITH (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '/tmp/output'
>     )
> """)
>
> table_env.execute_sql("""
>     INSERT INTO print
>     SELECT id, data
>     FROM datagen
> """).wait()
> ```
>
> To run the script:
>
> ```
> $ python stream_to_csv.py
> ```
>
> I expect records go to /tmp/output folder, however that doesn't happen.
>
> ```
> $ ~ ls /tmp/output
> (nothing shown here)
> ```
>
> Anything I miss?
>
> Best,
> Yik San
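
As a quick way to check what the sink has written so far, the hidden in-progress files can be separated from the finalized ones in the output directory. The sketch below is only an illustration: the helper name `classify_part_files` is made up, and it assumes, following the reply above, that in-progress files are the hidden ones whose names start with `.part-`, while every other regular file is treated as finalized.

```python
import os


def classify_part_files(output_dir):
    """Split the files in output_dir into (in_progress, finalized) name lists.

    Assumption: in-progress part files are hidden and start with ".part-";
    every other regular file is treated as finalized.
    """
    in_progress, finalized = [], []
    for name in sorted(os.listdir(output_dir)):
        if not os.path.isfile(os.path.join(output_dir, name)):
            continue  # skip subdirectories
        if name.startswith(".part-"):
            in_progress.append(name)
        else:
            finalized.append(name)
    return in_progress, finalized


if __name__ == "__main__":
    path = "/tmp/output"  # sink path used in the job above
    if os.path.isdir(path):
        pending, done = classify_part_files(path)
        print("in progress:", pending)
        print("finalized:", done)
```

If the first list keeps growing while the second stays empty, the part files are being written but never finalized, which would match the behaviour described in the original question.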