Huang Xingbo created FLINK-31099: ------------------------------------ Summary: Chained WindowOperator throws NPE in PyFlink ThreadMode Key: FLINK-31099 URL: https://issues.apache.org/jira/browse/FLINK-31099 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.16.1, 1.17.0 Reporter: Huang Xingbo Assignee: Huang Xingbo Fix For: 1.17.0, 1.16.2
Test case {code:python} config = Configuration() config.set_string("python.execution-mode", "process") env = StreamExecutionEnvironment.get_execution_environment(config) class MyTimestampAssigner(TimestampAssigner, ABC): def extract_timestamp(self, value: tuple, record_timestamp: int) -> int: return value[0] ds = env.from_collection( [(1676461680000, "a1", "b1", 1), (1676461680000, "a1", "b1", 1), (1676461680000, "a2", "b2", 1), (1676461680000, "a1", "b2", 1), (1676461740000, "a1", "b1", 1), (1676461740000, "a2", "b2", 1)] ).assign_timestamps_and_watermarks( WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(MyTimestampAssigner()) ) ds.key_by( lambda x: (x[0], x[1], x[2]) ).window( TumblingEventTimeWindows.of(Time.minutes(1)) ).reduce( lambda x, y: (x[0], x[1], x[2], x[3] + y[3]), output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING(), Types.INT()]) # ).filter( # lambda x: x[1] == "a1" ).map( lambda x: (x[0], x[1], x[3]), output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.INT()]) ).print() env.execute() {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)