SPARK_WORKER_MEMORY in Spark Standalone - conf.getenv vs System.getenv?
Hi, Is there a reason to use conf to read SPARK_WORKER_MEMORY not System.getenv as for the other env vars? https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala#L45 Pozdrawiam, Jacek Jacek Laskowski | https://medium.com/@jaceklaskowski/ Mastering Apache Spark ==> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/ Follow me at https://twitter.com/jaceklaskowski - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Long running Spark job on YARN throws "No AMRMToken"
Steve, When ResourceManager is submitted with an application, AMLauncher creates the token YARN_AM_RM_TOKEN (token used between RM and AM). When ApplicationMaster is launched, it tries to contact RM for registering request, allocate request to receive containers, finish request. In all the requests, ResourceManager does the authorizeRequest, where it checks if the Current User has the token YARN_AM_RM_TOKEN, if not throws the *"No AMRMToken". * ResourceManager for every yarn.resourcemanager.am-rm-tokens.master-key-rolling-interval-sec rolls the master key, before rolling it, it has a period of 1.5 * yarn.am.liveness-monitor.expiry-interval-ms during which if AM contacts RM with allocate request, RM checks if the AM has the YARN_AM_RM_TOKEN prepared using the previous master key, if so, it updates the AM user with YARN_AM_RM_TOKEN prepared using new master key. If AM contacts with an YARN_AM_RM_TOKEN which is neither constructed using current master key nor previous master key, then *"Invalid AMRMToken"* message is thrown. This error is the one will happen if AM has not been updated with new RM master key. [YARN-3103 and YARN-2212 ] Need your help to find scenario where "No AMRMToken" will happen, an user added with a token but later that token is missing. Is token removed since expired? Thanks, Prabhu Joseph On Wed, Feb 10, 2016 at 12:59 AM, Hari Shreedharan < hshreedha...@cloudera.com> wrote: > The credentials file approach (using keytab for spark apps) will only > update HDFS tokens. YARN's AMRM tokens should be taken care of by YARN > internally. > > Steve - correct me if I am wrong here: If the AMRM tokens are disappearing > it might be a YARN bug (does the AMRM token have a 7 day limit as well? I > thought that was only for HDFS). > > > Thanks, > Hari > > On Tue, Feb 9, 2016 at 8:44 AM, Steve Loughran > wrote: > >> >> On 9 Feb 2016, at 11:26, Steve Loughran wrote: >> >> >> On 9 Feb 2016, at 05:55, Prabhu Joseph >> wrote: >> >> + Spark-Dev >> >> On Tue, Feb 9, 2016 at 10:04 AM, Prabhu Joseph < >> prabhujose.ga...@gmail.com> wrote: >> >>> Hi All, >>> >>> A long running Spark job on YARN throws below exception after >>> running for few days. >>> >>> yarn.ApplicationMaster: Reporter thread fails 1 time(s) in a row. >>> org.apache.hadoop.yarn.exceptions.YarnException: *No AMRMToken found* for >>> user prabhu at org.apache.hadoop.yarn.ipc.RPC >>> Util.getRemoteException(RPCUtil.java:45) >>> >>> Do any of the below renew the AMRMToken and solve the issue >>> >>> 1. yarn-resourcemanager.delegation.token.max-lifetime increase from 7 >>> days >>> >>> 2. Configuring Proxy user: >>> >>> hadoop.proxyuser.yarn.hosts * >>> >>> hadoop.proxyuser.yarn.groups * >>> >>> >> >> wouldnt do that: security issues >> >> >>> 3. Can Spark-1.4.0 handle with fix >>> https://issues.apache.org/jira/browse/SPARK-5342 >>> >>> spark.yarn.credentials.file >>> >>> >>> >> I'll say "maybe" there >> >> >> uprated to a no, having looked at the code more >> >> >> How to renew the AMRMToken for a long running job on YARN? >>> >>> >>> >> >> AMRM token renewal should be automatic in AM; Yarn sends a message to the >> AM (actually an allocate() response with no containers but a new token at >> the tail of the message. >> >> i don't see any logging in the Hadoopp code there (AMRMClientImpl); filed >> YARN-4682 to add a log statement >> >> if someone other than me were to supply a patch to that JIRA to add a log >> statement *by the end of the day* I'll review it and get it in to Hadoop 2.8 >> >> >> like I said: I'll get this in to hadoop-2.8 if someone is timely with the >> diff >> >> >
Re: [build system] brief downtime, 8am PST thursday feb 10th
reminder: this is happening in ~30 minutes On Wed, Feb 10, 2016 at 10:58 AM, shane knapp wrote: > reminder: this is happening tomorrow morning. > > On Mon, Feb 8, 2016 at 9:27 AM, shane knapp wrote: >> happy monday! >> >> i will be bringing down jenkins and the workers thursday morning to >> upgrade docker on all of the workers from 1.5.0-1 to 1.7.1-2. >> >> as of december last year, docker 1.5 and older lost the ability to >> pull from the docker hub. since we're running centos 6.X on our >> workers, and can't run the 3.X kernel, that limits our options to >> docker 1.7. >> >> this will allow us to close out https://github.com/apache/spark/pull/9893 >> >> i'll be sure to send updates as they happen. >> >> shane - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: [build system] brief downtime, 8am PST thursday feb 10th
this is now done. On Thu, Feb 11, 2016 at 7:35 AM, shane knapp wrote: > reminder: this is happening in ~30 minutes > > > On Wed, Feb 10, 2016 at 10:58 AM, shane knapp wrote: >> reminder: this is happening tomorrow morning. >> >> On Mon, Feb 8, 2016 at 9:27 AM, shane knapp wrote: >>> happy monday! >>> >>> i will be bringing down jenkins and the workers thursday morning to >>> upgrade docker on all of the workers from 1.5.0-1 to 1.7.1-2. >>> >>> as of december last year, docker 1.5 and older lost the ability to >>> pull from the docker hub. since we're running centos 6.X on our >>> workers, and can't run the 3.X kernel, that limits our options to >>> docker 1.7. >>> >>> this will allow us to close out https://github.com/apache/spark/pull/9893 >>> >>> i'll be sure to send updates as they happen. >>> >>> shane - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
[build system] additional jenkins downtime next thursday
there's a big security patch coming out next week, and i'd like to upgrade our jenkins installation so that we're covered. it'll be around 8am, again, and i'll send out more details about the upgrade when i get them. thanks! shane - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Making BatchPythonEvaluation actually Batch
Had a quick look in your commit, I think that make sense, could you send a PR for that, then we can review it. In order to support 2), we need to change the serialized Python function from `f(iter)` to `f(x)`, process one row at a time (not a partition), then we can easily combine them together: for f1(f2(x)) and g1(g2(x)), we can do this in Python: for row in reading_stream: x1, x2 = row y1 = f1(f2(x1)) y2 = g1(g2(x2)) yield (y1, y2) For RDD, we still need to use `f(iter)`, but for SQL UDF, use `f(x)`. On Sun, Jan 31, 2016 at 1:37 PM, Justin Uang wrote: > Hey guys, > > BLUF: sorry for the length of this email, trying to figure out how to batch > Python UDF executions, and since this is my first time messing with > catalyst, would like any feedback > > My team is starting to use PySpark UDFs quite heavily, and performance is a > huge blocker. The extra roundtrip serialization from Java to Python is not a > huge concern if we only incur it ~once per column for most workflows, since > it'll be in the same order of magnitude as reading files from disk. However, > right now each Python UDFs lead to a single roundtrip. There is definitely a > lot we can do regarding this: > > (all the prototyping code is here: > https://github.com/justinuang/spark/commit/8176749f8a6e6dc5a49fbbb952735ff40fb309fc) > > 1. We can't chain Python UDFs. > > df.select(python_times_2(python_times_2("col1"))) > > throws an exception saying that the inner expression isn't evaluable. The > workaround is to do > > > df.select(python_times_2("col1").alias("tmp")).select(python_time_2("tmp")) > > This can be solved in ExtractPythonUDFs by always extracting the inner most > Python UDF first. > > // Pick the UDF we are going to evaluate (TODO: Support evaluating > multiple UDFs at a time) > // If there is more than one, we will add another evaluation > operator in a subsequent pass. > -udfs.find(_.resolved) match { > +udfs.find { udf => > + udf.resolved && udf.children.map { child: Expression => > +child.find { // really hacky way to find if a child of a udf > has the PythonUDF node > + case p: PythonUDF => true > + case _ => false > +}.isEmpty > + }.reduce((x, y) => x && y) > +} match { >case Some(udf) => > var evaluation: EvaluatePython = null > > 2. If we have a Python UDF applied to many different columns, where they > don’t depend on each other, we can optimize them by collapsing them down > into a single python worker. Although we have to serialize and send the same > amount of data to the python interpreter, in the case where I am applying > the same function to 20 columns, the overhead/context_switches of having 20 > interpreters run at the same time causes huge performance hits. I have > confirmed this by manually taking the 20 columns, converting them to a > struct, and then writing a UDF that processes the struct at the same time, > and the speed difference is 2x. My approach to adding this to catalyst is > basically to write an optimizer rule called CombinePython which joins > adjacent EvaluatePython nodes that don’t depend on each other’s variables, > and then having BatchPythonEvaluation run multiple lambdas at once. I would > also like to be able to handle the case > df.select(python_times_2(“col1”).alias(“col1x2”)).select(F.col(“col1x2”), > python_times_2(“col1x2”).alias(“col1x4”)). To get around that, I add a > PushDownPythonEvaluation optimizer that will push the optimization through a > select/project, so that the CombinePython rule can join the two. > > 3. I would like CombinePython to be able to handle UDFs that chain off of > each other. > > df.select(python_times_2(python_times_2(“col1”))) > > I haven’t prototyped this yet, since it’s a lot more complex. The way I’m > thinking about this is to still have a rule called CombinePython, except > that the BatchPythonEvaluation will need to be smart enough to build up the > dag of dependencies, and then feed that information to the python > interpreter, so it can compute things in the right order, and reuse the > in-memory objects that it has already computed. Does this seem right? Should > the code mainly be in BatchPythonEvaluation? In addition, we will need to > change up the protocol between the java and python sides to support sending > this information. What is acceptable? > > Any help would be much appreciated! Especially w.r.t where to the design > choices such that the PR that has a chance of being accepted. > > Justin - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Operations on DataFrames with User Defined Types in pyspark
I'm using the UDT api to work with a custom Money datatype in dataframes. heres how i have it setup class StringUDT(UserDefinedType): @classmethod def sqlType(self): return StringType() @classmethod def module(cls): return cls.__module__ @classmethod def scalaUDT(cls): return '' def serialize(self, obj): return str(obj) def deserialize(self, datum): return Money(datum) class MoneyUDT(StringUDT): pass Money.__UDT__ = MoneyUDT() I then create a DataFrame like so df = sc.sql.createDataFrame([[Money("25.0")], [Money("100.0")]], spark_schema) However i've run into a few snags with this. DFs created using this UDT can not be orderedBy the UDT column and i can't Union two DFs that have this UDT on one of their columns. Is this expected behaviour ? or is my UDT setup wrong ?.
Spark Summit San Francisco 2016 call for presentations (CFP)
FYI, Call for presentations is now open for Spark Summit. The event will take place on June 6-8 in San Francisco. Submissions are welcome across a variety of Spark-related topics, including applications, development, data science, business value, spark ecosystem and research. Please submit by February 29th to be considered. Link to submission: https://spark-summit.org/2016/
Building Spark with a Custom Version of Hadoop: HDFS ClassNotFoundException
I am having issues trying to run a test job on a built version of Spark with a custom Hadoop JAR. My custom hadoop version runs without issues and I can run jobs from a precompiled version of Spark (with Hadoop) no problem. However, whenever I try to run the same Spark example on the Spark version with my custom hadoop JAR - I get this error:"Exception in thread "main" java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.Hdfs not found" Does anybody know why this is happening? Thanks,Charles.
Re: Building Spark with a Custom Version of Hadoop: HDFS ClassNotFoundException
Hdfs class is in hadoop-hdfs-XX.jar Can you check the classpath to see if the above jar is there ? Please describe the command lines you used for building hadoop / Spark. Cheers On Thu, Feb 11, 2016 at 5:15 PM, Charlie Wright wrote: > I am having issues trying to run a test job on a built version of Spark > with a custom Hadoop JAR. > My custom hadoop version runs without issues and I can run jobs from a > precompiled version of Spark (with Hadoop) no problem. > > However, whenever I try to run the same Spark example on the Spark version > with my custom hadoop JAR - I get this error: > "Exception in thread "main" java.lang.RuntimeException: > java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.Hdfs not found" > > Does anybody know why this is happening? > > Thanks, > Charles. > >