Yep, that sounds reasonable to me!
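
A rough sketch of that rule as I read it, in Python just for brevity. The
names here (`WritePlan`, `plan_write`) are made up for illustration and are
not part of the DataSourceV2 API, and using hash partitioning to satisfy the
clustering is my assumption:

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Sequence, Tuple


@dataclass(frozen=True)
class WritePlan:
    # Hypothetical summary of how the input is prepared before the write.
    partitioning: str                 # "range", "hash", or "none"
    partition_cols: Tuple[str, ...]
    sort_scope: str                   # "global", "partition", or "none"
    sort_cols: Tuple[str, ...]


def plan_write(required_clustering: Optional[Iterable[str]],
               required_ordering: Optional[Sequence[str]]) -> WritePlan:
    ordering = tuple(required_ordering or ())
    if required_clustering is None:
        if not ordering:
            # Nothing requested, so nothing to prepare.
            return WritePlan("none", (), "none", ())
        # No clustering: the sort is global, i.e. range partition on the
        # sort columns and then sort each partition.
        return WritePlan("range", ordering, "global", ordering)
    # Clustering present: it decides the partitioning (hash is an assumption)
    # and the sort only applies within each partition.
    cols = tuple(sorted(set(required_clustering)))
    scope = "partition" if ordering else "none"
    return WritePlan("hash", cols, scope, ordering)
```

With the clustering Set("b", "a") and sort Seq("a", "b", "c") from the
example below, this gives a hash partitioning on a and b plus a
partition-level sort by a, b, c, rather than a global sort.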

On Fri, Mar 30, 2018 at 5:50 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> +1
>
> -------- Original message --------
> From: Ryan Blue <rb...@netflix.com>
> Date: 3/30/18 2:28 PM (GMT-08:00)
> To: Patrick Woody <patrick.woo...@gmail.com>
> Cc: Russell Spitzer <russell.spit...@gmail.com>, Wenchen Fan <
> cloud0...@gmail.com>, Ted Yu <yuzhih...@gmail.com>, Spark Dev List <
> dev@spark.apache.org>
> Subject: Re: DataSourceV2 write input requirements
>
> You're right. A global sort would change the clustering if it had more
> fields than the clustering.
>
> Then what about this: if there is no RequiredClustering, then the sort is
> a global sort. If RequiredClustering is present, then the clustering is
> applied and the sort is a partition-level sort.
>
> That rule would mean that within a partition you always get the sort, but
> an explicit clustering overrides the partitioning a sort might try to
> introduce. Does that sound reasonable?
>
> rb
>
> On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody <patrick.woo...@gmail.com>
> wrote:
>
>> Does that methodology work in this specific case? I thought the ordering
>> must be a subset of the clustering to guarantee that rows with the same
>> clustering values land in the same partition when doing a global sort.
>> Though I get the gist that if the sort does satisfy the clustering, then
>> there is no reason not to choose the global sort.
>>
>> On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> > Can you expand on how the ordering containing the clustering
>>> > expressions would ensure the global sort?
>>>
>>> The idea was to basically assume that if the clustering can be satisfied
>>> by a global sort, then do the global sort. For example, if the clustering
>>> is Set("b", "a") and the sort is Seq("a", "b", "c") then do a global sort
>>> by columns a, b, and c.
>>>
>>> Technically, you could do this with a hash partitioner instead of a
>>> range partitioner and sort within each partition, but that doesn't make
>>> much sense because the partitioning would ensure that each partition has
>>> just one combination of the required clustering columns. Using a hash
>>> partitioner would make it so that the in-partition sort basically ignores
>>> the first few values, so it must be that the intent was a global sort.
>>>
>>> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody <patrick.woo...@gmail.com>
>>> wrote:
>>>
>>>>> Right, you could use this to store a global ordering if there is only
>>>>> one write (e.g., CTAS). I don’t think anything needs to change in that
>>>>> case, you would still have a clustering and an ordering, but the ordering
>>>>> would need to include all fields of the clustering. A way to pass in the
>>>>> partition ordinal for the source to store would be required.
>>>>
>>>>
>>>> Can you expand on how the ordering containing the clustering
>>>> expressions would ensure the global sort? Having a RangePartitioning would
>>>> certainly satisfy, but it isn't required - is the suggestion that if Spark
>>>> sees this overlap, then it plans a global sort?
>>>>
>>>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <
>>>> russell.spit...@gmail.com> wrote:
>>>>
>>>>> @RyanBlue I'm hoping that through the CBO effort we will continue to
>>>>> get more detailed statistics. For example, on read we could use sketch
>>>>> data structures to estimate unique values and density for each column.
>>>>> You may be right that the real way for this to be handled would be giving
>>>>> a "cost" back to a higher order optimizer which can decide which method to
>>>>> use rather than having the data source itself do it. This is probably in a
>>>>> far future version of the API.
>>>>>
>>>>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> > Cassandra can insert records with the same partition-key faster if
>>>>>> > they arrive in the same payload. But this is only beneficial if the
>>>>>> > incoming dataset has multiple entries for the same partition key.
>>>>>>
>>>>>> Thanks for the example, the recommended partitioning use case makes
>>>>>> more sense now. I think we could have two interfaces, a
>>>>>> RequiresClustering and a RecommendsClustering if we want to support
>>>>>> this. But I’m skeptical it will be useful for two reasons:
>>>>>>
>>>>>>    - Do we want to optimize the low cardinality case? Shuffles are
>>>>>>    usually much cheaper at smaller sizes, so I’m not sure it is
>>>>>>    necessary to optimize this away.
>>>>>>    - How do we know there aren’t just a few partition keys for all
>>>>>>    the records? It may look like a shuffle wouldn’t help, but we don’t
>>>>>>    know the partition keys until it is too late.
>>>>>>
>>>>>> Then there’s also the logic for avoiding the shuffle and how to
>>>>>> calculate the cost, which sounds like something that needs some details
>>>>>> from CBO.
>>>>>>
>>>>>> > I would assume that given the estimated data size from Spark and
>>>>>> > options passed in from the user, the data source could make a more
>>>>>> > intelligent requirement on the write format than Spark independently.
>>>>>>
>>>>>> This is a good point.
>>>>>>
>>>>>> What would an implementation actually do here and how would
>>>>>> information be passed? For my use cases, the store would produce the
>>>>>> number of tasks based on the estimated incoming rows, because the source
>>>>>> has the best idea of how the rows will compress. But, that’s just
>>>>>> applying a multiplier most of the time. To be very useful, this would
>>>>>> have to handle skew in the rows (think of rows with a type, where the
>>>>>> total size depends on the type) and that’s a bit harder. I think maybe an
>>>>>> interface that can provide relative cost estimates based on partition
>>>>>> keys would be helpful, but then keep the planning logic in Spark.
>>>>>>
>>>>>> This is probably something that we could add later as we find use
>>>>>> cases that require it?
>>>>>>
>>>>>> > I wouldn’t assume that a data source requiring a certain write format
>>>>>> > would give any guarantees around reading the same data? In the cases
>>>>>> > where it is a complete overwrite it would, but for independent writes
>>>>>> > it could still be useful for statistics or compression.
>>>>>>
>>>>>> Right, you could use this to store a global ordering if there is only
>>>>>> one write (e.g., CTAS). I don’t think anything needs to change in that
>>>>>> case, you would still have a clustering and an ordering, but the ordering
>>>>>> would need to include all fields of the clustering. A way to pass in the
>>>>>> partition ordinal for the source to store would be required.
>>>>>>
>>>>>> For the second point that ordering is useful for statistics and
>>>>>> compression, I completely agree. Our best practices doc tells users to
>>>>>> always add a global sort when writing because you get the benefit of a
>>>>>> range partitioner to handle skew, plus the stats and compression you’re
>>>>>> talking about to optimize for reads. I think the proposed API can
>>>>>> request a global ordering from Spark already. My only point is that
>>>>>> there isn’t much the source can do to guarantee ordering for reads when
>>>>>> there is more than one write.
>>>>>>
>>>>>> On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody <
>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>
>>>>>>>> Spark would always apply the required clustering and sort order
>>>>>>>> because they are required by the data source. It is reasonable for a
>>>>>>>> source to reject data that isn’t properly prepared. For example, data
>>>>>>>> must be written to HTable files with keys in order or else the files
>>>>>>>> are invalid. Sorting should not be implemented in the sources
>>>>>>>> themselves because Spark handles concerns like spilling to disk. Spark
>>>>>>>> must prepare data correctly, which is why the interfaces start with
>>>>>>>> “Requires”.
>>>>>>>
>>>>>>>
>>>>>>> This was in reference to Russell's suggestion that the data source
>>>>>>> could have a required sort, but only a recommended partitioning. I don't
>>>>>>> have an immediate use case for a recommendation in mind, though. I'm
>>>>>>> definitely in sync that the data source itself shouldn't do work
>>>>>>> outside of the writes themselves.
>>>>>>>
>>>>>>>> Considering the second use case you mentioned first, I don’t think
>>>>>>>> it is a good idea for a table to put requirements on the number of
>>>>>>>> tasks used for a write. The parallelism should be set appropriately
>>>>>>>> for the data volume, which is for Spark or the user to determine. A
>>>>>>>> minimum or maximum number of tasks could cause bad behavior.
>>>>>>>
>>>>>>>
>>>>>>>> For your first use case, an explicit global ordering, the problem is
>>>>>>>> that there can’t be an explicit global ordering for a table when it is
>>>>>>>> populated by a series of independent writes. Each write could have a
>>>>>>>> global order, but once those files are written, you have to deal with
>>>>>>>> multiple sorted data sets. I think it makes sense to focus on order
>>>>>>>> within data files, not order between data files.
>>>>>>>
>>>>>>>
>>>>>>> This is where I'm interested in learning about the separation of
>>>>>>> responsibilities for the data source and how "smart" it is supposed to
>>>>>>> be.
>>>>>>>
>>>>>>> For the first part, I would assume that given the estimated data
>>>>>>> size from Spark and options passed in from the user, the data source
>>>>>>> could make a more intelligent requirement on the write format than Spark
>>>>>>> independently. Somewhat analogous to how the current FileSource does bin
>>>>>>> packing of small files on the read side, restricting parallelism for the
>>>>>>> sake of overhead.
>>>>>>>
>>>>>>> For the second, I wouldn't assume that a data source requiring a
>>>>>>> certain write format would give any guarantees around reading the same
>>>>>>> data? In the cases where it is a complete overwrite it would, but for
>>>>>>> independent writes it could still be useful for statistics or
>>>>>>> compression.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Pat
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
