Spark SQL - Long running job

2015-02-21 Thread nitin
Hi All,

I intend to build a long-running Spark application which fetches data/tuples
from Parquet, does some time-consuming processing, and then caches the
processed table (InMemoryColumnarTableScan). My use case needs good retrieval
time for SQL queries (benefiting from the Spark SQL optimizer) and data
compression (built into the in-memory caching). Now the problem is that if my
driver goes down, I will have to fetch the data for all the tables again,
reprocess it, and cache it, which is time consuming.

Is it possible to persist the processed/cached RDDs on disk so that my
recovery time is shorter when the system is restarted after a failure?

On a side note, the data processing contains a shuffle step which creates
huge temporary shuffle files on the local disk in the temp folder, and as per
the current logic, shuffle files don't get deleted for running executors. This
is leading to my local disk filling up quickly and running out of space, since
it is a long-running Spark job. (I am running Spark in yarn-client mode, by
the way.)

Thanks
-Nitin 






Re: Spark SQL - Long running job

2015-02-22 Thread nitin
I believe calling processedSchemaRdd.persist(StorageLevel.DISK_ONLY) and
processedSchemaRdd.checkpoint() only persists the data; I will lose all the
RDD metadata, so when I restart my driver that data is of little use to me
(correct me if I am wrong).

I thought of doing processedSchemaRdd.saveAsParquetFile (to HDFS), but I fear
that in case my "HDFS block size" > "partition file size", I will get more
partitions when reading it back than the original SchemaRDD had.
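
For context, a rough sketch of the round trip I am considering (the path,
table name, and partition count below are placeholders, not my actual setup),
using the 1.2-era SchemaRDD API and the existing sqlContext:

  processedSchemaRdd.saveAsParquetFile("hdfs:///cache/processed_table")

  // After a driver restart, reload instead of recomputing from the raw data:
  val restored = sqlContext.parquetFile("hdfs:///cache/processed_table")
  restored.registerTempTable("processed_table")
  sqlContext.cacheTable("processed_table")  // back into the in-memory columnar cache

  // And if the reload comes back with more partitions than the original
  // SchemaRDD had, something like this before registering (200 is a placeholder):
  val coalesced = restored.coalesce(200)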






Does Spark delete shuffle files of lost executor in running system(on YARN)?

2015-02-24 Thread nitin
Hi All,

I noticed that Spark doesn't delete the local shuffle files of a lost executor
in a running system (running in yarn-client mode). For a long-running system,
this might fill up disk space in case of frequent executor failures. Can we
delete these files when the executor loss is reported to the driver?

Thanks
-Nitin






Continuous warning while consuming using new kafka-spark010 API

2016-09-19 Thread Nitin Goyal
Hi All,

I am using the new kafka-spark010 API to consume messages from Kafka
(brokers running kafka 0.10.0.1).

I am continuously seeing the following warning, but only when the producer is
writing messages to Kafka in parallel (I have increased
spark.streaming.kafka.consumer.poll.ms to 1024 ms as well) :-

16/09/19 16:44:53 WARN TaskSetManager: Lost task 97.0 in stage 32.0 (TID
4942, host-3): java.lang.AssertionError: assertion failed: Failed to get
records for spark-executor-example topic2 8 1052989 after polling for 1024

while at the same time, I see this in the Spark UI corresponding to that job:
topic: topic2, partition: 8, offsets: 1051731 to 1066124

Code :-

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams.asScala)
)

stream.foreachRDD { rdd => rdd.filter(_ => false).collect }
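
For reference, this is how I am raising that poll timeout on the driver side
(a sketch; the app name, batch interval and the 10-second value are
placeholders — it is a Spark setting, not a Kafka consumer property):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf()
    .setAppName("kafka-010-consumer")
    // how long the executor-side cached consumer polls before the
    // "Failed to get records ... after polling for ..." assertion fires
    .set("spark.streaming.kafka.consumer.poll.ms", "10000")
  val ssc = new StreamingContext(conf, Seconds(5))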


Has anyone encountered this with the new API? Is this the expected
behaviour or am I missing something here?

-- 
Regards
Nitin Goyal


Parquet-like partitioning support in spark SQL's in-memory columnar cache

2016-11-24 Thread Nitin Goyal
Hi,

Do we have any plan to support Parquet-like partitioning in the Spark SQL
in-memory cache? Something like one RDD[CachedBatch] per in-memory cache
partition.


-Nitin


Re: Parquet-like partitioning support in spark SQL's in-memory columnar cache

2016-11-28 Thread Nitin Goyal
+Cheng

Hi Reynold,

I think you are referring to bucketing in the in-memory columnar cache.

I am proposing that if we have a Parquet structure like the following :-

//file1/id=1/
//file1/id=2/

and if we read and cache it, it should create 2 RDD[CachedBatch] instances
(one per value of "id").

Is this what you were referring to originally?
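
For concreteness, a minimal sketch of the read-and-cache step I mean (path as
in the example above; assuming the usual SparkSession named spark):

  // Reads a Parquet dataset partitioned as /file1/id=1/ and /file1/id=2/,
  // then materializes it in the in-memory columnar cache. The proposal is
  // that the cached data keep one RDD[CachedBatch] per "id" value.
  val df = spark.read.parquet("/file1")
  df.cache()
  df.count()   // forces the cache to be built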

Thanks
-Nitin


On Fri, Nov 25, 2016 at 11:29 AM, Reynold Xin  wrote:

> It's already there isn't it? The in-memory columnar cache format.
>
>
> On Thu, Nov 24, 2016 at 9:06 PM, Nitin Goyal 
> wrote:
>
>> Hi,
>>
>> Do we have any plan of supporting parquet-like partitioning support in
>> Spark SQL in-memory cache? Something like one RDD[CachedBatch] per
>> in-memory cache partition.
>>
>>
>> -Nitin
>>
>
>


-- 
Regards
Nitin Goyal


Guidance for becoming Spark contributor

2015-04-10 Thread Nitin Mathur
Hi Spark Dev Team,

I want to start contributing to Spark open source. This is the first time I
will be making any open source contributions.

It would be great if I could get some guidance on where to start.

Thanks,
- Nitin


ClosureCleaner slowing down Spark SQL queries

2015-05-27 Thread Nitin Goyal
Hi All,

I am running a SQL query (Spark version 1.2) on a table created from a
unionAll of 3 SchemaRDDs; it executes in roughly 400 ms (200 ms at the driver
and roughly 200 ms at the executors).

If I run the same query on a table created from a unionAll of 27 SchemaRDDs,
I see that the executor time is the same (because of concurrency and the
nature of my query), but the driver time shoots up to 600 ms (making the total
query time 600 + 200 = 800 ms).
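
For reference, a hypothetical reproduction of the setup (paths, table and
column names are made up; Spark 1.2 API, with sc being the usual
SparkContext):

  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)
  // one SchemaRDD per input, then a single table built by unionAll-ing them
  val parts = (1 to 27).map(i => sqlContext.parquetFile(s"/data/part_$i"))
  val unioned = parts.reduce(_ unionAll _)
  unioned.registerTempTable("events")
  sqlContext.sql("SELECT key, count(*) FROM events GROUP BY key").collect()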

I attached JProfiler and found that the ClosureCleaner clean method is taking
time at the driver (some issue related to URLClassLoader), and this time
increases linearly with the number of RDDs being unioned in the table the
query is fired against. This is causing my query to take a huge amount of
time, whereas I expect it to execute within 400 ms irrespective of the number
of RDDs (since I have enough executors to cater to my need). Please find below
the links to screenshots from JProfiler :-

http://pasteboard.co/MnQtB4o.png

http://pasteboard.co/MnrzHwJ.png

Any help/suggestion to fix this will be highly appreciated, since this needs
to be fixed for production.

Thanks in Advance,
Nitin






Re: ClosureCleaner slowing down Spark SQL queries

2015-05-27 Thread Nitin Goyal
Hi Ted,

Thanks a lot for replying. First of all, moving to 1.4.0 RC2 is not easy for
us, as the migration cost is big since a lot has changed in Spark SQL since
1.2.

Regarding SPARK-7233, I had already looked at it a few hours back; it solves
the problem for concurrent queries, but my problem occurs even for a single
query. I also looked at the fix's code diff, and it wasn't related to the
problem, which seems to exist in the ClosureCleaner code.

Thanks
-Nitin






Re: ClosureCleaner slowing down Spark SQL queries

2015-05-30 Thread Nitin Goyal
Thanks Josh and Yin.

Created the following JIRA for the same :-

https://issues.apache.org/jira/browse/SPARK-7970

Thanks
-Nitin






Ever increasing physical memory for a Spark Application in YARN

2015-07-27 Thread Nitin Goyal
I am running a Spark application in YARN with 2 executors, Xms/Xmx set to 32
GB, and spark.yarn.executor.memoryOverhead set to 6 GB.

I am seeing that the app's physical memory usage keeps increasing until the
container finally gets killed by the node manager:

2015-07-25 15:07:05,354 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
Container [pid=10508,containerID=container_1437828324746_0002_01_03] is
running beyond physical memory limits. Current usage: 38.0 GB of 38 GB
physical memory used; 39.5 GB of 152 GB virtual memory used. Killing
container.
Dump of the process-tree for container_1437828324746_0002_01_03 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 10508 9563 10508 10508 (bash) 0 0 9433088 314 /bin/bash -c
/usr/java/default/bin/java -server -XX:OnOutOfMemoryError='kill %p'
-Xms32768m -Xmx32768m  -Dlog4j.configuration=log4j-executor.properties
-XX:MetaspaceSize=512m -XX:+UseG1GC -XX:+PrintGCTimeStamps
-XX:+PrintGCDateStamps -XX:+PrintGCDetails -Xloggc:gc.log
-XX:AdaptiveSizePolicyOutputInterval=1  -XX:+UseGCLogFileRotation
-XX:GCLogFileSize=500M -XX:NumberOfGCLogFiles=1
-XX:MaxDirectMemorySize=3500M -XX:NewRatio=3 -Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=36082
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false -XX:NativeMemoryTracking=detail
-XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=512m
-XX:CompressedClassSpaceSize=256m
-Djava.io.tmpdir=/data/yarn/datanode/nm-local-dir/usercache/admin/appcache/application_1437828324746_0002/container_1437828324746_0002_01_03/tmp
'-Dspark.driver.port=43354'
-Dspark.yarn.app.container.log.dir=/opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://sparkDriver@nn1:43354/user/CoarseGrainedScheduler 1 dn3 6
application_1437828324746_0002 1>
/opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03/stdout
2>
/opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03/stderr


I disabled YARN's parameter "yarn.nodemanager.pmem-check-enabled" and noticed
that physical memory usage went up to 40 GB.

I checked the total RSS in /proc/pid/smaps, and it was the same value as the
physical memory reported by YARN and seen in the top command.

I verified that it is not a problem with the heap; something is increasing in
off-heap/native memory. I used tools like VisualVM but didn't find anything
increasing there. MaxDirectMemory also didn't exceed 600 MB. The peak number
of active threads was 70-80, and the thread stack size didn't exceed 100 MB.
Metaspace size was around 60-70 MB.

FYI, I am on Spark 1.2 and Hadoop 2.4.0. My Spark application is based on
Spark SQL; it is an HDFS read/write-intensive application and caches data in
Spark SQL's in-memory cache.

Any help would be highly appreciated, as would any hint on where I should
look to debug the memory leak, or whether a tool already exists for this.
Let me know if any other information is needed.






Re: [ compress in-memory column storage used in sparksql cache table ]

2015-09-02 Thread Nitin Goyal
I think Spark SQL's in-memory columnar cache already does compression. Check
out the classes under the following path :-

https://github.com/apache/spark/tree/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression

Although the compression ratio is not as good as Parquet's.
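
For what it's worth, whether those schemes are applied when building the cache
is governed by a SQL setting (a sketch, shown here with its default value):

  // compression of the in-memory columnar cache is on by default
  sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")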

Thanks
-Nitin






Re: Operations with cached RDD

2015-10-11 Thread Nitin Goyal
The problem is not that zipWithIndex is executed again. "groupByKey" triggered
hash partitioning on your keys, and a shuffle happened because of that, which
is why you are seeing 2 stages. You can confirm this by clicking on the latter
"zipWithIndex" stage: its input data has "(memory)" written next to it, which
means the input has been fetched from memory (your cached RDD).
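
Another way to confirm it from the shell (a sketch reusing your example): the
cached RDD shows up as CachedPartitions in the lineage, with the ShuffledRDD
introduced by groupByKey stacked on top of it:

  val rdd = sc.parallelize(1 to 5, 5).zipWithIndex.cache()
  rdd.count()               // materializes the cache
  val g = rdd.groupByKey()
  g.count()
  println(g.toDebugString)  // ShuffledRDD on top of the cached ZippedWithIndexRDD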

As far as the lineage/call site is concerned, I think there was a change in
Spark 1.3 that excluded some classes from appearing in the call site (I know
that some Spark SQL-related ones were removed for sure).

Thanks
-Nitin


On Sat, Oct 10, 2015 at 5:05 AM, Ulanov, Alexander  wrote:

> Dear Spark developers,
>
>
>
> I am trying to understand how Spark UI displays operation with the cached
> RDD.
>
>
>
> For example, the following code caches an rdd:
>
> >> val rdd = sc.parallelize(1 to 5, 5).zipWithIndex.cache
>
> >> rdd.count
>
> The Jobs tab shows me that the RDD is evaluated:
>
> : 1  count at <console>:24         2015/10/09 16:15:43   0.4 s   1/1
>
> : 0  zipWithIndex at <console>:21  2015/10/09 16:15:38   0.6 s   1/1
>
> And I can observe this rdd in the Storage tab of the Spark UI:
>
> : ZippedWithIndexRDD  Memory Deserialized 1x Replicated
>
>
>
> Then I want to make an operation over the cached RDD. I run the following
> code:
>
> >> val g = rdd.groupByKey()
>
> >> g.count
>
> The Jobs tab shows me a new Job:
>
> : 2  count at <console>:26
>
> Inside this Job there are two stages:
>
> : 3  count at <console>:26  +details  2015/10/09 16:16:18   0.2 s   5/5
>
> : 2  zipWithIndex at <console>:21
>
> It shows that zipWithIndex is executed again. It does not seem to be
> reasonable, because the rdd is cached, and zipWithIndex is already executed
> previously.
>
>
>
> Could you explain why if I perform an operation followed by an action on a
> cached RDD, then the last operation in the lineage of the cached RDD is
> shown to be executed in the Spark UI?
>
>
>
>
>
> Best regards, Alexander
>



-- 
Regards
Nitin Goyal


Re: want to contribute

2015-10-29 Thread Nitin Goyal
You both can check out the following links :-

https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark

http://spark.apache.org/docs/latest/building-spark.html

Thanks
-Nitin

On Thu, Oct 29, 2015 at 4:13 PM, Aadi Thakar 
wrote:

> Hello, my name is Aaditya Thakkar and I am a second year undergraduate ICT
> student at DA-IICT, Gandhinagar, India. I have quite lately been interested
> in contributing towards the open source organization and I find your
> organization the most appropriate one.
>
> I request you to please guide me through how to install your code-base and
> how to get started to your organization.
> Thanking you,
> Aaditya Thakkar.
>



-- 
Regards
Nitin Goyal


Re: Running individual test classes

2015-11-03 Thread Nitin Goyal
In Maven, you might want to try the following :-

-DwildcardSuites=org.apache.spark.ml.ProbabilisticClassifierSuite

On Tue, Nov 3, 2015 at 2:42 PM, Michael Armbrust 
wrote:

> In SBT:
>
> build/sbt "mllib/test-only *ProbabilisticClassifierSuite"
>
> On Tue, Nov 3, 2015 at 9:27 AM, Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> Hi all,
>>
>> I'm new to contributing to Spark (and Apache projects in general); I've
>> started working on SPARK-7425
>> <https://issues.apache.org/jira/browse/SPARK-7425> and have implemented
>> what looks like a viable solution. Now I'd like to test it, however I'm
>> having some trouble running an individual test class to quickly iterate
>> over it; I tried running
>>
>> mvn -Dtest=org.apache.spark.ml.ProbabilisticClassifierSuite test
>>
>> and (without the fully qualified class name)
>>
>> mvn -Dtest=ProbabilisticClassifierSuite test
>>
>> but both commands resulted in running all tests, both when launching
>> Maven from the project root and from the MLlib module root. I've tried to
>> look this up in the mailing list archives but haven't had luck so far.
>>
>> How can I run a single test suite? Thanks in advance!
>>
>> --
>> BR,
>> Stefano Baghino
>>
>
>


-- 
Regards
Nitin Goyal


Re: Secondary Indexing of RDDs?

2015-12-14 Thread Nitin Goyal
Spark SQL's in-memory cache stores per-column statistics, which in turn are
used to skip batches (10,000 rows each by default) within a partition:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala#L25
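
The batch granularity those statistics work at is configurable (a sketch,
shown with the default value):

  // each cached partition is split into row batches of this size; per-column
  // min/max stats are kept per batch so predicates can skip whole batches
  sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")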

Hope this helps

Thanks
-Nitin

On Tue, Dec 15, 2015 at 12:28 AM, Michael Segel 
wrote:

> Hi,
>
> This may be a silly question… couldn’t find the answer on my own…
>
> I’m trying to find out if anyone has implemented secondary indexing on
> Spark’s RDDs.
>
> If anyone could point me to some references, it would be helpful.
>
> I’ve seen some stuff on Succinct Spark (see:
> https://amplab.cs.berkeley.edu/succinct-spark-queries-on-compressed-rdds/
>  )
> but was more interested in integration with SparkSQL and SparkSQL support
> for secondary indexing.
>
> Also the reason I’m posting this to the dev list is that there’s more to
> this question …
>
>
> Thx
>
> -Mike
>
>


Re: Ever increasing physical memory for a Spark Application in YARN

2016-05-03 Thread Nitin Goyal
Hi Daniel,

I did indeed discover the problem in my case; it turned out to be a bug on the
Parquet side, and I raised and contributed to the following issue :-

https://issues.apache.org/jira/browse/PARQUET-353

Hope this helps!

Thanks
-Nitin


On Mon, May 2, 2016 at 9:15 PM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> Hi Nitin,
> Sorry for waking up this ancient thread. That's a fantastic set of JVM
> flags! We just hit the same problem, but we haven't even discovered all
> those flags for limiting memory growth. I wanted to ask if you ever
> discovered anything further?
>
> I see you also set -XX:NewRatio=3. This is a very important flag since
> Spark 1.6.0. With unified memory management with the default
> spark.memory.fraction=0.75 the cache will fill up 75% of the heap. The
> default NewRatio is 2, so the cache will not fit in the old generation
> pool, constantly triggering full GCs. With NewRatio=3 the old generation
> pool is 75% of the heap, so it (just) fits the cache. We find this makes a
> very significant performance difference in practice.
>
> Perhaps this should be documented somewhere. Or the default
> spark.memory.fraction should be 0.66, so that it works out with the default
> JVM flags.
>
> On Mon, Jul 27, 2015 at 6:08 PM, Nitin Goyal 
> wrote:
>
>> I am running a spark application in YARN having 2 executors with Xms/Xmx
>> as
>> 32 Gigs and spark.yarn.excutor.memoryOverhead as 6 gigs.
>>
>> I am seeing that the app's physical memory is ever increasing and finally
>> gets killed by node manager
>>
>> 2015-07-25 15:07:05,354 WARN
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container [pid=10508,containerID=container_1437828324746_0002_01_03]
>> is
>> running beyond physical memory limits. Current usage: 38.0 GB of 38 GB
>> physical memory used; 39.5 GB of 152 GB virtual memory used. Killing
>> container.
>> Dump of the process-tree for container_1437828324746_0002_01_03 :
>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>> |- 10508 9563 10508 10508 (bash) 0 0 9433088 314 /bin/bash -c
>> /usr/java/default/bin/java -server -XX:OnOutOfMemoryError='kill %p'
>> -Xms32768m -Xmx32768m  -Dlog4j.configuration=log4j-executor.properties
>> -XX:MetaspaceSize=512m -XX:+UseG1GC -XX:+PrintGCTimeStamps
>> -XX:+PrintGCDateStamps -XX:+PrintGCDetails -Xloggc:gc.log
>> -XX:AdaptiveSizePolicyOutputInterval=1  -XX:+UseGCLogFileRotation
>> -XX:GCLogFileSize=500M -XX:NumberOfGCLogFiles=1
>> -XX:MaxDirectMemorySize=3500M -XX:NewRatio=3
>> -Dcom.sun.management.jmxremote
>> -Dcom.sun.management.jmxremote.port=36082
>> -Dcom.sun.management.jmxremote.authenticate=false
>> -Dcom.sun.management.jmxremote.ssl=false -XX:NativeMemoryTracking=detail
>> -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=512m
>> -XX:CompressedClassSpaceSize=256m
>>
>> -Djava.io.tmpdir=/data/yarn/datanode/nm-local-dir/usercache/admin/appcache/application_1437828324746_0002/container_1437828324746_0002_01_03/tmp
>> '-Dspark.driver.port=43354'
>>
>> -Dspark.yarn.app.container.log.dir=/opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03
>> org.apache.spark.executor.CoarseGrainedExecutorBackend
>> akka.tcp://sparkDriver@nn1:43354/user/CoarseGrainedScheduler 1 dn3 6
>> application_1437828324746_0002 1>
>>
>> /opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03/stdout
>> 2>
>>
>> /opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03/stderr
>>
>>
>> I diabled YARN's parameter "yarn.nodemanager.pmem-check-enabled" and
>> noticed
>> that physical memory usage went till 40 gigs
>>
>> I checked the total RSS in /proc/pid/smaps and it was same value as
>> physical
>> memory reported by Yarn and seen in top command.
>>
>> I checked that its not a problem with the heap but something is increasing
>> in off heap/ native memory. I used tools like Visual VM but didn't find
>> anything that's increasing there. MaxDirectMmeory also didn't exceed
>> 600MB.
>> Peak number of active threads was 70-80 and thread stack size didn't
>> exceed
>> 100MB. MetaspaceSize was around 60-70MB.
>>
>> FYI, I am on Spark 1.2 and Hadoop 2.4.0 and my spark application is based
>> on
>> Spark SQL and it's an HDFS