That might be good to do, but it seems orthogonal to this effort itself. It would be a completely different interface.

On Wed, Aug 30, 2017 at 1:10 PM Wenchen Fan <cloud0...@gmail.com> wrote:

> OK, I agree with that. How about we add a new interface to push down the query plan, based on the current framework? We can mark the query-plan-push-down interface as unstable, to save the effort of designing a stable representation of the query plan and maintaining forward compatibility.
>
> On Wed, Aug 30, 2017 at 10:53 AM, James Baker <j.ba...@outlook.com> wrote:
>
>> I'll just focus on the one-by-one thing for now - it's the thing that blocks me the most.
>>
>> I think the place where we're most confused here is on the cost of determining whether I can push down a filter. For me, in order to work out whether I can push down a filter or satisfy a sort, I might have to read plenty of data. That said, it's worth me doing this because I can use this information to avoid reading that much data.
>>
>> If you give me all the orderings, I will have to read that data many times (we stream it to avoid keeping it in memory).
>>
>> There's also a thing where our typical use cases have many filters (20+ is common). So it's likely not going to work to pass us all the combinations. That said, if I can tell you a cost, I know what optimal looks like, so why can't I just pick that myself?
>>
>> The current design is friendly to simple datasources, but does not have the potential to support this.
>>
>> So the main problem we have with datasources v1 is that it's essentially impossible to leverage a bunch of Spark features - I don't get to use bucketing or row batches or all the nice things that I really want to use to get decent performance. Provided I can leverage these in a moderately supported way which won't break in any given commit, I'll be pretty happy with anything that lets me opt out of the restrictions.
>>
>> My suggestion here is that if you make a mode which works well for complicated use cases, you end up being able to write the simple mode in terms of it very easily. So we could actually provide two APIs: one that lets people who have more interesting datasources leverage the cool Spark features, and one that lets people who just want to implement basic features do that - I'd try to include some kind of layering here. I could probably sketch out something here if that'd be useful?
>>
>> James
>>
>> On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> Hi James,
>>>
>>> Thanks for your feedback! I think your concerns are all valid, but we need to make a tradeoff here.
>>>
>>> > Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments
>>>
>>> The problem with this approach is: 1) if we want to add more arguments in the future, it's really hard to do without changing the existing interface; 2) if a user wants to implement a very simple data source, he has to look at all the arguments and understand them, which may be a burden for him. I don't have a solution to these two problems; comments are welcome.
>>>
>>> > There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do).
>>> > I want to be able to choose the parts I push down given knowledge of my datasource - as defined, the APIs don't let me do that; they're strictly more restrictive than the V1 APIs in this way.
>>>
>>> This is true: the current framework applies push downs one by one, incrementally. If a data source wants to go back and accept a sort push down after it has accepted a filter push down, that's impossible with the current data source V2.
>>>
>>> Fortunately, we have a solution for this problem. On the Spark side, we actually do have a fully specified set of arguments waiting to be pushed down, but Spark doesn't know the best order in which to push them into the data source. Spark can try every combination and ask the data source to report a cost, then pick the combination with the lowest cost. This can also be implemented as a cost-report interface, so that advanced data sources can implement it for optimal performance, while simple data sources don't need to care about it and can stay simple.
>>>
>>> The current design is very friendly to simple data sources and has the potential to support complex data sources. I prefer the current design over the plan-push-down one. What do you think?
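
For concreteness, a cost-report mix-in along the lines described above might look roughly like the following sketch. All of the names here are invented for illustration and are not part of the current proposal:

import java.util.List;
import java.util.OptionalLong;

// Hypothetical cost-report mix-in (sketch only; all names are invented).
// Spark would enumerate candidate combinations of pushdowns, ask the data
// source for an estimated cost of each, and keep the cheapest combination
// instead of pushing operators down one at a time.
public interface SupportsCostReport {

    // The bundle of operators Spark is considering pushing down. Filters and
    // sort orders are plain strings here purely for illustration.
    final class PushdownProposal {
        public final List<String> filters;     // e.g. "a = 1"
        public final List<String> sortOrders;  // e.g. "b ASC", "c ASC"
        public final OptionalLong limit;

        public PushdownProposal(List<String> filters,
                                List<String> sortOrders,
                                OptionalLong limit) {
            this.filters = filters;
            this.sortOrders = sortOrders;
            this.limit = limit;
        }
    }

    // Estimated cost of the scan if exactly this combination were pushed
    // down; lower means cheaper. A simple data source could ignore the
    // proposal entirely and return a constant.
    double estimateCost(PushdownProposal proposal);
}

Spark would construct candidate proposals, call estimateCost on each, and push down the cheapest one; how the cost is defined (row counts, bytes read, something else) is exactly the kind of detail that would still need to be specified.
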
>>> On Wed, Aug 30, 2017 at 5:53 AM, James Baker <j.ba...@outlook.com> wrote:
>>>
>>>> Yeah, for sure.
>>>>
>>>> With the stable representation - agree that in the general case this is pretty intractable; it restricts the modifications that you can do in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the datasources V2 API (which, after all, need to be translatable properly into the future to support the mixins proposed). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be ok, right?
>>>>
>>>> That said, what would also be fine for us is a place to plug into an unstable query plan.
>>>>
>>>> Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and return the information as to which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. Not at all married to the idea of using a query plan here; it just seemed convenient.
>>>>
>>>> Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.
>>>>
>>>> To your second question:
>>>>
>>>> The issue is that if you build up pushdowns incrementally and not all at once, you end up having to reject pushdowns and filters that you actually can do, which unnecessarily increases overheads.
>>>>
>>>> For example, the dataset
>>>>
>>>> a b c
>>>> 1 2 3
>>>> 1 3 3
>>>> 1 3 4
>>>> 2 1 1
>>>> 2 0 1
>>>>
>>>> can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case where I can push down two equality filters iff I am given them at the same time, whilst not being able to push them down one at a time.
>>>>
>>>> There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined, the APIs don't let me do that; they're strictly more restrictive than the V1 APIs in this way.
>>>>
>>>> The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than necessary, because frequently a single operation provides the results of many of the getters, but the state is mutable, so you end up with odd caches.
>>>>
>>>> For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do in buildScan, so I want to cache it. This means that I end up with code that looks like:
>>>>
>>>> public final class CachingFoo implements Foo {
>>>>     private final Foo delegate;
>>>>
>>>>     private List<Filter> currentFilters = emptyList();
>>>>     private Supplier<Bar> barSupplier = newSupplier(currentFilters);
>>>>
>>>>     public CachingFoo(Foo delegate) {
>>>>         this.delegate = delegate;
>>>>     }
>>>>
>>>>     private Supplier<Bar> newSupplier(List<Filter> filters) {
>>>>         return Suppliers.memoize(() -> delegate.computeBar(filters));
>>>>     }
>>>>
>>>>     @Override
>>>>     public Bar computeBar(List<Filter> filters) {
>>>>         if (!filters.equals(currentFilters)) {
>>>>             currentFilters = filters;
>>>>             barSupplier = newSupplier(filters);
>>>>         }
>>>>
>>>>         return barSupplier.get();
>>>>     }
>>>> }
>>>>
>>>> which caches the result required in unhandledFilters on the expectation that Spark will call buildScan afterwards and get to use the result.
>>>>
>>>> This kind of cache becomes more prominent, but harder to deal with, in the new APIs. As one example here, the state I will need in order to compute accurate column stats internally will likely be a subset of the work required in order to get the read tasks, tell you whether I can handle filters, etc., so I'll want to cache it for reuse. However, the cached information needs to be appropriately invalidated when I add a new filter or sort order or limit, and this makes implementing the APIs harder and more error-prone.
>>>>
>>>> One thing that'd be great is a defined contract for the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).
>>>>
>>>> James
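
One hypothetical way to encode such a contract in the class structure - a sketch with invented names, not the proposed V2 interfaces - is to hand over the fully specified set of pushdowns in a single call and return an immutable object, so that everything derived from the pushdown decision is carried along rather than cached in mutable fields:

import java.util.List;

// Sketch only (invented names): a staged API in which the fully specified
// set of pushdowns is handed over once, up front, and everything derived
// from that decision hangs off the returned immutable object instead of
// being cached in mutable fields.
public interface StagedScanBuilder {

    // Spark presents everything it would like to push down in a single call.
    ConfiguredScan pushDown(List<String> filters, List<String> sortOrders);

    interface ConfiguredScan {
        // The filters the data source chose not to handle; Spark re-applies
        // these on top of the returned data.
        List<String> unhandledFilters();

        // Whether the requested sort order will already be satisfied.
        boolean satisfiesRequestedOrder();

        // State computed while deciding what to accept (e.g. partition
        // metadata) is reused here rather than recomputed or invalidated.
        List<ReadTask> planReadTasks();
    }

    // Placeholder for whatever unit of work a reader executes.
    interface ReadTask {
    }
}

Since a ConfiguredScan can only be obtained from pushDown, Spark cannot request read tasks or unhandled filters before the pushdowns have been presented, which provides the ordering guarantee without resorting to throwing.
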
>>>> On Tue, 29 Aug 2017 at 02:56 Reynold Xin <r...@databricks.com> wrote:
>>>>
>>>>> James,
>>>>>
>>>>> Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility, and evolvability. For maximum expressiveness, we'd want the ability to expose full query plans and let the data source decide which parts of the query plan can be pushed down.
>>>>>
>>>>> The downsides of full query plan push down are:
>>>>>
>>>>> 1. It is extremely difficult to design a stable representation for a logical / physical plan. It is doable, but we'd be the first to do it. I'm not aware of any mainstream database having done that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or to offer an experimental trait without compatibility guarantees that uses the current Catalyst internal logical plan.
>>>>>
>>>>> 2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.
>>>>>
>>>>> Re: your point about the proposed v2 being worse than v1 for your use case.
>>>>>
>>>>> Can you say more? You used the argument that v2 has more support for broader pushdown and as a result is harder to implement. That's how it is supposed to be. If a data source simply implements one of the traits, it'd be logically identical to v1. I don't see why it would be worse or better, other than that v2 provides much stronger forward compatibility guarantees than v1.
>>>>>
>>>>> On Tue, Aug 29, 2017 at 4:54 AM, James Baker <j.ba...@outlook.com> wrote:
>>>>>
>>>>>> Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):
>>>>>>
>>>>>> Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.
>>>>>>
>>>>>> The general conclusion I've come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).
>>>>>>
>>>>>> In DataSources V1 PrunedFilteredScan, the issue is that you are passed the filters in the buildScan method, and then passed them in again in the unhandledFilters method.
>>>>>>
>>>>>> However, the filters that you can't handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I'm passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to figure this out is essentially the same as the work I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.
>>>>>>
>>>>>> This V2 API proposal has the same issues, but perhaps more so. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom: sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one by one can easily end up being more expensive than computing it all in one go.
>>>>>>
>>>>>> For some trivial examples:
>>>>>>
>>>>>> - After filtering, I might be sorted, whilst before filtering I might not be.
>>>>>>
>>>>>> - Filtering with certain filters might affect my ability to push down others.
>>>>>>
>>>>>> - Filtering with aggregations (as mooted) might not be possible to push down.
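
To make the "data dependent" point above concrete, here is a toy sketch (invented names, not the actual V1 traits) of a source that can only decide whether an equality filter on column a is handleable by consulting the same partition metadata it needs in order to plan the scan:

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Toy illustration (invented names, not the real V1 traits): whether the
// filter "a = value" can be handled depends on a partition index, so deciding
// handled vs. unhandled requires the same work as planning the read itself.
public final class PartitionIndexedSource {

    // Partition name -> values of column "a" present in that partition.
    private final Map<String, Set<Integer>> valuesOfAByPartition;

    public PartitionIndexedSource(Map<String, Set<Integer>> valuesOfAByPartition) {
        this.valuesOfAByPartition = valuesOfAByPartition;
    }

    // Deciding whether the filter is handled scans the partition index...
    public boolean canHandleEqualityOnA(int value) {
        return valuesOfAByPartition.values().stream()
                .anyMatch(values -> values.contains(value));
    }

    // ...and planning the scan walks exactly the same index again, which is
    // what pushes implementations towards the odd caching described earlier.
    public List<String> partitionsToRead(int value) {
        return valuesOfAByPartition.entrySet().stream()
                .filter(entry -> entry.getValue().contains(value))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}

Answering the handled/unhandled question and planning the read walk the same index, which is what drives the caching pattern shown earlier in the thread.
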
>>>>>> And with the API as currently mooted, I need to be able to go back and change my results because they might change later.
>>>>>>
>>>>>> Really what would be good here is to pass all of the filters and sorts etc. at once, and then I return the parts I can't handle.
>>>>>>
>>>>>> I'd prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. Explicitly, I don't want to give the whole query plan - that sounds painful - I'd prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don't think we can guarantee the properties we want without a two-phase thing - I'd really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.
>>>>>>
>>>>>> I think this ends up being a more elegant API for consumers, and also far more intuitive.
>>>>>>
>>>>>> James
>>>>>>
>>>>>> On Mon, 28 Aug 2017 at 18:00 蒋星博 <jiangxb1...@gmail.com> wrote:
>>>>>>
>>>>>>> +1 (Non-binding)
>>>>>>>
>>>>>>> On Mon, Aug 28, 2017 at 5:38 PM, Xiao Li <gatorsm...@gmail.com> wrote:
>>>>>>>
>>>>>>>> +1
>>>>>>>>
>>>>>>>> 2017-08-28 12:45 GMT-07:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>>>
>>>>>>>>> Just wanted to point out that because the jira isn't labeled SPIP, it won't have shown up linked from
>>>>>>>>>
>>>>>>>>> http://spark.apache.org/improvement-proposals.html
>>>>>>>>>
>>>>>>>>> On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>>>>> > Hi all,
>>>>>>>>> >
>>>>>>>>> > It has been almost 2 weeks since I proposed the data source V2 for discussion, and we already got some feedback on the JIRA ticket and the prototype PR, so I'd like to call for a vote.
>>>>>>>>> >
>>>>>>>>> > The full document of the Data Source API V2 is:
>>>>>>>>> > https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>>>>>>>>> >
>>>>>>>>> > Note that this vote should focus on the high-level design/framework, not specific APIs, as we can always change/improve specific APIs during development.
>>>>>>>>> >
>>>>>>>>> > The vote will be up for the next 72 hours. Please reply with your vote:
>>>>>>>>> >
>>>>>>>>> > +1: Yeah, let's go forward and implement the SPIP.
>>>>>>>>> > +0: Don't really care.
>>>>>>>>> > -1: I don't think this is a good idea because of the following technical reasons.
>>>>>>>>> >
>>>>>>>>> > Thanks!