Thanks for the clarification. I would definitely want to require a sort but
only recommend partitioning. I think that would be useful to request
based on details about the incoming dataset.
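
To make the "require a sort, only recommend partitioning" idea concrete, here
is a minimal sketch. Every name in it (RequiresSort as used here,
RecommendsClustering, ExampleSink, plan) is hypothetical and not part of any
existing Spark API, and columns are plain strings rather than Catalyst
expressions so that the example stands alone.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WriteRequirementsSketch {

    // Hypothetical: the sink insists on this ordering within each partition.
    public interface RequiresSort {
        List<String> requiredOrdering();
    }

    // Hypothetical: the sink would like this clustering, but it is advisory.
    public interface RecommendsClustering {
        List<String> recommendedClustering();
    }

    // Hypothetical sink: the sort is mandatory, the clustering is only a hint.
    public static class ExampleSink implements RequiresSort, RecommendsClustering {
        @Override
        public List<String> requiredOrdering() {
            return Collections.singletonList("col1");
        }

        @Override
        public List<String> recommendedClustering() {
            return Collections.singletonList("col1");
        }
    }

    // Lists the steps a planner would add before handing rows to the sink.
    // The recommendation is applied only when the caller opts in; the
    // required sort is always applied.
    public static List<String> plan(Object sink, boolean honorRecommendations) {
        List<String> steps = new ArrayList<>();
        if (honorRecommendations && sink instanceof RecommendsClustering) {
            steps.add("cluster by " + ((RecommendsClustering) sink).recommendedClustering());
        }
        if (sink instanceof RequiresSort) {
            steps.add("sort within partitions by " + ((RequiresSort) sink).requiredOrdering());
        }
        return steps;
    }

    public static void main(String[] args) {
        ExampleSink sink = new ExampleSink();
        System.out.println(plan(sink, false));
        System.out.println(plan(sink, true));
    }
}
```

The point of splitting the two interfaces is that the planner could drop the
clustering step (for example, when the incoming dataset is small) without
ever violating what the sink actually needs.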

On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <rb...@netflix.com> wrote:

> A required clustering would not, but a required sort would. Clustering is
> asking for the input dataframe's partitioning, and sorting would be how
> each partition is sorted.
>
> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> I forgot since it's been a while, but does Clustering support allow
>> requesting that partitions contain elements in order as well? That would be
>> a useful trick for me, i.e.:
>> Request/Require(SortedOn(Col1))
>> Partition 1 -> ((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))
>>
>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>>> Thanks, it makes sense that the existing interface is for aggregation
>>> and not joins. Why are there requirements for the number of partitions that
>>> are returned then?
>>>
>>> Does it make sense to design the write-side `Requirement` classes and
>>> the read-side reporting separately?
>>>
>>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com>
>>> wrote:
>>>
>>>> Hi Ryan, yes, you are right that SupportsReportPartitioning doesn't
>>>> expose a hash function, so Join can't benefit from this interface, as Join
>>>> doesn't require a general ClusteredDistribution, but a more specific one
>>>> called HashClusteredDistribution.
>>>>
>>>> So currently only Aggregate can benefit from SupportsReportPartitioning
>>>> and avoid a shuffle. We can add a new interface to expose the hash
>>>> function to make it work for Join.
>>>>
>>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> I just took a look at SupportsReportPartitioning and I'm not sure that
>>>>> it will work for real use cases. It doesn't specify, as far as I can tell,
>>>>> a hash function for combining clusters into tasks or a way to provide
>>>>> Spark a hash function for the other side of a join. It seems unlikely to
>>>>> me that many data sources would have partitioning that happens to match
>>>>> the other side of a join. And, it looks like task order matters? Maybe
>>>>> I'm missing something?
>>>>>
>>>>> I think that we should design the write side independently based on
>>>>> what data stores actually need, and take a look at the read side based on
>>>>> what data stores can actually provide. Wenchen, was there a design doc for
>>>>> partitioning on the read path?
>>>>>
>>>>> I completely agree with your point about a global sort. We recommend
>>>>> that all of our data engineers add a sort to most tables because it
>>>>> introduces the range partitioner and does a skew calculation, in addition
>>>>> to making data filtering much better when it is read. It's really common
>>>>> for tables to be skewed by partition values.
>>>>>
>>>>> rb
>>>>>
>>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
>>>>> patrick.woo...@gmail.com> wrote:
>>>>>
>>>>>> Hey Ryan, Ted, Wenchen
>>>>>>
>>>>>> Thanks for the quick replies.
>>>>>>
>>>>>> @Ryan - the sorting portion makes sense, but I think we'd have to
>>>>>> ensure something similar to requiredChildDistribution in SparkPlan where
>>>>>> we have the number of partitions as well if we'd want to further report
>>>>>> to SupportsReportPartitioning, yeah?
>>>>>>
>>>>>> Specifying an explicit global sort can also be useful for filtering
>>>>>> purposes on Parquet row group stats if we have a time-based/high
>>>>>> cardinality ID field. If my datasource or catalog knows about previous
>>>>>> queries on a table, it could be really useful to recommend more
>>>>>> appropriate formatting for consumers on the next materialization. The
>>>>>> same would be true of clustering on commonly joined fields.
>>>>>>
>>>>>> Thanks again
>>>>>> Pat
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Hmm. Ryan seems to be right.
>>>>>>>
>>>>>>> Looking at
>>>>>>> sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>>>>>
>>>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>>>> ...
>>>>>>>   Partitioning outputPartitioning();
>>>>>>>
>>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Actually, clustering is already supported; please take a look at
>>>>>>>> SupportsReportPartitioning.
>>>>>>>>
>>>>>>>> Ordering is not proposed yet; it might be similar to what Ryan
>>>>>>>> proposed.
>>>>>>>>
>>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Interesting.
>>>>>>>>>
>>>>>>>>> Should requiredClustering return a Set of Expressions?
>>>>>>>>> This way, we can determine the order of the Expressions by looking at
>>>>>>>>> what requiredOrdering() returns.
>>>>>>>>>
>>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
>>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Pat,
>>>>>>>>>>
>>>>>>>>>> Thanks for starting the discussion on this, we’re really
>>>>>>>>>> interested in it as well. I don’t think there is a proposed API yet,
>>>>>>>>>> but I was thinking something like this:
>>>>>>>>>>
>>>>>>>>>> interface RequiresClustering {
>>>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> interface RequiresSort {
>>>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> The reason why RequiresClustering should provide Expression is
>>>>>>>>>> that it needs to be able to customize the implementation. For
>>>>>>>>>> example, writing to HTable would require building a key (or the data
>>>>>>>>>> for a key) and that might use a hash function that differs from
>>>>>>>>>> Spark’s built-ins.
>>>>>>>>>>
>>>>>>>>>> RequiresSort is fairly straightforward, but the interaction
>>>>>>>>>> between the two requirements deserves some consideration. To make
>>>>>>>>>> the two compatible, I think that RequiresSort must be interpreted as
>>>>>>>>>> a sort within each partition of the clustering, but could possibly
>>>>>>>>>> be used for a global sort when the two overlap.
>>>>>>>>>>
>>>>>>>>>> For example, if I have a table partitioned by “day” and
>>>>>>>>>> “category” then the required clustering would be by day, category.
>>>>>>>>>> A required sort might be day ASC, category DESC, name ASC.
>>>>>>>>>> Because that sort satisfies the required clustering, it could be
>>>>>>>>>> used for a global ordering. But, is that useful? How would the
>>>>>>>>>> global ordering matter beyond a sort within each partition, i.e.,
>>>>>>>>>> how would the partition’s place in the global ordering be passed?
>>>>>>>>>>
>>>>>>>>>> To your other questions, you might want to have a look at the
>>>>>>>>>> recent SPIP I’m working on to consolidate and clean up logical plans
>>>>>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>>>>>>> That proposes more specific uses for the DataSourceV2 API that
>>>>>>>>>> should help clarify what validation needs to take place. As for
>>>>>>>>>> custom catalyst rules, I’d like to hear about the use cases to see
>>>>>>>>>> if we can build it into these improvements.
>>>>>>>>>>
>>>>>>>>>> rb
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
>>>>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey all,
>>>>>>>>>>>
>>>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes that
>>>>>>>>>>> we might have the data source inform Spark of requirements for the
>>>>>>>>>>> input data's ordering and partitioning. Has there been a proposed
>>>>>>>>>>> API for that yet?
>>>>>>>>>>>
>>>>>>>>>>> Even one level up it would be helpful to understand how I should
>>>>>>>>>>> be thinking about the responsibility of the data source writer,
>>>>>>>>>>> when I should be inserting a custom catalyst rule, and how I should
>>>>>>>>>>> handle validation/assumptions of the table before attempting the
>>>>>>>>>>> write.
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Pat
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Ryan Blue
>>>>>>>>>> Software Engineer
>>>>>>>>>> Netflix
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
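
For the day/category example quoted above, one way to read "that sort
satisfies the required clustering" is that the leading sort columns are
exactly the clustering columns, in any order and any direction, so rows with
equal clustering keys end up adjacent. The sketch below checks just that. It
is an assumption about the intended rule, not something Spark implements
under this name, and sortSatisfiesClustering is a hypothetical helper that
takes plain column names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SortClusteringCheck {

    // Returns true when the first N sort columns (N = number of distinct
    // clustering columns) are exactly the clustering columns, in any order.
    // Sort direction is ignored because it does not affect adjacency.
    // The check is conservative: a sort that repeats a column in its prefix
    // is rejected even though it would technically still group the keys.
    public static boolean sortSatisfiesClustering(List<String> sortColumns,
                                                  List<String> clusteringColumns) {
        Set<String> clustering = new HashSet<>(clusteringColumns);
        if (sortColumns.size() < clustering.size()) {
            return false;
        }
        Set<String> prefix = new HashSet<>(sortColumns.subList(0, clustering.size()));
        return prefix.equals(clustering);
    }

    public static void main(String[] args) {
        List<String> clustering = Arrays.asList("day", "category");
        // day ASC, category DESC, name ASC from the example above
        System.out.println(sortSatisfiesClustering(
                Arrays.asList("day", "category", "name"), clustering)); // true
        // Leading with name scatters each (day, category) group
        System.out.println(sortSatisfiesClustering(
                Arrays.asList("name", "day", "category"), clustering)); // false
    }
}
```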
