Huang Xingbo created FLINK-26462:
------------------------------------

             Summary: Release Testing: Running Python UDF in different Execution Mode
                 Key: FLINK-26462
                 URL: https://issues.apache.org/jira/browse/FLINK-26462
             Project: Flink
          Issue Type: Improvement
          Components: API / Python
    Affects Versions: 1.15.0
            Reporter: Huang Xingbo
             Fix For: 1.15.0

h1. Setup

Prepare a Python virtual environment:

{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}

h1. Test

# Write a Python UDF job named demo.py that runs in process mode:
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr


class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j


def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

    # process mode
    t_env.get_config().get_configuration().set_string("python.execution-mode", "process")

    # optional values
    t_env.get_config().get_configuration().set_string("parallelism.default", "2")

    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())


if __name__ == '__main__':
    main()
{code}
# Run the Python UDF job and check the result:
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}
# Change the Python UDF job to multi-thread mode:
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr


class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j


def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

    # multi-thread mode
    t_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")

    # optional values
    t_env.get_config().get_configuration().set_string("parallelism.default", "2")

    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())


if __name__ == '__main__':
    main()
{code}
# Run the Python UDF job again and check that the result matches the process-mode run:
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}
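
For testers who want to double-check the expected table by hand, the UDF chain in demo.py can be replayed in plain Python without Flink. This is only a sketch and not part of the original test plan: the helper name {{expected_rows}} is hypothetical, and the plain functions below merely stand in for the {{add_one}}, {{subtract_one}} and {{add}} UDFs defined in the job.

```python
# Plain-Python stand-ins for the three UDFs in demo.py (no Flink involved).
def add_one(i):
    return i + 1


def subtract_one(i):
    return i - 1


def add(i, j):
    return i + j


def expected_rows(rows):
    """Mirror t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1)).

    Hypothetical helper: each input row is (a, b, c) and each output row is
    (_c0, c, _c2), matching the columns printed by result.to_pandas().
    """
    return [(add(add_one(a), subtract_one(b)), c, 1) for a, b, c in rows]


# Same input elements as t_env.from_elements(...) in demo.py.
rows = [(1, 2, 1), (2, 5, 2), (3, 1, 3)]
print(expected_rows(rows))  # [(3, 1, 1), (7, 2, 1), (4, 3, 1)]
```

Both execution modes should print a table with these three rows; since the job runs with parallelism 2, comparing the rows as a set rather than in order may be the more robust check.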