Weiqing Yang created FLINK-38477:
------------------------------------
Summary: Add FINISHED watermark status to support proper watermark
aggregation
Key: FLINK-38477
URL: https://issues.apache.org/jira/browse/FLINK-38477
Project: Flink
Issue Type: Bug
Components: Runtime / Network
Affects Versions: 1.20.3, 2.1.0, 1.16.3
Reporter: Weiqing Yang
*Environment / Preconditions*
* Source: Kafka connector with dynamic partition discovery disabled (no split
rescan after startup).
* Input topic(s): total partition count = *N*.
* Job parallelism = *P*, where *P > N* (some source subtasks start with no assigned partitions).
* Idle detection configured (e.g., {{table.exec.source.idle-timeout = 10s}}).
* Downstream uses event-time semantics (e.g., {{IntervalJoin}}, time-bounded aggregations, windows).
*Problem Summary*
* When *P > N* and Kafka partition discovery is disabled, some source subtasks start with no splits and *finish immediately*. Finished subtasks emit a watermark of {{Long.MAX_VALUE}} but are *not excluded* from watermark aggregation. If the remaining active subtasks later go *IDLE* (e.g., during traffic lulls or backpressure, once the idle timeout expires), the only “non-idle” watermark seen downstream is {{Long.MAX_VALUE}}, which advances operator watermarks to infinity. As a result, *all subsequent records are treated as late*, cleanup timers fire and clear state, and time-aware operators produce effectively *no output*.
*Detailed Behavior*
* Event-time progress for an operator is the *minimum* watermark across its
*non-idle* inputs.
* Finished subtasks today:
** set their watermark to {{Long.MAX_VALUE}},
** but are *not* marked idle or otherwise excluded from aggregation.
* Failure condition:
## Some subtasks finish at startup (no splits).
## Later, the active subtasks go *IDLE* due to {{table.exec.source.idle-timeout}}.
## Aggregation sees only the finished subtasks as “non-idle” → min watermark = {{Long.MAX_VALUE}}.
## Downstream operators (e.g., {{IntervalJoin}}) advance their operator watermark to {{Long.MAX_VALUE}}.
## All incoming records are “late”, time-bounded joins/aggregations cannot match them, and cleanup timers fire → *zero output*.
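To make the failure condition concrete, here is a minimal plain-Java sketch (not Flink code) of min-over-non-idle aggregation. It assumes a finished channel is modeled as non-idle with watermark {{Long.MAX_VALUE}}, as described above; the class, record, and method names are hypothetical, and the lateness check is a simplified assumption rather than any specific operator's logic.

```java
import java.util.List;

public class CurrentWatermarkAggregation {

    /** Hypothetical per-channel view: last watermark and whether the channel is idle. */
    record Channel(String name, long watermark, boolean idle) {}

    /**
     * Minimum watermark over non-idle channels, or null if every channel is idle.
     * A finished channel is modeled as non-idle with watermark Long.MAX_VALUE.
     */
    static Long aggregate(List<Channel> channels) {
        Long min = null;
        for (Channel c : channels) {
            if (c.idle()) {
                continue; // idle channels do not hold back event time
            }
            min = (min == null) ? c.watermark() : Math.min(min, c.watermark());
        }
        return min;
    }

    /** Simplified lateness check (assumption): a record behind the watermark is late. */
    static boolean isLate(long timestamp, long watermark) {
        return timestamp < watermark;
    }

    public static void main(String[] args) {
        // Two subtasks with splits plus one subtask that finished at startup.
        List<Channel> beforeIdle = List.of(
            new Channel("active-0", 1_000L, false),
            new Channel("active-1", 1_200L, false),
            new Channel("finished-2", Long.MAX_VALUE, false));
        // Same channels after the idle timeout fires on the active subtasks.
        List<Channel> afterIdle = List.of(
            new Channel("active-0", 1_000L, true),
            new Channel("active-1", 1_200L, true),
            new Channel("finished-2", Long.MAX_VALUE, false));

        System.out.println(aggregate(beforeIdle)); // 1000
        long wm = aggregate(afterIdle);            // only the finished channel counts
        System.out.println(wm == Long.MAX_VALUE);  // true
        System.out.println(isLate(5_000L, wm));    // true: every new record is late
    }
}
```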
*Why it reproduces with P > N*
* With *P > N*, at least *P − N* source subtasks receive no splits and finish immediately.
* With *P == N*, each subtask has a split, so the “finished watermark dominates” condition does not arise at startup.
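The *P − N* count can be checked with a small sketch. It assumes partitions are assigned by a hash-then-offset scheme modeled on the Kafka source enumerator (a start index derived from the topic name hash, then the partition id added modulo the reader count); the exact formula and the helper names here are assumptions for illustration. For a single topic, the *N* partitions land on *N* distinct readers, leaving exactly *P − N* readers empty; with several topics, owners can collide, so *P − N* is a lower bound.

```java
import java.util.HashSet;
import java.util.Set;

public class EmptyReaderCount {

    /** Hypothetical owner function modeled on a hash-then-offset assignment. */
    static int splitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    /** Number of readers (source subtasks) that receive no partition of one topic. */
    static int emptyReaders(String topic, int numPartitions, int numReaders) {
        Set<Integer> owners = new HashSet<>();
        for (int p = 0; p < numPartitions; p++) {
            owners.add(splitOwner(topic, p, numReaders));
        }
        return numReaders - owners.size();
    }

    public static void main(String[] args) {
        System.out.println(emptyReaders("orders", 4, 6)); // 2, i.e. P - N
        System.out.println(emptyReaders("orders", 6, 6)); // 0 when P == N
    }
}
```

Each empty reader is a subtask that finishes at startup and then contributes {{Long.MAX_VALUE}} to downstream aggregation.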
*Steps to Reproduce (minimal)*
# Create a Kafka topic with *N* partitions and disable the connector’s dynamic partition discovery (note: the code change from FLINK-24347 is not present).
# Launch a Flink job with source parallelism *P > N* (e.g., a simple pipeline: KafkaSource → {{IntervalJoin}} or a time-windowed operator → sink).
# Configure idle detection (e.g., {{table.exec.source.idle-timeout = 10s}}).
# Start with some traffic, then pause or slow it enough that the active subtasks hit the idle timeout.
# Observe that the downstream operator watermark jumps to {{Long.MAX_VALUE}}, subsequent records are dropped as late, and no output is emitted.
*Expected Result*
* Finished inputs should be *excluded* from aggregated watermark progression (i.e., treated as non-contributing channels) until *all* inputs are finished.
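A minimal sketch of the expected semantics, assuming a hypothetical per-channel status enum with a FINISHED value (the names are illustrative, not an existing Flink API): only ACTIVE channels contribute to the minimum, and the aggregate may reach {{Long.MAX_VALUE}} only once every channel is FINISHED. Returning {{null}} stands in for “no contributing input, hold the current watermark”.

```java
import java.util.List;

public class FinishedAwareAggregation {

    /** Hypothetical channel status; FINISHED is the status proposed in this issue. */
    enum Status { ACTIVE, IDLE, FINISHED }

    record Channel(long watermark, Status status) {}

    /** Aggregated watermark, or null if no channel currently contributes. */
    static Long aggregate(List<Channel> channels) {
        boolean allFinished = true;
        Long min = null;
        for (Channel c : channels) {
            if (c.status() != Status.FINISHED) {
                allFinished = false;
            }
            if (c.status() == Status.ACTIVE) {
                // Only active channels take part in the minimum.
                min = (min == null) ? c.watermark() : Math.min(min, c.watermark());
            }
        }
        // Event time may advance to the end only when every input has finished.
        return allFinished ? Long.valueOf(Long.MAX_VALUE) : min;
    }

    public static void main(String[] args) {
        // Active subtasks went idle; one subtask finished at startup (no splits).
        List<Channel> idlePlusFinished = List.of(
            new Channel(1_000L, Status.IDLE),
            new Channel(1_200L, Status.IDLE),
            new Channel(Long.MAX_VALUE, Status.FINISHED));
        System.out.println(aggregate(idlePlusFinished)); // null: watermark is held

        // Traffic resumes on one subtask: only that subtask contributes.
        List<Channel> oneActive = List.of(
            new Channel(1_500L, Status.ACTIVE),
            new Channel(1_200L, Status.IDLE),
            new Channel(Long.MAX_VALUE, Status.FINISHED));
        System.out.println(aggregate(oneActive)); // 1500

        // All inputs finished: the watermark may now advance to the end.
        List<Channel> allFinished = List.of(
            new Channel(Long.MAX_VALUE, Status.FINISHED),
            new Channel(Long.MAX_VALUE, Status.FINISHED));
        System.out.println(aggregate(allFinished) == Long.MAX_VALUE); // true
    }
}
```

Keeping FINISHED distinct from IDLE matters here: an idle channel may become active again and must rejoin the minimum, whereas a finished channel never will, yet on its own it should not push event time to the end.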
*Actual Result*
* Finished inputs remain “non-idle” while holding their watermark at {{Long.MAX_VALUE}}, which can dominate aggregation and prematurely advance downstream watermarks to infinity, halting event-time processing.