Spark SQL - Long running job
Hi All,

I intend to build a long-running Spark application that fetches data/tuples from Parquet, does some time-consuming processing, and then caches the processed table (InMemoryColumnarTableScan). My use case needs good retrieval time for SQL queries (benefiting from the Spark SQL optimizer) and data compression (built into the in-memory caching).

The problem is that if my driver goes down, I have to fetch the data for all the tables again, recompute it, and re-cache it, which is time consuming. Is it possible to persist the processed/cached RDDs on disk so that my system comes back up faster after a failure or restart?

On a side note, the data processing contains a shuffle step which creates huge temporary shuffle files on local disk in the temp folder, and under the current logic shuffle files are not deleted for running executors. This is quickly filling up my local disk and running it out of space, since this is a long-running Spark job (running Spark in yarn-client mode, by the way).

Thanks
-Nitin
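For context, here is a minimal sketch of the caching pattern described above, assuming Spark 1.2-era SQLContext APIs; the paths, table names, and query are made up for illustration:

import org.apache.spark.sql.SQLContext

// Sketch only: illustrative names and query, not the poster's actual job.
val sqlContext = new SQLContext(sc)

// Fetch raw tuples from Parquet and do the time-consuming processing.
val raw = sqlContext.parquetFile("hdfs:///data/raw")
raw.registerTempTable("raw")
val processed = sqlContext.sql("SELECT key, count(*) AS cnt FROM raw GROUP BY key")

// Cache the result in the in-memory columnar format; this is what shows up
// as InMemoryColumnarTableScan in query plans.
processed.registerTempTable("processed")
sqlContext.cacheTable("processed")

// The cache lives in executor memory tied to this driver, so if the driver
// dies it has to be rebuilt from the raw data after restart.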
Re: Spark SQL - Long running job
I believe calling processedSchemaRdd.persist(DISK) and processedSchemaRdd.checkpoint() only persists the data; I would lose all the RDD metadata, and when I restart my driver that data is essentially useless to me (correct me if I am wrong).

I thought of doing processedSchemaRdd.saveAsParquetFile (to HDFS), but I fear that if my "HDFS block size" > "partition file size", I will get more partitions when reading back than the original schemaRDD had.
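A rough sketch of the save-and-reload approach being weighed here, assuming Spark 1.2-era SchemaRDD/SQLContext APIs; the paths and table name are hypothetical:

// Sketch only: persist the processed SchemaRDD to Parquet so a restarted
// driver can reload it instead of recomputing from raw data.
processedSchemaRdd.saveAsParquetFile("hdfs:///checkpoints/processed.parquet")

// After a driver restart, reload and re-register the table. Note that the
// partition count on read depends on the Parquet files/HDFS blocks, so it
// may differ from the original SchemaRDD and may need an explicit
// coalesce/repartition afterwards.
val reloaded = sqlContext.parquetFile("hdfs:///checkpoints/processed.parquet")
reloaded.registerTempTable("processed")
sqlContext.cacheTable("processed")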
Does Spark delete shuffle files of lost executor in running system(on YARN)?
Hi All,

I noticed that Spark does not delete the local shuffle files of a lost executor in a running system (running in yarn-client mode). For a long-running system, this can fill up disk space if executor failures are frequent. Can we delete these files when the executor loss is reported to the driver?

Thanks
-Nitin
Continuous warning while consuming using new kafka-spark010 API
Hi All,

I am using the new kafka-spark010 API to consume messages from Kafka (brokers running Kafka 0.10.0.1). I see the following warning continuously, but only when a producer is writing messages to Kafka in parallel (I have increased spark.streaming.kafka.consumer.poll.ms to 1024 ms as well):

16/09/19 16:44:53 WARN TaskSetManager: Lost task 97.0 in stage 32.0 (TID 4942, host-3): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-example topic2 8 1052989 after polling for 1024

At the same time, I see this in the Spark UI for the corresponding job:

topic: topic2, partition: 8, offsets: 1051731 to 1066124

Code:

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams.asScala)
)
stream.foreachRDD { rdd => rdd.filter(_ => false).collect }

Has anyone encountered this with the new API? Is this the expected behaviour, or am I missing something here?

--
Regards
Nitin Goyal
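For reference, a hedged sketch of where the poll timeout mentioned above is configured; the config key is taken from the message itself, while the application name, batch interval, and timeout value are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: raise the executor-side Kafka poll timeout referenced above
// (the poster had it at 1024 ms); a larger value gives each poll more time
// to fetch the requested offsets while producers are writing in parallel.
val conf = new SparkConf()
  .setAppName("kafka-010-consumer")
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")
val ssc = new StreamingContext(conf, Seconds(5))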
Parquet-like partitioning support in spark SQL's in-memory columnar cache
Hi,

Do we have any plan to support Parquet-like partitioning in the Spark SQL in-memory cache? Something like one RDD[CachedBatch] per in-memory cache partition.

-Nitin
Re: Parquet-like partitioning support in spark SQL's in-memory columnar cache
+Cheng

Hi Reynold,

I think you are referring to bucketing in the in-memory columnar cache. I am proposing that if we have a Parquet structure like the following:

//file1/id=1/
//file1/id=2/

and we read and cache it, it should create 2 RDD[CachedBatch] (one per value of "id").

Is this what you were referring to originally?

Thanks
-Nitin

On Fri, Nov 25, 2016 at 11:29 AM, Reynold Xin wrote:
> It's already there isn't it? The in-memory columnar cache format.
>
> On Thu, Nov 24, 2016 at 9:06 PM, Nitin Goyal wrote:
>
>> Hi,
>>
>> Do we have any plan of supporting parquet-like partitioning support in
>> Spark SQL in-memory cache? Something like one RDD[CachedBatch] per
>> in-memory cache partition.
>>
>> -Nitin
>

--
Regards
Nitin Goyal
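To make the proposal concrete, here is a small hedged sketch of the layout being described, assuming Spark 2.x DataFrame APIs; `df`, the paths, and the column name are placeholders:

// Sketch only: writing Parquet partitioned by "id" produces a directory
// layout like .../id=1/, .../id=2/ as described above.
df.write.partitionBy("id").parquet("hdfs:///data/file1")

// Reading it back keeps "id" as a partition column. The proposal is that
// caching this table would keep one RDD[CachedBatch] per "id" value, so a
// filter on "id" could skip whole cached partitions.
val cached = spark.read.parquet("hdfs:///data/file1")
cached.createOrReplaceTempView("t")
spark.catalog.cacheTable("t")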
Guidance for becoming Spark contributor
Hi Spark Dev Team,

I want to start contributing to Spark open source. This is the first time I will be making any open source contributions. It would be great if I could get some guidance on where to start.

Thanks,
- Nitin
ClosureCleaner slowing down Spark SQL queries
Hi All,

I am running a SQL query (Spark 1.2) on a table created from a unionAll of 3 schema RDDs; it executes in roughly 400 ms (200 ms at the driver and roughly 200 ms on the executors).

If I run the same query on a table created from a unionAll of 27 schema RDDs, executor time stays the same (because of concurrency and the nature of my query) but driver time shoots up to 600 ms (total query time = 600 + 200 = 800 ms).

I attached JProfiler and found that the ClosureCleaner clean method is taking the time at the driver (some issue related to URLClassLoader), and it increases linearly with the number of RDDs being unioned in the query. This causes my query to take far longer than the 400 ms I expect regardless of the number of RDDs (since I have enough executors to cater to my need).

Screenshots from JProfiler:

http://pasteboard.co/MnQtB4o.png
http://pasteboard.co/MnrzHwJ.png

Any help/suggestion to fix this would be highly appreciated, since this needs to be fixed for production.

Thanks in advance,
Nitin
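To make the scaling concrete, here is a rough sketch of the setup described above, assuming Spark 1.2-style SchemaRDD APIs; the paths, table name, and query are placeholders:

// Sketch only: union N schema RDDs and query the result. In the scenario
// above, driver-side time grows roughly linearly with N because
// ClosureCleaner.clean runs for each underlying RDD.
val parts: Seq[org.apache.spark.sql.SchemaRDD] =
  (1 to 27).map(i => sqlContext.parquetFile(s"hdfs:///data/part$i"))
val unioned = parts.reduce(_ unionAll _)
unioned.registerTempTable("big_table")
sqlContext.sql("SELECT key, count(*) FROM big_table GROUP BY key").collect()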
Re: ClosureCleaner slowing down Spark SQL queries
Hi Ted,

Thanks a lot for replying. First of all, moving to 1.4.0 RC2 is not easy for us, since the migration cost is big: a lot has changed in Spark SQL since 1.2.

Regarding SPARK-7233, I had already looked at it a few hours back. It solves the problem for concurrent queries, but my problem occurs even for a single query. I also looked at the fix's code diff, and it isn't related to the problem, which seems to be in the ClosureCleaner code.

Thanks
-Nitin
Re: ClosureCleaner slowing down Spark SQL queries
Thanks Josh and Yin. I have created the following JIRA for this:

https://issues.apache.org/jira/browse/SPARK-7970

Thanks
-Nitin
Ever increasing physical memory for a Spark Application in YARN
I am running a Spark application on YARN with 2 executors, Xms/Xmx of 32 GB, and spark.yarn.executor.memoryOverhead of 6 GB.

I am seeing that the app's physical memory keeps increasing until the container finally gets killed by the node manager:

2015-07-25 15:07:05,354 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=10508,containerID=container_1437828324746_0002_01_03] is running beyond physical memory limits. Current usage: 38.0 GB of 38 GB physical memory used; 39.5 GB of 152 GB virtual memory used. Killing container.
Dump of the process-tree for container_1437828324746_0002_01_03 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 10508 9563 10508 10508 (bash) 0 0 9433088 314 /bin/bash -c /usr/java/default/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms32768m -Xmx32768m -Dlog4j.configuration=log4j-executor.properties -XX:MetaspaceSize=512m -XX:+UseG1GC -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCDetails -Xloggc:gc.log -XX:AdaptiveSizePolicyOutputInterval=1 -XX:+UseGCLogFileRotation -XX:GCLogFileSize=500M -XX:NumberOfGCLogFiles=1 -XX:MaxDirectMemorySize=3500M -XX:NewRatio=3 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=36082 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -XX:NativeMemoryTracking=detail -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=512m -XX:CompressedClassSpaceSize=256m -Djava.io.tmpdir=/data/yarn/datanode/nm-local-dir/usercache/admin/appcache/application_1437828324746_0002/container_1437828324746_0002_01_03/tmp '-Dspark.driver.port=43354' -Dspark.yarn.app.container.log.dir=/opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@nn1:43354/user/CoarseGrainedScheduler 1 dn3 6 application_1437828324746_0002 1> /opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03/stdout 2> /opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03/stderr

I disabled YARN's parameter "yarn.nodemanager.pmem-check-enabled" and noticed that physical memory usage went up to 40 GB.

I checked the total RSS in /proc/<pid>/smaps, and it matched the physical memory reported by YARN and seen in the top command.

I verified that it is not a problem with the heap; something is growing in off-heap/native memory. I used tools like VisualVM but didn't find anything increasing there. MaxDirectMemory also didn't exceed 600 MB. The peak number of active threads was 70-80, thread stack size didn't exceed 100 MB, and metaspace was around 60-70 MB.

FYI, I am on Spark 1.2 and Hadoop 2.4.0. My Spark application is based on Spark SQL, is HDFS read/write intensive, and caches data in Spark SQL's in-memory cache.

Any help would be highly appreciated, as would any hint on where to look to debug the memory leak, or any existing tool for this. Let me know if any other information is needed.
Re: [ compress in-memory column storage used in sparksql cache table ]
Spark SQL's in-memory columnar cache already does compression. Check out the classes under the following path:

https://github.com/apache/spark/tree/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression

The compression ratio is not as good as Parquet's, though.

Thanks
-Nitin
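As a hedged illustration, columnar cache compression is governed by the spark.sql.inMemoryColumnarStorage.compressed setting; a minimal sketch (the table name is a placeholder):

// Sketch only: enable columnar compression (it is on by default in recent
// Spark versions) and cache a table so the compressed in-memory columnar
// format is used for it.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
sqlContext.cacheTable("my_table") // placeholder table name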
Re: Operations with cached RDD
The problem is not that zipWithIndex is executed again. groupByKey triggered hash partitioning on your keys, and a shuffle happened because of that; that is why you are seeing 2 stages. You can confirm this by clicking on the latter "zipWithIndex" stage: its input has "(memory)" written next to it, which means the input data was fetched from memory (your cached RDD).

As far as the lineage/call site is concerned, I think there was a change in Spark 1.3 that excluded some classes from appearing in the call site (I know that some Spark SQL related ones were removed for sure).

Thanks
-Nitin

On Sat, Oct 10, 2015 at 5:05 AM, Ulanov, Alexander wrote:
> Dear Spark developers,
>
> I am trying to understand how the Spark UI displays operations on a cached RDD.
>
> For example, the following code caches an rdd:
> >> val rdd = sc.parallelize(1 to 5, 5).zipWithIndex.cache
> >> rdd.count
> The Jobs tab shows me that the RDD is evaluated:
> : 1 count at <console>:24  2015/10/09 16:15:43  0.4 s  1/1
> : 0 zipWithIndex at <console>:21  2015/10/09 16:15:38  0.6 s  1/1
> And I can observe this rdd in the Storage tab of the Spark UI:
> : ZippedWithIndexRDD  Memory Deserialized 1x Replicated
>
> Then I want to run an operation over the cached RDD. I run the following code:
> >> val g = rdd.groupByKey()
> >> g.count
> The Jobs tab shows me a new Job:
> : 2 count at <console>:26
> Inside this Job there are two stages:
> : 3 count at <console>:26  +details  2015/10/09 16:16:18  0.2 s  5/5
> : 2 zipWithIndex at <console>:21
>
> It shows that zipWithIndex is executed again. It does not seem reasonable, because the rdd is cached, and zipWithIndex was already executed previously.
>
> Could you explain why, if I perform an operation followed by an action on a cached RDD, the last operation in the lineage of the cached RDD is shown as executed in the Spark UI?
>
> Best regards, Alexander

--
Regards
Nitin Goyal
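If the goal were to avoid the shuffle stage entirely, one option (a sketch, not something from the original thread) is to pre-partition the pair RDD with a hash partitioner before caching, so that groupByKey can reuse the existing partitioning:

import org.apache.spark.HashPartitioner

// Sketch only: partition by key once, cache the partitioned RDD, and a
// later groupByKey that picks up the same partitioner needs no new shuffle.
val rdd = sc.parallelize(1 to 5, 5)
  .zipWithIndex
  .partitionBy(new HashPartitioner(5))
  .cache()
rdd.count()
rdd.groupByKey().count() // no extra shuffle stage expected here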
Re: want to contribute
You can both check out the following links:

https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
http://spark.apache.org/docs/latest/building-spark.html

Thanks
-Nitin

On Thu, Oct 29, 2015 at 4:13 PM, Aadi Thakar wrote:
> Hello, my name is Aaditya Thakkar and I am a second year undergraduate ICT
> student at DA-IICT, Gandhinagar, India. I have quite lately been interested
> in contributing towards the open source organization and I find your
> organization the most appropriate one.
>
> I request you to please guide me through how to install your code-base and
> how to get started to your organization.
> Thanking you,
> Aaditya Thakkar.

--
Regards
Nitin Goyal
Re: Running individual test classes
In Maven, you might want to try the following:

-DwildcardSuites=org.apache.spark.ml.ProbabilisticClassifierSuite

On Tue, Nov 3, 2015 at 2:42 PM, Michael Armbrust wrote:
> In SBT:
>
> build/sbt "mllib/test-only *ProbabilisticClassifierSuite"
>
> On Tue, Nov 3, 2015 at 9:27 AM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:
>
>> Hi all,
>>
>> I'm new to contributing to Spark (and Apache projects in general); I've
>> started working on SPARK-7425
>> <https://issues.apache.org/jira/browse/SPARK-7425> and have implemented
>> what looks like a viable solution. Now I'd like to test it, however I'm
>> having some trouble running an individual test class to quickly iterate
>> over it; I tried running
>>
>> mvn -Dtest=org.apache.spark.ml.ProbabilisticClassifierSuite test
>>
>> and (without the fully qualified class name)
>>
>> mvn -Dtest=ProbabilisticClassifierSuite test
>>
>> but both commands resulted in running all tests, both when launching
>> Maven from the project root and from the MLlib module root. I've tried to
>> look this up in the mailing list archives but haven't had luck so far.
>>
>> How can I run a single test suite? Thanks in advance!
>>
>> --
>> BR,
>> Stefano Baghino

--
Regards
Nitin Goyal
Re: Secondary Indexing of RDDs?
Spark SQL's in-memory cache stores statistics per column, which are used to skip batches within a partition (batch size is controlled by spark.sql.inMemoryColumnarStorage.batchSize, 10000 rows by default):

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala#L25

Hope this helps.

Thanks
-Nitin

On Tue, Dec 15, 2015 at 12:28 AM, Michael Segel wrote:
> Hi,
>
> This may be a silly question… couldn’t find the answer on my own…
>
> I’m trying to find out if anyone has implemented secondary indexing on
> Spark’s RDDs.
>
> If anyone could point me to some references, it would be helpful.
>
> I’ve seen some stuff on Succinct Spark (see:
> https://amplab.cs.berkeley.edu/succinct-spark-queries-on-compressed-rdds/ )
> but was more interested in integration with SparkSQL and SparkSQL support
> for secondary indexing.
>
> Also the reason I’m posting this to the dev list is that there’s more to
> this question …
>
> Thx
>
> -Mike
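A hedged sketch of how those per-column statistics come into play when caching; the config key is a standard Spark SQL setting, while the table name and query are placeholders:

// Sketch only: smaller batches mean finer-grained min/max statistics, which
// lets the in-memory scan skip more batches for selective predicates.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")
sqlContext.cacheTable("events") // placeholder table name
// A filter like this can skip cached batches whose min/max range for "ts"
// does not overlap the predicate.
sqlContext.sql("SELECT count(*) FROM events WHERE ts > 1450000000").collect()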
Re: Ever increasing physical memory for a Spark Application in YARN
Hi Daniel,

I did indeed discover the problem in my case. It turned out to be a bug on the Parquet side, and I raised and contributed to the following issue:

https://issues.apache.org/jira/browse/PARQUET-353

Hope this helps!

Thanks
-Nitin

On Mon, May 2, 2016 at 9:15 PM, Daniel Darabos <daniel.dara...@lynxanalytics.com> wrote:
> Hi Nitin,
> Sorry for waking up this ancient thread. That's a fantastic set of JVM
> flags! We just hit the same problem, but we haven't even discovered all
> those flags for limiting memory growth. I wanted to ask if you ever
> discovered anything further?
>
> I see you also set -XX:NewRatio=3. This is a very important flag since
> Spark 1.6.0. With unified memory management with the default
> spark.memory.fraction=0.75 the cache will fill up 75% of the heap. The
> default NewRatio is 2, so the cache will not fit in the old generation
> pool, constantly triggering full GCs. With NewRatio=3 the old generation
> pool is 75% of the heap, so it (just) fits the cache. We find this makes a
> very significant performance difference in practice.
>
> Perhaps this should be documented somewhere. Or the default
> spark.memory.fraction should be 0.66, so that it works out with the default
> JVM flags.
>
> On Mon, Jul 27, 2015 at 6:08 PM, Nitin Goyal
> wrote:
>
>> I am running a spark application in YARN having 2 executors with Xms/Xmx
>> as
>> 32 Gigs and spark.yarn.excutor.memoryOverhead as 6 gigs.
>>
>> I am seeing that the app's physical memory is ever increasing and finally
>> gets killed by node manager
>>
>> 2015-07-25 15:07:05,354 WARN
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container [pid=10508,containerID=container_1437828324746_0002_01_03]
>> is
>> running beyond physical memory limits. Current usage: 38.0 GB of 38 GB
>> physical memory used; 39.5 GB of 152 GB virtual memory used. Killing
>> container.
>> Dump of the process-tree for container_1437828324746_0002_01_03 : >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE >> |- 10508 9563 10508 10508 (bash) 0 0 9433088 314 /bin/bash -c >> /usr/java/default/bin/java -server -XX:OnOutOfMemoryError='kill %p' >> -Xms32768m -Xmx32768m -Dlog4j.configuration=log4j-executor.properties >> -XX:MetaspaceSize=512m -XX:+UseG1GC -XX:+PrintGCTimeStamps >> -XX:+PrintGCDateStamps -XX:+PrintGCDetails -Xloggc:gc.log >> -XX:AdaptiveSizePolicyOutputInterval=1 -XX:+UseGCLogFileRotation >> -XX:GCLogFileSize=500M -XX:NumberOfGCLogFiles=1 >> -XX:MaxDirectMemorySize=3500M -XX:NewRatio=3 >> -Dcom.sun.management.jmxremote >> -Dcom.sun.management.jmxremote.port=36082 >> -Dcom.sun.management.jmxremote.authenticate=false >> -Dcom.sun.management.jmxremote.ssl=false -XX:NativeMemoryTracking=detail >> -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=512m >> -XX:CompressedClassSpaceSize=256m >> >> -Djava.io.tmpdir=/data/yarn/datanode/nm-local-dir/usercache/admin/appcache/application_1437828324746_0002/container_1437828324746_0002_01_03/tmp >> '-Dspark.driver.port=43354' >> >> -Dspark.yarn.app.container.log.dir=/opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03 >> org.apache.spark.executor.CoarseGrainedExecutorBackend >> akka.tcp://sparkDriver@nn1:43354/user/CoarseGrainedScheduler 1 dn3 6 >> application_1437828324746_0002 1> >> >> /opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03/stdout >> 2> >> >> /opt/hadoop/logs/userlogs/application_1437828324746_0002/container_1437828324746_0002_01_03/stderr >> >> >> I diabled YARN's parameter "yarn.nodemanager.pmem-check-enabled" and >> noticed >> that physical memory usage went till 40 gigs >> >> I checked the total RSS in /proc/pid/smaps and it was same value as >> physical >> memory reported by Yarn and seen in top command. >> >> I checked that its not a problem with the heap but something is increasing >> in off heap/ native memory. I used tools like Visual VM but didn't find >> anything that's increasing there. MaxDirectMmeory also didn't exceed >> 600MB. >> Peak number of active threads was 70-80 and thread stack size didn't >> exceed >> 100MB. MetaspaceSize was around 60-70MB. >> >> FYI, I am on Spark 1.2 and Hadoop 2.4.0 and my spark application is based >> on >> Spark SQL and it's an HDFS
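Following up on Daniel's NewRatio observation quoted above, here is a hedged sketch of one way to apply that sizing when configuring executors; the property names are standard Spark settings, and the values simply mirror the numbers in the discussion rather than being a recommendation:

import org.apache.spark.SparkConf

// Sketch only, per the GC discussion above.
val conf = new SparkConf()
  // Option discussed above: keep spark.memory.fraction at its 0.75 default
  // and give the old generation ~75% of the heap so the cache fits in it.
  .set("spark.executor.extraJavaOptions", "-XX:NewRatio=3")
  // Alternative mentioned above: lower the fraction to roughly 0.66 instead,
  // so it works out with the default JVM NewRatio of 2.
  // .set("spark.memory.fraction", "0.66")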