Yes, that sounds good to me. Implement support for generic range partitioning first and go for the Non-Range-Equally-Splittable cases later.

Best, Fabian
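A rough sketch of what the user-facing side of this generic range partitioning could look like in the DataSet API. The partitionByRange call is hypothetical at this point (it is what the thread proposes to add); fromElements, sortPartition, and print already exist:

    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class RangePartitioningSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Toy (key, value) records with a skewed key: "hot" appears far more often.
            DataSet<Tuple2<String, Long>> records = env.fromElements(
                    Tuple2.of("hot", 1L), Tuple2.of("hot", 2L), Tuple2.of("hot", 3L),
                    Tuple2.of("a", 1L), Tuple2.of("z", 1L));

            // partitionByRange is the call proposed in this thread and does not exist yet.
            // The idea: sample the keys online, derive range boundaries, and spread the
            // data over balanced key ranges instead of fixed hash buckets.
            DataSet<Tuple2<String, Long>> ranged = records.partitionByRange(0);

            // Total order: a local sort per partition on top of the range
            // partitioning yields a globally ordered result.
            ranged.sortPartition(0, Order.ASCENDING).print();
        }
    }
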
2015-10-20 5:21 GMT+02:00 Li, Chengxiang <chengxiang...@intel.com>:

> Thanks a lot for the comments, Fabian. I mostly agree with you on the plan; let me just add some more thoughts about the Non-Range-Equally-Splittable case here.
>
> 1. Assume a case in which 10% of the data is skewed on a certain key. As long as the parallelism is larger than 10, this already falls into the Non-Range-Equally-Splittable case, so it is not a very rare corner case of the skew issue.
>
> 2. In the proposal, the solution for the Non-Range-Equally-Splittable case is based on two new RangePartitioners and a little optimizer logic, which plan items #1 and #2 already touch. It does not require changing any operator semantics, so if we have a good partitioner abstraction, I think handling this kind of issue does not add much complexity to Flink.
>
> It should not block anything. After the simple case is finished, we will have more knowledge about the implementation details; then we can look back at this issue and decide whether it is worth resolving at that cost.
>
> Thanks
> Chengxiang
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Monday, October 19, 2015 7:15 PM
> To: dev@flink.apache.org
> Subject: Re: A proposal about skew data handling in Flink
>
> Hi,
>
> First of all, thanks a lot for this extensive proposal! It contains a lot of good observations and techniques for addressing data skew.
>
> I have a few remarks:
>
> 1) The terms Input and Output Contract were introduced in the first scientific publications and are not used anymore. Input Contracts are what we call operators or transformations today; the concept of an Output Contract is completely gone. In the current code, we have operators like Map, Reduce, and Join that describe how data needs to be organized (by key, etc.) and UDFs that process the data.
>
> 2) I would categorize skew as follows:
>
> - UDF Call Complexity Skew: The input cardinalities of UDF calls differ (only applicable to group-based operators such as GroupReduce and CoGroup), or the computational complexity of the UDF calls depends on the data and varies a lot. UDF calls are the smallest parallelizable unit; it is not possible to change that without changing the semantics. Combiners can help to reduce the effect of skew for group-based operators.
>
> - Input Partition Skew: The cardinality of the parallel partitions varies. This is handled by Flink as follows:
>   - Lazy split assignment for data sources.
>   - Operators that do not require special partitioning (Map, Filter, Cross, etc.) just consume the output partitions of the preceding operator. rebalance() can be used to enforce round-robin partitioning and equalize the size of all partitions.
>   - Operators that require key-based partitioning use hash partitioning. Range partitioning can help address significant data skew.
>
> - UDF Call Skew: The number of UDF calls per parallel partition varies. This can be an issue for n-to-m joins, which essentially result in Cartesian products.
>   - UDF Call Skew is most relevant for joins.
>   - UDF Call Skew for Map, Reduce, CoGroup, and Cross can be controlled by controlling Input Partition Skew.
>
> 3) I agree that we should not try to detect and automatically fix data skew (at the moment) but rather give users tools to manually manage skew.
>
> 4) I would focus on addressing the Input Partition Skew problem. UDF Call Complexity Skew cannot be addressed because doing so would change the semantics of operators. UDF Call Skew only affects joins and is much harder to solve.
>
> 5) I wonder how big the practical gain of addressing the Non-Range-Equally-Splittable case is compared to the added code complexity. In general, tackling skew is a very good idea, but solving corner cases with quite complex methods might make future features more complicated to add. Hence, I would propose to focus on the common and "easy" cases first.
>
> I would address Input Partition Skew first and ignore the Non-Range-Equally-Splittable case for now. We can do this in three steps:
>
> 1) Add the "simple" range partitioner, as in your pull request, for unary operators (explicit range partitioning, total order, groupBy). Once the sampling happens online, this is a very good addition to Flink.
> 2) Add the "simple" range partitioner also for binary operators (join, coGroup). This will be a bit more tricky, because we need to make a coordinated decision for both inputs.
> 3) Expose range partitioning for GroupBy, Join, and CoGroup in the API, maybe through optimizer hints.
>
> Since we want this to be handled transparently by the API and engine, we need to add a lot of these features to the optimizer, or, to be more precise, the JobGraphGenerator.
>
> Does that make sense to you?
>
> Cheers, Fabian
>
> 2015-10-16 17:13 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
> > Hi,
> >
> > Thanks for starting a discussion about data skew! I agree, it's an important issue that can cause a lot of problems. I'll have a look at your proposal and add comments soon.
> >
> > Thanks, Fabian
> >
> > 2015-10-15 12:24 GMT+02:00 Li, Chengxiang <chengxiang...@intel.com>:
> >
> >> Dear all,
> >> In many real-world use cases, data is naturally skewed. For example, in a social network, famous people get many more followers than others and a hot tweet is forwarded millions of times, while the purchase records of an ordinary product can never compare to those of a hot product. At the same time, the Flink runtime assumes that all tasks consume the same amount of resources, which is not always true. Skew data handling tries to make skewed data fit into Flink's runtime.
> >> I wrote a proposal about skew data handling in Flink; you can read it at https://docs.google.com/document/d/1ma060BUlhXDqeFmviEO7Io4CXLKgrAXIfeDYldvZsKI/edit?usp=sharing.
> >> Any comments and feedback are welcome; you can comment on the Google doc or reply to this email thread directly.
> >>
> >> Thanks
> >> Chengxiang
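For illustration, a minimal sketch of a manual version of the range-partitioning idea discussed in this thread, using only the partitionCustom API that the DataSet API already provides. The boundary values here are placeholders; the proposed feature would derive the boundaries from online sampling and apply the partitioning transparently via the optimizer/JobGraphGenerator:

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class ManualRangePartitioning {

        // A range partitioner over hard-coded split points. The proposed feature
        // would compute these boundaries by sampling the data online instead.
        public static class BoundaryRangePartitioner implements Partitioner<Long> {
            private final long[] boundaries = {100L, 1000L, 50000L};  // placeholder split points

            @Override
            public int partition(Long key, int numPartitions) {
                // Channel = number of boundaries smaller than the key, capped by
                // the actual number of partitions at runtime.
                int channel = 0;
                for (long boundary : boundaries) {
                    if (key > boundary) {
                        channel++;
                    }
                }
                return Math.min(channel, numPartitions - 1);
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, String>> data = env.fromElements(
                    Tuple2.of(42L, "a"), Tuple2.of(7000L, "b"), Tuple2.of(123456L, "c"));

            // Range-partition on field 0 instead of hash-partitioning it, so the
            // data is spread according to the (sampled) key ranges above.
            data.partitionCustom(new BoundaryRangePartitioner(), 0).print();
        }
    }

Note that with this scheme a single hot key still lands in one partition, which is exactly the Non-Range-Equally-Splittable case the thread decided to defer.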