Hi, 我看你使用了System.currentTimeMillis(),有可能是分布式的情况下,多台TM上的机器时间不一致导致的吗?
--
Best!
Xuyang
在 2024-04-20 19:04:14,"hhq" <[email protected]> 写道:
>我使用了一个基于处理时间的滚动窗口,窗口大小设置为60s,但是我在窗口的处理函数中比较窗口的结束时间和系统时间,偶尔会发现获取到的系统时间早于窗口结束时间(这里的提前量不大,只有几毫秒,但是我不清楚,这是flink窗口本身的原因还是我代码的问题)我没有找到原因,请求帮助
>
>public static void main(String[] args) throws Exception {
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStreamSource<Integer> integerDataStreamSource = env.addSource(new
> IntegerSource());
>
> integerDataStreamSource
> .keyBy(Integer::intValue)
> .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
> .process(new IntegerProcessFunction())
> .setParallelism(1);
>
> env.execute();
>}
>
>
>public class IntegerProcessFunction extends ProcessWindowFunction<Integer,
>Object, Integer, TimeWindow> {
> private Logger log;
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> this.log = Logger.getLogger(IntegerProcessFunction.class);
> }
>
> @Override
> public void process(Integer integer, ProcessWindowFunction<Integer,
> Object, Integer, TimeWindow>.Context context, Iterable<Integer> elements,
> Collector<Object> out) throws Exception {
> long currentTimeMillis = System.currentTimeMillis();
> long end = context.window().getEnd();
>
> if (currentTimeMillis < end) {
> log <http://log.info/>.info <http://log.info/>("bad");
> } else {
> log <http://log.info/>.info <http://log.info/>("good");
> }
> }
>}
>