Joekwal created FLINK-32040:
-------------------------------

             Summary: The WatermarkStrategy defined with the function with_idleness reports an error
                 Key: FLINK-32040
                 URL: https://issues.apache.org/jira/browse/FLINK-32040
             Project: Flink
          Issue Type: Bug
          Components: API / Python
            Reporter: Joekwal


Version: upgraded from PyFlink 1.15.2 to PyFlink 1.16.1

The following error is reported:

Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). Is the time 
characteristic set to 'ProcessingTime', or did you forget to call 
'data_stream.assign_timestamps_and_watermarks(...)'?

The same application never reported this error on version 1.15.2.

Example 1 (reports the error):

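{code:python}
from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream.window import TumblingEventTimeWindows

# st_env (a StreamTableEnvironment), CommonKeySelector, WindowFunction and
# typeInfo are defined elsewhere in the application and omitted here.


class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp: int) -> int:
        # 'version' holds the event time in epoch milliseconds
        return value['version']


sql = """
select columns, version(milliseconds) from kafka_source
"""
table = st_env.sql_query(sql)
stream = st_env.to_changelog_stream(table)
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
    .with_timestamp_assigner(MyTimestampAssigner())
    .with_idleness(Duration.of_seconds(10)))

# Daily tumbling event-time windows; the -8 h offset aligns the window
# boundaries to midnight in UTC+8.
stream = stream.key_by(CommonKeySelector()) \
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
    .process(WindowFunction(), typeInfo)
{code}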

While debugging pyflink.datastream.data_stream.DataStream.assign_timestamps_and_watermarks, I found that watermark_strategy._timestamp_assigner is None.
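
The observation that _timestamp_assigner is None is consistent with one hypothesis: with_timestamp_assigner only stores the Python assigner on the existing strategy object, while with_idleness returns a new strategy object built from the wrapped Java-side strategy alone, so the assigner is not carried over. The sketch below is a pure-Python toy model of that hypothesis. All class and function names are hypothetical and this is not PyFlink's source; it only shows how such a builder loses the assigner when with_idleness is called last, and keeps it when with_idleness is called first.

{code:python}
from typing import Callable, Optional


class ToyJavaStrategy:
    """Hypothetical stand-in for the wrapped Java-side strategy."""

    def __init__(self, idle_timeout_s: Optional[int] = None):
        self.idle_timeout_s = idle_timeout_s

    def with_idleness(self, seconds: int) -> "ToyJavaStrategy":
        return ToyJavaStrategy(idle_timeout_s=seconds)


class ToyWatermarkStrategy:
    """Hypothetical Python wrapper modelling the suspected behaviour."""

    def __init__(self, j_strategy: ToyJavaStrategy):
        self._j_strategy = j_strategy
        self._timestamp_assigner: Optional[Callable] = None

    def with_timestamp_assigner(self, assigner: Callable) -> "ToyWatermarkStrategy":
        # Stores the assigner on this wrapper and returns the same object.
        self._timestamp_assigner = assigner
        return self

    def with_idleness(self, seconds: int) -> "ToyWatermarkStrategy":
        # Builds a NEW wrapper from the Java-side strategy only, so the
        # Python-side assigner is not copied over.
        return ToyWatermarkStrategy(self._j_strategy.with_idleness(seconds))


def extract(value, record_timestamp):
    return value["version"]


# Order used in Example 1: assigner first, idleness last.
lost = (ToyWatermarkStrategy(ToyJavaStrategy())
        .with_timestamp_assigner(extract)
        .with_idleness(10))

# Reversed order: idleness first, assigner last.
kept = (ToyWatermarkStrategy(ToyJavaStrategy())
        .with_idleness(10)
        .with_timestamp_assigner(extract))

print(lost._timestamp_assigner)  # None
print(kept._timestamp_assigner is extract, kept._j_strategy.idle_timeout_s)  # True 10
{code}

If PyFlink 1.16.1 behaves like this model, calling with_idleness(...) before with_timestamp_assigner(...) might keep idleness detection without losing the assigner. That ordering has not been verified against PyFlink here.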

Workaround: removing the with_idleness(Duration.of_seconds(10)) call avoids the error:
{code:python}
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
    .with_timestamp_assigner(MyTimestampAssigner()))
{code}
Is this a bug?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
