SPARK_WORKER_MEMORY in Spark Standalone - conf.getenv vs System.getenv?

2016-02-11 Thread Jacek Laskowski
Hi,

Is there a reason to use conf.getenv to read SPARK_WORKER_MEMORY rather than
System.getenv, as is done for the other env vars?

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala#L45

Regards,
Jacek

Jacek Laskowski | https://medium.com/@jaceklaskowski/
Mastering Apache Spark
==> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
Follow me at https://twitter.com/jaceklaskowski




Re: Long running Spark job on YARN throws "No AMRMToken"

2016-02-11 Thread Prabhu Joseph
Steve,


  When an application is submitted to the ResourceManager, AMLauncher
creates the YARN_AM_RM_TOKEN (the token used between the RM and the AM). When
the ApplicationMaster is launched, it contacts the RM with a register request,
allocate requests to receive containers, and a finish request. For each of
these requests the ResourceManager runs authorizeRequest, which checks whether
the current user has the YARN_AM_RM_TOKEN; if not, it throws *"No AMRMToken"*.

   The ResourceManager rolls the master key every
yarn.resourcemanager.am-rm-tokens.master-key-rolling-interval-sec. When it
rolls the key, there is a period of
1.5 * yarn.am.liveness-monitor.expiry-interval-ms during which, if the AM
contacts the RM with an allocate request and its YARN_AM_RM_TOKEN was prepared
with the previous master key, the RM updates the AM user with a
YARN_AM_RM_TOKEN prepared with the new master key.

 If the AM contacts the RM with a YARN_AM_RM_TOKEN that was constructed with
neither the current master key nor the previous master key, then an
*"Invalid AMRMToken"* message is thrown. This is the error that happens when
the AM has not been updated with the new RM master key. [YARN-3103 and YARN-2212]

I need your help to find the scenario in which "No AMRMToken" happens: a user
is added with a token, but later that token is missing. Is the token removed
because it expired?


Thanks,
Prabhu Joseph

On Wed, Feb 10, 2016 at 12:59 AM, Hari Shreedharan <
hshreedha...@cloudera.com> wrote:

> The credentials file approach (using keytab for spark apps) will only
> update HDFS tokens. YARN's AMRM tokens should be taken care of by YARN
> internally.
>
> Steve - correct me if I am wrong here: If the AMRM tokens are disappearing
> it might be a YARN bug (does the AMRM token have a 7 day limit as well? I
> thought that was only for HDFS).
>
>
> Thanks,
> Hari
>
> On Tue, Feb 9, 2016 at 8:44 AM, Steve Loughran 
> wrote:
>
>>
>> On 9 Feb 2016, at 11:26, Steve Loughran  wrote:
>>
>>
>> On 9 Feb 2016, at 05:55, Prabhu Joseph 
>> wrote:
>>
>> + Spark-Dev
>>
>> On Tue, Feb 9, 2016 at 10:04 AM, Prabhu Joseph <
>> prabhujose.ga...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> A long running Spark job on YARN throws below exception after
>>> running for few days.
>>>
>>> yarn.ApplicationMaster: Reporter thread fails 1 time(s) in a row.
>>> org.apache.hadoop.yarn.exceptions.YarnException: *No AMRMToken found* for
>>> user prabhu at org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)
>>>
>>> Do any of the below renew the AMRMToken and solve the issue?
>>>
>>> 1. yarn-resourcemanager.delegation.token.max-lifetime increase from 7
>>> days
>>>
>>> 2. Configuring Proxy user:
>>>
>>>  <property>
>>>    <name>hadoop.proxyuser.yarn.hosts</name>
>>>    <value>*</value>
>>>  </property>
>>>  <property>
>>>    <name>hadoop.proxyuser.yarn.groups</name>
>>>    <value>*</value>
>>>  </property>
>>>
>>
>> wouldn't do that: security issues
>>
>>
>>> 3. Can Spark-1.4.0 handle this with the fix
>>> https://issues.apache.org/jira/browse/SPARK-5342
>>>
>>> spark.yarn.credentials.file
>>>
>>>
>>>
>> I'll say "maybe" there
>>
>>
>> uprated to a no, having looked at the code more
>>
>>
>> How to renew the AMRMToken for a long running job on YARN?
>>>
>>>
>>>
>>
>> AMRM token renewal should be automatic in the AM; YARN sends a message to the
>> AM (actually an allocate() response with no containers but a new token at
>> the tail of the message).
>>
>> i don't see any logging in the Hadoop code there (AMRMClientImpl); filed
>> YARN-4682 to add a log statement
>>
>> if someone other than me were to supply a patch to that JIRA to add a log
>> statement *by the end of the day* I'll review it and get it in to Hadoop 2.8
>>
>>
>> like I said: I'll get this in to hadoop-2.8 if someone is timely with the
>> diff
>>
>>
>


Re: [build system] brief downtime, 8am PST thursday feb 10th

2016-02-11 Thread shane knapp
reminder:  this is happening in ~30 minutes


On Wed, Feb 10, 2016 at 10:58 AM, shane knapp  wrote:
> reminder:  this is happening tomorrow morning.
>
> On Mon, Feb 8, 2016 at 9:27 AM, shane knapp  wrote:
>> happy monday!
>>
>> i will be bringing down jenkins and the workers thursday morning to
>> upgrade docker on all of the workers from 1.5.0-1 to 1.7.1-2.
>>
>> as of december last year, docker 1.5 and older lost the ability to
>> pull from the docker hub.  since we're running centos 6.X on our
>> workers, and can't run the 3.X kernel, that limits our options to
>> docker 1.7.
>>
>> this will allow us to close out https://github.com/apache/spark/pull/9893
>>
>> i'll be sure to send updates as they happen.
>>
>> shane




Re: [build system] brief downtime, 8am PST thursday feb 10th

2016-02-11 Thread shane knapp
this is now done.

On Thu, Feb 11, 2016 at 7:35 AM, shane knapp  wrote:
> reminder:  this is happening in ~30 minutes
>
>
> On Wed, Feb 10, 2016 at 10:58 AM, shane knapp  wrote:
>> reminder:  this is happening tomorrow morning.
>>
>> On Mon, Feb 8, 2016 at 9:27 AM, shane knapp  wrote:
>>> happy monday!
>>>
>>> i will be bringing down jenkins and the workers thursday morning to
>>> upgrade docker on all of the workers from 1.5.0-1 to 1.7.1-2.
>>>
>>> as of december last year, docker 1.5 and older lost the ability to
>>> pull from the docker hub.  since we're running centos 6.X on our
>>> workers, and can't run the 3.X kernel, that limits our options to
>>> docker 1.7.
>>>
>>> this will allow us to close out https://github.com/apache/spark/pull/9893
>>>
>>> i'll be sure to send updates as they happen.
>>>
>>> shane




[build system] additional jenkins downtime next thursday

2016-02-11 Thread shane knapp
there's a big security patch coming out next week, and i'd like to
upgrade our jenkins installation so that we're covered.  it'll be
around 8am, again, and i'll send out more details about the upgrade
when i get them.

thanks!

shane




Re: Making BatchPythonEvaluation actually Batch

2016-02-11 Thread Davies Liu
I had a quick look at your commit and I think that makes sense; could you
send a PR for it so we can review it?

In order to support 2), we need to change the serialized Python
function from `f(iter)` to `f(x)`, i.e. process one row at a time (not a
whole partition); then we can easily combine them together:

For f1(f2(x)) and g1(g2(x)), we can do this in Python:

for row in reading_stream:
    x1, x2 = row
    y1 = f1(f2(x1))
    y2 = g1(g2(x2))
    yield (y1, y2)

For RDD, we still need to use `f(iter)`, but for SQL UDF, use `f(x)`.
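
A slightly fuller sketch of the two shapes (all names here are illustrative,
not actual Spark internals):

def times_two_iter(rows):
    # RDD-style f(iter): receives the whole partition iterator
    for x in rows:
        yield x * 2


def times_two(x):
    # SQL-UDF-style f(x): receives a single value, so it is easy to compose
    return x * 2


def plus_one(x):
    return x + 1


def evaluate(reading_stream):
    # two independent row-level UDF pipelines fused into a single pass
    for x1, x2 in reading_stream:
        yield plus_one(times_two(x1)), times_two(plus_one(x2))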

On Sun, Jan 31, 2016 at 1:37 PM, Justin Uang  wrote:
> Hey guys,
>
> BLUF: sorry for the length of this email, trying to figure out how to batch
> Python UDF executions, and since this is my first time messing with
> catalyst, would like any feedback
>
> My team is starting to use PySpark UDFs quite heavily, and performance is a
> huge blocker. The extra roundtrip serialization from Java to Python is not a
> huge concern if we only incur it ~once per column for most workflows, since
> it'll be in the same order of magnitude as reading files from disk. However,
> right now each Python UDF leads to its own roundtrip. There is definitely a
> lot we can do regarding this:
>
> (all the prototyping code is here:
> https://github.com/justinuang/spark/commit/8176749f8a6e6dc5a49fbbb952735ff40fb309fc)
>
> 1. We can't chain Python UDFs.
>
> df.select(python_times_2(python_times_2("col1")))
>
> throws an exception saying that the inner expression isn't evaluable. The
> workaround is to do
>
>
> df.select(python_times_2("col1").alias("tmp")).select(python_times_2("tmp"))
>
> This can be solved in ExtractPythonUDFs by always extracting the innermost
> Python UDF first.
>
>  // Pick the UDF we are going to evaluate (TODO: Support evaluating
> multiple UDFs at a time)
>  // If there is more than one, we will add another evaluation
> operator in a subsequent pass.
> -udfs.find(_.resolved) match {
> +udfs.find { udf =>
> +  udf.resolved && udf.children.map { child: Expression =>
> +child.find { // really hacky way to find if a child of a udf
> has the PythonUDF node
> +  case p: PythonUDF => true
> +  case _ => false
> +}.isEmpty
> +  }.reduce((x, y) => x && y)
> +} match {
>case Some(udf) =>
>  var evaluation: EvaluatePython = null
>
> 2. If we have a Python UDF applied to many different columns, where they
> don’t depend on each other, we can optimize them by collapsing them down
> into a single python worker. Although we have to serialize and send the same
> amount of data to the python interpreter, in the case where I am applying
> the same function to 20 columns, the overhead/context_switches of having 20
> interpreters run at the same time causes huge performance hits. I have
> confirmed this by manually taking the 20 columns, converting them to a
> struct, and then writing a UDF that processes the struct at the same time,
> and the speed difference is 2x. My approach to adding this to catalyst is
> basically to write an optimizer rule called CombinePython which joins
> adjacent EvaluatePython nodes that don’t depend on each other’s variables,
> and then having BatchPythonEvaluation run multiple lambdas at once. I would
> also like to be able to handle the case
> df.select(python_times_2(“col1”).alias(“col1x2”)).select(F.col(“col1x2”),
> python_times_2(“col1x2”).alias(“col1x4”)). To get around that, I add a
> PushDownPythonEvaluation optimizer that will push the optimization through a
> select/project, so that the CombinePython rule can join the two.
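>
> A rough sketch of that struct workaround (assuming df's columns are all
> doubles; names here are made up for illustration):
>
> from pyspark.sql import functions as F
> from pyspark.sql.types import ArrayType, DoubleType
>
> times_2 = F.udf(lambda x: x * 2, DoubleType())
>
> # naive version: one Python round trip per column
> slow = df.select(*[times_2(c).alias(c) for c in df.columns])
>
> # workaround: pack the columns into one struct and process them in a single UDF
> times_2_all = F.udf(lambda row: [v * 2 for v in row], ArrayType(DoubleType()))
> fast = df.select(times_2_all(F.struct(*df.columns)).alias("all_times_2"))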
>
> 3. I would like CombinePython to be able to handle UDFs that chain off of
> each other.
>
> df.select(python_times_2(python_times_2(“col1”)))
>
> I haven’t prototyped this yet, since it’s a lot more complex. The way I’m
> thinking about this is to still have a rule called CombinePython, except
> that the BatchPythonEvaluation will need to be smart enough to build up the
> dag of dependencies, and then feed that information to the python
> interpreter, so it can compute things in the right order, and reuse the
> in-memory objects that it has already computed. Does this seem right? Should
> the code mainly be in BatchPythonEvaluation? In addition, we will need to
> change up the protocol between the java and python sides to support sending
> this information. What is acceptable?
>
> Any help would be much appreciated! Especially w.r.t. the design choices, so
> that the PR has a chance of being accepted.
>
> Justin




Operations on DataFrames with User Defined Types in pyspark

2016-02-11 Thread Franklyn D'souza
I'm using the UDT API to work with a custom Money datatype in DataFrames.
Here's how I have it set up:

from pyspark.sql.types import StringType, UserDefinedType


class StringUDT(UserDefinedType):

    @classmethod
    def sqlType(cls):
        return StringType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod
    def scalaUDT(cls):
        return ''

    def serialize(self, obj):
        return str(obj)

    def deserialize(self, datum):
        return Money(datum)


class MoneyUDT(StringUDT):
    pass


Money.__UDT__ = MoneyUDT()

I then create a DataFrame like so:

df = sqlContext.createDataFrame([[Money("25.0")], [Money("100.0")]], spark_schema)
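
(Money and spark_schema aren't shown above; here is a minimal sketch of what
they are assumed to look like, with the column name made up for illustration:)

from pyspark.sql.types import StructField, StructType


class Money(object):
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return self.value  # serialize() stores str(obj)


spark_schema = StructType([StructField("amount", MoneyUDT())])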

However, I've run into a few snags with this: DataFrames created using this
UDT cannot be ordered by the UDT column, and I can't union two DataFrames that
have this UDT on one of their columns.

Is this expected behaviour, or is my UDT setup wrong?


Spark Summit San Francisco 2016 call for presentations (CFP)

2016-02-11 Thread Reynold Xin
FYI,

Call for presentations is now open for Spark Summit. The event will take
place on June 6-8 in San Francisco. Submissions are welcome across a
variety of Spark-related topics, including applications, development, data
science, business value, spark ecosystem and research. Please submit by
February 29th to be considered.

Link to submission: https://spark-summit.org/2016/


Building Spark with a Custom Version of Hadoop: HDFS ClassNotFoundException

2016-02-11 Thread Charlie Wright
I am having issues trying to run a test job on a built version of Spark with a
custom Hadoop JAR. My custom Hadoop version runs without issues and I can run
jobs from a precompiled version of Spark (with Hadoop) no problem.

However, whenever I try to run the same Spark example on the Spark version with
my custom Hadoop JAR, I get this error:

"Exception in thread "main" java.lang.RuntimeException:
java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.Hdfs not found"

Does anybody know why this is happening?

Thanks,
Charles.

Re: Building Spark with a Custom Version of Hadoop: HDFS ClassNotFoundException

2016-02-11 Thread Ted Yu
The Hdfs class is in hadoop-hdfs-XX.jar.

Can you check the classpath to see if the above jar is there?

Please describe the command lines you used for building hadoop / Spark.

Cheers

On Thu, Feb 11, 2016 at 5:15 PM, Charlie Wright 
wrote:

> I am having issues trying to run a test job on a built version of Spark
> with a custom Hadoop JAR.
> My custom hadoop version runs without issues and I can run jobs from a
> precompiled version of Spark (with Hadoop) no problem.
>
> However, whenever I try to run the same Spark example on the Spark version
> with my custom hadoop JAR - I get this error:
> "Exception in thread "main" java.lang.RuntimeException:
> java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.Hdfs not found"
>
> Does anybody know why this is happening?
>
> Thanks,
> Charles.
>
>