hi,
这个问题是因为经过窗口算子后StreamRecord中指定的时间时间戳被改成了window.maxTimestamp(),可以查看[1]中WindowOperator或EvictingWindowOperator中的emitWindowContents方法。 如果想要更改时间戳,可以实现一个ProcessFuncton TimestampedCollector<T> collector = (TimestampedCollector<T>) out; collector.setAbsoluteTimestamp( <value.getTimestampField()> ); collector.collect(value); 如果可以接受kafka内数据使用插入时间,则可以设置topic的log.message.timestamp.type=LogAppendTime [1] https://github.com/apache/flink/tree/edac2adb9523adcb69e1dacc5fd4ea8f63480175/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing 在 2023-02-27 18:03:38,"aiden" <18765295...@163.com> 写道: > >hi,我在使用1.16.0版本时遇到kafka sink 时间戳异常大的情况,以下分别为正常和异常数据 >正常: >{ > "partition": 0, > "offset": 16, > "msg": "xxxxx", > "timespan": 1677487065330, > "date": "2023-02-27 16:37:45" > } >异常: > { > "partition": 0, > "offset": 17, > "msg": "xxxxxx", > "timespan": 9223372036854776000, > "date": "292278994-08-17 15:12:55" > } >最终发现是由于使用了countWindow算子导致的,推测是由于这个算子窗口为GlobalWindow导致的,有什么方式可以避免这个异常吗?或者可以在序列化kafka > sink时手动指定时间戳吗?