Thanks for the clarification. I would definitely want to require a sort but only recommend partitioning; I think that would be useful to request based on details about the incoming dataset.
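To make that split concrete, something like the sketch below is what I'm picturing. This is only a sketch: the RecommendsClustering name is mine, nothing like it has been proposed on this thread, and I'm assuming the catalyst Expression and SortOrder types from Ryan's sketch further down:

    import java.util.List;

    import org.apache.spark.sql.catalyst.expressions.Expression;
    import org.apache.spark.sql.catalyst.expressions.SortOrder;

    // Hard requirement: Spark must sort the rows within each task
    // before handing them to the writer.
    interface RequiresSort {
      List<SortOrder> requiredOrdering();
    }

    // Soft requirement: a hint Spark is free to ignore, e.g. when the
    // incoming dataset is small or already partitioned compatibly.
    // (Hypothetical name, not part of any proposal.)
    interface RecommendsClustering {
      List<Expression> recommendedClustering();
    }

The planner would always insert a sort for RequiresSort, but would only add a shuffle for RecommendsClustering when it decides the extra exchange is worth it for the incoming dataset.
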
On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <rb...@netflix.com> wrote:

> A required clustering would not, but a required sort would. Clustering is asking for the input dataframe's partitioning, and sorting would be how each partition is sorted.
>
> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:
>
>> I forgot since it's been a while, but does the clustering support also allow requesting that partitions contain elements in order? That would be a useful trick for me, i.e.:
>>
>> Request/Require(SortedOn(Col1))
>> Partition 1 -> ((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))
>>
>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>
>>> Thanks, it makes sense that the existing interface is for aggregation and not joins. Why are there requirements for the number of partitions that are returned, then?
>>>
>>> Does it make sense to design the write-side `Requirement` classes and the read-side reporting separately?
>>>
>>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>>
>>>> Hi Ryan, yeah, you are right that SupportsReportPartitioning doesn't expose a hash function, so Join can't benefit from this interface: Join doesn't require a general ClusteredDistribution, but a more specific one called HashClusteredDistribution.
>>>>
>>>> So currently only Aggregate can benefit from SupportsReportPartitioning and save a shuffle. We can add a new interface that exposes the hash function to make it work for Join.
>>>>
>>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> I just took a look at SupportsReportPartitioning and I'm not sure that it will work for real use cases. As far as I can tell, it doesn't specify a hash function for combining clusters into tasks, or a way to provide Spark a hash function for the other side of a join. It seems unlikely to me that many data sources would have partitioning that happens to match the other side of a join. And it looks like task order matters? Maybe I'm missing something?
>>>>>
>>>>> I think that we should design the write side independently, based on what data stores actually need, and take a look at the read side based on what data stores can actually provide. Wenchen, was there a design doc for partitioning on the read path?
>>>>>
>>>>> I completely agree with your point about a global sort. We recommend that all of our data engineers add a sort to most tables, because it introduces the range partitioner and does a skew calculation, in addition to making data filtering much better when it is read. It's really common for tables to be skewed by partition values.
>>>>>
>>>>> rb
>>>>>
>>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>>>
>>>>>> Hey Ryan, Ted, Wenchen,
>>>>>>
>>>>>> Thanks for the quick replies.
>>>>>>
>>>>>> @Ryan - the sorting portion makes sense, but if we want to further report to SupportsReportPartitioning, I think we'd have to ensure something similar to requiredChildDistribution in SparkPlan, where we have the number of partitions as well, yeah?
>>>>>>
>>>>>> Specifying an explicit global sort can also be useful for filtering on Parquet row group stats if we have a time-based/high-cardinality ID field.
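>>>>>>
>>>>>> To make the requiredChildDistribution point concrete, your RequiresClustering sketch might need to grow a partition count. Something like this, which is entirely hypothetical and just to illustrate:
>>>>>>
>>>>>> interface RequiresClustering {
>>>>>>   List<Expression> requiredClustering();
>>>>>>
>>>>>>   // Borrowing the role numPartitions plays alongside
>>>>>>   // requiredChildDistribution in SparkPlan: without a partition
>>>>>>   // count, there is nothing to line up against what
>>>>>>   // SupportsReportPartitioning reports on the read side.
>>>>>>   int requiredNumPartitions();
>>>>>> }
>>>>>>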
>>>>>> If my datasource or catalog knows about previous queries on a table, it could be really useful to recommend more appropriate formatting for consumers on the next materialization. The same would be true of clustering on commonly joined fields.
>>>>>>
>>>>>> Thanks again,
>>>>>> Pat
>>>>>>
>>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Hmm. Ryan seems to be right.
>>>>>>>
>>>>>>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>>>>>
>>>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>>>> ...
>>>>>>> Partitioning outputPartitioning();
>>>>>>>
>>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Actually, clustering is already supported; please take a look at SupportsReportPartitioning.
>>>>>>>>
>>>>>>>> Ordering is not proposed yet; it might be similar to what Ryan proposed.
>>>>>>>>
>>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Interesting.
>>>>>>>>>
>>>>>>>>> Should requiredClustering return a Set of Expressions? That way, we can determine the order of the Expressions by looking at what requiredOrdering() returns.
>>>>>>>>>
>>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Pat,
>>>>>>>>>>
>>>>>>>>>> Thanks for starting the discussion on this; we're really interested in it as well. I don't think there is a proposed API yet, but I was thinking something like this:
>>>>>>>>>>
>>>>>>>>>> interface RequiresClustering {
>>>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> interface RequiresSort {
>>>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> The reason why RequiresClustering should provide Expression is that it needs to be able to customize the implementation. For example, writing to an HTable would require building a key (or the data for a key), and that might use a hash function that differs from Spark's built-ins.
>>>>>>>>>>
>>>>>>>>>> RequiresSort is fairly straightforward, but the interaction between the two requirements deserves some consideration. To make the two compatible, I think that RequiresSort must be interpreted as a sort within each partition of the clustering, but it could possibly be used for a global sort when the two overlap.
>>>>>>>>>>
>>>>>>>>>> For example, if I have a table partitioned by "day" and "category", then the required clustering would be day, category. A required sort might be day ASC, category DESC, name ASC. Because that sort satisfies the required clustering, it could be used for a global ordering. But is that useful? How would the global ordering matter beyond a sort within each partition, i.e., how would the partition's place in the global ordering be passed?
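>>>>>>>>>>
>>>>>>>>>> To make that example concrete, a sink for that table might implement both interfaces along these lines. This is only a sketch: col, asc, and desc are stand-in helpers for building the catalyst Expression and SortOrder values, not real API, and the usual java.util imports are assumed:
>>>>>>>>>>
>>>>>>>>>> class DailyTableWriter implements RequiresClustering, RequiresSort {
>>>>>>>>>>   // Cluster on the table's partition columns so that each task
>>>>>>>>>>   // receives the rows for a single (day, category) partition.
>>>>>>>>>>   public List<Expression> requiredClustering() {
>>>>>>>>>>     return Arrays.asList(col("day"), col("category"));
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   // Sort within each clustered partition; if Spark notices that
>>>>>>>>>>   // this also satisfies the clustering, it could choose to run
>>>>>>>>>>   // it as a global sort instead.
>>>>>>>>>>   public List<SortOrder> requiredOrdering() {
>>>>>>>>>>     return Arrays.asList(asc("day"), desc("category"), asc("name"));
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>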
>>>>>>>>>> To your other questions, you might want to have a look at the recent SPIP I'm working on to consolidate and clean up logical plans <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>. It proposes more specific uses for the DataSourceV2 API that should help clarify what validation needs to take place. As for custom catalyst rules, I'd like to hear about the use cases to see if we can build them into these improvements.
>>>>>>>>>>
>>>>>>>>>> rb
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey all,
>>>>>>>>>>>
>>>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes that we might have the data source inform Spark of requirements for the input data's ordering and partitioning. Has there been a proposed API for that yet?
>>>>>>>>>>>
>>>>>>>>>>> Even one level up, it would be helpful to understand how I should be thinking about the responsibility of the data source writer, when I should be inserting a custom catalyst rule, and how I should handle validation/assumptions of the table before attempting the write.
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Pat
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Ryan Blue
>>>>>>>>>> Software Engineer
>>>>>>>>>> Netflix