Huang Xingbo created FLINK-28920:
------------------------------------

             Summary: Release Testing: Running Python DataStream Window Job
                 Key: FLINK-28920
                 URL: https://issues.apache.org/jira/browse/FLINK-28920
             Project: Flink
          Issue Type: Sub-task
          Components: API / Python
    Affects Versions: 1.16.0
            Reporter: Huang Xingbo
             Fix For: 1.16.0
* Build and compile the Flink source code
{code:bash}
$ cd {flink-source-code}
$ mvn clean install -DskipTests
{code}
* Prepare a Python virtual environment
{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}
* Install PyFlink from source. For more details, you can refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
{code:bash}
$ cd flink-python/apache-flink-libraries
$ python setup.py sdist
$ pip install dist/*.tar.gz
$ cd ..
$ pip install -r dev/dev-requirements.txt
$ python setup.py sdist
$ pip install dist/*.tar.gz
{code}

h1. Test
* Write a Python DataStream window job in thread mode. For details of windows, you can refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
{code:python}
from typing import Tuple

from pyflink.common import Configuration, Time
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import AggregateFunction
from pyflink.datastream.window import EventTimeSessionWindows


class SecondColumnTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value, record_timestamp) -> int:
        # Use the second field of the tuple as the event timestamp.
        return value[1]


def main():
    config = Configuration()
    # thread mode
    config.set_string("python.execution-mode", "thread")
    # process mode
    # config.set_string("python.execution-mode", "process")
    env = StreamExecutionEnvironment.get_execution_environment(config)
    data_stream = env.from_collection([
        ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(SecondColumnTimestampAssigner())

    class MyAggregateFunction(AggregateFunction):

        def create_accumulator(self) -> Tuple[int, str]:
            return 0, ''

        def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
            return value[1] + accumulator[0], value[0]

        def get_result(self, accumulator: Tuple[int, str]):
            return accumulator[1], accumulator[0]

        def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
            return acc_a[0] + acc_b[0], acc_a[1]

    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING()) \
        .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
        .aggregate(MyAggregateFunction(),
                   accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
                   output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
    ds.print()
    env.execute('test_window_aggregate_accumulator_type')


if __name__ == '__main__':
    main()
{code}
* Run the Python DataStream window job and watch the result
{code:bash}
$ python demo.py
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
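To make the check easier, the expected output of the job can be computed independently with a plain-Python sketch (no PyFlink involved). It mimics event-time session windows: per key, events whose timestamps are at most the gap apart fall into the same session, and the session's second fields are summed, matching the {{MyAggregateFunction}} above. The gap (2 ms) and the input data are taken from the job; the helper name {{session_sums}} is just for illustration.
{code:python}
from typing import Dict, List, Tuple

GAP = 2  # session gap in ms, matching EventTimeSessionWindows.with_gap(Time.milliseconds(2))


def session_sums(events: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Group events into per-key gap-based sessions and sum the timestamp field."""
    by_key: Dict[str, List[int]] = {}
    for key, ts in events:
        by_key.setdefault(key, []).append(ts)

    results = []
    for key, stamps in by_key.items():
        stamps.sort()
        acc = stamps[0]
        last = stamps[0]
        for ts in stamps[1:]:
            if ts - last <= GAP:   # within the gap -> same session window
                acc += ts
            else:                  # gap exceeded -> emit session, start a new one
                results.append((key, acc))
                acc = ts
            last = ts
        results.append((key, acc))
    return results


events = [('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)]
print(session_sums(events))
# [('a', 3), ('a', 6), ('a', 15), ('b', 3), ('b', 17)]
{code}
The job should therefore print these five (key, sum) records (the order may differ, since the per-key windows fire independently).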