Arvid Heise created FLINK-21936:
-----------------------------------
Summary: Disable checkpointing of inflight data in pointwise
connections for unaligned checkpoints
Key: FLINK-21936
URL: https://issues.apache.org/jira/browse/FLINK-21936
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Affects Versions: 1.13.0
Reporter: Arvid Heise
Assignee: Arvid Heise
We currently do not give any hard guarantees for pointwise connections regarding
data consistency. However, since data was structured implicitly in the same way
as any preceding source or keyby, some users relied on this behavior to divide
compute-intensive tasks into smaller chunks while relying on ordering
guarantees.
As long as the parallelism does not change, unaligned checkpoints (UC) retain
these properties. With the implementation of rescaling of UC (FLINK-19801),
that has changed. For most exchanges, there is a meaningful way to reassign
state from one channel to another (even in random order). For some exchanges,
the mapping is ambiguous and requires post-filtering. However, for pointwise
connections, such a reassignment is impossible while retaining these properties.
Consider {{source -> keyby -> task1 -> forward -> task2}}. Now, if we want to
rescale from parallelism p = 1 to p = 2, suddenly the records inside the keyby
channels need to be divided into two channels according to the keygroups. That
is easily possible by using the keygroup ranges of the operators and a way to
determine the key(group) of the record (independent of the actual approach).
For the forward channel, we completely lack the key context. No record in the
forward channel has any keygroup assigned; it's also not possible to calculate
it as there is no guarantee that the key is still present.
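To illustrate the keyby case, here is a minimal Java sketch of how in-flight records of one old channel could be divided among the new subtasks by key group. The class and method names are hypothetical, the key group is derived from a plain {{hashCode}} for brevity, and each record is represented only by its key; this is an assumption-laden sketch, not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names are Flink classes.
final class KeyGroupRescaleSketch {

    // Assumption: key group = non-negative hashCode modulo maxParallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Assumption: each subtask owns one contiguous range of key groups.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Divides the in-flight records of one old keyby channel among the new
    // subtasks, keeping the relative order of records within each new channel.
    static List<List<String>> redistribute(
            List<String> inFlightKeys, int maxParallelism, int newParallelism) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            channels.add(new ArrayList<>());
        }
        for (String key : inFlightKeys) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            channels.get(subtaskFor(keyGroup, maxParallelism, newParallelism)).add(key);
        }
        return channels;
    }

    public static void main(String[] args) {
        // Rescale from p = 1 to p = 2 with 128 key groups.
        System.out.println(redistribute(List.of("0", "a", "1", "a"), 128, 2));
    }
}
```

The sketch only works because {{keyGroupFor}} can be evaluated for every record. In the forward channel there is no key to pass in, so no equivalent of {{redistribute}} can be written for it.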
The root cause for this limitation is the conceptual mismatch between what we
provide and what some users assume we provide (or we assume that the users
assume). For example, it's impossible to use (keyed) state in task2 right now,
because there is no key context, but we still want to guarantee ordering with
respect to that key context.
For 1.13, the easiest solution is to disable channel state in pointwise
connections. For any non-trivial application with at least one shuffle, the
number of pointwise channels (linear in p) is quickly dwarfed by all-to-all
connections (quadratic in p). I'll add some alternative ideas to the discussion.
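As a rough illustration of the linear versus quadratic argument, the following Java sketch counts channels for a job with a given number of forward and shuffle exchanges. The class and method names are hypothetical, and it assumes every exchange connects two tasks of the same parallelism p.

```java
// Hypothetical sketch; assumes equal parallelism p on both sides of each exchange.
final class ChannelCountSketch {

    // A forward (pointwise) exchange connects subtask i only to subtask i.
    static long pointwiseChannels(int p) {
        return p;
    }

    // An all-to-all exchange (e.g. keyby) connects every upstream subtask
    // to every downstream subtask.
    static long allToAllChannels(int p) {
        return (long) p * p;
    }

    // Fraction of all channels in the job that are pointwise.
    static double pointwiseShare(int p, int forwardExchanges, int shuffleExchanges) {
        long pointwise = forwardExchanges * pointwiseChannels(p);
        long allToAll = shuffleExchanges * allToAllChannels(p);
        return (double) pointwise / (pointwise + allToAll);
    }

    public static void main(String[] args) {
        // One keyby and one forward, as in source -> keyby -> task1 -> forward -> task2.
        for (int p : new int[] {2, 10, 100}) {
            System.out.printf("p=%d pointwise share=%.4f%n", p, pointwiseShare(p, 1, 1));
        }
    }
}
```

With one shuffle and one forward exchange the share works out to 1 / (1 + p), so at p = 100 fewer than 1% of the channels would lose their channel state.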