[ 
https://issues.apache.org/jira/browse/FLINK-36751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899708#comment-17899708
 ] 

haishui commented on FLINK-36751:
---------------------------------

Hi [~pnowojski], could you help investigate this?

> PausableRelativeClock does not pause when the source only has one split
> -----------------------------------------------------------------------
>
>                 Key: FLINK-36751
>                 URL: https://issues.apache.org/jira/browse/FLINK-36751
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream
>            Reporter: haishui
>            Priority: Major
>
> Reason:
> PausableRelativeClock#pause is called from pauseOrResumeSplits in 
> ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when 
> the SourceOperator has more than one split.
>  
> My example code tested on Flink 1.20-SNAPSHOT is as follows:
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataGeneratorSource<Long> dataGen1 = new DataGeneratorSource<>(
>         v -> v, Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG);
> DataGeneratorSource<Long> dataGen2 = new DataGeneratorSource<>(
>         v -> v + 500, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
> WatermarkStrategy<Long> watermarkStrategy = WatermarkStrategy
>         .<Long>forMonotonousTimestamps()
>         .withTimestampAssigner((aLong, l) -> aLong)
>         .withWatermarkAlignment("default", Duration.ofMillis(5), Duration.ofSeconds(5))
>         .withIdleness(Duration.ofSeconds(5));
> DataStreamSource<Long> s1 = env.fromSource(dataGen1, watermarkStrategy, "S1");
> DataStream<Long> s2 = env.fromSource(dataGen2, watermarkStrategy, "S2");
> s1.print("S1");
> s2.print("S2");
> s1.keyBy(v -> 0)
>         .connect(s2.keyBy(v -> 0))
>         .process(new CoProcessFunction<Long, Long, Void>() {
>             @Override
>             public void processElement1(Long aLong,
>                     CoProcessFunction<Long, Long, Void>.Context context,
>                     Collector<Void> collector) throws Exception {
>                 if (context.timestamp() < context.timerService().currentWatermark()) {
>                     throw new IllegalStateException("left stream element is late: " + aLong);
>                 }
>             }
>
>             @Override
>             public void processElement2(Long aLong,
>                     CoProcessFunction<Long, Long, Void>.Context context,
>                     Collector<Void> collector) throws Exception {
>                 if (context.timestamp() < context.timerService().currentWatermark()) {
>                     throw new IllegalStateException("right stream element is late: " + aLong);
>                 }
>             }
>         });
> env.execute();{code}
>  
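To make the "Reason" part of the quoted description easier to follow, here is a minimal, self-contained sketch of the control flow it describes. It is not Flink code: SketchClock, SketchSplit, alignAsReported and the size check are hypothetical stand-ins that only model the reported behavior, namely that the per-split pause path is skipped when there is a single split, so the clock is never paused.

```java
import java.util.List;

/** Simplified model of the reported behavior; none of these types are Flink classes. */
public class SingleSplitPauseSketch {

    /** Hypothetical stand-in for a pausable clock: it only records whether it is paused. */
    static final class SketchClock {
        private boolean paused;

        void pause() { paused = true; }

        void unPause() { paused = false; }

        boolean isPaused() { return paused; }
    }

    /** Hypothetical stand-in for one split with its current watermark and its own clock. */
    static final class SketchSplit {
        final long watermark;
        final SketchClock clock = new SketchClock();

        SketchSplit(long watermark) { this.watermark = watermark; }
    }

    /** Pauses the clock of every split that is ahead of the allowed watermark, resumes the rest. */
    static void pauseOrResumeSplits(List<SketchSplit> splits, long maxAllowedWatermark) {
        for (SketchSplit split : splits) {
            if (split.watermark > maxAllowedWatermark) {
                split.clock.pause();
            } else {
                split.clock.unPause();
            }
        }
    }

    /** Assumption taken from the report: the per-split path only runs with more than one split. */
    static void alignAsReported(List<SketchSplit> splits, long maxAllowedWatermark) {
        if (splits.size() <= 1) {
            return; // single split: pauseOrResumeSplits is never reached, so no clock is paused
        }
        pauseOrResumeSplits(splits, maxAllowedWatermark);
    }

    public static void main(String[] args) {
        // One split that is ahead of the allowed watermark: its clock stays unpaused.
        SketchSplit only = new SketchSplit(100L);
        alignAsReported(List.of(only), 50L);
        System.out.println("single split paused: " + only.clock.isPaused());

        // Two splits: the one that is ahead gets its clock paused.
        SketchSplit ahead = new SketchSplit(100L);
        SketchSplit behind = new SketchSplit(10L);
        alignAsReported(List.of(ahead, behind), 50L);
        System.out.println("ahead split paused: " + ahead.clock.isPaused());
        System.out.println("behind split paused: " + behind.clock.isPaused());
    }
}
```

In this model the single split is ahead of the allowed watermark but its clock is never paused, which mirrors the situation the description attributes to the one-split case.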



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
