How would Spark determine whether or not to apply a recommendation - a
cost threshold? And yes, it would be good to flesh out what information
the datasource gets from Spark when providing these
recommendations/requirements - I could see statistics and the existing
outputPartitioning/Ordering of the child plan being used to shape the
requirement.
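
As a strawman (purely illustrative, nothing like this exists yet), the
"recommend" variant might just mirror the required one, with the cost
call left to Spark:

  import java.util.List;
  import org.apache.spark.sql.catalyst.expressions.Expression;

  interface RecommendsClustering {
    // Unlike a requirement, Spark would be free to drop this when the
    // estimated cost of the extra shuffle outweighs the benefit, e.g.
    // judged from plan statistics and the child's existing
    // outputPartitioning/Ordering.
    List<Expression> recommendedClustering();
  }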
Should a datasource be able to provide a Distribution proper rather
than just the clustering expressions? Two use cases would be explicit
global sorting of the dataset and attempting to ensure a minimum write
task size/number of write tasks.

On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer
<russell.spit...@gmail.com> wrote:

> Thanks for the clarification. I'd definitely want to require Sort but
> only recommend partitioning ... I think that would be useful to
> request based on details about the incoming dataset.
>
> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <rb...@netflix.com> wrote:
>
>> A required clustering would not, but a required sort would.
>> Clustering is asking for the input dataframe's partitioning, and
>> sorting would be how each partition is sorted.
>>
>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer
>> <russell.spit...@gmail.com> wrote:
>>
>>> I forgot since it's been a while, but does the clustering support
>>> allow requesting that partitions contain elements in order as well?
>>> That would be a useful trick for me, i.e.
>>>
>>> Request/Require(SortedOn(Col1))
>>> Partition 1 -> ((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))
>>>
>>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid>
>>> wrote:
>>>
>>>> Thanks, it makes sense that the existing interface is for
>>>> aggregation and not joins. Why are there requirements on the
>>>> number of partitions that are returned, then?
>>>>
>>>> Does it make sense to design the write-side `Requirement` classes
>>>> and the read-side reporting separately?
>>>>
>>>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ryan, yeah, you are right that SupportsReportPartitioning
>>>>> doesn't expose a hash function, so Join can't benefit from this
>>>>> interface: Join doesn't require a general ClusteredDistribution
>>>>> but a more specific one called HashClusteredDistribution.
>>>>>
>>>>> So currently only Aggregate can benefit from
>>>>> SupportsReportPartitioning and save a shuffle. We can add a new
>>>>> interface that exposes the hash function to make it work for
>>>>> Join.
>>>>>
>>>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com>
>>>>> wrote:
>>>>>
>>>>>> I just took a look at SupportsReportPartitioning and I'm not
>>>>>> sure that it will work for real use cases. As far as I can tell,
>>>>>> it doesn't specify a hash function for combining clusters into
>>>>>> tasks, or a way to provide Spark a hash function for the other
>>>>>> side of a join. It seems unlikely to me that many data sources
>>>>>> would have partitioning that happens to match the other side of
>>>>>> a join. And it looks like task order matters? Maybe I'm missing
>>>>>> something?
>>>>>>
>>>>>> I think that we should design the write side independently,
>>>>>> based on what data stores actually need, and take a look at the
>>>>>> read side based on what data stores can actually provide.
>>>>>> Wenchen, was there a design doc for partitioning on the read
>>>>>> path?
>>>>>>
>>>>>> I completely agree with your point about a global sort. We
>>>>>> recommend that all of our data engineers add a sort to most
>>>>>> tables, because it introduces the range partitioner and does a
>>>>>> skew calculation, in addition to making data filtering much
>>>>>> better when the data is read. It's really common for tables to
>>>>>> be skewed by partition values.
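>>>>>>
>>>>>> For reference, the entire contract today is a partition count
>>>>>> plus a yes/no satisfy check. A minimal sketch of a source-side
>>>>>> implementation (the column name and count are just examples):
>>>>>>
>>>>>>   import java.util.Arrays;
>>>>>>   import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
>>>>>>   import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
>>>>>>   import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>>>
>>>>>>   // returned from the reader's outputPartitioning()
>>>>>>   class MyPartitioning implements Partitioning {
>>>>>>     @Override
>>>>>>     public int numPartitions() {
>>>>>>       return 10;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public boolean satisfy(Distribution d) {
>>>>>>       // Spark only hands us the clustered columns and we answer
>>>>>>       // yes or no; the hash function never crosses the
>>>>>>       // boundary, so Spark can't line this up with the other
>>>>>>       // side of a join.
>>>>>>       return d instanceof ClusteredDistribution
>>>>>>           && Arrays.equals(
>>>>>>               ((ClusteredDistribution) d).clusteredColumns,
>>>>>>               new String[] { "id" });
>>>>>>     }
>>>>>>   }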
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody
>>>>>> <patrick.woo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey Ryan, Ted, Wenchen,
>>>>>>>
>>>>>>> Thanks for the quick replies.
>>>>>>>
>>>>>>> @Ryan - the sorting portion makes sense, but if we want to
>>>>>>> further report to SupportsReportPartitioning, I think we'd have
>>>>>>> to ensure something similar to requiredChildDistribution in
>>>>>>> SparkPlan, where we have the number of partitions as well,
>>>>>>> yeah?
>>>>>>>
>>>>>>> Specifying an explicit global sort can also be useful for
>>>>>>> filtering purposes on Parquet row group stats if we have a
>>>>>>> time-based/high-cardinality ID field. If my datasource or
>>>>>>> catalog knows about previous queries on a table, it could be
>>>>>>> really useful to recommend more appropriate formatting for
>>>>>>> consumers on the next materialization. The same would be true
>>>>>>> of clustering on commonly joined fields.
>>>>>>>
>>>>>>> Thanks again,
>>>>>>> Pat
>>>>>>>
>>>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hmm. Ryan seems to be right.
>>>>>>>>
>>>>>>>> Looking at
>>>>>>>> sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>>>>>>
>>>>>>>>   import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>>>>>   ...
>>>>>>>>   Partitioning outputPartitioning();
>>>>>>>>
>>>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan
>>>>>>>> <cloud0...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Actually, clustering is already supported; please take a look
>>>>>>>>> at SupportsReportPartitioning.
>>>>>>>>>
>>>>>>>>> Ordering is not proposed yet; it might be similar to what
>>>>>>>>> Ryan proposed.
>>>>>>>>>
>>>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Interesting.
>>>>>>>>>>
>>>>>>>>>> Should requiredClustering return a Set of Expressions? This
>>>>>>>>>> way, we can determine the order of the Expressions by
>>>>>>>>>> looking at what requiredOrdering() returns.
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue
>>>>>>>>>> <rb...@netflix.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Pat,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for starting the discussion on this; we're really
>>>>>>>>>>> interested in it as well. I don't think there is a proposed
>>>>>>>>>>> API yet, but I was thinking something like this:
>>>>>>>>>>>
>>>>>>>>>>>   interface RequiresClustering {
>>>>>>>>>>>     List<Expression> requiredClustering();
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   interface RequiresSort {
>>>>>>>>>>>     List<SortOrder> requiredOrdering();
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>> The reason why RequiresClustering should provide Expression
>>>>>>>>>>> is that it needs to be able to customize the
>>>>>>>>>>> implementation. For example, writing to HTable would
>>>>>>>>>>> require building a key (or the data for a key), and that
>>>>>>>>>>> might use a hash function that differs from Spark's
>>>>>>>>>>> built-ins.
>>>>>>>>>>>
>>>>>>>>>>> RequiresSort is fairly straightforward, but the interaction
>>>>>>>>>>> between the two requirements deserves some consideration.
>>>>>>>>>>> To make the two compatible, I think that RequiresSort must
>>>>>>>>>>> be interpreted as a sort within each partition of the
>>>>>>>>>>> clustering, but it could possibly be used for a global sort
>>>>>>>>>>> when the two overlap.
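>>>>>>>>>>>
>>>>>>>>>>> In terms of today's Dataset API, I mean roughly this (just
>>>>>>>>>>> a sketch, given a Dataset<Row> df and using the columns
>>>>>>>>>>> from the example below):
>>>>>>>>>>>
>>>>>>>>>>>   import static org.apache.spark.sql.functions.col;
>>>>>>>>>>>
>>>>>>>>>>>   // RequiresClustering plus RequiresSort: sort within each
>>>>>>>>>>>   // partition of the clustering
>>>>>>>>>>>   df.repartition(col("day"), col("category"))
>>>>>>>>>>>     .sortWithinPartitions(col("day").asc(),
>>>>>>>>>>>         col("category").desc(), col("name").asc());
>>>>>>>>>>>
>>>>>>>>>>>   // when the required ordering leads with the clustering
>>>>>>>>>>>   // expressions, a single global (range-partitioned) sort
>>>>>>>>>>>   // could satisfy both
>>>>>>>>>>>   df.sort(col("day").asc(), col("category").desc(),
>>>>>>>>>>>       col("name").asc());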
>>>>>>>>>>>
>>>>>>>>>>> For example, if I have a table partitioned by "day" and
>>>>>>>>>>> "category", then the required clustering would be by day,
>>>>>>>>>>> category. A required sort might be day ASC, category DESC,
>>>>>>>>>>> name ASC. Because that sort satisfies the required
>>>>>>>>>>> clustering, it could be used for a global ordering. But is
>>>>>>>>>>> that useful? How would the global ordering matter beyond a
>>>>>>>>>>> sort within each partition, i.e., how would the partition's
>>>>>>>>>>> place in the global ordering be passed?
>>>>>>>>>>>
>>>>>>>>>>> To your other questions, you might want to have a look at
>>>>>>>>>>> the recent SPIP I'm working on to consolidate and clean up
>>>>>>>>>>> logical plans:
>>>>>>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>
>>>>>>>>>>> That proposes more specific uses for the DataSourceV2 API,
>>>>>>>>>>> which should help clarify what validation needs to take
>>>>>>>>>>> place. As for custom catalyst rules, I'd like to hear about
>>>>>>>>>>> the use cases to see if we can build them into these
>>>>>>>>>>> improvements.
>>>>>>>>>>>
>>>>>>>>>>> rb
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody
>>>>>>>>>>> <patrick.woo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>
>>>>>>>>>>>> I saw in some of the discussions around DataSourceV2
>>>>>>>>>>>> writes that we might have the data source inform Spark of
>>>>>>>>>>>> requirements for the input data's ordering and
>>>>>>>>>>>> partitioning. Has there been a proposed API for that yet?
>>>>>>>>>>>>
>>>>>>>>>>>> Even one level up, it would be helpful to understand how I
>>>>>>>>>>>> should be thinking about the responsibility of the data
>>>>>>>>>>>> source writer, when I should be inserting a custom
>>>>>>>>>>>> catalyst rule, and how I should handle
>>>>>>>>>>>> validation/assumptions of the table before attempting the
>>>>>>>>>>>> write.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> Pat
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Ryan Blue
>>>>>>>>>>> Software Engineer
>>>>>>>>>>> Netflix
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix