Re: [DISCUSS] Null-handling of primitive-type of untyped Scala UDF in Scala 2.12

2020-03-14 Thread Takeshi Yamamuro
Hi, Yi,

Probably I'm missing something, but can't we just wrap the udf with
`if (isnull(x)) null else udf(knownnotnull(x))`?
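
To make that concrete, roughly the same effect can be sketched on the user
side at the DataFrame level (a rough sketch only, assuming a DataFrame `df`
and the `f` and column `x` from the quoted example below):

    import org.apache.spark.sql.functions.{col, lit, udf, when}
    import org.apache.spark.sql.types.IntegerType

    val f = udf((x: Int) => x, IntegerType)

    // Guard the call explicitly so a null input yields null instead of the
    // primitive default 0; only the matching CASE WHEN branch is evaluated,
    // so the UDF should not see the null input.
    df.select(when(col("x").isNull, lit(null)).otherwise(f(col("x"))))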

On Fri, Mar 13, 2020 at 6:22 PM wuyi  wrote:

> Hi all, I'd like to raise a discussion here about null-handling of
> primitive-type of untyped Scala UDF [ udf(f: AnyRef, dataType: DataType) ].
>
> After we switched to Scala 2.12 in 3.0, the untyped Scala UDF is broken
> because we can no longer use reflection to get the parameter types of the
> Scala lambda.
> This leads to silent result changes. For example, with a UDF defined as `val
> f = udf((x: Int) => x, IntegerType)`, the query `select f($"x")` behaves
> differently between 2.4 and 3.0 when the input value of column x is null.
>
> Spark 2.4:  null
> Spark 3.0:  0
>
> Because of this, we deprecated the untyped Scala UDF in 3.0 and recommend
> that users use the typed ones. However, I recently identified several valid
> use cases, e.g., `val f = (r: Row) => Row(r.getAs[Int](0) * 2)`, where the
> schema cannot be derived by the typed Scala UDF [ udf[RT: TypeTag, A1:
> TypeTag](f: Function1[A1, RT]) ].
>
> There are 3 solutions:
> 1. Find a way to get Scala lambda parameter types by reflection (I tried
> hard but had no luck; the Java SAM type is too dynamic).
> 2. Support case classes as input to the typed Scala UDF, so people can at
> least still handle struct-type input columns with a UDF.
> 3. Add a new variant of the untyped Scala UDF with which users can specify
> the input types.
>
> I'd like to see more feedback or ideas about how to move forward.
>
> Thanks,
> Yi Wu

-- 
---
Takeshi Yamamuro


Re: [DISCUSS] Null-handling of primitive-type of untyped Scala UDF in Scala 2.12

2020-03-14 Thread wuyi
Hi Takeshi, thanks for your reply.

Before this breakage, we only did the null check for primitive types and left
null values of non-primitive types to the UDF itself, in case they need to be
handled specifically; e.g., a UDF may return something else for a null String.
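
For example (a sketch only), a UDF over a non-primitive type may want to see
the null itself and substitute its own value, which is why Spark leaves the
null handling to it:

    import org.apache.spark.sql.functions.udf

    // The UDF itself decides what a null String means, rather than having
    // Spark short-circuit the call and return null.
    val describe = udf((s: String) => if (s == null) "missing" else s.toUpperCase)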






Re: [DISCUSS] Null-handling of primitive-type of untyped Scala UDF in Scala 2.12

2020-03-14 Thread Takeshi Yamamuro
Ah, I see now what the "broken" means. Thanks, Yi.
I personally think option 1 is the best for existing Spark users to
support the use case you described above.
So, I think this decision depends on how difficult it is to implement "get
Scala lambda parameter types by reflection"
and on the complexity of its implementation.
(I'm not familiar with the 2.12 implementation, so I'm not really sure how
difficult it is)

If we cannot choose option 1, I like option 2 better than
adding a new API for the use case (option 3).
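
For reference, the struct-input use case from the original mail currently
only goes through the (now deprecated) untyped API; a rough sketch, assuming
a DataFrame `df` with an integer column `x`:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{col, struct, udf}
    import org.apache.spark.sql.types.{IntegerType, StructType}

    // Untyped udf: a Row-based lambda plus an explicit result schema,
    // since no TypeTag is available to derive the schema.
    val resultSchema = new StructType().add("doubled", IntegerType)
    val f = udf((r: Row) => Row(r.getAs[Int](0) * 2), resultSchema)

    df.select(f(struct(col("x"))))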

Bests,
Takeshi

On Sat, Mar 14, 2020 at 6:24 PM wuyi  wrote:

> Hi Takeshi, thanks for your reply.
>
> Before this breakage, we only did the null check for primitive types and
> left null values of non-primitive types to the UDF itself, in case they need
> to be handled specifically; e.g., a UDF may return something else for a null
> String.
>

-- 
---
Takeshi Yamamuro


Re: [DISCUSS] Null-handling of primitive-type of untyped Scala UDF in Scala 2.12

2020-03-14 Thread Sean Owen
I don't think it's possible to get the parameters by reflection
anymore -- they are lambdas now in the JVM. Indeed, I recall that
a few people couldn't find a solution back when we added 2.12 support.
This isn't 'new', in that it has always been the case for Scala 2.12.
If there is a better idea, sure.

On Sat, Mar 14, 2020 at 5:50 AM Takeshi Yamamuro  wrote:
>
> Ah, I see now what the "broken" means. Thanks, Yi.
> I personally think option 1 is the best for existing Spark users to
> support the use case you described above.
> So, I think this decision depends on how difficult it is to implement "get
> Scala lambda parameter types by reflection"
> and on the complexity of its implementation.
> (I'm not familiar with the 2.12 implementation, so I'm not really sure how
> difficult it is)
>
> If we cannot choose option 1, I like option 2 better than
> adding a new API for the use case (option 3).
>
> Bests,
> Takeshi




FYI: The evolution on `CHAR` type behavior

2020-03-14 Thread Dongjoon Hyun
Hi, All.

Apache Spark has suffered from a known consistency issue in `CHAR`
type behavior across its usages and configurations. However, the evolution
has been gradually moving toward consistency inside Apache Spark, because we
don't support `CHAR` officially. The following is a summary.

With 1.6.x ~ 2.3.x, `STORED AS PARQUET` shows a different result, as follows.
(`spark.sql.hive.convertMetastoreParquet=false` provides a fallback to Hive
behavior.)

spark-sql> CREATE TABLE t1(a CHAR(3));
spark-sql> CREATE TABLE t2(a CHAR(3)) STORED AS ORC;
spark-sql> CREATE TABLE t3(a CHAR(3)) STORED AS PARQUET;

spark-sql> INSERT INTO TABLE t1 SELECT 'a ';
spark-sql> INSERT INTO TABLE t2 SELECT 'a ';
spark-sql> INSERT INTO TABLE t3 SELECT 'a ';

spark-sql> SELECT a, length(a) FROM t1;
a   3
spark-sql> SELECT a, length(a) FROM t2;
a   3
spark-sql> SELECT a, length(a) FROM t3;
a 2

Since 2.4.0, `STORED AS ORC` became consistent.
(`spark.sql.hive.convertMetastoreOrc=false` provides a fallback to Hive
behavior.)

spark-sql> SELECT a, length(a) FROM t1;
a   3
spark-sql> SELECT a, length(a) FROM t2;
a 2
spark-sql> SELECT a, length(a) FROM t3;
a 2

Since 3.0.0-preview2, `CREATE TABLE` (without `STORED AS` clause) became
consistent.
(`spark.sql.legacy.createHiveTableByDefault.enabled=true` provides a
fallback to Hive behavior.)

spark-sql> SELECT a, length(a) FROM t1;
a 2
spark-sql> SELECT a, length(a) FROM t2;
a 2
spark-sql> SELECT a, length(a) FROM t3;
a 2

In addition, in 3.0.0, SPARK-31147 aims to ban `CHAR/VARCHAR` type in the
following syntax to be safe.

CREATE TABLE t(a CHAR(3));
https://github.com/apache/spark/pull/27902

This email is sent out to inform you, based on the new policy we voted on.
The recommendation is to always use Apache Spark's native `STRING` type.
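
If fixed-width padding is actually required, one possible workaround (a
sketch only, not an official recommendation) is a plain `STRING` column with
explicit padding:

    // Emulate CHAR(3)-style padding on a STRING column,
    // assuming a SparkSession `spark`.
    spark.sql("CREATE TABLE t_str(a STRING) USING parquet")
    spark.sql("INSERT INTO t_str SELECT rpad('a', 3, ' ')")
    spark.sql("SELECT a, length(a) FROM t_str").show()   // 'a  ', 3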

Bests,
Dongjoon.

References:
1. "CHAR implementation?", 2017/09/15

https://lists.apache.org/thread.html/96b004331d9762e356053b5c8c97e953e398e489d15e1b49e775702f%40%3Cdev.spark.apache.org%3E
2. "FYI: SPARK-30098 Use default datasource as provider for CREATE TABLE
syntax", 2019/12/06

https://lists.apache.org/thread.html/493f88c10169680191791f9f6962fd16cd0ffa3b06726e92ed04cbe1%40%3Cdev.spark.apache.org%3E


Re: [DISCUSS] Remove multiple workers on the same host support from Standalone backend

2020-03-14 Thread Andrew Melo
Hi Sean

On Fri, Mar 13, 2020 at 6:46 PM Sean Owen  wrote:

> Do you really need a new cluster per user? and if so, why specify N
> workers > M machines? I am not seeing a need for that. I don't even
> think 2 workers on the same host makes sense, as they are both
> managing the same resources; it only exists for test purposes AFAICT.
>

Sorry, I'm from a completely different field, so I've inherited a
completely different vocabulary. So thanks for bearing with me :)

I think, from reading your response, that maybe the confusion is that HTCondor
uses a completely different resource-acquisition model than what industry is
familiar with. Unlike AWS, which gives you a whole VM, or k8s, which gives you a
whole container, condor (and most other batch schedulers) splits up a single
bare machine that your job shares with whatever else is on that machine.
You don't get your own machine, or even the illusion that you have your own
machine (via containerization).

With these schedulers, it's not that you ask for N workers when there are
only M machines; you request N x 8-core slots when there are M cores
available, and the scheduler packs them wherever there are free resources.

> What you are trying to do sounds like one cluster, not many. JVMs
> can't be shared across users; JVM = executor. But that's a good thing,
> or else there would be all kinds of collisions.


> What pools are you referring to?


If you're talking about the 2nd half: let's say I'm running two pyspark
notebooks connected to the system above, and the batch scheduler gives each of
them 2 cores' worth of workers. Each notebook will have its own set (which I
called a pool earlier) of workers, so when you're working in one notebook, the
other notebook's workers are idle. My comment was about those resources being
idle and the desire to increase utilization.

Thanks
Andrew

> Sean
>
> On Fri, Mar 13, 2020 at 6:33 PM Andrew Melo  wrote:
> >
> > Hi Xingbo, Sean,
> >
> > On Fri, Mar 13, 2020 at 12:31 PM Xingbo Jiang 
> wrote:
> >>
> >> Andrew, could you provide more context of your use case please? Is it
> like you deploy homogeneous containers on hosts with available resources,
> and each container launches one worker? Or you deploy workers directly on
> hosts thus you could have multiple workers from the same application on the
> same host?
> >
> >
> > Sure, I describe a bit more detail about the actual workload below [*],
> but the short version is that our computing resources/infrastructure are
> all built around batch submission into (usually) the HTCondor scheduler,
> and we've got a PoC using pyspark to replace the interactive portion of
> data analysis. To run pyspark on our main resources, we use some scripts
> around the standalone mode to spin up N slaves per-user**, which may or may
> not end up on the same host. I understood Xingbo's original mail to mean
> that wouldn't be allowed in the future, but from Sean's response, it seems
> like I'm incorrect.
> >
> > That being said, our use-case is very bursty, and it would be very good
> if there was a way we could have one pool of JVMs that could be shared
> between N different concurrent users instead of having N different pools of
> JVMs that each serve one person. We're already resource constrained, and
> we're expecting our data rates to increase 10x in 2026, so the less idle
> CPU, the better for us.
> >
> > Andrew
> >
> > * I work for one of the LHC experiments at CERN (https://cms.cern/) and
> there's two main "phases" of our data pipeline: production and analysis.
> The analysis half is currently implemented by having users writing some
> software, splitting the input dataset(s) into N parts and then submitting
> those jobs to the batch system (combining the outputs in a manual
> postprocessing step). In terms of scale, there are currently ~100 users
> running ~900 tasks over ~50k cpus. The use case relevant to this context is
> the terminal analysis phase which involves calculating some additional
> columns, applying calibrations, filtering out the 'interesting' events and
> extracting histograms describing those events. Conceptually, it's an
> iterative process of "extract plots, look at plots, change parameters", but
> running in a batch system means the latency is bad, so it can take a long
> time to converge to the right set of params.
> >
> > ** though we have much smaller, dedicated k8s/mesos/yarn clusters we use
> for prototyping
> >
> >>
> >> Thanks,
> >>
> >> Xingbo
> >>
> >> On Fri, Mar 13, 2020 at 10:23 AM Sean Owen  wrote:
> >>>
> >>> You have multiple workers in one Spark (standalone) app? this wouldn't
> >>> prevent N apps from each having a worker on a machine.
> >>>
> >>> On Fri, Mar 13, 2020 at 11:51 AM Andrew Melo 
> wrote:
> >>> >
> >>> > Hello,
> >>> >
> >>> > On Fri, Feb 28, 2020 at 13:21 Xingbo Jiang 
> wrote:
> >>> >>
> >>> >> Hi all,
> >>> >>
> >>> >> Based on my experience, there is no scenario that necessarily
> requires deploying multiple Workers on the same node with Standalone
> backend. A work

Re: [DISCUSS] Remove multiple workers on the same host support from Standalone backend

2020-03-14 Thread Sean Owen
On Sat, Mar 14, 2020 at 5:56 PM Andrew Melo  wrote:
> Sorry, I'm from a completely different field, so I've inherited a completely
> different vocabulary. So thanks for bearing with me :)
>
> I think, from reading your response, that maybe the confusion is that HTCondor
> uses a completely different resource-acquisition model than what industry is
> familiar with. Unlike AWS, which gives you a whole VM, or k8s, which gives you a
> whole container, condor (and most other batch schedulers) splits up a single
> bare machine that your job shares with whatever else is on that machine. You
> don't get your own machine, or even the illusion that you have your own machine
> (via containerization).
>
> With these schedulers, it's not that you ask for N workers when there are only
> M machines; you request N x 8-core slots when there are M cores available, and
> the scheduler packs them wherever there are free resources.

Actually, that's exactly what a Spark standalone worker or YARN
NodeManager does. It allocates resources on a shared machine, without
virtualization. If there were Spark <> HTCondor integration, you'd
really just submit apps to the HTCondor cluster and let it allocate
_executors_ for the app for you.

Indeed you would not generally expect a resource manager to guarantee
where the resources come from. So it's possible and normal to have
multiple executors allocated by the resource manager on one machine,
for the same app.

It's not so normal to allocate multiple workers (resource manager
daemons) on a set of physical resources; it needlessly chops them up,
or even risks them both thinking they're in charge of the same
resources. So, in Spark standalone, where you control where workers
run, you wouldn't normally run multiple ones per machine. You'd let
one manage whatever resources the Spark cluster should take on the
hardware. Likewise, YARN has one NodeManager per machine.

Here, you have the extra step of allocating a resource manager
(Spark standalone) within your resource manager (HTCondor), because
there is no direct integration. And I think that's the issue. Resource
manager HTCondor isn't necessarily allocating resources in a way that
makes sense for a second-level resource manager.


> If you're talking about the 2nd half: let's say I'm running two pyspark
> notebooks connected to the system above, and the batch scheduler gives each of
> them 2 cores' worth of workers. Each notebook will have its own set (which I
> called a pool earlier) of workers, so when you're working in one notebook, the
> other notebook's workers are idle. My comment was about those resources being
> idle and the desire to increase utilization.

I think you are saying each job spins up a whole new Spark cluster,
and every Spark cluster runs just one app. That's not crazy at all,
though normally you would also have the possibility of one cluster
running N apps, and better sharing of its resources. But it
sounds like that's the way you have to do it.

Well, I can see some possible outcomes:

1) Can you not use HTCondor? Allocate a long-lived Spark standalone
cluster instead, on resources managed only by the Spark cluster, and
submit apps to it. The price is no reuse of resources with other
non-Spark applications.
2) Can HTCondor be convinced to allocate chunks of resources on
distinct machines? That'd do it too.
3) HTCondor can't be convinced to do any isolation of the processes
themselves, right? Because if the workers aren't on the same 'virtual'
machine or space, then it all works out, which is why all this works
fine on K8S.
4) ... just keep this functionality in Spark as a sort of generic
resource-manager bridge for cases like this. We may have identified a
perhaps-niche but real use case for it beyond testing.




Re: FYI: The evolution on `CHAR` type behavior

2020-03-14 Thread Reynold Xin
I don’t understand this change. Wouldn’t this “ban” confuse the hell out of
both new and old users?

For old users, their old code that was working for char(3) would now stop
working.

For new users, it depends on whether the underlying metastore's char(3) is
supported but different from ANSI SQL (which is not that big of a deal if we
explain it), or not supported at all.


