Timo Walther created FLINK-23470:
------------------------------------
Summary: Use blocking shuffles but pipeline within a slot
Key: FLINK-23470
URL: https://issues.apache.org/jira/browse/FLINK-23470
Project: Flink
Issue Type: New Feature
Components: API / DataStream
Reporter: Timo Walther
As discussed in FLINK-23402, we would like to introduce a good default shuffle
mode for batch runtime mode that is a trade-off between all pipelined and all
blocking shuffles.
From the discussion in FLINK-23402:
For the shuffle modes, I think these three settings are actually sufficient:
1. pipeline all, for batch execution that wants pipelined shuffles (still with
batch recovery, no checkpoints, and batch operators).
2. batch all, just in case you want to.
3. batch shuffles, pipeline within a slot. (DEFAULT)
This should be the default, and it means we batch whenever a slot has a
dependency on another slot.
A dependency between slots is:
- any all-to-all connection (keyBy, broadcast, rebalance, random)
- any pointwise connection (rescale)
- any forward connection between different slot sharing groups
Effectively, only FORWARD connections within the same slot sharing group have no
dependency on another slot.
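The dependency rules above can be sketched as a per-edge decision. This is a
minimal, hypothetical model in Python, not Flink's API: the mode names
(PIPELINE_ALL, BATCH_ALL, BATCH_SHUFFLES), the partitioner enum, and the Edge
type are assumptions chosen for illustration, and slot sharing groups are plain
strings.

```python
from dataclasses import dataclass
from enum import Enum


class ShuffleMode(Enum):
    """Hypothetical names for the three proposed settings (not a Flink API)."""
    PIPELINE_ALL = 1      # (1) every exchange is pipelined
    BATCH_ALL = 2         # (2) every exchange is blocking
    BATCH_SHUFFLES = 3    # (3) blocking unless both ends share a slot (default)


class Partitioner(Enum):
    FORWARD = "forward"        # pointwise 1:1, subtask i -> subtask i
    RESCALE = "rescale"        # pointwise, but fans out/in to several subtasks
    KEY_BY = "keyBy"           # all-to-all
    BROADCAST = "broadcast"    # all-to-all
    REBALANCE = "rebalance"    # all-to-all
    RANDOM = "random"          # all-to-all


@dataclass(frozen=True)
class Edge:
    partitioner: Partitioner
    source_group: str   # slot sharing group of the producing operator
    target_group: str   # slot sharing group of the consuming operator


def is_blocking(edge: Edge, mode: ShuffleMode) -> bool:
    """Return True if the exchange on `edge` should be materialized (blocking)."""
    if mode is ShuffleMode.PIPELINE_ALL:
        return False
    if mode is ShuffleMode.BATCH_ALL:
        return True
    # Mode (3): only a FORWARD edge inside one slot sharing group is guaranteed
    # to stay within a single slot; every other edge depends on another slot.
    same_slot = (edge.partitioner is Partitioner.FORWARD
                 and edge.source_group == edge.target_group)
    return not same_slot


if __name__ == "__main__":
    mode = ShuffleMode.BATCH_SHUFFLES
    print(is_blocking(Edge(Partitioner.FORWARD, "default", "default"), mode))  # False
    print(is_blocking(Edge(Partitioner.FORWARD, "default", "sink"), mode))     # True
    print(is_blocking(Edge(Partitioner.RESCALE, "default", "default"), mode))  # True
```

Note that RESCALE is blocking in mode (3) even though it is pointwise, because
one producer subtask may feed consumer subtasks that live in other slots.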
Mode (3) makes a lot of sense as the default because it guarantees that we can
always run the program as long as we have at least one slot: no resource
starvation, ever. It still retains pipelining between operators that we don't
chain due to missing chaining logic (but that we still slot-share).
Compared to mode (3), FORWARD_EDGES_PIPELINED and POINTWISE_EDGES_PIPELINED are
not well-defined.
POINTWISE_EDGES_PIPELINED is a gamble: it only works if you have a certain
amount of resources, related to the rescale factor. Otherwise, the job may fail
with resource starvation. It is hard for users to understand and debug; not a
great option in my opinion.
FORWARD_EDGES_PIPELINED can also lead to job failure with resource starvation
when the forward connection connects different slot sharing groups.
That's why I would drop those (they confuse users), not reuse the
GlobalDataExchangeMode, and instead introduce option (3) above, which mostly
batches the exchanges, except when they are guaranteed to be in the same slot.
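To make the resource argument concrete, here is a minimal sketch (not Flink's
scheduler) that groups subtasks connected by pipelined edges into regions and
reports the slot demand of the largest region. It assumes that all subtasks of
a pipelined region must run at the same time, and that a shared slot hosts at
most one subtask per operator of its slot sharing group. The function names and
the contiguous pointwise mapping are hypothetical simplifications.

```python
from collections import defaultdict


def pointwise_targets(src: int, up: int, down: int) -> list[int]:
    """Consumer subtasks fed by producer subtask `src` on a pointwise edge.

    Simplified contiguous mapping (an assumption, not Flink's exact code);
    with up == down it degenerates to FORWARD (i -> i).
    """
    if up <= down:
        # each consumer j reads from exactly one producer
        return [j for j in range(down) if j * up // down == src]
    # each producer feeds exactly one consumer
    return [src * down // up]


def min_slots(parallelism, groups, edges):
    """Slots needed to run the largest pipelined region.

    parallelism: {operator: int}; groups: {operator: slot sharing group}
    edges: [(src, dst, kind, pipelined)], kind is "pointwise" or "all_to_all"
    """
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    def union(a, b):
        parent[find(a)] = find(b)

    for op, p in parallelism.items():
        for i in range(p):
            find((op, i))
    for src, dst, kind, pipelined in edges:
        if not pipelined:
            continue  # blocking: producer region finishes before consumer starts
        up, down = parallelism[src], parallelism[dst]
        for i in range(up):
            targets = range(down) if kind == "all_to_all" else pointwise_targets(i, up, down)
            for j in targets:
                union((src, i), (dst, j))

    regions = defaultdict(list)
    for node in list(parent):
        regions[find(node)].append(node)

    def demand(members):
        # Per slot sharing group, the most-represented operator sets the count.
        per_op = defaultdict(int)
        for op, _ in members:
            per_op[op] += 1
        per_group = defaultdict(int)
        for op, count in per_op.items():
            per_group[groups[op]] = max(per_group[groups[op]], count)
        return sum(per_group.values())

    return max(demand(m) for m in regions.values())


if __name__ == "__main__":
    p, g = {"src": 2, "map": 4}, {"src": "default", "map": "default"}
    print(min_slots(p, g, [("src", "map", "pointwise", True)]))   # 2 (rescale 2 -> 4)
    print(min_slots(p, g, [("src", "map", "pointwise", False)]))  # 1 (blocking)
```

With a pipelined 2 -> 4 rescale, each producer subtask must run alongside two
consumer subtasks, so this model cannot start the job with fewer than 2 slots;
making the edge blocking lowers the requirement to 1. A FORWARD edge pipelined
across two slot sharing groups likewise needs 2 slots, while inside one group it
needs 1, which is the property mode (3) relies on.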
As a side note: the difference between (3) and (2) should already be relatively
small in SQL jobs and will become smaller over time, as more and more operators
can be chained together.