Joekwal created FLINK-32040:
-------------------------------

             Summary: The WatermarkStrategy defined with the Function(with_idleness) report an error
                 Key: FLINK-32040
                 URL: https://issues.apache.org/jira/browse/FLINK-32040
             Project: Flink
          Issue Type: Bug
          Components: API / Python
            Reporter: Joekwal

Version: upgraded from PyFlink 1.15.2 to PyFlink 1.16.1.

After the upgrade, the job fails with the following error:

Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'data_stream.assign_timestamps_and_watermarks(...)'?

The same application never reported this error on version 1.15.2.

Example 1, which reports the error:

{code:python}
from pyflink.common import Duration, Time, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream.window import TumblingEventTimeWindows


class MyTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value, record_timestamp: int) -> int:
        # 'version' holds the event time in milliseconds
        return value['version']


sql = """
select columns,version(milliseconds) from kafka_source
"""
table = st_env.sql_query(sql)
stream = st_env.to_changelog_stream(table)
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
    .with_timestamp_assigner(MyTimestampAssigner())
    .with_idleness(Duration.of_seconds(10)))
stream = stream.key_by(CommonKeySelector()) \
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
    .process(WindowFunction(), typeInfo)
{code}

Debugging into pyflink.datastream.data_stream.DataStream.assign_timestamps_and_watermarks shows that watermark_strategy._timestamp_assigner is None.

Solution: remove the with_idleness(Duration.of_seconds(10)) call:

{code:python}
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
    .with_timestamp_assigner(MyTimestampAssigner()))
{code}

Is this a bug?
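
One possible explanation for watermark_strategy._timestamp_assigner being None can be sketched with a toy model. This is an assumption that fits the symptom above and is not taken from the PyFlink source: ToyStrategy, java_side and assigner are hypothetical names. The sketch assumes that with_timestamp_assigner stores the assigner on the Python wrapper and returns the same object, while with_idleness builds a new wrapper and does not copy that attribute.

```python
# Toy model only: ToyStrategy is NOT PyFlink's WatermarkStrategy.
class ToyStrategy:
    def __init__(self, java_side):
        # Stands in for the wrapped Java-side strategy configuration.
        self.java_side = java_side
        # Python-side attribute that the data stream would inspect later.
        self._timestamp_assigner = None

    def with_timestamp_assigner(self, assigner):
        # Mutates and returns the same wrapper, so the attribute survives.
        self._timestamp_assigner = assigner
        return self

    def with_idleness(self, seconds):
        # Assumed behaviour: a fresh wrapper is created and the
        # Python-side attribute is not carried over.
        return ToyStrategy(self.java_side + [("idleness", seconds)])


def assigner(value):
    # Hypothetical assigner: reads the millisecond timestamp field.
    return value["version"]


# Same call order as in the report: the assigner is lost.
lost = ToyStrategy([]).with_timestamp_assigner(assigner).with_idleness(10)

# Reversed call order: the assigner is set on the final wrapper.
kept = ToyStrategy([]).with_idleness(10).with_timestamp_assigner(assigner)
```

If this assumption holds for PyFlink 1.16.1, calling with_idleness(...) before with_timestamp_assigner(...) might keep both settings; that ordering has not been verified against PyFlink here.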