Hi Ryan, yes, you're right that SupportsReportPartitioning doesn't expose the hash function, so Join can't benefit from this interface: Join doesn't require a general ClusteredDistribution, but a more specific one called HashClusteredDistribution.

So currently only Aggregate can benefit from SupportsReportPartitioning and save a shuffle. We can add a new interface that exposes the hash function to make this work for Join.
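For illustration, such a new interface might look roughly like the sketch below. This is purely hypothetical: none of these names exist in Spark, and a real design would have to settle how a source identifies its hash function in a way Spark can reproduce for the other side of a join.

import java.io.Serializable;

// Hypothetical: the hash function the source used to partition its data,
// in a form Spark could also apply when shuffling the other side of a join.
interface ClusterKeyHasher extends Serializable {
  // Maps the values of the clustering columns for one row to a partition id.
  int partitionId(Object[] clusterKeyValues, int numPartitions);
}

// Hypothetical read-side interface: besides the clustering columns, the
// source exposes its concrete hash function, which would let the planner
// satisfy a HashClusteredDistribution rather than only the general
// ClusteredDistribution.
interface SupportsReportHashPartitioning {
  String[] clusteredColumns();
  int numPartitions();
  ClusterKeyHasher hasher();
}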
On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:

> I just took a look at SupportsReportPartitioning and I'm not sure that it will work for real use cases. It doesn't specify, as far as I can tell, a hash function for combining clusters into tasks, or a way to provide Spark a hash function for the other side of a join. It seems unlikely to me that many data sources would have partitioning that happens to match the other side of a join. And it looks like task order matters? Maybe I'm missing something?
>
> I think that we should design the write side independently, based on what data stores actually need, and take a look at the read side based on what data stores can actually provide. Wenchen, was there a design doc for partitioning on the read path?
>
> I completely agree with your point about a global sort. We recommend that all of our data engineers add a sort to most tables, because it introduces the range partitioner and does a skew calculation, in addition to making data filtering much better when it is read. It's really common for tables to be skewed by partition values.
>
> rb
>
> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>
>> Hey Ryan, Ted, Wenchen,
>>
>> Thanks for the quick replies.
>>
>> @Ryan - the sorting portion makes sense, but I think we'd have to ensure something similar to requiredChildDistribution in SparkPlan, where we have the number of partitions as well, if we'd want to further report to SupportsReportPartitioning, yeah?
>>
>> Specifying an explicit global sort can also be useful for filtering on Parquet row-group stats if we have a time-based or high-cardinality ID field. If my data source or catalog knows about previous queries on a table, it could be really useful to recommend more appropriate formatting for consumers on the next materialization. The same would be true of clustering on commonly joined fields.
>>
>> Thanks again,
>> Pat
>>
>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Hmm. Ryan seems to be right.
>>>
>>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>
>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>> ...
>>> Partitioning outputPartitioning();
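To make the snippet just quoted concrete, here is a minimal sketch of a Partitioning implementation, assuming the interfaces as they stand in Spark 2.3 (Partitioning also declares numPartitions() and satisfy(Distribution), and ClusteredDistribution carries the clustered column names); the class and column names are invented. Note where the limitation discussed above shows up: a source can claim to satisfy a general ClusteredDistribution, but has no way to tell Spark which hash function produced the partitioning.

import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

// Reports that the source's output is clustered on "user_id" across a
// fixed number of input partitions.
class UserIdClusteredPartitioning implements Partitioning {
  private final int numPartitions;

  UserIdClusteredPartitioning(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public boolean satisfy(Distribution distribution) {
    // Aggregate only needs equal user_id values co-located, so this check
    // lets it skip the shuffle. Join asks for more (Spark's own hash
    // partitioning on both sides), which this API cannot express.
    if (distribution instanceof ClusteredDistribution) {
      String[] cols = ((ClusteredDistribution) distribution).clusteredColumns;
      return cols.length == 1 && "user_id".equals(cols[0]);
    }
    return false;
  }
}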
>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>>
>>>> Actually, clustering is already supported; please take a look at SupportsReportPartitioning.
>>>>
>>>> Ordering is not proposed yet; it might be similar to what Ryan proposed.
>>>>
>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Interesting.
>>>>>
>>>>> Should requiredClustering return a Set of Expressions? This way, we can determine the order of Expressions by looking at what requiredOrdering() returns.
>>>>>
>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>>>
>>>>>> Hi Pat,
>>>>>>
>>>>>> Thanks for starting the discussion on this; we’re really interested in it as well. I don’t think there is a proposed API yet, but I was thinking something like this:
>>>>>>
>>>>>> interface RequiresClustering {
>>>>>>   List<Expression> requiredClustering();
>>>>>> }
>>>>>>
>>>>>> interface RequiresSort {
>>>>>>   List<SortOrder> requiredOrdering();
>>>>>> }
>>>>>>
>>>>>> The reason why RequiresClustering should provide Expression is that the data source needs to be able to customize the implementation. For example, writing to HTable would require building a key (or the data for a key), and that might use a hash function that differs from Spark’s built-ins. RequiresSort is fairly straightforward, but the interaction between the two requirements deserves some consideration. To make the two compatible, I think that RequiresSort must be interpreted as a sort within each partition of the clustering, but it could possibly be used for a global sort when the two overlap.
>>>>>>
>>>>>> For example, if I have a table partitioned by “day” and “category”, then the RequiresClustering would be by day, category. A required sort might be day ASC, category DESC, name ASC. Because that sort satisfies the required clustering, it could be used for a global ordering. But is that useful? How would the global ordering matter beyond a sort within each partition, i.e., how would the partition’s place in the global ordering be passed?
>>>>>>
>>>>>> To your other questions, you might want to have a look at the recent SPIP I’m working on to consolidate and clean up logical plans <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>. That proposes more specific uses for the DataSourceV2 API that should help clarify what validation needs to take place. As for custom catalyst rules, I’d like to hear about the use cases to see if we can build them into these improvements.
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey all,
>>>>>>>
>>>>>>> I saw in some of the discussions around DataSourceV2 writes that we might have the data source inform Spark of requirements for the input data's ordering and partitioning. Has there been a proposed API for that yet?
>>>>>>>
>>>>>>> Even one level up, it would be helpful to understand how I should be thinking about the responsibility of the data source writer, when I should be inserting a custom catalyst rule, and how I should handle validation/assumptions of the table before attempting the write.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Pat
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>
> --
> Ryan Blue
> Software Engineer
> Netflix
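For concreteness, a sink for an HTable-like store might implement the RequiresClustering and RequiresSort interfaces sketched above along these lines. This is a sketch only: the interfaces are a proposal in this thread, not a settled API, and the class and field names here are invented.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.SortOrder;

// Hypothetical HTable-style sink. Per the example above, the row key is
// built by an Expression that may use a hash function different from
// Spark's built-ins.
class HTableWriter implements RequiresClustering, RequiresSort {

  private final Expression rowKeyExpr;  // builds the store's row key
  private final SortOrder rowKeyOrder;  // ordering on that same key

  HTableWriter(Expression rowKeyExpr, SortOrder rowKeyOrder) {
    this.rowKeyExpr = rowKeyExpr;
    this.rowKeyOrder = rowKeyOrder;
  }

  // Cluster on the row-key expression so all rows for a given key land in
  // the same task.
  @Override
  public List<Expression> requiredClustering() {
    return Arrays.asList(rowKeyExpr);
  }

  // Interpreted as a sort within each partition of the clustering.
  @Override
  public List<SortOrder> requiredOrdering() {
    return Arrays.asList(rowKeyOrder);
  }
}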