The first time, you ran it without specifying the parallelism, so you got
the default parallelism, which is greater than 1 (probably 4 or 8,
depending on how many cores your computer has).
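
As a minimal sketch of pinning the parallelism in the SQL version of the job:
the helper name `use_single_parallelism` is hypothetical, and the sketch
assumes the `parallelism.default` option is honored when set through the
table environment's configuration (worth checking against the docs for your
Flink version).

```python
def use_single_parallelism(table_env, parallelism=1):
    """Set the default parallelism on an existing PyFlink TableEnvironment.

    Hypothetical helper: `table_env` is expected to expose
    get_config().get_configuration().set_string(key, value),
    as PyFlink's TableEnvironment does.
    """
    # Assumption: 'parallelism.default' controls how many parallel
    # instances each operator (including the sink) gets.
    table_env.get_config().get_configuration().set_string(
        "parallelism.default", str(parallelism)
    )
    return table_env
```

In `wordcount.py` this would be called as `use_single_parallelism(table_env)`
after creating `table_env` and before `table_env.execute_sql(transform_dml)`.
With a single sink instance there should be only one part file, though
/tmp/output is likely still a directory rather than a plain file.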

Flink is designed to be scalable, and to achieve that, parallel instances
of an operator, such as a sink, are decoupled from one another. Imagine,
for example, a large cluster with hundreds or thousands of nodes. For this
to work well, each instance needs to write to its own file, which is why
/tmp/output ended up as a directory of part files.
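
To illustrate the idea, here is a toy simulation in plain Python. It is not
Flink's implementation: the function name `run_parallel_sink`, the byte-sum
routing, and the file naming are all assumptions made for the sketch. It
only shows why decoupled sink instances end up with one file each.

```python
import os
import tempfile
from collections import Counter


def run_parallel_sink(words, parallelism, out_dir):
    """Count words, route each key to one sink instance, write one file per instance."""
    counts = Counter(words)
    rows_per_task = {}
    for word, n in counts.items():
        # Deterministic stand-in for key hashing (NOT Flink's real hash function).
        task = sum(word.encode("utf-8")) % parallelism
        rows_per_task.setdefault(task, []).append(f"{word},{n}")
    # Each instance writes only its own file; instances never coordinate.
    for task, rows in rows_per_task.items():
        path = os.path.join(out_dir, f"part-task-{task}-file-0")
        with open(path, "w") as f:
            f.write("\n".join(rows) + "\n")
    return sorted(os.listdir(out_dir))


if __name__ == "__main__":
    sample = ["flink", "pyflink", "flink"]
    for p in (8, 1):
        with tempfile.TemporaryDirectory() as d:
            print(p, run_parallel_sink(sample, p, d))
```

With the three-word input from the question, this toy routing produces two
part files at parallelism 8 and a single part file at parallelism 1.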

The commas were changed to tabs because you specified
.field_delimiter('\t').


Regards,

David

On Mon, Mar 15, 2021 at 9:49 AM Yik San Chan <evan.chanyik...@gmail.com>
wrote:

> The question is cross-posted on StackOverflow:
> https://stackoverflow.com/questions/66634813/why-does-flink-filesystem-sink-splits-into-multiple-files
>
> I want to use Flink to read from an input file, do some aggregation, and
> write the result to an output file. The job is in batch mode. See
> `wordcount.py` below:
>
> ```python
> from pyflink.table import EnvironmentSettings, BatchTableEnvironment
>
> # https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
>
> env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
> table_env = BatchTableEnvironment.create(environment_settings=env_settings)
>
> my_source_ddl = """
>     create table mySource (
>         word VARCHAR
>     ) with (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '/tmp/input'
>     )
> """
>
> my_sink_ddl = """
>     create table mySink (
>         word VARCHAR,
>         `count` BIGINT
>     ) with (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '/tmp/output'
>     )
> """
>
> transform_dml = """
> INSERT INTO mySink
> SELECT word, COUNT(1) FROM mySource GROUP BY word
> """
>
> table_env.execute_sql(my_source_ddl)
> table_env.execute_sql(my_sink_ddl)
> table_env.execute_sql(transform_dml).wait()
>
> # before run: echo -e  "flink\npyflink\nflink" > /tmp/input
> # after run: cat /tmp/output
> ```
>
> Before running `python wordcount.py`, I run
> `echo -e "flink\npyflink\nflink" > /tmp/input` to make sure data exists in
> /tmp/input. However, after the run, there are two files in /tmp/output:
>
> ```
> > ls /tmp/output
> part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
> part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> > cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
> pyflink,1
> > cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> flink,2
> ```
>
> However, I expected a single file /tmp/output with this content:
>
> ```
> pyflink,1
> flink,2
> ```
>
> Actually, I got the above Python program by adapting the one below, which
> produces a single file /tmp/output.
>
> ```python
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
> from pyflink.table.expressions import lit
>
> # https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
>
> exec_env = ExecutionEnvironment.get_execution_environment()
> exec_env.set_parallelism(1)
> t_config = TableConfig()
> t_env = BatchTableEnvironment.create(exec_env, t_config)
>
> t_env.connect(FileSystem().path('/tmp/input')) \
>     .with_format(OldCsv()
>                  .field('word', DataTypes.STRING())) \
>     .with_schema(Schema()
>                  .field('word', DataTypes.STRING())) \
>     .create_temporary_table('mySource')
>
> t_env.connect(FileSystem().path('/tmp/output')) \
>     .with_format(OldCsv()
>                  .field_delimiter('\t')
>                  .field('word', DataTypes.STRING())
>                  .field('count', DataTypes.BIGINT())) \
>     .with_schema(Schema()
>                  .field('word', DataTypes.STRING())
>                  .field('count', DataTypes.BIGINT())) \
>     .create_temporary_table('mySink')
>
> tab = t_env.from_path('mySource')
> tab.group_by(tab.word) \
>    .select(tab.word, lit(1).count) \
>    .execute_insert('mySink').wait()
> ```
>
> Running this version generates a single file /tmp/output. Note that it
> does not use a comma delimiter.
>
> ```
> > cat /tmp/output
> flink 2
> pyflink 1
> ```
>
> Any idea why? Thanks!
>
> Best,
> Yik San Chan
>
