haishui created FLINK-36751: ------------------------------- Summary: PausableRelativeClock does not pause when the source only has one split Key: FLINK-36751 URL: https://issues.apache.org/jira/browse/FLINK-36751 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: haishui
Reason: PausableRelativeClock#pause is called at pauseOrResumeSplits in ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when the sourceOperator has more than one splits. My example code tested on Flink 1.20-SNAPSHOT is as follows: {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataGeneratorSource<Long> dataGen1 = new DataGeneratorSource<>(v -> v, Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG); DataGeneratorSource<Long> dataGen2 = new DataGeneratorSource<>(v -> v + 500, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG); WatermarkStrategy<Long> watermarkStrategy = WatermarkStrategy .<Long>forMonotonousTimestamps() .withTimestampAssigner((aLong, l) -> aLong) .withWatermarkAlignment("default", Duration.ofMillis(5), Duration.ofSeconds(5)) .withIdleness(Duration.ofSeconds(5)); DataStreamSource<Long> s1 = env.fromSource(dataGen1, watermarkStrategy, "S1"); DataStream<Long> s2 = env.fromSource(dataGen2, watermarkStrategy, "S2"); s1.print("S1"); s2.print("S2"); s1.keyBy(v -> 0) .connect(s2.keyBy(v -> 0)) .process(new CoProcessFunction<Long, Long, Void>() { @Override public void processElement1(Long aLong, CoProcessFunction<Long, Long, Void>.Context context, Collector<Void> collector) throws Exception { if (context.timestamp() < context.timerService().currentWatermark()) { throw new IllegalStateException("left stream element is late: " + aLong); } } @Override public void processElement2(Long aLong, CoProcessFunction<Long, Long, Void>.Context context, Collector<Void> collector) throws Exception { if (context.timestamp() < context.timerService().currentWatermark()) { throw new IllegalStateException("right stream element is late: " + aLong); } } }); env.execute();{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)