RE: How to build single jar for single project in spark

2019-03-26 Thread Ajith shetty
You can try using the -pl maven option for this:
> mvn clean install -pl :spark-core_2.11
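For faster iteration you can also skip the tests when rebuilding just that module; a command sketch (module name assumed from your 2.3.2 / Scala 2.11 build):
> mvn -pl :spark-core_2.11 -DskipTests clean install
Copying the resulting spark-core_2.11-2.3.2.jar into $SPARK_HOME/jars, as you describe, should then pick up the change.
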
From: Qiu, Gerry 
To: zhangliyun; dev@spark.apache.org 
Date: 2019-03-26 14:34:20
Subject:RE: How to build single jar for single project in spark

You can try this  
https://spark.apache.org/docs/latest/building-spark.html#building-submodules-individually

Thanks,
Gerry

From: zhangliyun 
Sent: 26 March 2019 16:50
To: dev@spark.apache.org
Subject: How to build single jar for single project in spark

Hi all:
  I have a question about modifying a single file in the Spark project, such as 
org/apache/spark/sql/execution/ui/SparkPlanGraph.scala. Can I build only the single jar 
spark-core_2.11-2.3.2.jar and then copy it into the $SPARK_HOME/jars directory? If anyone 
knows the exact steps to make this work, please tell me, because rebuilding the whole 
package takes a long time (20+ minutes on my Windows laptop) for only a small 
modification.


Best Regards
Zhang,Liyun/Kelly Zhang





RE: [VOTE] Release Apache Spark 2.4.5 (RC2)

2020-02-02 Thread Ajith shetty
Is the hadoop-3.1 profile supported for this release? I see a lot of UTs failing 
under this profile.
https://github.com/apache/spark/blob/v2.4.5-rc2/pom.xml

Example:
 [INFO] Running org.apache.spark.sql.hive.JavaMetastoreDataSourcesSuite
[ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time elapsed: 1.717 s 
<<< FAILURE! - in org.apache.spark.sql.hive.JavaMetastoreDataSourcesSuite
[ERROR] 
saveExternalTableAndQueryIt(org.apache.spark.sql.hive.JavaMetastoreDataSourcesSuite)
  Time elapsed: 1.675 s  <<< ERROR!
java.lang.ExceptionInInitializerError
at 
org.apache.spark.sql.hive.JavaMetastoreDataSourcesSuite.setUp(JavaMetastoreDataSourcesSuite.java:66)
Caused by: java.lang.IllegalArgumentException: Unrecognized Hadoop major 
version number: 3.1.0
at 
org.apache.spark.sql.hive.JavaMetastoreDataSourcesSuite.setUp(JavaMetastoreDataSourcesSuite.java:66)


RE: Java 11

2020-02-03 Thread Ajith shetty
Support will be released as part of Spark 3.0.

Preview:
https://spark.apache.org/docs/3.0.0-preview2/#downloading

Refer:
https://issues.apache.org/jira/browse/SPARK-24417


Spark SQL : How to make Spark support parallelism per sql

2017-11-23 Thread Ajith shetty
Hi all

The parallelism of queries executed in a given SparkContext can be controlled via 
spark.default.parallelism.

I have a scenario where I need to run multiple concurrent queries in a single 
context, while ensuring that the concurrent queries can utilize the resources without 
resource starvation.

I need to control parallelism such that each query has its own upper cap on 
resource usage. I do not see a solution, as the system currently supports only 
context-level resource assignment, which means the allocated resources are shared 
between the queries executing in the same context.

For example: my SparkContext is started over a cluster of 100 cores, with two 
concurrent queries, Query1 and Query2. Query1 should be restricted to 5 cores and 
Query2 to 10 cores.

As I am very new to Spark development, a simple solution I see is to maintain an 
(executionId -> coresUsed) mapping in TaskSchedulerImpl and enforce it in 
resourceOfferSingleTaskSet, but I am not sure it is a good idea (it may have adverse 
effects on parallelize, makeRDD, etc.).
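To make the idea concrete, here is a rough, hypothetical sketch of the bookkeeping I have in mind (illustrative only, not existing Spark code; the names executionId, coresUsed and the cap values are my own):

import scala.collection.mutable

// Hypothetical sketch of per-query core caps (not existing Spark code).
// The scheduler would call tryAcquire before handing a core to a task of a
// given query (executionId) and call release when the task finishes.
object PerQueryCoreLimiter {
  private val coresUsed = mutable.HashMap[Long, Int]().withDefaultValue(0)
  private val maxCores  = mutable.HashMap[Long, Int]().withDefaultValue(Int.MaxValue)

  def setLimit(executionId: Long, cores: Int): Unit = synchronized {
    maxCores(executionId) = cores
  }

  def tryAcquire(executionId: Long): Boolean = synchronized {
    if (coresUsed(executionId) < maxCores(executionId)) {
      coresUsed(executionId) += 1
      true
    } else {
      false
    }
  }

  def release(executionId: Long): Unit = synchronized {
    coresUsed(executionId) = math.max(0, coresUsed(executionId) - 1)
  }
}

For example, setLimit(queryExecutionId, 5) would cap that query at 5 concurrently used cores.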

Any suggestions? Or please correct me if there is a problem with my use case.

Regards
Ajith


Spark SQL : Exception on concurrent insert due to lease over _SUCCESS

2018-01-07 Thread Ajith shetty
Hi all

I am using Spark 2.1 and I encounter an exception when doing concurrent inserts on a 
table. Here is my scenario and some analysis.

create table sample using csv options('path' '/tmp/f/')
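
For reference, a minimal sketch of the concurrent-insert pattern (a hedged reproduction: it assumes a single application firing two inserts in parallel via Futures, and the column name is assumed; the real workload may use separate sessions):

import org.apache.spark.sql.SparkSession
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ConcurrentInsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("concurrent-insert-sketch").getOrCreate()
    // Table definition as above; the column is added only so the sketch is self-contained.
    spark.sql("create table sample(value string) using csv options('path' '/tmp/f/')")
    // Two inserts racing on the same path; each job commit writes /tmp/f/_SUCCESS,
    // so one committer can lose its lease on a _SUCCESS file the other has recreated.
    val inserts = Seq("a", "b").map { v =>
      Future(spark.sql(s"insert into sample values ('$v')"))
    }
    Await.result(Future.sequence(inserts), 10.minutes)
    spark.stop()
  }
}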

When concurrent inserts are executed, we see an exception like the one below:

2017-12-29 13:41:11,117 | ERROR | main | Aborting job null. | 
org.apache.spark.internal.Logging$class.logError(Logging.scala:91)
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): No lease 
on /tmp/f/_SUCCESS (inode 1032508): File does not exist. Holder 
DFSClient_NONMAPREDUCE_8638078_1 does not have any open files.
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3466)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3562)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3525)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:917)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:573)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:973)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2260)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2256)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1778)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2254)

at org.apache.hadoop.ipc.Client.call(Client.java:1524)
at org.apache.hadoop.ipc.Client.call(Client.java:1460)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy14.complete(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:480)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:202)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy15.complete(Unknown Source)
at 
org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:887)
at 
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:861)
at 
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:822)
at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at 
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:336)
at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2$$anonfun$apply$mcV$sp$1.apply$mcV$sp(FileFormatWriter.scala:167)
at org.apache.spark.util.Utils$.proxyOperate(Utils.scala:2706)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply$mcV$sp(FileFormatWriter.scala:166)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:144)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:75)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  

[Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event

2018-03-04 Thread Ajith shetty
DAGScheduler becomes a bottleneck in the cluster when multiple JobSubmitted events 
have to be processed, as DAGSchedulerEventProcessLoop is single-threaded and will 
block other events in the queue, such as TaskCompletion.
The JobSubmitted event is time-consuming depending on the nature of the job (for 
example, calculating parent stage dependencies, shuffle dependencies, and partitions), 
and thus it blocks all other events from being processed.

I see multiple JIRAs referring to this behavior:
https://issues.apache.org/jira/browse/SPARK-2647
https://issues.apache.org/jira/browse/SPARK-4961

Similarly, in my cluster the partition calculation of some jobs is time-consuming 
(similar to the stack in SPARK-2647), which slows down the 
DAGSchedulerEventProcessLoop and causes user jobs to slow down even if their tasks 
finish within seconds, because TaskCompletion events are processed at a slower rate 
due to the blockage.

I think we can split the JobSubmitted event into 2 events:
Step 1. JobSubmittedPreparation - runs in a separate thread on job submission; this 
will involve the work of 
org.apache.spark.scheduler.DAGScheduler#createResultStage
Step 2. JobSubmittedExecution - if Step 1 succeeds, fire an event to 
DAGSchedulerEventProcessLoop and let it process the output of 
org.apache.spark.scheduler.DAGScheduler#createResultStage
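
To make Steps 1 and 2 concrete, here is a rough, hypothetical sketch of the split (class names and wiring are illustrative, not actual Spark code):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical sketch: heavy stage preparation runs off the event loop, and only a
// cheap "execution" event is posted back to the single-threaded event loop.
// Error handling for a failed preparation is omitted.
case class JobSubmittedExecution(jobId: Int, preparedResultStage: AnyRef)

class SplitJobSubmissionSketch(postToEventLoop: Any => Unit,
                               prepareResultStage: Int => AnyRef) {
  private implicit val prepPool: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

  def submit(jobId: Int): Unit = {
    // Step 1 (JobSubmittedPreparation): createResultStage-style work in a separate thread.
    Future(prepareResultStage(jobId)).foreach { stage =>
      // Step 2 (JobSubmittedExecution): hand the prepared stage back to the event loop.
      postToEventLoop(JobSubmittedExecution(jobId, stage))
    }
  }
}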

One effect of doing this may be that job submissions are no longer FIFO, depending 
on how much time Step 1 above consumes.

Does the above solution suffice for the problem described? And are there any other 
side effects of this solution?

Regards
Ajith


RE: [Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event

2018-03-07 Thread Ajith shetty
Thank you all for the responses and feedback. I just checked the code, and it looks 
like, as Reynold already mentioned, if we change the data structures below

private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
val jobIds = new HashSet[Int]
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

to avoid concurrency issues, it should be good enough.
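
For reference, a minimal sketch of what that swap could look like (hedged: Stage is only a placeholder here, and a mutable inner set would still need its own synchronization):

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

object ConcurrentSchedulerStateSketch {
  type Stage = AnyRef  // placeholder so the sketch compiles standalone

  // Thread-safe replacements for the structures above.
  val stageIdToStage = new TrieMap[Int, Stage]
  val jobIds: mutable.Set[Int] = ConcurrentHashMap.newKeySet[Int]().asScala
  // The outer map becomes safe for concurrent access, but concurrent updates to a
  // shared inner HashSet would still need synchronization (or an immutable Set per entry).
  val jobIdToStageIds = new TrieMap[Int, mutable.HashSet[Int]]
}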

I would like to work on this, so I will open a JIRA and follow up with a PR very 
soon. We can continue the discussion on the JIRA.

Regards
Ajith

From: Reynold Xin [r...@databricks.com]
Sent: Wednesday, March 07, 2018 2:47 AM
To: Shivaram Venkataraman
Cc: Ryan Blue; Ajith shetty; dev@spark.apache.org
Subject: Re: [Spark][Scheduler] Spark DAGScheduler scheduling performance 
hindered on JobSubmitted Event

It's mostly just hash maps from some ids to some state, and those can be 
replaced just with concurrent hash maps?

(I haven't actually looked at code and am just guessing based on recollection.)

On Tue, Mar 6, 2018 at 10:42 AM, Shivaram Venkataraman 
<shiva...@eecs.berkeley.edu> wrote:
The problem with doing work in the callsite thread is that there are a
number of data structures that are updated during job submission and
these data structures are guarded by the event loop ensuring only one
thread accesses them. I don't think there is a very easy fix for this
given the structure of the DAGScheduler.

Thanks
Shivaram

On Tue, Mar 6, 2018 at 8:53 AM, Ryan Blue  wrote:
> I agree with Reynold. We don't need to use a separate pool, which would have
> the problem you raised about FIFO. We just need to do the planning outside
> of the scheduler loop. The call site thread sounds like a reasonable place
> to me.
>
> On Mon, Mar 5, 2018 at 12:56 PM, Reynold Xin 
> <r...@databricks.com> wrote:
>>
>> Rather than using a separate thread pool, perhaps we can just move the
>> prep code to the call site thread?
>>
>>
>> On Sun, Mar 4, 2018 at 11:15 PM, Ajith shetty 
>> <ajith.she...@huawei.com>
>> wrote:
>>>
>>> DAGScheduler becomes a bottleneck in the cluster when multiple JobSubmitted
>>> events have to be processed, as DAGSchedulerEventProcessLoop is single-threaded
>>> and will block other events in the queue, such as TaskCompletion.
>>>
>>> The JobSubmitted event is time-consuming depending on the nature of the
>>> job (for example, calculating parent stage dependencies, shuffle dependencies,
>>> and partitions), and thus it blocks all other events from being processed.
>>>
>>>
>>>
>>> I see multiple JIRAs referring to this behavior:
>>>
>>> https://issues.apache.org/jira/browse/SPARK-2647
>>>
>>> https://issues.apache.org/jira/browse/SPARK-4961
>>>
>>>
>>>
>>> Similarly, in my cluster the partition calculation of some jobs is time-consuming
>>> (similar to the stack in SPARK-2647), which slows down the
>>> DAGSchedulerEventProcessLoop and causes user jobs to slow down even if their
>>> tasks finish within seconds, because TaskCompletion events are processed at a
>>> slower rate due to the blockage.
>>>
>>>
>>>
>>> I think we can split the JobSubmitted event into 2 events:
>>>
>>> Step 1. JobSubmittedPreparation - runs in a separate thread on job
>>> submission; this will involve the work of
>>> org.apache.spark.scheduler.DAGScheduler#createResultStage
>>>
>>> Step 2. JobSubmittedExecution - if Step 1 succeeds, fire an event to
>>> DAGSchedulerEventProcessLoop and let it process the output of
>>> org.apache.spark.scheduler.DAGScheduler#createResultStage
>>>
>>>
>>>
>>> One effect of doing this may be that job submissions are no longer FIFO,
>>> depending on how much time Step 1 above consumes.
>>>
>>>
>>>
>>> Does the above solution suffice for the problem described? And are there any
>>> other side effects of this solution?
>>>
>>>
>>>
>>> Regards
>>>
>>> Ajith
>>
>>
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix



[Spark][Security] UGI credentials lost between driver and executor in yarn mode

2018-03-20 Thread Ajith shetty
Hi all

I see that UGI credentials (e.g. sparkCookie) shared from the driver to the executors 
are being lost on the driver side in yarn mode. Below is the analysis from the start of 
the thrift server.

Step 1. SparkSubmit creates the submit env, which does a loginUserFromKeytab:
  "main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
  at 
org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab(UserGroupInformation.java:1041)
  - locked <0x582> (a java.lang.Class)
  at 
org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:336)
  at 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:156)
  at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:122)
  at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala:-1)

Step 2. HiveThriftServer2 does SparkSQLEnv.init, which creates the SparkContext and 
hence calls createDriverEnv, which generates the secret key and adds it as credentials 
to the UGI from Step 1:
"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at 
org.apache.spark.SecurityManager.generateSecretKey(SecurityManager.scala:429)
at 
org.apache.spark.SecurityManager.<init>(SecurityManager.scala:228)
at 
org.apache.spark.SparkEnv$.create(SparkEnv.scala:237)
at 
org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
at 
org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:258)
at 
org.apache.spark.SparkContext.<init>(SparkContext.scala:433)
at 
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2521)
- locked <0x10a5> (a java.lang.Object)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:923)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:915)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:915)
- locked <0x1091> (a 
org.apache.spark.sql.SparkSession$Builder)
- locked <0x10a6> (a 
org.apache.spark.sql.SparkSession$)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:48)
at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:86)
at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala:-1)
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at 
java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:798)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)
at 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)
at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:122)
at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala:-1)


Step 3. Next, the application is submitted, which creates the container launch 
context using the UGI passed from Step 2:
"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at 
org.apache.spark.SecurityManager.generateSecretKey(SecurityManager.scala:429)
at 
org.apache.spark.SecurityManager.<init>(SecurityManager.scala:228)
at 
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:999)
at 
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:194)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:173)
at 
org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
at 
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2521)
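
A quick way to narrow down where the credentials get dropped is to log the UGI state on both sides, e.g. once on the driver after SparkContext creation and once inside an executor task. A hypothetical debug helper (not existing Spark code):

import org.apache.hadoop.security.UserGroupInformation

// Hypothetical debug helper: prints how many secret keys and tokens the current UGI carries.
object UgiDebug {
  def logState(where: String): Unit = {
    val creds = UserGroupInformation.getCurrentUser.getCredentials
    println(s"[$where] secretKeys=${creds.numberOfSecretKeys} tokens=${creds.numberOfTokens}")
  }
}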

Permanent UDF support across sessions

2018-09-19 Thread Ajith shetty
I have a question related to permanent UDFs when Spark is built with Hive support enabled.

When we do CREATE FUNCTION, it is registered with Hive via:
 spark-sql>create function customfun as 
'org.apache.hadoop.hive.ql.udf.generic.GenericUDFLastDay' using jar 
'hdfs:///tmp/hive-exec.jar';
 call stack:
 org.apache.spark.sql.hive.client.HiveClientImpl#createFunction
 org.apache.spark.sql.hive.HiveExternalCatalog#createFunction
 org.apache.spark.sql.catalyst.catalog.SessionCatalog#createFunction
 org.apache.spark.sql.execution.command.CreateFunctionCommand#run


But when we call a registered UDF, we make an ADD JAR call to Hive:
 spark-sql> select customfun('2015-08-22');
 call stack:
 org.apache.spark.sql.hive.client.HiveClientImpl#addJar
 org.apache.spark.sql.hive.HiveSessionResourceLoader#addJar
 org.apache.spark.sql.internal.SessionResourceLoader#loadResource
 org.apache.spark.sql.catalyst.catalog.SessionCatalog#loadFunctionResources
 org.apache.spark.sql.catalyst.catalog.SessionCatalog#lookupFunction

So is the ADD JAR call to Hive necessary when we invoke an already registered UDF? As 
I see it, if we follow the current code:
1. Hive can look up already registered UDFs without an explicit ADD JAR call from 
Spark. Refer to https://cwiki.apache.org/confluence/display/Hive/HivePlugins, fixed 
via https://issues.apache.org/jira/browse/HIVE-6380 ("When the function is referenced 
for the first time by a Hive session, these resources will be added to the 
environment.")
2. We cannot have this work across sessions, as a new session again needs to do ADD 
JAR internally on the UDF call, which will fail because the caller needs to have an 
admin role set (Hive requires ADD JAR to be run only via the admin role).

Please correct me if I am wrong: can we avoid ADD JAR when we invoke a registered 
UDF? Are there any side effects if I modify this flow?