Hi Ryan, yes, you are right that SupportsReportPartitioning doesn't expose a
hash function, so Join can't benefit from this interface: Join doesn't
require a general ClusteredDistribution, but a more specific one
called HashClusteredDistribution.

So currently only Aggregate can benefit from SupportsReportPartitioning and
save a shuffle. We can add a new interface to expose the hash function to
make it work for Join.
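
To make the distinction concrete, here is a minimal, self-contained Java
sketch of why a join needs more than clustering. Every name in it
(ReportsHashPartitioning, ToySource, coPartitioned) is hypothetical and not
part of Spark, and Arrays.hashCode only stands in for whatever hash function
a real source would use. Each side on its own is clustered, which is enough
for an aggregate, but the two sides line up for a join only when they agree
on both the hash function and the number of partitions.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical interface, NOT an existing Spark API: besides its clustering
// columns, a source also exposes the function that maps a key to a partition.
interface ReportsHashPartitioning {
  List<String> clusteringColumns();
  int numPartitions();
  int partitionFor(Object[] key);
}

// Toy source for illustration only. The seed models "a different hash
// function"; Arrays.hashCode is a stand-in, not Spark's hash.
final class ToySource implements ReportsHashPartitioning {
  private final List<String> columns;
  private final int numPartitions;
  private final int seed;

  ToySource(List<String> columns, int numPartitions, int seed) {
    this.columns = columns;
    this.numPartitions = numPartitions;
    this.seed = seed;
  }

  @Override public List<String> clusteringColumns() { return columns; }
  @Override public int numPartitions() { return numPartitions; }

  @Override public int partitionFor(Object[] key) {
    // Equal keys always map to the same partition within this source.
    return Math.floorMod(31 * seed + Arrays.hashCode(key), numPartitions);
  }
}

final class HashPartitioningSketch {
  // A join can skip the shuffle only if equal keys land in the same
  // partition index on both sides. Checking sample keys is a simplification;
  // a real planner would compare the hash functions themselves.
  static boolean coPartitioned(ReportsHashPartitioning left,
                               ReportsHashPartitioning right,
                               Object[][] sampleKeys) {
    if (left.numPartitions() != right.numPartitions()) {
      return false;
    }
    for (Object[] key : sampleKeys) {
      if (left.partitionFor(key) != right.partitionFor(key)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> cols = Arrays.asList("day", "category");
    Object[][] keys = { {"2018-03-26", "a"}, {"2018-03-27", "b"} };

    ReportsHashPartitioning left = new ToySource(cols, 4, 0);
    ReportsHashPartitioning sameHash = new ToySource(cols, 4, 0);
    ReportsHashPartitioning otherHash = new ToySource(cols, 4, 1);

    System.out.println("same hash:  " + coPartitioned(left, sameHash, keys));
    System.out.println("other hash: " + coPartitioned(left, otherHash, keys));
  }
}
```

In this sketch the planner-side check is just coPartitioned; how a real
interface would identify or compare hash functions is left open.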

On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:

> I just took a look at SupportsReportPartitioning and I'm not sure that it
> will work for real use cases. It doesn't specify, as far as I can tell, a
> hash function for combining clusters into tasks or a way to provide Spark a
> hash function for the other side of a join. It seems unlikely to me that
> many data sources would have partitioning that happens to match the other
> side of a join. And, it looks like task order matters? Maybe I'm missing
> something?
>
> I think that we should design the write side independently based on what
> data stores actually need, and take a look at the read side based on what
> data stores can actually provide. Wenchen, was there a design doc for
> partitioning on the read path?
>
> I completely agree with your point about a global sort. We recommend that
> all of our data engineers add a sort to most tables because it
> introduces the range partitioner and does a skew calculation, in addition
> to making data filtering much better when it is read. It's really common
> for tables to be skewed by partition values.
>
> rb
>
> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <patrick.woo...@gmail.com>
> wrote:
>
>> Hey Ryan, Ted, Wenchen
>>
>> Thanks for the quick replies.
>>
>> @Ryan - the sorting portion makes sense, but I think we'd have to ensure
>> something similar to requiredChildDistribution in SparkPlan where we have
>> the number of partitions as well if we'd want to further report to
>> SupportsReportPartitioning, yeah?
>>
>> Specifying an explicit global sort can also be useful for filtering
>> purposes on Parquet row group stats if we have a time based/high
>> cardinality ID field. If my datasource or catalog knows about previous
>> queries on a table, it could be really useful to recommend more appropriate
>> formatting for consumers on the next materialization. The same would be
>> true of clustering on commonly joined fields.
>>
>> Thanks again
>> Pat
>>
>>
>>
>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Hmm. Ryan seems to be right.
>>>
>>> Looking at
>>> sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>
>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>> ...
>>>   Partitioning outputPartitioning();
>>>
>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com>
>>> wrote:
>>>
>>>> Actually, clustering is already supported; please take a look at
>>>> SupportsReportPartitioning.
>>>>
>>>> Ordering has not been proposed yet; it might be similar to what Ryan
>>>> proposed.
>>>>
>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Interesting.
>>>>>
>>>>> Should requiredClustering return a Set of Expressions?
>>>>> This way, we can determine the order of the Expressions by looking at
>>>>> what requiredOrdering() returns.
>>>>>
>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <rb...@netflix.com.invalid>
>>>>> wrote:
>>>>>
>>>>>> Hi Pat,
>>>>>>
>>>>>> Thanks for starting the discussion on this, we’re really interested
>>>>>> in it as well. I don’t think there is a proposed API yet, but I was
>>>>>> thinking something like this:
>>>>>>
>>>>>> interface RequiresClustering {
>>>>>>   List<Expression> requiredClustering();
>>>>>> }
>>>>>>
>>>>>> interface RequiresSort {
>>>>>>   List<SortOrder> requiredOrdering();
>>>>>> }
>>>>>>
>>>>>> The reason why RequiresClustering should provide Expression is that
>>>>>> it needs to be able to customize the implementation. For example, writing
>>>>>> to HTable would require building a key (or the data for a key) and that
>>>>>> might use a hash function that differs from Spark’s built-ins.
>>>>>> RequiresSort is fairly straightforward, but the interaction between
>>>>>> the two requirements deserves some consideration. To make the two
>>>>>> compatible, I think that RequiresSort must be interpreted as a sort
>>>>>> within each partition of the clustering, but could possibly be used for a
>>>>>> global sort when the two overlap.
>>>>>>
>>>>>> For example, if I have a table partitioned by “day” and “category”
>>>>>> then the required clustering would be by day, category. A required
>>>>>> sort might be day ASC, category DESC, name ASC. Because that sort
>>>>>> satisfies the required clustering, it could be used for a global
>>>>>> ordering. But, is that useful? How would the global ordering matter
>>>>>> beyond a sort within each partition, i.e., how would the partition’s
>>>>>> place in the global ordering be passed?
>>>>>>
>>>>>> To your other questions, you might want to have a look at the recent
>>>>>> SPIP I’m working on to consolidate and clean up logical plans
>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>>> That proposes more specific uses for the DataSourceV2 API that should
>>>>>> help clarify what validation needs to take place. As for custom
>>>>>> catalyst rules, I’d like to hear about the use cases to see if we can
>>>>>> build it into these improvements.
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey all,
>>>>>>>
>>>>>>> I saw in some of the discussions around DataSourceV2 writes that we
>>>>>>> might have the data source inform Spark of requirements for the input
>>>>>>> data's ordering and partitioning. Has there been a proposed API for that
>>>>>>> yet?
>>>>>>>
>>>>>>> Even one level up it would be helpful to understand how I should be
>>>>>>> thinking about the responsibility of the data source writer, when I
>>>>>>> should be inserting a custom catalyst rule, and how I should handle
>>>>>>> validation/assumptions of the table before attempting the write.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Pat
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
