Hi,

First of all, thanks a lot for this extensive proposal! It contains a lot
of good observations and techniques for addressing data skew.

I have a few remarks:

1) The terms Input and Output Contract were introduced in the first
scientific publications and are no longer used. Input Contracts are what
we call operators or transformations today; the concept of an Output
Contract is completely gone.
In the current code, we have operators like Map, Reduce, and Join that
describe how data needs to be organized (by key, etc.) and UDFs that
process the data.

2) I would categorize skew as follows:

- UDF Call Complexity Skew: The input cardinalities of UDF calls differ
(only applicable to group-based operators such as GroupReduce and CoGroup)
or the computational complexity of UDF calls depends on the data and varies
a lot. UDF calls are the smallest parallelizable unit. It is not possible
to change that without changing the semantics. Combiners can help to reduce
the effect of skew for group-based operators.
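
To illustrate the combiner point, here is a plain-Java sketch (not Flink
API, and the class/method names are made up): each parallel instance
pre-aggregates its local records, so the reducer for a hot key receives one
partial result per instance instead of all raw records.

```java
import java.util.*;

// Sketch (not Flink code) of how a combiner mitigates skew for
// group-based operators: local pre-aggregation shrinks the amount of
// data shipped for hot keys before the final reduce.
public class CombinerSketch {

    // Local pre-aggregation: raw (word) records -> (word, partialCount).
    static Map<String, Integer> combine(List<String> localWords) {
        Map<String, Integer> partial = new HashMap<>();
        for (String w : localWords) {
            partial.merge(w, 1, Integer::sum);
        }
        return partial;
    }

    // Final reduce: merge the partial counts of all parallel instances.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> p : partials) {
            p.forEach((w, c) -> total.merge(w, c, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        // Two parallel instances; "hot" is heavily skewed on instance 0.
        List<String> inst0 = Arrays.asList("hot", "hot", "hot", "hot", "cold");
        List<String> inst1 = Arrays.asList("hot", "cold");

        Map<String, Integer> p0 = combine(inst0); // ships 2 records, not 5
        Map<String, Integer> p1 = combine(inst1);

        Map<String, Integer> total = reduce(Arrays.asList(p0, p1));
        System.out.println(total.get("hot"));  // 5
        System.out.println(total.get("cold")); // 2
    }
}
```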

- Input Partition Skew: The cardinality of parallel partitions varies. This
is handled by Flink as follows:
    - Lazy split assignment for data sources
    - Operators that do not require special partitioning (Map, Filter,
Cross, etc.) just consume the output partitions of the preceding operator.
Rebalance() can be used to enforce round-robin partitioning to equalize the
size of all partitions.
    - Operators that require key-based partitioning use hash partitioning.
Range partitioning can help address significant data skew.
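
The difference between the two strategies can be sketched in plain Java
(simplified, not Flink's internal channel selection): round-robin
rebalancing equalizes partition sizes regardless of keys, while hash
partitioning sends every record of a hot key to the same partition.

```java
import java.util.*;

// Sketch contrasting round-robin rebalancing with hash partitioning
// under a skewed key distribution (simplified, not Flink internals).
public class PartitionSkewSketch {

    // rebalance(): the record index alone decides the target partition.
    static int[] roundRobinSizes(int numRecords, int parallelism) {
        int[] sizes = new int[parallelism];
        for (int i = 0; i < numRecords; i++) {
            sizes[i % parallelism]++;
        }
        return sizes;
    }

    // Key-based hash partitioning: all records of one key collide.
    static int[] hashSizes(int[] keys, int parallelism) {
        int[] sizes = new int[parallelism];
        for (int key : keys) {
            sizes[Math.floorMod(Integer.hashCode(key), parallelism)]++;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 90 of 100 records carry the hot key 7.
        int[] keys = new int[100];
        Arrays.fill(keys, 0, 90, 7);
        for (int i = 90; i < 100; i++) keys[i] = i;

        System.out.println(Arrays.toString(roundRobinSizes(100, 4))); // [25, 25, 25, 25]
        System.out.println(Arrays.toString(hashSizes(keys, 4)));      // one partition gets >= 90
    }
}
```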

- UDF Call Skew: The number of UDF calls per parallel partition varies.
This can be an issue for n:m joins, which essentially result in per-key
Cartesian products.
    - UDF Call Skew is most relevant for Joins
    - UDF Call Skew for Map, Reduce, CoGroup, Cross can be controlled by
controlling Input Partition Skew
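
A small plain-Java sketch (hypothetical names, not Flink code) shows why
joins are the problematic case: for a key with n matching records on the
left input and m on the right, the join UDF is invoked n * m times, so a
single hot key can dominate the work of its partition.

```java
import java.util.*;

// Sketch of UDF call counts for an equi-join, given per-key input
// cardinalities of both inputs. One hot n:m key contributes n * m calls.
public class JoinCallSkewSketch {

    static long joinCalls(Map<String, Integer> left, Map<String, Integer> right) {
        long calls = 0;
        for (Map.Entry<String, Integer> e : left.entrySet()) {
            // Each left record of a key is paired with every right record.
            calls += (long) e.getValue() * right.getOrDefault(e.getKey(), 0);
        }
        return calls;
    }

    public static void main(String[] args) {
        // Hypothetical cardinalities: "hot" is 1000:1000, "cold" is 1:1.
        Map<String, Integer> left = Map.of("hot", 1000, "cold", 1);
        Map<String, Integer> right = Map.of("hot", 1000, "cold", 1);
        System.out.println(joinCalls(left, right)); // 1000001
    }
}
```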

3) I agree that we should not try to detect and automatically fix data skew
(at the moment) but give users tools to manually manage skew.

4) I would focus on addressing the Input Partition Skew problem. UDF Call
Complexity Skew cannot be addressed because it would change the semantics
of operators. UDF Call Skew only affects joins and is much harder to
solve.

5) I wonder how much the practical gain is to address the
Non-Range-Equally-Splittable case compared to the added code complexity. In
general, tackling skew is a very good idea, but solving corner cases with
quite complex methods might make future features more complicated to add.
Hence, I would propose to focus on the common and "easy" cases first.

I would address Input Partition Skew first and ignore the
Non-Range-Equally-Splittable case for now. We can do this in three steps:

1) Add the "simple" range partitioner as in your pull request for unary
operators (explicit range partitioning, total order, groupBy). Once the
sampling happens online, this is a very good addition to Flink.
2) Add the "simple" range partitioner also for binary operators (join,
coGroup). This will be a bit more tricky because we need to make a
coordinated decision for both inputs.
3) Expose range partitioning for GroupBy, Join, CoGroup to the API, maybe
through optimizer hints.
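
For step 1, the core of a sample-based range partitioner could look roughly
like the following plain-Java sketch (class and method names are
hypothetical, not Flink's actual implementation): sort a sample of the key
distribution, pick parallelism - 1 split points so that each range covers
about the same number of sampled records, and route records by binary
search over the boundaries.

```java
import java.util.*;

// Hypothetical sketch of a sample-based range partitioner: split points
// are chosen from a sorted sample so that partitions are balanced even
// for skewed key distributions.
public class RangePartitionSketch {

    // Derive parallelism - 1 equidistant split points from the sample.
    static int[] boundaries(int[] sample, int parallelism) {
        int[] s = sample.clone();
        Arrays.sort(s);
        int[] b = new int[parallelism - 1];
        for (int i = 1; i < parallelism; i++) {
            b[i - 1] = s[i * s.length / parallelism];
        }
        return b;
    }

    // Partition i receives keys in (b[i-1], b[i]]; keys above the last
    // boundary go to the last partition.
    static int partition(int key, int[] b) {
        int pos = Arrays.binarySearch(b, key);
        return pos >= 0 ? pos : -(pos + 1);
    }

    public static void main(String[] args) {
        int[] sample = {1, 2, 3, 4, 5, 6, 7, 8};
        int[] b = boundaries(sample, 4);
        System.out.println(Arrays.toString(b)); // [3, 5, 7]
        System.out.println(partition(2, b));    // 0
        System.out.println(partition(9, b));    // 3
    }
}
```

The interesting part for Flink is that the sample (and hence the
boundaries) must be computed online before the partitioner runs, and for
binary operators both inputs must agree on the same boundaries.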

Since we want to have this transparently handled by the API and engine, we
need to add a lot of these features to the optimizer, or the
JobGraphGenerator to be more precise.

Does that make sense to you?

Cheers, Fabian

2015-10-16 17:13 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi,
>
> thanks for starting a discussion about data skew! I agree, it's an
> important issue that can cause a lot of problems.
> I'll have a look at your proposal and add comments soon.
>
> Thanks, Fabian
>
> 2015-10-15 12:24 GMT+02:00 Li, Chengxiang <chengxiang...@intel.com>:
>
>> Dear all,
>> In many real-world use cases, data is naturally skewed. For example,
>> in social networks, famous people get many more "follows" than others, a
>> hot tweet may be forwarded millions of times, and the purchase records of
>> normal products can never compare to those of hot products. At the same
>> time, the Flink runtime assumes that all tasks consume the same amount of
>> resources, which is not always true. Skewed data handling tries to make
>> skewed data fit into Flink's runtime.
>> I wrote a proposal about skewed data handling in Flink; you can read it at
>> https://docs.google.com/document/d/1ma060BUlhXDqeFmviEO7Io4CXLKgrAXIfeDYldvZsKI/edit?usp=sharing
>> .
>> Any comments and feedback are welcome; you can comment on the Google doc
>> or reply to this email thread directly.
>>
>> Thanks
>> Chengxiang
>>
>
>
