This question is cross-posted on StackOverflow: https://stackoverflow.com/questions/66634813/why-does-flink-filesystem-sink-splits-into-multiple-files
I want to use Flink to read from an input file, do some aggregation, and write the result to an output file. The job runs in batch mode. See `wordcount.py` below:

```python
from pyflink.table import EnvironmentSettings, BatchTableEnvironment

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

my_source_ddl = """
create table mySource (
    word VARCHAR
) with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
    word VARCHAR,
    `count` BIGINT
) with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = '/tmp/output'
)
"""

transform_dml = """
INSERT INTO mySink
SELECT word, COUNT(1) FROM mySource
GROUP BY word
"""

table_env.execute_sql(my_source_ddl)
table_env.execute_sql(my_sink_ddl)
table_env.execute_sql(transform_dml).wait()

# before run: echo -e "flink\npyflink\nflink" > /tmp/input
# after run: cat /tmp/output
```

Before running `python wordcount.py`, I run `echo -e "flink\npyflink\nflink" > /tmp/input` to make sure data exists in `/tmp/input`. However, after the run there are two files in `/tmp/output`:

```
> ls /tmp/output
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
pyflink,1
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
flink,2
```

while I expect a single file `/tmp/output` with content:

```
pyflink,1
flink,2
```

Actually, I arrived at the Python program above by adjusting the version below, which does produce a single `/tmp/output` file.
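One difference I notice between the two programs is that the working version calls `exec_env.set_parallelism(1)` explicitly. As a sketch of what I tried to mirror that in the `EnvironmentSettings`-based setup, here is the equivalent table-config call; the option name `table.exec.resource.default-parallelism` is my assumption, and I have not confirmed it changes the number of part files:

```python
from pyflink.table import EnvironmentSettings, BatchTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

# Assumed equivalent of exec_env.set_parallelism(1) in the old setup:
# set the default parallelism for the blink planner to 1, so the sink
# hopefully writes from a single task.
table_env.get_config().get_configuration().set_string(
    "table.exec.resource.default-parallelism", "1"
)
```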
```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

Running this version generates a single file `/tmp/output`. Note that it uses a tab delimiter rather than a comma:

```
> cat /tmp/output
flink	2
pyflink	1
```

Any idea why? Thanks!

Best,
Yik San Chan