RE: How to build single jar for single project in spark
You can try using the -pl Maven option for this:

mvn clean install -pl :spark-core_2.11

From: Qiu, Gerry
To: zhangliyun; dev@spark.apache.org
Date: 2019-03-26 14:34:20
Subject: RE: How to build single jar for single project in spark

You can try this: https://spark.apache.org/docs/latest/building-spark.html#building-submodules-individually

Thanks,
Gerry

From: zhangliyun
Sent: March 26, 2019 16:50
To: dev@spark.apache.org
Subject: How to build single jar for single project in spark

Hi all:

I have a question: when I modify one file in the Spark project, like org/apache/spark/sql/execution/ui/SparkPlanGraph.scala, can I build only the single jar spark-core_2.11-2.3.2.jar? After building the single jar, I would copy it to the $SPARK_HOME/jars directory.

If anyone knows the detailed way to make this work, please tell me, because rebuilding the whole package takes a long time (20+ minutes on my Windows laptop) even for a small modification.

Best Regards
Zhang,Liyun/Kelly Zhang
RE: [VOTE] Release Apache Spark 2.4.5 (RC2)
Is the hadoop-3.1 profile supported for this release? I see a lot of UTs failing under this profile. https://github.com/apache/spark/blob/v2.4.5-rc2/pom.xml

Example:

[INFO] Running org.apache.spark.sql.hive.JavaMetastoreDataSourcesSuite
[ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time elapsed: 1.717 s <<< FAILURE! - in org.apache.spark.sql.hive.JavaMetastoreDataSourcesSuite
[ERROR] saveExternalTableAndQueryIt(org.apache.spark.sql.hive.JavaMetastoreDataSourcesSuite) Time elapsed: 1.675 s <<< ERROR!
java.lang.ExceptionInInitializerError
	at org.apache.spark.sql.hive.JavaMetastoreDataSourcesSuite.setUp(JavaMetastoreDataSourcesSuite.java:66)
Caused by: java.lang.IllegalArgumentException: Unrecognized Hadoop major version number: 3.1.0
	at org.apache.spark.sql.hive.JavaMetastoreDataSourcesSuite.setUp(JavaMetastoreDataSourcesSuite.java:66)
RE: Java 11
Java 11 support will be released as part of the Spark 3.0 preview: https://spark.apache.org/docs/3.0.0-preview2/#downloading

Refer: https://issues.apache.org/jira/browse/SPARK-24417
Spark SQL : How to make Spark support parallelism per sql
Hi all

The parallelism of queries executed in a given SparkContext can be controlled via spark.default.parallelism. I have a scenario where I need to run multiple concurrent queries in a single context, while ensuring that the concurrent queries can use the resources without resource starvation. I need to control parallelism such that each query has its own upper cap on resource usage. I do not see a solution, as the system currently supports only context-level resource assignment, which means the allocated resources are shared between all the queries executing in the same context.

For example: my SparkContext has started over a cluster of 100 cores, with two concurrent queries, Query1 and Query2. Query1 should be restricted to 5 cores and Query2 to 10 cores.

As I am very new to Spark development, a simple solution I see is to maintain an (executionId -> coresUsed) map in TaskSchedulerImpl and enforce it in resourceOfferSingleTaskSet, but I am not sure that is a good idea (it may have adverse effects on parallelize, makeRDD, etc.).

Any suggestions? Or please correct me if there is a problem with my use case.

Regards
Ajith
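To make the proposal concrete, the (executionId -> coresUsed) bookkeeping could be sketched as below. This is a self-contained Java illustration under my own naming, not Spark code: PerQueryCoreCap, registerQuery, and tryAcquireCore are hypothetical names, and a real change would have to hook into resourceOfferSingleTaskSet as described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Sketch of per-query core caps: before the scheduler launches one more
// task for a query, it must take a permit from that query's cap. Queries
// without a registered cap are unrestricted (current Spark behavior).
public class PerQueryCoreCap {
    private final Map<String, Semaphore> capByQuery = new ConcurrentHashMap<>();

    public void registerQuery(String executionId, int maxCores) {
        capByQuery.put(executionId, new Semaphore(maxCores));
    }

    // Called where resourceOfferSingleTaskSet would decide whether to
    // launch one more task; returns false when the query's cap is reached.
    public boolean tryAcquireCore(String executionId) {
        Semaphore cap = capByQuery.get(executionId);
        return cap == null || cap.tryAcquire();
    }

    // Called when one of the query's tasks finishes and frees a core.
    public void releaseCore(String executionId) {
        Semaphore cap = capByQuery.get(executionId);
        if (cap != null) cap.release();
    }
}
```

This only caps concurrency per query; it does not address the fairness or starvation concerns between queries, which the scheduler would still have to handle.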
Spark SQL : Exception on concurrent insert due to lease over _SUCCESS
Hi all

I am using Spark 2.1, and I encounter an exception when doing concurrent inserts on a table. Here are my scenario and some analysis.

create table sample using csv options('path' '/tmp/f/')

When concurrent inserts are executed, we see an exception like below:

2017-12-29 13:41:11,117 | ERROR | main | Aborting job null. | org.apache.spark.internal.Logging$class.logError(Logging.scala:91)
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): No lease on /tmp/f/_SUCCESS (inode 1032508): File does not exist. Holder DFSClient_NONMAPREDUCE_8638078_1 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3466)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3562)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3525)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:917)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:573)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:973)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2260)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2256)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1778)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2254)
	at org.apache.hadoop.ipc.Client.call(Client.java:1524)
	at org.apache.hadoop.ipc.Client.call(Client.java:1460)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
	at com.sun.proxy.$Proxy14.complete(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:480)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:202)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
	at com.sun.proxy.$Proxy15.complete(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:887)
	at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:861)
	at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:822)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:336)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2$$anonfun$apply$mcV$sp$1.apply$mcV$sp(FileFormatWriter.scala:167)
	at org.apache.spark.util.Utils$.proxyOperate(Utils.scala:2706)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply$mcV$sp(FileFormatWriter.scala:166)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:144)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:75)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
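The stack shows two concurrent commitJob calls racing on the same /tmp/f/_SUCCESS marker file. Until this is addressed inside Spark, one application-level workaround is to serialize inserts into the same table path. A minimal Java sketch of that idea, with TablePathLocks as a hypothetical helper name:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// One lock per output path: concurrent inserts to different tables still
// run in parallel, but two inserts into the same path are serialized, so
// their job commits (and _SUCCESS writes) can no longer race.
public class TablePathLocks {
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    public void withLock(String tablePath, Runnable insert) {
        ReentrantLock lock = locks.computeIfAbsent(tablePath, p -> new ReentrantLock());
        lock.lock();
        try {
            insert.run();  // e.g. run the INSERT statement for this path
        } finally {
            lock.unlock();
        }
    }
}
```

This trades insert concurrency on a single table for correctness; it does not help when the inserts come from separate driver processes.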
[Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event
The DAGScheduler becomes a bottleneck in a cluster when multiple JobSubmitted events have to be processed, because DAGSchedulerEventProcessLoop is single-threaded and will block other events in the queue, like TaskCompletion. The JobSubmitted event is time-consuming depending on the nature of the job (for example: calculating parent stage dependencies, shuffle dependencies, partitions), and thus it blocks all the other events from being processed.

I see multiple JIRAs referring to this behavior:
https://issues.apache.org/jira/browse/SPARK-2647
https://issues.apache.org/jira/browse/SPARK-4961

Similarly, in my cluster some jobs' partition calculation is time-consuming (similar to the stack in SPARK-2647), hence it slows down the DAGSchedulerEventProcessLoop, which causes user jobs to slow down even if their tasks finish within seconds, as TaskCompletion events are processed at a slower rate due to the blockage.

I think we can split a JobSubmitted event into 2 events:

Step 1. JobSubmittedPreparation - runs in a separate thread on job submission; this involves org.apache.spark.scheduler.DAGScheduler#createResultStage
Step 2. JobSubmittedExecution - if Step 1 succeeds, fire an event to DAGSchedulerEventProcessLoop and let it process the output of org.apache.spark.scheduler.DAGScheduler#createResultStage

One effect of doing this is that job submissions may no longer be FIFO, depending on how much time Step 1 above consumes.

Does the above solution suffice for the problem described? And is there any other side effect of this solution?

Regards
Ajith
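The two-step split can be illustrated with a self-contained Java sketch: a single-threaded event loop whose expensive "preparation" work is moved to a separate pool, so that short events are not stuck behind it. All class and method names here are illustrative stand-ins for DAGSchedulerEventProcessLoop and createResultStage, not actual Spark APIs:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Single-threaded event loop (like DAGSchedulerEventProcessLoop), but the
// expensive Step 1 (JobSubmittedPreparation) runs on a separate pool and
// only the cheap Step 2 (JobSubmittedExecution) is enqueued on the loop.
public class SplitEventLoop {
    private final BlockingQueue<Runnable> eventQueue = new LinkedBlockingQueue<>();
    private final ExecutorService loop = Executors.newSingleThreadExecutor();
    private final ExecutorService prepPool = Executors.newFixedThreadPool(4);

    public void start() {
        loop.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try { eventQueue.take().run(); }
                catch (InterruptedException e) { return; }
            }
        });
    }

    // Step 1 off the loop (e.g. createResultStage), Step 2 enqueued after.
    public <T> void submitJob(Callable<T> prepare, Consumer<T> execute) {
        prepPool.submit(() -> {
            T prepared = prepare.call();
            eventQueue.put(() -> execute.accept(prepared));
            return null;
        });
    }

    // Other events (e.g. TaskCompletion) go straight onto the loop.
    public void post(Runnable event) throws InterruptedException {
        eventQueue.put(event);
    }

    public void shutdown() {
        prepPool.shutdown();
        loop.shutdownNow();
    }
}
```

With this structure, a TaskCompletion posted while a slow job is still in preparation is processed immediately, which also makes the non-FIFO submission order mentioned above visible.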
RE: [Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event
Thank you all for the responses and feedback. I just checked the code, and it looks like, as Reynold already mentioned, if we change the data structures below

private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
val jobIds = new HashSet[Int]
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

to avoid concurrency issues, it should be good enough. I would like to work on this, so I will open a JIRA and update it with a PR very soon. We can continue the discussion on the JIRA.

Regards
Ajith

From: Reynold Xin [r...@databricks.com]
Sent: Wednesday, March 07, 2018 2:47 AM
To: Shivaram Venkataraman
Cc: Ryan Blue; Ajith shetty; dev@spark.apache.org
Subject: Re: [Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event

It's mostly just hash maps from some ids to some state, and those can be replaced just with concurrent hash maps? (I haven't actually looked at code and am just guessing based on recollection.)

On Tue, Mar 6, 2018 at 10:42 AM, Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:

The problem with doing work in the callsite thread is that there are a number of data structures that are updated during job submission, and these data structures are guarded by the event loop, ensuring only one thread accesses them. I don't think there is a very easy fix for this given the structure of the DAGScheduler.

Thanks
Shivaram

On Tue, Mar 6, 2018 at 8:53 AM, Ryan Blue wrote:
> I agree with Reynold. We don't need to use a separate pool, which would have
> the problem you raised about FIFO. We just need to do the planning outside
> of the scheduler loop. The call site thread sounds like a reasonable place
> to me.
>
> On Mon, Mar 5, 2018 at 12:56 PM, Reynold Xin <r...@databricks.com> wrote:
>>
>> Rather than using a separate thread pool, perhaps we can just move the
>> prep code to the call site thread?
>>
>> On Sun, Mar 4, 2018 at 11:15 PM, Ajith shetty <ajith.she...@huawei.com> wrote:
>>>
>>> DAGScheduler becomes a bottleneck in cluster when multiple JobSubmitted
>>> events has to be processed as DAGSchedulerEventProcessLoop is single
>>> threaded and it will block other tasks in queue like TaskCompletion.
>>>
>>> The JobSubmitted event is time consuming depending on the nature of the
>>> job (Example: calculating parent stage dependencies, shuffle dependencies,
>>> partitions) and thus it blocks all the events to be processed.
>>>
>>> I see multiple JIRA referring to this behavior
>>> https://issues.apache.org/jira/browse/SPARK-2647
>>> https://issues.apache.org/jira/browse/SPARK-4961
>>>
>>> Similarly in my cluster some jobs partition calculation is time consuming
>>> (Similar to stack at SPARK-2647) hence it slows down the spark
>>> DAGSchedulerEventProcessLoop which results in user jobs to slowdown, even if
>>> its tasks are finished within seconds, as TaskCompletion Events are
>>> processed at a slower rate due to blockage.
>>>
>>> I think we can split a JobSubmitted Event into 2 events
>>> Step 1. JobSubmittedPreperation - Runs in separate thread on
>>> JobSubmission, this will involve steps
>>> org.apache.spark.scheduler.DAGScheduler#createResultStage
>>> Step 2. JobSubmittedExecution - If Step 1 is success, fire an event to
>>> DAGSchedulerEventProcessLoop and let it process output of
>>> org.apache.spark.scheduler.DAGScheduler#createResultStage
>>>
>>> I can see the effect of doing this may be that Job Submissions may not be
>>> FIFO depending on how much time Step 1 mentioned above is going to consume.
>>>
>>> Does above solution suffice for the problem described? And is there any
>>> other side effect of this solution?
>>>
>>> Regards
>>> Ajith
>
> --
> Ryan Blue
> Software Engineer
> Netflix
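Reynold's suggestion of swapping the guarded maps for concurrent variants could look like the following self-contained Java sketch. The field names mirror the DAGScheduler fields quoted in the reply above, but the types and methods are simplified stand-ins, not the real Spark code:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// If these scheduler-internal maps are the only shared state touched
// during job preparation, making them concurrent lets the preparation
// step run outside the single-threaded event loop safely.
public class SchedulerState {
    final Map<Integer, String> stageIdToStage = new ConcurrentHashMap<>();
    final Set<Integer> jobIds = ConcurrentHashMap.newKeySet();
    final Map<Integer, Set<Integer>> jobIdToStageIds = new ConcurrentHashMap<>();

    // putIfAbsent / computeIfAbsent are atomic on ConcurrentHashMap, so
    // two threads preparing jobs concurrently cannot corrupt the maps.
    void registerStage(int jobId, int stageId, String stage) {
        stageIdToStage.putIfAbsent(stageId, stage);
        jobIds.add(jobId);
        jobIdToStageIds
            .computeIfAbsent(jobId, id -> ConcurrentHashMap.newKeySet())
            .add(stageId);
    }
}
```

The caveat Shivaram raises still applies: per-entry atomicity is not the same as the cross-map invariants the event loop currently guarantees, so multi-map updates may still need coordination.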
[Spark][Security] UGI credentials lost between driver and executor in yarn mode
Hi all

I see UGI credentials (e.g. sparkCookie) shared from driver to executor being lost on the driver side in yarn mode. Below is the analysis on start of the thriftserver.

Step 1. SparkSubmit creates the submit env, which does a loginUserFromKeytab:

"main@1" prio=5 tid=0x1 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	at org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab(UserGroupInformation.java:1041)
	- locked <0x582> (a java.lang.Class)
	at org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:336)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:156)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:122)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala:-1)

Step 2. HiveThriftServer does SparkSQLEnv.init, which creates the SparkContext and hence calls createDriverEnv, which generates a secret key and adds it as credentials to the UGI from Step 1:

"main@1" prio=5 tid=0x1 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	at org.apache.spark.SecurityManager.generateSecretKey(SecurityManager.scala:429)
	at org.apache.spark.SecurityManager.<init>(SecurityManager.scala:228)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:237)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:258)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:433)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2521)
	- locked <0x10a5> (a java.lang.Object)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:923)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:915)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:915)
	- locked <0x1091> (a org.apache.spark.sql.SparkSession$Builder)
	- locked <0x10a6> (a org.apache.spark.sql.SparkSession$)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:48)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:86)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala:-1)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:798)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:122)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala:-1)

Step 3. Next, the application is submitted, which creates the container launch context using the UGI passed from Step 2:

"main@1" prio=5 tid=0x1 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	at org.apache.spark.SecurityManager.generateSecretKey(SecurityManager.scala:429)
	at org.apache.spark.SecurityManager.<init>(SecurityManager.scala:228)
	at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:999)
	at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:194)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:173)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2521)
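Note that generateSecretKey appears in both Step 2 and Step 3: if the second SecurityManager construction mints a fresh secret instead of reusing the one already stored in the UGI credentials, the earlier value is lost. A minimal Java sketch of that difference, with SecretStore and its methods as hypothetical names (a plain map stands in for the UGI credential store):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrates why secret generation should be "get-or-create" against the
// shared credential store: regenerating on every construction clobbers
// the secret that was already shipped with the container launch context.
public class SecretStore {
    private final Map<String, String> credentials = new HashMap<>();

    // Buggy variant: always regenerates, losing the earlier secret.
    public String regenerate(String key) {
        String secret = UUID.randomUUID().toString();
        credentials.put(key, secret);
        return secret;
    }

    // Fixed variant: reuse the secret already present in the credentials.
    public String getOrCreate(String key) {
        return credentials.computeIfAbsent(key, k -> UUID.randomUUID().toString());
    }
}
```

Whether the actual SecurityManager takes the regenerate path here is exactly what the stack traces above suggest needs checking.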
Permanent UDF support across session
I have a question related to permanent UDFs for Spark with Hive support enabled.

When we do CREATE FUNCTION, this is registered with Hive via:

spark-sql> create function customfun as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFLastDay' using jar 'hdfs:///tmp/hive-exec.jar';

call stack:
org.apache.spark.sql.hive.client.HiveClientImpl#createFunction
org.apache.spark.sql.hive.HiveExternalCatalog#createFunction
org.apache.spark.sql.catalyst.catalog.SessionCatalog#createFunction
org.apache.spark.sql.execution.command.CreateFunctionCommand#run

But when we call a registered UDF, we make an ADD JAR call to Hive:

spark-sql> select customfun('2015-08-22');

call stack:
org.apache.spark.sql.hive.client.HiveClientImpl#addJar
org.apache.spark.sql.hive.HiveSessionResourceLoader#addJar
org.apache.spark.sql.internal.SessionResourceLoader#loadResource
org.apache.spark.sql.catalyst.catalog.SessionCatalog#loadFunctionResources
org.apache.spark.sql.catalyst.catalog.SessionCatalog#lookupFunction

So is the ADD JAR call to Hive necessary when we invoke an already registered UDF? As I see it, if we follow the current code:

1. Hive can look up already registered UDFs without an explicit ADD JAR call from Spark. Refer to https://cwiki.apache.org/confluence/display/Hive/HivePlugins, fixed via https://issues.apache.org/jira/browse/HIVE-6380 ("When the function is referenced for the first time by a Hive session, these resources will be added to the environment.")
2. We cannot use the UDF across sessions, as a new session again needs to do ADD JAR internally on the UDF call, which will fail because the caller needs to have an admin role set (Hive requires ADD JAR to be run only via the admin role).

Please correct me if I am wrong: can we avoid ADD JAR when we invoke a registered UDF? Are there any side effects if I modify this flow?
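If the ADD JAR does turn out to be redundant for already-registered UDFs, one direction is to remember which jars have already been shipped to Hive and skip the call (and its admin-role requirement) for repeats. A minimal Java sketch under that assumption; FunctionResourceCache is a hypothetical name, and the real change would live around SessionResourceLoader#loadResource:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Tracks jar URIs already sent to Hive via ADD JAR, so that invoking a
// registered UDF a second time does not repeat the privileged call.
public class FunctionResourceCache {
    private final Set<String> loadedJars = ConcurrentHashMap.newKeySet();

    // Returns true only the first time a jar URI is seen, i.e. only then
    // does the caller still need to issue ADD JAR to Hive.
    public boolean markLoaded(String jarUri) {
        return loadedJars.add(jarUri);
    }
}
```

This does not answer whether skipping ADD JAR is safe for Hive's classloading (point 1 above suggests Hive loads the resources itself on first reference); it only shows where the dedup check would sit.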