[ https://issues.apache.org/jira/browse/FLINK-23819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu updated FLINK-23819:
----------------------------
    Description: 
This feature adds support for tar.gz files as Python archives. Previously, only zip files were supported as Python archives.

This feature can be tested as follows:

1) Build the PyFlink packages from source according to the documentation: [https://ci.apache.org/projects/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]

2) Prepare a tar.gz file which contains the conda Python virtual environment
- Install Miniconda in your environment: [https://conda.io/projects/conda/en/latest/user-guide/install/macos.html]
- Install conda-pack: [https://conda.github.io/conda-pack/]
- Prepare the conda environment and install the PyFlink packages built in the above step into the conda virtual environment:
{code}
conda create --name myenv
conda activate myenv
conda install python=3.8
python -m pip install ~/code/src/apache/flink/flink-python/apache-flink-libraries/dist/apache-flink-libraries-1.14.dev0.tar.gz
python -m pip install ~/code/src/apache/flink/flink-python/dist/apache_flink-1.14.dev0-cp38-cp38-macosx_10_9_x86_64.whl
{code}
- You can verify the packages installed in the conda env *myenv* as follows:
{code}
conda list -n myenv
{code}
- Package the conda virtual environment into a tgz file (this will generate a file named myenv.tar.gz):
{code}
conda pack -n myenv
{code}
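Optionally, you can sanity-check the generated archive before using it (this check is not part of the original steps; conda-pack places bin/, lib/, etc. at the archive root):
{code}
# The interpreter referenced later via -pyexec myenv/bin/python should be
# present at the root of the packed archive.
tar -tzf myenv.tar.gz | grep 'bin/python'
{code}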
3) Prepare a PyFlink job; here is an example:
{code:java}
import time

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction
from pyflink.table import StreamTableEnvironment, DataTypes, Schema


def test_chaining():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # 1. create source Table
    t_env.execute_sql("""
        CREATE TABLE datagen (
            id INT,
            data STRING
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '1000000',
            'fields.id.kind' = 'sequence',
            'fields.id.start' = '1',
            'fields.id.end' = '1000'
        )
    """)

    # 2. create sink Tables
    t_env.execute_sql("""
        CREATE TABLE print (
            id BIGINT,
            data STRING,
            flag STRING
        ) WITH (
            'connector' = 'blackhole'
        )
    """)

    t_env.execute_sql("""
        CREATE TABLE print_2 (
            id BIGINT,
            data STRING,
            flag STRING
        ) WITH (
            'connector' = 'blackhole'
        )
    """)

    # 3. query from source table and perform calculations
    # create a Table from a Table API query:
    source_table = t_env.from_path("datagen")

    ds = t_env.to_append_stream(
        source_table,
        Types.ROW([Types.INT(), Types.STRING()]))

    ds1 = ds.map(lambda i: (i[0] * i[0], i[1]))
    ds2 = ds.map(lambda i: (i[0], i[1][2:]))

    class MyCoMapFunction(CoMapFunction):

        def map1(self, value):
            return value

        def map2(self, value):
            return value

    ds3 = ds1.connect(ds2).map(MyCoMapFunction(),
                               output_type=Types.TUPLE([Types.LONG(), Types.STRING()]))

    ds4 = ds3.map(lambda i: (i[0], i[1], "left"),
                  output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()]))

    ds5 = ds3.map(lambda i: (i[0], i[1], "right"))\
             .map(lambda i: i,
                  output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()]))

    schema = Schema.new_builder() \
        .column("f0", DataTypes.BIGINT()) \
        .column("f1", DataTypes.STRING()) \
        .column("f2", DataTypes.STRING()) \
        .build()

    result_table_3 = t_env.from_data_stream(ds4, schema)
    statement_set = t_env.create_statement_set()
    statement_set.add_insert("print", result_table_3)

    result_table_4 = t_env.from_data_stream(ds5, schema)
    statement_set.add_insert("print_2", result_table_4)

    statement_set.execute()


if __name__ == "__main__":
    start_ts = time.time()
    test_chaining()
    end_ts = time.time()
    print("--- %s seconds ---" % (end_ts - start_ts))
{code}

4) Submit the PyFlink job using the generated myenv.tar.gz:
{code}
./bin/flink run -d -m localhost:8081 -py test_pyflink.py -pyarch myenv.tar.gz#myenv -pyexec myenv/bin/python -pyclientexec myenv/bin/python
{code}
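Note that the -m localhost:8081 option assumes a Flink cluster is already reachable at that address. For example, with a local standalone setup (where the REST endpoint listens on port 8081 by default) it can be started from the Flink distribution directory with:
{code}
# start a local standalone cluster; stop it later with ./bin/stop-cluster.sh
./bin/start-cluster.sh
{code}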
5) The job should run normally and you should see log entries like the following in the TaskManager log file:
{code}
2021-08-26 11:14:19,295 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/private/var/folders/jq/brl84gld47ngmcfyvwh2gtj40000gp/T/python-dist-a61682a6-79b0-443c-b3c8-f9dade55e5d6/python-archives/myenv/lib/python3.8/site-packages/pyflink/bin/pyflink-udf-runner.sh' for worker id 1-1
{code}
This demonstrates that the Python worker was started using the Python interpreter contained in myenv.tar.gz.
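One way to locate this log entry (a suggested check; the exact file name depends on the host and the user running the cluster) is to grep the TaskManager logs under the log/ directory of the Flink distribution:
{code}
# TaskManager log files are typically named flink-<user>-taskexecutor-<n>-<host>.log
grep "ProcessEnvironmentFactory" log/flink-*-taskexecutor-*.log
{code}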
> Testing tgz file for python archives
> ------------------------------------
>
>                 Key: FLINK-23819
>                 URL: https://issues.apache.org/jira/browse/FLINK-23819
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>            Reporter: Dian Fu
>            Assignee: Leonard Xu
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.14.0
>
> This feature is to support tar.gz files as python archives. In the past, only zip files were supported as python archives.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)