Weiqing Yang created FLINK-38477:
------------------------------------

             Summary: Add FINISHED watermark status to support proper watermark aggregation
                 Key: FLINK-38477
                 URL: https://issues.apache.org/jira/browse/FLINK-38477
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Network
    Affects Versions: 1.20.3, 2.1.0, 1.16.3
            Reporter: Weiqing Yang


*Environment / Preconditions*
 * Source: Kafka connector with dynamic partition discovery disabled (no split 
rescan after startup).

 * Input topic(s): total partition count = {*}N{*}.

 * Job parallelism = {*}P{*}, where *P > N* (some source subtasks start with no 
assigned partitions).

 * Idle detection configured (e.g., {{table.exec.source.idle-timeout = 10s}}).

 * Downstream uses event-time semantics (e.g., {{IntervalJoin}}, time-bounded aggregations, windows).

*Problem Summary*
 * When *P > N* and Kafka partition discovery is disabled, some source subtasks start with no splits and *finish immediately*. Finished subtasks emit a watermark of {{Long.MAX_VALUE}} but are *not excluded* from watermark aggregation. If the remaining active subtasks later go *IDLE* (e.g., during lulls or backpressure, once the idle timeout expires), the only “non-idle” watermark seen downstream is {{Long.MAX_VALUE}}, which advances operator watermarks to infinity. As a result, *all subsequent records are treated as late*, cleanup timers fire and discard state, and time-aware operators emit effectively *no output*.
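To make the “late” part concrete, here is a minimal pure-Java sketch of a typical lateness check in an event-time operator. The names {{LatenessDemo}} and {{isLate}} are hypothetical, and real operators differ in details such as strict vs. non-strict comparison and allowed lateness; the point is only that once the watermark is {{Long.MAX_VALUE}}, no realistic timestamp can pass the check.

```java
// Hypothetical sketch: a simplified lateness rule for an event-time operator.
final class LatenessDemo {
    // A record is late once the operator watermark has passed its timestamp.
    // Long.MIN_VALUE means "no watermark received yet", so nothing is late.
    static boolean isLate(long recordTimestamp, long operatorWatermark) {
        return operatorWatermark != Long.MIN_VALUE && recordTimestamp < operatorWatermark;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Normal case: the watermark trails the data by a few seconds.
        System.out.println(isLate(now, now - 5_000));    // false
        // After the watermark jumps to Long.MAX_VALUE, every realistic timestamp is late.
        System.out.println(isLate(now, Long.MAX_VALUE)); // true
    }
}
```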

*Detailed Behavior*
 * Event-time progress for an operator is the *minimum* watermark across its 
*non-idle* inputs.

 * Finished subtasks today:

 ** set watermark to {{Long.MAX_VALUE}},

 ** but are *not* marked idle / excluded from aggregation.

 * Failure condition:

 ## Some subtasks finish at startup (no splits).

 ## Later, active subtasks go *IDLE* due to {{table.exec.source.idle-timeout}}.

 ## Aggregation sees only the finished subtasks as “non-idle” → min watermark = {{Long.MAX_VALUE}}.

 ## Downstream (e.g., {{IntervalJoin}}) advances its operator watermark to {{Long.MAX_VALUE}}.

 ## All incoming records are “late”, so the time-bounded join/aggregate cannot match them, and cleanup timers fire → *zero output*.
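The failure sequence above can be reproduced with a simplified model of watermark aggregation: the operator watermark is the minimum over non-idle channels, and a finished channel keeps {{Long.MAX_VALUE}} without being marked idle. This is a hypothetical sketch ({{Channel}} and {{aggregate}} are illustrative names), not Flink's actual {{StatusWatermarkValve}} code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical, simplified model of per-channel watermark aggregation.
final class CurrentAggregationDemo {
    static final class Channel {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    // Operator watermark = min watermark over non-idle channels;
    // empty when every channel is idle (nothing to advance).
    static OptionalLong aggregate(List<Channel> channels) {
        return channels.stream()
                .filter(c -> !c.idle)
                .mapToLong(c -> c.watermark)
                .min();
    }

    public static void main(String[] args) {
        List<Channel> channels = new ArrayList<>();
        for (int i = 0; i < 3; i++) channels.add(new Channel());

        // Channel 2 had no splits and finished immediately: MAX_VALUE, yet still "non-idle".
        channels.get(2).watermark = Long.MAX_VALUE;

        // Channels 0 and 1 are reading data.
        channels.get(0).watermark = 1_000L;
        channels.get(1).watermark = 2_000L;
        System.out.println(aggregate(channels)); // OptionalLong[1000]

        // Traffic pauses; the idle timeout marks both active channels idle.
        channels.get(0).idle = true;
        channels.get(1).idle = true;
        System.out.println(aggregate(channels)); // OptionalLong[9223372036854775807]
    }
}
```

Only the finished channel is left contributing, so the aggregate jumps straight to {{Long.MAX_VALUE}}.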

*Why it reproduces with P > N*
 * With {*}P > N{*}, at least *P − N* source subtasks receive no splits and 
finish immediately.

 * With {*}P == N{*}, each subtask has a split, so the “finished watermark 
dominates” condition does not arise at startup.
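The P − N bound follows from each partition being owned by exactly one subtask, so at most N subtasks can receive a split. The sketch below assumes plain modulo assignment ({{partition % P}}) for simplicity; the Kafka connector's enumerator uses its own owner function, but the pigeonhole argument holds for any one-owner-per-partition scheme. {{SplitAssignmentDemo}} is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assign N partitions to P subtasks with plain modulo.
final class SplitAssignmentDemo {
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> perSubtask = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) perSubtask.add(new ArrayList<>());
        // Each partition gets exactly one owner.
        for (int p = 0; p < numPartitions; p++) {
            perSubtask.get(p % parallelism).add(p);
        }
        return perSubtask;
    }

    // Subtasks with no splits are the ones that finish immediately.
    static long countEmpty(List<List<Integer>> assignment) {
        return assignment.stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        int n = 4, p = 6;
        List<List<Integer>> a = assign(n, p);
        System.out.println(a);             // [[0], [1], [2], [3], [], []]
        System.out.println(countEmpty(a)); // 2  (= P - N)
    }
}
```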

*Steps to Reproduce (minimal)*
 # Create a Kafka topic with *N* partitions. Disable the connector’s dynamic partition discovery (note: this assumes the code change from FLINK-24347 is not present).

 # Launch a Flink job with *P > N* source parallelism (e.g., a simple pipeline: 
KafkaSource → {{IntervalJoin}} or time-windowed op → sink).

 # Configure idle detection (e.g., {{table.exec.source.idle-timeout = 10s}}).

 # Start with some traffic, then pause or slow it enough that the active subtasks hit the idle timeout.

 # Observe that the downstream operator watermark jumps to {{Long.MAX_VALUE}}, subsequent records are dropped as late, and no output is emitted.

*Expected Result*
 * Finished inputs should be *excluded* from aggregated watermark progression 
(behave like “non-contributing” channels) until *all* inputs are finished.
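One way the expected behavior could look, assuming a hypothetical three-state channel status ({{ACTIVE}}, {{IDLE}}, {{FINISHED}}): finished channels are skipped like idle ones, and the watermark only jumps to {{Long.MAX_VALUE}} once every channel has finished. This is an illustrative sketch, not the design adopted for this ticket.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical sketch: FINISHED channels are excluded from aggregation
// until every channel has finished.
final class FinishedAwareAggregationDemo {
    enum Status { ACTIVE, IDLE, FINISHED }

    record Channel(long watermark, Status status) {}

    static OptionalLong aggregate(List<Channel> channels) {
        boolean allFinished = !channels.isEmpty()
                && channels.stream().allMatch(c -> c.status() == Status.FINISHED);
        if (allFinished) {
            // Only once every input is done may the watermark advance to the end of time.
            return OptionalLong.of(Long.MAX_VALUE);
        }
        // Otherwise only ACTIVE channels contribute; IDLE and FINISHED are skipped.
        return channels.stream()
                .filter(c -> c.status() == Status.ACTIVE)
                .mapToLong(Channel::watermark)
                .min();
    }

    public static void main(String[] args) {
        // Same situation as the bug: two idle readers plus one finished reader.
        List<Channel> channels = List.of(
                new Channel(1_000L, Status.IDLE),
                new Channel(2_000L, Status.IDLE),
                new Channel(Long.MAX_VALUE, Status.FINISHED));
        System.out.println(aggregate(channels)); // OptionalLong.empty
    }
}
```

With two idle readers and one finished reader, the aggregation yields no watermark (the operator stays idle) instead of advancing to {{Long.MAX_VALUE}}.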

*Actual Result*
 * Finished inputs remain “non-idle” while holding their watermark at {{Long.MAX_VALUE}}, which can dominate aggregation and prematurely advance downstream watermarks to infinity, halting event-time processing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
