haishui created FLINK-36751:
-------------------------------

             Summary: PausableRelativeClock does not pause when the source only 
has one split
                 Key: FLINK-36751
                 URL: https://issues.apache.org/jira/browse/FLINK-36751
             Project: Flink
          Issue Type: Sub-task
          Components: API / DataStream
            Reporter: haishui


Reason:

PausableRelativeClock#pause is called at pauseOrResumeSplits in 
ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when 
the sourceOperator has more than one splits.
 
My example code tested on Flink 1.20-SNAPSHOT is as follows:

{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataGeneratorSource<Long> dataGen1 = new DataGeneratorSource<>(v -> v, 
Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG);
DataGeneratorSource<Long> dataGen2 = new DataGeneratorSource<>(v -> v + 500, 
Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
WatermarkStrategy<Long> watermarkStrategy = WatermarkStrategy
        .<Long>forMonotonousTimestamps()
        .withTimestampAssigner((aLong, l) -> aLong)
        .withWatermarkAlignment("default", Duration.ofMillis(5), 
Duration.ofSeconds(5))
        .withIdleness(Duration.ofSeconds(5));
DataStreamSource<Long> s1 = env.fromSource(dataGen1, watermarkStrategy, "S1");
DataStream<Long> s2 = env.fromSource(dataGen2, watermarkStrategy, "S2");
s1.print("S1");
s2.print("S2");
s1.keyBy(v -> 0)
        .connect(s2.keyBy(v -> 0))
        .process(new CoProcessFunction<Long, Long, Void>() {
            @Override
            public void processElement1(Long aLong, CoProcessFunction<Long, 
Long, Void>.Context context, Collector<Void> collector) throws Exception {
                if (context.timestamp() < 
context.timerService().currentWatermark()) {
                    throw new IllegalStateException("left stream element is 
late: " + aLong);
                }
            }            @Override
            public void processElement2(Long aLong, CoProcessFunction<Long, 
Long, Void>.Context context, Collector<Void> collector) throws Exception {
                if (context.timestamp() < 
context.timerService().currentWatermark()) {
                    throw new IllegalStateException("right stream element is 
late: " + aLong);
                }
            }
        });
env.execute();{code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to