Re: [DISCUSS] SPIP: FunctionCatalog
I don’t think that using Invoke really works. The usability is poor if something goes wrong, and it can’t handle varargs or parameterized use cases well. There isn’t a significant enough performance penalty for passing data as a row to justify making the API much more difficult and less expressive. I don’t think it makes much sense to move forward with the idea. More replies inline.

On Tue, Feb 9, 2021 at 2:37 AM Wenchen Fan wrote:

> There’s also a massive performance penalty for the Invoke approach when falling back to non-codegen because the function is loaded and invoked each time eval is called. It is much cheaper to use a method in an interface.
>
> Can you elaborate? Using the row parameter or individual parameters shouldn't change the life cycle of the UDF instance.

The eval method looks up the method and invokes it every time using reflection. That’s quite a bit slower than calling an interface method on a UDF instance.

> Should they use String or UTF8String? What representations are supported and how will Spark detect and produce those representations?
>
> It's the same as InternalRow. We can just copy-paste the document of InternalRow to explain the corresponding java type for each data type.

My point is that having a single method signature that uses InternalRow and is inherited from an interface is much easier for users and Spark. If a user forgets to use UTF8String and writes a method with String instead, then the UDF wouldn’t work. What then? Does Spark detect that the wrong type was used? It would need to, or else it would be difficult for a UDF developer to tell what is wrong. And this is a runtime issue, so it is caught late.

-- Ryan Blue
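A rough sketch of the two call paths being contrasted here: in the non-codegen fallback of the Invoke approach, the UDF method is looked up and called reflectively for every row, while the row-based interface needs only one virtual call on the bound UDF instance. The class and method names below are illustrative assumptions, not Spark internals.

    import java.lang.reflect.Method;

    class InvokeFallbackSketch {
      private final Object udf;
      private final Class<?>[] argTypes;

      InvokeFallbackSketch(Object udf, Class<?>[] argTypes) {
        this.udf = udf;
        this.argTypes = argTypes;
      }

      // Interpreted (non-codegen) path: called once per input row.
      Object eval(Object[] args) throws Exception {
        Method method = udf.getClass().getMethod("produceResult", argTypes); // reflective lookup per call
        return method.invoke(udf, args);                                     // reflective call, boxed arguments
      }
    }

    // The interface-based alternative is a single virtual call per row, e.g.
    // result = scalarFunction.produceResult(inputRow), with no lookup and no
    // boxing beyond what the UDF itself does.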
Re: [DISCUSS] SPIP: FunctionCatalog
o believe we have been stuck mainly due to trying to come up with a >>>> better design. We already have an ugly picture of the current Spark APIs to >>>> draw a better bigger picture. >>>> >>>> >>>> 2021년 2월 10일 (수) 오전 3:28, Holden Karau 님이 작성: >>>> >>>>> I think this proposal is a good set of trade-offs and has existed in >>>>> the community for a long period of time. I especially appreciate how the >>>>> design is focused on a minimal useful component, with future optimizations >>>>> considered from a point of view of making sure it's flexible, but actual >>>>> concrete decisions left for the future once we see how this API is used. I >>>>> think if we try and optimize everything right out of the gate, we'll >>>>> quickly get stuck (again) and not make any progress. >>>>> >>>>> On Mon, Feb 8, 2021 at 10:46 AM Ryan Blue wrote: >>>>> >>>>>> Hi everyone, >>>>>> >>>>>> I'd like to start a discussion for adding a FunctionCatalog interface >>>>>> to catalog plugins. This will allow catalogs to expose functions to >>>>>> Spark, >>>>>> similar to how the TableCatalog interface allows a catalog to expose >>>>>> tables. The proposal doc is available here: >>>>>> https://docs.google.com/document/d/1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U/edit >>>>>> >>>>>> Here's a high-level summary of some of the main design choices: >>>>>> * Adds the ability to list and load functions, not to create or >>>>>> modify them in an external catalog >>>>>> * Supports scalar, aggregate, and partial aggregate functions >>>>>> * Uses load and bind steps for better error messages and simpler >>>>>> implementations >>>>>> * Like the DSv2 table read and write APIs, it uses InternalRow to >>>>>> pass data >>>>>> * Can be extended using mix-in interfaces to add vectorization, >>>>>> codegen, and other future features >>>>>> >>>>>> There is also a PR with the proposed API: >>>>>> https://github.com/apache/spark/pull/24559/files >>>>>> >>>>>> Let's discuss the proposal here rather than on that PR, to get better >>>>>> visibility. Also, please take the time to read the proposal first. That >>>>>> really helps clear up misconceptions. >>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Ryan Blue >>>>>> >>>>> >>>>> >>>>> -- >>>>> Twitter: https://twitter.com/holdenkarau >>>>> Books (Learning Spark, High Performance Spark, etc.): >>>>> https://amzn.to/2MaRAG9 <https://amzn.to/2MaRAG9> >>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau >>>>> >>>> -- Ryan Blue
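A condensed sketch of the API shape summarized in the quoted proposal (listing and loading functions in the catalog, a separate bind step, and InternalRow for passing data), reconstructed from the summary and the linked PR; the signatures are approximate and the final interfaces may differ in detail.

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.connector.catalog.Identifier;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.StructType;

    interface FunctionCatalog {
      Identifier[] listFunctions(String[] namespace);  // list functions, but not create or modify them
      UnboundFunction loadFunction(Identifier ident);  // load by identifier
    }

    interface UnboundFunction {
      String name();
      // Binding to the actual input schema is where argument types are checked,
      // which is what enables the better error messages mentioned in the summary.
      BoundFunction bind(StructType inputType);
    }

    interface BoundFunction {
      DataType[] inputTypes();
      DataType resultType();
    }

    interface ScalarFunction<R> extends BoundFunction {
      R produceResult(InternalRow input);              // data passed as InternalRow, like the DSv2 read/write APIs
    }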
Re: [DISCUSS] SPIP: FunctionCatalog
Thanks for the positive feedback, everyone. It sounds like there is a clear path forward for calling functions. Even without a prototype, the `invoke` plans show that Wenchen's suggested optimization can be done, and incorporating it as an optional extension to this proposal solves many of the unknowns. With that area now understood, is there any discussion about other parts of the proposal, besides the function call interface? On Fri, Feb 12, 2021 at 10:40 PM Chao Sun wrote: > This is an important feature which can unblock several other projects > including bucket join support for DataSource v2, complete support for > enforcing DataSource v2 distribution requirements on the write path, etc. I > like Ryan's proposals which look simple and elegant, with nice support on > function overloading and variadic arguments. On the other hand, I think > Wenchen made a very good point about performance. Overall, I'm excited to > see active discussions on this topic and believe the community will come to > a proposal with the best of both sides. > > Chao > > On Fri, Feb 12, 2021 at 7:58 PM Hyukjin Kwon wrote: > >> +1 for Liang-chi's. >> >> Thanks Ryan and Wenchen for leading this. >> >> >> 2021년 2월 13일 (토) 오후 12:18, Liang-Chi Hsieh 님이 작성: >> >>> Basically I think the proposal makes sense to me and I'd like to support >>> the >>> SPIP as it looks like we have strong need for the important feature. >>> >>> Thanks Ryan for working on this and I do also look forward to Wenchen's >>> implementation. Thanks for the discussion too. >>> >>> Actually I think the SupportsInvoke proposed by Ryan looks a good >>> alternative to me. Besides Wenchen's alternative implementation, is >>> there a >>> chance we also have the SupportsInvoke for comparison? >>> >>> >>> John Zhuge wrote >>> > Excited to see our Spark community rallying behind this important >>> feature! >>> > >>> > The proposal lays a solid foundation of minimal feature set with >>> careful >>> > considerations for future optimizations and extensions. Can't wait to >>> see >>> > it leading to more advanced functionalities like views with shared >>> custom >>> > functions, function pushdown, lambda, etc. It has already borne fruit >>> from >>> > the constructive collaborations in this thread. Looking forward to >>> > Wenchen's prototype and further discussions including the >>> SupportsInvoke >>> > extension proposed by Ryan. >>> > >>> > >>> > On Fri, Feb 12, 2021 at 4:35 PM Owen O'Malley < >>> >>> > owen.omalley@ >>> >>> > > >>> > wrote: >>> > >>> >> I think this proposal is a very good thing giving Spark a standard >>> way of >>> >> getting to and calling UDFs. >>> >> >>> >> I like having the ScalarFunction as the API to call the UDFs. It is >>> >> simple, yet covers all of the polymorphic type cases well. I think it >>> >> would >>> >> also simplify using the functions in other contexts like pushing down >>> >> filters into the ORC & Parquet readers although there are a lot of >>> >> details >>> >> that would need to be considered there. >>> >> >>> >> .. Owen >>> >> >>> >> >>> >> On Fri, Feb 12, 2021 at 11:07 PM Erik Krogen < >>> >>> > ekrogen@.com >>> >>> > > >>> >> wrote: >>> >> >>> >>> I agree that there is a strong need for a FunctionCatalog within >>> Spark >>> >>> to >>> >>> provide support for shareable UDFs, as well as make movement towards >>> >>> more >>> >>> advanced functionality like views which themselves depend on UDFs, >>> so I >>> >>> support this SPIP wholeheartedly. 
>>> >>> >>> >>> I find both of the proposed UDF APIs to be sufficiently user-friendly >>> >>> and >>> >>> extensible. I generally think Wenchen's proposal is easier for a >>> user to >>> >>> work with in the common case, but has greater potential for confusing >>> >>> and >>> >>> hard-to-debug behavior due to use of reflective method signature >>> >>> searches. >>> &g
Re: [DISCUSS] SPIP: FunctionCatalog
Andrew, The proposal already includes an API for aggregate functions and I think we would want to implement those right away. Processing ColumnBatch is something we can easily extend the interfaces to support, similar to Wenchen's suggestion. The important thing right now is to agree on some basic functionality: how to look up functions and what the simple API should be. Like the TableCatalog interfaces, we will layer on more support through optional interfaces like `SupportsInvoke` or `SupportsColumnBatch`. On Tue, Feb 16, 2021 at 9:00 AM Andrew Melo wrote: > Hello Ryan, > > This proposal looks very interesting. Would future goals for this > functionality include both support for aggregation functions, as well > as support for processing ColumnBatch-es (instead of Row/InternalRow)? > > Thanks > Andrew > > On Mon, Feb 15, 2021 at 12:44 PM Ryan Blue > wrote: > > > > Thanks for the positive feedback, everyone. It sounds like there is a > clear path forward for calling functions. Even without a prototype, the > `invoke` plans show that Wenchen's suggested optimization can be done, and > incorporating it as an optional extension to this proposal solves many of > the unknowns. > > > > With that area now understood, is there any discussion about other parts > of the proposal, besides the function call interface? > > > > On Fri, Feb 12, 2021 at 10:40 PM Chao Sun wrote: > >> > >> This is an important feature which can unblock several other projects > including bucket join support for DataSource v2, complete support for > enforcing DataSource v2 distribution requirements on the write path, etc. I > like Ryan's proposals which look simple and elegant, with nice support on > function overloading and variadic arguments. On the other hand, I think > Wenchen made a very good point about performance. Overall, I'm excited to > see active discussions on this topic and believe the community will come to > a proposal with the best of both sides. > >> > >> Chao > >> > >> On Fri, Feb 12, 2021 at 7:58 PM Hyukjin Kwon > wrote: > >>> > >>> +1 for Liang-chi's. > >>> > >>> Thanks Ryan and Wenchen for leading this. > >>> > >>> > >>> 2021년 2월 13일 (토) 오후 12:18, Liang-Chi Hsieh 님이 작성: > >>>> > >>>> Basically I think the proposal makes sense to me and I'd like to > support the > >>>> SPIP as it looks like we have strong need for the important feature. > >>>> > >>>> Thanks Ryan for working on this and I do also look forward to > Wenchen's > >>>> implementation. Thanks for the discussion too. > >>>> > >>>> Actually I think the SupportsInvoke proposed by Ryan looks a good > >>>> alternative to me. Besides Wenchen's alternative implementation, is > there a > >>>> chance we also have the SupportsInvoke for comparison? > >>>> > >>>> > >>>> John Zhuge wrote > >>>> > Excited to see our Spark community rallying behind this important > feature! > >>>> > > >>>> > The proposal lays a solid foundation of minimal feature set with > careful > >>>> > considerations for future optimizations and extensions. Can't wait > to see > >>>> > it leading to more advanced functionalities like views with shared > custom > >>>> > functions, function pushdown, lambda, etc. It has already borne > fruit from > >>>> > the constructive collaborations in this thread. Looking forward to > >>>> > Wenchen's prototype and further discussions including the > SupportsInvoke > >>>> > extension proposed by Ryan. 
> >>>> > > >>>> > > >>>> > On Fri, Feb 12, 2021 at 4:35 PM Owen O'Malley < > >>>> > >>>> > owen.omalley@ > >>>> > >>>> > > > >>>> > wrote: > >>>> > > >>>> >> I think this proposal is a very good thing giving Spark a standard > way of > >>>> >> getting to and calling UDFs. > >>>> >> > >>>> >> I like having the ScalarFunction as the API to call the UDFs. It is > >>>> >> simple, yet covers all of the polymorphic type cases well. I think > it > >>>> >> would > >>>> >> also simplify using the functions in other contexts like pushing > down > >>>> >> filt
Re: [DISCUSS] SPIP: FunctionCatalog
Thanks, Hyukjin. I think that's a fair summary. And I agree with the idea that we should focus on what Spark will do by default. I think we should focus on the proposal, for two reasons: first, there is a straightforward path to incorporate Wenchen's suggestion via `SupportsInvoke`, and second, the proposal is more complete: it defines a solution for many concerns like loading a function and finding out what types to use -- not just how to call code -- and supports more use cases like varargs functions. I think we can continue to discuss the rest of the proposal and be confident that we can support an invoke code path where it makes sense. Does everyone agree? If not, I think we would need to solve a lot of the challenges that I initially brought up with the invoke idea. It seems like a good way to call a function, but needs a real proposal behind it if we don't use it via `SupportsInvoke` in the current proposal. On Tue, Feb 16, 2021 at 11:07 PM Hyukjin Kwon wrote: > Just to make sure we don’t move past, I think we haven’t decided yet: > >- if we’ll replace the current proposal to Wenchen’s approach as the >default >- if we want to have Wenchen’s approach as an optional mix-in on the >top of Ryan’s proposal (SupportsInvoke) > > From what I read, some people pointed out it as a replacement. Please > correct me if I misread this discussion thread. > As Dongjoon pointed out, it would be good to know rough ETA to make sure > making progress in this, and people can compare more easily. > > > FWIW, there’s the saying I like in the zen of Python > <https://www.python.org/dev/peps/pep-0020/>: > > There should be one— and preferably only one —obvious way to do it. > > If multiple approaches have the way for developers to do the (almost) same > thing, I would prefer to avoid it. > > In addition, I would prefer to focus on what Spark does by default first. > > > 2021년 2월 17일 (수) 오후 2:33, Dongjoon Hyun 님이 작성: > >> Hi, Wenchen. >> >> This thread seems to get enough attention. Also, I'm expecting more and >> more if we have this on the `master` branch because we are developing >> together. >> >> > Spark SQL has many active contributors/committers and this thread >> doesn't get much attention yet. >> >> So, what's your ETA from now? >> >> > I think the problem here is we were discussing some very detailed >> things without actual code. >> > I'll implement my idea after the holiday and then we can have more >> effective discussions. >> > We can also do benchmarks and get some real numbers. >> > In the meantime, we can continue to discuss other parts of this >> proposal, and make a prototype if possible. >> >> I'm looking forward to seeing your PR. I hope we can conclude this thread >> and have at least one implementation in the `master` branch this month >> (February). >> If you need more time (one month or longer), why don't we have Ryan's >> suggestion in the `master` branch first and benchmark with your PR later >> during Apache Spark 3.2 timeframe. >> >> Bests, >> Dongjoon. >> >> >> On Tue, Feb 16, 2021 at 9:26 AM Ryan Blue >> wrote: >> >>> Andrew, >>> >>> The proposal already includes an API for aggregate functions and I think >>> we would want to implement those right away. >>> >>> Processing ColumnBatch is something we can easily extend the interfaces >>> to support, similar to Wenchen's suggestion. The important thing right now >>> is to agree on some basic functionality: how to look up functions and what >>> the simple API should be. 
Like the TableCatalog interfaces, we will layer >>> on more support through optional interfaces like `SupportsInvoke` or >>> `SupportsColumnBatch`. >>> >>> On Tue, Feb 16, 2021 at 9:00 AM Andrew Melo >>> wrote: >>> >>>> Hello Ryan, >>>> >>>> This proposal looks very interesting. Would future goals for this >>>> functionality include both support for aggregation functions, as well >>>> as support for processing ColumnBatch-es (instead of Row/InternalRow)? >>>> >>>> Thanks >>>> Andrew >>>> >>>> On Mon, Feb 15, 2021 at 12:44 PM Ryan Blue >>>> wrote: >>>> > >>>> > Thanks for the positive feedback, everyone. It sounds like there is a >>>> clear path forward for calling functions. Even without a prototype, the >>>> `invoke` plans show that Wenchen's suggested optimization can be
Re: [DISCUSS] SPIP: FunctionCatalog
I agree with you that it is better in many cases to directly call a method. But it is not better in all cases, which is why I don’t think it is the right general-purpose choice.

First, if codegen isn’t used for some reason, the reflection overhead is really significant. That gets much better when you have an interface to call. That’s one reason I’d use this pattern:

    class DoubleAdd implements ScalarFunction, SupportsInvoke {
      Double produceResult(InternalRow row) {
        return produceResult(row.getDouble(0), row.getDouble(1));
      }

      double produceResult(double left, double right) {
        return left + right;
      }
    }

There’s little overhead to adding the InternalRow variation, but we could call it in eval to avoid the reflection overhead. To the point about UDF developers, I think this is a reasonable cost.

Second, I think usability is better and helps avoid runtime issues. Here’s an example:

    class StrLen implements ScalarFunction, SupportsInvoke {
      Integer produceResult(InternalRow row) {
        return produceResult(row.getString(0));
      }

      Integer produceResult(String str) {
        return str.length();
      }
    }

See the bug? I forgot to use UTF8String instead of String. With the InternalRow method, I get a compiler warning because getString produces UTF8String that can’t be passed to produceResult(String). If I decided to implement length separately, then we could still run the InternalRow version and log a warning. The code would be slightly slower, but wouldn’t fail.

There are similar situations with varargs where it’s better to call methods that produce concrete types than to cast from Object to some expected type.

I think that using invoke is a great extension to the proposal, but I don’t think that it should be the only way to call functions. By all means, let’s work on it in parallel and use it where possible. But I think we do need the fallback of using InternalRow and that it isn’t a usability problem to include it.

Oh, and one last thought is that we already have users that call Dataset.map and use InternalRow. This would allow converting that code directly to a UDF.

I think we’re closer to agreeing here than it actually looks. Hopefully you’ll agree that having the InternalRow method isn’t a big usability problem.

On Wed, Feb 17, 2021 at 11:51 PM Wenchen Fan wrote:

> I don't see any objections to the rest of the proposal (loading functions from the catalog, function binding stuff, etc.) and I assume everyone is OK with it. We can commit that part first.
>
> Currently, the discussion focuses on the `ScalarFunction` API, where I think it's better to directly take the input columns as the UDF parameter, instead of wrapping the input columns with InternalRow and taking the InternalRow as the UDF parameter. It's not only for better performance, but also for ease of use. For example, it's easier for the UDF developer to write `input1 + input2` than `inputRow.getLong(0) + inputRow.getLong(1)`, as they don't need to specify the type and index by themselves (getLong(0)), which is error-prone.
>
> It does push more work to the Spark side, but I think it's worth it if implementing UDF gets easier. I don't think the work is very challenging, as we can leverage the infra we built for the expression encoder.
>
> I think it's also important to look at the UDF API from the user's perspective (UDF developers). How do you like the UDF API without considering how Spark can support it? Do you prefer the individual-parameters version or the row-parameter version?
> > To move forward, how about we implement the function loading and binding > first? Then we can have PRs for both the individual-parameters (I can take > it) and row-parameter approaches, if we still can't reach a consensus at > that time and need to see all the details. > > On Thu, Feb 18, 2021 at 4:48 AM Ryan Blue > wrote: > >> Thanks, Hyukjin. I think that's a fair summary. And I agree with the idea >> that we should focus on what Spark will do by default. >> >> I think we should focus on the proposal, for two reasons: first, there is >> a straightforward path to incorporate Wenchen's suggestion via >> `SupportsInvoke`, and second, the proposal is more complete: it defines a >> solution for many concerns like loading a function and finding out what >> types to use -- not just how to call code -- and supports more use cases >> like varargs functions. I think we can continue to discuss the rest of the >> proposal and be confident that we can support an invoke code path where it >> makes sense. >> >> Does everyone agree? If not, I think we would need to solve a lot of the >> challenges that I initially brought up with the invoke idea. It seems like >> a good way to call a function, bu
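To make the quoted comparison concrete, here are the two styles for the same two-argument add: the individual-parameters form Wenchen prefers and the row-parameter form from the proposal. This is a small sketch only; the method name and lookup mechanics are assumptions, not a settled API.

    import org.apache.spark.sql.catalyst.InternalRow;

    class AddLongsByParams {            // individual-parameters style
      public long produceResult(long input1, long input2) {
        return input1 + input2;         // no type or ordinal bookkeeping in user code
      }
    }

    class AddLongsByRow {               // row-parameter style
      public Long produceResult(InternalRow row) {
        return row.getLong(0) + row.getLong(1);  // the user supplies type and ordinal
      }
    }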
Re: [DISCUSS] SPIP: FunctionCatalog
I don’t see any benefit to more complexity with 22 additional interfaces, instead of simply passing an InternalRow. Why not use a single interface with InternalRow? Maybe you could share your motivation?

That would also result in strange duplication, where the ScalarFunction2 method is just the boxed version:

    class DoubleAdd implements ScalarFunction2 {
      @Override
      Double produceResult(Double left, Double right) {
        return left + right;
      }

      double produceResult(double left, double right) {
        return left + right;
      }
    }

This would work okay, but would be awkward if you wanted to use the same implementation for any number of arguments, like a sum method that adds all of the arguments together and returns the result. It also isn’t great for varargs, since it is basically the same as the invoke case.

The combination of an InternalRow method and the invoke method seems to be a good way to handle this to me. What is wrong with it? And, how would you solve the problem when implementations define methods with the wrong types? The InternalRow approach helps implementations catch that problem (as demonstrated above) and also provides a fallback when there is a bug preventing the invoke optimization from working. That seems like a good approach to me.

On Thu, Feb 18, 2021 at 11:31 PM Wenchen Fan wrote:

> If people have such a big concern about reflection, we can follow the current Spark Java UDF <https://github.com/apache/spark/tree/master/sql/core/src/main/java/org/apache/spark/sql/api/java> and Transport <https://github.com/linkedin/transport/tree/master/transportable-udfs-api/src/main/java/com/linkedin/transport/api/udf>, and create ScalarFunction0[R], ScalarFunction1[T1, R], etc. to avoid reflection. But we may need to investigate how to avoid boxing with this API design.
>
> To put a detailed proposal: let's have ScalarFunction0, ScalarFunction1, ..., ScalarFunction9 and VarargsScalarFunction. At execution time, if Spark sees ScalarFunction0-9, pass the input columns to the UDF directly, one column per parameter. So string type input is UTF8String, array type input is ArrayData. If Spark sees VarargsScalarFunction, wrap the input columns with InternalRow and pass it to the UDF.
>
> In general, if VarargsScalarFunction is implemented, the UDF should not implement ScalarFunction0-9. We can also define a priority order to allow this. I don't have a strong preference here.
>
> What do you think?
>
> On Fri, Feb 19, 2021 at 1:24 PM Walaa Eldin Moustafa <wa.moust...@gmail.com> wrote:
>
>> I agree with Ryan on the questions around the expressivity of the Invoke method. It is not clear to me how the Invoke method can be used to declare UDFs with type-parameterized parameters. For example: a UDF to get the Nth element of an array (regardless of the Array element type) or a UDF to merge two Arrays (of generic types) to a Map.
>>
>> Also, to address Wenchen's InternalRow question, can we create a number of Function classes, each corresponding to a number of input parameter length (e.g., ScalarFunction1, ScalarFunction2, etc)?
>>
>> Thanks,
>> Walaa.
>>
>>
>> On Thu, Feb 18, 2021 at 6:07 PM Ryan Blue wrote:
>>
>>> I agree with you that it is better in many cases to directly call a method. But it is not better in all cases, which is why I don’t think it is the right general-purpose choice.
>>>
>>> First, if codegen isn’t used for some reason, the reflection overhead is really significant. That gets much better when you have an interface to call.
That’s one reason I’d use this pattern: >>> >>> class DoubleAdd implements ScalarFunction, SupportsInvoke { >>> Double produceResult(InternalRow row) { >>> return produceResult(row.getDouble(0), row.getDouble(1)); >>> } >>> >>> double produceResult(double left, double right) { >>> return left + right; >>> } >>> } >>> >>> There’s little overhead to adding the InternalRow variation, but we >>> could call it in eval to avoid the reflect overhead. To the point about >>> UDF developers, I think this is a reasonable cost. >>> >>> Second, I think usability is better and helps avoid runtime issues. >>> Here’s an example: >>> >>> class StrLen implements ScalarFunction, SupportsInvoke { >>> Integer produceResult(InternalRow row) { >>> return produceResult(row.getString(0)); >>> } >>> >>> Integer produceResult(String str) { >>> return str.length(); >>> } >>> } >>> >&g
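A sketch of the variable-arity case mentioned above (a sum over however many arguments were bound), written against the row-based ScalarFunction shape from the proposal; a single implementation covers any arity without Object[] casts. The interface is assumed to look like the earlier sketches in this thread, and the BoundFunction methods are omitted.

    import org.apache.spark.sql.catalyst.InternalRow;

    class SumAll implements ScalarFunction<Double> {
      @Override
      public Double produceResult(InternalRow args) {
        double total = 0.0;
        for (int i = 0; i < args.numFields(); i++) {  // arity comes from the bound inputs
          total += args.getDouble(i);                 // concrete accessor, no casting from Object
        }
        return total;
      }
    }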
Re: [DISCUSS] SPIP: FunctionCatalog
Thanks for adding your perspective, Erik!

> If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values. The implementation in GenericInternalRow <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala#L35> would throw a ClassCastException.

I don’t think that using a row is a bad option simply because UnsafeRow is unsafe. It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

> I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs. runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

> I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today, so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2 for two different type parameterizations, such as one for Integer arguments and one for Long arguments. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed, because parameterizing ScalarFunction by Object defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix
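The type-erasure limit described above, shown directly in Java. The two-argument interface here is a sketch of the proposed ScalarFunction2 variant, not Spark's API; the point is that one class can implement it for only one set of type parameters, so reuse across types forces Object and casts.

    interface ScalarFunction2<T1, T2, R> {   // sketch of the proposed fixed-arity variant
      R produceResult(T1 left, T2 right);
    }

    // Does NOT compile: both clauses erase to the same ScalarFunction2 interface.
    // class Plus implements ScalarFunction2<Integer, Integer, Integer>,
    //                       ScalarFunction2<Long, Long, Long> { ... }

    // The alternatives are one class per type combination, or a single class
    // parameterized by Object, which gives up the compile-time checks:
    class Plus implements ScalarFunction2<Object, Object, Object> {
      @Override
      public Object produceResult(Object left, Object right) {
        if (left instanceof Integer) {
          return (Integer) left + (Integer) right;   // casts and boxing return
        }
        return (Long) left + (Long) right;
      }
    }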
Re: [DISCUSS] SPIP: FunctionCatalog
> Yes, GenericInternalRow is safe when the type mismatches, with the cost of using Object[], and primitive types need to do boxing

The question is not whether to use the magic functions, which would not need boxing. The question here is whether to use multiple ScalarFunction interfaces. Those interfaces would require boxing or using Object[], so there isn’t a benefit.

> If we do want to reuse one UDF for different types, using “magical methods” solves the problem

Yes, that’s correct. We agree that magic methods are a good option for this.

Again, the question we need to decide is whether to use InternalRow or interfaces like ScalarFunction2 for non-codegen. The option to use multiple interfaces is limited by type erasure because you can only have one set of type parameters. If you wanted to support both an Integer and a Long parameterization of ScalarFunction2, you’d have to fall back to parameterizing by Object and cast.

The point is that type erasure will commonly lead either to many different implementation classes (one for each type combination) or will lead to parameterizing by Object, which defeats the purpose.

The alternative adds safety because correct types are produced by calls like getLong(0). Yes, this depends on the implementation making the correct calls, but it is better than using Object and casting.

> I can’t think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.

I think this is addressed by the type erasure discussion above. A simple Plus method would require Object or 12 implementations for 2 arguments and 4 numeric types. And basically all varargs cases would need to use Object[].

Consider a UDF to create a map that requires string keys and some consistent type for values. This would be easy with the InternalRow API because you can use getString(pos) and get(pos + 1, valueType) to get the key/value pairs. Use of UTF8String vs. String will be checked at compile time.

> I agree that Object[] is worse than InternalRow

Yes, and if we are always using Object because of type erasure or using magic methods to get better performance, the utility of the parameterized interfaces is very limited.

Because we want to expose the magic functions, the use of ScalarFunction2 and similar is extremely limited because it is only for non-codegen. Varargs is by far the more common case. The InternalRow interface is also a very simple way to get started and ensures that Spark can always find the right method after the function is bound to input types.

On Tue, Mar 2, 2021 at 6:35 AM Wenchen Fan wrote:

> Yes, GenericInternalRow is safe when the type mismatches, with the cost of using Object[], and primitive types need to do boxing. And this is a runtime failure, which is absolutely worse than query-compile-time checks. Also, don't forget my previous point: users need to specify the type and index such as row.getLong(0), which is error-prone.
>
> > But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.
>
> I'd say this is a must-have if we go with the individual-parameters approach. The Scala UDF today checks the method signature at compile-time, thanks to the Scala type tag. The Java UDF today doesn't check and is hard to use.
>
> > You can’t implement ScalarFunction2 for two different type parameterizations.
>
> Can you elaborate? We have function binding and we can use different UDFs for different input types.
If we do want to reuse one UDF > for different types, using "magical methods" solves the problem: > class MyUDF { > def call(i: Int): Int = ... > def call(l: Long): Long = ... > } > > On the other side, I don't think the row-parameter approach can solve this > problem. The best I can think of is: > class MyUDF implement ScalaFunction[Object] { > def call(row: InternalRow): Object = { > if (int input) row.getInt(0) ... else row.getLong(0) ... > } > } > > This is worse because: 1) it needs to do if-else to check different input > types. 2) the return type can only be Object and cause boxing issues. > > I agree that Object[] is worse than InternalRow. But I can't think of > real use cases that will force the individual-parameters approach to use > Object instead of concrete types. > > > On Tue, Mar 2, 2021 at 3:36 AM Ryan Blue wrote: > >> Thanks for adding your perspective, Erik! >> >> If the input is string type but the UDF implementation calls >> row.getLong(0), it returns wrong data >> >> I think this is misleading. It is true for UnsafeRow, but there is no >> reason why InternalRow should return incorrect values. >> >> The implementation in GenericInternalRow >> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/sp
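A sketch of the map-building UDF described above, using the row-based API: string keys are read with a concrete accessor and the values share one bound type. The catalyst helpers used here (GenericArrayData, ArrayBasedMapData) are an assumption about how one might build the result, not something prescribed by the proposal, and ScalarFunction is the interface sketched earlier in this thread.

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
    import org.apache.spark.sql.catalyst.util.GenericArrayData;
    import org.apache.spark.sql.catalyst.util.MapData;
    import org.apache.spark.sql.types.DataType;

    class MakeMap implements ScalarFunction<MapData> {
      private final DataType valueType;   // fixed when the function is bound to input types

      MakeMap(DataType valueType) {
        this.valueType = valueType;
      }

      @Override
      public MapData produceResult(InternalRow args) {
        int numPairs = args.numFields() / 2;
        Object[] keys = new Object[numPairs];
        Object[] values = new Object[numPairs];
        for (int i = 0; i < numPairs; i++) {
          keys[i] = args.getUTF8String(2 * i);         // keys must be strings, kept in internal UTF8String form
          values[i] = args.get(2 * i + 1, valueType);  // all values share the bound value type
        }
        return new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values));
      }
    }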
Re: [DISCUSS] SPIP: FunctionCatalog
Good point, Dongjoon. I think we can probably come to some compromise here: - Remove SupportsInvoke since it isn’t really needed. We should always try to find the right method to invoke in the codegen path. - Add a default implementation of produceResult so that implementations don’t have to use it. If they don’t implement it and a magic function can’t be found, then it will throw UnsupportedOperationException This is assuming that we can agree not to introduce all of the ScalarFunction interface variations, which would have limited utility because of type erasure. Does that sound like a good plan to everyone? If so, I’ll update the SPIP doc so we can move forward. On Wed, Mar 3, 2021 at 4:36 PM Dongjoon Hyun wrote: > Hi, All. > > We shared many opinions in different perspectives. > However, we didn't reach a consensus even on a partial merge by excluding > something > (on the PR by me, on this mailing thread by Wenchen). > > For the following claims, we have another alternative to mitigate it. > > > I don't like it because it promotes the row-parameter API and forces > users to implement it, even if the users want to only use the > individual-parameters API. > > Why don't we merge the AS-IS PR by adding something instead of excluding > something? > > - R produceResult(InternalRow input); > + default R produceResult(InternalRow input) throws Exception { > + throw new UnsupportedOperationException(); > + } > > By providing the default implementation, it will not *forcing users to > implement it* technically. > And, we can provide a document about our expected usage properly. > What do you think? > > Bests, > Dongjoon. > > > > On Wed, Mar 3, 2021 at 10:28 AM Ryan Blue wrote: > >> Yes, GenericInternalRow is safe if when type mismatches, with the cost of >> using Object[], and primitive types need to do boxing >> >> The question is not whether to use the magic functions, which would not >> need boxing. The question here is whether to use multiple ScalarFunction >> interfaces. Those interfaces would require boxing or using Object[] so >> there isn’t a benefit. >> >> If we do want to reuse one UDF for different types, using “magical >> methods” solves the problem >> >> Yes, that’s correct. We agree that magic methods are a good option for >> this. >> >> Again, the question we need to decide is whether to use InternalRow or >> interfaces like ScalarFunction2 for non-codegen. The option to use >> multiple interfaces is limited by type erasure because you can only have >> one set of type parameters. If you wanted to support both >> ScalarFunction2> Integer> and ScalarFunction2 you’d have to fall back to >> ScalarFunction2> Object> and cast. >> >> The point is that type erasure will commonly lead either to many >> different implementation classes (one for each type combination) or will >> lead to parameterizing by Object, which defeats the purpose. >> >> The alternative adds safety because correct types are produced by calls >> like getLong(0). Yes, this depends on the implementation making the >> correct calls, but it is better than using Object and casting. >> >> I can’t think of real use cases that will force the individual-parameters >> approach to use Object instead of concrete types. >> >> I think this is addressed by the type erasure discussion above. A simple >> Plus method would require Object or 12 implementations for 2 arguments >> and 4 numeric types. >> >> And basically all varargs cases would need to use Object[]. 
Consider a >> UDF to create a map that requires string keys and some consistent type for >> values. This would be easy with the InternalRow API because you can use >> getString(pos) and get(pos + 1, valueType) to get the key/value pairs. >> Use of UTF8String vs String will be checked at compile time. >> >> I agree that Object[] is worse than InternalRow >> >> Yes, and if we are always using Object because of type erasure or using >> magic methods to get better performance, the utility of the parameterized >> interfaces is very limited. >> >> Because we want to expose the magic functions, the use of ScalarFunction2 >> and similar is extremely limited because it is only for non-codegen. >> Varargs is by far the more common case. The InternalRow interface is >> also a very simple way to get started and ensures that Spark can always >> find the right method after the function is bound to input types. >> >> On Tue, Mar 2, 2021 at 6:35 AM Wenchen Fan wrote: >> >>> Yes, GenericInternalR
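What the compromise above would look like as an interface, following the default-method diff quoted from Dongjoon; a sketch rather than the merged Spark code, with the BoundFunction supertype omitted for brevity. A UDF like the DoubleAdd example from earlier in the thread could then supply only the typed method and inherit the throwing default.

    import org.apache.spark.sql.catalyst.InternalRow;

    interface ScalarFunction<R> {   // BoundFunction supertype omitted
      // Optional row-based fallback: implementations that only provide a typed
      // produceResult method can leave the default alone.
      default R produceResult(InternalRow input) {
        throw new UnsupportedOperationException(
            this.getClass().getName() + " only defines typed produceResult methods");
      }
    }

    class DoubleAdd implements ScalarFunction<Double> {
      // Found by matching the bound input types and called directly by codegen.
      public double produceResult(double left, double right) {
        return left + right;
      }
    }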
Re: [DISCUSS] SPIP: FunctionCatalog
Okay, great. I'll update the SPIP doc and call a vote in the next day or two. On Thu, Mar 4, 2021 at 8:26 AM Erik Krogen wrote: > +1 on Dongjoon's proposal. This is a very nice compromise between the > reflective/magic-method approach and the InternalRow approach, providing > a lot of flexibility for our users, and allowing for the more complicated > reflection-based approach to evolve at its own pace, since you can always > fall back to InternalRow for situations which aren't yet supported by > reflection. > > We can even consider having Spark code detect that you haven't overridden > the default produceResult (IIRC this is discoverable via reflection), and > raise an error at query analysis time instead of at runtime when it can't > find a reflective method or an overridden produceResult. > > I'm very pleased we have found a compromise that everyone seems happy > with! Big thanks to everyone who participated. > > On Wed, Mar 3, 2021 at 8:34 PM John Zhuge wrote: > >> +1 Good plan to move forward. >> >> Thank you all for the constructive and comprehensive discussions in this >> thread! Decisions on this important feature will have ramifications for >> years to come. >> >> On Wed, Mar 3, 2021 at 7:42 PM Wenchen Fan wrote: >> >>> +1 to this proposal. If people don't like the ScalarFunction0,1, ... >>> variants and prefer the "magical methods", then we can have a single >>> ScalarFunction interface which has the row-parameter API (with a >>> default implementation to fail) and documents to describe the "magical >>> methods" (which can be done later). >>> >>> I'll start the PR review this week to check the naming, doc, etc. >>> >>> Thanks all for the discussion here and let's move forward! >>> >>> On Thu, Mar 4, 2021 at 9:53 AM Ryan Blue wrote: >>> >>>> Good point, Dongjoon. I think we can probably come to some compromise >>>> here: >>>> >>>>- Remove SupportsInvoke since it isn’t really needed. We should >>>>always try to find the right method to invoke in the codegen path. >>>>- Add a default implementation of produceResult so that >>>>implementations don’t have to use it. If they don’t implement it and a >>>>magic function can’t be found, then it will throw >>>>UnsupportedOperationException >>>> >>>> This is assuming that we can agree not to introduce all of the >>>> ScalarFunction interface variations, which would have limited utility >>>> because of type erasure. >>>> >>>> Does that sound like a good plan to everyone? If so, I’ll update the >>>> SPIP doc so we can move forward. >>>> >>>> On Wed, Mar 3, 2021 at 4:36 PM Dongjoon Hyun >>>> wrote: >>>> >>>>> Hi, All. >>>>> >>>>> We shared many opinions in different perspectives. >>>>> However, we didn't reach a consensus even on a partial merge by >>>>> excluding something >>>>> (on the PR by me, on this mailing thread by Wenchen). >>>>> >>>>> For the following claims, we have another alternative to mitigate it. >>>>> >>>>> > I don't like it because it promotes the row-parameter API and >>>>> forces users to implement it, even if the users want to only use the >>>>> individual-parameters API. >>>>> >>>>> Why don't we merge the AS-IS PR by adding something instead of >>>>> excluding something? >>>>> >>>>> - R produceResult(InternalRow input); >>>>> + default R produceResult(InternalRow input) throws Exception { >>>>> + throw new UnsupportedOperationException(); >>>>> + } >>>>> >>>>> By providing the default implementation, it will not *forcing users to >>>>> implement it* technically. 
>>>>> And, we can provide a document about our expected usage properly. >>>>> What do you think? >>>>> >>>>> Bests, >>>>> Dongjoon. >>>>> >>>>> >>>>> >>>>> On Wed, Mar 3, 2021 at 10:28 AM Ryan Blue wrote: >>>>> >>>>>> Yes, GenericInternalRow is safe if when type mismatches, with the >>>>>> cost of using Object[], and primitive types need to do boxing >>>>>> >>>
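A sketch of the analysis-time check Erik mentions above: use reflection to confirm that a UDF either overrides the default produceResult(InternalRow) or declares a typed produceResult method, so a broken UDF fails at query analysis time instead of at runtime. This helper is hypothetical, not Spark code.

    import java.lang.reflect.Method;
    import java.util.Arrays;
    import org.apache.spark.sql.catalyst.InternalRow;

    final class ProduceResultCheck {
      // scalarFunctionInterface is the interface that declares the throwing default.
      static boolean isUsable(Object udf, Class<?> scalarFunctionInterface) throws NoSuchMethodException {
        Method rowMethod = udf.getClass().getMethod("produceResult", InternalRow.class);
        // If only the interface default is present, its declaring class is the interface itself.
        boolean overridesRowMethod = !rowMethod.getDeclaringClass().equals(scalarFunctionInterface);
        boolean hasTypedMethod = Arrays.stream(udf.getClass().getMethods())
            .anyMatch(m -> m.getName().equals("produceResult")
                && m.getParameterCount() > 0
                && !m.getParameterTypes()[0].equals(InternalRow.class));
        return overridesRowMethod || hasTypedMethod;
      }
    }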
[VOTE] SPIP: Add FunctionCatalog
Hi everyone, I’d like to start a vote for the FunctionCatalog design proposal (SPIP).

The proposal is to add a FunctionCatalog interface that can be used to load and list functions for Spark to call. There are interfaces for scalar and aggregate functions.

In the discussion we’ve come to consensus and I’ve updated the design doc to match how functions will be called: in addition to produceResult(InternalRow), which is optional, functions can define produceResult methods with arguments that are Spark’s internal data types, like UTF8String. Spark will prefer these methods when calling the UDF using codegen.

I’ve also updated the AggregateFunction interface and merged it with the partial aggregate interface because Spark doesn’t support non-partial aggregates.

The full SPIP doc is here: https://docs.google.com/document/d/1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U/edit#heading=h.82w8qxfl2uwl

Please vote on the SPIP in the next 72 hours. Once it is approved, I’ll do a final update of the PR and we can merge the API.

[ ] +1: Accept the proposal as an official SPIP
[ ] +0
[ ] -1: I don’t think this is a good idea because …

-- Ryan Blue
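The vote above also notes that AggregateFunction was merged with the partial aggregate interface, so one interface covers the initial, update, merge, and final steps. Below is a sketch of that shape with a trivial long-sum implementation; the method names are reconstructed from the design doc's description and may not match the final API exactly, and the BoundFunction supertype is omitted.

    import java.io.Serializable;
    import org.apache.spark.sql.catalyst.InternalRow;

    interface AggregateFunction<S extends Serializable, R> {
      S newAggregationState();               // initial state for each partition
      S update(S state, InternalRow input);  // fold one input row into the state
      S merge(S leftState, S rightState);    // combine partial states (partial aggregation)
      R produceResult(S state);              // final result from the merged state
    }

    class LongSum implements AggregateFunction<LongSum.State, Long> {
      static class State implements Serializable {
        long total = 0L;
      }

      @Override public State newAggregationState() { return new State(); }

      @Override public State update(State state, InternalRow input) {
        if (!input.isNullAt(0)) {
          state.total += input.getLong(0);
        }
        return state;
      }

      @Override public State merge(State left, State right) {
        left.total += right.total;
        return left;
      }

      @Override public Long produceResult(State state) { return state.total; }
    }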
Re: [VOTE] SPIP: Add FunctionCatalog
And a late +1 from me. On Fri, Mar 12, 2021 at 5:46 AM Takeshi Yamamuro wrote: > +1, too. > > On Fri, Mar 12, 2021 at 8:51 PM kordex wrote: > >> +1 (for what it's worth). It will definitely help our efforts. >> >> On Fri, Mar 12, 2021 at 12:14 PM Gengliang Wang wrote: >> > >> > +1 (non-binding) >> > >> > On Fri, Mar 12, 2021 at 3:00 PM Hyukjin Kwon >> wrote: >> >> >> >> +1 >> >> >> >> 2021년 3월 12일 (금) 오후 2:54, Jungtaek Lim 님이 >> 작성: >> >>> >> >>> +1 (non-binding) Excellent description on SPIP doc! Thanks for the >> amazing effort! >> >>> >> >>> On Wed, Mar 10, 2021 at 3:19 AM Liang-Chi Hsieh >> wrote: >> >>>> >> >>>> >> >>>> +1 (non-binding). >> >>>> >> >>>> Thanks for the work! >> >>>> >> >>>> >> >>>> Erik Krogen wrote >> >>>> > +1 from me (non-binding) >> >>>> > >> >>>> > On Tue, Mar 9, 2021 at 9:27 AM huaxin gao < >> >>>> >> >>>> > huaxin.gao11@ >> >>>> >> >>>> > > wrote: >> >>>> > >> >>>> >> +1 (non-binding) >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> -- >> >>>> Sent from: >> http://apache-spark-developers-list.1001551.n3.nabble.com/ >> >>>> >> >>>> - >> >>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >> >>>> >> >> - >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >> >> > > -- > --- > Takeshi Yamamuro > -- Ryan Blue Software Engineer Netflix
[RESULT] [VOTE] SPIP: Add FunctionCatalog
This SPIP is adopted with the following +1 votes and no -1 or +0 votes: Holden Karau* John Zhuge Chao Sun Dongjoon Hyun* Russell Spitzer DB Tsai* Wenchen Fan* Kent Yao Huaxin Gao Liang-Chi Hsieh Jungtaek Lim Hyukjin Kwon* Gengliang Wang kordex Takeshi Yamamuro Ryan Blue * = binding On Mon, Mar 8, 2021 at 3:55 PM Ryan Blue wrote: > Hi everyone, I’d like to start a vote for the FunctionCatalog design > proposal (SPIP). > > The proposal is to add a FunctionCatalog interface that can be used to > load and list functions for Spark to call. There are interfaces for scalar > and aggregate functions. > > In the discussion we’ve come to consensus and I’ve updated the design doc > to match how functions will be called: > > In addition to produceResult(InternalRow), which is optional, functions > can define produceResult methods with arguments that are Spark’s internal > data types, like UTF8String. Spark will prefer these methods when calling > the UDF using codgen. > > I’ve also updated the AggregateFunction interface and merged it with the > partial aggregate interface because Spark doesn’t support non-partial > aggregates. > > The full SPIP doc is here: > https://docs.google.com/document/d/1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U/edit#heading=h.82w8qxfl2uwl > > Please vote on the SPIP in the next 72 hours. Once it is approved, I’ll do > a final update of the PR and we can merge the API. > > [ ] +1: Accept the proposal as an official SPIP > [ ] +0 > [ ] -1: I don’t think this is a good idea because … > -- > Ryan Blue > -- Ryan Blue
Re: [VOTE] SPIP: Catalog API for view metadata
I don't think that it makes sense to discuss a different approach in the PR rather than in the vote. Let's discuss this now since that's the purpose of an SPIP. On Mon, May 24, 2021 at 11:22 AM John Zhuge wrote: > Hi everyone, I’d like to start a vote for the ViewCatalog design proposal > (SPIP). > > The proposal is to add a ViewCatalog interface that can be used to load, > create, alter, and drop views in DataSourceV2. > > The full SPIP doc is here: > https://docs.google.com/document/d/1XOxFtloiMuW24iqJ-zJnDzHl2KMxipTjJoxleJFz66A/edit?usp=sharing > > Please vote on the SPIP in the next 72 hours. Once it is approved, I’ll > update the PR for review. > > [ ] +1: Accept the proposal as an official SPIP > [ ] +0 > [ ] -1: I don’t think this is a good idea because … > -- Ryan Blue Software Engineer Netflix
Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2
+1 from me as well. Thanks Chao for doing so much to get it to this point! On Sat, Oct 23, 2021 at 11:29 PM DB Tsai wrote: > +1 on this SPIP. > > This is a more generalized version of bucketed tables and bucketed > joins which can eliminate very expensive data shuffles when joins, and > many users in the Apache Spark community have wanted this feature for > a long time! > > Thank you, Ryan and Chao, for working on this, and I look forward to > it as a new feature in Spark 3.3 > > DB Tsai | https://www.dbtsai.com/ | PGP 42E5B25A8F7A82C1 > > On Fri, Oct 22, 2021 at 12:18 PM Chao Sun wrote: > > > > Hi, > > > > Ryan and I drafted a design doc to support a new type of join: storage > partitioned join which covers bucket join support for DataSourceV2 but is > more general. The goal is to let Spark leverage distribution properties > reported by data sources and eliminate shuffle whenever possible. > > > > Design doc: > https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE > (includes a POC link at the end) > > > > We'd like to start a discussion on the doc and any feedback is welcome! > > > > Thanks, > > Chao > -- Ryan Blue
Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2
Instead of commenting on the doc, could we keep discussion here on the dev list please? That way more people can follow it and there is more room for discussion. Comment threads have a very small area and easily become hard to follow. Ryan On Tue, Oct 26, 2021 at 9:32 AM John Zhuge wrote: > +1 Nicely done! > > On Tue, Oct 26, 2021 at 8:08 AM Chao Sun wrote: > >> Oops, sorry. I just fixed the permission setting. >> >> Thanks everyone for the positive support! >> >> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan wrote: >> >>> +1 to this SPIP and nice writeup of the design doc! >>> >>> Can we open comment permission in the doc so that we can discuss details >>> there? >>> >>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon >>> wrote: >>> >>>> Seems making sense to me. >>>> >>>> Would be great to have some feedback from people such as @Wenchen Fan >>>> @Cheng Su @angers zhu >>>> . >>>> >>>> >>>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun >>>> wrote: >>>> >>>>> +1 for this SPIP. >>>>> >>>>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao >>>>> wrote: >>>>> >>>>>> +1. Thanks for lifting the current restrictions on bucket join and >>>>>> making this more generalized. >>>>>> >>>>>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue wrote: >>>>>> >>>>>>> +1 from me as well. Thanks Chao for doing so much to get it to this >>>>>>> point! >>>>>>> >>>>>>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai wrote: >>>>>>> >>>>>>>> +1 on this SPIP. >>>>>>>> >>>>>>>> This is a more generalized version of bucketed tables and bucketed >>>>>>>> joins which can eliminate very expensive data shuffles when joins, >>>>>>>> and >>>>>>>> many users in the Apache Spark community have wanted this feature >>>>>>>> for >>>>>>>> a long time! >>>>>>>> >>>>>>>> Thank you, Ryan and Chao, for working on this, and I look forward to >>>>>>>> it as a new feature in Spark 3.3 >>>>>>>> >>>>>>>> DB Tsai | https://www.dbtsai.com/ | PGP 42E5B25A8F7A82C1 >>>>>>>> >>>>>>>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun >>>>>>>> wrote: >>>>>>>> > >>>>>>>> > Hi, >>>>>>>> > >>>>>>>> > Ryan and I drafted a design doc to support a new type of join: >>>>>>>> storage partitioned join which covers bucket join support for >>>>>>>> DataSourceV2 >>>>>>>> but is more general. The goal is to let Spark leverage distribution >>>>>>>> properties reported by data sources and eliminate shuffle whenever >>>>>>>> possible. >>>>>>>> > >>>>>>>> > Design doc: >>>>>>>> https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE >>>>>>>> (includes a POC link at the end) >>>>>>>> > >>>>>>>> > We'd like to start a discussion on the doc and any feedback is >>>>>>>> welcome! >>>>>>>> > >>>>>>>> > Thanks, >>>>>>>> > Chao >>>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Ryan Blue >>>>>>> >>>>>> > > -- > John Zhuge > -- Ryan Blue
Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2
& >>> ORC file format only), and legacy Hive code path (HiveTableScanExec). In >>> the SPIP, I am seeing we only make change for data source v2, so wondering >>> how this would work with existing Hive table read path. In addition, just >>> FYI, supporting writing Hive bucketed table is merged in master recently ( >>> SPARK-19256 <https://issues.apache.org/jira/browse/SPARK-19256> has >>> details). >>> >>> >>> >>> 1. Would aggregate work automatically after the SPIP? >>> >>> >>> >>> Another major benefit for having bucketed table, is to avoid shuffle >>> before aggregate. Just want to bring to our attention that it would be >>> great to consider aggregate as well when doing this proposal. >>> >>> >>> >>>1. Any major use cases in mind except Hive bucketed table? >>> >>> >>> >>> Just curious if there’s any other use cases we are targeting as part of >>> SPIP. >>> >>> >>> >>> Thanks, >>> >>> Cheng Su >>> >>> >>> >>> >>> >>> >>> >>> *From: *Ryan Blue >>> *Date: *Tuesday, October 26, 2021 at 9:39 AM >>> *To: *John Zhuge >>> *Cc: *Chao Sun , Wenchen Fan , >>> Cheng Su , DB Tsai , Dongjoon Hyun < >>> dongjoon.h...@gmail.com>, Hyukjin Kwon , Wenchen >>> Fan , angers zhu , dev < >>> dev@spark.apache.org>, huaxin gao >>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source >>> V2 >>> >>> Instead of commenting on the doc, could we keep discussion here on the >>> dev list please? That way more people can follow it and there is more room >>> for discussion. Comment threads have a very small area and easily become >>> hard to follow. >>> >>> >>> >>> Ryan >>> >>> >>> >>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge wrote: >>> >>> +1 Nicely done! >>> >>> >>> >>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun wrote: >>> >>> Oops, sorry. I just fixed the permission setting. >>> >>> >>> >>> Thanks everyone for the positive support! >>> >>> >>> >>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan wrote: >>> >>> +1 to this SPIP and nice writeup of the design doc! >>> >>> >>> >>> Can we open comment permission in the doc so that we can discuss details >>> there? >>> >>> >>> >>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon >>> wrote: >>> >>> Seems making sense to me. >>> >>> Would be great to have some feedback from people such as @Wenchen Fan >>> @Cheng Su @angers zhu >>> . >>> >>> >>> >>> >>> >>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun >>> wrote: >>> >>> +1 for this SPIP. >>> >>> >>> >>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao >>> wrote: >>> >>> +1. Thanks for lifting the current restrictions on bucket join and >>> making this more generalized. >>> >>> >>> >>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue wrote: >>> >>> +1 from me as well. Thanks Chao for doing so much to get it to this >>> point! >>> >>> >>> >>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai wrote: >>> >>> +1 on this SPIP. >>> >>> This is a more generalized version of bucketed tables and bucketed >>> joins which can eliminate very expensive data shuffles when joins, and >>> many users in the Apache Spark community have wanted this feature for >>> a long time! >>> >>> Thank you, Ryan and Chao, for working on this, and I look forward to >>> it as a new feature in Spark 3.3 >>> >>> DB Tsai | https://www.dbtsai.com/ | PGP 42E5B25A8F7A82C1 >>> >>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun wrote: >>> > >>> > Hi, >>> > >>> > Ryan and I drafted a design doc to support a new type of join: storage >>> partitioned join which covers bucket join support for DataSourceV2 but is >>> more general. 
The goal is to let Spark leverage distribution properties >>> reported by data sources and eliminate shuffle whenever possible. >>> > >>> > Design doc: >>> https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE >>> (includes a POC link at the end) >>> > >>> > We'd like to start a discussion on the doc and any feedback is welcome! >>> > >>> > Thanks, >>> > Chao >>> >>> >>> >>> >>> -- >>> >>> Ryan Blue >>> >>> >>> >>> >>> -- >>> >>> John Zhuge >>> >>> >>> >>> >>> -- >>> >>> Ryan Blue >>> >> -- Ryan Blue
Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2
The transform expressions in v2 are logical, not concrete implementations. Even days may have different implementations -- the only expectation is that the partitions are day-sized. For example, you could use a transform that splits days at UTC 00:00, or uses some other day boundary. Because the expressions are logical, we need to resolve them to implementations at some point, like Chao outlines. We can do that using a FunctionCatalog, although I think it's worth considering adding an interface so that a transform from a Table can be converted into a `BoundFunction` directly. That is easier than defining a way for Spark to query the function catalog. In any case, I'm sure it's easy to understand how this works once you get a concrete implementation. On Wed, Oct 27, 2021 at 9:35 AM Wenchen Fan wrote: > `BucketTransform` is a builtin partition transform in Spark, instead of a > UDF from `FunctionCatalog`. Will Iceberg use UDF from `FunctionCatalog` to > represent its bucket transform, or use the Spark builtin `BucketTransform`? > I'm asking this because other v2 sources may also use the builtin > `BucketTransform` but use a different bucket hash function. Or we can > clearly define the bucket hash function of the builtin `BucketTransform` in > the doc. > > On Thu, Oct 28, 2021 at 12:25 AM Ryan Blue wrote: > >> Two v2 sources may return different bucket IDs for the same value, and >> this breaks the phase 1 split-wise join. >> >> This is why the FunctionCatalog included a canonicalName method (docs >> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/BoundFunction.java#L81-L96>). >> That method returns an identifier that can be used to compare whether two >> bucket function instances are the same. >> >> >>1. Can we apply this idea to partitioned file source tables >>(non-bucketed) as well? >> >> What do you mean here? The design doc discusses transforms like days(ts) >> that can be supported in the future. Is that what you’re asking about? Or >> are you referring to v1 file sources? I think the goal is to support v2, >> since v1 doesn’t have reliable behavior. >> >> Note that the initial implementation goal is to support bucketing since >> that’s an easier case because both sides have the same number of >> partitions. More complex storage-partitioned joins can be implemented later. >> >> >>1. What if the table has many partitions? Shall we apply certain join >>algorithms in the phase 1 split-wise join as well? Or even launch a Spark >>job to do so? >> >> I think that this proposal opens up a lot of possibilities, like what >> you’re suggesting here. It is a bit like AQE. We’ll need to come up with >> heuristics for choosing how and when to use storage partitioning in joins. >> As I said above, bucketing is a great way to get started because it fills >> an existing gap. More complex use cases can be supported over time. >> >> Ryan >> >> On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan wrote: >> >>> IIUC, the general idea is to let each input split report its partition >>> value, and Spark can perform the join in two phases: >>> 1. join the input splits from left and right tables according to their >>> partitions values and join keys, at the driver side. >>> 2. for each joined input splits pair (or a group of splits), launch a >>> Spark task to join the rows. >>> >>> My major concern is about how to define "compatible partitions". 
Things >>> like `days(ts)` are straightforward: the same timestamp value always >>> results in the same partition value, in whatever v2 sources. `bucket(col, >>> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2 >>> sources may return different bucket IDs for the same value, and this breaks >>> the phase 1 split-wise join. >>> >>> And two questions for further improvements: >>> 1. Can we apply this idea to partitioned file source tables >>> (non-bucketed) as well? >>> 2. What if the table has many partitions? Shall we apply certain join >>> algorithms in the phase 1 split-wise join as well? Or even launch a Spark >>> job to do so? >>> >>> Thanks, >>> Wenchen >>> >>> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun wrote: >>> >>>> Thanks Cheng for the comments. >>>> >>>> > Is migrating Hive table read path to data source v2, being a >>>> prerequisite of this SPIP >>>> >>>> Yes, this SPI
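To make the canonicalName point above concrete, here is a minimal sketch of a bucket function that a v2 source could expose through a FunctionCatalog, written against the function interfaces referenced earlier (UnboundFunction, BoundFunction, ScalarFunction). The class names, package, bucket count, and hash logic are all placeholders; each source defines its own, which is exactly why Spark compares canonicalName instead of assuming a shared hash function.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StructType}

// Hypothetical bound bucket transform for int columns.
class BucketIntBound(numBuckets: Int) extends ScalarFunction[Integer] {
  override def inputTypes(): Array[DataType] = Array(IntegerType)
  override def resultType(): DataType = IntegerType
  override def name(): String = "bucket"

  // Two tables whose bucket functions report the same canonicalName are
  // promising the same hash behavior, so Spark can line their partitions up.
  override def canonicalName(): String = s"com.example.bucket.int/$numBuckets"

  override def produceResult(input: InternalRow): Integer = {
    // placeholder hash; a real source would use its own stable hash function
    Integer.valueOf(Math.floorMod(input.getInt(0), numBuckets))
  }
}

// The unbound wrapper that a FunctionCatalog implementation would return.
class BucketInt(numBuckets: Int) extends UnboundFunction {
  override def name(): String = "bucket"
  override def description(): String = s"buckets an int column into $numBuckets buckets"
  override def bind(inputType: StructType): BoundFunction = new BucketIntBound(numBuckets)
}

With this shape, the phase 1 split-wise join can treat two sides as compatible only when both bound functions report the same canonicalName.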
Re: [VOTE] SPIP: Storage Partitioned Join for Data Source V2
+1 On Fri, Oct 29, 2021 at 11:06 AM huaxin gao wrote: > +1 > > On Fri, Oct 29, 2021 at 10:59 AM Dongjoon Hyun > wrote: > >> +1 >> >> Dongjoon >> >> On 2021/10/29 17:48:59, Russell Spitzer >> wrote: >> > +1 This is a great idea, (I have no Apache Spark voting points) >> > >> > On Fri, Oct 29, 2021 at 12:41 PM L. C. Hsieh wrote: >> > >> > > >> > > I'll start with my +1. >> > > >> > > On 2021/10/29 17:30:03, L. C. Hsieh wrote: >> > > > Hi all, >> > > > >> > > > I’d like to start a vote for SPIP: Storage Partitioned Join for Data >> > > Source V2. >> > > > >> > > > The proposal is to support a new type of join: storage partitioned >> join >> > > which >> > > > covers bucket join support for DataSourceV2 but is more general. >> The goal >> > > > is to let Spark leverage distribution properties reported by data >> > > sources and >> > > > eliminate shuffle whenever possible. >> > > > >> > > > Please also refer to: >> > > > >> > > >- Previous discussion in dev mailing list: [DISCUSS] SPIP: >> Storage >> > > Partitioned Join for Data Source V2 >> > > >< >> > > >> https://lists.apache.org/thread.html/r7dc67c3db280a8b2e65855cb0b1c86b524d4e6ae1ed9db9ca12cb2e6%40%3Cdev.spark.apache.org%3E >> > > > >> > > >. >> > > >- JIRA: SPARK-37166 < >> > > https://issues.apache.org/jira/browse/SPARK-37166> >> > > >- Design doc < >> > > >> https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE >> > >> > > >> > > > >> > > > Please vote on the SPIP for the next 72 hours: >> > > > >> > > > [ ] +1: Accept the proposal as an official SPIP >> > > > [ ] +0 >> > > > [ ] -1: I don’t think this is a good idea because … >> > > > >> > > > >> - >> > > > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >> > > > >> > > > >> > > >> > > - >> > > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >> > > >> > > >> > >> >> - >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >> >> -- Ryan Blue Tabular
Re: [VOTE] SPIP: Row-level operations in Data Source V2
+1 Thanks to Anton for all this great work! On Sat, Nov 13, 2021 at 8:24 AM Mich Talebzadeh wrote: > +1 non-binding > > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Sat, 13 Nov 2021 at 15:07, Russell Spitzer > wrote: > >> +1 (never binding) >> >> On Sat, Nov 13, 2021 at 1:10 AM Dongjoon Hyun >> wrote: >> >>> +1 >>> >>> On Fri, Nov 12, 2021 at 6:58 PM huaxin gao >>> wrote: >>> >>>> +1 >>>> >>>> On Fri, Nov 12, 2021 at 6:44 PM Yufei Gu >>>> wrote: >>>> >>>>> +1 >>>>> >>>>> > On Nov 12, 2021, at 6:25 PM, L. C. Hsieh wrote: >>>>> > >>>>> > Hi all, >>>>> > >>>>> > I’d like to start a vote for SPIP: Row-level operations in Data >>>>> Source V2. >>>>> > >>>>> > The proposal is to add support for executing row-level operations >>>>> > such as DELETE, UPDATE, MERGE for v2 tables (SPARK-35801). The >>>>> > execution should be the same across data sources and the best way to >>>>> do >>>>> > that is to implement it in Spark. >>>>> > >>>>> > Right now, Spark can only parse and to some extent analyze DELETE, >>>>> UPDATE, >>>>> > MERGE commands. Data sources that support row-level changes have to >>>>> build >>>>> > custom Spark extensions to execute such statements. The goal of this >>>>> effort >>>>> > is to come up with a flexible and easy-to-use API that will work >>>>> across >>>>> > data sources. >>>>> > >>>>> > Please also refer to: >>>>> > >>>>> > - Previous discussion in dev mailing list: [DISCUSS] SPIP: >>>>> > Row-level operations in Data Source V2 >>>>> > <https://lists.apache.org/thread/kd8qohrk5h3qx8d6y4lhrm67vnn8p6bv> >>>>> > >>>>> > - JIRA: SPARK-35801 < >>>>> https://issues.apache.org/jira/browse/SPARK-35801> >>>>> > - PR for handling DELETE statements: >>>>> > <https://github.com/apache/spark/pull/33008> >>>>> > >>>>> > - Design doc >>>>> > < >>>>> https://docs.google.com/document/d/12Ywmc47j3l2WF4anG5vL4qlrhT2OKigb7_EbIKhxg60/ >>>>> > >>>>> > >>>>> > Please vote on the SPIP for the next 72 hours: >>>>> > >>>>> > [ ] +1: Accept the proposal as an official SPIP >>>>> > [ ] +0 >>>>> > [ ] -1: I don’t think this is a good idea because … >>>>> > >>>>> > - >>>>> > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >>>>> > >>>>> >>>>> >>>>> - >>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >>>>> >>>>> -- Ryan Blue Tabular
Re: Supports Dynamic Table Options for Spark SQL
The proposed feature is to be able to pass options through SQL like you would when using the DataFrameReader API, so it would work for all sources that support read options. Read options are part of the DSv2 API, there just isn’t a way to pass options when using SQL. The PR also has a non-Iceberg example, which is being able to customize some JDBC source behaviors per query (e.g., fetchSize), rather than globally in the table’s options. The proposed syntax is odd, but I think that's an artifact of Spark introducing read options that aren't a normal part of SQL. Seems reasonable to me to pass them through a hint. On Mon, Nov 15, 2021 at 2:18 AM Mich Talebzadeh wrote: > Interesting. > > What is this going to add on top of support for Apache Iceberg > <https://www.dremio.com/data-lake/apache-iceberg/>. Will it be in line > with support for Hive ACID tables or Delta Lake? > > HTH > > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Mon, 15 Nov 2021 at 01:56, Zhun Wang wrote: > >> Hi dev, >> >> We are discussing Support Dynamic Table Options for Spark SQL ( >> https://github.com/apache/spark/pull/34072). It is currently not sure if >> the syntax makes sense, and would like to know if there is other feedback >> or opinion on this. >> >> I would appreciate any feedback on this. >> >> Thanks. >> > -- Ryan Blue Tabular
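As a rough illustration of the gap being described, assuming a JDBC table named orders and the hint syntax from the PR (the connection string, table, and option values are only examples):

// Today, per-query read options can only be set through the DataFrameReader API:
val orders = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db.example.com/sales")  // placeholder connection
  .option("dbtable", "orders")
  .option("fetchSize", "10000")                             // per-query tuning, not a table property
  .load()

// The proposal is to allow the same per-query options inline in SQL, e.g. via a hint:
spark.sql("SELECT * FROM orders /*+ OPTIONS('fetchSize'='10000') */")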
Re: Supports Dynamic Table Options for Spark SQL
I want to note that I wouldn't recommend time traveling this way by using the hint for `snapshot-id`. Instead, we want to add the standard SQL syntax for that in a separate PR. This is useful for other options that help a table scan perform better, like specifying the target split size. You're right that this isn't a typical optimizer hint, but I'm not sure what other syntax is possible for this use case. How else would we send custom properties through to the scan? On Mon, Nov 15, 2021 at 9:25 AM Mich Talebzadeh wrote: > I am looking at the hint and it appears to me (I stand corrected), it is a > single table hint as below: > > -- time travel > SELECT * FROM t /*+ OPTIONS('snapshot-id'='10963874102873L') */ > > My assumption is that any view on this table will also benefit from this > hint. This is not a hint to optimizer in a classical sense. Only a snapshot > hint. Normally, a hint is an instruction to the optimizer. When writing > SQL, one may know information about the data unknown to the optimizer. > Hints enable one to make decisions normally made by the optimizer, > sometimes causing the optimizer to select a plan that it sees as higher > cost. > > > So far as this case is concerned, it looks OK and I concur it should be > extended to write as well. > > > HTH > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Mon, 15 Nov 2021 at 17:02, Russell Spitzer > wrote: > >> I think since we probably will end up using this same syntax on write, >> this makes a lot of sense. Unless there is another good way to express a >> similar concept during a write operation I think going forward with this >> would be ok. >> >> On Mon, Nov 15, 2021 at 10:44 AM Ryan Blue wrote: >> >>> The proposed feature is to be able to pass options through SQL like you >>> would when using the DataFrameReader API, so it would work for all >>> sources that support read options. Read options are part of the DSv2 API, >>> there just isn’t a way to pass options when using SQL. The PR also has a >>> non-Iceberg example, which is being able to customize some JDBC source >>> behaviors per query (e.g., fetchSize), rather than globally in the table’s >>> options. >>> >>> The proposed syntax is odd, but I think that's an artifact of Spark >>> introducing read options that aren't a normal part of SQL. Seems reasonable >>> to me to pass them through a hint. >>> >>> On Mon, Nov 15, 2021 at 2:18 AM Mich Talebzadeh < >>> mich.talebza...@gmail.com> wrote: >>> >>>> Interesting. >>>> >>>> What is this going to add on top of support for Apache Iceberg >>>> <https://www.dremio.com/data-lake/apache-iceberg/>. Will it be in line >>>> with support for Hive ACID tables or Delta Lake? >>>> >>>> HTH >>>> >>>> >>>> >>>>view my Linkedin profile >>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>> >>>> >>>> >>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>>> any loss, damage or destruction of data or any other property which may >>>> arise from relying on this email's technical content is explicitly >>>> disclaimed. 
The author will in no case be liable for any monetary damages >>>> arising from such loss, damage or destruction. >>>> >>>> >>>> >>>> >>>> On Mon, 15 Nov 2021 at 01:56, Zhun Wang wrote: >>>> >>>>> Hi dev, >>>>> >>>>> We are discussing Support Dynamic Table Options for Spark SQL ( >>>>> https://github.com/apache/spark/pull/34072). It is currently not sure >>>>> if the syntax makes sense, and would like to know if there is other >>>>> feedback or opinion on this. >>>>> >>>>> I would appreciate any feedback on this. >>>>> >>>>> Thanks. >>>>> >>>> >>> >>> -- >>> Ryan Blue >>> Tabular >>> >> -- Ryan Blue Tabular
Re: Supports Dynamic Table Options for Spark SQL
Mich, time travel will use the newly added VERSION AS OF or TIMESTAMP AS OF syntax. On Tue, Nov 16, 2021 at 12:40 AM Mich Talebzadeh wrote: > As I stated before, hints are designed to direct the optimizer to choose > a certain query execution plan based on the specific criteria. > > > -- time travel > SELECT * FROM t /*+ OPTIONS('snapshot-id'='10963874102873L') */ > > > The alternative would be to specify time travel by creating a snapshot > based on CURRENT_DATE() range which encapsulates time travel for > 'snapshot-id'='10963874102873L' > > > CREATE SNAPSHOT t_snap > > START WITH CURRENT_DATE() - 30 > > NEXT CURRENT_DATE() > > AS SELECT * FROM t > > > SELECT * FROM t_snap > > > HTH > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Tue, 16 Nov 2021 at 04:26, Hyukjin Kwon wrote: > >> My biggest concern with the syntax in hints is that Spark SQL's options >> can change results (e.g., CSV's header options) whereas hints are generally >> not designed to affect the external results if I am not mistaken. This is >> counterintuitive. >> I left the comment in the PR but what's the real benefit over leveraging: >> SET conf and RESET conf? we can extract options from runtime session >> configurations e.g., SessionConfigSupport. >> >> On Tue, 16 Nov 2021 at 04:30, Nicholas Chammas < >> nicholas.cham...@gmail.com> wrote: >> >>> Side note about time travel: There is a PR >>> <https://github.com/apache/spark/pull/34497> to add VERSION/TIMESTAMP >>> AS OF syntax to Spark SQL. >>> >>> On Mon, Nov 15, 2021 at 2:23 PM Ryan Blue wrote: >>> >>>> I want to note that I wouldn't recommend time traveling this way by >>>> using the hint for `snapshot-id`. Instead, we want to add the standard SQL >>>> syntax for that in a separate PR. This is useful for other options that >>>> help a table scan perform better, like specifying the target split size. >>>> >>>> You're right that this isn't a typical optimizer hint, but I'm not sure >>>> what other syntax is possible for this use case. How else would we send >>>> custom properties through to the scan? >>>> >>>> On Mon, Nov 15, 2021 at 9:25 AM Mich Talebzadeh < >>>> mich.talebza...@gmail.com> wrote: >>>> >>>>> I am looking at the hint and it appears to me (I stand corrected), it >>>>> is a single table hint as below: >>>>> >>>>> -- time travel >>>>> SELECT * FROM t /*+ OPTIONS('snapshot-id'='10963874102873L') */ >>>>> >>>>> My assumption is that any view on this table will also benefit from >>>>> this hint. This is not a hint to optimizer in a classical sense. Only a >>>>> snapshot hint. Normally, a hint is an instruction to the optimizer. >>>>> When writing SQL, one may know information about the data unknown to the >>>>> optimizer. Hints enable one to make decisions normally made by the >>>>> optimizer, sometimes causing the optimizer to select a plan that it sees >>>>> as >>>>> higher cost. >>>>> >>>>> >>>>> So far as this case is concerned, it looks OK and I concur it should >>>>> be extended to write as well. 
>>>>> >>>>> >>>>> HTH >>>>> >>>>> >>>>>view my Linkedin profile >>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>> >>>>> >>>>> >>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>>>> any loss, damage or destruction of data or any other property which may >>>>> arise from relying on this email's technical content is explicitly >>>>> disclaimed. The author will in no case be liable for any monetary damages >>>>> arising from such loss, damage or destruction. >>>>> >>>>> >>&
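For completeness, the dedicated time-travel syntax being referred to (the subject of the separate PR mentioned above, rather than the OPTIONS hint) would look roughly like this, reusing the snapshot id from the earlier example; the table name and timestamp are placeholders:

// Time travel via dedicated syntax rather than a scan-option hint:
spark.sql("SELECT * FROM t VERSION AS OF 10963874102873")
spark.sql("SELECT * FROM t TIMESTAMP AS OF '2021-11-15 00:00:00'")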
Re: [VOTE][SPIP] Support Customized Kubernetes Schedulers Proposal
+1 (non-binding) On Wed, Jan 12, 2022 at 10:29 AM Mridul Muralidharan wrote: > > +1 (binding) > This should be a great improvement ! > > Regards, > Mridul > > On Wed, Jan 12, 2022 at 4:04 AM Kent Yao wrote: > >> +1 (non-binding) >> >> Thomas Graves 于2022年1月12日周三 11:52写道: >> >>> +1 (binding). >>> >>> One minor note since I haven't had time to look at the implementation >>> details is please make sure resource aware scheduling and the stage >>> level scheduling still work or any caveats are documented. Feel free >>> to ping me if questions in these areas. >>> >>> Tom >>> >>> On Wed, Jan 5, 2022 at 7:07 PM Yikun Jiang wrote: >>> > >>> > Hi all, >>> > >>> > I’d like to start a vote for SPIP: "Support Customized Kubernetes >>> Schedulers Proposal" >>> > >>> > The SPIP is to support customized Kubernetes schedulers in Spark on >>> Kubernetes. >>> > >>> > Please also refer to: >>> > >>> > - Previous discussion in dev mailing list: [DISCUSSION] SPIP: Support >>> Volcano/Alternative Schedulers Proposal >>> > - Design doc: [SPIP] Spark-36057 Support Customized Kubernetes >>> Schedulers Proposal >>> > - JIRA: SPARK-36057 >>> > >>> > Please vote on the SPIP: >>> > >>> > [ ] +1: Accept the proposal as an official SPIP >>> > [ ] +0 >>> > [ ] -1: I don’t think this is a good idea because … >>> > >>> > Regards, >>> > Yikun >>> >>> - >>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >>> >>> -- Ryan Blue Tabular
Re: [VOTE] SPIP: Catalog API for view metadata
+1 for the SPIP. I think it's well designed and it has worked quite well at Netflix for a long time. On Thu, Feb 3, 2022 at 2:04 PM John Zhuge wrote: > Hi Spark community, > > I’d like to restart the vote for the ViewCatalog design proposal (SPIP). > > The proposal is to add a ViewCatalog interface that can be used to load, > create, alter, and drop views in DataSourceV2. > > Please vote on the SPIP until Feb. 9th (Wednesday). > > [ ] +1: Accept the proposal as an official SPIP > [ ] +0 > [ ] -1: I don’t think this is a good idea because … > > Thanks! > -- Ryan Blue Tabular
Re: Data Contracts
Hey Phillip, You're right that we can improve tooling to help with data contracts, but I think that a contract still needs to be an agreement between people. Constraints help by ensuring a data producer adheres to the contract and by giving feedback as soon as possible when assumptions are violated. The problem with treating the enforced constraints as the only contract is that they're too easy to change. For example, if I change a required column to a nullable column, that's a perfectly valid transition --- but only if I've communicated that change to downstream consumers. Ryan On Mon, Jun 12, 2023 at 4:43 AM Phillip Henry wrote: > Hi, folks. > > There currently seems to be a buzz around "data contracts". From what I > can tell, these mainly advocate a cultural solution. But instead, could big > data tools be used to enforce these contracts? > > My questions really are: are there any plans to implement data constraints > in Spark (eg, an integer must be between 0 and 100; the date in column X > must be before that in column Y)? And if not, is there an appetite for them? > > Maybe we could associate constraints with schema metadata that are > enforced in the implementation of a FileFormatDataWriter? > > Just throwing it out there and wondering what other people think. It's an > area that interests me as it seems that over half my problems at the day > job are because of dodgy data. > > Regards, > > Phillip > > -- Ryan Blue Tabular
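Until something like built-in constraints exists, the kind of checks Phillip describes can be approximated by hand before a write. A minimal sketch, with made-up column names and a contract of "score in [0, 100] and start_date before end_date":

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Fail the job (and give early feedback) if any row violates the agreed contract.
def enforceContract(df: DataFrame): DataFrame = {
  val violations = df.filter(
    col("score") < 0 || col("score") > 100 || col("start_date") >= col("end_date"))
  require(violations.isEmpty, "data contract violated: found rows outside agreed bounds")
  df
}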
Re: Query hints visible to DSV2 connectors?
You probably want to use data source options. Those get passed through but can't be set in SQL. On Wed, Aug 2, 2023 at 5:39 PM Alex Cruise wrote: > Hey folks, > > I'm adding an optional feature to my DSV2 connector where it can choose > between a row-based or columnar PartitionReader dynamically depending on a > query's schema. I'd like to be able to supply a hint at query time that's > visible to the connector, but at the moment I can't see any way to > accomplish that. > > From what I can see the artifacts produced by the existing hint system [ > https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html > or sql("select 1").hint("foo").show()] aren't visible from the > TableCatalog/Table/ScanBuilder. > > I guess I could set a config parameter but I'd rather do this on a > per-query basis. Any tips? > > Thanks! > > -0xe1a > -- Ryan Blue Tabular
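A minimal sketch of that approach, with a made-up connector name and option key:

// Per-query: the option travels with this read rather than with a session conf.
val df = spark.read
  .format("com.example.mysource")     // placeholder connector name
  .option("preferColumnar", "true")   // placeholder option name
  .load()

// On the connector side, the same entry arrives in the CaseInsensitiveStringMap
// that Spark passes to SupportsRead.newScanBuilder(options), e.g.
// options.getBoolean("preferColumnar", false), so the row-based vs. columnar
// PartitionReader choice can be made per query.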
Re: [DISCUSSION] SPIP: An Official Kubernetes Operator for Apache Spark
+1 On Thu, Nov 9, 2023 at 4:23 PM Hussein Awala wrote: > +1 for creating an official Kubernetes operator for Apache Spark > > On Fri, Nov 10, 2023 at 12:38 AM huaxin gao > wrote: > >> +1 >> >> On Thu, Nov 9, 2023 at 3:14 PM DB Tsai wrote: >> >>> +1 >>> >>> To be completely transparent, I am employed in the same department as >>> Zhou at Apple. >>> >>> I support this proposal, provided that we witness community adoption >>> following the release of the Flink Kubernetes operator, streamlining Flink >>> deployment on Kubernetes. >>> >>> A well-maintained official Spark Kubernetes operator is essential for >>> our Spark community as well. >>> >>> DB Tsai | https://www.dbtsai.com/ | PGP 42E5B25A8F7A82C1 >>> >>> On Nov 9, 2023, at 12:05 PM, Zhou Jiang wrote: >>> >>> Hi Spark community, >>> I'm reaching out to initiate a conversation about the possibility of >>> developing a Java-based Kubernetes operator for Apache Spark. Following the >>> operator pattern ( >>> https://kubernetes.io/docs/concepts/extend-kubernetes/operator/), Spark >>> users may manage applications and related components seamlessly using >>> native tools like kubectl. The primary goal is to simplify the Spark user >>> experience on Kubernetes, minimizing the learning curve and operational >>> complexities and therefore enable users to focus on the Spark application >>> development. >>> Although there are several open-source Spark on Kubernetes operators >>> available, none of them are officially integrated into the Apache Spark >>> project. As a result, these operators may lack active support and >>> development for new features. Within this proposal, our aim is to introduce >>> a Java-based Spark operator as an integral component of the Apache Spark >>> project. This solution has been employed internally at Apple for multiple >>> years, operating millions of executors in real production environments. The >>> use of Java in this solution is intended to accommodate a wider user and >>> contributor audience, especially those who are familiar with Scala. >>> Ideally, this operator should have its dedicated repository, similar to >>> Spark Connect Golang or Spark Docker, allowing it to maintain a loose >>> connection with the Spark release cycle. This model is also followed by the >>> Apache Flink Kubernetes operator. >>> We believe that this project holds the potential to evolve into a >>> thriving community project over the long run. A comparison can be drawn >>> with the Flink Kubernetes Operator: Apple has open-sourced internal Flink >>> Kubernetes operator, making it a part of the Apache Flink project ( >>> https://github.com/apache/flink-kubernetes-operator). This move has >>> gained wide industry adoption and contributions from the community. In a >>> mere year, the Flink operator has garnered more than 600 stars and has >>> attracted contributions from over 80 contributors. This showcases the level >>> of community interest and collaborative momentum that can be achieved in >>> similar scenarios. >>> More details can be found at SPIP doc : Spark Kubernetes Operator >>> https://docs.google.com/document/d/1f5mm9VpSKeWC72Y9IiKN2jbBn32rHxjWKUfLRaGEcLE >>> >>> Thanks, >>> -- >>> *Zhou JIANG* >>> >>> >>> -- Ryan Blue Tabular
Re: Manually reading parquet files.
You're getting InternalRow instances. They probably have the data you want, but the toString representation doesn't match the data for InternalRow. On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew wrote: > Hello Friends, > > > > I’m working on a performance improvement that reads additional parquet > files in the middle of a lambda and I’m running into some issues. This is > what id like todo > > > > ds.mapPartitions(x=>{ > //read parquet file in and perform an operation with x > }) > > > > > > Here’s my current POC code but I’m getting nonsense back from the row > reader. > > > > *import *com.amazon.horizon.azulene.util.SparkFileUtils._ > > *spark*.*conf*.set("spark.sql.parquet.enableVectorizedReader","false") > > *val *data = *List*( > *TestRow*(1,1,"asdf"), > *TestRow*(2,1,"asdf"), > *TestRow*(3,1,"asdf"), > *TestRow*(4,1,"asdf") > ) > > *val *df = *spark*.createDataFrame(data) > > *val *folder = Files.*createTempDirectory*("azulene-test") > > *val *folderPath = folder.toAbsolutePath.toString + "/" > df.write.mode("overwrite").parquet(folderPath) > > *val *files = *spark*.fs.listStatus(folder.toUri) > > *val *file = files(1)//skip _success file > > *val *partitionSchema = *StructType*(*Seq*.empty) > *val *dataSchema = df.schema > *val *fileFormat = *new *ParquetFileFormat() > > *val *path = file.getPath > > *val *status = *spark*.fs.getFileStatus(path) > > *val *pFile = *new *PartitionedFile( > partitionValues = InternalRow.*empty*,//This should be empty for non > partitioned values > filePath = path.toString, > start = 0, > length = status.getLen > ) > > *val *readFile: (PartitionedFile) => Iterator[Any] = > //Iterator[InternalRow] > fileFormat.buildReaderWithPartitionValues( > sparkSession = *spark*, > dataSchema = dataSchema, > partitionSchema = partitionSchema,//this should be empty for non > partitioned feilds > requiredSchema = dataSchema, > filters = *Seq*.empty, > options = *Map*.*empty*, > hadoopConf = *spark*.sparkContext.hadoopConfiguration > //relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) > ) > > *import *scala.collection.JavaConverters._ > > *val *rows = readFile(pFile).flatMap(_ *match *{ > *case *r: InternalRow => *Seq*(r) > > // This doesn't work. vector mode is doing something screwy > *case *b: ColumnarBatch => b.rowIterator().asScala > }).toList > > *println*(rows) > //List([0,1,5b,24,66647361]) > //??this is wrong I think > > > > Has anyone attempted something similar? > > > > Cheers Andrew > > > -- Ryan Blue Software Engineer Netflix
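A minimal way to sanity-check those rows, assuming the rows list built in the POC above and the (Int, Int, String) column order of its dataSchema:

import org.apache.spark.sql.catalyst.InternalRow

// InternalRow holds Catalyst's internal representations (e.g. UTF8String for
// strings), so read fields by ordinal and type instead of relying on toString:
rows.collect { case r: InternalRow =>
  (r.getInt(0), r.getInt(1), r.getUTF8String(2).toString)
}.foreach(println)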
Re: Closing a SparkSession stops the SparkContext
I think Vinoo is right about the intended behavior. If we support multiple sessions in one context, then stopping any one session shouldn't stop the shared context. The last session to be stopped should stop the context, but not any before that. We don't typically run multiple sessions in the same context so we haven't hit this, but it sounds reasonable. On Tue, Apr 2, 2019 at 8:23 AM Vinoo Ganesh wrote: > Hey Sean - Cool, maybe I'm misunderstanding the intent of clearing a > session vs. stopping it. > > The cause of the leak looks to be because of this line here > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala#L131. > The ExecutionListenerBus that's added persists forever on the context's > listener bus (the SparkContext ListenerBus has an ExecutionListenerBus). > I'm trying to figure out the place that this cleanup should happen. > > With the current implementation, calling SparkSession.stop will clean up > the ExecutionListenerBus (since the context itself is stopped), but it's > unclear to me why terminating one session should terminate the JVM-global > context. Possible my mental model is off here, but I would expect stopping > a session to remove all traces of that session, while keeping the context > alive, and stopping a context would, well, stop the context. > > If stopping the session is expected to stop the context, what's the > intended usage of clearing the active / default session? > > Vinoo > > On 4/2/19, 10:57, "Sean Owen" wrote: > > What are you expecting there ... that sounds correct? something else > needs to be closed? > > On Tue, Apr 2, 2019 at 9:45 AM Vinoo Ganesh > wrote: > > > > Hi All - > > > >I’ve been digging into the code and looking into what appears to > be a memory leak ( > https://urldefense.proofpoint.com/v2/url?u=https-3A__jira.apache.org_jira_browse_SPARK-2D27337&d=DwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=7WzLIMu3WvZwd6AMPatqn1KZW39eI6c_oflAHIy1NUc&m=TjtXLhnSM5M_aKQlD2NFU2wRnXPvtrUbRm-t84gBNlY&s=JUsN7EzGimus0jYxyj47_xHYUDC6KnxieeUBfUKTefk&e=) > and have noticed something kind of peculiar about the way closing a > SparkSession is handled. Despite being marked as Closeable, > closing/stopping a SparkSession simply stops the SparkContext. This changed > happened as a result of one of the PRs addressing > https://urldefense.proofpoint.com/v2/url?u=https-3A__jira.apache.org_jira_browse_SPARK-2D15073&d=DwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=7WzLIMu3WvZwd6AMPatqn1KZW39eI6c_oflAHIy1NUc&m=TjtXLhnSM5M_aKQlD2NFU2wRnXPvtrUbRm-t84gBNlY&s=Nd9eBDH-FDdzEn_BVt2nZaNQn6fXA8EfVq5rKGztOUo&e= > in > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_spark_pull_12873_files-23diff-2Dd91c284798f1c98bf03a31855e26d71cR596&d=DwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=7WzLIMu3WvZwd6AMPatqn1KZW39eI6c_oflAHIy1NUc&m=TjtXLhnSM5M_aKQlD2NFU2wRnXPvtrUbRm-t84gBNlY&s=RM9LrT3Yp2mf1BcbBf1o_m3bcNZdOjznrogBLzUzgeE&e= > . > > > > > > > > I’m trying to understand why this is the intended behavior – anyone > have any knowledge of why this is the case? > > > > > > > > Thanks, > > > > Vinoo > > > > - > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > > -- Ryan Blue Software Engineer Netflix
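For anyone reproducing this, a minimal local illustration of the current behavior being discussed:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("shared-context").getOrCreate()
val other = spark.newSession()   // second session sharing spark's SparkContext

other.stop()                     // stop() stops the shared SparkContext ...
// spark.range(1).count()        // ... so this would now fail, even though
                                 //     `spark` itself was never stopped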
Re: Closing a SparkSession stops the SparkContext
For #1, do we agree on the behavior? I think that closing a SparkSession should not close the SparkContext unless it is the only session. Evidently, that's not what happens and I consider the current the current behavior a bug. For more context, we're working on the new catalog APIs and how to guarantee consistent operations. Self-joining a table, for example, should use the same version of the table for both scans, and that state should be specific to a session, not global. These plans assume that SparkSession represents a session of interactions, along with a reasonable life-cycle. If that life-cycle includes closing all sessions when you close any session, then we can't really use sessions for this. rb On Wed, Apr 3, 2019 at 9:35 AM Vinoo Ganesh wrote: > Yeah, so I think there are 2 separate issues here: > > > >1. The coupling of the SparkSession + SparkContext in their current >form seem unnatural >2. The current memory leak, which I do believe is a case where the >session is added onto the spark context, but is only needed by the session >(but would appreciate a sanity check here). Meaning, it may make sense to >investigate an API change. > > > > Thoughts? > > > > On 4/2/19, 15:13, "Sean Owen" wrote: > > > @Sean – To the point that Ryan made, it feels wrong that stopping a > session force stops the global context. Building in the logic to only stop > the context when the last session is stopped also feels like a solution, > but the best way I can think about doing this involves storing the global > list of every available SparkSession, which may be difficult. > > > > I tend to agree it would be more natural for the SparkSession to have > > its own lifecycle 'stop' method that only stops/releases its own > > resources. But is that the source of the problem here? if the state > > you're trying to free is needed by the SparkContext, it won't help. If > > it happens to be in the SparkContext but is state only needed by one > > SparkSession and that there isn't any way to clean up now, that's a > > compelling reason to change the API. Is that the situation? The only > > downside is making the user separately stop the SparkContext then. > > > > *From: *Vinoo Ganesh > *Date: *Tuesday, April 2, 2019 at 13:24 > *To: *Arun Mahadevan , Ryan Blue > *Cc: *Sean Owen , "dev@spark.apache.org" < > dev@spark.apache.org> > *Subject: *Re: Closing a SparkSession stops the SparkContext > > > > // Merging threads > > > > Thanks everyone for your thoughts. I’m very much in sync with Ryan here. > > > > @Sean – To the point that Ryan made, it feels wrong that stopping a > session force stops the global context. Building in the logic to only stop > the context when the last session is stopped also feels like a solution, > but the best way I can think about doing this involves storing the global > list of every available SparkSession, which may be difficult. > > > > @Arun – If the intention is not to be able to clear and create new > sessions, then what specific is the intended use case of Sessions? 
> https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html > [databricks.com] > <https://urldefense.proofpoint.com/v2/url?u=https-3A__databricks.com_blog_2016_08_15_how-2Dto-2Duse-2Dsparksession-2Din-2Dapache-2Dspark-2D2-2D0.html&d=DwMGaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=7WzLIMu3WvZwd6AMPatqn1KZW39eI6c_oflAHIy1NUc&m=gSbHnTWozD5jH8QNJAVS16x0z9oydSZcgWVhXacfk00&s=L_WFiV7KKSXbpJfUKioz7JL3SSS-QhRAZOzS2OpTI3c&e=> > describes SparkSessions as time bounded interactions which implies that old > ones should be clear-able an news ones create-able in lockstep without > adverse effect? > > > > *From: *Arun Mahadevan > *Date: *Tuesday, April 2, 2019 at 12:31 > *To: *Ryan Blue > *Cc: *Vinoo Ganesh , Sean Owen , " > dev@spark.apache.org" > *Subject: *Re: Closing a SparkSession stops the SparkContext > > > > I am not sure how would it cause a leak though. When a spark session or > the underlying context is stopped it should clean up everything. The > getOrCreate is supposed to return the active thread local or the global > session. May be if you keep creating new sessions after explicitly clearing > the default and the local sessions and keep leaking the sessions it could > happen, but I don't think Sessions are intended to be used that way. > > > > On Tue, 2 Apr 2019 at 08:45, Ryan Blue wrote: > > I think Vinoo is right about the intended behavior. If we support multiple > sessions in one context, then
DataSourceV2 sync 3 April 2019
un a bucket join, it should look up the “bucket” function from the table’s catalog using the FunctionCatalog interface. That way it can prepare data for the other side of a bucketed join to work with bucketed data from any source. - PR #24246: Add TableCatalog API - Ryan: This PR is based on #24117, but please start reviewing now to make this go faster. *Attendees*: Ryan Blue John Zhuge Russel Spitzer Gengliang Wang Yuanjian Li Matt Cheah Yifei Huang Felix Cheung Dilip Biswal Wenchen Fan -- Ryan Blue Software Engineer Netflix
Re: Dataset schema incompatibility bug when reading column partitioned data
I think the confusion is that the schema passed to spark.read is not a projection schema. I don’t think it is even used in this case because the Parquet dataset has its own schema. You’re getting the schema of the table. I think the correct behavior is to reject a user-specified schema in this case. On Thu, Apr 11, 2019 at 11:04 AM Bruce Robbins wrote: > I see a Jira: > > https://issues.apache.org/jira/browse/SPARK-21021 > > On Thu, Apr 11, 2019 at 9:08 AM Dávid Szakállas > wrote: > >> +dev for more visibility. Is this a known issue? Is there a plan for a >> fix? >> >> Thanks, >> David >> >> Begin forwarded message: >> >> *From: *Dávid Szakállas >> *Subject: **Dataset schema incompatibility bug when reading column >> partitioned data* >> *Date: *2019. March 29. 14:15:27 CET >> *To: *u...@spark.apache.org >> >> We observed the following bug on Spark 2.4.0: >> >> scala> >> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet") >> >> scala> val schema = StructType(Seq(StructField("_1", >> IntegerType),StructField("_2", IntegerType))) >> >> scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show >> +---+---+ >> | _2| _1| >> +---+---+ >> | 2| 1| >> +---+- --+ >> >> >> That is, when reading column partitioned Parquet files the explicitly >> specified schema is not adhered to, instead the partitioning columns are >> appended the end of the column list. This is a quite severe issue as some >> operations, such as union, fails if columns are in a different order in two >> datasets. Thus we have to work around the issue with a select: >> >> val columnNames = schema.fields.map(_.name) >> ds.select(columnNames.head, columnNames.tail: _*) >> >> >> Thanks, >> David Szakallas >> Data Engineer | Whitepages, Inc. >> >> >> -- Ryan Blue Software Engineer Netflix
Re: Thoughts on dataframe cogroup?
ideal would have been some pandas udf version of cogroup >>>>>> that gave me a pandas dataframe for each spark dataframe in the cogroup! >>>>>> >>>>>> Chris >>>>>> >>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy < >>>>>> jonathan.wina...@gmail.com> wrote: >>>>>> >>>>>> For info, in our team have defined our own cogroup on dataframe in >>>>>> the past on different projects using different methods (rdd[row] based or >>>>>> union all collect list based). >>>>>> >>>>>> I might be biased, but find the approach very useful in project to >>>>>> simplify and speed up transformations, and remove a lot of intermediate >>>>>> stages (distinct + join => just cogroup). >>>>>> >>>>>> Plus spark 2.4 introduced a lot of new operator for nested data. >>>>>> That's a win! >>>>>> >>>>>> >>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, wrote: >>>>>> >>>>>>> I am wondering do other people have opinion/use case on cogroup? >>>>>>> >>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin >>>>>>> wrote: >>>>>>> >>>>>>>> Alessandro, >>>>>>>> >>>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality >>>>>>>> full outer join" . >>>>>>>> >>>>>>>> Two issues I see with equity outer join is: >>>>>>>> (1) equity outer join will give n * m rows for each key (n and m >>>>>>>> being the corresponding number of rows in df1 and df2 for each key) >>>>>>>> (2) User needs to do some extra processing to transform n * m back >>>>>>>> to the desired shape (two sub dataframes with n and m rows) >>>>>>>> >>>>>>>> I think full outer join is an inefficient way to implement cogroup. >>>>>>>> If the end goal is to have two separate dataframes for each key, why >>>>>>>> joining them first and then unjoin them? >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando < >>>>>>>> alessandro.solima...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Hello, >>>>>>>>> I fail to see how an equi-join on the key columns is different >>>>>>>>> than the cogroup you propose. >>>>>>>>> >>>>>>>>> I think the accepted answer can shed some light: >>>>>>>>> >>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark >>>>>>>>> >>>>>>>>> Now you apply an udf on each iterable, one per key value (obtained >>>>>>>>> with cogroup). >>>>>>>>> >>>>>>>>> You can achieve the same by: >>>>>>>>> 1) join df1 and df2 on the key you want, >>>>>>>>> 2) apply "groupby" on such key >>>>>>>>> 3) finally apply a udaf (you can have a look here if you are not >>>>>>>>> familiar with them >>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html), >>>>>>>>> that will process each group "in isolation". >>>>>>>>> >>>>>>>>> HTH, >>>>>>>>> Alessandro >>>>>>>>> >>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit and it >>>>>>>>>> has been very helpful in integrating Spark with our existing >>>>>>>>>> pandas-heavy >>>>>>>>>> libraries. >>>>>>>>>> >>>>>>>>>> Recently, we have found more and more cases where >>>>>>>>>> groupby().apply() is not sufficient - In some cases, we want to >>>>>>>>>> group two >>>>>>>>>> dataframes by the same key, and apply a function which takes two >>>>>>>>>> pd.DataFrame (also returns a pd.DataFrame) for each key. This feels >>>>>>>>>> very >>>>>>>>>> much like the "cogroup" operation in the RDD API. 
>>>>>>>>>> >>>>>>>>>> It would be great to be able to do sth like this: (not actual >>>>>>>>>> API, just to explain the use case): >>>>>>>>>> >>>>>>>>>> @pandas_udf(return_schema, ...) >>>>>>>>>> def my_udf(pdf1, pdf2) >>>>>>>>>> # pdf1 and pdf2 are the subset of the original dataframes >>>>>>>>>> that is associated with a particular key >>>>>>>>>> result = ... # some code that uses pdf1 and pdf2 >>>>>>>>>> return result >>>>>>>>>> >>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf) >>>>>>>>>> >>>>>>>>>> I have searched around the problem and some people have suggested >>>>>>>>>> to join the tables first. However, it's often not the same pattern >>>>>>>>>> and hard >>>>>>>>>> to get it to work by using joins. >>>>>>>>>> >>>>>>>>>> I wonder what are people's thought on this? >>>>>>>>>> >>>>>>>>>> Li >>>>>>>>>> >>>>>>>>>> -- Ryan Blue Software Engineer Netflix
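For the typed API there is already a cogroup on Dataset in Scala; the ask in this thread is essentially a pandas-friendly equivalent for DataFrames. A small sketch, assuming an active SparkSession named spark (e.g. in spark-shell) and made-up case classes:

import spark.implicits._

case class Click(userId: Long, url: String)
case class Purchase(userId: Long, amount: Double)

val clicks = Seq(Click(1L, "/a"), Click(1L, "/b"), Click(2L, "/c")).toDS()
val purchases = Seq(Purchase(1L, 9.99), Purchase(3L, 5.0)).toDS()

// Each key's rows from both sides are handed to the function together, with no
// n * m blow-up and no need to re-split joined rows afterwards.
val summary = clicks.groupByKey(_.userId)
  .cogroup(purchases.groupByKey(_.userId)) { (user, cs, ps) =>
    Iterator((user, cs.size, ps.map(_.amount).sum))
  }

summary.show()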
Re: Spark 2.4.2
Is this a bug fix? It looks like a new feature to me. On Tue, Apr 16, 2019 at 4:13 PM Michael Armbrust wrote: > Hello All, > > I know we just released Spark 2.4.1, but in light of fixing SPARK-27453 > <https://issues.apache.org/jira/browse/SPARK-27453> I was wondering if it > might make sense to follow up quickly with 2.4.2. Without this fix its > very hard to build a datasource that correctly handles partitioning without > using unstable APIs. There are also a few other fixes that have trickled > in since 2.4.1. > > If there are no objections, I'd like to start the process shortly. > > Michael > -- Ryan Blue Software Engineer Netflix
Re: Spark 2.4.2
Spark has a lot of strange behaviors already that we don't fix in patch releases. And bugs aren't usually fixed with a configuration flag to turn on the fix. That said, I don't have a problem with this commit making it into a patch release. This is a small change and looks safe enough to me. I was just a little surprised since I was expecting a correctness issue if this is prompting a release. I'm definitely on the side of case-by-case judgments on what to allow in patch releases and this looks fine. On Tue, Apr 16, 2019 at 4:27 PM Michael Armbrust wrote: > I would argue that its confusing enough to a user for options from > DataFrameWriter to be silently dropped when instantiating the data source > to consider this a bug. They asked for partitioning to occur, and we are > doing nothing (not even telling them we can't). I was certainly surprised > by this behavior. Do you have a different proposal about how this should > be handled? > > On Tue, Apr 16, 2019 at 4:23 PM Ryan Blue wrote: > >> Is this a bug fix? It looks like a new feature to me. >> >> On Tue, Apr 16, 2019 at 4:13 PM Michael Armbrust >> wrote: >> >>> Hello All, >>> >>> I know we just released Spark 2.4.1, but in light of fixing SPARK-27453 >>> <https://issues.apache.org/jira/browse/SPARK-27453> I was wondering if >>> it might make sense to follow up quickly with 2.4.2. Without this fix its >>> very hard to build a datasource that correctly handles partitioning without >>> using unstable APIs. There are also a few other fixes that have trickled >>> in since 2.4.1. >>> >>> If there are no objections, I'd like to start the process shortly. >>> >>> Michael >>> >> >> >> -- >> Ryan Blue >> Software Engineer >> Netflix >> > -- Ryan Blue Software Engineer Netflix
DataSourceV2 sync, 17 April 2019
Here are my notes from the last DSv2 sync. As always: - If you’d like to attend the sync, send me an email and I’ll add you to the invite. Everyone is welcome. - These notes are what I wrote down and remember. If you have corrections or comments, please reply. *Topics*: - TableCatalog PR #24246: https://github.com/apache/spark/pull/24246 - Remove SaveMode PR #24233: https://github.com/apache/spark/pull/24233 - Streaming capabilities PR #24129: https://github.com/apache/spark/pull/24129 *Attendees*: Ryan Blue John Zhuge Matt Cheah Yifei Huang Bruce Robbins Jamison Bennett Russell Spitzer Wenchen Fan Yuanjian Li (and others who arrived after the start) *Discussion*: - TableCatalog PR: https://github.com/apache/spark/pull/24246 - Wenchen and Matt had just reviewed the PR. Mostly what was in the SPIP so not much discussion of content. - Wenchen: Easier to review if the changes to move Table and TableCapability were in a separate PR (mostly import changes) - Ryan will open a separate PR for the move [Ed: #24410] - Russell: How should caching work? Has hit lots of problems with Spark caching data and getting out of date - Ryan: Spark should always call into the catalog and not cache to avoid those problems. However, Spark should ensure that it uses the same instance of a Table for all scans in the same query, for consistent self-joins. - Some discussion of self joins. Conclusion was that we don’t need to worry about this yet because it is unlikely. - Wenchen: should this include the namespace methods? - Ryan: No, those are a separate concern and can be added in a parallel PR. - Remove SaveMode PR: https://github.com/apache/spark/pull/24233 - Wenchen: PR is on hold waiting for streaming capabilities, #24129, because the Noop sink doesn’t validate schema - Wenchen will open a PR to add a capability to opt out of schema validation, then come back to this PR. - Streaming capabilities PR: https://github.com/apache/spark/pull/24129 - Ryan: This PR needs validation in the analyzer. The analyzer is where validations should exist, or else validations must be copied into every code path that produces a streaming plan. - Wenchen: the write check can’t be written because the write node is never passed to the analyzer. Fixing that is a larger problem. - Ryan: Agree that refactoring to pass the write node to the analyzer should be separate. - Wenchen: a check to ensure that either microbatch or continuous can be used is hard because some sources may fall back - Ryan: By the time this check runs, fallback has happened. Do v1 sources support continuous mode? - Wenchen: No, v1 doesn’t support continuous - Ryan: Then this can be written to assume that v1 sources only support microbatch mode. - Wenchen will add this check - Wenchen: the check that tables in a v2 streaming relation support either microbatch or continuous won’t catch anything and are unnecessary - Ryan: These checks still need to be in the analyzer so future uses do not break. We had the same problem moving to v2: because schema checks were specific to DataSource code paths, they were overlooked when adding v2. Running validations in the analyzer avoids problems like this. - Wenchen will add the validation. - Matt: Will v2 be ready in time for the 3.0 release? - Ryan: Once #24246 is in, we can work on PRs in parallel, but it is not looking good. -- Ryan Blue Software Engineer Netflix
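Once the TableCatalog API and its SQL integration are in place, wiring up and using a catalog plugin is expected to look roughly like this; the plugin class, catalog name, and table are illustrative, and the property naming convention shown is the one the proposal converged on:

// Register a catalog plugin under the name "testcat" (implementation class is a placeholder)
spark.conf.set("spark.sql.catalog.testcat", "com.example.MyTableCatalog")

// Identifiers with an explicit catalog are routed to that plugin
spark.sql("CREATE TABLE testcat.db.t (id BIGINT, data STRING)")
spark.sql("SELECT * FROM testcat.db.t").show()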
Re: DataSourceV2 sync, 17 April 2019
That is mostly correct. V2 standardizes the behavior of logical operations like CTAS across data sources, so it isn't compatible with v1 behavior. Consequently, we can't just move to v2 easily. We have to maintain both in parallel and eventually deprecate v1. We are aiming to have a working v2 in Spark 3.0, but the community has not committed to this goal. Support may be incomplete. rb On Sat, Apr 27, 2019 at 7:13 AM Jean Georges Perrin wrote: > This may be completely inappropriate and I apologize if it is, > nevertheless, I am trying to get some clarification about the current > status of DS. > > Please tell me where I am wrong: > > Currently, the stable API is v1. > There is a v2 DS API, but it is not widely used. > The group is working on a “new” v2 API that will be available after the > release of Spark v3. > > jg > > -- > Jean Georges Perrin > j...@jgp.net > > > > On Apr 19, 2019, at 10:10, Ryan Blue wrote: > > Here are my notes from the last DSv2 sync. As always: > >- If you’d like to attend the sync, send me an email and I’ll add you >to the invite. Everyone is welcome. >- These notes are what I wrote down and remember. If you have >corrections or comments, please reply. > > *Topics*: > >- TableCatalog PR #24246: https://github.com/apache/spark/pull/24246 >- Remove SaveMode PR #24233: https://github.com/apache/spark/pull/24233 >- Streaming capabilities PR #24129: >https://github.com/apache/spark/pull/24129 > > *Attendees*: > > Ryan Blue > John Zhuge > Matt Cheah > Yifei Huang > Bruce Robbins > Jamison Bennett > Russell Spitzer > Wenchen Fan > Yuanjian Li > > (and others who arrived after the start) > > *Discussion*: > >- TableCatalog PR: https://github.com/apache/spark/pull/24246 > - Wenchen and Matt had just reviewed the PR. Mostly what was in the > SPIP so not much discussion of content. > - Wenchen: Easier to review if the changes to move Table and > TableCapability were in a separate PR (mostly import changes) > - Ryan will open a separate PR for the move [Ed: #24410] > - Russell: How should caching work? Has hit lots of problems with > Spark caching data and getting out of date > - Ryan: Spark should always call into the catalog and not cache to > avoid those problems. However, Spark should ensure that it uses the same > instance of a Table for all scans in the same query, for consistent > self-joins. > - Some discussion of self joins. Conclusion was that we don’t need > to worry about this yet because it is unlikely. > - Wenchen: should this include the namespace methods? > - Ryan: No, those are a separate concern and can be added in a > parallel PR. >- Remove SaveMode PR: https://github.com/apache/spark/pull/24233 > - Wenchen: PR is on hold waiting for streaming capabilities, > #24129, because the Noop sink doesn’t validate schema > - Wenchen will open a PR to add a capability to opt out of schema > validation, then come back to this PR. >- Streaming capabilities PR: https://github.com/apache/spark/pull/24129 > - Ryan: This PR needs validation in the analyzer. The analyzer is > where validations should exist, or else validations must be copied into > every code path that produces a streaming plan. > - Wenchen: the write check can’t be written because the write node > is never passed to the analyzer. Fixing that is a larger problem. > - Ryan: Agree that refactoring to pass the write node to the > analyzer should be separate. 
> - Wenchen: a check to ensure that either microbatch or continuous > can be used is hard because some sources may fall back > - Ryan: By the time this check runs, fallback has happened. Do v1 > sources support continuous mode? > - Wenchen: No, v1 doesn’t support continuous > - Ryan: Then this can be written to assume that v1 sources only > support microbatch mode. > - Wenchen will add this check > - Wenchen: the check that tables in a v2 streaming relation support > either microbatch or continuous won’t catch anything and are unnecessary > - Ryan: These checks still need to be in the analyzer so future > uses do not break. We had the same problem moving to v2: because schema > checks were specific to DataSource code paths, they were overlooked when > adding v2. Running validations in the analyzer avoids problems like > this. > - Wenchen will add the validation. >- Matt: Will v2 be ready in time for the 3.0 release? > - Ryan: Once #24246 is in, we can work on PRs in parallel, but it > is not looking good. > > -- > Ryan Blue > Software Engineer > Netflix > > > -- Ryan Blue Software Engineer Netflix
Re: Bucketing and catalyst
Andrew, Here's an umbrella issue that is a good starting point for looking at the project to add Hive bucketing support: https://issues.apache.org/jira/browse/SPARK-19256 rb On Thu, May 2, 2019 at 11:40 AM Long, Andrew wrote: > Hey Friends, > > > > How aware of bucketing is Catalyst? I’ve been trying to piece together how > Catalyst knows that it can remove a sort and shuffle given that both tables > are bucketed and sorted the same way. Is there any classes in particular I > should look at? > > > > Cheers Andrew > -- Ryan Blue Software Engineer Netflix
DataSourceV2 community sync notes - 1 May 2019
Here are my notes for the latest DSv2 community sync. As usual, if you have comments or corrections, please reply. If you’d like to be invited to the next sync, email me directly. Everyone is welcome to attend. *Attendees*: Ryan Blue John Zhuge Andrew Long Bruce Robbins Dilip Biswal Gengliang Wang Kevin Yu Michael Artz Russel Spitzer Yifei Huang Zhilmil Dhillon *Topics*: Introductions Open pull requests V2 organization Bucketing and sort order from v2 sources *Discussion*: - Introductions: we stopped doing introductions when we had a large group. Attendance has gone down from the first few syncs, so we decided to resume. - V2 organization / PR #24416: https://github.com/apache/spark/pull/24416 - Ryan: There’s an open PR to move v2 into catalyst - Andrew: What is the distinction between catalyst and sql? How do we know what goes where? - Ryan: IIUC, the catalyst module is supposed to be a stand-alone query planner that doesn’t get into concrete physical plans. The catalyst package is the private implementation. Anything that is generic catalyst, including APIs like DataType, should be in the catalyst module. Anything public, like an API, should not be in the catalyst package. - Ryan: v2 meets those requirements now and I don’t have a strong opinion on organization. We just need to choose one. - No one had a strong opinion so we tabled this. In #24416 or shortly after let’s decide on organization and do the move at once. - Next steps: someone with an opinion on organization should suggest a structure. - TableCatalog API / PR #24246: https://github.com/apache/spark/pull/24246 - Ryan: Wenchen’s last comment was that he was waiting for tests to pass and they are. Maybe this will be merged soon? - Bucketing and sort order from v2 sources - Andrew: interested in using existing data layout and sorting to remove expensive tasks in joins - Ryan: in v2, bucketing is unified with other partitioning functions. I plan to build a way for Spark to get partition function implementations from a source so it can use that function to prepare the other side of a join. From there, I have been thinking about a way to check compatibility between functions, so we could validate that table A has the same bucketing as table B. - Dilip: is bucketing Hive-specific? - Russel: Cassandra also buckets - Matt: what is the difference between bucketing and other partition functions for this? - Ryan: probably no difference. If you’re partitioning by hour, you could probably use that, too. - Dilip: how can Spark compare functions? - Andrew: should we introduce a standard? it would be easy to switch for their use case - Ryan: it is difficult to introduce a standard because so much data already exists in tables. I think it is easier to support multiple functions. - Russel: Cassandra uses dynamic bucketing, which wouldn’t be able to use a standard. - Dilip: sources could push down joins - Russel: that’s a harder problem - Andrew: does anyone else limit bucket size? - Ryan: we don’t because we assume the sort can spill. probably a good optimization for later - Matt: what are the follow-up items for this? - Andrew: will look into the current state of bucketing in Spark - Ryan: it would be great if someone thought about what the FunctionCatalog interface will look like -- Ryan Blue Software Engineer Netflix
Re: DataSourceV2Reader Q
lyzed$lzycompute(QueryExecution.scala:57) > at > org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55) > at > org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47) > at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78) > at org.apache.spark.sql.Dataset.org > $apache$spark$sql$Dataset$$withPlan(Dataset.scala:3406) > at org.apache.spark.sql.Dataset.select(Dataset.scala:1334) > at org.apache.spark.sql.Dataset.select(Dataset.scala:1352) > at org.apache.spark.sql.Dataset.select(Dataset.scala:1352) > at > edu.vanderbilt.accre.spark_ttree.TTreeDataSourceIntegrationTest.testLoadDataFrame(TTreeDataSourceIntegrationTest.java:47) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at > org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:89) > at > org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:41) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:541) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:763) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:463) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:209)] > > - > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > > -- Ryan Blue Software Engineer Netflix
DataSourceV2 sync notes - 15 May 2019
Sorry these notes are so late, I didn’t get to the write up until now. As usual, if anyone has corrections or comments, please reply.

*Attendees*:

John Zhuge
Ryan Blue
Andrew Long
Wenchen Fan
Gengliang Wang
Russell Spitzer
Yuanjian Li
Yifei Huang
Matt Cheah
Amardeep Singh Dhilon
Zhilmil Dhion
Ryan Pifer

*Topics*:

- Should Spark require catalogs to report case sensitivity?
- Bucketing and sorting survey
- Add default v2 catalog: https://github.com/apache/spark/pull/24594
- SupportsNamespaces API: https://github.com/apache/spark/pull/24560
- FunctionCatalog API: https://github.com/apache/spark/pull/24559
- Skip output column resolution: https://github.com/apache/spark/pull/24469
- Move DSv2 into catalyst module: https://github.com/apache/spark/pull/24416
- Remove SupportsSaveMode: https://github.com/apache/spark/pull/24233

*Discussion*:

- Wenchen: When will we add select support?
   - John: working on resolution. DSv2 resolution is straightforward; the difficulty is ensuring a smooth transition from v1 to v2.
   - Ryan: table resolution will also be used for inserts. Once select is done, insert is next.
   - John: the PR may include insert as well
- Add default v2 catalog:
   - Ryan: A default catalog is needed for CTAS support when the source is v2
   - Ryan: A pass-through v2 catalog that uses SessionCatalog should be available as the default
- FunctionCatalog API:
   - Wenchen: this should have a design doc
   - Ryan: Agreed. The PR is for early discussion and prototyping.
- Bucketed joins: [Ed: I don’t remember much of this, feel free to expand what was said]
   - Andrew: looks like lots of work to be done for bucketing. Sort removals aren’t done, bucketing with non-bucketed tables still incurs hashing costs.
   - Ryan: work on support for Hive bucketing appears to have stopped, so it doesn’t look like this is an easy area to improve
   - Where should join optimization be done?
   - Andrew will create a prototype PR.
- Case sensitivity in catalogs: should catalogs report case sensitivity to Spark?
   - Ryan: catalogs connect to external systems, so Spark can’t impose case sensitivity requirements. A catalog is case sensitive or not, and would only be forced to violate Spark’s assumption.
   - Ryan: requiring a catalog to report whether it is case sensitive doesn’t actually help Spark. If the catalog is case sensitive, then Spark should pass exactly what it received to avoid changing the meaning. If the catalog is case insensitive, then Spark can pass exactly what it received because case is handled in the catalog. So Spark’s behavior doesn’t change.
   - Russel: not all catalogs are case sensitive or case insensitive. Some are case insensitive unless an identifier is quoted. Quoted parts are case sensitive.
   - Ryan: So a catalog would not be able to return true or false correctly.
   - Conclusion: Spark should pass identifiers as it received them, without modification.

--
Ryan Blue
Software Engineer
Netflix
DataSourceV2 sync notes - 29 May 2019
Here are my notes from last night’s sync. I had to leave early, so there may be more discussion. Others can fill in the details for those topics.

*Attendees*:

John Zhuge
Ryan Blue
Yifei Huang
Matt Cheah
Yuanjian Li
Russell Spitzer
Kevin Yu

*Topics*:

- Atomic extensions for the TableCatalog API
- Moving DSv2 to Catalyst - should this include package renames?
- Catalogs and table resolution: proposal to prefer default v2 catalog when defined

*Notes*:

- Skipping discussion of open PRs
- Atomic table catalogs:
   - Matt: the proposal in the SPIP makes sense. When should Spark use the atomic API? Is there a way for a user to signal that Spark should use the staging calls? Spark could use SQL transaction statements for this.
   - Ryan: the atomic operations that we are currently targeting with the TableCatalog extensions are single statements, like CREATE TABLE AS SELECT. Transaction statements (e.g., BEGIN) are for multi-statement transactions and are out of scope.
   - Ryan: Because the expected behavior of the commands (CTAS, RTAS) is that they are atomic, Spark should always use atomic implementations if they are available. No need for a user to opt in. [Ed: a rough sketch of the staged flow follows this email]
   - Matt: What should REPLACE TABLE do if transactions are not supported? If the write fails, the table would be deleted
   - Ryan: REPLACE is a combination of DROP TABLE and CREATE TABLE AS SELECT. By using it, the user is signaling that if a combined operation is possible, Spark should use it. So REPLACE TABLE signals intent to drop, and it is the right thing to drop the table if an atomic replace is not supported.
   - There was also some confusion about whether IF EXISTS should be supported. The consensus was that REPLACE TABLE AS SELECT is expected to be idempotent and should not fail if the target table does not exist.
- Moving DSv2 to catalyst
   - skipped because Wenchen did not attend
- Catalogs and table resolution:
   - Ryan: Table resolution with catalogs is getting complicated when namespaces overlap. If an identifier has a catalog, then it is easy to use a v2 catalog. But when the identifier does not have a catalog, there is a namespace overlap between session catalog tables and the default v2 catalog tables. It would be much easier to understand and document if we used a simple rule for precedence. We suggest using the session catalog unless the default v2 catalog is defined, then using the v2 catalog by default.
   - This makes the behavior easy to document and reason about, with few special cases. To guarantee compatibility, we will need a v2 implementation that delegates to the session catalog.
   - Ryan: If there aren’t objections, I’ll raise this on the dev list. We should make a decision there.

--
Ryan Blue
Software Engineer
Netflix
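[Ed: For readers who were not at the sync, the staged create/replace idea discussed above works roughly as follows. This is only an illustrative sketch; the type and method names are simplified guesses, not the API text from the SPIP.]

    // Illustrative sketch of the staged, atomic CTAS/RTAS flow; names and signatures are guesses.
    import org.apache.spark.sql.types.StructType

    trait StagedTable {
      def commitStagedChanges(): Unit   // make the table and its data visible atomically
      def abortStagedChanges(): Unit    // discard the staged table if the write fails
    }

    trait StagingTableCatalog {
      // Atomic CREATE TABLE AS SELECT: the table is not visible until commit.
      def stageCreate(ident: String, schema: StructType): StagedTable
      // Atomic REPLACE TABLE AS SELECT: the drop, create, and data are committed together.
      def stageReplace(ident: String, schema: StructType): StagedTable
    }

In this sketch, Spark runs the write against the staged table and calls commitStagedChanges() only if the write succeeds, which is why no user opt-in is needed.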
DataSourceV2 sync notes - 12 June 2019
Here are the latest DSv2 sync notes. Please reply with updates or corrections.

*Attendees*:

Ryan Blue
Michael Armbrust
Gengliang Wang
Matt Cheah
John Zhuge

*Topics*:

- Wenchen’s reorganization proposal
- Problems with TableProvider - property map isn’t sufficient
- New PRs:
   - ReplaceTable: https://github.com/apache/spark/pull/24798
   - V2 Table Resolution: https://github.com/apache/spark/pull/24741
   - V2 Session Catalog: https://github.com/apache/spark/pull/24768

*Discussion*:

- Wenchen’s organization proposal
   - Ryan: Wenchen proposed using `org.apache.spark.sql.connector.{catalog, expressions, read, write, extensions}`
   - Ryan: I’m not sure we need extensions, but otherwise it looks good to me
   - Matt: This is in the catalyst module, right?
   - Ryan: Right. The API is in catalyst. The extensions package would be used for any parts that need to be in SQL, but hopefully there aren’t any.
   - Consensus was to go with the proposed organization
- Problems with TableProvider:
   - Gengliang: CREATE TABLE with an ORC v2 table can’t report its schema because there are no files
   - Ryan: We hit this when trying to use ORC in SQL unit tests for v2. The problem is that the source can’t be passed the schema and other information
   - Gengliang: Schema could be passed using the userSpecifiedSchema arg
   - Ryan: The user schema is for cases where the data lacks specific types and a user supplies them, like CSV. I don’t think it makes sense to reuse that to pass the schema from the catalog
   - Ryan: Other table metadata should be passed as well, like partitioning, so sources don’t infer it. I think this requires some thought. Anyone want to volunteer?
   - No one volunteered to fix the problem
- ReplaceTable PR
   - Matt: Needs another update after comments, but about ready
   - Ryan: I agree it is almost ready to commit. I should point out that this includes a default implementation that will leave a table deleted if the write fails. I think this is expected behavior because REPLACE is a DROP combined with CTAS
   - Michael: Sources should be able to opt out of that behavior
   - Ryan: We want to ensure consistent behavior across sources
   - Resolution: sources can implement the staging and throw an exception if they choose to opt out
- V2 table resolution:
   - John: this should be ready to go, only minor comments from Dongjoon left
   - This was merged the next day
- V2 session catalog
   - Ryan: When testing, we realized that if a default catalog is used for v2 sources (like ORC v2) then you can run CREATE TABLE, which goes to some v2 catalog, but then you can’t load the same table using the same name because the session catalog doesn’t have it.
   - Ryan: To fix this, we need a v2 catalog that delegates to the session catalog. This should be used for all v2 operations when the session catalog can’t be used.
   - Ryan: Then the v2 default catalog should be used instead of the session catalog when it is set. This provides a smooth transition from the session catalog to v2 catalogs.
- Gengliang: another topic: decimals
   - Gengliang: v2 doesn’t insert unsafe casts, so literals in SQL cannot be inserted to double/float columns [Ed: a small example follows these notes]
   - Michael: Shouldn’t queries use decimal literals so that floating point literals can be floats? What do other databases do?
   - Matt: is this a v2 problem?
   - Ryan: this is not specific to v2 and was discovered when converting v1 to use the v2 output rules
   - Ryan: we could add a new decimal type that doesn’t lose data but is allowed to be cast because it can only be used for literals where the intended type is unknown. There is precedent for this in the parser with Hive char and varchar types.
   - Conclusion: This isn’t really a v2 problem
- Michael: Any work so far on MERGE INTO?
   - Ryan: Not yet, but feel free to make a proposal and start working
   - Ryan: Do you also need to pass extra metadata with each row?
   - Michael: No, this should be delegated to the source
   - Matt: That would be operator push-down
   - Ryan: I agree, that’s operator push-down. It would be great to hear how that would work, but I think MERGE INTO should have a default implementation. It should be supported across sources instead of in just one so we have a reference implementation.
   - Michael: Having only a reference implementation was the problem with v1. The behavior should be written down in a spec. Hive has a reasonable implementation to follow.
   - Ryan: Yes, but it is still valuable to have a reference implementation. And of course a spec is needed.
- Matt: what does the roadmap look like for finishing in time for Spark 3.0?
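[Ed: A minimal illustration of the decimal-literal problem above. It assumes a SparkSession named `spark`; the table name is made up for the example.]

    // The literal 1.1 is parsed as DECIMAL(2,1). Decimal to double is not a safe up-cast,
    // so under the v2 rule that only inserts safe casts, the INSERT is rejected at
    // analysis time even though the user clearly meant a floating-point value.
    spark.sql("CREATE TABLE demo_doubles (value DOUBLE) USING parquet")
    spark.sql("INSERT INTO demo_doubles VALUES (1.1)")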
Re: [VOTE][SPARK-25299] SPIP: Shuffle Storage API
+1 (non-binding) On Sun, Jun 16, 2019 at 11:11 PM Dongjoon Hyun wrote: > +1 > > Bests, > Dongjoon. > > > On Sun, Jun 16, 2019 at 9:41 PM Saisai Shao > wrote: > >> +1 (binding) >> >> Thanks >> Saisai >> >> Imran Rashid 于2019年6月15日周六 上午3:46写道: >> >>> +1 (binding) >>> >>> I think this is a really important feature for spark. >>> >>> First, there is already a lot of interest in alternative shuffle storage >>> in the community. There is already a lot of interest in alternative >>> shuffle storage, from dynamic allocation in kubernetes, to even just >>> improving stability in standard on-premise use of Spark. However, they're >>> often stuck doing this in forks of Spark, and in ways that are not >>> maintainable (because they copy-paste many spark internals) or are >>> incorrect (for not correctly handling speculative execution & stage >>> retries). >>> >>> Second, I think the specific proposal is good for finding the right >>> balance between flexibility and too much complexity, to allow incremental >>> improvements. A lot of work has been put into this already to try to >>> figure out which pieces are essential to make alternative shuffle storage >>> implementations feasible. >>> >>> Of course, that means it doesn't include everything imaginable; some >>> things still aren't supported, and some will still choose to use the older >>> ShuffleManager api to give total control over all of shuffle. But we know >>> there are a reasonable set of things which can be implemented behind the >>> api as the first step, and it can continue to evolve. >>> >>> On Fri, Jun 14, 2019 at 12:13 PM Ilan Filonenko >>> wrote: >>> >>>> +1 (non-binding). This API is versatile and flexible enough to handle >>>> Bloomberg's internal use-cases. The ability for us to vary implementation >>>> strategies is quite appealing. It is also worth to note the minimal changes >>>> to Spark core in order to make it work. This is a very much needed addition >>>> within the Spark shuffle story. >>>> >>>> On Fri, Jun 14, 2019 at 9:59 AM bo yang wrote: >>>> >>>>> +1 This is great work, allowing plugin of different sort shuffle >>>>> write/read implementation! Also great to see it retain the current Spark >>>>> configuration >>>>> (spark.shuffle.manager=org.apache.spark.shuffle.YourShuffleManagerImpl). >>>>> >>>>> >>>>> On Thu, Jun 13, 2019 at 2:58 PM Matt Cheah >>>>> wrote: >>>>> >>>>>> Hi everyone, >>>>>> >>>>>> >>>>>> >>>>>> I would like to call a vote for the SPIP for SPARK-25299 >>>>>> <https://issues.apache.org/jira/browse/SPARK-25299>, which proposes >>>>>> to introduce a pluggable storage API for temporary shuffle data. >>>>>> >>>>>> >>>>>> >>>>>> You may find the SPIP document here >>>>>> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit> >>>>>> . >>>>>> >>>>>> >>>>>> >>>>>> The discussion thread for the SPIP was conducted here >>>>>> <https://lists.apache.org/thread.html/2fe82b6b86daadb1d2edaef66a2d1c4dd2f45449656098ee38c50079@%3Cdev.spark.apache.org%3E> >>>>>> . >>>>>> >>>>>> >>>>>> >>>>>> Please vote on whether or not this proposal is agreeable to you. >>>>>> >>>>>> >>>>>> >>>>>> Thanks! >>>>>> >>>>>> >>>>>> >>>>>> -Matt Cheah >>>>>> >>>>> -- Ryan Blue Software Engineer Netflix
Re: DSv1 removal
Hi Gabor,

First, a little context... one of the goals of DSv2 is to standardize the behavior of SQL operations in Spark. For example, running CTAS when a table exists will fail, not take some action depending on what the source chooses, like drop & CTAS, inserting, or failing.

Unfortunately, this means that DSv1 can't be easily replaced because it has behavior differences between sources. In addition, we're not really sure how DSv1 works in all cases -- it really depends on what seemed reasonable to authors at the time. For example, we don't have a good understanding of how file-based tables behave (those not backed by a Metastore). There are also changes that we know are breaking and are okay with, like only inserting safe casts when writing with v2.

Because of this, we can't just replace v1 with v2 transparently, so the plan is to allow deployments to migrate to v2 in stages. Here's the plan:

1. Use v1 by default so all existing queries work as they do today for identifiers like `db.table`
2. Allow users to add additional v2 catalogs that will be used when identifiers specifically start with one, like `test_catalog.db.table` [Ed: a config example follows this email]
3. Add a v2 catalog that delegates to the session catalog, so that v2 read/write implementations can be used, but are stored just like v1 tables in the session catalog
4. Add a setting to use a v2 catalog as the default. Setting this would use a v2 catalog for all identifiers without a catalog, like `db.table`
5. Add a way for a v2 catalog to return a table that gets converted to v1. This is what `CatalogTableAsV2` does in #24768 <https://github.com/apache/spark/pull/24768>.

PR #24768 <https://github.com/apache/spark/pull/24768> implements the rest of these changes. Specifically, we initially used the default catalog for v2 sources, but that causes namespace problems, so we need the v2 session catalog (point #3) as the default when there is no default v2 catalog.

I hope that answers your question. If not, I'm happy to answer follow-ups and we can add this as a topic in the next v2 sync on Wednesday. I'm also planning on talking about metadata columns or function push-down from the Kafka v2 PR at that sync, so you may want to attend.

rb

On Thu, Jun 20, 2019 at 4:45 AM Gabor Somogyi wrote: > Hi All, > > I've taken a look at the code and docs to find out when DSv1 sources has > to be removed (in case of DSv2 replacement is implemented). After some > digging I've found DSv1 sources which are already removed but in some cases > v1 and v2 still exists in parallel. > > Can somebody please tell me what's the overall plan in this area? > > BR, > G > >

--
Ryan Blue
Software Engineer
Netflix
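[Ed: As a concrete example of step 2 above, a v2 catalog is configured by name and then addressed by that name in identifiers. The catalog name is arbitrary and the class name is a placeholder for a class implementing the v2 catalog plugin API; this assumes a SparkSession named `spark`.]

    // Register an additional v2 catalog under the name "test_catalog".
    spark.conf.set("spark.sql.catalog.test_catalog", "com.example.TestCatalog")

    // Identifiers that start with the catalog name are routed to that catalog;
    // plain `db.table` identifiers keep resolving through the session catalog (step 1).
    spark.sql("SELECT * FROM test_catalog.db.table").show()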
Re: Timeline for Spark 3.0
I think that with a few reviews, we could have a minimally working DSv2 SQL API ready in a couple weeks. We have PRs ready for the last of the major items, like ALTER TABLE, INSERT INTO, REPLACE TABLE, and a v2 catalog interface for the session catalog. Mainly, we're just waiting for final reviews on those and then I would be comfortable moving to QA for 3.0. The built-in file sources unfortunately have some work to do to work with SQL, so I'm not including those in that statement. That would also not include all of the non-essential SQL statements, like SHOW TABLES or CREATE TABLE LIKE. Most of the non-essential SQL could be done fairly quickly, if anyone has time! On Fri, Jun 28, 2019 at 4:53 PM Sean Owen wrote: > That's a good question. Although we had penciled in 'middle of the > year' I don't think we're in sight of a QA phase just yet, as I > believe some key items are still in progress. I'm thinking of the Hive > update, and DS v2 work (?). > > I'm also curious to hear what broad TODOs people see for 3.0? we > probably need to start focusing them down and/or deferring some parts, > in order to get a 3.0 release out anytime soon. Is that feasible? I > have a guess that it won't end up being released this quarter, but, > just a guess. > > On Fri, Jun 28, 2019 at 5:14 PM Long, Andrew > wrote: > > > > Hey Friends, > > > > > > > > Is there a timeline for spark 3.0 in terms of the first RC and final > release? > > > > > > > > Cheers Andrew > > - > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > > -- Ryan Blue Software Engineer Netflix
DSv2 sync notes - 26 June 2019
Here are my notes from this week’s sync.

*Attendees*:

Ryan Blue
John Zhuge
Dale Richardson
Gabor Somogyi
Matt Cheah
Yifei Huang
Xin Ren
Jose Torres
Gengliang Wang
Kevin Yu

*Topics*:

- Metadata columns or function push-down for Kafka v2 source
- Open PRs
   - REPLACE TABLE implementation: https://github.com/apache/spark/pull/24798
   - Add v2 AlterTable: https://github.com/apache/spark/pull/24937
   - Add v2 SessionCatalog: https://github.com/apache/spark/pull/24768
   - SupportsNamespaces PR: https://github.com/apache/spark/pull/24560
- Remaining SQL statements to implement
- IF NOT EXISTS for INSERT OVERWRITE
- V2 file source compatibility

*Discussion*:

- Metadata columns or function push-down for Kafka v2 source
   - Ryan: Kafka v1 source has more read columns than write columns. This is to expose metadata like partition, offset, and timestamp. Those are read columns, but not write columns, which is fine in v1. v2 requires a table schema
   - Ryan: Two main options to fix this in v2: add metadata columns like Presto’s $file or add function push-down similar to Spark’s input_file_name(). Metadata columns require less work (expose additional columns) but functions are more flexible (can call modified_time(col1))
   - Gabor: That’s a fair summary
   - Jose: the problem with input_file_name() is that it can be called anywhere, but is only valid in the context of a projection. After a group by, it returns empty string, which is odd. [Ed: a small example follows these notes]
   - Ryan: Couldn’t we handle that case using push-down? It is a function defined by a source that can only be run by pushing it down. It doesn’t exist after a group by, so analysis would fail if it were used in that context, just like columns don’t exist in the group by context unless they were in the grouping expression or created by aggregates.
   - Jose: That would work
   - Ryan: The metadata column approach takes less work, so I think we should do that unless someone has the time to drive the function push-down option.
   - Gabor: this is not required to move to v2. Currently working around this by not validating the schema. PR: https://github.com/apache/spark/pull/24738
   - Mostly consensus around using metadata column approach.
- REPLACE TABLE PR:
   - Matt: this is mostly ready, just waiting for final reviews
- AlterTable PR:
   - Gengliang: should this handle complex updates, like replacing a struct with a different struct?
   - Ryan: You’re right, that doesn’t make sense. I’ll update the PR [Note: done]
- V2 session catalog PR:
   - Ryan: We talked about this last time. Any objections?
   - Jose: No, this is blocking us right now
- SupportsNamespaces PR:
   - Ryan: Please look at this, it blocks remaining SQL statements like SHOW NAMESPACES, etc.
- Remaining SQL statements:
   - Ryan: presented a list of remaining SQL statements that need to be implemented
   - Important statements (for Spark 3.0):
      - DESCRIBE [FORMATTED|EXTENDED] [TABLE] ...
      - REFRESH TABLE ...
      - SHOW TABLES [IN catalog] [LIKE ...]
      - USE CATALOG ... to set the default catalog
   - Other missing SQL statements; most depend on SupportsNamespaces PR:
      - DESCRIBE [EXTENDED] (NAMESPACE|DATABASE) ...
      - SHOW (NAMESPACES|DATABASES) [IN catalog] [LIKE ...]
      - CREATE (NAMESPACE|DATABASE) ... [PROPERTIES (...)]
      - DROP (NAMESPACE|DATABASE) ...
      - USE ... [IN catalog] to set current namespace in a catalog
   - Matt offered to implement DESCRIBE TABLE
- IF NOT EXISTS with INSERT INTO
   - John: This validates that overwrite does not overwrite partitions and is append only. Should this be supported?
   - Consensus was “why not?” Will add a mix-in trait in a follow-up for sources that choose to implement it
- File source compatibility
   - Ryan: I tried to use built-in sources like Parquet in SQL tests and hit problems. Not being able to pass a schema or table partitioning means that these tables won’t behave right. What is the plan to get these sources working with SQL?
   - No one has time to work on this
   - Ryan: I’ll write some tests to at least set out the contract so we know when the built-in sources are ready to be used.

--
Ryan Blue
Software Engineer
Netflix
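[Ed: A small example of the input_file_name() behavior Jose described above. The path and column name are placeholders; this assumes a SparkSession named `spark`.]

    import org.apache.spark.sql.functions.input_file_name

    // Valid use: a projection over the scan, one file name per input row.
    spark.read.parquet("/tmp/events").select(input_file_name()).show()

    // Surprising use: after an aggregation there is no input file for a row, so the
    // function quietly returns an empty string instead of failing analysis.
    spark.read.parquet("/tmp/events")
      .groupBy("event_type")
      .count()
      .select(input_file_name())
      .show()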
Re: JDBC connector for DataSourceV2
I'm not aware of a JDBC connector effort. It would be great to have someone build one! On Fri, Jul 12, 2019 at 3:33 PM Shiv Prashant Sood wrote: > Can someone please help understand the current Status of DataSource V2 > based JDBC connector? I see connectors for various file formats in Master, > but can't find a JDBC implementation or related JIRA. > > DatasourceV2 APIs to me look in good shape to attempt a JDBC connector for > READ/WRITE path. > > Thanks & Regards, > Shiv > -- Ryan Blue Software Engineer Netflix
Re: JDBC connector for DataSourceV2
Sounds great! Ping me on the review, I think this will be really valuable. On Fri, Jul 12, 2019 at 6:51 PM Xianyin Xin wrote: > If there’s nobody working on that, I’d like to contribute. > > > > Loop in @Gengliang Wang. > > > > Xianyin > > > > *From: *Ryan Blue > *Reply-To: * > *Date: *Saturday, July 13, 2019 at 6:54 AM > *To: *Shiv Prashant Sood > *Cc: *Spark Dev List > *Subject: *Re: JDBC connector for DataSourceV2 > > > > I'm not aware of a JDBC connector effort. It would be great to have > someone build one! > > > > On Fri, Jul 12, 2019 at 3:33 PM Shiv Prashant Sood > wrote: > > Can someone please help understand the current Status of DataSource V2 > based JDBC connector? I see connectors for various file formats in Master, > but can't find a JDBC implementation or related JIRA. > > > > DatasourceV2 APIs to me look in good shape to attempt a JDBC connector for > READ/WRITE path. > > Thanks & Regards, > > Shiv > > > > > -- > > Ryan Blue > > Software Engineer > > Netflix > -- Ryan Blue Software Engineer Netflix
DataSourceV2 sync notes - 10 July 2019
Here are my notes from the last sync. If you’d like to be added to the invite or have topics, please let me know.

*Attendees*:

Ryan Blue
Matt Cheah
Yifei Huang
Jose Torres
Burak Yavuz
Gengliang Wang
Michael Artz
Russel Spitzer

*Topics*:

- Existing PRs
   - V2 session catalog: https://github.com/apache/spark/pull/24768
   - REPLACE and RTAS: https://github.com/apache/spark/pull/24798
   - DESCRIBE TABLE: https://github.com/apache/spark/pull/25040
   - ALTER TABLE: https://github.com/apache/spark/pull/24937
   - INSERT INTO: https://github.com/apache/spark/pull/24832
- Stats integration
- CTAS and DataFrameWriter behavior

*Discussion*:

- ALTER TABLE PR is ready to commit (and was after the sync)
- REPLACE and RTAS PR: waiting on more reviews
- INSERT INTO PR: Ryan will review
- DESCRIBE TABLE has test failures, Matt will fix
- V2 session catalog:
   - How will v2 catalog be configured?
   - Ryan: This is up for discussion because it currently uses a table property. I think it needs to be configurable
   - Burak: Agree that it should be configurable
   - Ryan: Does this need to be determined now, or can we solve this after getting the functionality in?
   - Jose: let’s get it in and fix it later
- Stats integration:
   - Matt: has anyone looked at stats integration? What needs to be done?
   - Ryan: stats are part of the Scan API. Configure a scan with ScanBuilder and then get stats from it. The problem is that this happens when converting to physical plan, after the optimizer. But the optimizer determines what gets broadcasted. A work-around Netflix uses is to run push down in the stats code. This runs push-down twice and was rejected from Spark, but is important for performance. We should add a property to enable this.
   - Ryan: The larger problem is that stats are used in the optimizer, but push-down happens when converting to physical plan. This is also related to our earlier discussions about when join types are chosen. Fixing this is a big project
- CTAS and DataFrameWriter behavior
   - Burak: DataFrameWriter uses CTAS where it shouldn’t. It is difficult to predict v1 behavior
   - Ryan: Agree, v1 DataFrameWriter does not have clear behavior. We suggest a replacement with clear verbs for each SQL action: append/insert, overwrite, overwriteDynamic, create (table), replace (table)
   - Ryan: Prototype available here: https://gist.github.com/rdblue/6bc140a575fdf266beb2710ad9dbed8f [Ed: a short sketch of the proposed shape follows these notes]

--
Ryan Blue
Software Engineer
Netflix
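[Ed: For reference, a sketch of the clear-verb writer along the lines of the prototype gist above. The entry point and method names follow the verbs listed in the notes and are illustrative only; the final API may differ.]

    import org.apache.spark.sql.functions.col

    val df = spark.table("db.events_staging")   // placeholder source table

    df.writeTo("catalog.db.events").append()                                 // plain insert
    df.writeTo("catalog.db.events").overwrite(col("day") === "2019-07-10")   // overwrite rows matching a filter
    df.writeTo("catalog.db.events").overwriteDynamic()                       // dynamic partition overwrite
    df.writeTo("catalog.db.events_copy").create()                            // CTAS
    df.writeTo("catalog.db.events_copy").replace()                           // RTAS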
Re: DataSourceV2 sync notes - 10 July 2019
I agree that the long-term solution is much farther away, but I'm not sure it is a good idea to do this in the optimizer. Maybe we could find a good way to do it, but the initial complication required before we moved to push-down to the conversion to physical plan was really bad. Plus, this has been outstanding for probably a year now, so I am not confident that the long-term solution would be a priority -- it seems to me that band-aid solutions persist for far too long. On Tue, Jul 23, 2019 at 4:30 AM Wenchen Fan wrote: > Hi Ryan, > > Thanks for summarizing and sending out the meeting notes! Unfortunately, I > missed the last sync, but the topics are really interesting, especially the > stats integration. > > The ideal solution I can think of is to refactor the optimizer/planner and > move all the stats-based optimization to the physical plan phase (or do it > during the planning). This needs a lot of design work and I'm not sure if > we can finish it in the near future. > > Alternatively, we can do the operator pushdown at logical plan phase via > the optimizer. This is not ideal but I think is a better workaround than > doing pushdown twice. The parquet nested column pruning is also done at the > logical plan phase, so I think there are no serious problems if we do > operator pushdown at the logical plan phase. > > This is only about the internal implementation so we can fix it at any > time. But this may hurt data source v2 performance a lot and we'd better > fix it sooner rather than later. > > > On Sat, Jul 20, 2019 at 8:20 AM Ryan Blue > wrote: > >> Here are my notes from the last sync. If you’d like to be added to the >> invite or have topics, please let me know. >> >> *Attendees*: >> >> Ryan Blue >> Matt Cheah >> Yifei Huang >> Jose Torres >> Burak Yavuz >> Gengliang Wang >> Michael Artz >> Russel Spitzer >> >> *Topics*: >> >>- Existing PRs >> - V2 session catalog: https://github.com/apache/spark/pull/24768 >> - REPLACE and RTAS: https://github.com/apache/spark/pull/24798 >> - DESCRIBE TABLE: https://github.com/apache/spark/pull/25040 >> - ALTER TABLE: https://github.com/apache/spark/pull/24937 >> - INSERT INTO: https://github.com/apache/spark/pull/24832 >>- Stats integration >>- CTAS and DataFrameWriter behavior >> >> *Discussion*: >> >>- ALTER TABLE PR is ready to commit (and was after the sync) >>- REPLACE and RTAS PR: waiting on more reviews >>- INSERT INTO PR: Ryan will review >>- DESCRIBE TABLE has test failures, Matt will fix >>- V2 session catalog: >> - How will v2 catalog be configured? >> - Ryan: This is up for discussion because it currently uses a >> table property. I think it needs to be configurable >> - Burak: Agree that it should be configurable >> - Ryan: Does this need to be determined now, or can we solve this >> after getting the functionality in? >> - Jose: let’s get it in and fix it later >>- Stats integration: >> - Matt: has anyone looked at stats integration? What needs to be >> done? >> - Ryan: stats are part of the Scan API. Configure a scan with >> ScanBuilder and then get stats from it. The problem is that this >> happens >> when converting to physical plan, after the optimizer. But the >> optimizer >> determines what gets broadcasted. A work-around Netflix uses is to run >> push >> down in the stats code. This runs push-down twice and was rejected from >> Spark, but is important for performance. We should add a property to >> enable >> this. 
>> - Ryan: The larger problem is that stats are used in the >> optimizer, but push-down happens when converting to physical plan. >> This is >> also related to our earlier discussions about when join types are >> chosen. >> Fixing this is a big project >>- CTAS and DataFrameWriter behavior >> - Burak: DataFrameWriter uses CTAS where it shouldn’t. It is >> difficult to predict v1 behavior >> - Ryan: Agree, v1 DataFrameWriter does not have clear behavior. We >> suggest a replacement with clear verbs for each SQL action: >> append/insert, >> overwrite, overwriteDynamic, create (table), replace (table) >> - Ryan: Prototype available here: >> https://gist.github.com/rdblue/6bc140a575fdf266beb2710ad9dbed8f >> >> -- >> Ryan Blue >> Software Engineer >> Netflix >> > -- Ryan Blue Software Engineer Netflix
Re: [Discuss] Follow ANSI SQL on table insertion
I don’t think this is a good idea. Following the ANSI standard is usually fine, but here it would *silently corrupt data*.

From your proposal doc, ANSI allows implicitly casting from long to int (any numeric type to any other numeric type) and inserts NULL when a value overflows. That would drop data values and is not safe. [Ed: a small example follows this email]

Fixing the silent corruption by adding a runtime exception is not a good option, either. That puts off the problem until much of the job has completed, instead of catching the error at analysis time. It is better to catch this earlier during analysis than to run most of a job and then fail.

In addition, part of the justification for using the ANSI standard is to avoid breaking existing jobs. But the new behavior is only applied in DataSourceV2, so it won’t affect existing jobs until sources move to v2 and break other behavior anyway.

I think that the correct solution is to go with the existing validation rules that require explicit casts to truncate values. That still leaves the use case that motivated this proposal, which is that floating point literals are parsed as decimals and fail simple insert statements. We already came up with two alternatives to fix that problem in the DSv2 sync and I think it is a better idea to go with one of those instead of “fixing” Spark in a way that will corrupt data or cause runtime failures.

On Thu, Jul 25, 2019 at 9:11 AM Wenchen Fan wrote: > I have heard about many complaints about the old table insertion behavior. > Blindly casting everything will leak the user mistake to a late stage of > the data pipeline, and make it very hard to debug. When a user writes > string values to an int column, it's probably a mistake and the columns are > misordered in the INSERT statement. We should fail the query earlier and > ask users to fix the mistake. > > In the meanwhile, I agree that the new table insertion behavior we > introduced for Data Source V2 is too strict. It may fail valid queries > unexpectedly. > > In general, I support the direction of following the ANSI SQL standard. > But I'd like to do it with 2 steps: > 1. only add cast when the assignment rule is satisfied. This should be the > default behavior and we should provide a legacy config to restore to the > old behavior. > 2. fail the cast operation at runtime if overflow happens. AFAIK Marco > Gaido is working on it already. This will have a config as well and by > default we still return null. > > After doing this, the default behavior will be slightly different from the > SQL standard (cast can return null), and users can turn on the ANSI mode to > fully follow the SQL standard. This is much better than before and should > prevent a lot of user mistakes. It's also a reasonable choice to me to not > throw exceptions at runtime by default, as it's usually bad for > long-running jobs. > > Thanks, > Wenchen > > On Thu, Jul 25, 2019 at 11:37 PM Gengliang Wang < > gengliang.w...@databricks.com> wrote: > >> Hi everyone, >> >> I would like to discuss the table insertion behavior of Spark. In the >> current data source V2, only UpCast is allowed for table insertion. I think >> following ANSI SQL is a better idea. >> For more information, please read the Discuss: Follow ANSI SQL on table >> insertion >> <https://docs.google.com/document/d/1b9nnWWbKVDRp7lpzhQS1buv1_lDzWIZY2ApFs5rBcGI/edit?usp=sharing> >> Please let me know if you have any thoughts on this. >> >> Regards, >> Gengliang >> > -- Ryan Blue Software Engineer Netflix
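[Ed: A minimal example of the silent corruption described above, based on the overflow behavior quoted from the proposal. It assumes a SparkSession named `spark`; the table name is made up.]

    // 3000000000 is a long that does not fit in an int. With an implicitly inserted
    // ANSI-style cast and NULL-on-overflow semantics, the value would be silently
    // replaced with NULL instead of the query failing up front.
    spark.sql("CREATE TABLE ints (i INT) USING parquet")
    spark.sql("INSERT INTO ints VALUES (3000000000)")
    spark.sql("SELECT * FROM ints").show()   // the row would come back as NULL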
Re: [Discuss] Follow ANSI SQL on table insertion
Another important aspect of this problem is whether a user is conscious of the cast that is inserted by Spark. Most of the time, users are not aware of casts that are implicitly inserted, and that means replacing values with NULL would be a very surprising behavior. The impact of this choice affects users disproportionately: someone that knows about inserted casts is mildly annoyed when required to add an explicit cast, but a user that doesn't know an inserted cast is dropping values is very negatively impacted and may not discover the problem until it is too late. That disproportionate impact is what makes me think that it is not okay for Spark to silently replace values with NULL, even if that's what ANSI would allow. Other databases also have the ability to reject null values in tables, providing extra insurance against the problem, but Spark doesn't have required columns in its DDL. So while I agree with Reynold that there is a trade-off, I think that trade-off makes the choice between a runtime error and an analysis-time error. I'm okay with either a runtime error as the default or an analysis error as the default, as long as there is a setting that allows me to choose one for my deployment. On Wed, Jul 31, 2019 at 10:39 AM Reynold Xin wrote: > OK to push back: "disagreeing with the premise that we can afford to not > be maximal on standard 3. The correctness of the data is non-negotiable, > and whatever solution we settle on cannot silently adjust the user’s data > under any circumstances." > > This blanket statement sounds great on surface, but there are a lot of > subtleties. "Correctness" is absolutely important, but engineering/prod > development are often about tradeoffs, and the industry has consistently > traded correctness for performance or convenience, e.g. overflow checks, > null pointers, consistency in databases ... > > It all depends on the use cases and to what degree use cases can tolerate. > For example, while I want my data engineering production pipeline to throw > any error when the data doesn't match my expectations (e.g. type widening, > overflow), if I'm doing some quick and dirty data science, I don't want the > job to just fail due to outliers. > > > > On Wed, Jul 31, 2019 at 10:13 AM, Matt Cheah wrote: > >> Sorry I meant the current behavior for V2, which fails the query >> compilation if the cast is not safe. >> >> >> >> Agreed that a separate discussion about overflow might be warranted. I’m >> surprised we don’t throw an error now, but it might be warranted to do so. >> >> >> >> -Matt Cheah >> >> >> >> *From: *Reynold Xin >> *Date: *Wednesday, July 31, 2019 at 9:58 AM >> *To: *Matt Cheah >> *Cc: *Russell Spitzer , Takeshi Yamamuro < >> linguin@gmail.com>, Gengliang Wang , >> Ryan Blue , Spark dev list , >> Hyukjin Kwon , Wenchen Fan >> *Subject: *Re: [Discuss] Follow ANSI SQL on table insertion >> >> >> >> Matt what do you mean by maximizing 3, while allowing not throwing errors >> when any operations overflow? Those two seem contradicting. >> >> >> >> >> >> On Wed, Jul 31, 2019 at 9:55 AM, Matt Cheah wrote: >> >> I’m -1, simply from disagreeing with the premise that we can afford to >> not be maximal on standard 3. The correctness of the data is >> non-negotiable, and whatever solution we settle on cannot silently adjust >> the user’s data under any circumstances. >> >> >> >> I think the existing behavior is fine, or perhaps the behavior can be >> flagged by the destination writer at write time. 
>> >> >> >> -Matt Cheah >> >> >> >> *From: *Hyukjin Kwon >> *Date: *Monday, July 29, 2019 at 11:33 PM >> *To: *Wenchen Fan >> *Cc: *Russell Spitzer , Takeshi Yamamuro < >> linguin@gmail.com>, Gengliang Wang , >> Ryan Blue , Spark dev list >> *Subject: *Re: [Discuss] Follow ANSI SQL on table insertion >> >> >> >> From my look, +1 on the proposal, considering ASCI and other DBMSes in >> general. >> >> >> >> 2019년 7월 30일 (화) 오후 3:21, Wenchen Fan 님이 작성: >> >> We can add a config for a certain behavior if it makes sense, but the >> most important thing we want to reach an agreement here is: what should be >> the default behavior? >> >> >> >> Let's explore the solution space of table insertion behavior first: >> >> At compile time, >> >> 1. always add cast >> >> 2. add cast following the ASNI SQL store assignment rule (e.g. string to >> int is forbid
Re: DataSourceV2 : Transactional Write support
> What you could try instead is intermediate output: inserting into temporal table in executors, and move inserted records to the final table in driver (must be atomic)

I think that this is the approach that other systems (maybe sqoop?) have taken. Insert into independent temporary tables, which can be done quickly. Then for the final commit operation, union and insert into the final table. In a lot of cases, JDBC databases can do that quickly as well because the data is already on disk and just needs to be added to the final table. [Ed: a rough sketch follows this email]

On Fri, Aug 2, 2019 at 7:25 PM Jungtaek Lim wrote: > I asked similar question for end-to-end exactly-once with Kafka, and > you're correct distributed transaction is not supported. Introducing > distributed transaction like "two-phase commit" requires huge change on > Spark codebase and the feedback was not positive. > > What you could try instead is intermediate output: inserting into temporal > table in executors, and move inserted records to the final table in driver > (must be atomic). > > Thanks, > Jungtaek Lim (HeartSaVioR) > > On Sat, Aug 3, 2019 at 4:56 AM Shiv Prashant Sood > wrote: > >> All, >> >> I understood that DataSourceV2 supports Transactional write and wanted >> to implement that in JDBC DataSource V2 connector ( PR#25211 >> <https://github.com/apache/spark/pull/25211> ). >> >> Don't see how this is feasible for JDBC based connector. The FW suggest >> that EXECUTOR send a commit message to DRIVER, and actual commit should >> only be done by DRIVER after receiving all commit confirmations. This will >> not work for JDBC as commits have to happen on the JDBC Connection which >> is maintained by the EXECUTORS and JDBCConnection is not serializable that >> it can be sent to the DRIVER. >> >> Am i right in thinking that this cannot be supported for JDBC? My goal is >> to either fully write or roll back the dataframe write operation. >> >> Thanks in advance for your help. >> >> Regards, >> Shiv >> > > > -- > Name : Jungtaek Lim > Blog : http://medium.com/@heartsavior > Twitter : http://twitter.com/heartsavior > LinkedIn : http://www.linkedin.com/in/heartsavior > -- Ryan Blue Software Engineer Netflix
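[Ed: A rough sketch of the staging-table approach in plain JDBC, to make the idea concrete. Connection details and table names are placeholders, and error handling is trimmed; this is not tied to any Spark API.]

    import java.sql.DriverManager

    // Executors write their partitions into independent staging tables (stage_0,
    // stage_1, ...), which is fast and needs no coordination. The driver then does
    // the single atomic step: moving the staged rows into the final table in one
    // transaction and cleaning up.
    val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "pass")
    conn.setAutoCommit(false)
    try {
      val stmt = conn.createStatement()
      stmt.execute("INSERT INTO final_table SELECT * FROM stage_0")
      stmt.execute("INSERT INTO final_table SELECT * FROM stage_1")
      stmt.execute("DROP TABLE stage_0")
      stmt.execute("DROP TABLE stage_1")
      conn.commit()
    } catch {
      case e: Exception =>
        conn.rollback()
        throw e
    } finally {
      conn.close()
    }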
Re: [Discuss] Follow ANSI SQL on table insertion
Wenchen, I don’t think we agree on what “strict mode” would mean. Marco is talking about strict mode as an extension of the flag for throwing exceptions on overflow for decimal operations. That is not ANSI SQL mode. Also, we need more than ANSI SQL and runtime failure modes. For the motivating problem of validating a write, we need a way to preserve the analysis-time failure if types don’t match. That, combined with a runtime strict mode, is the option that fails fast and guarantees data isn’t lost. I agree that adding try_cast or safe methods is a good idea. So here’s a revised set of steps: 1. Finish ANSI SQL mode - but do not make it the default because it is not safe without an option to enable strict mode. 2. Add strict mode for runtime calculations and turn it on by default 3. Add a flag to control analysis time vs runtime failures (using strict mode or ANSI SQL mode) for v2 writes The choice of whether runtime or analysis time failures should be the default for v2 writes is worth a VOTE on this list. Once we agree on what modes and options should be available, we can call a vote to build consensus around a reasonable set of defaults, given that there are a lot of varying opinions on this thread. On Mon, Aug 5, 2019 at 12:49 AM Wenchen Fan wrote: > I think we need to clarify one thing before further discussion: the > proposal is for the next release but not a long term solution. > > IMO the long term solution should be: completely follow SQL standard > (store assignment rule + runtime exception), and provide a variant of > functions that can return null instead of runtime exception. For example, > the TRY_CAST in SQL server > <https://docs.microsoft.com/en-us/sql/t-sql/functions/try-cast-transact-sql?view=sql-server-2017>, > or a more general SAFE prefix of functions in Big Query > <https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators> > . > > This proposal is the first step to move forward to the long term solution: > follow the SQL standard store assignment rule. It can help us prevent some > table insertion queries that are very likely to fail, at compile time. > > The following steps in my mind are: > * finish the "strict mode"(ANSI SQL mode). It now applies to arithmetic > functions, but we need to apply it to more places like cast. > * introduce the safe version of functions. The safe version always returns > null for invalid input, no matter the strict mode is on or not. We need > some additional work to educate users to use the safe version of the > functions if they rely on the return null behavior. > * turn on the strict mode by default. > > Hopefully we can finish it soon, in Spark 3.x. > > Thanks, > Wenchen > > On Sat, Aug 3, 2019 at 7:07 AM Matt Cheah wrote: > >> *I agree that having both modes and let the user choose the one he/she >> wants is the best option (I don't see big arguments on this honestly). Once >> we have this, I don't see big differences on what is the default. What - I >> think - we still have to work on, is to go ahead with the "strict mode" >> work and provide a more convenient way for users to switch among the 2 >> options. I mean: currently we have one flag for throwing exception on >> overflow for operations on decimals, one for doing the same for operations >> on other data types and probably going ahead we will have more. I think in >> the end we will need to collect them all under an "umbrella" flag which >> lets the user simply switch between strict and non-strict mode. 
I also >> think that we will need to document this very well and give it particular >> attention in our docs, maybe with a dedicated section, in order to provide >> enough visibility on it to end users.* >> >> >> >> I’m +1 on adding a strict mode flag this way, but I’m undecided on >> whether or not we want a separate flag for each of the arithmetic overflow >> situations that could produce invalid results. My intuition is yes, because >> different users have different levels of tolerance for different kinds of >> errors. I’d expect these sorts of configurations to be set up at an >> infrastructure level, e.g. to maintain consistent standards throughout a >> whole organization. >> >> >> >> *From: *Gengliang Wang >> *Date: *Thursday, August 1, 2019 at 3:07 AM >> *To: *Marco Gaido >> *Cc: *Wenchen Fan , Hyukjin Kwon < >> gurwls...@gmail.com>, Russell Spitzer , Ryan >> Blue , Reynold Xin , Matt Cheah < >> mch...@palantir.com>, Takeshi Yamamuro , Spark >> dev list >> *Subject: *Re: [Discuss] Follow ANSI SQL on table insertion >> >> >> >> Hi all,
DataSourceV2 sync notes - 24 July 2019
Here are my notes from the last DSv2 sync. Sorry it's a bit late!

*Attendees*:

Ryan Blue
John Zhuge
Raymond McCollum
Terry Kim
Gengliang Wang
Jose Torres
Wenchen Fan
Priyanka Gomatam
Matt Cheah
Russel Spitzer
Burak Yavuz

*Topics*:

- Check in on blockers
   - Remove SaveMode
   - Reorganize code - waiting for INSERT INTO?
   - Write docs - should be done after 3.0 branching
- Open PRs
   - V2 session catalog config: https://github.com/apache/spark/pull/25104
   - DESCRIBE TABLE: https://github.com/apache/spark/pull/25040
   - INSERT INTO: https://github.com/apache/spark/pull/24832
   - SupportsNamespaces: https://github.com/apache/spark/pull/24560
   - SHOW TABLES: https://github.com/apache/spark/pull/25247
   - DELETE FROM: https://github.com/apache/spark/pull/21308 and https://github.com/apache/spark/pull/25115
- DELETE FROM approach
- Filter push-down and stats - move to optimizer?
- Use v2 ALTER TABLE implementations for v1 tables
- CatalogPlugin changes
- Reuse the existing Parquet readers?

*Discussion*:

- Blockers
   - Remove SaveMode from file sources: Blocked by TableProvider/CatalogPlugin changes. Doesn’t work with all of the USING clauses from v1, like JDBC. Working on a CatalogPlugin fix.
   - Reorganize packages: Blocked by outstanding INSERT INTO PRs
   - Docs: Ryan: docs can be written after branching, so focus should be on stability right now
   - Any other blockers? Please send them to Ryan to track
- V2 session catalog config PR:
   - Wenchen: this will be included in CatalogPlugin changes
- DESCRIBE TABLE PR:
   - Matt: waiting for review
   - Burak: partitioning is strange, uses “Part 0” instead of names
   - Ryan: there are no names for transform partitions (identity partitions use column names)
   - Conclusion: not a big problem since there is no required schema, we can update later if better ideas come up
- INSERT INTO PR:
   - Ryan: ready for another review, DataFrameWriter.insertInto PR will follow
- SupportsNamespaces PR:
   - Ryan: ready for another review
- SHOW TABLES PR:
   - Terry: there are open questions: what is the current database for v2?
   - Ryan: there should be a current namespace in the SessionState. This could be per catalog?
   - Conclusion: do not track current namespace per catalog. Reset to a catalog default when the current catalog changes
   - Ryan: will add a SupportsNamespaces method for the default namespace to initialize current.
   - Burak: USE foo.bar could set both
   - What if SupportsNamespaces is not implemented? Default to Seq.empty
   - Terry: should listing methods support search patterns?
   - Ryan: this adds complexity that should be handled by Spark instead of complicating the API. There isn’t a performance need to push this down because we don’t expect high cardinality for a namespace level.
   - Conclusion: implement in SHOW TABLES exec
   - Terry: how should temporary tables be handled?
   - Wenchen: temporary table is an alias for temporary view. SHOW TABLES does list temporary views, v2 should implement the same behavior.
   - Terry: support EXTENDED?
   - Ryan: This can be done later.
- DELETE FROM PR:
   - Wenchen: DELETE FROM just passes filters to the data source to delete
   - Ryan: Instead of a complicated builder, let’s solve just the simple case (filters) and not the row-level delete case. If we do that, then we can use a simple SupportsDelete interface and put off row-level delete design
   - Consensus was to add a SupportsDelete interface for Table and not a new builder [Ed: a rough sketch follows these notes]
- Stats push-down fix:
   - Ryan: briefly looked into it and this can probably be done earlier, in the optimizer, by creating a scan early and a special logical plan to wrap a scan. This isn’t a good long-term solution but would fix stats for the release. Write side would not change.
   - Ryan will submit a PR with the implementation
- Using ALTER TABLE implementations for v1
   - Burak: Took a stab at this, but ran into problems. Would be nice if all DDL for v1 were supported through the v2 API
   - DDL doesn’t work with v1 for custom data sources - if the source of truth is not Hive
   - Matt: v2 should be used to change the source of truth. v1 behavior is to only change the session catalog (e.g., Hive).
   - Matt: is v1 deprecated?
   - Wenchen: not until stable
   - Burak: can’t deprecate yet
   - Burak: CTAS and RTAS could also call v1
   - Ryan: We could build a v2 implementation that calls v1, but only append and read could be supported because v1 overwrite behavior is unreliable across sources.
- Ran out of time
   - Wenchen’s CatalogPlugin changes can be discussed next time
   - Ryan will follow up
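[Ed: A rough sketch of the SupportsDelete shape agreed on above, written as a Scala trait for illustration. The real interface will be a Java mix-in for Table and its exact signature may differ from this sketch.]

    import org.apache.spark.sql.sources.Filter

    trait SupportsDelete {
      // Delete all rows matching the pushed-down filters. A source that cannot apply
      // the filters exactly (for example, it only supports partition-level deletes)
      // should throw rather than delete the wrong set of rows.
      def deleteWhere(filters: Array[Filter]): Unit
    }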
Re: [DISCUSS] ViewCatalog interface for DSv2
Thanks for working on this, John! I'd like to see a more complete write-up of what you're proposing. Without that, I don't think we can have a productive discussion about this. For example, I think you're proposing to keep the view columns to ensure that the same columns are produced by the view every time, based on requirements from the SQL spec. Let's start by stating what those behavior requirements are, so that everyone has the context to understand why your proposal includes the view columns. Similarly, I'd like to know why you're proposing `softwareVersion` in the view definition. On Tue, Aug 13, 2019 at 8:56 AM John Zhuge wrote: > Catalog support has been added to DSv2 along with a table catalog > interface. Here I'd like to propose a view catalog interface, for the > following benefit: > >- Abstraction for view management thus allowing different view backends >- Disassociation of view definition storage from Hive Metastore > > A catalog plugin can be both TableCatalog and ViewCatalog. Resolve an > identifier as view first then table. > > More details in SPIP and PR if we decide to proceed. Here is a quick > glance at the API: > > ViewCatalog interface: > >- loadView >- listViews >- createView >- deleteView > > View interface: > >- name >- originalSql >- defaultCatalog >- defaultNamespace >- viewColumns >- owner >- createTime >- softwareVersion >- options (map) > > ViewColumn interface: > >- name >- type > > > Thanks, > John Zhuge > -- Ryan Blue Software Engineer Netflix
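[Ed: To ground the discussion, here is roughly what the proposed surface looks like when written down, assembled only from the member list in John's email. All types and signatures are guesses for illustration, not the actual proposal.]

    trait ViewColumn {
      def name: String
      def `type`: String                  // likely a real DataType in the proposal
    }

    trait View {
      def name: String
      def originalSql: String
      def defaultCatalog: String
      def defaultNamespace: Array[String]
      def viewColumns: Seq[ViewColumn]
      def owner: String
      def createTime: Long
      def softwareVersion: String
      def options: Map[String, String]
    }

    trait ViewCatalog {
      def listViews(namespace: Array[String]): Array[String]
      def loadView(ident: String): View
      def createView(ident: String, view: View): View
      def deleteView(ident: String): Boolean
    }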
DSv2 sync notes - 21 August 2019
Sorry these notes were delayed. Here’s what we talked about in the last DSv2 sync.

*Attendees*:

Ryan Blue
John Zhuge
Burak Yavuz
Gengliang Wang
Terry Kim
Wenchen Fan
Xin Ren
Srabasti Banerjee
Priyanka Gomatam

*Topics*:

- Follow up on renaming append to insert in v2 API
- Changes to CatalogPlugin for v2 session catalog implementations
- Check on blockers
   - Remove SaveMode - remove special case after file sources are disabled?
   - Reorganize packages
- Open PRs
   - DataFrameWriterV2: https://github.com/apache/spark/pull/25354
   - SHOW TABLES: https://github.com/apache/spark/pull/25247
   - https://github.com/apache/spark/pull/25507

*Discussion*:

- Insert in DataFrameWriter v2 API:
   - Ryan: After reviewing the doc that Russel sent <https://en.wikipedia.org/wiki/Merge_(SQL)> last time, it doesn’t look like there is precedent for insert implementing upsert, without an additional clause like ON DUPLICATE KEY UPDATE or ON CONFLICT. I think that means that insert should not be used for upsert and it is correct to use the verb “append” in the new API.
   - Wenchen: Spark already supports INSERT OVERWRITE, which has no precedent other than Hive
   - Ryan: Good point. INSERT OVERWRITE is a partition-level replace. If we think of single-key stores as partitioned by row key, then the dynamic INSERT OVERWRITE behavior is appropriate.
   - Ryan: One other reason to change “append” to “insert” is to match SQL. Should we consider renaming for consistency with SQL?
   - Burak: SQL inserts are by position and DataFrameWriterV2 appends are by name. DataFrameWriter (v1) uses position for insertInto, so there is a precedent that insert is by position.
   - Ryan: I agree with that logic. It makes more sense to use append to distinguish behavior.
   - Consensus was to keep the “append” verb, but will discuss when Russel is back.
   - Burak: (Continuing from DataFrameWriterV2 discussion) The v2 writer looks fine other than the partition functions are close to built-in expression functions (year vs years).
   - Consensus was to use “partitioning.years” for partitioning functions.
- Changes to CatalogPlugin for v2 session catalog implementations
   - Wenchen: this adds a new config for overriding the v2 session catalog, and a new abstract class that must be implemented
   - Ryan: Why a new config? If we intend for a user to be able to override this, then we already have a mechanism to configure it using the “session” catalog name.
   - Discussion on pros and cons of using a different config; consensus was to use the existing CatalogPlugin config
   - Ryan: Looks like this uses TableCatalog for the actual API and passes in the built-in V2SessionCatalog. That sounds like a good idea to me, instead of introducing a new API.
   - Burak: What about databases named “session”?
   - Ryan: Catalogs take precedence over databases, so the session catalog will be used for “session.table”.
   - Burak: Sounds like this is going to break existing queries then.
   - Ryan: I think the general rule that catalogs should take precedence is right. It would be worse to allow users creating databases to break other users’ catalogs — we avoid that problem with catalogs because they are limited to jobs when users create them and are otherwise an administrator option. But, session is a special case because this is Spark building a catalog into all environments… I think that’s a good reason to name it something that we think is unlikely to conflict.
   - Discussion came up with several alternatives (*session*, built_in, etc) but consensus settled on “spark_catalog”. That’s more descriptive and much less likely to conflict with existing databases. [Ed: a short example follows these notes]
- Remove SaveMode: this was done by Wenchen’s commit that disabled file sources.

--
Ryan Blue
Software Engineer
Netflix
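[Ed: A short example of the effect of the naming decision above. Table names are placeholders; this assumes a SparkSession named `spark`.]

    // The built-in session catalog can always be addressed explicitly by the new name,
    // which is unlikely to collide with a user database.
    spark.sql("SELECT * FROM spark_catalog.default.some_table").show()

    // Catalog names take precedence over database names, so an identifier that starts
    // with a configured catalog name goes to that catalog plugin, not to a database.
    spark.sql("SELECT * FROM test_catalog.db.some_table").show()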
DSv2 sync - 4 September 2019
Here are my notes from the latest sync. Feel free to reply with clarifications if I’ve missed anything.

*Attendees*: Ryan Blue, John Zhuge, Russell Spitzer, Matt Cheah, Gengliang Wang, Priyanka Gomatam, Holden Karau

*Topics*:
- DataFrameWriterV2 insert vs append (recap)
- ANSI and strict modes for inserting casts
- Separating identifier resolution from table lookup
- Open PRs
  - SHOW NAMESPACES - https://github.com/apache/spark/pull/25601
  - DataFrameWriterV2 - https://github.com/apache/spark/pull/25681
  - TableProvider API update - https://github.com/apache/spark/pull/25651
  - UPDATE - https://github.com/apache/spark/pull/25626

*Discussion*:
- DataFrameWriterV2 insert vs append: recapped the agreement from the last sync
- ANSI and strict modes for inserting casts (a small Spark snippet illustrating the runtime question follows these notes):
  - Russell: Failure modes are important. ANSI behavior is to fail at runtime, not analysis time. If a cast is allowed but doesn’t throw an exception at runtime, then this can’t be considered ANSI behavior.
  - Gengliang: ANSI adds the cast
  - Matt: Sounds like there are two conflicting views of the world. Is the default ANSI behavior to insert a cast that may produce NULL, or to fail at runtime?
  - Ryan: So analysis and runtime behaviors can’t be separate?
  - Matt: Analysis behavior is influenced by behavior at runtime. Maybe the vote should cover both?
  - Russell: (linked to the standard) There are 3 steps: if numeric and the same type, use the data value. If the value can be rounded or truncated, round or truncate. Otherwise, throw an exception that a value can’t be cast. These are runtime requirements.
  - Ryan: Another consideration is that we can make Spark more permissive, but can’t make Spark more strict in future releases.
  - Matt: v1 silently corrupts data
  - Russell: ANSI is fine, as long as the runtime matches (is ANSI). Don’t tell people it’s ANSI and not do ANSI completely.
  - Gengliang: people are concerned about long-running jobs failing at the end
  - Ryan: That’s okay because they can change the defaults: use strict analysis-time validation, or allow casts to produce NULL values.
  - Matt: As long as this is well documented, it should be fine
  - Ryan: Can we run tests to find out what exactly the behavior is?
  - Gengliang: sqlfiddle.com
  - Russell ran tests in MySQL and Postgres. Both threw runtime failures.
  - Matt: Let’s move on, but add the runtime behavior to the VOTE
- Identifier resolution and table lookup
  - Ryan: recent changes merged identifier resolution and table lookup together because identifiers owned by the session catalog need to be loaded to find out whether to use v1 or v2 plans. I think this should be separated so that identifier resolution happens independently and the two tasks don’t end up being done at the same time, over-complicating the analyzer.
- SHOW NAMESPACES: ready for final review
- DataFrameWriterV2:
  - Ryan: Tests failed after passing on the PR. Anyone know why that would happen?
  - Gengliang: tests failed in maven
  - Holden: PR validation runs SBT tests
- TableProvider API update: skipped because Wenchen didn’t make it
- UPDATE support PR
  - Ryan: There is a PR to add a SQL UPDATE command, but it delegates entirely to the data source, which seems strange.
  - Matt: What is Spark’s purpose here? Why would Spark parse a SQL statement only to pass it entirely to another engine?
  - Ryan: It does make sense to do this. If Spark eventually supports MERGE INTO and other row-level operations, then it makes sense to push down the operation to some sources, like JDBC. I just find it backward to add the pushdown API before adding an implementation that handles this inside Spark — pushdown is usually an optimization.
  - Russell: Would this be safe? Spark retries lots of operations.
  - Ryan: I think it would be safe because Spark won’t retry top-level operations and this is a single method call. Nothing would get retried.
  - Ryan: I’ll ask what the PR author’s use case is. Maybe that would help clarify why this is a good idea.

-- Ryan Blue Software Engineer Netflix
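[Editor's note] To make the runtime question above concrete, here is a minimal sketch of the kind of test Ryan asked about. It assumes the proposed spark.sql.storeAssignmentPolicy setting and a Spark build that supports it; the table name is made up, and the comments describe the open question rather than a settled answer.

// Probe sketch for the debated ANSI runtime behavior; illustrative only.
import org.apache.spark.sql.SparkSession

object AnsiInsertProbe {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ansi-insert-probe")
      .config("spark.sql.storeAssignmentPolicy", "ANSI")  // config name assumed from the proposal
      .getOrCreate()

    spark.sql("CREATE TABLE target (id INT) USING parquet")

    // bigint -> int is an allowed store assignment under ANSI, so this passes analysis.
    // The debate above is whether the out-of-range value should fail the job at
    // runtime or be written as NULL.
    spark.sql("INSERT INTO target VALUES (3000000000)")

    spark.sql("SELECT * FROM target").show()
    spark.stop()
  }
}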
Re: [VOTE][SPARK-28885] Follow ANSI store assignment rules in table insertion by default
; I'd like to call for a vote on SPARK-28885 >> <https://issues.apache.org/jira/browse/SPARK-28885> "Follow ANSI store >> assignment rules in table insertion by default". >> When inserting a value into a column with a different data type, Spark >> performs type coercion. Currently, we support 3 policies for the type >> coercion rules: ANSI, legacy and strict, which can be set via the option >> "spark.sql.storeAssignmentPolicy": >> 1. ANSI: Spark performs the type coercion as per ANSI SQL. In practice, the >> behavior is mostly the same as PostgreSQL. It disallows certain unreasonable >> type conversions such as converting `string` to `int` and `double` to >> `boolean`. >> 2. Legacy: Spark allows the type coercion as long as it is a valid `Cast`, >> which is very loose. E.g., converting either `string` to `int` or `double` >> to `boolean` is allowed. It is the current behavior in Spark 2.x for >> compatibility with Hive. >> 3. Strict: Spark doesn't allow any possible precision loss or data >> truncation in type coercion, e.g., converting either `double` to `int` or >> `decimal` to `double` is not allowed. The rules are originally for Dataset >> encoder. As far as I know, no mainstream DBMS is using this policy by >> default. >> >> Currently, the V1 data source uses "Legacy" policy by default, while V2 uses >> "Strict". This proposal is to use "ANSI" policy by default for both V1 and >> V2 in Spark 3.0. >> >> There was also a DISCUSS thread "Follow ANSI SQL on table insertion" in the >> dev mailing list. >> >> This vote is open until next Thurs (Sept. 12th). >> >> [ ] +1: Accept the proposal >> [ ] +0 >> [ ] -1: I don't think this is a good idea because ... >> >> Thank you! >> >> Gengliang >> >> > -- Ryan Blue Software Engineer Netflix
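[Editor's note] To illustrate the three policies being voted on, here is a hedged sketch of what each setting means for a few column assignments. It assumes a Spark build that exposes spark.sql.storeAssignmentPolicy as proposed; the table name is illustrative, and the commented-out lines mark statements the policy description above says would be rejected at analysis time.

import org.apache.spark.sql.SparkSession

object StoreAssignmentPolicies {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("store-assignment").getOrCreate()
    spark.sql("CREATE TABLE t (i INT) USING parquet")

    // Legacy: any valid Cast is accepted, so string -> int passes analysis and
    // unparseable values end up as NULL.
    spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
    spark.sql("INSERT INTO t SELECT 'not a number'")

    // ANSI: unreasonable conversions such as string -> int are rejected at analysis time.
    spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
    // spark.sql("INSERT INTO t SELECT 'not a number'")      // would fail analysis

    // Strict: even potentially lossy numeric conversions such as double -> int are rejected.
    spark.conf.set("spark.sql.storeAssignmentPolicy", "STRICT")
    // spark.sql("INSERT INTO t SELECT CAST(1.5 AS DOUBLE)") // would fail analysis

    spark.stop()
  }
}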
Re: [VOTE] [SPARK-27495] SPIP: Support Stage level resource configuration and scheduling
+1 This is going to be really useful. Thanks for working on it! On Wed, Sep 11, 2019 at 9:38 AM Felix Cheung wrote: > +1 > > -- > *From:* Thomas graves > *Sent:* Wednesday, September 4, 2019 7:24:26 AM > *To:* dev > *Subject:* [VOTE] [SPARK-27495] SPIP: Support Stage level resource > configuration and scheduling > > Hey everyone, > > I'd like to call for a vote on SPARK-27495 SPIP: Support Stage level > resource configuration and scheduling > > This is for supporting stage level resource configuration and > scheduling. The basic idea is to allow the user to specify executor > and task resource requirements for each stage to allow the user to > control the resources required at a finer grain. One good example here > is doing some ETL to preprocess your data in one stage and then feed > that data into an ML algorithm (like tensorflow) that would run as a > separate stage. The ETL could need totally different resource > requirements for the executors/tasks than the ML stage does. > > The text for the SPIP is in the jira description: > > https://issues.apache.org/jira/browse/SPARK-27495 > > I split the API and Design parts into a google doc that is linked to > from the jira. > > This vote is open until next Fri (Sept 13th). > > [ ] +1: Accept the proposal as an official SPIP > [ ] +0 > [ ] -1: I don't think this is a good idea because ... > > I'll start with my +1 > > Thanks, > Tom > > - > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > > -- Ryan Blue Software Engineer Netflix
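[Editor's note] To make the SPIP's ETL-then-ML example concrete, here is a rough sketch of how per-stage requirements could be expressed. The ResourceProfile-style class and method names, the input path, and the GPU amounts are my own assumptions for illustration, not the final API from the design doc.

import org.apache.spark.SparkContext
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

object StageLevelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "stage-level-sketch")

    // The ML stage asks for GPUs and larger executors; the ETL stage keeps the defaults.
    val execReqs = new ExecutorResourceRequests().cores(4).memory("16g").resource("gpu", 1)
    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
    val mlProfile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

    val preprocessed = sc.textFile("hdfs://host/input").map(_.toUpperCase)  // stand-in ETL work
    val mlStage = preprocessed.repartition(8).withResources(mlProfile)      // new profile applies from here
    mlStage.foreachPartition(_ => () /* run the ML framework (e.g. TensorFlow) here */)

    sc.stop()
  }
}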
Re: Thoughts on Spark 3 release, or a preview release
;s: >>> >>>>>>> - DSv2? >>> >>>>>>> - Finishing touches on the Hive, JDK 11 update >>> >>>>>>> >>> >>>>>>> What about considering a preview release earlier, as happened for >>> >>>>>>> Spark 2, to get feedback much earlier than the RC cycle? Could >>> that >>> >>>>>>> even happen ... about now? >>> >>>>>>> >>> >>>>>>> I'm also wondering what a realistic estimate of Spark 3 release >>> is. My >>> >>>>>>> guess is quite early 2020, from here. >>> >>>>>>> >>> >>>>>>> >>> >>>>>>> >>> >>>>>>> SPARK-29014 DataSourceV2: Clean up current, default, and session >>> catalog uses >>> >>>>>>> SPARK-28900 Test Pyspark, SparkR on JDK 11 with run-tests >>> >>>>>>> SPARK-28883 Fix a flaky test: ThriftServerQueryTestSuite >>> >>>>>>> SPARK-28717 Update SQL ALTER TABLE RENAME to use TableCatalog >>> API >>> >>>>>>> SPARK-28588 Build a SQL reference doc >>> >>>>>>> SPARK-28629 Capture the missing rules in HiveSessionStateBuilder >>> >>>>>>> SPARK-28684 Hive module support JDK 11 >>> >>>>>>> SPARK-28548 explain() shows wrong result for persisted DataFrames >>> >>>>>>> after some operations >>> >>>>>>> SPARK-28372 Document Spark WEB UI >>> >>>>>>> SPARK-28476 Support ALTER DATABASE SET LOCATION >>> >>>>>>> SPARK-28264 Revisiting Python / pandas UDF >>> >>>>>>> SPARK-28301 fix the behavior of table name resolution with >>> multi-catalog >>> >>>>>>> SPARK-28155 do not leak SaveMode to file source v2 >>> >>>>>>> SPARK-28103 Cannot infer filters from union table with empty >>> local >>> >>>>>>> relation table properly >>> >>>>>>> SPARK-28024 Incorrect numeric values when out of range >>> >>>>>>> SPARK-27936 Support local dependency uploading from --py-files >>> >>>>>>> SPARK-27884 Deprecate Python 2 support in Spark 3.0 >>> >>>>>>> SPARK-27763 Port test cases from PostgreSQL to Spark SQL >>> >>>>>>> SPARK-27780 Shuffle server & client should be versioned to enable >>> >>>>>>> smoother upgrade >>> >>>>>>> SPARK-27714 Support Join Reorder based on Genetic Algorithm when >>> the # >>> >>>>>>> of joined tables > 12 >>> >>>>>>> SPARK-27471 Reorganize public v2 catalog API >>> >>>>>>> SPARK-27520 Introduce a global config system to replace >>> hadoopConfiguration >>> >>>>>>> SPARK-24625 put all the backward compatible behavior change >>> configs >>> >>>>>>> under spark.sql.legacy.* >>> >>>>>>> SPARK-24640 size(null) returns null >>> >>>>>>> SPARK-24702 Unable to cast to calendar interval in spark sql. 
>>> >>>>>>> SPARK-24838 Support uncorrelated IN/EXISTS subqueries for more >>> operators >>> >>>>>>> SPARK-24941 Add RDDBarrier.coalesce() function >>> >>>>>>> SPARK-25017 Add test suite for ContextBarrierState >>> >>>>>>> SPARK-25083 remove the type erasure hack in data source scan >>> >>>>>>> SPARK-25383 Image data source supports sample pushdown >>> >>>>>>> SPARK-27272 Enable blacklisting of node/executor on fetch >>> failures by default >>> >>>>>>> SPARK-27296 User Defined Aggregating Functions (UDAFs) have a >>> major >>> >>>>>>> efficiency problem >>> >>>>>>> SPARK-25128 multiple simultaneous job submissions against k8s >>> backend >>> >>>>>>> cause driver pods to hang >>> >>>>>>> SPARK-26731 remove EOLed spark jobs from jenkins >>> >>>>>>> SPARK-26664 Make DecimalType's minimum adjusted scale >>> configurable >>> >>>>>>> SPARK-21559 Remove Mesos fine-grained mode >>> >>>>>>> SPARK-24942 Improve cluster resource management with jobs >>> containing >>> >>>>>>> barrier stage >>> >>>>>>> SPARK-25914 Separate projection from grouping and aggregate in >>> logical Aggregate >>> >>>>>>> SPARK-26022 PySpark Comparison with Pandas >>> >>>>>>> SPARK-20964 Make some keywords reserved along with the ANSI/SQL >>> standard >>> >>>>>>> SPARK-26221 Improve Spark SQL instrumentation and metrics >>> >>>>>>> SPARK-26425 Add more constraint checks in file streaming source >>> to >>> >>>>>>> avoid checkpoint corruption >>> >>>>>>> SPARK-25843 Redesign rangeBetween API >>> >>>>>>> SPARK-25841 Redesign window function rangeBetween API >>> >>>>>>> SPARK-25752 Add trait to easily whitelist logical operators that >>> >>>>>>> produce named output from CleanupAliases >>> >>>>>>> SPARK-23210 Introduce the concept of default value to schema >>> >>>>>>> SPARK-25640 Clarify/Improve EvalType for grouped aggregate and >>> window aggregate >>> >>>>>>> SPARK-25531 new write APIs for data source v2 >>> >>>>>>> SPARK-25547 Pluggable jdbc connection factory >>> >>>>>>> SPARK-20845 Support specification of column names in INSERT INTO >>> >>>>>>> SPARK-24417 Build and Run Spark on JDK11 >>> >>>>>>> SPARK-24724 Discuss necessary info and access in barrier mode + >>> Kubernetes >>> >>>>>>> SPARK-24725 Discuss necessary info and access in barrier mode + >>> Mesos >>> >>>>>>> SPARK-25074 Implement maxNumConcurrentTasks() in >>> >>>>>>> MesosFineGrainedSchedulerBackend >>> >>>>>>> SPARK-23710 Upgrade the built-in Hive to 2.3.5 for hadoop-3.2 >>> >>>>>>> SPARK-25186 Stabilize Data Source V2 API >>> >>>>>>> SPARK-25376 Scenarios we should handle but missed in 2.4 for >>> barrier >>> >>>>>>> execution mode >>> >>>>>>> SPARK-25390 data source V2 API refactoring >>> >>>>>>> SPARK-7768 Make user-defined type (UDT) API public >>> >>>>>>> SPARK-14922 Alter Table Drop Partition Using Predicate-based >>> Partition Spec >>> >>>>>>> SPARK-15691 Refactor and improve Hive support >>> >>>>>>> SPARK-15694 Implement ScriptTransformation in sql/core >>> >>>>>>> SPARK-16217 Support SELECT INTO statement >>> >>>>>>> SPARK-16452 basic INFORMATION_SCHEMA support >>> >>>>>>> SPARK-18134 SQL: MapType in Group BY and Joins not working >>> >>>>>>> SPARK-18245 Improving support for bucketed table >>> >>>>>>> SPARK-19842 Informational Referential Integrity Constraints >>> Support in Spark >>> >>>>>>> SPARK-22231 Support of map, filter, withColumn, dropColumn in >>> nested >>> >>>>>>> list of structures >>> >>>>>>> SPARK-22632 Fix the behavior of timestamp values for R's >>> DataFrame to >>> >>>>>>> respect session timezone >>> >>>>>>> SPARK-22386 Data 
Source V2 improvements >>> >>>>>>> SPARK-24723 Discuss necessary info and access in barrier mode + >>> YARN >>> >>>>>>> >>> >>>>>>> >>> - >>> >>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >>> >>>>>>> >>> >>>>>>> >>> >>>> >>> >>>> >>> >>>> -- >>> >>>> Name : Jungtaek Lim >>> >>>> Blog : http://medium.com/@heartsavior >>> >>>> Twitter : http://twitter.com/heartsavior >>> >>>> LinkedIn : http://www.linkedin.com/in/heartsavior >>> >>> >>> >>> >>> >>> >>> >>> -- >>> >>> John Zhuge >>> >> >>> >> >>> >> >>> >> -- >>> >> Twitter: https://twitter.com/holdenkarau >>> >> Books (Learning Spark, High Performance Spark, etc.): >>> https://amzn.to/2MaRAG9 >>> >> YouTube Live Streams: https://www.youtube.com/user/holdenkarau >>> > >>> > >>> >>> - >>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >>> >>> -- Ryan Blue Software Engineer Netflix
DSv2 sync notes - 28 September 2019
Here are my notes from this week’s DSv2 sync.

*Attendees*: Ryan Blue, Holden Karau, Russell Spitzer, Terry Kim, Wenchen Fan, Shiv Prashant Sood, Joseph Torres, Gengliang Wang, Matt Cheah, Burak Yavuz

*Topics*:
- Driver-side Hadoop conf
- SHOW DATABASES/NAMESPACES behavior
- Review outstanding 3.0 work
- Spark 2.5?
- Open PRs
  - Centralize catalog and table lookup: https://github.com/apache/spark/pull/25747
  - Update TableProvider for catalogs: https://github.com/apache/spark/pull/25651
  - Partitioning in DataFrameWriter.save (multiple PRs)
  - USE CATALOG / SET CURRENT CATALOG: https://github.com/apache/spark/pull/25771
  - UPDATE: https://github.com/apache/spark/pull/25626

*Discussion*:
- Driver-side Hadoop conf (a sketch of the broadcast pattern follows these notes):
  - Holden: DSv1 propagates Hadoop conf, but using it is a pain in DSv2. This leads to multiple instances. Would be nice for DSv2 to pass it.
  - Ryan: Iceberg uses it and serializes it in every task; it would be nice to avoid this.
  - Wenchen: how would we support this?
  - Holden: DSv1 generates it on the driver, could use a mix-in
  - Ryan: part of the complication is that it is produced per stage with the current Spark config
  - Matt: There is a SerializableConfiguration in Iceberg, too. Overhead per task is fine, but we could use a broadcast to avoid it.
  - Holden will create a PR to expose SerializableConfiguration and one to add a way to pass the config to the source
- SHOW NAMESPACES vs SHOW DATABASES
  - Ryan: These two are separate code paths right now. We chose to make SHOW NAMESPACES compatible with SHOW DATABASES, so we can either choose a different behavior or combine the two implementations
  - Wenchen: we should merge the two implementations
  - Terry will submit a PR to merge the implementations
- *Outstanding work before 3.0*
  - Ryan: we are about ready for 3.0, but we should make sure to focus on the remaining things that need to be done. I’d like to get an idea of what needs to be done by 3.0 and what is nice-to-have
  - Wenchen: Dynamic partition push-down is in 3.0, will review to see if DSv2 is compatible
  - Ryan: It is a little late to make this a 3.0 goal. DSv2 works well without this and I think this could be added in 3.1
  - Consensus was that *dynamic push-down is nice-to-have and should not block the release*
  - Ryan: is REFRESH TABLE needed? File sources needed this but it was never implemented
  - Wenchen: refresh should not block 3.0, file sources with v2 is not a 3.0 goal
  - Consensus was *not to block on adding refresh (nice to have)*
  - *Final list*:
    - Finish TableProvider update to avoid another API change: pass all table config from metastore (Wenchen)
    - Catalog behavior fix: https://issues.apache.org/jira/browse/SPARK-29014 (Ryan)
    - Stats push-down fix (Ryan)
    - Make DataFrameWriter compatible when updating a source from v1 to v2 (Burak)
- Spark 2.5 release?
  - Ryan: We are very close to feature complete for a release, but 3.0 is still a few months away. I’m going to be backporting DSv2 onto Spark 2.4 and I could contribute this work for a 2.5 release. I have had lots of people asking when DSv2 will be available and since it isn’t a breaking change we don’t need to wait until 3.0.
  - Wenchen: it would be good to have a 2.x release with the same DSv2 support as 3.0 so source authors only need to support one DSv2 API. There are substantial changes to it since 2.4.
  - Ryan: Good point about compatibility, that would make using DSv2 much easier.
  - Holden: It would also be helpful to support Java 11 for the same reason.
  - Ryan: I think it makes sense to do a compatibility release in preparation for 3.0 then. I’ll bring this up on the dev list.
- DataFrameWriterV2 python API (a usage sketch of the Scala API follows these notes):
  - Ryan: I opened SPARK-29157 to add support for the new DataFrameWriterV2 API in python. This is probably a good starter issue if anyone wants to work on it.
  - Holden would like to work on this
- Centralize catalog and table lookup: https://github.com/apache/spark/pull/25747
  - Wenchen: This refactor cleans up the default/current catalog and also separates catalog resolution from table resolution
  - Ryan: One of my main concerns is not having conversion in rules, but in the ParsedStatement plans themselves
  - Wenchen: The PR has been updated, that is no longer the case and it uses extractors.
  - Ryan: Will take another look, then. Also, I’d like to keep the rules in Analyzer specific to DSv2 and keep the v1 fallback rules in DataSourceResolution. That way fallback is a special case and we can remove it without rewriting the v2 rules
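[Editor's note] Following up on the driver-side Hadoop conf item, here is a minimal sketch of the broadcast pattern Matt described. Spark's own SerializableConfiguration was not public API at the time, so this uses a hand-rolled wrapper; the class and object names are illustrative assumptions.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

// Hadoop's Configuration is not Serializable, so wrap it for Java serialization.
class SerializableHadoopConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = value.write(out)
  private def readObject(in: ObjectInputStream): Unit = {
    value = new Configuration(false)
    value.readFields(in)
  }
}

object BroadcastConfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("conf-broadcast").getOrCreate()
    val sc = spark.sparkContext

    // Ship the driver-side Hadoop conf once per executor instead of once per task.
    val confBroadcast = sc.broadcast(new SerializableHadoopConf(sc.hadoopConfiguration))

    sc.parallelize(1 to 4).foreach { _ =>
      val conf: Configuration = confBroadcast.value.value
      // a task-side reader would use `conf` here to open files, talk to the metastore, etc.
    }
    spark.stop()
  }
}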
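[Editor's note] And for the DataFrameWriterV2 python API item, here is a hedged sketch of the Scala API that the python work would mirror, as it stood in master at the time; the catalog and table names are made up and the surface could still change before release.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object WriterV2Example {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("writer-v2").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "2019-09-04"), (2, "2019-09-05")).toDF("id", "day")

    // Explicit create vs. append, instead of the old save(SaveMode) matrix.
    df.writeTo("testcat.db.events")
      .partitionedBy(col("day"))
      .createOrReplace()

    df.writeTo("testcat.db.events").append()
  }
}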
[DISCUSS] Spark 2.5 release
Hi everyone, In the DSv2 sync this week, we talked about a possible Spark 2.5 release based on the latest Spark 2.4, but with DSv2 and Java 11 support added. A Spark 2.5 release with these two additions will help people migrate to Spark 3.0 when it is released because they will be able to use a single implementation for DSv2 sources that works in both 2.5 and 3.0. Similarly, upgrading to 3.0 won't also require updating to Java 11 because users could update to Java 11 with the 2.5 release and have fewer major changes. Another reason to consider a 2.5 release is that many people are interested in a release with the latest DSv2 API and support for DSv2 SQL. I'm already going to be backporting DSv2 support to the Spark 2.4 line, so it makes sense to share this work with the community. This release line would just consist of backports like DSv2 and Java 11 that assist compatibility, to keep the scope of the release small. The purpose is to assist people moving to 3.0 and not distract from the 3.0 release. Would a Spark 2.5 release help anyone else? Are there any concerns about this plan? rb -- Ryan Blue Software Engineer Netflix
Re: Spark 3.0 preview release on-going features discussion
I’m not sure that DSv2 list is accurate. We discussed this in the DSv2 sync this week (just sent out the notes) and came up with these items: - Finish TableProvider update to avoid another API change: pass all table config from metastore - Catalog behavior fix: https://issues.apache.org/jira/browse/SPARK-29014 - Stats push-down fix: move push-down to the optimizer - Make DataFrameWriter compatible when updating a source from v1 to v2, by adding extractCatalogName and extractIdentifier to TableProvider Some of the ideas that came up, like changing the pushdown API, were passed on because it is too close to the release to reasonably get the changes done without a serious delay (like the API changes just before the 2.4 release). On Fri, Sep 20, 2019 at 9:55 AM Dongjoon Hyun wrote: > Thank you for the summarization, Xingbo. > > I also agree with Sean because I don't think those block 3.0.0 preview > release. > Especially, correctness issues should not be there. > > Instead, could you summarize what we have as of now for 3.0.0 preview? > > I believe JDK11 (SPARK-28684) and Hive 2.3.5 (SPARK-23710) will be in the > what-we-have list for 3.0.0 preview. > > Bests, > Dongjoon. > > On Fri, Sep 20, 2019 at 6:22 AM Sean Owen wrote: > >> Is this a list of items that might be focused on for the final 3.0 >> release? At least, Scala 2.13 support shouldn't be on that list. The >> others look plausible, or are already done, but there are probably >> more. >> >> As for the 3.0 preview, I wouldn't necessarily block on any particular >> feature, though, yes, the more work that can go into important items >> between now and then, the better. >> I wouldn't necessarily present any list of things that will or might >> be in 3.0 with that preview; just list the things that are done, like >> JDK 11 support. >> >> On Fri, Sep 20, 2019 at 2:46 AM Xingbo Jiang >> wrote: >> > >> > Hi all, >> > >> > Let's start a new thread to discuss the on-going features for Spark 3.0 >> preview release. >> > >> > Below is the feature list for the Spark 3.0 preview release. The list >> is collected from the previous discussions in the dev list. 
>> > >> > Followup of the shuffle+repartition correctness issue: support roll >> back shuffle stages (https://issues.apache.org/jira/browse/SPARK-25341) >> > Upgrade the built-in Hive to 2.3.5 for hadoop-3.2 ( >> https://issues.apache.org/jira/browse/SPARK-23710) >> > JDK 11 support (https://issues.apache.org/jira/browse/SPARK-28684) >> > Scala 2.13 support (https://issues.apache.org/jira/browse/SPARK-25075) >> > DataSourceV2 features >> > >> > Enable file source v2 writers ( >> https://issues.apache.org/jira/browse/SPARK-27589) >> > CREATE TABLE USING with DataSourceV2 >> > New pushdown API for DataSourceV2 >> > Support DELETE/UPDATE/MERGE Operations in DataSourceV2 ( >> https://issues.apache.org/jira/browse/SPARK-28303) >> > >> > Correctness issue: Stream-stream joins - left outer join gives >> inconsistent output (https://issues.apache.org/jira/browse/SPARK-26154) >> > Revisiting Python / pandas UDF ( >> https://issues.apache.org/jira/browse/SPARK-28264) >> > Spark Graph (https://issues.apache.org/jira/browse/SPARK-25994) >> > >> > Features that are nice to have: >> > >> > Use remote storage for persisting shuffle data ( >> https://issues.apache.org/jira/browse/SPARK-25299) >> > Spark + Hadoop + Parquet + Avro compatibility problems ( >> https://issues.apache.org/jira/browse/SPARK-25588) >> > Introduce new option to Kafka source - specify timestamp to start and >> end offset (https://issues.apache.org/jira/browse/SPARK-26848) >> > Delete files after processing in structured streaming ( >> https://issues.apache.org/jira/browse/SPARK-20568) >> > >> > Here, I am proposing to cut the branch on October 15th. If the features >> are targeting to 3.0 preview release, please prioritize the work and finish >> it before the date. Note, Oct. 15th is not the code freeze of Spark 3.0. >> That means, the community will still work on the features for the upcoming >> Spark 3.0 release, even if they are not included in the preview release. >> The goal of preview release is to collect more feedback from the community >> regarding the new 3.0 features/behavior changes. >> > >> > Thanks! >> >> - >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >> >> -- Ryan Blue Software Engineer Netflix
Re: [DISCUSS] Spark 2.5 release
> DSv2 is far from stable right? No, I think it is reasonably stable and very close to being ready for a release. > All the actual data types are unstable and you guys have completely ignored that. I think what you're referring to is the use of `InternalRow`. That's a stable API and there has been no work to avoid using it. In any case, I don't think that anyone is suggesting that we delay 3.0 until a replacement for `InternalRow` is added, right? While I understand the motivation for a better solution here, I think the pragmatic solution is to continue using `InternalRow`. > If the goal is to make DSv2 work across 3.x and 2.x, that seems too invasive of a change to backport once you consider the parts needed to make dsv2 stable. I believe that those of us working on DSv2 are confident about the current stability. We set goals for what to get into the 3.0 release months ago and have very nearly reached the point where we are ready for that release. I don't think instability would be a problem in maintaining compatibility between the 2.5 version and the 3.0 version. If we find that we need to make API changes (other than additions) then we can make those in the 3.1 release. Because the goals we set for the 3.0 release have been reached with the current API and if we are ready to release 3.0, we can release a 2.5 with the same API. On Fri, Sep 20, 2019 at 11:05 AM Reynold Xin wrote: > DSv2 is far from stable right? All the actual data types are unstable and > you guys have completely ignored that. We'd need to work on that and that > will be a breaking change. If the goal is to make DSv2 work across 3.x and > 2.x, that seems too invasive of a change to backport once you consider the > parts needed to make dsv2 stable. > > > > On Fri, Sep 20, 2019 at 10:47 AM, Ryan Blue > wrote: > >> Hi everyone, >> >> In the DSv2 sync this week, we talked about a possible Spark 2.5 release >> based on the latest Spark 2.4, but with DSv2 and Java 11 support added. >> >> A Spark 2.5 release with these two additions will help people migrate to >> Spark 3.0 when it is released because they will be able to use a single >> implementation for DSv2 sources that works in both 2.5 and 3.0. Similarly, >> upgrading to 3.0 won't also require also updating to Java 11 because users >> could update to Java 11 with the 2.5 release and have fewer major changes. >> >> Another reason to consider a 2.5 release is that many people are >> interested in a release with the latest DSv2 API and support for DSv2 SQL. >> I'm already going to be backporting DSv2 support to the Spark 2.4 line, so >> it makes sense to share this work with the community. >> >> This release line would just consist of backports like DSv2 and Java 11 >> that assist compatibility, to keep the scope of the release small. The >> purpose is to assist people moving to 3.0 and not distract from the 3.0 >> release. >> >> Would a Spark 2.5 release help anyone else? Are there any concerns about >> this plan? >> >> >> rb >> >> >> -- >> Ryan Blue >> Software Engineer >> Netflix >> > > -- Ryan Blue Software Engineer Netflix
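[Editor's note] To ground the InternalRow question, here is a minimal sketch of the kind of reader a DSv2 source author writes against that interface; it is illustrative only, based on the connector API in master at the time, with made-up data.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.unsafe.types.UTF8String

// Produces a single (int, string) row and then reports end-of-input.
class SingleRowReader extends PartitionReader[InternalRow] {
  private var consumed = false

  override def next(): Boolean = {
    val hasNext = !consumed
    consumed = true
    hasNext
  }

  // Note the internal representations: UTF8String for strings, Long micros for
  // timestamps, and so on. This coupling to catalyst types is exactly what the
  // stability debate above is about.
  override def get(): InternalRow =
    new GenericInternalRow(Array[Any](1, UTF8String.fromString("a")))

  override def close(): Unit = ()
}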
Re: [DISCUSS] Spark 2.5 release
I didn't realize that Java 11 would require breaking changes. What breaking changes are required? On Fri, Sep 20, 2019 at 11:18 AM Sean Owen wrote: > Narrowly on Java 11: the problem is that it'll take some breaking > changes, more than would be usually appropriate in a minor release, I > think. I'm still not convinced there is a burning need to use Java 11 > but stay on 2.4, after 3.0 is out, and at least the wheels are in > motion there. Java 8 is still free and being updated. > > On Fri, Sep 20, 2019 at 12:48 PM Ryan Blue > wrote: > > > > Hi everyone, > > > > In the DSv2 sync this week, we talked about a possible Spark 2.5 release > based on the latest Spark 2.4, but with DSv2 and Java 11 support added. > > > > A Spark 2.5 release with these two additions will help people migrate to > Spark 3.0 when it is released because they will be able to use a single > implementation for DSv2 sources that works in both 2.5 and 3.0. Similarly, > upgrading to 3.0 won't also require also updating to Java 11 because users > could update to Java 11 with the 2.5 release and have fewer major changes. > > > > Another reason to consider a 2.5 release is that many people are > interested in a release with the latest DSv2 API and support for DSv2 SQL. > I'm already going to be backporting DSv2 support to the Spark 2.4 line, so > it makes sense to share this work with the community. > > > > This release line would just consist of backports like DSv2 and Java 11 > that assist compatibility, to keep the scope of the release small. The > purpose is to assist people moving to 3.0 and not distract from the 3.0 > release. > > > > Would a Spark 2.5 release help anyone else? Are there any concerns about > this plan? > > > > > > rb > > > > > > -- > > Ryan Blue > > Software Engineer > > Netflix > -- Ryan Blue Software Engineer Netflix
Re: [DISCUSS] Spark 2.5 release
When you created the PR to make InternalRow public This isn’t quite accurate. The change I made was to use InternalRow instead of UnsafeRow, which is a specific implementation of InternalRow. Exposing this API has always been a part of DSv2 and while both you and I did some work to avoid this, we are still in the phase of starting with that API. Note that any change to InternalRow would be very costly to implement because this interface is widely used. That is why I think we can certainly consider it stable enough to use here, and that’s probably why UnsafeRow was part of the original proposal. In any case, the goal for 3.0 was not to replace the use of InternalRow, it was to get the majority of SQL working on top of the interface added after 2.4. That’s done and stable, so I think a 2.5 release with it is also reasonable. On Fri, Sep 20, 2019 at 11:23 AM Reynold Xin wrote: > To push back, while I agree we should not drastically change > "InternalRow", there are a lot of changes that need to happen to make it > stable. For example, none of the publicly exposed interfaces should be in > the Catalyst package or the unsafe package. External implementations should > be decoupled from the internal implementations, with cheap ways to convert > back and forth. > > When you created the PR to make InternalRow public, the understanding was > to work towards making it stable in the future, assuming we will start with > an unstable API temporarily. You can't just make a bunch internal APIs > tightly coupled with other internal pieces public and stable and call it a > day, just because it happen to satisfy some use cases temporarily assuming > the rest of Spark doesn't change. > > > > On Fri, Sep 20, 2019 at 11:19 AM, Ryan Blue wrote: > >> > DSv2 is far from stable right? >> >> No, I think it is reasonably stable and very close to being ready for a >> release. >> >> > All the actual data types are unstable and you guys have completely >> ignored that. >> >> I think what you're referring to is the use of `InternalRow`. That's a >> stable API and there has been no work to avoid using it. In any case, I >> don't think that anyone is suggesting that we delay 3.0 until a replacement >> for `InternalRow` is added, right? >> >> While I understand the motivation for a better solution here, I think the >> pragmatic solution is to continue using `InternalRow`. >> >> > If the goal is to make DSv2 work across 3.x and 2.x, that seems too >> invasive of a change to backport once you consider the parts needed to make >> dsv2 stable. >> >> I believe that those of us working on DSv2 are confident about the >> current stability. We set goals for what to get into the 3.0 release months >> ago and have very nearly reached the point where we are ready for that >> release. >> >> I don't think instability would be a problem in maintaining compatibility >> between the 2.5 version and the 3.0 version. If we find that we need to >> make API changes (other than additions) then we can make those in the 3.1 >> release. Because the goals we set for the 3.0 release have been reached >> with the current API and if we are ready to release 3.0, we can release a >> 2.5 with the same API. >> >> On Fri, Sep 20, 2019 at 11:05 AM Reynold Xin wrote: >> >> DSv2 is far from stable right? All the actual data types are unstable and >> you guys have completely ignored that. We'd need to work on that and that >> will be a breaking change. 
If the goal is to make DSv2 work across 3.x and >> 2.x, that seems too invasive of a change to backport once you consider the >> parts needed to make dsv2 stable. >> >> >> >> On Fri, Sep 20, 2019 at 10:47 AM, Ryan Blue >> wrote: >> >> Hi everyone, >> >> In the DSv2 sync this week, we talked about a possible Spark 2.5 release >> based on the latest Spark 2.4, but with DSv2 and Java 11 support added. >> >> A Spark 2.5 release with these two additions will help people migrate to >> Spark 3.0 when it is released because they will be able to use a single >> implementation for DSv2 sources that works in both 2.5 and 3.0. Similarly, >> upgrading to 3.0 won't also require also updating to Java 11 because users >> could update to Java 11 with the 2.5 release and have fewer major changes. >> >> Another reason to consider a 2.5 release is that many people are >> interested in a release with the latest DSv2 API and support for DSv2 SQL. >> I'm already going to be backporting DSv2 support to the Spark 2.4 line, so >> it makes sense to share this work with
Re: [DISCUSS] Spark 2.5 release
I don’t think we need to gate a 3.0 release on making a more stable version of InternalRow Sounds like we agree, then. We will use it for 3.0, but there are known problems with it. Thinking we’d have dsv2 working in both 3.x (which will change and progress towards more stable, but will have to break certain APIs) and 2.x seems like a false premise. Why do you think we will need to break certain APIs before 3.0? I’m only suggesting that we release the same support in a 2.5 release that we do in 3.0. Since we are nearly finished with the 3.0 goals, it seems like we can certainly do that. We just won’t add any breaking changes before 3.1. On Fri, Sep 20, 2019 at 11:39 AM Reynold Xin wrote: > I don't think we need to gate a 3.0 release on making a more stable > version of InternalRow, but thinking we'd have dsv2 working in both 3.x > (which will change and progress towards more stable, but will have to break > certain APIs) and 2.x seems like a false premise. > > To point out some problems with InternalRow that you think are already > pragmatic and stable: > > The class is in catalyst, which states: > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala > > /** > * Catalyst is a library for manipulating relational query plans. All > classes in catalyst are > * considered an internal API to Spark SQL and are subject to change > between minor releases. > */ > > There is no even any annotation on the interface. > > The entire dependency chain were created to be private, and tightly > coupled with internal implementations. For example, > > > https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java > > /** > * A UTF-8 String for internal Spark use. > * > * A String encoded in UTF-8 as an Array[Byte], which can be used for > comparison, > * search, see http://en.wikipedia.org/wiki/UTF-8 for details. > * > * Note: This is not designed for general use cases, should not be used > outside SQL. > */ > > > > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala > > (which again is in catalyst package) > > > If you want to argue this way, you might as well argue we should make the > entire catalyst package public to be pragmatic and not allow any changes. > > > > > On Fri, Sep 20, 2019 at 11:32 AM, Ryan Blue wrote: > >> When you created the PR to make InternalRow public >> >> This isn’t quite accurate. The change I made was to use InternalRow >> instead of UnsafeRow, which is a specific implementation of InternalRow. >> Exposing this API has always been a part of DSv2 and while both you and I >> did some work to avoid this, we are still in the phase of starting with >> that API. >> >> Note that any change to InternalRow would be very costly to implement >> because this interface is widely used. That is why I think we can certainly >> consider it stable enough to use here, and that’s probably why UnsafeRow >> was part of the original proposal. >> >> In any case, the goal for 3.0 was not to replace the use of InternalRow, >> it was to get the majority of SQL working on top of the interface added >> after 2.4. That’s done and stable, so I think a 2.5 release with it is also >> reasonable. >> >> On Fri, Sep 20, 2019 at 11:23 AM Reynold Xin wrote: >> >> To push back, while I agree we should not drastically change >> "InternalRow", there are a lot of changes that need to happen to make it >> stable. 
For example, none of the publicly exposed interfaces should be in >> the Catalyst package or the unsafe package. External implementations should >> be decoupled from the internal implementations, with cheap ways to convert >> back and forth. >> >> When you created the PR to make InternalRow public, the understanding was >> to work towards making it stable in the future, assuming we will start with >> an unstable API temporarily. You can't just make a bunch internal APIs >> tightly coupled with other internal pieces public and stable and call it a >> day, just because it happen to satisfy some use cases temporarily assuming >> the rest of Spark doesn't change. >> >> >> >> On Fri, Sep 20, 2019 at 11:19 AM, Ryan Blue wrote: >> >> > DSv2 is far from stable right? >> >> No, I think it is reasonably stable and very close to being ready for a >> release. >> >> > All the actual data types are unstable and you guys have completely >> ignored that. >> >> I think what you
Re: [DISCUSS] Spark 2.5 release
Thanks for pointing this out, Dongjoon. To clarify, I’m not suggesting that we can break compatibility. I’m suggesting that we make a 2.5 release that uses the same DSv2 API as 3.0. These APIs are marked unstable, so we could make changes to them if we needed — as we have done in the 2.x line — but I don’t see a reason why we would break compatibility in the 3.x line. On Fri, Sep 20, 2019 at 8:46 PM Dongjoon Hyun wrote: > Do you mean you want to have a breaking API change between 3.0 and 3.1? > I believe we follow Semantic Versioning ( > https://spark.apache.org/versioning-policy.html ). > > > We just won’t add any breaking changes before 3.1. > > Bests, > Dongjoon. > > > On Fri, Sep 20, 2019 at 11:48 AM Ryan Blue > wrote: > >> I don’t think we need to gate a 3.0 release on making a more stable >> version of InternalRow >> >> Sounds like we agree, then. We will use it for 3.0, but there are known >> problems with it. >> >> Thinking we’d have dsv2 working in both 3.x (which will change and >> progress towards more stable, but will have to break certain APIs) and 2.x >> seems like a false premise. >> >> Why do you think we will need to break certain APIs before 3.0? >> >> I’m only suggesting that we release the same support in a 2.5 release >> that we do in 3.0. Since we are nearly finished with the 3.0 goals, it >> seems like we can certainly do that. We just won’t add any breaking changes >> before 3.1. >> >> On Fri, Sep 20, 2019 at 11:39 AM Reynold Xin wrote: >> >>> I don't think we need to gate a 3.0 release on making a more stable >>> version of InternalRow, but thinking we'd have dsv2 working in both 3.x >>> (which will change and progress towards more stable, but will have to break >>> certain APIs) and 2.x seems like a false premise. >>> >>> To point out some problems with InternalRow that you think are already >>> pragmatic and stable: >>> >>> The class is in catalyst, which states: >>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala >>> >>> /** >>> * Catalyst is a library for manipulating relational query plans. All >>> classes in catalyst are >>> * considered an internal API to Spark SQL and are subject to change >>> between minor releases. >>> */ >>> >>> There is no even any annotation on the interface. >>> >>> The entire dependency chain were created to be private, and tightly >>> coupled with internal implementations. For example, >>> >>> >>> https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java >>> >>> /** >>> * A UTF-8 String for internal Spark use. >>> * >>> * A String encoded in UTF-8 as an Array[Byte], which can be used for >>> comparison, >>> * search, see http://en.wikipedia.org/wiki/UTF-8 for details. >>> * >>> * Note: This is not designed for general use cases, should not be used >>> outside SQL. >>> */ >>> >>> >>> >>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala >>> >>> (which again is in catalyst package) >>> >>> >>> If you want to argue this way, you might as well argue we should make >>> the entire catalyst package public to be pragmatic and not allow any >>> changes. >>> >>> >>> >>> >>> On Fri, Sep 20, 2019 at 11:32 AM, Ryan Blue wrote: >>> >>>> When you created the PR to make InternalRow public >>>> >>>> This isn’t quite accurate. The change I made was to use InternalRow >>>> instead of UnsafeRow, which is a specific implementation of InternalRow. 
>>>> Exposing this API has always been a part of DSv2 and while both you and I >>>> did some work to avoid this, we are still in the phase of starting with >>>> that API. >>>> >>>> Note that any change to InternalRow would be very costly to implement >>>> because this interface is widely used. That is why I think we can certainly >>>> consider it stable enough to use here, and that’s probably why >>>> UnsafeRow was part of the original proposal. >>>> >>>> In any case, the goal for 3.0 was not to replace the use of InternalRow, >>>> it was to get the majority of SQL working on top of the interface added >&g
Re: [DISCUSS] Spark 2.5 release
> Making downstream to diverge their implementation heavily between minor versions (say, 2.4 vs 2.5) wouldn't be a good experience You're right that the API has been evolving in the 2.x line. But, it is now reasonably stable with respect to the current feature set and we should not need to break compatibility in the 3.x line. Because we have reached our goals for the 3.0 release, we can backport at least those features to 2.x and confidently have an API that works in both a 2.x release and is compatible with 3.0, if not 3.1 and later releases as well. > I'd rather say preparation of Spark 2.5 should be started after Spark 3.0 is officially released The reason I'm suggesting this is that I'm already going to do the work to backport the 3.0 release features to 2.4. I've been asked by several people when DSv2 will be released, so I know there is a lot of interest in making this available sooner than 3.0. If I'm already doing the work, then I'd be happy to share that with the community. I don't see why 2.5 and 3.0 are mutually exclusive. We can work on 2.5 while preparing the 3.0 preview and fixing bugs. For DSv2, the work is about complete so we can easily release the same set of features and API in 2.5 and 3.0. If we decide for some reason to wait until after 3.0 is released, I don't know that there is much value in a 2.5. The purpose is to be a step toward 3.0, and releasing that step after 3.0 doesn't seem helpful to me. It also wouldn't get these features out any sooner than 3.0, as a 2.5 release probably would, given the work needed to validate the incompatible changes in 3.0. > DSv2 change would be the major backward incompatibility which Spark 2.x users may hesitate to upgrade As I pointed out, DSv2 has been changing in the 2.x line, so this is expected. I don't think it will need incompatible changes in the 3.x line. On Fri, Sep 20, 2019 at 9:25 PM Jungtaek Lim wrote: > Just 2 cents, I haven't tracked the change of DSv2 (though I needed to > deal with this as the change made confusion on my PRs...), but my bet is > that DSv2 would be already changed in incompatible way, at least who works > for custom DataSource. Making downstream to diverge their implementation > heavily between minor versions (say, 2.4 vs 2.5) wouldn't be a good > experience - especially we are not completely closed the chance to further > modify DSv2, and the change could be backward incompatible. > > If we really want to bring the DSv2 change to 2.x version line to let end > users avoid forcing to upgrade Spark 3.x to enjoy new DSv2, I'd rather say > preparation of Spark 2.5 should be started after Spark 3.0 is officially > released, honestly even later than that, say, getting some reports from > Spark 3.0 about DSv2 so that we feel DSv2 is OK. I hope we don't make Spark > 2.5 be a kind of "tech-preview" which Spark 2.4 users may be frustrated to > upgrade to next minor version. > > Btw, do we have any specific target users for this? Personally DSv2 change > would be the major backward incompatibility which Spark 2.x users may > hesitate to upgrade, so they might be already prepared to migrate to Spark > 3.0 if they are prepared to migrate to new DSv2. > > On Sat, Sep 21, 2019 at 12:46 PM Dongjoon Hyun > wrote: > >> Do you mean you want to have a breaking API change between 3.0 and 3.1? >> I believe we follow Semantic Versioning ( >> https://spark.apache.org/versioning-policy.html ). >> >> > We just won’t add any breaking changes before 3.1. >> >> Bests, >> Dongjoon. 
>> >> >> On Fri, Sep 20, 2019 at 11:48 AM Ryan Blue >> wrote: >> >>> I don’t think we need to gate a 3.0 release on making a more stable >>> version of InternalRow >>> >>> Sounds like we agree, then. We will use it for 3.0, but there are known >>> problems with it. >>> >>> Thinking we’d have dsv2 working in both 3.x (which will change and >>> progress towards more stable, but will have to break certain APIs) and 2.x >>> seems like a false premise. >>> >>> Why do you think we will need to break certain APIs before 3.0? >>> >>> I’m only suggesting that we release the same support in a 2.5 release >>> that we do in 3.0. Since we are nearly finished with the 3.0 goals, it >>> seems like we can certainly do that. We just won’t add any breaking changes >>> before 3.1. >>> >>> On Fri, Sep 20, 2019 at 11:39 AM Reynold Xin >>> wrote: >>> >>>> I don't think we need to gate a 3.0 release on making a more stable >>>> version of InternalRow, but thinking we'd have dsv2 working in both 3.x >>>&g
Re: [DISCUSS] Spark 2.5 release
Why would that require an incompatible change? We *could* make an incompatible change and remove support for InternalRow, but I think we would want to carefully consider whether that is the right decision. And in any case, we would be able to keep 2.5 and 3.0 compatible, which is the main goal. On Sat, Sep 21, 2019 at 2:28 PM Reynold Xin wrote: > How would you not make incompatible changes in 3.x? As discussed the > InternalRow API is not stable and needs to change. > > On Sat, Sep 21, 2019 at 2:27 PM Ryan Blue wrote: > >> > Making downstream to diverge their implementation heavily between minor >> versions (say, 2.4 vs 2.5) wouldn't be a good experience >> >> You're right that the API has been evolving in the 2.x line. But, it is >> now reasonably stable with respect to the current feature set and we should >> not need to break compatibility in the 3.x line. Because we have reached >> our goals for the 3.0 release, we can backport at least those features to >> 2.x and confidently have an API that works in both a 2.x release and is >> compatible with 3.0, if not 3.1 and later releases as well. >> >> > I'd rather say preparation of Spark 2.5 should be started after Spark >> 3.0 is officially released >> >> The reason I'm suggesting this is that I'm already going to do the work >> to backport the 3.0 release features to 2.4. I've been asked by several >> people when DSv2 will be released, so I know there is a lot of interest in >> making this available sooner than 3.0. If I'm already doing the work, then >> I'd be happy to share that with the community. >> >> I don't see why 2.5 and 3.0 are mutually exclusive. We can work on 2.5 >> while preparing the 3.0 preview and fixing bugs. For DSv2, the work is >> about complete so we can easily release the same set of features and API in >> 2.5 and 3.0. >> >> If we decide for some reason to wait until after 3.0 is released, I don't >> know that there is much value in a 2.5. The purpose is to be a step toward >> 3.0, and releasing that step after 3.0 doesn't seem helpful to me. It also >> wouldn't get these features out any sooner than 3.0, as a 2.5 release >> probably would, given the work needed to validate the incompatible changes >> in 3.0. >> >> > DSv2 change would be the major backward incompatibility which Spark 2.x >> users may hesitate to upgrade >> >> As I pointed out, DSv2 has been changing in the 2.x line, so this is >> expected. I don't think it will need incompatible changes in the 3.x line. >> >> On Fri, Sep 20, 2019 at 9:25 PM Jungtaek Lim wrote: >> >>> Just 2 cents, I haven't tracked the change of DSv2 (though I needed to >>> deal with this as the change made confusion on my PRs...), but my bet is >>> that DSv2 would be already changed in incompatible way, at least who works >>> for custom DataSource. Making downstream to diverge their implementation >>> heavily between minor versions (say, 2.4 vs 2.5) wouldn't be a good >>> experience - especially we are not completely closed the chance to further >>> modify DSv2, and the change could be backward incompatible. >>> >>> If we really want to bring the DSv2 change to 2.x version line to let >>> end users avoid forcing to upgrade Spark 3.x to enjoy new DSv2, I'd rather >>> say preparation of Spark 2.5 should be started after Spark 3.0 is >>> officially released, honestly even later than that, say, getting some >>> reports from Spark 3.0 about DSv2 so that we feel DSv2 is OK. 
I hope we >>> don't make Spark 2.5 be a kind of "tech-preview" which Spark 2.4 users may >>> be frustrated to upgrade to next minor version. >>> >>> Btw, do we have any specific target users for this? Personally DSv2 >>> change would be the major backward incompatibility which Spark 2.x users >>> may hesitate to upgrade, so they might be already prepared to migrate to >>> Spark 3.0 if they are prepared to migrate to new DSv2. >>> >>> On Sat, Sep 21, 2019 at 12:46 PM Dongjoon Hyun >>> wrote: >>> >>>> Do you mean you want to have a breaking API change between 3.0 and 3.1? >>>> I believe we follow Semantic Versioning ( >>>> https://spark.apache.org/versioning-policy.html ). >>>> >>>> > We just won’t add any breaking changes before 3.1. >>>> >>>> Bests, >>>> Dongjoon. >>>> >>>> >>>> On Fri, Sep 20, 2019 at 11:48 AM Ryan Blue >>>&
Re: [DISCUSS] Spark 2.5 release
> If you insist we shouldn't change the unstable temporary API in 3.x . . . Not what I'm saying at all. I said we should carefully consider whether a breaking change is the right decision in the 3.x line. All I'm suggesting is that we can make a 2.5 release with the feature and an API that is the same as the one in 3.0. > I also don't get this backporting a giant feature to 2.x line I am planning to do this so we can use DSv2 before 3.0 is released. Then we can have a source implementation that works in both 2.x and 3.0 to make the transition easier. Since I'm already doing the work, I'm offering to share it with the community. On Sat, Sep 21, 2019 at 2:36 PM Reynold Xin wrote: > Because for example we'd need to move the location of InternalRow, > breaking the package name. If you insist we shouldn't change the unstable > temporary API in 3.x to maintain compatibility with 3.0, which is totally > different from my understanding of the situation when you exposed it, then > I'd say we should gate 3.0 on having a stable row interface. > > I also don't get this backporting a giant feature to 2.x line ... as > suggested by others in the thread, DSv2 would be one of the main reasons > people upgrade to 3.0. What's so special about DSv2 that we are doing this? > Why not abandoning 3.0 entirely and backport all the features to 2.x? > > > > On Sat, Sep 21, 2019 at 2:31 PM, Ryan Blue wrote: > >> Why would that require an incompatible change? >> >> We *could* make an incompatible change and remove support for >> InternalRow, but I think we would want to carefully consider whether that >> is the right decision. And in any case, we would be able to keep 2.5 and >> 3.0 compatible, which is the main goal. >> >> On Sat, Sep 21, 2019 at 2:28 PM Reynold Xin wrote: >> >> How would you not make incompatible changes in 3.x? As discussed the >> InternalRow API is not stable and needs to change. >> >> On Sat, Sep 21, 2019 at 2:27 PM Ryan Blue wrote: >> >> > Making downstream to diverge their implementation heavily between minor >> versions (say, 2.4 vs 2.5) wouldn't be a good experience >> >> You're right that the API has been evolving in the 2.x line. But, it is >> now reasonably stable with respect to the current feature set and we should >> not need to break compatibility in the 3.x line. Because we have reached >> our goals for the 3.0 release, we can backport at least those features to >> 2.x and confidently have an API that works in both a 2.x release and is >> compatible with 3.0, if not 3.1 and later releases as well. >> >> > I'd rather say preparation of Spark 2.5 should be started after Spark >> 3.0 is officially released >> >> The reason I'm suggesting this is that I'm already going to do the work >> to backport the 3.0 release features to 2.4. I've been asked by several >> people when DSv2 will be released, so I know there is a lot of interest in >> making this available sooner than 3.0. If I'm already doing the work, then >> I'd be happy to share that with the community. >> >> I don't see why 2.5 and 3.0 are mutually exclusive. We can work on 2.5 >> while preparing the 3.0 preview and fixing bugs. For DSv2, the work is >> about complete so we can easily release the same set of features and API in >> 2.5 and 3.0. >> >> If we decide for some reason to wait until after 3.0 is released, I don't >> know that there is much value in a 2.5. The purpose is to be a step toward >> 3.0, and releasing that step after 3.0 doesn't seem helpful to me. 
It also >> wouldn't get these features out any sooner than 3.0, as a 2.5 release >> probably would, given the work needed to validate the incompatible changes >> in 3.0. >> >> > DSv2 change would be the major backward incompatibility which Spark 2.x >> users may hesitate to upgrade >> >> As I pointed out, DSv2 has been changing in the 2.x line, so this is >> expected. I don't think it will need incompatible changes in the 3.x line. >> >> On Fri, Sep 20, 2019 at 9:25 PM Jungtaek Lim wrote: >> >> Just 2 cents, I haven't tracked the change of DSv2 (though I needed to >> deal with this as the change made confusion on my PRs...), but my bet is >> that DSv2 would be already changed in incompatible way, at least who works >> for custom DataSource. Making downstream to diverge their implementation >> heavily between minor versions (say, 2.4 vs 2.5) wouldn't be a good >> experience - especially we are not completely closed the chance to furthe
Re: [DISCUSS] Spark 2.5 release
My understanding is that 3.0-preview is not going to be a production-ready release. For those of us that have been using backports of DSv2 in production, that doesn't help. It also doesn't help as a stepping stone because users would need to handle all of the incompatible changes in 3.0. Using 3.0-preview would be an unstable release with breaking changes instead of a stable release without the breaking changes. I'm offering to help build a stable release without breaking changes. But if there is no community interest in it, I'm happy to drop this. On Sun, Sep 22, 2019 at 6:39 PM Hyukjin Kwon wrote: > +1 for Matei's as well. > > On Sun, 22 Sep 2019, 14:59 Marco Gaido, wrote: > >> I agree with Matei too. >> >> Thanks, >> Marco >> >> Il giorno dom 22 set 2019 alle ore 03:44 Dongjoon Hyun < >> dongjoon.h...@gmail.com> ha scritto: >> >>> +1 for Matei's suggestion! >>> >>> Bests, >>> Dongjoon. >>> >>> On Sat, Sep 21, 2019 at 5:44 PM Matei Zaharia >>> wrote: >>> >>>> If the goal is to get people to try the DSv2 API and build DSv2 data >>>> sources, can we recommend the 3.0-preview release for this? That would get >>>> people shifting to 3.0 faster, which is probably better overall compared to >>>> maintaining two major versions. There’s not that much else changing in 3.0 >>>> if you already want to update your Java version. >>>> >>>> On Sep 21, 2019, at 2:45 PM, Ryan Blue >>>> wrote: >>>> >>>> > If you insist we shouldn't change the unstable temporary API in 3.x . >>>> . . >>>> >>>> Not what I'm saying at all. I said we should carefully consider whether >>>> a breaking change is the right decision in the 3.x line. >>>> >>>> All I'm suggesting is that we can make a 2.5 release with the feature >>>> and an API that is the same as the one in 3.0. >>>> >>>> > I also don't get this backporting a giant feature to 2.x line >>>> >>>> I am planning to do this so we can use DSv2 before 3.0 is released. >>>> Then we can have a source implementation that works in both 2.x and 3.0 to >>>> make the transition easier. Since I'm already doing the work, I'm offering >>>> to share it with the community. >>>> >>>> >>>> On Sat, Sep 21, 2019 at 2:36 PM Reynold Xin >>>> wrote: >>>> >>>>> Because for example we'd need to move the location of InternalRow, >>>>> breaking the package name. If you insist we shouldn't change the unstable >>>>> temporary API in 3.x to maintain compatibility with 3.0, which is totally >>>>> different from my understanding of the situation when you exposed it, then >>>>> I'd say we should gate 3.0 on having a stable row interface. >>>>> >>>>> I also don't get this backporting a giant feature to 2.x line ... as >>>>> suggested by others in the thread, DSv2 would be one of the main reasons >>>>> people upgrade to 3.0. What's so special about DSv2 that we are doing >>>>> this? >>>>> Why not abandoning 3.0 entirely and backport all the features to 2.x? >>>>> >>>>> >>>>> >>>>> On Sat, Sep 21, 2019 at 2:31 PM, Ryan Blue wrote: >>>>> >>>>>> Why would that require an incompatible change? >>>>>> >>>>>> We *could* make an incompatible change and remove support for >>>>>> InternalRow, but I think we would want to carefully consider whether that >>>>>> is the right decision. And in any case, we would be able to keep 2.5 and >>>>>> 3.0 compatible, which is the main goal. >>>>>> >>>>>> On Sat, Sep 21, 2019 at 2:28 PM Reynold Xin >>>>>> wrote: >>>>>> >>>>>> How would you not make incompatible changes in 3.x? As discussed the >>>>>> InternalRow API is not stable and needs to change. 
>>>>>> >>>>>> On Sat, Sep 21, 2019 at 2:27 PM Ryan Blue wrote: >>>>>> >>>>>> > Making downstream to diverge their implementation heavily between >>>>>> minor versions (say, 2.4 vs 2.5) wouldn't be a good experience >>>>>> >>>>>> You're right that the API has been evolving in the 2.x
Re: [DISCUSS] Spark 2.5 release
>From those questions, I can see that there is significant confusion about what I'm proposing, so let me try to clear it up. > 1. Is DSv2 stable in `master`? DSv2 has reached a stable API that is capable of supporting all of the features we intend to deliver for Spark 3.0. The proposal is to backport the same API and features for Spark 2.5. I am not saying that this API won't change after 3.0. Notably, Reynold wants to change the use of InternalRow. But, these changes are after 3.0 and don't affect the compatibility I'm proposing, between the 2.5 and 3.0 releases. I also doubt that breaking changes would happen by 3.1. > 2. If then, what subset of DSv2 patches does Ryan is suggesting backporting? I am proposing backporting what we intend to deliver for 3.0: the API currently in master, SQL support, and multi-catalog support. > 3. How much those backporting DSv2 patches looks differently in `branch-2.4`? DSv2 is mostly an addition located in the `connector` package. It also changes some parts of the SQL parser and adds parsed plans, as well as new rules to convert from parsed plans. This is not an invasive change because we kept most of DSv2 separate. DSv2 should be nearly identical between the two branches. > 4. What does he mean by `without breaking changes? Is it technically feasible? DSv2 is marked unstable in the 2.x line and changes between releases. The API changed between 2.3 and 2.4, so this would be no different. But, we would keep the API the same between 2.5 and 3.0 to assist migration. This is technically feasible because what we are planning to deliver for 3.0 is nearly ready, and the API has not needed to change recently. > Apache Spark 2.4.x and 2.5.x DSv2 should be compatible. This has not been a requirement for DSv2 development so far. If this is a new requirement, then we should not do a 2.5 release. > 5. How long does it take? Is it possible before 3.0.0-preview? Who will work on that backporting? As I said, I'm already going to do this work, so I'm offering to release it to the community. I don't know how long it will take, but this work and 3.0-preview are not mutually exclusive. > 6. Is this meaningful if 2.5 and 3.1 become different again too soon (in 2020 Summer)? It is useful to me, so I assume it is useful to others. I also think it is unlikely that 3.1 will need to make API changes to DSv2. There may be some bugs found, but I don't think we will break API compatibility so quickly. Most of the changes to the API will require only additions. > If you have a working branch, please share with us. I don't have a branch to share. On Mon, Sep 23, 2019 at 6:47 PM Dongjoon Hyun wrote: > Hi, Ryan. > > This thread has many replied as you see. That is the evidence that the > community is interested in your suggestion a lot. > > > I'm offering to help build a stable release without breaking changes. > But if there is no community interest in it, I'm happy to drop this. > > In this thread, the root cause of the disagreement is due to the lack of > supporting evidence for your claims. > > 1. Is DSv2 stable in `master`? > 2. If then, what subset of DSv2 patches does Ryan is suggesting > backporting? > 3. How much those backporting DSv2 patches looks differently in > `branch-2.4`? > 4. What does he mean by `without breaking changes? Is it technically > feasible? > Apache Spark 2.4.x and 2.5.x DSv2 should be compatible. (Not between > 2.5.x DSv2 and 3.0.0 DSv2) > 5. How long does it take? Is it possible before 3.0.0-preview? Who will > work on that backporting? > 6. 
Is this meaningful if 2.5 and 3.1 become different again too soon (in > 2020 Summer)? > > We are SW engineers. > If you have a working branch, please share with us. > It will help us understand your suggestion and this discussion. > We can help you verify that branch achieves your goal. > The branch is tested already, isn't it? > > Bests, > Dongjoon. > > > > > On Mon, Sep 23, 2019 at 10:44 AM Holden Karau > wrote: > >> I would personally love to see us provide a gentle migration path to >> Spark 3 especially if much of the work is already going to happen anyways. >> >> Maybe giving it a different name (eg something like >> Spark-2-to-3-transitional) would make it more clear about its intended >> purpose and encourage folks to move to 3 when they can? >> >> On Mon, Sep 23, 2019 at 9:17 AM Ryan Blue >> wrote: >> >>> My understanding is that 3.0-preview is not going to be a >>> production-ready release. For those of us that have been using backports of >>> DSv2 in production, that doesn't help. >>> >>> It also doesn't help as a stepping stone because users would need to >>&g
Re: [DISCUSS] Spark 2.5 release
> That's not a new requirement, that's an "implicit" requirement via semantic versioning. The expectation is that the DSv2 API will change in minor versions in the 2.x line. The API is marked with the Experimental API annotation to signal that it can change, and it has been changing. A requirement to not change this API for a 2.5 release is a new requirement. I'm fine with that if that's what everyone wants. Like I said, if we want to add a requirement to not change this API then we shouldn't release the 2.5 that I'm proposing. On Tue, Sep 24, 2019 at 2:51 PM Jungtaek Lim wrote: > >> Apache Spark 2.4.x and 2.5.x DSv2 should be compatible. > > > This has not been a requirement for DSv2 development so far. If this is > a new requirement, then we should not do a 2.5 release. > > My 2 cents, target version of new DSv2 has been only 3.0 so we don't ever > have a chance to think about such requirement - that's why there's no > restriction on breaking compatibility on codebase. That's not a new > requirement, that's an "implicit" requirement via semantic versioning. I > agree that some of APIs have been changed between Spark 2.x versions, but I > guess the changes in "new" DSv2 would be bigger than summation of changes > on "old" DSv2 which has been introduced across multiple minor versions. > > Suppose we're developers of Spark ecosystem maintaining custom data source > (forget about developing Spark): I would get some official announcement on > next minor version, and I want to try it out quickly to see my stuff still > supports new version. When I change the dependency version everything will > break. My hopeful expectation would be no issue while upgrading but turns > out it's not, and even it requires new learning (not only fixing > compilation failures). It would just make me giving up support Spark 2.5 or > at least I won't follow up such change quickly. IMHO 3.0-techpreview has > advantage here (assuming we provide maven artifacts as well as official > announcement), as it can give us expectation that there're bunch of changes > given it's a new major version. It also provides bunch of time to try > adopting it before the version is officially released. > > > On Wed, Sep 25, 2019 at 4:56 AM Ryan Blue wrote: > >> From those questions, I can see that there is significant confusion about >> what I'm proposing, so let me try to clear it up. >> >> > 1. Is DSv2 stable in `master`? >> >> DSv2 has reached a stable API that is capable of supporting all of the >> features we intend to deliver for Spark 3.0. The proposal is to backport >> the same API and features for Spark 2.5. >> >> I am not saying that this API won't change after 3.0. Notably, Reynold >> wants to change the use of InternalRow. But, these changes are after 3.0 >> and don't affect the compatibility I'm proposing, between the 2.5 and 3.0 >> releases. I also doubt that breaking changes would happen by 3.1. >> >> > 2. If then, what subset of DSv2 patches does Ryan is suggesting >> backporting? >> >> I am proposing backporting what we intend to deliver for 3.0: the API >> currently in master, SQL support, and multi-catalog support. >> >> > 3. How much those backporting DSv2 patches looks differently in >> `branch-2.4`? >> >> DSv2 is mostly an addition located in the `connector` package. It also >> changes some parts of the SQL parser and adds parsed plans, as well as new >> rules to convert from parsed plans. This is not an invasive change because >> we kept most of DSv2 separate. 
DSv2 should be nearly identical between the >> two branches. >> >> > 4. What does he mean by `without breaking changes? Is it technically >> feasible? >> >> DSv2 is marked unstable in the 2.x line and changes between releases. The >> API changed between 2.3 and 2.4, so this would be no different. But, we >> would keep the API the same between 2.5 and 3.0 to assist migration. >> >> This is technically feasible because what we are planning to deliver for >> 3.0 is nearly ready, and the API has not needed to change recently. >> >> > Apache Spark 2.4.x and 2.5.x DSv2 should be compatible. >> >> This has not been a requirement for DSv2 development so far. If this is a >> new requirement, then we should not do a 2.5 release. >> >> > 5. How long does it take? Is it possible before 3.0.0-preview? Who will >> work on that backporting? >> >> As I said, I'm already going to do this work, so I'm offering to release >> it to the
[DISCUSS] Out of order optimizer rules?
Hi everyone, I have been working on a PR that moves filter and projection pushdown into the optimizer for DSv2, instead of when converting to physical plan. This will make DSv2 work with optimizer rules that depend on stats, like join reordering. While adding the optimizer rule, I found that some rules appear to be out of order. For example, PruneFileSourcePartitions that handles filter pushdown for v1 scans is in SparkOptimizer (spark-sql) in a batch that will run after all of the batches in Optimizer (spark-catalyst) including CostBasedJoinReorder. SparkOptimizer also adds the new “dynamic partition pruning” rules *after* both the cost-based join reordering and the v1 partition pruning rule. I’m not sure why this should run after join reordering and partition pruning, since it seems to me like additional filters would be good to have before those rules run. It looks like this might just be that the rules were written in the spark-sql module instead of in catalyst. That makes some sense for the v1 pushdown, which is altering physical plan details (FileIndex) that have leaked into the logical plan. I’m not sure why the dynamic partition pruning rules aren’t in catalyst or why they run after the v1 predicate pushdown. Can someone more familiar with these rules clarify why they appear to be out of order? Assuming that this is an accident, I think it’s something that should be fixed before 3.0. My PR fixes early pushdown, but the “dynamic” pruning may still need to be addressed. rb -- Ryan Blue Software Engineer Netflix
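To make the ordering point concrete, here is a minimal sketch of the composition pattern described above. This is not the real Spark source, only an illustration (in Scala) of how an optimizer subclass in spark-sql that appends its batches after the parent's defaultBatches necessarily runs those rules after cost-based join reordering:

// Not the actual Spark classes, just the composition pattern: batches appended
// by the spark-sql subclass run after every spark-catalyst batch.
abstract class OptimizerSketch {                     // stands in for catalyst's Optimizer
  case class Batch(name: String, ruleNames: String*)
  def defaultBatches: Seq[Batch] = Seq(
    Batch("Operator Optimizations", "PushDownPredicate", "ColumnPruning"),
    Batch("Join Reorder", "CostBasedJoinReorder"))
}

class SparkOptimizerSketch extends OptimizerSketch { // stands in for SparkOptimizer in spark-sql
  override def defaultBatches: Seq[Batch] = super.defaultBatches :+
    Batch("Prune File Source Table Partitions", "PruneFileSourcePartitions") :+
    Batch("Dynamic Partition Pruning", "PartitionPruning") // appended, so it always runs last
}

Anything added this way runs after CostBasedJoinReorder, which is exactly the ordering questioned in the message above.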
Re: [DISCUSS] Out of order optimizer rules?
Where can I find a design doc for dynamic partition pruning that explains how it works? The JIRA issue, SPARK-11150, doesn't seem to describe dynamic partition pruning (as pointed out by Henry R.) and doesn't have any comments about the implementation's approach. And the PR description also doesn't have much information. It lists 3 cases for how a filter is built, but nothing about the overall approach or design that helps when trying to find out where it should be placed in the optimizer rules. It also isn't clear why this couldn't be part of spark-catalyst. On Wed, Oct 2, 2019 at 1:48 AM Wenchen Fan wrote: > dynamic partition pruning rule generates "hidden" filters that will be > converted to real predicates at runtime, so it doesn't matter where we run > the rule. > > For PruneFileSourcePartitions, I'm not quite sure. Seems to me it's better > to run it before join reorder. > > On Sun, Sep 29, 2019 at 5:51 AM Ryan Blue > wrote: > >> Hi everyone, >> >> I have been working on a PR that moves filter and projection pushdown >> into the optimizer for DSv2, instead of when converting to physical plan. >> This will make DSv2 work with optimizer rules that depend on stats, like >> join reordering. >> >> While adding the optimizer rule, I found that some rules appear to be out >> of order. For example, PruneFileSourcePartitions that handles filter >> pushdown for v1 scans is in SparkOptimizer (spark-sql) in a batch that >> will run after all of the batches in Optimizer (spark-catalyst) >> including CostBasedJoinReorder. >> >> SparkOptimizer also adds the new “dynamic partition pruning” rules >> *after* both the cost-based join reordering and the v1 partition pruning >> rule. I’m not sure why this should run after join reordering and partition >> pruning, since it seems to me like additional filters would be good to have >> before those rules run. >> >> It looks like this might just be that the rules were written in the >> spark-sql module instead of in catalyst. That makes some sense for the v1 >> pushdown, which is altering physical plan details (FileIndex) that have >> leaked into the logical plan. I’m not sure why the dynamic partition >> pruning rules aren’t in catalyst or why they run after the v1 predicate >> pushdown. >> >> Can someone more familiar with these rules clarify why they appear to be >> out of order? >> >> Assuming that this is an accident, I think it’s something that should be >> fixed before 3.0. My PR fixes early pushdown, but the “dynamic” pruning may >> still need to be addressed. >> >> rb >> -- >> Ryan Blue >> Software Engineer >> Netflix >> > -- Ryan Blue Software Engineer Netflix
Re: [DISCUSS] Out of order optimizer rules?
Thanks for the pointers, but what I'm looking for is information about the design of this implementation, like what requires this to be in spark-sql instead of spark-catalyst. Even a high-level description, like what the optimizer rules are and what they do would be great. Was there one written up internally that you could share? On Wed, Oct 2, 2019 at 10:40 AM Maryann Xue wrote: > > It lists 3 cases for how a filter is built, but nothing about the > overall approach or design that helps when trying to find out where it > should be placed in the optimizer rules. > > The overall idea/design of DPP can be simply put as using the result of > one side of the join to prune partitions of a scan on the other side. The > optimal situation is when the join is a broadcast join and the table being > partition-pruned is on the probe side. In that case, by the time the probe > side starts, the filter will already have the results available and ready > for reuse. > > Regarding the place in the optimizer rules, it's preferred to happen late > in the optimization, and definitely after join reorder. > > > Thanks, > Maryann > > On Wed, Oct 2, 2019 at 12:20 PM Reynold Xin wrote: > >> Whoever created the JIRA years ago didn't describe dpp correctly, but the >> linked jira in Hive was correct (which unfortunately is much more terse >> than any of the patches we have in Spark >> https://issues.apache.org/jira/browse/HIVE-9152). Henry R's description >> was also correct. >> >> >> >> >> >> On Wed, Oct 02, 2019 at 9:18 AM, Ryan Blue >> wrote: >> >>> Where can I find a design doc for dynamic partition pruning that >>> explains how it works? >>> >>> The JIRA issue, SPARK-11150, doesn't seem to describe dynamic partition >>> pruning (as pointed out by Henry R.) and doesn't have any comments about >>> the implementation's approach. And the PR description also doesn't have >>> much information. It lists 3 cases for how a filter is built, but >>> nothing about the overall approach or design that helps when trying to find >>> out where it should be placed in the optimizer rules. It also isn't clear >>> why this couldn't be part of spark-catalyst. >>> >>> On Wed, Oct 2, 2019 at 1:48 AM Wenchen Fan wrote: >>> >>>> dynamic partition pruning rule generates "hidden" filters that will be >>>> converted to real predicates at runtime, so it doesn't matter where we run >>>> the rule. >>>> >>>> For PruneFileSourcePartitions, I'm not quite sure. Seems to me it's >>>> better to run it before join reorder. >>>> >>>> On Sun, Sep 29, 2019 at 5:51 AM Ryan Blue >>>> wrote: >>>> >>>>> Hi everyone, >>>>> >>>>> I have been working on a PR that moves filter and projection pushdown >>>>> into the optimizer for DSv2, instead of when converting to physical plan. >>>>> This will make DSv2 work with optimizer rules that depend on stats, like >>>>> join reordering. >>>>> >>>>> While adding the optimizer rule, I found that some rules appear to be >>>>> out of order. For example, PruneFileSourcePartitions that handles >>>>> filter pushdown for v1 scans is in SparkOptimizer (spark-sql) in a >>>>> batch that will run after all of the batches in Optimizer >>>>> (spark-catalyst) including CostBasedJoinReorder. >>>>> >>>>> SparkOptimizer also adds the new “dynamic partition pruning” rules >>>>> *after* both the cost-based join reordering and the v1 partition >>>>> pruning rule. 
I’m not sure why this should run after join reordering and >>>>> partition pruning, since it seems to me like additional filters would be >>>>> good to have before those rules run. >>>>> >>>>> It looks like this might just be that the rules were written in the >>>>> spark-sql module instead of in catalyst. That makes some sense for the v1 >>>>> pushdown, which is altering physical plan details (FileIndex) that >>>>> have leaked into the logical plan. I’m not sure why the dynamic partition >>>>> pruning rules aren’t in catalyst or why they run after the v1 predicate >>>>> pushdown. >>>>> >>>>> Can someone more familiar with these rules clarify why they appear to >>>>> be out of order? >>>>> >>>>> Assuming that this is an accident, I think it’s something that should >>>>> be fixed before 3.0. My PR fixes early pushdown, but the “dynamic” pruning >>>>> may still need to be addressed. >>>>> >>>>> rb >>>>> -- >>>>> Ryan Blue >>>>> Software Engineer >>>>> Netflix >>>>> >>>> >>> >>> -- >>> Ryan Blue >>> Software Engineer >>> Netflix >>> >> >> -- Ryan Blue Software Engineer Netflix
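A small, hypothetical example may help make that description concrete (assuming an active SparkSession named spark; the tables and columns are made up, with sales partitioned by day and dim_date small enough to broadcast):

// Without dynamic partition pruning, the scan of `sales` reads every partition.
// With it, the `day` values produced on the broadcast (dim_date) side become a
// runtime partition filter on the probe-side scan of `sales`.
val pruned = spark.sql("""
  SELECT s.item, s.amount
  FROM sales s
  JOIN dim_date d ON s.day = d.day
  WHERE d.is_holiday = true
""")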
Re: [VOTE][SPARK-28885] Follow ANSI store assignment rules in table insertion by default
+1 Thanks for fixing this! On Thu, Oct 10, 2019 at 6:30 AM Xiao Li wrote: > +1 > > On Thu, Oct 10, 2019 at 2:13 AM Hyukjin Kwon wrote: > >> +1 (binding) >> >> 2019년 10월 10일 (목) 오후 5:11, Takeshi Yamamuro 님이 작성: >> >>> Thanks for the great work, Gengliang! >>> >>> +1 for that. >>> As I said before, the behaviour is pretty common in DBMSs, so the change >>> helps for DMBS users. >>> >>> Bests, >>> Takeshi >>> >>> >>> On Mon, Oct 7, 2019 at 5:24 PM Gengliang Wang < >>> gengliang.w...@databricks.com> wrote: >>> >>>> Hi everyone, >>>> >>>> I'd like to call for a new vote on SPARK-28885 >>>> <https://issues.apache.org/jira/browse/SPARK-28885> "Follow ANSI store >>>> assignment rules in table insertion by default" after revising the ANSI >>>> store assignment policy(SPARK-29326 >>>> <https://issues.apache.org/jira/browse/SPARK-29326>). >>>> When inserting a value into a column with the different data type, >>>> Spark performs type coercion. Currently, we support 3 policies for the >>>> store assignment rules: ANSI, legacy and strict, which can be set via the >>>> option "spark.sql.storeAssignmentPolicy": >>>> 1. ANSI: Spark performs the store assignment as per ANSI SQL. In >>>> practice, the behavior is mostly the same as PostgreSQL. It disallows >>>> certain unreasonable type conversions such as converting `string` to `int` >>>> and `double` to `boolean`. It will throw a runtime exception if the value >>>> is out-of-range(overflow). >>>> 2. Legacy: Spark allows the store assignment as long as it is a valid >>>> `Cast`, which is very loose. E.g., converting either `string` to `int` or >>>> `double` to `boolean` is allowed. It is the current behavior in Spark 2.x >>>> for compatibility with Hive. When inserting an out-of-range value to an >>>> integral field, the low-order bits of the value is inserted(the same as >>>> Java/Scala numeric type casting). For example, if 257 is inserted into a >>>> field of Byte type, the result is 1. >>>> 3. Strict: Spark doesn't allow any possible precision loss or data >>>> truncation in store assignment, e.g., converting either `double` to `int` >>>> or `decimal` to `double` is allowed. The rules are originally for Dataset >>>> encoder. As far as I know, no mainstream DBMS is using this policy by >>>> default. >>>> >>>> Currently, the V1 data source uses "Legacy" policy by default, while V2 >>>> uses "Strict". This proposal is to use "ANSI" policy by default for both V1 >>>> and V2 in Spark 3.0. >>>> >>>> This vote is open until Friday (Oct. 11). >>>> >>>> [ ] +1: Accept the proposal >>>> [ ] +0 >>>> [ ] -1: I don't think this is a good idea because ... >>>> >>>> Thank you! >>>> >>>> Gengliang >>>> >>> >>> >>> -- >>> --- >>> Takeshi Yamamuro >>> >> -- > [image: Databricks Summit - Watch the talks] > <https://databricks.com/sparkaisummit/north-america> > -- Ryan Blue Software Engineer Netflix
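To make the three policies concrete, here is a hedged example using the spark.sql.storeAssignmentPolicy option named in the proposal (assuming an active SparkSession named spark; the table and column are made up):

// A table with a single byte-typed column, only for illustration.
spark.sql("CREATE TABLE t (b TINYINT) USING parquet")

spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
spark.sql("INSERT INTO t VALUES (257)")   // low-order bits kept: stores 1

spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
spark.sql("INSERT INTO t VALUES (257)")   // 257 is out of range for a byte: runtime exception

spark.conf.set("spark.sql.storeAssignmentPolicy", "STRICT")
spark.sql("INSERT INTO t VALUES (257)")   // int into byte risks truncation: rejected outright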
DataSourceV2 sync notes - 2 October 2019
Here are my notes from last week's DSv2 sync. *Attendees*: Ryan Blue Terry Kim Wenchen Fan *Topics*: - SchemaPruning only supports Parquet and ORC? - Out of order optimizer rules - 3.0 work - Rename session catalog to spark_catalog - Finish TableProvider update to avoid another API change: pass all table config from metastore - Catalog behavior fix: https://issues.apache.org/jira/browse/SPARK-29014 - Stats push-down optimization: https://github.com/apache/spark/pull/25955 - DataFrameWriter v1/v2 compatibility progress - Open PRs - Update identifier resolution and table resolution: https://github.com/apache/spark/pull/25747 - Expose SerializableConfiguration: https://github.com/apache/spark/pull/26005 - Early DSv2 pushdown: https://github.com/apache/spark/pull/25955 *Discussion*: - Update identifier and table resolution - Wenchen: Will not handle SPARK-29014, it is a pure refactor - Ryan: I think this should separate the v2 rules from the v1 fallback, to keep table and identifier resolution separate. The only time that table resolution needs to be done at the same time is for v1 fallback. - This was merged last week - Update to use spark_catalog - Wenchen: this will be a separate PR. - Now open: https://github.com/apache/spark/pull/26071 - Early DSv2 pushdown - Ryan: this depends on fixing a few more tests. To validate there are no calls to computeStats with the DSv2 relation, I’ve temporarily removed the method. Other than a few remaining test failures where the old relation was expected, it looks like there are no uses of computeStats before early pushdown in the optimizer. - Wenchen: agreed that the batch was in the correct place in the optimizer - Ryan: once tests are passing, will add the computeStats implementation back with Utils.isTesting to fail during testing when called before early pushdown, but will not fail at runtime - Wenchen: when using v2, there is no way to configure custom options for a JDBC table. For v1, the table was created and stored in the session catalog, at which point Spark-specific properties like parallelism could be stored. In v2, the catalog is the source of truth, so tables don’t get created in the same way. Options are only passed in a create statement. - Ryan: this could be fixed by allowing users to pass options as table properties. We mix the two today, but if we used a prefix for table properties, “options.”, then you could use SET TBLPROPERTIES to get around this. That’s also better for compatibility. I’ll open a PR for this. - Ryan: this could also be solved by adding an OPTIONS clause or hint to SELECT - Wenchen: There are commands without v2 statements. We should add v2 statements to reject non-v1 uses. - Ryan: Doesn’t the parser only parse up to 2 identifiers for these? That would handle the majority of cases - Wenchen: Yes, but there is still a problem for identifiers with 1 part in v2 catalogs, like catalog.table. Commands that don’t support v2 will use catalog.table in the v1 catalog. - Ryan: Sounds like a good plan to update the parser and add statements for these. Do we have a list of commands to update? - Wenchen: REFRESH TABLE, ANALYZE TABLE, ALTER TABLE PARTITION, etc. Will open an umbrella JIRA with a list. -- Ryan Blue Software Engineer Netflix
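As a side note on the "options." prefix idea in the notes above, here is a short hypothetical example (the catalog, table, and property names are placeholders):

// Sketch: store a read/write option as a prefixed table property so it can be
// changed after the table exists; a connector could then strip the "options."
// prefix and treat the remainder as read/write options.
spark.sql("""
  ALTER TABLE testcat.db.events
  SET TBLPROPERTIES ('options.parallelism' = '8')
""")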
Cancel DSv2 sync this week
Hi everyone, I can't make it to the DSv2 sync tomorrow, so let's skip it. If anyone would prefer to have one and is willing to take notes, I can send out the invite. Just let me know; otherwise, let's consider it cancelled. Thanks, rb -- Ryan Blue Software Engineer Netflix
DSv2 sync notes - 30 October 2019
*Attendees*: Ryan Blue Terry Kim Wenchen Fan Jose Torres Jacky Lee Gengliang Wang *Topics*: - DROP NAMESPACE cascade behavior - 3.0 tasks - TableProvider API changes - V1 and V2 table resolution rules - Separate logical and physical write (for streaming) - Bucketing support (if time) - Open PRs *Discussion*: - DROP NAMESPACE cascade - Terry: How should the cascade option be handled? - Ryan: The API currently requires failing when the namespace is non-empty; the intent is for Spark to handle the complexity of recursive deletes - Wenchen: That will be slow because Spark has to list and issue individual delete calls. - Ryan: What about changing this so that DROP is always a recursive drop? Then Spark can check all implemented features (views for ViewCatalog, tables for TableCatalog) and we don’t need to add more calls and args. - Consensus was to update dropNamespace so that it is always cascading, so implementations can speed up the operation. Spark will check whether a namespace is empty and not issue the call if it is non-empty or the query was not cascading. - Remaining 3.0 tasks: - Add inferSchema and inferPartitioning to TableProvider (#26297) - Add catalog and identifier methods so that DataFrameWriter can support ErrorIfExists and Ignore modes - TableProvider changes: - Wenchen: tables need both schema and partitioning. Sometimes these are provided but not always. Currently, they are inferred if not provided, but this is implicit based on whether they are passed. - Wenchen: A better API is to add inferSchema and inferPartitioning that are separate from getTable, so they are always explicitly passed to getTable. - Wenchen: the only problem is on the write path, where inference is not currently done for path-based tables. The PR has a special case to skip inference in this case. - Ryan: Sounds okay, will review soon. - Ryan: Why is inference so expensive? - Wenchen: No validation on write means extra validation is needed to read. All file schemas should be used to ensure compatibility. Partitioning is similar: more examples are needed to determine partition column types. - Resolution rules - Ryan: we found that the v1 and v2 rules are order dependent. Wenchen has a PR, but it rewrites the v1 ResolveRelations rule. That’s concerning because we don’t want to risk breaking v1 in 3.0. So we need to find a work-around - Wenchen: Burak suggested a work-around that should be a good approach - Ryan: Agreed. And in the long term, I don’t think we want to mix view and table resolution. View resolution is complicated because it requires context (e.g., current db). But it shouldn’t be necessary to resolve tables at the same time. Identifiers can be rewritten to avoid this. We should also consider moving view resolution into an earlier batch. In that case, view resolution would happen in a fixed-point batch and it wouldn’t need the custom recursive code. - Ryan: Can permanent views resolve temporary views? If not, we can move temporary views sooner, which would help simplify the v2 resolution rules. - Separating logical and physical writes - Wenchen: there is a use case to add physical information to streaming writes, like parallelism. The way streaming is written, it makes sense to separate writes into logical and physical stages, like the read side with Scan and Batch. - Ryan: So this would create separate Write and Batch objects? Would this move epoch ID to the creation of a batch write? - Wenchen: maybe. Will write up a design doc. 
Goal is to get this into Spark 3.0 if possible - Ryan: Okay, but I think TableProvider is still high priority for the 3.0 work - Wenchen: Agreed. -- Ryan Blue Software Engineer Netflix
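A minimal sketch of the DROP NAMESPACE behavior agreed above: Spark refuses a non-cascading drop of a non-empty namespace, and the catalog call itself is always treated as a recursive drop so implementations can make it fast. The helper name and exception type are illustrative only, and the catalog is assumed to implement both TableCatalog and SupportsNamespaces:

import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}

object DropNamespaceSketch {
  def drop(catalog: TableCatalog with SupportsNamespaces,
           ns: Array[String],
           cascade: Boolean): Unit = {
    val nonEmpty = catalog.listTables(ns).nonEmpty || catalog.listNamespaces(ns).nonEmpty
    if (!cascade && nonEmpty) {
      throw new IllegalStateException(s"Namespace ${ns.mkString(".")} is not empty")
    }
    // Always a cascading drop from the catalog's point of view; the
    // implementation may delete recursively on its side, which can be much faster.
    catalog.dropNamespace(ns)
  }
}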
Re: DSv2 reader lifecycle
Hi Andrew, This is expected behavior for DSv2 in 2.4. A separate reader is configured for each operation because the configuration will change. A count, for example, doesn't need to project any columns, but a count distinct will. Similarly, if your read has different filters we need to apply those to a separate reader for each scan. The newer API that we are releasing in Spark 3.0 addresses the concern that each reader is independent by using Catalog and Table interfaces. In the new version, Spark will load a table by name from a persistent catalog (loaded once) and use the table to create a reader for each operation. That way, you can load common metadata in the table, cache the table, and pass its info to readers as they are created. rb On Tue, Nov 5, 2019 at 4:58 PM Andrew Melo wrote: > Hello, > > During testing of our DSv2 implementation (on 2.4.3 FWIW), it appears that > our DataSourceReader is being instantiated multiple times for the same > dataframe. For example, the following snippet > > Dataset df = spark > .read() > .format("edu.vanderbilt.accre.laurelin.Root") > .option("tree", "Events") > .load("testdata/pristine/2018nanoaod1june2019.root"); > > Constructs edu.vanderbilt.accre.laurelin.Root twice and then calls > createReader once (as an aside, this seems like a lot for 1000 columns? > "CodeGenerator: Code generated in 8162.847517 ms") > > but then running operations on that dataframe (e.g. df.count()) calls > createReader for each call, instead of holding the existing > DataSourceReader. > > Is that the expected behavior? Because of the file format, it's quite > expensive to deserialize all the various metadata, so I was holding the > deserialized version in the DataSourceReader, but if Spark is repeatedly > constructing new ones, then that doesn't help. If this is the expected > behavior, how should I handle this as a consumer of the API? > > Thanks! > Andrew > -- Ryan Blue Software Engineer Netflix
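To illustrate that lifecycle, here is a rough sketch of a table that holds the expensive metadata so every scan reuses it. RootTable and RootMetadata are made-up names for this illustration, and the connector's own ScanBuilder is left out:

import java.util.{Set => JSet}
import scala.collection.JavaConverters._
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Stand-in for the expensive, deserialized file metadata.
case class RootMetadata(schema: StructType)

// The catalog loads this table once; each operation then gets its own scan,
// but all of them share the metadata held by the table instance.
class RootTable(tableName: String, metadata: RootMetadata) extends Table with SupportsRead {
  override def name(): String = tableName
  override def schema(): StructType = metadata.schema
  override def capabilities(): JSet[TableCapability] = Set(TableCapability.BATCH_READ).asJava
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    ??? // the connector's own ScanBuilder, created per count()/filtered read/etc.
}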
Re: Enabling fully disaggregated shuffle on Spark
huffle manager itself. If there is substantial overlap between the >> SortShuffleManager and other implementations, then the storage details can >> be abstracted at the appropriate level. (SPARK-25299 does not currently >> change this.) >> >> >> Do not require MapStatus to include blockmanager IDs where they are not >> relevant. This is captured by ShuffleBlockInfo >> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj> >> including an optional BlockManagerId in SPARK-25299. However, this >> change should be lifted to the MapStatus level so that it applies to all >> ShuffleManagers. Alternatively, use a more general data-location >> abstraction than BlockManagerId. This gives the shuffle manager more >> flexibility and the scheduler more information with respect to data >> residence. >> Serialization >> >> Allow serializers to be used more flexibly and efficiently. For example, >> have serializers support writing an arbitrary number of objects into an >> existing OutputStream or ByteBuffer. This enables objects to be serialized >> to direct buffers where doing so makes sense. More importantly, it allows >> arbitrary metadata/framing data to be wrapped around individual objects >> cheaply. Right now, that’s only possible at the stream level. (There are >> hacks around this, but this would enable more idiomatic use in efficient >> shuffle implementations.) >> >> >> Have serializers indicate whether they are deterministic. This provides >> much of the value of a shuffle service because it means that reducers do >> not need to spill to disk when reading/merging/combining inputs--the data >> can be grouped by the service, even without the service understanding data >> types or byte representations. Alternative (less preferable since it would >> break Java serialization, for example): require all serializers to be >> deterministic. >> >> >> >> -- >> >> - Ben >> > -- Ryan Blue Software Engineer Netflix
Re: Spark 2.4.5 release for Parquet and Avro dependency updates?
Just to clarify, I don't think that Parquet 1.10.1 to 1.11.0 is a runtime-incompatible change. The example mixed 1.11.0 and 1.10.1 in the same execution. Michael, please be more careful about announcing compatibility problems in other communities. If you've observed problems, let's find out the root cause first. rb On Fri, Nov 22, 2019 at 8:56 AM Michael Heuer wrote: > Hello, > > Avro 1.8.2 to 1.9.1 is a binary incompatible update, and it appears that > Parquet 1.10.1 to 1.11 will be a runtime-incompatible update (see thread on > dev@parquet > <https://mail-archives.apache.org/mod_mbox/parquet-dev/201911.mbox/%3c8357699c-9295-4eb0-a39e-b3538d717...@gmail.com%3E> > ). > > Might there be any desire to cut a Spark 2.4.5 release so that users can > pick up these changes independently of all the other changes in Spark 3.0? > > Thank you in advance, > >michael > -- Ryan Blue Software Engineer Netflix
Re: [DISCUSS] Consistent relation resolution behavior in SparkSQL
+1 for the proposal. The current behavior is confusing. We also came up with another case that we should consider while implementing a ViewCatalog: an unresolved relation in a permanent view (from a view catalog) should never resolve to a temporary view. If I have a view `pview` defined as `select * from t1` with database `db`, then `t1` should always resolve to `db.t1` and never a temp view `t1`. If it resolves to the temp view, then temp views can unexpectedly change the behavior of stored views. On Wed, Dec 4, 2019 at 7:02 PM Wenchen Fan wrote: > +1, I think it's good for both end-users and Spark developers: > * for end-users, when they lookup a table, they don't need to care which > command triggers it, as the behavior is consistent in all the places. > * for Spark developers, we may simplify the code quite a bit. For now we > have two code paths to lookup tables: one for SELECT/INSERT and one for > other commands. > > Thanks, > Wenchen > > On Mon, Dec 2, 2019 at 9:12 AM Terry Kim wrote: > >> Hi all, >> >> As discussed in SPARK-29900, Spark currently has two different relation >> resolution behaviors: >> >>1. Look up temp view first, then table/persistent view >>2. Look up table/persistent view >> >> The first behavior is used in SELECT, INSERT and a few commands that >> support temp views such as DESCRIBE TABLE, etc. The second behavior is used >> in most commands. Thus, it is hard to predict which relation resolution >> rule is being applied for a given command. >> >> I want to propose a consistent relation resolution behavior in which temp >> views are always looked up first before table/persistent view, as >> described more in detail in this doc: consistent relation resolution >> proposal >> <https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing> >> . >> >> Note that this proposal is a breaking change, but the impact should be >> minimal since this applies only when there are temp views and tables with >> the same name. >> >> Any feedback will be appreciated. >> >> I also want to thank Wenchen Fan, Ryan Blue, Burak Yavuz, and Dongjoon >> Hyun for guidance and suggestion. >> >> Regards, >> Terry >> >> >> <https://issues.apache.org/jira/browse/SPARK-29900> >> > -- Ryan Blue Software Engineer Netflix
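The pview/t1 case above, written out end to end as a hedged example (assuming an active SparkSession named spark; the database name and literal values are made up):

spark.sql("CREATE DATABASE IF NOT EXISTS db")
spark.sql("USE db")
spark.sql("CREATE TABLE t1 (id INT) USING parquet")
spark.sql("CREATE VIEW pview AS SELECT * FROM t1")        // the view text stores the bare name `t1`
spark.sql("CREATE TEMPORARY VIEW t1 AS SELECT 42 AS id")
// `db.pview` should keep reading `db.t1`; resolving `t1` to the temp view here
// would let a later temp view silently change the results of a stored view.
spark.sql("SELECT * FROM db.pview").show()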
Next DSv2 sync date
Hi everyone, I have a conflict with the normal DSv2 sync time this Wednesday, but I'd like to attend to talk about the TableProvider API. Would it work for everyone to have the sync at 6PM PST on Tuesday, 10 December instead? I could also make it at the normal time on Thursday. Thanks, -- Ryan Blue Software Engineer Netflix
Re: Next DSv2 sync date
Actually, my conflict was cancelled so I'll send out the usual invite for Wednesday. Sorry for the noise. On Sun, Dec 8, 2019 at 3:15 PM Ryan Blue wrote: > Hi everyone, > > I have a conflict with the normal DSv2 sync time this Wednesday and I'd > like to attend to talk about the TableProvider API. > > Would it work for everyone to have the sync at 6PM PST on Tuesday, 10 > December instead? I could also make it at the normal time on Thursday. > > Thanks, > > -- > Ryan Blue > Software Engineer > Netflix > -- Ryan Blue Software Engineer Netflix
Re: [DISCUSS] Add close() on DataWriter interface
Sounds good to me, too. On Wed, Dec 11, 2019 at 1:18 AM Jungtaek Lim wrote: > Thanks for the quick response, Wenchen! > > I'll leave this thread for early tomorrow so that someone in US timezone > can chime in, and craft a patch if no one objects. > > On Wed, Dec 11, 2019 at 4:41 PM Wenchen Fan wrote: > >> PartitionReader extends Closable, seems reasonable to me to do the same >> for DataWriter. >> >> On Wed, Dec 11, 2019 at 1:35 PM Jungtaek Lim < >> kabhwan.opensou...@gmail.com> wrote: >> >>> Hi devs, >>> >>> I'd like to propose to add close() on DataWriter explicitly, which is >>> the place for resource cleanup. >>> >>> The rationalization of the proposal is due to the lifecycle of >>> DataWriter. If the scaladoc of DataWriter is correct, the lifecycle of >>> DataWriter instance ends at either commit() or abort(). That makes >>> datasource implementors to feel they can place resource cleanup in both >>> sides, but abort() can be called when commit() fails; so they have to >>> ensure they don't do double-cleanup if cleanup is not idempotent. >>> >>> I've checked some callers to see whether they can apply >>> "try-catch-finally" to ensure close() is called at the end of lifecycle for >>> DataWriter, and they look like so, but I might be missing something. >>> >>> What do you think? It would bring backward incompatible change, but >>> given the interface is marked as Evolving and we're making backward >>> incompatible changes in Spark 3.0, so I feel it may not matter. >>> >>> Would love to hear your thoughts. >>> >>> Thanks in advance, >>> Jungtaek Lim (HeartSaVioR) >>> >>> >>> -- Ryan Blue Software Engineer Netflix
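A rough sketch of the caller-side lifecycle with an explicit close(), as discussed above. This is simplified and not the actual write-path code; WriteLifecycleSketch and runTask are made-up names, and the `with Closeable` refinement stands in for the proposed change:

import java.io.Closeable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

object WriteLifecycleSketch {
  // commit() or abort() ends the logical lifecycle; close() always runs and is
  // the single place for resource cleanup, so cleanup no longer needs to be
  // idempotent across commit() and abort().
  def runTask(writer: DataWriter[InternalRow] with Closeable,
              rows: Iterator[InternalRow]): WriterCommitMessage = {
    try {
      rows.foreach(writer.write)
      writer.commit()
    } catch {
      case t: Throwable =>
        writer.abort()   // abort() may run after a failed commit()
        throw t
    } finally {
      writer.close()
    }
  }
}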
DSv2 sync notes - 11 December 2019
Hi everyone, here are my notes for the DSv2 sync last week. Sorry they’re late! Feel free to add more details or corrections. Thanks! rb *Attendees*: Ryan Blue John Zhuge Dongjoon Hyun Joseph Torres Kevin Yu Russel Spitzer Terry Kim Wenchen Fan Hyukjin Kwan Jacky Lee *Topics*: - Relation resolution behavior doc: https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit - Nested schema pruning for v2 (Dongjoon) - TableProvider changes - Tasks for Spark 3.0 - Open PRs - Nested schema pruning: https://github.com/apache/spark/pull/26751 - Support FIRST and AFTER in DDL: https://github.com/apache/spark/pull/26817 - Add VACUUM - Spark 3.1 goals (if time) *Discussion*: - User-specified schema handling - Burak: User-specified schema should - Relation resolution behavior (see doc link above) - Ryan: Thanks to Terry for fixing table resolution: https://github.com/apache/spark/pull/26684, next step is to clean up temp views - Terry: the idea is to always resolve identifiers the same way and not to resolve a temp view in some cases but not others. If an identifier is a temp view and is used in a context where you can’t use a view, it should fail instead of finding a table. - Ryan: does this need to be done by 3.0? - *Consensus was that it should be done for 3.0* - Ryan: not much activity on the dev list thread for this. Do we move forward anyway? - Wenchen: okay to fix because the scope is small - *Consensus was to go ahead and notify the dev list about changes* because this is a low-risk case that does not occur often (table and temp view conflict) - Burak: cached tables are similar: for insert you get the new results. - Ryan: is that the same issue or a similar problem to fix? - Burak: similar, it can be done separately - Ryan: does this also need to be fixed by 3.0? - Wenchen: it is a blocker (Yes). Spark should invalidate the cached table after a write - Ryan: There’s another issue: how do we handle a permanent view with a name that resolves to a temp view? If incorrect, this changes the results of a stored view. - Wenchen: This is currently broken, Spark will resolve the relation as a temp view. But Spark could use the analysis context to fix this. - Ryan: We should fix this when fixing temp views. - Nested schema pruning: - Dongjoon: Nested schema pruning was only done for Parquet and ORC instead of all v2 sources. Anton submitted a PR that fixes it. - At the time, the PR had +1s and was pending some minor discussion. It was merged the next day. - TableProvider changes: - Wenchen: Spark always calls 1 method to load a table. The implementation can do schema and partition inference in that method. Forcing this to be separated into other methods causes problems in the file source. FileIndex is used for all these tasks. - Ryan: I’m not sure that existing file source code is a good enough justification to change the proposed API. Seems too path dependent. - Ryan: It is also strange to have the source of truth for schema information differ between code paths. Some getTable uses would pass the schema to the source (from metastore) with TableProvider, but some would instead rely on the table from getTable to provide its own schema. This is confusing to implementers. - Burak: The default mode in DataFrameWriter is ErrorIfExists, which doesn’t currently work with v2 sources. Moving from Kafka to KafkaV2, for example, would probably break. - Ryan: So do we want to get extractCatalog and extractIdentifier into 3.0? Or is this blocked by the infer changes? 
- Burak: It would be good to have. - Wenchen: Schema may be inferred, or provided by Spark - Ryan: Sources should specify whether they accept a user-specified schema. But either way, the schema is still external and passed into the table. The main decision is whether all cases (inference included) should pass the schema into the table. - Tasks for 3.0 - Decided to get temp view resolution fixed - Decided to get TableProvider changes in - extractCatalog/extractIdentifier are nice-to-have (but small) - Burak: Upgrading to v2 saveAsTable from DataFrameWriter v1 creates RTAS, but in Delta v1 would only overwrite the schema if requested. It would be nice to be able to select - Ryan: Standardizing behavior (replace vs truncate vs dynamic overwrite) is a main point of v2. Allowing sources to choose their own behavior is not supported in v2 so that we can guarantee consistent semantics across tables. Making a way for Delta to change its semantics doesn’t seem like a good idea. To
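One concrete rendering of the cached-table point above (assuming an active SparkSession named spark; the catalog and table names are hypothetical, and the table is assumed to have two columns):

spark.sql("CACHE TABLE testcat.db.events")
spark.sql("INSERT INTO testcat.db.events VALUES (1, 'click')")
// For correct results, the write must invalidate (or refresh) the cached data
// so that this count reflects the newly inserted row.
spark.sql("SELECT count(*) FROM testcat.db.events").show()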
Re: [DISCUSS] Revert and revisit the public custom expression API for partition (a.k.a. Transform API)
worse, the parser/analyzer allows arbitrary string as Transform >> name, so it's impossible to have well-defined semantic, and also different >> sources may have different semantic for the same Transform name. >> >> I'd suggest we forbid arbitrary string as Transform (the ApplyTransform >> class). We can even follow DS V1 Filter and expose the classes directly. >> >> On Thu, Jan 16, 2020 at 6:56 PM Hyukjin Kwon wrote: >> >>> Hi all, >>> >>> I would like to suggest to take one step back at >>> https://github.com/apache/spark/pull/24117 and rethink about it. >>> I am writing this email as I raised the issue few times but could not >>> have enough responses promptly, and >>> the code freeze is being close. >>> >>> In particular, please refer the below comments for the full context: >>> - https://github.com/apache/spark/pull/24117#issuecomment-568891483 >>> - https://github.com/apache/spark/pull/24117#issuecomment-568614961 >>> - https://github.com/apache/spark/pull/24117#issuecomment-568891483 >>> >>> >>> In short, this PR added an API in DSv2: >>> >>> CREATE TABLE table(col INT) USING parquet PARTITIONED BY *transform(col)* >>> >>> >>> So people can write some classes for *transform(col)* for partitioned >>> column specifically. >>> >>> However, there are some design concerns which looked not addressed >>> properly. >>> >>> Note that one of the main point is to avoid half-baked or >>> just-work-for-now APIs. However, this looks >>> definitely like half-completed. Therefore, I would like to propose to >>> take one step back and revert it for now. >>> Please see below the concerns listed. >>> >>> *Duplication of existing expressions* >>> Seems like existing expressions are going to be duplicated. See below >>> new APIs added: >>> >>> def years(column: String): YearsTransform = >>> YearsTransform(reference(column)) >>> def months(column: String): MonthsTransform = >>> MonthsTransform(reference(column)) >>> def days(column: String): DaysTransform = DaysTransform(reference(column)) >>> def hours(column: String): HoursTransform = >>> HoursTransform(reference(column)) >>> ... >>> >>> It looks like it requires to add a copy of our existing expressions, in >>> the future. >>> >>> >>> *Limited Extensibility* >>> It has a clear limitation. It looks other expressions are going to be >>> allowed together (e.g., `concat(years(col) + days(col))`); >>> however, it looks impossible to extend with the current design. It just >>> directly maps transformName to implementation class, >>> and just pass arguments: >>> >>> transform >>> ... >>> | transformName=identifier >>> '(' argument+=transformArgument (',' argument+=transformArgument)* >>> ')' #applyTransform >>> ; >>> >>> It looks regular expressions are supported; however, it's not. >>> - If we should support, the design had to consider that. >>> - if we should not support, different syntax might have to be used >>> instead. >>> >>> *Limited Compatibility Management* >>> The name can be arbitrary. For instance, if "transform" is supported in >>> Spark side, the name is preempted by Spark. >>> If every the datasource supported such name, it becomes not compatible. >>> >>> >>> >>> -- Ryan Blue Software Engineer Netflix
Re: [Discuss] Metrics Support for DS V2
We've implemented these metrics in the RDD (for input metrics) and in the v2 DataWritingSparkTask. That approach gives you the same metrics in the stage views that you get with v1 sources, regardless of the v2 implementation. I'm not sure why they weren't included from the start. It looks like the way metrics are collected is changing. There are a couple of metrics for number of rows; looks like one that goes to the Spark SQL tab and one that is used for the stages view. If you'd like, I can send you a patch. rb On Fri, Jan 17, 2020 at 5:09 AM Wenchen Fan wrote: > I think there are a few details we need to discuss. > > how frequently a source should update its metrics? For example, if file > source needs to report size metrics per row, it'll be super slow. > > what metrics a source should report? data size? numFiles? read time? > > shall we show metrics in SQL web UI as well? > > On Fri, Jan 17, 2020 at 3:07 PM Sandeep Katta < > sandeep0102.opensou...@gmail.com> wrote: > >> Hi Devs, >> >> Currently DS V2 does not update any input metrics. SPARK-30362 aims at >> solving this problem. >> >> We can have the below approach. Have marker interface let's say >> "ReportMetrics" >> >> If the DataSource Implements this interface, then it will be easy to >> collect the metrics. >> >> For e.g. FilePartitionReaderFactory can support metrics. >> >> So it will be easy to collect the metrics if FilePartitionReaderFactory >> implements ReportMetrics >> >> >> Please let me know the views, or even if we want to have new solution or >> design. >> > -- Ryan Blue Software Engineer Netflix
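For reference, here is a sketch of the marker-interface idea from Sandeep's proposal quoted above. ReportMetrics is the name suggested in the thread, but the method shape is an assumption, not an actual Spark API:

// A reader factory (for example a FilePartitionReaderFactory) could mix this in
// so the engine can pull source-reported metrics and fold them into the task's
// input metrics shown on the stage page.
trait ReportMetrics {
  /** Metrics accumulated so far, e.g. Map("recordsRead" -> 1000L, "bytesRead" -> 1048576L). */
  def currentMetrics(): Map[String, Long]
}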
Re: [Discuss] Metrics Support for DS V2
I sent them to you. I had to go direct because the ASF mailing list will remove attachments. I'm happy to send them to others if needed as well. On Sun, Jan 19, 2020 at 9:01 PM Sandeep Katta < sandeep0102.opensou...@gmail.com> wrote: > Please send me the patch , I will apply and test. > > On Fri, 17 Jan 2020 at 10:33 PM, Ryan Blue wrote: > >> We've implemented these metrics in the RDD (for input metrics) and in the >> v2 DataWritingSparkTask. That approach gives you the same metrics in the >> stage views that you get with v1 sources, regardless of the v2 >> implementation. >> >> I'm not sure why they weren't included from the start. It looks like the >> way metrics are collected is changing. There are a couple of metrics for >> number of rows; looks like one that goes to the Spark SQL tab and one that >> is used for the stages view. >> >> If you'd like, I can send you a patch. >> >> rb >> >> On Fri, Jan 17, 2020 at 5:09 AM Wenchen Fan wrote: >> >>> I think there are a few details we need to discuss. >>> >>> how frequently a source should update its metrics? For example, if file >>> source needs to report size metrics per row, it'll be super slow. >>> >>> what metrics a source should report? data size? numFiles? read time? >>> >>> shall we show metrics in SQL web UI as well? >>> >>> On Fri, Jan 17, 2020 at 3:07 PM Sandeep Katta < >>> sandeep0102.opensou...@gmail.com> wrote: >>> >>>> Hi Devs, >>>> >>>> Currently DS V2 does not update any input metrics. SPARK-30362 aims at >>>> solving this problem. >>>> >>>> We can have the below approach. Have marker interface let's say >>>> "ReportMetrics" >>>> >>>> If the DataSource Implements this interface, then it will be easy to >>>> collect the metrics. >>>> >>>> For e.g. FilePartitionReaderFactory can support metrics. >>>> >>>> So it will be easy to collect the metrics if FilePartitionReaderFactory >>>> implements ReportMetrics >>>> >>>> >>>> Please let me know the views, or even if we want to have new solution >>>> or design. >>>> >>> >> >> -- >> Ryan Blue >> Software Engineer >> Netflix >> > -- Ryan Blue Software Engineer Netflix
Re: [DISCUSS] Resolve ambiguous parser rule between two "create table"s
t;>>>> >>>>>> If we really want to promote Spark's one for CREATE TABLE, then would >>>>>> it really matter to treat Hive CREATE TABLE be "exceptional" one and try >>>>>> to >>>>>> isolate each other? What's the point of providing a legacy config to go >>>>>> back to the old one even we fear about breaking something to make it >>>>>> better >>>>>> or clearer? We do think that table provider is important (hence the >>>>>> change >>>>>> was done), then is it still a trivial problem whether the provider is >>>>>> affected by specifying the "optional" fields? >>>>>> >>>>>> >>>>>> On Wed, Mar 18, 2020 at 4:38 PM Wenchen Fan >>>>>> wrote: >>>>>> >>>>>>> I think the general guideline is to promote Spark's own CREATE TABLE >>>>>>> syntax instead of the Hive one. Previously these two rules are mutually >>>>>>> exclusive because the native syntax requires the USING clause while the >>>>>>> Hive syntax makes ROW FORMAT or STORED AS clause optional. >>>>>>> >>>>>>> It's a good move to make the USING clause optional, which makes it >>>>>>> easier to write the native CREATE TABLE syntax. Unfortunately, it leads >>>>>>> to >>>>>>> some conflicts with the Hive CREATE TABLE syntax, but I don't see a >>>>>>> serious >>>>>>> problem here. If a user just writes CREATE TABLE without USING or ROW >>>>>>> FORMAT or STORED AS, does it matter what table we create? Internally the >>>>>>> parser rules conflict and we pick the native syntax depending on the >>>>>>> rule >>>>>>> order. But the user-facing behavior looks fine. >>>>>>> >>>>>>> CREATE EXTERNAL TABLE is a problem as it works in 2.4 but not in >>>>>>> 3.0. Shall we simply remove EXTERNAL from the native CREATE TABLE >>>>>>> syntax? >>>>>>> Then CREATE EXTERNAL TABLE creates Hive table like 2.4. >>>>>>> >>>>>>> On Mon, Mar 16, 2020 at 10:55 AM Jungtaek Lim < >>>>>>> kabhwan.opensou...@gmail.com> wrote: >>>>>>> >>>>>>>> Hi devs, >>>>>>>> >>>>>>>> I'd like to initiate discussion and hear the voices for resolving >>>>>>>> ambiguous parser rule between two "create table"s being brought by >>>>>>>> SPARK-30098 [1]. >>>>>>>> >>>>>>>> Previously, "create table" parser rules were clearly distinguished >>>>>>>> via "USING provider", which was very intuitive and deterministic. Say, >>>>>>>> DDL >>>>>>>> query creates "Hive" table unless "USING provider" is specified, >>>>>>>> (Please refer the parser rule in branch-2.4 [2]) >>>>>>>> >>>>>>>> After SPARK-30098, "create table" parser rules became ambiguous >>>>>>>> (please refer the parser rule in branch-3.0 [3]) - the factors >>>>>>>> differentiating two rules are only "ROW FORMAT" and "STORED AS" which >>>>>>>> are >>>>>>>> all defined as "optional". Now it relies on the "order" of parser rule >>>>>>>> which end users would have no idea to reason about, and very >>>>>>>> unintuitive. >>>>>>>> >>>>>>>> Furthermore, undocumented rule of EXTERNAL (added in the first rule >>>>>>>> to provide better message) brought more confusion (I've described the >>>>>>>> broken existing query via SPARK-30436 [4]). >>>>>>>> >>>>>>>> Personally I'd like to see two rules mutually exclusive, instead of >>>>>>>> trying to document the difference and talk end users to be careful >>>>>>>> about >>>>>>>> their query. I'm seeing two ways to make rules be mutually exclusive: >>>>>>>> >>>>>>>> 1. Add some identifier in create Hive table rule, like `CREATE ... >>>>>>>> "HIVE" TABLE ...`. >>>>>>>> >>>>>>>> pros. This is the simplest way to distinguish between two rules. >>>>>>>> cons. 
This would lead end users to change their query if they >>>>>>>> intend to create Hive table. (Given we will also provide legacy option >>>>>>>> I'm >>>>>>>> feeling this is acceptable.) >>>>>>>> >>>>>>>> 2. Define "ROW FORMAT" or "STORED AS" as mandatory one. >>>>>>>> >>>>>>>> pros. Less invasive for existing queries. >>>>>>>> cons. Less intuitive, because they have been optional and now >>>>>>>> become mandatory to fall into the second rule. >>>>>>>> >>>>>>>> Would like to hear everyone's voices; better ideas are welcome! >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Jungtaek Lim (HeartSaVioR) >>>>>>>> >>>>>>>> 1. SPARK-30098 Use default datasource as provider for CREATE TABLE >>>>>>>> syntax >>>>>>>> https://issues.apache.org/jira/browse/SPARK-30098 >>>>>>>> 2. >>>>>>>> https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 >>>>>>>> 3. >>>>>>>> https://github.com/apache/spark/blob/branch-3.0/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 >>>>>>>> 4. https://issues.apache.org/jira/browse/SPARK-30436 >>>>>>>> >>>>>>>> -- Ryan Blue Software Engineer Netflix
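The ambiguity under discussion is easiest to see with concrete statements. This is a hedged illustration of the behavior described in the thread (assuming an active SparkSession named spark; table names and the location are placeholders):

// With ROW FORMAT and STORED AS optional, the first statement matches both
// parser rules and is claimed by the native (non-Hive) rule only because of
// rule order; in 2.4 the same statement created a Hive table.
spark.sql("CREATE TABLE t1 (id INT)")                              // no USING, no Hive clause: ambiguous
spark.sql("CREATE TABLE t2 (id INT) USING parquet")                // clearly the native syntax
spark.sql("CREATE TABLE t3 (id INT) STORED AS parquet")            // clearly the Hive syntax
spark.sql("CREATE EXTERNAL TABLE t4 (id INT) LOCATION '/tmp/t4'")  // worked in 2.4, broken in 3.0 (SPARK-30436)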
Re: [DISCUSS] Resolve ambiguous parser rule between two "create table"s
I have an update to the parser that unifies the CREATE TABLE rules. It took surprisingly little work to get the parser updated to produce CreateTableStatement and CreateTableAsSelectStatement with the Hive info. And the only fields I need to add to those statements were serde: SerdeInfo and external: Boolean. >From there, we can use the conversion rules to re-create the same Hive command for v1 or pass the data as properties for v2. I’ll work on getting this cleaned up and open a PR hopefully tomorrow. For the questions about how this gets converted to either a Spark or Hive create table command, that is really up to analyzer rules and configuration. With my changes, it is no longer determined by the parser: the parser just produces a node that includes all of the user options and Spark decides what to do with that in the analyzer. Also, there's already an option to convert Hive syntax to a Spark command, spark.sql.hive.convertCTAS. rb On Thu, Mar 19, 2020 at 12:46 AM Wenchen Fan wrote: > Big +1 to have one single unified CREATE TABLE syntax. > > In general, we can say there are 2 ways to specify the table provider: > USING clause and ROW FORMAT/STORED AS clause. These 2 ways are mutually > exclusive. If none is specified, it implicitly indicates USING > defaultSource. > > I'm fine with a few special cases which can indicate the table provider, > like EXTERNAL indicates Hive Serde table. A few thoughts: > 1. SKEWED BY ...: We support it in Hive syntax just to fail it with a > nice error message. We can support it in the unified syntax as well, and > fail it. > 2. PARTITIONED BY colTypeList: I think we can support it in the unified > syntax. Just make sure it doesn't appear together with PARTITIONED BY > transformList. > 3. OPTIONS: We can either map it to Hive Serde properties, or let it > indicate non-Hive tables. > > On Thu, Mar 19, 2020 at 1:00 PM Jungtaek Lim > wrote: > >> Thanks Nicholas for the side comment; you'll need to interpret "CREATE >> TABLE USING HIVE FORMAT" as CREATE TABLE using "HIVE FORMAT", but yes it >> may add the confusion. >> >> Ryan, thanks for the detailed analysis and proposal. That's what I would >> like to see in discussion thread. >> >> I'm open to solutions which enable end users to specify their intention >> properly - my main concern of SPARK-30098 is that it becomes unclear which >> provider the query will use in create table unless USING provider is >> explicitly specified. If the new proposal makes clear on this, that should >> be better than now. >> >> Replying inline: >> >> On Thu, Mar 19, 2020 at 11:06 AM Nicholas Chammas < >> nicholas.cham...@gmail.com> wrote: >> >>> Side comment: The current docs for CREATE TABLE >>> <https://github.com/apache/spark/blob/4237251861c79f3176de7cf5232f0388ec5d946e/docs/sql-ref-syntax-ddl-create-table.md#description> >>> add to the confusion by describing the Hive-compatible command as "CREATE >>> TABLE USING HIVE FORMAT", but neither "USING" nor "HIVE FORMAT" are >>> actually part of the syntax >>> <https://github.com/apache/spark/blob/4237251861c79f3176de7cf5232f0388ec5d946e/docs/sql-ref-syntax-ddl-create-table-hiveformat.md> >>> . >>> >>> On Wed, Mar 18, 2020 at 8:31 PM Ryan Blue >>> wrote: >>> >>>> Jungtaek, it sounds like you consider the two rules to be separate >>>> syntaxes with their own consistency rules. 
For example, if I am using the >>>> Hive syntax rule, then the PARTITIONED BY clause adds new (partition) >>>> columns and requires types for those columns; if I’m using the Spark syntax >>>> rule with USING then PARTITIONED BY must reference existing columns >>>> and cannot include types. >>>> >>>> I agree that this is confusing to users! We should fix it, but I don’t >>>> think the right solution is to continue to have two rules with divergent >>>> syntax. >>>> >>>> This is confusing to users because they don’t know anything about >>>> separate parser rules. All the user sees is that sometimes PARTITION BY >>>> requires types and sometimes it doesn’t. Yes, we could add a keyword, >>>> HIVE, to signal that the syntax is borrowed from Hive for that case, >>>> but that actually breaks queries that run in Hive. >>>> >>> That might less matter, because SPARK-30098 (and I guess your proposal >> as well) enforces end users to add "USING HIVE" for their queries to enable >> Hive provider in any way, even only when th
Re: [DISCUSS] Supporting hive on DataSourceV2
Hi Jacky, We’ve internally released support for Hive tables (and Spark FileFormat tables) using DataSourceV2 so that we can switch between catalogs; sounds like that’s what you are planning to build as well. It would be great to work with the broader community on a Hive connector. I will get a branch of our connectors published so that you can take a look. I think it should be fairly close to what you’re talking about building, with a few exceptions: - Our implementation always uses our S3 committers, but it should be easy to change this - It supports per-partition formats, like Hive Do you have an idea about where the connector should be developed? I don’t think it makes sense for it to be part of Spark. That would keep complexity in the main project and require updating Hive versions slowly. Using a separate project would mean less code in Spark specific to one source, and could more easily support multiple Hive versions. Maybe we should create a project for catalog plug-ins? rb On Mon, Mar 23, 2020 at 4:20 AM JackyLee wrote: > Hi devs, > I’d like to start a discussion about Supporting Hive on DatasourceV2. We’re > now working on a project using DataSourceV2 to provide multiple source > support and it works with the data lake solution very well, yet it does not > yet support HiveTable. > > There are 3 reasons why we need to support Hive on DataSourceV2. > 1. Hive itself is one of Spark data sources. > 2. HiveTable is essentially a FileTable with its own input and output > formats, it works fine with FileTable. > 3. HiveTable should be stateless, and users can freely read or write Hive > using batch or microbatch. > > We implemented stateless Hive on DataSourceV1, it supports user to write > into Hive on streaming or batch and it has widely used in our company. > Recently, we are trying to support Hive on DataSourceV2, Multiple Hive > Catalog and DDL Commands have already been supported. > > Looking forward to more discussions on this. > > > > -- > Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/ > > ----- > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > > -- Ryan Blue Software Engineer Netflix
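For context on the catalog plug-in angle, registering an external catalog in DSv2 is driven by configuration. Here is a hedged example of what wiring in a Hive catalog connector could look like (the class name, catalog name, option, and table are placeholders, not a published artifact):

import org.apache.spark.sql.SparkSession

// `spark.sql.catalog.<name>` is the DSv2 convention for registering a catalog
// implementation; keys under that prefix are passed to the plugin when it is
// initialized.
val spark = SparkSession.builder()
  .appName("hive-dsv2-sketch")
  .config("spark.sql.catalog.hive_prod", "com.example.spark.HiveTableCatalog")      // placeholder class
  .config("spark.sql.catalog.hive_prod.metastore.uris", "thrift://metastore:9083")  // placeholder option
  .getOrCreate()

spark.sql("SELECT * FROM hive_prod.db.events").show()   // hypothetical table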
Re: [DISCUSS] Resolve ambiguous parser rule between two "create table"s
Here's a WIP PR with the basic changes: https://github.com/apache/spark/pull/28026 I still need to update tests in that branch and add the conversions to the old Hive plans. But at least you can see how the parser part works and how I'm converting the extra clauses for DSv2. This also enables us to support Hive create syntax in DSv2. On Wed, Mar 25, 2020 at 3:59 PM Jungtaek Lim wrote: > Would it be better to prioritize this to make sure the change is included > in Spark 3.0? (Maybe filing an issue and set as a blocker) > > Looks like there's consensus that SPARK-30098 brought ambiguous issue > which should be fixed (though the consideration of severity seems to be > different), and once we notice the issue it would be really odd if we > publish it as it is, and try to fix it later (the fix may not be even > included in 3.0.x as it might bring behavioral change). > > On Tue, Mar 24, 2020 at 3:37 PM Wenchen Fan wrote: > >> Hi Ryan, >> >> It's great to hear that you are cleaning up this long-standing mess. >> Please let me know if you hit any problems that I can help with. >> >> Thanks, >> Wenchen >> >> On Sat, Mar 21, 2020 at 3:16 AM Nicholas Chammas < >> nicholas.cham...@gmail.com> wrote: >> >>> On Thu, Mar 19, 2020 at 3:46 AM Wenchen Fan wrote: >>> >>>> 2. PARTITIONED BY colTypeList: I think we can support it in the >>>> unified syntax. Just make sure it doesn't appear together with PARTITIONED >>>> BY transformList. >>>> >>> >>> Another side note: Perhaps as part of (or after) unifying the CREATE >>> TABLE syntax, we can also update Catalog.createTable() to support >>> creating partitioned tables >>> <https://issues.apache.org/jira/browse/SPARK-31001>. >>> >> -- Ryan Blue Software Engineer Netflix