[ https://issues.apache.org/jira/browse/FLINK-36751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899708#comment-17899708 ]
haishui commented on FLINK-36751: --------------------------------- Hi [~pnowojski] , could you help investigate this? > PausableRelativeClock does not pause when the source only has one split > ----------------------------------------------------------------------- > > Key: FLINK-36751 > URL: https://issues.apache.org/jira/browse/FLINK-36751 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream > Reporter: haishui > Priority: Major > > Reason: > PausableRelativeClock#pause is called at pauseOrResumeSplits in > ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when > the sourceOperator has more than one splits. > > My example code tested on Flink 1.20-SNAPSHOT is as follows: > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > DataGeneratorSource<Long> dataGen1 = new DataGeneratorSource<>(v -> v, > Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG); > DataGeneratorSource<Long> dataGen2 = new DataGeneratorSource<>(v -> v + 500, > Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG); > WatermarkStrategy<Long> watermarkStrategy = WatermarkStrategy > .<Long>forMonotonousTimestamps() > .withTimestampAssigner((aLong, l) -> aLong) > .withWatermarkAlignment("default", Duration.ofMillis(5), > Duration.ofSeconds(5)) > .withIdleness(Duration.ofSeconds(5)); > DataStreamSource<Long> s1 = env.fromSource(dataGen1, watermarkStrategy, "S1"); > DataStream<Long> s2 = env.fromSource(dataGen2, watermarkStrategy, "S2"); > s1.print("S1"); > s2.print("S2"); > s1.keyBy(v -> 0) > .connect(s2.keyBy(v -> 0)) > .process(new CoProcessFunction<Long, Long, Void>() { > @Override > public void processElement1(Long aLong, CoProcessFunction<Long, > Long, Void>.Context context, Collector<Void> collector) throws Exception { > if (context.timestamp() < > context.timerService().currentWatermark()) { > throw new IllegalStateException("left stream element is > late: " + aLong); > } > } @Override > public void processElement2(Long aLong, CoProcessFunction<Long, > Long, Void>.Context context, Collector<Void> collector) throws Exception { > if (context.timestamp() < > context.timerService().currentWatermark()) { > throw new IllegalStateException("right stream element is > late: " + aLong); > } > } > }); > env.execute();{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)