Huang Xingbo created FLINK-28918:
------------------------------------

             Summary: Release Testing: Running Python DataStream jobs in Thread Mode
                 Key: FLINK-28918
                 URL: https://issues.apache.org/jira/browse/FLINK-28918
             Project: Flink
          Issue Type: Sub-task
          Components: API / Python
    Affects Versions: 1.16.0
            Reporter: Huang Xingbo
             Fix For: 1.16.0

* Build the Flink source code
{code:bash}
$ cd {flink-source-code}
$ mvn clean install -DskipTests
{code}

* Prepare a Python virtual environment
{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}

* Install PyFlink from source. For more details, refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
{code:bash}
# Paths are relative to the Flink source root.
$ cd flink-python/apache-flink-libraries
$ python setup.py sdist
$ pip install dist/*.tar.gz
$ cd ..
$ pip install -r dev/dev-requirements.txt
$ python setup.py sdist
$ pip install dist/*.tar.gz
{code}

h1. Test

* Write a Python DataStream job that runs in thread mode
{code:python}
from pyflink.common import Configuration
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment


def main():
    config = Configuration()
    # The execution mode is a string option, so it is set with set_string.
    config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)

    ds = env.from_collection(
        [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
        type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
                                  [Types.INT(), Types.STRING(), Types.INT()]))

    def flat_map_func1(data):
        # Emit one (value, 1) pair per field of the input tuple.
        for i in data:
            yield int(i), 1

    def flat_map_func2(data):
        # Flatten each tuple into its individual fields.
        for i in data:
            yield i

    ds = ds.key_by(lambda x: x[0]) \
        .min_by("v2") \
        .map(lambda x: (x[0], x[1], x[2]),
             output_type=Types.TUPLE([Types.INT(), Types.STRING(), Types.INT()])) \
        .key_by(lambda x: x[2]) \
        .max_by(0) \
        .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), Types.INT()])) \
        .key_by(lambda x: x[1]) \
        .min_by() \
        .flat_map(flat_map_func2, output_type=Types.INT()) \
        .key_by(lambda x: x) \
        .max_by()

    ds.print()
    env.execute("key_by_min_by_max_by_test_batch")


if __name__ == '__main__':
    main()
{code}

* Save the job as demo.py, run it, and watch the result
{code:bash}
$ python demo.py
{code}
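
To help judge the printed result, the first stage of the job ({{key_by}} on the first field followed by {{min_by("v2")}}) can be approximated in plain Python. This is only a sketch and does not use PyFlink. It assumes streaming-style rolling aggregation, where each input element produces one output (the current minimum for its key), and it assumes that ties keep the earlier element. The names {{rolling_min_by}}, {{key_of}} and {{field}} are hypothetical and exist only for this illustration.

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, Tuple

Row = Tuple[int, str, int]


def rolling_min_by(rows: Iterable[Row],
                   key_of: Callable[[Row], Hashable],
                   field: int) -> Iterator[Row]:
    """Yield, for every input row, the row with the smallest `field`
    value seen so far for that row's key (assumed rolling semantics)."""
    current: Dict[Hashable, Row] = {}
    for row in rows:
        key = key_of(row)
        best = current.get(key)
        # Strict comparison: on a tie the earlier row is kept (an assumption).
        if best is None or row[field] < best[field]:
            current[key] = row
        yield current[key]


# Same input collection as the job above; field 1 corresponds to "v2".
rows = [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)]
result = list(rolling_min_by(rows, key_of=lambda r: r[0], field=1))
print(result)
```

The output of the real job may differ in order or count from this approximation, since it depends on the runtime mode and the parallelism; the sketch is meant only to illustrate what a per-key rolling minimum looks like.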