[ https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580054#comment-17580054 ]
Luning Wang commented on FLINK-28918:
-------------------------------------

This test runs successfully on an Apple M1 chip. [~hxbks2ks]

> Release Testing: Verify FLIP-206 in Python DataStream API
> ---------------------------------------------------------
>
>                 Key: FLINK-28918
>                 URL: https://issues.apache.org/jira/browse/FLINK-28918
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / Python
>    Affects Versions: 1.16.0
>            Reporter: Huang Xingbo
>            Assignee: Luning Wang
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
>
>
> * Build Flink from source
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python virtual environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source. Run the commands below from the root of the Flink source tree with the virtual environment activated. For more details, refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
>
> h1. Test
>
> * Write a Python DataStream job that runs in thread mode and save it as demo.py
> {code:python}
> from pyflink.common import Configuration
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
>
>
> def main():
>     # FLIP-206: run the Python functions in thread mode instead of the default process mode
>     config = Configuration()
>     config.set_string("python.execution-mode", "thread")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>
>     ds = env.from_collection(
>         [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
>         type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
>                                   [Types.INT(), Types.STRING(), Types.INT()]))
>
>     # Emits one (int(field), 1) pair per field of the incoming record
>     def flat_map_func1(data):
>         for i in data:
>             yield int(i), 1
>
>     # Emits every field of the incoming record as a separate element
>     def flat_map_func2(data):
>         for i in data:
>             yield i
>
>     # Chain keyed rolling min_by/max_by aggregations with map/flat_map steps in between
>     ds = ds.key_by(lambda x: x[0]) \
>         .min_by("v2") \
>         .map(lambda x: (x[0], x[1], x[2]),
>              output_type=Types.TUPLE([Types.INT(), Types.STRING(), Types.INT()])) \
>         .key_by(lambda x: x[2]) \
>         .max_by(0) \
>         .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), Types.INT()])) \
>         .key_by(lambda x: [1]) \
>         .min_by() \
>         .flat_map(flat_map_func2, output_type=Types.INT()) \
>         .key_by(lambda x: x) \
>         .max_by()
>     ds.print()
>     env.execute("key_by_min_by_max_by_test_batch")
>
>
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python DataStream job and check the printed result
> {code:bash}
> $ python demo.py
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
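To make "check the printed result" in the test steps concrete, the job's expected output can be modelled in plain Python. This is a sketch under stated assumptions, not PyFlink code. It assumes the job runs with parallelism 1 (for example by adding `env.set_parallelism(1)` after creating `env`), so every operator sees records in source order. It also assumes `min_by`/`max_by` act as keyed rolling aggregations that emit the current minimum or maximum record for every input. The helpers `field`, `rolling_by` and `expected_output` are hypothetical names used only for illustration. With higher parallelism, records from different subtasks can interleave differently, and the model no longer applies.

```python
from typing import Any, Callable, Dict, Hashable, Iterable, List


def field(record: Any, pos: int) -> Any:
    # Tuples are indexed by position; basic types (e.g. int) count as a single field
    return record[pos] if isinstance(record, tuple) else record


def rolling_by(records: Iterable[Any], key_fn: Callable[[Any], Hashable],
               pos: int, pick_max: bool) -> List[Any]:
    """Hypothetical model of a keyed rolling min_by/max_by.

    For each input record, emit the record that currently holds the minimum
    (or maximum) value of field `pos` within that record's key. On ties the
    earlier record is kept.
    """
    best: Dict[Hashable, Any] = {}
    out: List[Any] = []
    for rec in records:
        key = key_fn(rec)
        if key not in best:
            best[key] = rec
        else:
            new, cur = field(rec, pos), field(best[key], pos)
            better = new > cur if pick_max else new < cur
            if better:
                best[key] = rec
        out.append(best[key])
    return out


def expected_output() -> List[int]:
    source = [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)]
    # key_by(lambda x: x[0]).min_by("v2"): "v2" is field 1, compared as strings
    s = rolling_by(source, lambda x: x[0], pos=1, pick_max=False)
    # .map(...) only converts the Row to a tuple, so the model skips it
    # key_by(lambda x: x[2]).max_by(0)
    s = rolling_by(s, lambda x: x[2], pos=0, pick_max=True)
    # flat_map_func1: one (int(field), 1) pair per field
    s = [(int(f), 1) for rec in s for f in rec]
    # key_by(lambda x: [1]).min_by(): a constant key, so all records share one group
    s = rolling_by(s, lambda _: 1, pos=0, pick_max=False)
    # flat_map_func2: every field becomes its own element
    s = [f for rec in s for f in rec]
    # key_by(lambda x: x).max_by(): each key only ever sees equal values
    return rolling_by(s, lambda x: x, pos=0, pick_max=True)


if __name__ == '__main__':
    for value in expected_output():
        print(value)
```

Under these assumptions the model yields `1, 1, 1, 1` followed by thirteen `0, 1` pairs (30 integers in total), which can be compared with the lines printed by `python demo.py`.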