Yep, that sounds reasonable to me!

On Fri, Mar 30, 2018 at 5:50 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> +1
>
> -------- Original message --------
> From: Ryan Blue <rb...@netflix.com>
> Date: 3/30/18 2:28 PM (GMT-08:00)
> To: Patrick Woody <patrick.woo...@gmail.com>
> Cc: Russell Spitzer <russell.spit...@gmail.com>, Wenchen Fan <cloud0...@gmail.com>, Ted Yu <yuzhih...@gmail.com>, Spark Dev List <dev@spark.apache.org>
> Subject: Re: DataSourceV2 write input requirements
>
> You're right. A global sort would change the clustering if it had more fields than the clustering.
>
> Then what about this: if there is no RequiredClustering, then the sort is a global sort. If RequiredClustering is present, then the clustering is applied and the sort is a partition-level sort.
>
> That rule would mean that within a partition you always get the sort, but an explicit clustering overrides the partitioning a sort might try to introduce. Does that sound reasonable?
>
> rb
>
> On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>
>> Does that methodology work in this specific case? The ordering must be a subset of the clustering to guarantee they exist in the same partition when doing a global sort, I thought. Though I get the gist that if it does satisfy, then there is no reason not to choose the global sort.
>>
>> On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> > Can you expand on how the ordering containing the clustering expressions would ensure the global sort?
>>>
>>> The idea was to basically assume that if the clustering can be satisfied by a global sort, then do the global sort. For example, if the clustering is Set("b", "a") and the sort is Seq("a", "b", "c"), then do a global sort by columns a, b, and c.
>>>
>>> Technically, you could do this with a hash partitioner instead of a range partitioner and sort within each partition, but that doesn't make much sense because the partitioning would ensure that each partition has just one combination of the required clustering columns. Using a hash partitioner would make it so that the in-partition sort basically ignores the first few values, so it must be that the intent was a global sort.
>>>
>>> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>
>>>>> Right, you could use this to store a global ordering if there is only one write (e.g., CTAS). I don't think anything needs to change in that case, you would still have a clustering and an ordering, but the ordering would need to include all fields of the clustering. A way to pass in the partition ordinal for the source to store would be required.
>>>>
>>>> Can you expand on how the ordering containing the clustering expressions would ensure the global sort? Having a RangePartitioning would certainly satisfy, but it isn't required - is the suggestion that if Spark sees this overlap, then it plans a global sort?
>>>>
>>>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:
>>>>
>>>>> @RyanBlue I'm hoping that through the CBO effort we will continue to get more detailed statistics. Like on read, we could be using sketch data structures to get estimates on unique values and density for each column. You may be right that the real way for this to be handled would be giving a "cost" back to a higher-order optimizer, which can decide which method to use rather than having the data source itself do it. This is probably in a far-future version of the API.
>>>>>
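To make the rule Ryan proposes above concrete, here is a minimal Scala sketch of how a writer-side planner could apply it, assuming the source exposes an optional required clustering and a required ordering as plain column names. The helper and its signature are purely illustrative, not part of the proposed DataSourceV2 API.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

// Illustrative only: if the source requires a clustering, distribute by it and
// sort within partitions; otherwise treat the required ordering as a global
// sort (range partitioning + sort), matching the rule discussed above.
def prepareForWrite(df: DataFrame,
                    requiredClustering: Option[Seq[String]],
                    requiredOrdering: Seq[String]): DataFrame = {
  val sortCols: Seq[Column] = requiredOrdering.map(col)
  requiredClustering match {
    case Some(clustering) =>
      // An explicit clustering overrides any partitioning a sort would
      // introduce; the ordering becomes a partition-level sort.
      df.repartition(clustering.map(col): _*)
        .sortWithinPartitions(sortCols: _*)
    case None =>
      // No RequiredClustering: the sort is a global sort.
      df.sort(sortCols: _*)
  }
}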
>>>>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> > Cassandra can insert records with the same partition-key faster if they arrive in the same payload. But this is only beneficial if the incoming dataset has multiple entries for the same partition key.
>>>>>>
>>>>>> Thanks for the example, the recommended partitioning use case makes more sense now. I think we could have two interfaces, a RequiresClustering and a RecommendsClustering, if we want to support this. But I'm skeptical it will be useful, for two reasons:
>>>>>>
>>>>>> - Do we want to optimize the low cardinality case? Shuffles are usually much cheaper at smaller sizes, so I'm not sure it is necessary to optimize this away.
>>>>>> - How do we know there aren't just a few partition keys for all the records? It may look like a shuffle wouldn't help, but we don't know the partition keys until it is too late.
>>>>>>
>>>>>> Then there's also the logic for avoiding the shuffle and how to calculate the cost, which sounds like something that needs some details from CBO.
>>>>>>
>>>>>> > I would assume that given the estimated data size from Spark and options passed in from the user, the data source could make a more intelligent requirement on the write format than Spark independently.
>>>>>>
>>>>>> This is a good point.
>>>>>>
>>>>>> What would an implementation actually do here, and how would information be passed? For my use cases, the store would produce the number of tasks based on the estimated incoming rows, because the source has the best idea of how the rows will compress. But that's just applying a multiplier most of the time. To be very useful, this would have to handle skew in the rows (think of a row with a type where total size depends on the type), and that's a bit harder. I think maybe an interface that can provide relative cost estimates based on partition keys would be helpful, but then keep the planning logic in Spark.
>>>>>>
>>>>>> This is probably something that we could add later as we find use cases that require it?
>>>>>>
>>>>>> > I wouldn't assume that a data source requiring a certain write format would give any guarantees around reading the same data? In the cases where it is a complete overwrite it would, but for independent writes it could still be useful for statistics or compression.
>>>>>>
>>>>>> Right, you could use this to store a global ordering if there is only one write (e.g., CTAS). I don't think anything needs to change in that case; you would still have a clustering and an ordering, but the ordering would need to include all fields of the clustering. A way to pass in the partition ordinal for the source to store would be required.
>>>>>>
>>>>>> For the second point, that ordering is useful for statistics and compression, I completely agree. Our best practices doc tells users to always add a global sort when writing because you get the benefit of a range partitioner to handle skew, plus the stats and compression you're talking about to optimize for reads. I think the proposed API can request a global ordering from Spark already. My only point is that there isn't much the source can do to guarantee ordering for reads when there is more than one write.
>>>>>>
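The interface names above (RequiresClustering, RecommendsClustering, plus the required ordering the thread keeps referring to) come from the discussion itself; the sketch below is only a guess at the shape such writer-side mixins might take, not the API that was ultimately committed to Spark.

// Rough sketch of the mixins being debated above. The names come from the
// thread; the packages, signatures, and use of plain column names are
// placeholders rather than real DataSourceV2 classes.
trait RequiresClustering {
  /** Columns Spark must cluster (shuffle) the input by before the write. */
  def requiredClustering: Set[String]
}

trait RequiresOrdering {
  /** Sort order Spark must apply: a partition-level sort when a clustering is
    * also required, otherwise a global sort, per the rule proposed earlier. */
  def requiredOrdering: Seq[String]
}

// Discussed above but viewed skeptically: a clustering the source would prefer,
// which Spark could choose to skip if the shuffle is judged not worth the cost.
trait RecommendsClustering {
  def recommendedClustering: Set[String]
}

Under this shape, a source like the Cassandra connector could mix RecommendsClustering into its writer to ask for co-located partition keys without forcing a shuffle, which is the trade-off Russell and Ryan are weighing above.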
>>>>>> On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>>>>
>>>>>>>> Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn't properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with "Requires".
>>>>>>>
>>>>>>> This was in reference to Russell's suggestion that the data source could have a required sort, but only a recommended partitioning. I don't have an immediate recommending use case that comes to mind, though. I'm definitely in sync that the data source itself shouldn't do work outside of the writes themselves.
>>>>>>>
>>>>>>>> Considering the second use case you mentioned first, I don't think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.
>>>>>>>
>>>>>>>> For your first use case, an explicit global ordering, the problem is that there can't be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.
>>>>>>>
>>>>>>> This is where I'm interested in learning about the separation of responsibilities for the data source and how "smart" it is supposed to be.
>>>>>>>
>>>>>>> For the first part, I would assume that given the estimated data size from Spark and options passed in from the user, the data source could make a more intelligent requirement on the write format than Spark independently. Somewhat analogous to how the current FileSource does bin packing of small files on the read side, restricting parallelism for the sake of reducing overhead.
>>>>>>>
>>>>>>> For the second, I wouldn't assume that a data source requiring a certain write format would give any guarantees around reading the same data? In the cases where it is a complete overwrite it would, but for independent writes it could still be useful for statistics or compression.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Pat
>>>>>>>
>>>>>>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com> wrote:
>>>>>>>
>>>>>>>>
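For reference, the "multiplier" idea from the estimated-data-size exchange above could look roughly like the hypothetical helper below. Nothing like this exists in the proposal; the compression ratio and target output file size are assumptions a source would have to supply from its own knowledge of the format.

// Purely illustrative: turn Spark's input-size estimate plus source-specific
// compression knowledge into a suggested write parallelism.
def suggestedWriteTasks(estimatedInputBytes: Long,
                        expectedCompressionRatio: Double,              // source-specific knowledge
                        targetOutputFileBytes: Long = 512L * 1024 * 1024): Int = {
  val estimatedOutputBytes = (estimatedInputBytes * expectedCompressionRatio).toLong
  math.max(1, math.ceil(estimatedOutputBytes.toDouble / targetOutputFileBytes).toInt)
}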