ROSE: Spark + R on the JVM.
Hi all, I'd like to share news of the recent release of a new Spark package, [ROSE](http://spark-packages.org/package/onetapbeyond/opencpu-spark-executor). ROSE is a Scala library that brings the full scientific computing power of the R programming language to Apache Spark batch and streaming applications on the JVM. Where Apache SparkR lets data scientists use Spark from R, ROSE is designed to let Scala and Java developers use R from Spark. The project is available and documented [on GitHub](https://github.com/onetapbeyond/opencpu-spark-executor) and I would encourage you to [take a look](https://github.com/onetapbeyond/opencpu-spark-executor). Any feedback or questions are very welcome. David "All that is gold does not glitter, Not all those who wander are lost."
saveAsTextFile hangs with hdfs
I have a simple Spark job that seems to hang when saving to HDFS. Looking at the Spark web UI, the job reached 97 of 100 tasks completed. I need some help determining why the job appears to hang. The job hangs on the saveAsTextFile() call. https://www.dropbox.com/s/fdp7ck91hhm9w68/Screenshot%202014-08-19%2010.53.24.png The job is pretty simple:

JavaRDD<String> analyticsLogs = context
    .textFile(Joiner.on(",").join(hdfs.glob("/spark-dfs", ".*\\.log$")), 4);

JavaRDD<AnalyticsLogFlyweight> flyweights = analyticsLogs
    .map(line -> {
        try {
            AnalyticsLog log = GSON.fromJson(line, AnalyticsLog.class);
            AnalyticsLogFlyweight flyweight = new AnalyticsLogFlyweight();
            flyweight.ipAddress = log.getIpAddress();
            flyweight.time = log.getTime();
            flyweight.trackingId = log.getTrackingId();
            return flyweight;
        } catch (Exception e) {
            LOG.error("error parsing json", e);
            return null;
        }
    });

JavaRDD<AnalyticsLogFlyweight> filtered = flyweights
    .filter(log -> log != null);

JavaPairRDD<String, AnalyticsLogFlyweight> partitioned = filtered
    .mapToPair((AnalyticsLogFlyweight log) -> new Tuple2<>(log.trackingId, log))
    .partitionBy(new HashPartitioner(100)).cache();

Ordering<AnalyticsLogFlyweight> ordering = Ordering.natural().nullsFirst().onResultOf(new Function<AnalyticsLogFlyweight, Long>() {
    public Long apply(AnalyticsLogFlyweight log) {
        return log.time;
    }
});

JavaPairRDD<String, Iterable<AnalyticsLogFlyweight>> stringIterableJavaPairRDD = partitioned.groupByKey();

JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringIterableJavaPairRDD.mapToPair((log) -> {
    List<AnalyticsLogFlyweight> sorted = Lists.newArrayList(log._2());
    sorted.forEach(l -> LOG.info("sorted {}", l));
    return new Tuple2<>(log._1(), sorted.size());
});

String outputPath = "/summarized/groupedByTrackingId4";
hdfs.rm(outputPath, true);
stringIntegerJavaPairRDD.saveAsTextFile(String.format("%s/%s", hdfs.getUrl(), outputPath));

Thanks in advance, David
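A note on the hang itself: with groupByKey, a single heavily skewed trackingId forces one task to materialize the entire group, which is a common reason the last tasks of a stage never finish. Since only the group size is written out, the same result can be computed as a count per key without building the groups at all. A minimal sketch of that idea, written in Scala (the Java pair-RDD API exposes the same mapValues/reduceByKey calls), assuming partitioned is the (trackingId, record) RDD from the code above:

// count records per trackingId without materializing each group in memory
val countsPerKey = partitioned
  .mapValues(_ => 1)
  .reduceByKey(_ + _)   // combines map-side before the shuffle, so a hot key stays cheap
countsPerKey.saveAsTextFile(outputPath)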
sortByKey trouble
Hi, Does anybody know how to use sortByKey in Scala on an RDD like:

val rddToSave = file.map(l => l.split("\\|")).map(r => (r(34)+"-"+r(3), r(4), r(10), r(12)))

because I received the error "sortByKey is not a member of org.apache.spark.rdd.RDD[(String,String,String,String)]". What I am trying to do is sort on the first element. Thanks
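sortByKey is only defined on pair RDDs (RDD[(K, V)]), which is why it is not a member of RDD[(String,String,String,String)]. A minimal sketch that keeps the first element as the key and the remaining fields as the value (file is the RDD from the question; note that in compiled code, older Spark versions also need import org.apache.spark.SparkContext._ to bring the pair-RDD implicits into scope, something spark-shell does automatically):

import org.apache.spark.SparkContext._   // pair-RDD functions such as sortByKey on older Spark versions

// key = first element, value = the remaining fields
val rddToSave = file.map(l => l.split("\\|"))
  .map(r => (r(34) + "-" + r(3), (r(4), r(10), r(12))))
  .sortByKey()   // ascending sort on the composite key

// alternatively, sortBy works directly on the 4-tuple:
// file.map(l => l.split("\\|")).map(r => (r(34)+"-"+r(3), r(4), r(10), r(12))).sortBy(_._1)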
Re: sortByKey trouble
Thanks, I had already tried this solution but it does not compile (in Eclipse). I'm surprised to see that in spark-shell, sortByKey works fine on both of these shapes: (String,String,String,String) and (String,(String,String,String)).
foreachPartition: write to multiple files
Hi, I want to write my RDDs to multiple files based on a key value, so I used groupByKey and iterated over the partitions. Here is the code:

rdd.map(f => (f.substring(0,4), f)).groupByKey().foreachPartition(iterator =>
  iterator.map { case (key, values) =>
    val fs: FileSystem = FileSystem.get(new Configuration())
    val outputFile = fs.create(new Path("/my_path/" + key + ".txt"))
    values.foreach { x => outputFile.write(x.getBytes()) }
    outputFile.close()
  }
)

I don't see any error in the spark-shell log, but no file is written. Does anybody know what I missed? Thanks
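One thing to check: iterator.map inside foreachPartition is lazy, and nothing consumes the resulting iterator, so the body that opens and writes the files never runs. A hedged sketch of the same logic using foreach, which does force the iterator (the path and the 4-character key prefix are taken from the code above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

rdd.map(f => (f.substring(0, 4), f))
  .groupByKey()
  .foreachPartition { iterator =>
    // foreach (not map) actually consumes the iterator, so the writes execute on the executor
    iterator.foreach { case (key, values) =>
      val fs = FileSystem.get(new Configuration())
      val outputFile = fs.create(new Path("/my_path/" + key + ".txt"))
      values.foreach(x => outputFile.write(x.getBytes()))
      outputFile.close()
    }
  }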
Re: foreachPartition: write to multiple files
Hi, I finally found a solution after reading the post: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-split-RDD-by-key-and-save-to-different-path-td11887.html#a11983
Key-Value decomposition
Hi, I'm a newbie in Spark and face the following use case:

val data = Array(("A", "1;2;3"))
val rdd = sc.parallelize(data)
// Something here to produce an RDD of (key, value):
// ("A", "1"), ("A", "2"), ("A", "3")

Does anybody know how to do this? Thanks
Re: Key-Value decomposition
Hi, But I have only one RDD. Here is a more complete example. My RDD is something like:

("A", "1;2;3"), ("B", "2;5;6"), ("C", "3;2;1")

And I expect the following result:

("A",1), ("A",2), ("A",3), ("B",2), ("B",5), ("B",6), ("C",3), ("C",2), ("C",1)

Any idea how I can achieve this? Thanks
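One way to get there is to split the value and emit one pair per element with flatMapValues (or flatMap); a minimal sketch assuming an RDD[(String, String)] shaped like the example above:

val rdd = sc.parallelize(Seq(("A", "1;2;3"), ("B", "2;5;6"), ("C", "3;2;1")))

// one (key, element) pair per ';'-separated piece of the value
val decomposed = rdd.flatMapValues(_.split(";"))

// equivalent with flatMap:
// rdd.flatMap { case (key, value) => value.split(";").map(v => (key, v)) }

// decomposed.collect() => Array((A,1), (A,2), (A,3), (B,2), (B,5), (B,6), (C,3), (C,2), (C,1))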
RE: Key-Value decomposition
Thanks
Spark SQL (1.0)
Hi, I build 2 tables from files and join table F1 with table F2 on c5 = d4. F1 has 46730613 rows and F2 has 3386740 rows. All keys d4 exist in F1.c5, so I expect to retrieve 46730613 rows, but it returns only 3437 rows.

// --- begin code ---
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

val rddFile = sc.textFile("hdfs://referential/F1/part-*")
case class F1(c1: String, c2: String, c3: Double, c4: String, c5: String)
val stkrdd = rddFile.map(x => x.split("|")).map(f => F1(f(44), f(3), f(10).toDouble, "", f(2)))
stkrdd.registerAsTable("F1")
sqlContext.cacheTable("F1")

val prdfile = sc.textFile("hdfs://referential/F2/part-*")
case class F2(d1: String, d2: String, d3: String, d4: String)
val productrdd = prdfile.map(x => x.split("|")).map(f => F2(f(0), f(2), f(101), f(3)))
productrdd.registerAsTable("F2")
sqlContext.cacheTable("F2")

val resrdd = sqlContext.sql("Select count(*) from F1, F2 where F1.c5 = F2.d4 ").collect()
// --- end of code ---

Does anybody know what I missed? Thanks
Spark SQL join returns fewer rows than expected
Hi, I have 2 files which come from a CSV import of 2 Oracle tables. F1 has 46730613 rows and F2 has 3386740 rows. I build 2 tables with Spark and join table F1 with table F2 on c1 = d1. All keys F2.d1 exist in F1.c1, so I expect to retrieve 46730613 rows, but it returns only 3437 rows.

// --- begin code ---
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

val rddFile = sc.textFile("hdfs://referential/F1/part-*")
case class F1(c1: String, c2: String, c3: Double, c4: String, c5: String)
val stkrdd = rddFile.map(x => x.split("|")).map(f => F1(f(44), f(3), f(10).toDouble, "", f(2)))
stkrdd.registerAsTable("F1")
sqlContext.cacheTable("F1")

val prdfile = sc.textFile("hdfs://referential/F2/part-*")
case class F2(d1: String, d2: String, d3: String, d4: String)
val productrdd = prdfile.map(x => x.split("|")).map(f => F2(f(0), f(2), f(101), f(3)))
productrdd.registerAsTable("F2")
sqlContext.cacheTable("F2")

val resrdd = sqlContext.sql("Select count(*) from F1, F2 where F1.c1 = F2.d1 ").count()
// --- end of code ---

Does anybody know what I missed? Thanks
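One detail that stands out in both versions of this code and could explain the row count on its own: String.split takes a regular expression, and a bare "|" matches the empty string, so x.split("|") breaks each line into single characters instead of fields. Escaping the pipe is most likely what was intended; a hedged sketch of just that change, reusing the names from the code above:

// "|" is regex alternation; escape it to split on the literal pipe character
val stkrdd = rddFile.map(x => x.split("\\|")).map(f => F1(f(44), f(3), f(10).toDouble, "", f(2)))
val productrdd = prdfile.map(x => x.split("\\|")).map(f => F2(f(0), f(2), f(101), f(3)))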
Spark Streaming Kafka best practices?
Hi, What is the best way to process a batch window in Spark Streaming:

kafkaStream.foreachRDD(rdd => {
  rdd.collect().foreach(event => {
    // process the event
    process(event)
  })
})

Or:

kafkaStream.foreachRDD(rdd => {
  rdd.map(event => {
    // process the event
    process(event)
  }).collect()
})

Thanks
Spark Streaming: works with collect() but not without collect()
Hi, We use the following Spark Streaming code to collect and process Kafka events:

kafkaStream.foreachRDD(rdd => {
  rdd.collect().foreach(event => {
    process(event._1, event._2)
  })
})

This works fine. But without collect(), the following exception is raised for the call to the process function:

Loss was due to java.lang.ExceptionInInitializerError

We attempted to rewrite it like this, but the same exception is raised:

kafkaStream.foreachRDD(rdd => {
  rdd.foreachPartition(iter =>
    iter.foreach(event => {
      process(event._1, event._2)
    })
  )
})

Can anybody explain why, and how to solve this issue? Thanks, Regards
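ExceptionInInitializerError typically means a static or object initializer that process() depends on fails when the code runs on an executor; with collect() everything executes on the driver, which is why that variant works. A hedged sketch of the usual workaround, building the non-serializable or driver-only resources once per partition on the executor (ProcessorFactory and processor are hypothetical stand-ins for whatever process() needs):

kafkaStream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // construct heavyweight or non-serializable state here, on the executor, not on the driver
    val processor = ProcessorFactory.create()   // hypothetical
    iter.foreach { case (key, value) =>
      processor.process(key, value)
    }
  }
}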
Re: Job is not able to perform Broadcast Join
After adding the sequential ids you might need a repartition? I've found using monotically increasing id before that the df goes to a single partition. Usually becomes clear in the spark ui though On Tue, 6 Oct 2020, 20:38 Sachit Murarka, wrote: > Yes, Even I tried the same first. Then I moved to join method because > shuffle spill was happening because row num without partition happens on > single task. Instead of processinf entire dataframe on single task. I have > broken down that into df1 and df2 and joining. > Because df2 is having very less data set since it has 2 cols only. > > Thanks > Sachit > > On Wed, 7 Oct 2020, 01:04 Eve Liao, wrote: > >> Try to avoid broadcast. Thought this: >> https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6 >> could be helpful. >> >> On Tue, Oct 6, 2020 at 12:18 PM Sachit Murarka >> wrote: >> >>> Thanks Eve for response. >>> >>> Yes I know we can use broadcast for smaller datasets,I increased the >>> threshold (4Gb) for the same then also it did not work. and the df3 is >>> somewhat greater than 2gb. >>> >>> Trying by removing broadcast as well.. Job is running since 1 hour. Will >>> let you know. >>> >>> >>> Thanks >>> Sachit >>> >>> On Wed, 7 Oct 2020, 00:41 Eve Liao, wrote: >>> How many rows does df3 have? Broadcast joins are a great way to append data stored in relatively *small* single source of truth data files to large DataFrames. DataFrames up to 2GB can be broadcasted so a data file with tens or even hundreds of thousands of rows is a broadcast candidate. Your broadcast variable is probably too large. On Tue, Oct 6, 2020 at 11:37 AM Sachit Murarka wrote: > Hello Users, > > I am facing an issue in spark job where I am doing row number() > without partition by clause because I need to add sequential increasing > IDs. > But to avoid the large spill I am not doing row number() over the > complete data frame. > > Instead I am applying monotically_increasing id on actual data set , > then create a new data frame from original data frame which will have > just monotically_increasing id. > > So DF1 = All columns + monotically_increasing_id > DF2 = Monotically_increasingID > > Now I am applying row number() on DF2 since this is a smaller > dataframe. > > DF3 = Monotically_increasingID + Row_Number_ID > > Df.join(broadcast(DF3)) > > This will give me sequential increment id in the original Dataframe. > > But below is the stack trace. > > py4j.protocol.Py4JJavaError: An error occurred while calling > o180.parquet. > : org.apache.spark.SparkException: Job aborted. 
> at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) > at > org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) > at > org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) > at > org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) > at > org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) > at > org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:28
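If the only goal is a consecutive ID without the single-task row_number() and without broadcasting DF3, zipWithIndex on the underlying RDD is another option worth considering: it assigns consecutive indexes without pulling all rows onto one task. A rough Scala sketch of that idea (the thread is PySpark, where the equivalent rdd.zipWithIndex() and createDataFrame calls exist; df and spark are assumed):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// append a consecutive 0-based id to every row without a global sort or a broadcast join
val withIdRdd = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
val withIdSchema = StructType(df.schema.fields :+ StructField("seq_id", LongType, nullable = false))
val dfWithId = spark.createDataFrame(withIdRdd, withIdSchema)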
Spark 3.0.1 Structured streaming - checkpoints fail
Hello, I have an issue with my Pyspark job related to checkpoint. Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4): java.lang.IllegalStateException: Error reading delta file file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of HDFSStateStoreProvider[id = (op=0,part=3),dir = file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta does not exist* This job is based on Spark 3.0.1 and Structured Streaming This Spark cluster (1 driver and 6 executors) works without hdfs. And we don't want to manage an hdfs cluster if possible. Is it necessary to have a distributed filesystem ? What are the different solutions/workarounds ? Thanks in advance David
Re: Spark 3.0.1 Structured streaming - checkpoints fail
Thanks. My Spark applications run on nodes based on docker images but this is a standalone mode (1 driver - n workers) Can we use S3 directly with consistency addon like s3guard (s3a) or AWS Consistent view <https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html> ? Le mer. 23 déc. 2020 à 17:48, Lalwani, Jayesh a écrit : > Yes. It is necessary to have a distributed file system because all the > workers need to read/write to the checkpoint. The distributed file system > has to be immediately consistent: When one node writes to it, the other > nodes should be able to read it immediately > > The solutions/workarounds depend on where you are hosting your Spark > application. > > > > *From: *David Morin > *Date: *Wednesday, December 23, 2020 at 11:08 AM > *To: *"user@spark.apache.org" > *Subject: *[EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail > > > > *CAUTION*: This email originated from outside of the organization. Do not > click links or open attachments unless you can confirm the sender and know > the content is safe. > > > > Hello, > > > > I have an issue with my Pyspark job related to checkpoint. > > > > Caused by: org.apache.spark.SparkException: Job aborted due to stage > failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost > task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4): > java.lang.IllegalStateException: Error reading delta file > file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of > HDFSStateStoreProvider[id = (op=0,part=3),dir = > file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: > *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta > does not exist* > > > > This job is based on Spark 3.0.1 and Structured Streaming > > This Spark cluster (1 driver and 6 executors) works without hdfs. And we > don't want to manage an hdfs cluster if possible. > > Is it necessary to have a distributed filesystem ? What are the different > solutions/workarounds ? > > > > Thanks in advance > > David >
Re: Spark 3.0.1 Structured streaming - checkpoints fail
Does it work with the standard AWS S3 solution and its new consistency model <https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/> ? Le mer. 23 déc. 2020 à 18:48, David Morin a écrit : > Thanks. > My Spark applications run on nodes based on docker images but this is a > standalone mode (1 driver - n workers) > Can we use S3 directly with consistency addon like s3guard (s3a) or AWS > Consistent view > <https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html> > ? > > Le mer. 23 déc. 2020 à 17:48, Lalwani, Jayesh a > écrit : > >> Yes. It is necessary to have a distributed file system because all the >> workers need to read/write to the checkpoint. The distributed file system >> has to be immediately consistent: When one node writes to it, the other >> nodes should be able to read it immediately >> >> The solutions/workarounds depend on where you are hosting your Spark >> application. >> >> >> >> *From: *David Morin >> *Date: *Wednesday, December 23, 2020 at 11:08 AM >> *To: *"user@spark.apache.org" >> *Subject: *[EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail >> >> >> >> *CAUTION*: This email originated from outside of the organization. Do >> not click links or open attachments unless you can confirm the sender and >> know the content is safe. >> >> >> >> Hello, >> >> >> >> I have an issue with my Pyspark job related to checkpoint. >> >> >> >> Caused by: org.apache.spark.SparkException: Job aborted due to stage >> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost >> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4): >> java.lang.IllegalStateException: Error reading delta file >> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of >> HDFSStateStoreProvider[id = (op=0,part=3),dir = >> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: >> *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta >> does not exist* >> >> >> >> This job is based on Spark 3.0.1 and Structured Streaming >> >> This Spark cluster (1 driver and 6 executors) works without hdfs. And we >> don't want to manage an hdfs cluster if possible. >> >> Is it necessary to have a distributed filesystem ? What are the different >> solutions/workarounds ? >> >> >> >> Thanks in advance >> >> David >> >
Re: Spark 3.0.1 Structured streaming - checkpoints fail
Thanks Jungtaek Ok I got it. I'll test it and check if the loss of efficiency is acceptable. Le mer. 23 déc. 2020 à 23:29, Jungtaek Lim a écrit : > Please refer my previous answer - > https://lists.apache.org/thread.html/r7dfc9e47cd9651fb974f97dde756013fd0b90e49d4f6382d7a3d68f7%40%3Cuser.spark.apache.org%3E > Probably we may want to add it in the SS guide doc. We didn't need it as > it just didn't work with eventually consistent model, and now it works > anyway but is very inefficient. > > > On Thu, Dec 24, 2020 at 6:16 AM David Morin > wrote: > >> Does it work with the standard AWS S3 solution and its new >> consistency model >> <https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/> >> ? >> >> Le mer. 23 déc. 2020 à 18:48, David Morin a >> écrit : >> >>> Thanks. >>> My Spark applications run on nodes based on docker images but this is a >>> standalone mode (1 driver - n workers) >>> Can we use S3 directly with consistency addon like s3guard (s3a) or AWS >>> Consistent view >>> <https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html> >>> ? >>> >>> Le mer. 23 déc. 2020 à 17:48, Lalwani, Jayesh a >>> écrit : >>> >>>> Yes. It is necessary to have a distributed file system because all the >>>> workers need to read/write to the checkpoint. The distributed file system >>>> has to be immediately consistent: When one node writes to it, the other >>>> nodes should be able to read it immediately >>>> >>>> The solutions/workarounds depend on where you are hosting your Spark >>>> application. >>>> >>>> >>>> >>>> *From: *David Morin >>>> *Date: *Wednesday, December 23, 2020 at 11:08 AM >>>> *To: *"user@spark.apache.org" >>>> *Subject: *[EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints >>>> fail >>>> >>>> >>>> >>>> *CAUTION*: This email originated from outside of the organization. Do >>>> not click links or open attachments unless you can confirm the sender and >>>> know the content is safe. >>>> >>>> >>>> >>>> Hello, >>>> >>>> >>>> >>>> I have an issue with my Pyspark job related to checkpoint. >>>> >>>> >>>> >>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage >>>> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost >>>> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4): >>>> java.lang.IllegalStateException: Error reading delta file >>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of >>>> HDFSStateStoreProvider[id = (op=0,part=3),dir = >>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: >>>> *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta >>>> does not exist* >>>> >>>> >>>> >>>> This job is based on Spark 3.0.1 and Structured Streaming >>>> >>>> This Spark cluster (1 driver and 6 executors) works without hdfs. And >>>> we don't want to manage an hdfs cluster if possible. >>>> >>>> Is it necessary to have a distributed filesystem ? What are the >>>> different solutions/workarounds ? >>>> >>>> >>>> >>>> Thanks in advance >>>> >>>> David >>>> >>>
S3a Committer
Hi, I have some issues at the moment with S3 API of Openstack Swift (S3a). This one is eventually consistent and it causes lots of issues with my distributed jobs in Spark. Is the S3A committer able to fix that ? Or an "S3guard like" implementation is the only way ? David
Re: S3a Committer
Yes, that's true but this is not (yet) the case of the Openstack Swift S3 API Le mar. 2 févr. 2021 à 21:41, Henoc a écrit : > S3 is strongly consistent now > https://aws.amazon.com/s3/consistency/ > > Regards, > Henoc > > On Tue, Feb 2, 2021, 10:27 PM David Morin > wrote: > >> Hi, >> >> I have some issues at the moment with S3 API of Openstack Swift (S3a). >> This one is eventually consistent and it causes lots of issues with my >> distributed jobs in Spark. >> Is the S3A committer able to fix that ? Or an "S3guard like" >> implementation is the only way ? >> >> David >> >
Missing stack function from SQL functions API
I noticed that the stack SQL function is missing from the functions API. Could we add it?
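Until it is exposed there, one workaround is to go through selectExpr (or expr), since the SQL function itself is already available to the parser; a minimal sketch with a hypothetical DataFrame that has columns id, q1 and q2:

// unpivot q1/q2 into (quarter, value) rows using the SQL stack function
val unpivoted = df.selectExpr("id", "stack(2, 'q1', q1, 'q2', q2) as (quarter, value)")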
Re: Performance of PySpark jobs on the Kubernetes cluster
Hi Mich, I don't quite understand why the driver node is using so much CPU, but it may be unrelated to your executors being underused. About your executors being underused, I would check that your job generated enough tasks. Then I would check spark.executor.cores and spark.tasks.cpus parameters to see if I can give more work to the executors. Cheers, David Le mar. 10 août 2021 à 12:20, Khalid Mammadov a écrit : > Hi Mich > > I think you need to check your code. > If code does not use PySpark API effectively you may get this. I.e. if you > use pure Python/pandas api rather than Pyspark i.e. > transform->transform->action. e.g df.select(..).withColumn(...)...count() > > Hope this helps to put you on right direction. > > Cheers > Khalid > > > > > On Mon, 9 Aug 2021, 20:20 Mich Talebzadeh, > wrote: > >> Hi, >> >> I have a basic question to ask. >> >> I am running a Google k8s cluster (AKA GKE) with three nodes each having >> configuration below >> >> e2-standard-2 (2 vCPUs, 8 GB memory) >> >> >> spark-submit is launched from another node (actually a data proc single >> node that I have just upgraded to e2-custom (4 vCPUs, 8 GB mem). We call >> this the launch node >> >> OK I know that the cluster is not much but Google was complaining about >> the launch node hitting 100% cpus. So I added two more cpus to it. >> >> It appears that despite using k8s as the computational cluster, the >> burden falls upon the launch node! >> >> The cpu utilisation for launch node shown below >> >> [image: image.png] >> The dip is when 2 more cpus were added to it so it had to reboot. so >> around %70 usage >> >> The combined cpu usage for GKE nodes is shown below: >> >> [image: image.png] >> >> Never goes above 20%! >> >> I can see that the drive and executors as below: >> >> k get pods -n spark >> NAME READY STATUSRESTARTS >> AGE >> pytest-c958c97b2c52b6ed-driver 1/1 Running 0 >> 69s >> randomdatabigquery-e68a8a7b2c52f468-exec-1 1/1 Running 0 >> 51s >> randomdatabigquery-e68a8a7b2c52f468-exec-2 1/1 Running 0 >> 51s >> randomdatabigquery-e68a8a7b2c52f468-exec-3 0/1 Pending 0 >> 51s >> >> It is a PySpark 3.1.1 image using java 8 and pushing random data >> generated into Google BigQuery data warehouse. The last executor (exec-3) >> seems to be just pending. The spark-submit is as below: >> >> spark-submit --verbose \ >>--properties-file ${property_file} \ >>--master k8s://https://$KUBERNETES_MASTER_IP:443 \ >>--deploy-mode cluster \ >>--name pytest \ >>--conf >> spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \ >>--py-files $CODE_DIRECTORY/DSBQ.zip \ >>--conf spark.kubernetes.namespace=$NAMESPACE \ >>--conf spark.executor.memory=5000m \ >>--conf spark.network.timeout=300 \ >>--conf spark.executor.instances=3 \ >>--conf spark.kubernetes.driver.limit.cores=1 \ >>--conf spark.driver.cores=1 \ >>--conf spark.executor.cores=1 \ >>--conf spark.executor.memory=2000m \ >>--conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \ >>--conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \ >>--conf spark.kubernetes.container.image=${IMAGEGCP} \ >>--conf >> spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \ >>--conf >> spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \ >>--conf >> spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" >> \ >>--conf spark.sql.execution.arrow.pyspark.enabled="true" \ >>$CODE_DIRECTORY/${APPLICATION} >> >> Aren't the driver and executors running on K8s cluster? 
So why is the >> launch node heavily used but k8s cluster is underutilized? >> >> Thanks >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >
Trying to hash cross features with mllib
Hello everyone, In MLlib, I'm trying to rely essentially on pipelines to create features out of the Titanic dataset and showcase the power of feature hashing. I want to:
- Apply bucketization on some columns (QuantileDiscretizer is fine)
- Then cross all my columns with each other to have cross features
- Then hash all of these cross features into a vector
- Then give it to a logistic regression

Looking at the documentation, it looks like the only way to hash features is the FeatureHasher transformation. It takes multiple columns as input; the type can be numeric, bool, or string (but no vector/array). But now I'm left wondering how I can create my cross-feature columns. I'm looking for a transformation that could take two columns as input and return a numeric, bool, or string. I didn't manage to find anything that does the job. There are multiple transformations, such as VectorAssembler, that operate on vectors, but this is not a type accepted by the FeatureHasher. Of course, I could try to combine columns directly in my dataframe (before the pipeline kicks in), but then I would not be able to benefit any more from QuantileDiscretizer and other cool functions. Am I missing something in the transformation API? Or is my approach to hashing wrong? Or should we consider extending the API somehow? Thank you, kind regards, David
Re: Trying to hash cross features with mllib
Hello Sean, Thank you for the heads-up ! Interaction transform won't help for my use case as it returns a vector that I won't be able to hash. I will definitely dig further into custom transformations though. Thanks ! David Le ven. 1 oct. 2021 à 15:49, Sean Owen a écrit : > Are you looking for > https://spark.apache.org/docs/latest/ml-features.html#interaction ? > That's the closest built in thing I can think of. Otherwise you can make > custom transformations. > > On Fri, Oct 1, 2021, 8:44 AM David Diebold > wrote: > >> Hello everyone, >> >> In MLLib, I’m trying to rely essentially on pipelines to create features >> out of the Titanic dataset, and show-case the power of feature hashing. I >> want to: >> >> - Apply bucketization on some columns (QuantileDiscretizer is >> fine) >> >> - Then I want to cross all my columns with each other to have >> cross features. >> >> - Then I would like to hash all of these cross features into a >> vector. >> >> - Then give it to a logistic regression. >> >> Looking at the documentation, it looks like the only way to hash features >> is the *FeatureHasher* transformation. It takes multiple columns as >> input, type can be numeric, bool, string (but no vector/array). >> >> But now I’m left wondering how I can create my cross-feature columns. I’m >> looking at a transformation that could take two columns as input, and >> return a numeric, bool, or string. I didn't manage to find anything that >> does the job. There are multiple transformations such as VectorAssembler, >> that operate on vector, but this is not a typeaccepted by the FeatureHasher. >> >> Of course, I could try to combine columns directly in my dataframe >> (before the pipeline kicks-in), but then I would not be able to benefit any >> more from QuantileDiscretizer and other cool functions. >> >> >> Am I missing something in the transformation api ? Or is my approach to >> hashing wrong ? Or should we consider to extend the api somehow ? >> >> >> >> Thank you, kind regards, >> >> David >> >
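For the cross features themselves, one pragmatic option is to build them as plain string columns inside the pipeline with a SQLTransformer (for example via concat_ws over the bucketized columns), so they remain valid FeatureHasher inputs and nothing has to happen outside the pipeline. A rough sketch with hypothetical column names:

import org.apache.spark.ml.feature.{FeatureHasher, SQLTransformer}

// build cross features as string columns inside the pipeline, then hash them
val crosser = new SQLTransformer().setStatement(
  "SELECT *, concat_ws('_', ageBucket, pclass) AS ageBucket_x_pclass, " +
  "concat_ws('_', sex, pclass) AS sex_x_pclass FROM __THIS__")   // hypothetical columns

val hasher = new FeatureHasher()
  .setInputCols("ageBucket_x_pclass", "sex_x_pclass")
  .setOutputCol("features")
  .setNumFeatures(1 << 18)

// both stages can then be chained in a Pipeline with QuantileDiscretizer and LogisticRegression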
question about data skew and memory issues
Hello all, I was wondering whether it is possible to encounter out-of-memory exceptions on Spark executors when doing an aggregation on a skewed dataset. Let's say we have a dataset with two columns:
- key: int
- value: float

I want to aggregate values by key, and there are tons of records with key equal to 0. Is it possible to encounter an out-of-memory exception after the shuffle? My expectation would be that the executor responsible for aggregating the '0' partition could indeed hit an OOM exception if it tried to put all the files of this partition in memory before processing them. But why would it need to put them in memory when doing an aggregation? It looks to me like aggregation can be performed in a streaming fashion, so I would not expect any OOM at all. Thank you in advance for your insights :) David
Re: Pyspark debugging best practices
Hello Andy, Are you sure you want to perform lots of join operations, and not simple unions ? Are you doing inner joins or outer joins ? Can you provide us with a rough amount of your list size plus each individual dataset size ? Have a look at execution plan would help, maybe the high amount of join operations makes execution plan too complicated at the end of the day ; checkpointing could help there ? Cheers, David Le jeu. 30 déc. 2021 à 16:56, Andrew Davidson a écrit : > Hi Gourav > > I will give databricks a try. > > Each data gets loaded into a data frame. > I select one column from the data frame > I join the column to the accumulated joins from previous data frames in > the list > > To debug. I think am gaining to put an action and log statement after each > join. I do not think it will change the performance. I believe the physical > plan will be the same how ever hopefully it will shed some light. > > At the very least I will know if it making progress or not. And hopefully > where it is breaking > > Happy new year > > Andy > > On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta > wrote: > >> Hi Andrew, >> >> Any chance you might give Databricks a try in GCP? >> >> The above transformations look complicated to me, why are you adding >> dataframes to a list? >> >> >> Regards, >> Gourav Sengupta >> >> >> >> On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson >> wrote: >> >>> Hi >>> >>> >>> >>> I am having trouble debugging my driver. It runs correctly on smaller >>> data set but fails on large ones. It is very hard to figure out what the >>> bug is. I suspect it may have something do with the way spark is installed >>> and configured. I am using google cloud platform dataproc pyspark >>> >>> >>> >>> The log messages are not helpful. The error message will be something >>> like >>> "User application exited with status 1" >>> >>> >>> >>> And >>> >>> >>> >>> jsonPayload: { >>> >>> class: "server.TThreadPoolServer" >>> >>> filename: "hive-server2.log" >>> >>> message: "Error occurred during processing of message." >>> >>> thread: "HiveServer2-Handler-Pool: Thread-40" >>> >>> } >>> >>> >>> >>> I am able to access the spark history server however it does not capture >>> anything if the driver crashes. I am unable to figure out how to access >>> spark web UI. >>> >>> >>> >>> My driver program looks something like the pseudo code bellow. A long >>> list of transforms with a single action, (i.e. write) at the end. Adding >>> log messages is not helpful because of lazy evaluations. I am tempted to >>> add something like >>> >>> >>> >>> Logger.warn( “DEBUG df.count():{}”.format( df.count() )” to try and >>> inline some sort of diagnostic message. >>> >>> >>> >>> What do you think? >>> >>> >>> >>> Is there a better way to debug this? >>> >>> >>> >>> Kind regards >>> >>> >>> >>> Andy >>> >>> >>> >>> def run(): >>> >>> listOfDF = [] >>> >>> for filePath in listOfFiles: >>> >>> df = spark.read.load( filePath, ...) >>> >>> listOfDF.append(df) >>> >>> >>> >>> >>> >>> list2OfDF = [] >>> >>> for df in listOfDF: >>> >>> df2 = df.select( ) >>> >>> lsit2OfDF.append( df2 ) >>> >>> >>> >>> # will setting list to None free cache? >>> >>> # or just driver memory >>> >>> listOfDF = None >>> >>> >>> >>> >>> >>> df3 = list2OfDF[0] >>> >>> >>> >>> for i in range( 1, len(list2OfDF) ): >>> >>> df = list2OfDF[i] >>> >>> df3 = df3.join(df ...) >>> >>> >>> >>> # will setting to list to None free cache? 
>>> >>> # or just driver memory >>> >>> List2OfDF = None >>> >>> >>> >>> >>> >>> lots of narrow transformations on d3 >>> >>> >>> >>> return df3 >>> >>> >>> >>> def main() : >>> >>> df = run() >>> >>> df.write() >>> >>> >>> >>> >>> >>> >>> >>
Re: groupMapReduce
Hello, In the RDD API, you are probably looking for reduceByKey. Cheers

Le ven. 14 janv. 2022 à 11:56, frakass a écrit : > Is there a RDD API which is similar to Scala's groupMapReduce? > https://blog.genuine.com/2019/11/scalas-groupmap-and-groupmapreduce/ > > Thank you.
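For the full groupMapReduce shape (group by a key, map each element, reduce per key), the closest one-pass RDD equivalent is map followed by reduceByKey (or aggregateByKey when the mapped type differs from the input); a small sketch:

// Scala collections: words.groupMapReduce(_.head)(_ => 1)(_ + _)
// RDD equivalent: build (key, mapped value) pairs, then reduce per key in a single shuffle
val words = sc.parallelize(Seq("apple", "apricot", "banana", "blueberry", "cherry"))
val countsByInitial = words
  .map(w => (w.head, 1))
  .reduceByKey(_ + _)
// countsByInitial.collect() => Array((a,2), (b,2), (c,1)) (ordering may vary)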
Question about spark.sql min_by
Hello all, I'm trying to use the spark.sql min_by aggregation function with PySpark. I'm relying on this distribution of Spark: spark-3.2.1-bin-hadoop3.2. I have a dataframe made of these columns:
- productId: int
- sellerId: int
- price: double

For each product, I want to get the seller who sells the product for the cheapest price. The naive approach would be to do this, but I would expect two shuffles:

import pyspark.sql.functions as F
cheapest_prices_df = df.groupby('productId').agg(F.min('price').alias('price'))
cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId', 'price'])

I would have loved to do this instead:

import pyspark.sql.functions as F
cheapest_sellers_df = df.groupby('productId').agg(F.min('price'), F.min_by('sellerId', 'price'))

Unfortunately min_by does not seem to be available in pyspark.sql.functions, whereas I can see it in the doc: https://spark.apache.org/docs/latest/api/sql/index.html

I have managed to use min_by with this approach, but it looks slow (maybe because of the temp table creation?):

df.createOrReplaceTempView("table")
cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId, min(price) from table group by productId")

Is there a way I can rely on min_by directly in groupby? Is there some code missing in the pyspark wrapper to make min_by visible somehow? Thank you in advance for your help. Cheers, David
Re: Question about spark.sql min_by
Thank you for your answers. Indeed windowing should help there. Also, I just realized maybe I can try to create a struct column with both price and sellerId and apply min() on it, ordering would consider price first for the ordering (https://stackoverflow.com/a/52669177/2015762) Cheers! Le lun. 21 févr. 2022 à 16:12, ayan guha a écrit : > Why this can not be done by window function? Or is min by is just a short > hand? > > On Tue, 22 Feb 2022 at 12:42 am, Sean Owen wrote: > >> From the source code, looks like this function was added to pyspark in >> Spark 3.3, up for release soon. It exists in SQL. You can still use it in >> SQL with `spark.sql(...)` in Python though, not hard. >> >> On Mon, Feb 21, 2022 at 4:01 AM David Diebold >> wrote: >> >>> Hello all, >>> >>> I'm trying to use the spark.sql min_by aggregation function with pyspark. >>> I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2 >>> >>> I have a dataframe made of these columns: >>> - productId : int >>> - sellerId : int >>> - price : double >>> >>> For each product, I want to get the seller who sells the product for the >>> cheapest price. >>> >>> Naive approach would be to do this, but I would expect two shuffles: >>> >>> import spark.sql.functions as F >>> cheapest_prices_df = >>> df.groupby('productId').agg(F.min('price').alias('price')) >>> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId', >>> 'price']) >>> >>> I would had loved to do this instead : >>> >>> import spark.sql.functions as F >>> cheapest_sellers_df = df.groupby('productId').agg(F.min('price'), >>> F.min_by('sellerId', 'price')) >>> >>> Unfortunately min_by does not seem available in pyspark sql functions, >>> whereas I can see it in the doc : >>> https://spark.apache.org/docs/latest/api/sql/index.html >>> >>> I have managed to use min_by with this approach but it looks slow (maybe >>> because of temp table creation ?): >>> >>> df.createOrReplaceTempView("table") >>> cheapest_sellers_df = spark.sql("select min_by(sellerId, price) >>> sellerId, min(price) from table group by productId") >>> >>> Is there a way I can rely on min_by directly in groupby ? >>> Is there some code missing in pyspark wrapper to make min_by visible >>> somehow ? >>> >>> Thank you in advance for your help. >>> >>> Cheers >>> David >>> >> -- > Best Regards, > Ayan Guha >
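For reference, the struct trick mentioned above looks roughly like this; because structs compare field by field, taking min over (price, sellerId) picks the lowest price and carries the matching seller along in a single aggregation. Shown here as a Scala sketch (the same functions exist in pyspark.sql.functions), with df being the (productId, sellerId, price) dataframe from the question:

import org.apache.spark.sql.functions.{col, min, struct}

val cheapestSellers = df
  .groupBy("productId")
  .agg(min(struct(col("price"), col("sellerId"))).as("cheapest"))
  .select(
    col("productId"),
    col("cheapest.price").as("price"),
    col("cheapest.sellerId").as("sellerId"))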
Question about bucketing and custom partitioners
Hello, I have a few questions related to bucketing and custom partitioning in dataframe api. I am considering bucketing to perform one-side free shuffle join in incremental jobs, but there is one thing that I'm not happy with. Data is likely to grow/skew over time. At some point, i would need to change amount of buckets which would provoke shuffle. Instead of this, I would like to use a custom partitioner, that would replace shuffle by narrow transformation. That is something that was feasible with RDD developer api. For example, I could use such partitioning scheme: partition_id = (nb_partitions-1) * ( hash(column) - Int.minValue) / (Int.maxValue - Int.minValue) When I multiply amount of partitions by 2 each new partition depends only on one partition from parent (=> narrow transformation) So, here are my questions : 1/ Is it possible to use custom partitioner when saving a dataframe with bucketing ? 2/ Still with the API dataframe, is it possible to apply custom partitioner to a dataframe ? Is it possible to repartition the dataframe with a narrow transformation like what could be done with RDD ? Is there some sort of dataframe developer API ? Do you have any pointers on this ? Thanks ! David
Writing protobuf RDD to parquet
Hello, I'm trying to write an RDD[T], where T is a protobuf message, to Parquet in Scala. I am wondering what the best option is to do this, and I would be interested in your insights. So far, I see two possibilities:
- Use the PairRDD method saveAsNewAPIHadoopFile, and I guess I need to call ParquetOutputFormat.setWriteSupportClass and ProtoParquetOutputFormat.setProtobufClass before. But in that case, I'm not sure I have much control over how files are partitioned into different folders on the file system.
- Or convert the RDD to a dataframe and then use write.parquet; in that case, I have more control, and can rely on partitionBy to arrange the files in different folders. But I'm not sure there is a built-in way to convert an RDD of protobuf messages to a dataframe in Spark; I would need to rely on this: https://github.com/saurfang/sparksql-protobuf.

What do you think? Kind regards, David
RDD boundaries and triggering processing using tags in the data
Hi All, I'm new to Spark and I'd like some help understanding if a particular use case would be a good fit for Spark Streaming. I have an imaginary stream of sensor data consisting of integers 1-10. Every time the sensor reads "10" I'd like to average all the numbers that were received since the last "10".

example input: 10 5 8 4 6 2 1 2 8 8 8 1 6 9 1 3 10 1 3 10 ...
desired output: 4.8, 2.0

I'm confused about what happens if sensor readings fall into different RDDs.

RDD1: 10 5 8 4 6 2 1 2 8 8 8
RDD2: 1 6 9 1 3 10 1 3 10
output: ???, 2.0

My imaginary sensor doesn't read at fixed time intervals, so breaking the stream into RDDs by time interval won't ensure the data is packaged properly. Additionally, multiple sensors are writing to the same stream (though I think flatMap can parse the origin stream into streams for individual sensors, correct?). My best guess for processing goes like:
1) flatMap() to break out individual sensor streams
2) Custom parser to accumulate input data until "10" is found, then create a new output RDD for each sensor and data grouping
3) average the values from step 2

I would greatly appreciate pointers to some specific documentation or examples if you have seen something like this before. Thanks, David
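For what it's worth, this kind of per-sensor buffering does fit Spark Streaming if the partial group is kept in keyed state rather than tied to batch/RDD boundaries. A rough sketch using updateStateByKey, assuming a DStream[(String, Int)] of (sensorId, reading) pairs and assuming each sensor's readings arrive in order within and across batches (that ordering is the part that needs a real guarantee in practice):

// per-sensor state: readings seen since the last "10", plus the averages completed this batch
type SensorState = (List[Int], List[Double])

def update(newReadings: Seq[Int], state: Option[SensorState]): Option[SensorState] = {
  var pending = state.map(_._1).getOrElse(Nil)
  var completed = List.empty[Double]
  newReadings.foreach {
    case 10 =>
      if (pending.nonEmpty) completed :+= pending.sum.toDouble / pending.size
      pending = Nil
    case r =>
      pending :+= r
  }
  Some((pending, completed))
}

// readings: DStream[(String, Int)] keyed by sensor id; ssc.checkpoint(...) is required for state
// val averages = readings.updateStateByKey(update).flatMapValues(_._2)   // one (sensorId, avg) per completed group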
Re: spark sql - reading data from sql tables having space in column names
I am having the same problem reading JSON. There does not seem to be a way of selecting a field that has a space, "Executor Info" from the Spark logs. I suggest that we open a JIRA ticket to address this issue. On Jun 2, 2015 10:08 AM, "ayan guha" wrote: > I would think the easiest way would be to create a view in DB with column > names with no space. > > In fact, you can "pass" a sql in place of a real table. > > From documentation: "The JDBC table that should be read. Note that > anything that is valid in a `FROM` clause of a SQL query can be used. For > example, instead of a full table you could also use a subquery in > parentheses." > > Kindly let the community know if this works > > On Tue, Jun 2, 2015 at 6:43 PM, Sachin Goyal > wrote: > >> Hi, >> >> We are using spark sql (1.3.1) to load data from Microsoft sql server >> using jdbc (as described in >> https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases >> ). >> >> It is working fine except when there is a space in column names (we can't >> modify the schemas to remove space as it is a legacy database). >> >> Sqoop is able to handle such scenarios by enclosing column names in '[ ]' >> - the recommended method from microsoft sql server. ( >> https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java >> - line no 319) >> >> Is there a way to handle this in spark sql? >> >> Thanks, >> sachin >> > > > > -- > Best Regards, > Ayan Guha >
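Backticks are one way to reference a field whose name contains a space in DataFrame/SQL expressions, which may be enough of a workaround here; a hedged sketch assuming the event logs are loaded as JSON (path and variable names are illustrative):

import org.apache.spark.sql.functions.col

val events = sqlContext.read.json("/path/to/eventlog.json")   // hypothetical path
val executorInfo = events.select(col("`Executor Info`"))
// or in SQL: SELECT `Executor Info` FROM events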
Re: Spark performance
You can certainly query over 4 TB of data with Spark. However, you will get an answer in minutes or hours, not in milliseconds or seconds. OLTP databases are used for web applications, and typically return responses in milliseconds. Analytic databases tend to operate on large data sets, and return responses in seconds, minutes or hours. When running batch jobs over large data sets, Spark can be a replacement for analytic databases like Greenplum or Netezza. On Sat, Jul 11, 2015 at 8:53 AM, Roman Sokolov wrote: > Hello. Had the same question. What if I need to store 4-6 Tb and do > queries? Can't find any clue in documentation. > Am 11.07.2015 03:28 schrieb "Mohammed Guller" : > >> Hi Ravi, >> >> First, Neither Spark nor Spark SQL is a database. Both are compute >> engines, which need to be paired with a storage system. Seconds, they are >> designed for processing large distributed datasets. If you have only >> 100,000 records or even a million records, you don’t need Spark. A RDBMS >> will perform much better for that volume of data. >> >> >> >> Mohammed >> >> >> >> *From:* Ravisankar Mani [mailto:rrav...@gmail.com] >> *Sent:* Friday, July 10, 2015 3:50 AM >> *To:* user@spark.apache.org >> *Subject:* Spark performance >> >> >> >> Hi everyone, >> >> I have planned to move mssql server to spark?. I have using around >> 50,000 to 1l records. >> >> The spark performance is slow when compared to mssql server. >> >> >> >> What is the best data base(Spark or sql) to store or retrieve data around >> 50,000 to 1l records ? >> >> regards, >> >> Ravi >> >> >> > -- ### Confidential e-mail, for recipient's (or recipients') eyes only, not for distribution. ###
Re: No. of Task vs No. of Executors
This is likely due to data skew. If you are using key-value pairs, one key has a lot more records, than the other keys. Do you have any groupBy operations? David On Tue, Jul 14, 2015 at 9:43 AM, shahid wrote: > hi > > I have a 10 node cluster i loaded the data onto hdfs, so the no. of > partitions i get is 9. I am running a spark application , it gets stuck on > one of tasks, looking at the UI it seems application is not using all nodes > to do calculations. attached is the screen shot of tasks, it seems tasks > are > put on each node more then once. looking at tasks 8 tasks get completed > under 7-8 minutes and one task takes around 30 minutes so causing the delay > in results. > < > http://apache-spark-user-list.1001560.n3.nabble.com/file/n23824/Screen_Shot_2015-07-13_at_9.png > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/No-of-Task-vs-No-of-Executors-tp23824.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- ### Confidential e-mail, for recipient's (or recipients') eyes only, not for distribution. ###
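When it really is one hot key, a common mitigation for aggregations that cannot be combined map-side (groupByKey-style) is salting: spread the hot key over several sub-keys, aggregate partially, then strip the salt and combine. A minimal sketch for a keyed count (pairs is an assumed RDD[(String, Int)]; the salt width of 10 is arbitrary):

import scala.util.Random

// phase 1: salt the key so a hot key is spread over ~10 tasks, then partially aggregate
val partial = pairs
  .map { case (k, v) => ((k, Random.nextInt(10)), v) }
  .reduceByKey(_ + _)

// phase 2: drop the salt and finish the aggregation per original key
val totals = partial
  .map { case ((k, _), v) => (k, v) }
  .reduceByKey(_ + _)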
Unable to Limit UI to localhost interface
Greetings to all, I've search around the mailing list, but it would seem that (nearly?) everyone has the opposite problem as mine. I made a stab at looking in the source for an answer, but I figured I might as well see if anyone else has run into the same problem as I. I'm trying to limit my Master/Worker UI to run only on localhost. As it stands, I have the following two environment variables set in my spark-env.sh: SPARK_LOCAL_IP=127.0.0.1 SPARK_MASTER_IP=127.0.0.1 and my slaves file contains one line: 127.0.0.1 The problem is that when I run "start-all.sh", I can nmap my box's public interface and get the following: PORT STATE SERVICE 22/tcp open ssh 8080/tcp open http-proxy 8081/tcp open blackice-icecap Furthermore, I can go to my box's public IP at port 8080 in my browser and get the master node's UI. The UI even reports that the URL/REST URLs to be 127.0.0.1: Spark Master at spark://127.0.0.1:7077 URL: spark://127.0.0.1:7077 REST URL: spark://127.0.0.1:6066 (cluster mode) I'd rather not have spark available in any way to the outside world without an explicit SSH tunnel. There are variables to do with setting the Web UI port, but I'm not concerned with the port, only the network interface to which the Web UI binds. Any help would be greatly appreciated.
Re: Unable to Limit UI to localhost interface
/etc/hosts 127.0.0.1 localhost conf/slaves 127.0.0.1 On Mon, Mar 28, 2016 at 5:36 PM, Mich Talebzadeh wrote: > in your /etc/hosts what do you have for localhost > > 127.0.0.1 localhost.localdomain localhost > > conf/slave should have one entry in your case > > cat slaves > # A Spark Worker will be started on each of the machines listed below. > localhost > ... > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > > On 28 March 2016 at 15:32, David O'Gwynn wrote: > >> Greetings to all, >> >> I've search around the mailing list, but it would seem that (nearly?) >> everyone has the opposite problem as mine. I made a stab at looking in the >> source for an answer, but I figured I might as well see if anyone else has >> run into the same problem as I. >> >> I'm trying to limit my Master/Worker UI to run only on localhost. As it >> stands, I have the following two environment variables set in my >> spark-env.sh: >> >> SPARK_LOCAL_IP=127.0.0.1 >> SPARK_MASTER_IP=127.0.0.1 >> >> and my slaves file contains one line: 127.0.0.1 >> >> The problem is that when I run "start-all.sh", I can nmap my box's public >> interface and get the following: >> >> PORT STATE SERVICE >> 22/tcp open ssh >> 8080/tcp open http-proxy >> 8081/tcp open blackice-icecap >> >> Furthermore, I can go to my box's public IP at port 8080 in my browser >> and get the master node's UI. The UI even reports that the URL/REST URLs to >> be 127.0.0.1: >> >> Spark Master at spark://127.0.0.1:7077 >> URL: spark://127.0.0.1:7077 >> REST URL: spark://127.0.0.1:6066 (cluster mode) >> >> I'd rather not have spark available in any way to the outside world >> without an explicit SSH tunnel. >> >> There are variables to do with setting the Web UI port, but I'm not >> concerned with the port, only the network interface to which the Web UI >> binds. >> >> Any help would be greatly appreciated. >> >> >
Re: Unable to Limit UI to localhost interface
Thanks much, Akhil. iptables is certainly a bandaid, but from an OpSec perspective, it's troubling. Is there any way to limit which interfaces the WebUI listens on? Is there a Jetty configuration that I'm missing? Thanks again for your help, David On Wed, Mar 30, 2016 at 2:25 AM, Akhil Das wrote: > In your case, you will be able to see the webui (unless restricted with > iptables) but you won't be able to submit jobs to that machine from a > remote machine since the spark master is spark://127.0.0.1:7077 > > Thanks > Best Regards > > On Tue, Mar 29, 2016 at 8:12 PM, David O'Gwynn wrote: > >> /etc/hosts >> >> 127.0.0.1 localhost >> >> conf/slaves >> 127.0.0.1 >> >> >> On Mon, Mar 28, 2016 at 5:36 PM, Mich Talebzadeh < >> mich.talebza...@gmail.com> wrote: >> >>> in your /etc/hosts what do you have for localhost >>> >>> 127.0.0.1 localhost.localdomain localhost >>> >>> conf/slave should have one entry in your case >>> >>> cat slaves >>> # A Spark Worker will be started on each of the machines listed below. >>> localhost >>> ... >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>> LinkedIn * >>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>> >>> >>> >>> http://talebzadehmich.wordpress.com >>> >>> >>> >>> On 28 March 2016 at 15:32, David O'Gwynn wrote: >>> >>>> Greetings to all, >>>> >>>> I've search around the mailing list, but it would seem that (nearly?) >>>> everyone has the opposite problem as mine. I made a stab at looking in the >>>> source for an answer, but I figured I might as well see if anyone else has >>>> run into the same problem as I. >>>> >>>> I'm trying to limit my Master/Worker UI to run only on localhost. As it >>>> stands, I have the following two environment variables set in my >>>> spark-env.sh: >>>> >>>> SPARK_LOCAL_IP=127.0.0.1 >>>> SPARK_MASTER_IP=127.0.0.1 >>>> >>>> and my slaves file contains one line: 127.0.0.1 >>>> >>>> The problem is that when I run "start-all.sh", I can nmap my box's >>>> public interface and get the following: >>>> >>>> PORT STATE SERVICE >>>> 22/tcp open ssh >>>> 8080/tcp open http-proxy >>>> 8081/tcp open blackice-icecap >>>> >>>> Furthermore, I can go to my box's public IP at port 8080 in my browser >>>> and get the master node's UI. The UI even reports that the URL/REST URLs to >>>> be 127.0.0.1: >>>> >>>> Spark Master at spark://127.0.0.1:7077 >>>> URL: spark://127.0.0.1:7077 >>>> REST URL: spark://127.0.0.1:6066 (cluster mode) >>>> >>>> I'd rather not have spark available in any way to the outside world >>>> without an explicit SSH tunnel. >>>> >>>> There are variables to do with setting the Web UI port, but I'm not >>>> concerned with the port, only the network interface to which the Web UI >>>> binds. >>>> >>>> Any help would be greatly appreciated. >>>> >>>> >>> >> >
RE: DStream how many RDD's are created by batch
Hi, Time is usually the criterion, if I'm understanding your question. An RDD is created for each batch interval. If your interval is 500ms then an RDD is created every 500ms. If it's 2 seconds then an RDD is created every 2 seconds. Cheers, David

From: Natu Lauchande [mailto:nlaucha...@gmail.com] Sent: Tuesday, April 12, 2016 7:09 AM To: user@spark.apache.org Subject: DStream how many RDD's are created by batch

Hi, What are the criteria for the number of RDDs created for each micro batch iteration? Thanks, Natu
RE: DStream how many RDD's are created by batch
Hi Natu, I believe you are correct; one RDD would be created for each file. Cheers, David From: Natu Lauchande [mailto:nlaucha...@gmail.com] Sent: Tuesday, April 12, 2016 1:48 PM To: David Newberger Cc: user@spark.apache.org Subject: Re: DStream how many RDD's are created by batch Hi David, Thanks for your answer. I have a follow-up question: I am using textFileStream, and listening on an S3 bucket for new files to process. Files are created every 5 minutes and my batch interval is 2 minutes. Does it mean that each file will be for one RDD? Thanks, Natu On Tue, Apr 12, 2016 at 7:46 PM, David Newberger mailto:david.newber...@wandcorp.com>> wrote: Hi, Time is usually the criteria if I'm understanding your question. An RDD is created for each batch interval. If your interval is 500ms then an RDD would be created every 500ms. If it's 2 seconds then an RDD is created every 2 seconds. Cheers, David From: Natu Lauchande [mailto:nlaucha...@gmail.com<mailto:nlaucha...@gmail.com>] Sent: Tuesday, April 12, 2016 7:09 AM To: user@spark.apache.org<mailto:user@spark.apache.org> Subject: DStream how many RDD's are created by batch Hi, What's the criteria for the number of RDD's created for each micro batch iteration? Thanks, Natu
RE: Spark replacing Hadoop
Can we assume your question is “Will Spark replace Hadoop MapReduce?” or do you literally mean replacing the whole of Hadoop? David From: Ashok Kumar [mailto:ashok34...@yahoo.com.INVALID] Sent: Thursday, April 14, 2016 2:13 PM To: User Subject: Spark replacing Hadoop Hi, I hear that some saying that Hadoop is getting old and out of date and will be replaced by Spark! Does this make sense and if so how accurate is it? Best
RE: Can not set spark dynamic resource allocation
Hi All, The error you are seeing looks really similar to Spark-13514 to me. I could be wrong though https://issues.apache.org/jira/browse/SPARK-13514 Can you check yarn.nodemanager.local-dirs in your YARN configuration for "file://" Cheers! David Newberger -Original Message- From: Cui, Weifeng [mailto:weife...@a9.com] Sent: Friday, May 20, 2016 4:26 PM To: Marcelo Vanzin Cc: Ted Yu; Rodrick Brown; user; Zhao, Jun; Aulakh, Sahib; Song, Yiwei Subject: Re: Can not set spark dynamic resource allocation Sorry, here is the node-manager log. application_1463692924309_0002 is my test. Hope this will help. http://pastebin.com/0BPEcgcW On 5/20/16, 2:09 PM, "Marcelo Vanzin" wrote: >Hi Weifeng, > >That's the Spark event log, not the YARN application log. You get the >latter using the "yarn logs" command. > >On Fri, May 20, 2016 at 1:14 PM, Cui, Weifeng wrote: >> Here is the application log for this spark job. >> >> http://pastebin.com/2UJS9L4e >> >> >> >> Thanks, >> Weifeng >> >> >> >> >> >> From: "Aulakh, Sahib" >> Date: Friday, May 20, 2016 at 12:43 PM >> To: Ted Yu >> Cc: Rodrick Brown , Cui Weifeng >> , user , "Zhao, Jun" >> >> Subject: Re: Can not set spark dynamic resource allocation >> >> >> >> Yes it is yarn. We have configured spark shuffle service w yarn node >> manager but something must be off. >> >> >> >> We will send u app log on paste bin. >> >> Sent from my iPhone >> >> >> On May 20, 2016, at 12:35 PM, Ted Yu wrote: >> >> Since yarn-site.xml was cited, I assume the cluster runs YARN. >> >> >> >> On Fri, May 20, 2016 at 12:30 PM, Rodrick Brown >> wrote: >> >> Is this Yarn or Mesos? For the later you need to start an external >> shuffle service. >> >> Get Outlook for iOS >> >> >> >> >> >> On Fri, May 20, 2016 at 11:48 AM -0700, "Cui, Weifeng" >> >> wrote: >> >> Hi guys, >> >> >> >> Our team has a hadoop 2.6.0 cluster with Spark 1.6.1. We want to set >> dynamic resource allocation for spark and we followed the following >> link. After the changes, all spark jobs failed. >> >> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-reso >> urce-allocation >> >> This test was on a test cluster which has 1 master machine (running >> namenode, resourcemanager and hive server), 1 worker machine (running >> datanode and nodemanager) and 1 machine as client( running spark shell). >> >> >> >> What I updated in config : >> >> >> >> 1. Update in spark-defaults.conf >> >> spark.dynamicAllocation.enabled true >> >> spark.shuffle.service.enabledtrue >> >> >> >> 2. Update yarn-site.xml >> >> >> >> yarn.nodemanager.aux-services >> mapreduce_shuffle,spark_shuffle >> >> >> >> yarn.nodemanager.aux-services.spark_shuffle.class >> org.apache.spark.network.yarn.YarnShuffleService >> >> >> >> spark.shuffle.service.enabled >> true >> >> >> 3. Copy spark-1.6.1-yarn-shuffle.jar to yarn.application.classpath >> ($HADOOP_HOME/share/hadoop/yarn/*) in python code >> >> 4. Restart namenode, datanode, resourcemanager, nodemanger... retart >> everything >> >> 5. The config will update in all machines, resourcemanager and nodemanager. >> We update the config in one place and copy to all machines. >> >> >> >> What I tested: >> >> >> >> 1. I started a scala spark shell and check its environment variables, >> spark.dynamicAllocation.enabled is true. >> >> 2. 
I used the following code: >> >> scala > val line = >> sc.textFile("/spark-events/application_1463681113470_0006") >> >> line: org.apache.spark.rdd.RDD[String] = >> /spark-events/application_1463681113470_0006 MapPartitionsRDD[1] at >> textFile at :27 >> >> scala > line.count # This command just stuck here >> >> >> >> 3. In the beginning, there is only 1 executor(this is for driver) and >> after line.count, I could see 3 executors, then dropped to 1. >> >> 4. Several jobs were launched and all of them failed. Tasks (for all >&
RE: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image
Is https://github.com/alonsoir/awesome-recommendation-engine/blob/master/build.sbt the build.sbt you are using? David Newberger QA Analyst WAND - The Future of Restaurant Technology (W) www.wandcorp.com<http://www.wandcorp.com/> (E) david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com> (P) 952.361.6200 From: Alonso [mailto:alons...@gmail.com] Sent: Tuesday, May 31, 2016 11:11 AM To: user@spark.apache.org Subject: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image I have a vmware cloudera image, cdh-5.7 running with centos6.8, i am using OS X as my development machine, and the cdh image to run the code, i upload the code using git to the cdh image, i have modified my /etc/hosts file located in the cdh image with a line like this: 127.0.0.1 quickstart.cloudera quickstart localhost localhost.domain 192.168.30.138 quickstart.cloudera quickstart localhost localhost.domain The cloudera version that i am running is: [cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties # Autogenerated build properties version=2.6.0-cdh5.7.0 git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1 cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76 cloudera.base-branch=cdh5-base-2.6.0 cloudera.build-branch=cdh5-2.6.0_5.7.0 cloudera.pkg.version=2.6.0+cdh5.7.0+1280 cloudera.pkg.release=1.cdh5.7.0.p0.92 cloudera.cdh.release=cdh5.7.0 cloudera.build.time=2016.03.23-18:30:29GMT I can do a ls command in the vmware machine: [cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv -rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 /user/cloudera/ratings.csv I can read its content: [cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l 568454 The code is quite simple, just trying to map its content: val ratingFile="hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv" case class AmazonRating(userId: String, productId: String, rating: Double) val NumRecommendations = 10 val MinRecommendationsPerUser = 10 val MaxRecommendationsPerUser = 20 val MyUsername = "myself" val NumPartitions = 20 println("Using this ratingFile: " + ratingFile) // first create an RDD out of the rating file val rawTrainingRatings = sc.textFile(ratingFile).map { line => val Array(userId, productId, scoreStr) = line.split(",") AmazonRating(userId, productId, scoreStr.toDouble) } // only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => MinRecommendationsPerUser <= r._2.size && r._2.size < MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache() println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}") I am getting this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 0 ratings out of 568454 because if i run the exact code within the spark-shell, i got this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 73279 ratings out of 568454 Why is it working fine within the spark-shell but it is not running fine programmatically in the vmware image? 
I am running the code using sbt-pack plugin to generate unix commands and run them within the vmware image which has the spark pseudocluster, This is the code i use to instantiate the sparkconf: val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector") .setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val ssc = new StreamingContext(sparkConf, Seconds(2)) //this checkpointdir should be in a conf file, for now it is hardcoded! val streamingCheckpointDir = "/home/cloudera/my-recommendation-spark-engine/checkpoint" ssc.checkpoint(streamingCheckpointDir) I have tried to use this way of setting spark master, but an exception raises, i suspect that this is symptomatic of my problem. //.setMaster("spark://quickstart.cloudera:7077") The exception when i try to use the fully qualified domain name: .setMaster("spark://quickstart.cloudera:7077") java.io.IOException: Failed to connect to quickstart.cloudera/127.0.0.1:7077<http://127.0.0.1:7077> at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) at org.a
RE: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image
Have you tried it without either of the setMaster lines? Also, CDH 5.7 uses spark 1.6.0 with some patches. I would recommend using the cloudera repo for spark files in build sbt. I’d also check other files in the build sbt to see if there are cdh specific versions. David Newberger From: Alonso Isidoro Roman [mailto:alons...@gmail.com] Sent: Tuesday, May 31, 2016 1:23 PM To: David Newberger Cc: user@spark.apache.org Subject: Re: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image Hi David, the one of the develop branch. I think It should be the same, but actually not sure... Regards Alonso Isidoro Roman about.me/alonso.isidoro.roman 2016-05-31 19:40 GMT+02:00 David Newberger mailto:david.newber...@wandcorp.com>>: Is https://github.com/alonsoir/awesome-recommendation-engine/blob/master/build.sbt the build.sbt you are using? David Newberger QA Analyst WAND - The Future of Restaurant Technology (W) www.wandcorp.com<http://www.wandcorp.com/> (E) david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com> (P) 952.361.6200 From: Alonso [mailto:alons...@gmail.com<mailto:alons...@gmail.com>] Sent: Tuesday, May 31, 2016 11:11 AM To: user@spark.apache.org<mailto:user@spark.apache.org> Subject: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image I have a vmware cloudera image, cdh-5.7 running with centos6.8, i am using OS X as my development machine, and the cdh image to run the code, i upload the code using git to the cdh image, i have modified my /etc/hosts file located in the cdh image with a line like this: 127.0.0.1 quickstart.cloudera quickstart localhost localhost.domain 192.168.30.138 quickstart.cloudera quickstart localhost localhost.domain The cloudera version that i am running is: [cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties # Autogenerated build properties version=2.6.0-cdh5.7.0 git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1 cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76 cloudera.base-branch=cdh5-base-2.6.0 cloudera.build-branch=cdh5-2.6.0_5.7.0 cloudera.pkg.version=2.6.0+cdh5.7.0+1280 cloudera.pkg.release=1.cdh5.7.0.p0.92 cloudera.cdh.release=cdh5.7.0 cloudera.build.time=2016.03.23-18:30:29GMT I can do a ls command in the vmware machine: [cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv -rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 /user/cloudera/ratings.csv I can read its content: [cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l 568454 The code is quite simple, just trying to map its content: val ratingFile="hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv" case class AmazonRating(userId: String, productId: String, rating: Double) val NumRecommendations = 10 val MinRecommendationsPerUser = 10 val MaxRecommendationsPerUser = 20 val MyUsername = "myself" val NumPartitions = 20 println("Using this ratingFile: " + ratingFile) // first create an RDD out of the rating file val rawTrainingRatings = sc.textFile(ratingFile).map { line => val Array(userId, productId, scoreStr) = line.split(",") AmazonRating(userId, productId, scoreStr.toDouble) } // only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => MinRecommendationsPerUser <= r._2.size && r._2.size < 
MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache() println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}") I am getting this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 0 ratings out of 568454 because if i run the exact code within the spark-shell, i got this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 73279 ratings out of 568454 Why is it working fine within the spark-shell but it is not running fine programmatically in the vmware image? I am running the code using sbt-pack plugin to generate unix commands and run them within the vmware image which has the spark pseudocluster, This is the code i use to instantiate the sparkconf: val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector") .setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val ssc = new StreamingContext(sparkConf, Seconds(2)) //this checkpointdir should be in a conf file
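To make the build.sbt suggestion concrete, a minimal sketch (the exact CDH artifact versions below are assumptions; check the Cloudera repository for the ones matching your image):

resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.6.0-cdh5.7.0" % "provided",
  "org.apache.spark" %% "spark-sql"       % "1.6.0-cdh5.7.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.0-cdh5.7.0" % "provided"
)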
RE: About a problem running a spark job in a cdh-5.7.0 vmware image.
Alonso, The CDH VM uses YARN and the default deploy mode is client. I’ve been able to use the CDH VM for many learning scenarios. http://www.cloudera.com/documentation/enterprise/latest.html http://www.cloudera.com/documentation/enterprise/latest/topics/spark.html David Newberger From: Alonso [mailto:alons...@gmail.com] Sent: Friday, June 3, 2016 5:39 AM To: user@spark.apache.org Subject: About a problem running a spark job in a cdh-5.7.0 vmware image. Hi, i am developing a project that needs to use kafka, spark-streaming and spark-mllib, this is the github project<https://github.com/alonsoir/awesome-recommendation-engine/tree/develop>. I am using a vmware cdh-5.7-0 image, with 4 cores and 8 GB of ram, the file that i want to use is only 16 MB, if i finding problems related with resources because the process outputs this message: .set("spark.driver.allowMultipleContexts", "true") <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links> 16/06/03 11:58:09 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources when i go to spark-master page, i can see this: Spark Master at spark://192.168.30.137:7077 URL: spark://192.168.30.137:7077 REST URL: spark://192.168.30.137:6066 (cluster mode) Alive Workers: 0 Cores in use: 0 Total, 0 Used Memory in use: 0.0 B Total, 0.0 B Used Applications: 2 Running, 0 Completed Drivers: 0 Running, 0 Completed Status: ALIVE Workers Worker Id Address State Cores Memory Running Applications Application ID Name Cores Memory per Node Submitted Time User State Duration app-20160603115752-0001 (kill) AmazonKafkaConnector 0 1024.0 MB 2016/06/03 11:57:52 cloudera WAITING 2.0 min app-20160603115751- (kill) AmazonKafkaConnector 0 1024.0 MB 2016/06/03 11:57:51 cloudera WAITING 2.0 min And this is the spark-worker output: Spark Worker at 192.168.30.137:7078 ID: worker-20160603115937-192.168.30.137-7078 Master URL: Cores: 4 (0 Used) Memory: 6.7 GB (0.0 B Used) Back to Master Running Executors (0) ExecutorID Cores State Memory Job Details Logs It is weird isn't ? master url is not set up and there is not any ExecutorID, Cores, so on so forth... 
If i do a ps xa | grep spark, this is the output: [cloudera@quickstart bin]$ ps xa | grep spark 6330 ?Sl 0:11 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Dspark.deploy.defaultCores=4 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master 6674 ?Sl 0:12 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /etc/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Dspark.history.fs.logDirectory=hdfs:///user/spark/applicationHistory -Dspark.history.ui.port=18088 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.history.HistoryServer 8153 pts/1Sl+0:14 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /home/cloudera/awesome-recommendation-engine/target/pack/lib/* -Dprog.home=/home/cloudera/awesome-recommendation-engine/target/pack -Dprog.version=1.0-SNAPSHOT example.spark.AmazonKafkaConnector 192.168.1.35:9092 amazonRatingsTopic 8413 ?Sl 0:04 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker spark://quickstart.cloudera:7077 8619 pts/3S+ 0:00 grep spark master is set up with four cores and 1 GB and worker has not any dedicated core and it is using 1GB, that is weird isn't ? I have configured the vmware image with 4 cores (from eight)
RE: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1
What does your processing time look like? Is it consistently within that 20sec micro batch window? David Newberger From: Adrian Tanase [mailto:atan...@adobe.com] Sent: Friday, June 3, 2016 8:14 AM To: user@spark.apache.org Cc: Cosmin Ciobanu Subject: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1 Hi all, Trying to repost this question from a colleague on my team, somehow his subscription is not active: http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html Appreciate any thoughts, -adrian
RE: About a problem running a spark job in a cdh-5.7.0 vmware image.
Alonso, I could totally be misunderstanding something or missing a piece of the puzzle however remove .setMaster. If you do that it will run with whatever the CDH VM is setup for which in the out of the box default case is YARN and Client. val sparkConf = new SparkConf().setAppName(“Some App thingy thing”) From the Spark 1.6.0 Scala API Documentation: https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.SparkConf “ Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. Most of the time, you would create a SparkConf object with new SparkConf(), which will load values from any spark.* Java system properties set in your application as well. In this case, parameters you set directly on the SparkConf object take priority over system properties. For unit tests, you can also call new SparkConf(false) to skip loading external settings and get the same configuration no matter what the system properties are. All setter methods in this class support chaining. For example, you can write new SparkConf().setMaster("local").setAppName("My app"). Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user. Spark does not support modifying the configuration at runtime. “ David Newberger From: Alonso Isidoro Roman [mailto:alons...@gmail.com] Sent: Friday, June 3, 2016 10:37 AM To: David Newberger Cc: user@spark.apache.org Subject: Re: About a problem running a spark job in a cdh-5.7.0 vmware image. Thank you David, so, i would have to change the way that i am creating SparkConf object, isn't? I can see in this link<http://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_running_spark_on_yarn.html#concept_ysw_lnp_h5> that the way to run a spark job using YARN is using this kind of command: spark-submit --class org.apache.spark.examples.SparkPi --master yarn \ --deploy-mode client SPARK_HOME/lib/spark-examples.jar 10 Can i use this way programmatically? maybe changing setMaster? to something like setMaster("yarn:quickstart.cloudera:8032")? I have seen the port in this guide: http://www.cloudera.com/documentation/enterprise/5-6-x/topics/cdh_ig_ports_cdh5.html
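As a minimal sketch of that advice (the submit command is only an example, adjust the class and jar names): set no master in code, and let spark-submit or the cluster defaults supply it.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector") // no setMaster here
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))

// then submit with something like:
//   spark-submit --master yarn --deploy-mode client \
//     --class example.spark.AmazonKafkaConnector my-app.jar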
RE: Spark Streaming - long garbage collection time
Have you tried UseG1GC in place of UseConcMarkSweepGC? This article really helped me with GC a few short weeks ago https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html David Newberger -Original Message- From: Marco1982 [mailto:marco.plata...@yahoo.it] Sent: Friday, June 3, 2016 2:19 PM To: user@spark.apache.org Subject: Spark Streaming - long garbage collection time Hi all, I'm running a Spark Streaming application with 1-hour batches to join two data feeds and write the output to disk. The total size of one data feed is about 40 GB per hour (split in multiple files), while the size of the second data feed is about 600-800 MB per hour (also split in multiple files). Due to application constraints, I may not be able to run smaller batches. Currently, it takes about 20 minutes to produce the output in a cluster with 140 cores and 700 GB of RAM. I'm running 7 workers and 28 executors, each with 5 cores and 22 GB of RAM. I execute mapToPair(), filter(), and reduceByKeyAndWindow(1 hour batch) on the 40 GB data feed. Most of the computation time is spent on these operations. What worries me is the Garbage Collection (GC) execution time per executor, which goes from 25 secs to 9.2 mins. I attach two screenshots below: one lists the GC time and one prints out GC comments for a single executor. I anticipate that the executor that spends 9.2 mins in doing garbage collection is eventually killed by the Spark driver. I think these numbers are too high. Do you have any suggestion about keeping GC time low? I'm already using Kryo Serializer, ++UseConcMarkSweepGC, and spark.rdd.compress=true. Is there anything else that would help? Thanks <http://apache-spark-user-list.1001560.n3.nabble.com/file/n27087/gc_time.png> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n27087/executor_16.png> -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-long-garbage-collection-time-tp27087.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
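For reference, a minimal sketch of switching the executors to G1 through Spark conf (the app name and the extra print flags are illustrative, not a tuning recommendation):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("StreamingJoinJob")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
  .set("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")

The same settings can also be passed on the command line with --conf instead of being set in code.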
RE: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.
I was going to ask if you had 2 jobs running. If the checkpointing for both are setup to look at the same location I could see an error like this happening. Do both spark jobs have a reference to a checkpointing dir? David Newberger From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com] Sent: Friday, June 3, 2016 3:20 PM To: user @spark Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. OK I was running two spark streaming jobs, one using streaming data from Kafka and another from twitter in local mode on the same node. It is possible that the directory /user/hduser/checkpoint/temp is shared by both spark streaming jobs any experience on this please? Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/> On 3 June 2016 at 20:48, Mich Talebzadeh mailto:mich.talebza...@gmail.com>> wrote: Hi, Just started seeing these errors: 16/06/03 20:30:01 ERROR DFSClient: Failed to close inode 806125 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-907736468_1, pendingcreates: 1] at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) Sounds like a connection is left open but cannot establish why! Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>
RE: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.
Hi Mich, My gut says you are correct that each application should have its own checkpoint directory. Though honestly I’m a bit fuzzy on checkpointing still as I’ve not worked with it much yet. Cheers, David Newberger From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com] Sent: Friday, June 3, 2016 3:40 PM To: David Newberger Cc: user @spark Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. Hi David yes they do The first streaming job does val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") And the twitter does /** Returns the HDFS URL */ def getCheckpointDirectory(): String = { try { val name : String = Seq("bash", "-c", "curl -s http://169.254.169.254/latest/meta-data/hostname";) !! ; println("Hostname = " + name) "hdfs://" + name.trim + ":9000/checkpoint/" } catch { case e: Exception => { "./checkpoint/" } } I need to change one of these. Actually a better alternative would be that each application has its own checkpoint? THanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/> On 3 June 2016 at 21:23, David Newberger mailto:david.newber...@wandcorp.com>> wrote: I was going to ask if you had 2 jobs running. If the checkpointing for both are setup to look at the same location I could see an error like this happening. Do both spark jobs have a reference to a checkpointing dir? David Newberger From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>] Sent: Friday, June 3, 2016 3:20 PM To: user @spark Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. OK I was running two spark streaming jobs, one using streaming data from Kafka and another from twitter in local mode on the same node. It is possible that the directory /user/hduser/checkpoint/temp is shared by both spark streaming jobs any experience on this please? Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/> On 3 June 2016 at 20:48, Mich Talebzadeh mailto:mich.talebza...@gmail.com>> wrote: Hi, Just started seeing these errors: 16/06/03 20:30:01 ERROR DFSClient: Failed to close inode 806125 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-907736468_1, pendingcreates: 1] at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) Sounds like a connection is left open but cannot establish why! 
Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>
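A minimal sketch of the per-application checkpoint idea (the HDFS path is made up): derive the checkpoint directory from the application name so no two streaming jobs ever share it.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("TwitterAnalytics")
val ssc = new StreamingContext(conf, Seconds(2))

// e.g. hdfs://namenode:9000/checkpoint/TwitterAnalytics for this job,
// and a different directory for the Kafka job
ssc.checkpoint("hdfs://namenode:9000/checkpoint/" + conf.get("spark.app.name"))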
RE: Creating a Hive table through Spark and potential locking issue (a bug)
Could you be looking at 2 jobs trying to use the same file and one getting to it before the other and finally removing it? David Newberger From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com] Sent: Wednesday, June 8, 2016 1:33 PM To: user; user @spark Subject: Creating a Hive table through Spark and potential locking issue (a bug) Hi, I noticed an issue with Spark creating and populating a Hive table. The process as I see is as follows: 1. Spark creates the Hive table. In this case an ORC table in a Hive Database 2. Spark uses JDBC connection to get data out from an Oracle 3. I create a temp table in Spark through (registerTempTable) 4. Spark populates that table. That table is actually created in hdfs dfs -ls /tmp/hive/hduser drwx-- - hduser supergroup /tmp/hive/hduser/b1ea6829-790f-4b37-a0ff-3ed218388059 1. However, The original table itself does not have any locking on it! 2. I log in into Hive and drop that table 3. hive> drop table dummy; OK 1. That table is dropped OK 2. Spark crashes with message Started at [08/06/2016 18:37:53.53] 16/06/08 19:13:46 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/hive/warehouse/oraclehadoop.db/dummy/.hive-staging_hive_2016-06-08_18-38-08_804_3299712811201460314-1/-ext-1/_temporary/0/_temporary/attempt_201606081838_0001_m_00_0/part-0 (inode 831621): File does not exist. Holder DFSClient_NONMAPREDUCE_-1836386597_1 does not have any open files. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033) at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy22.addBlock(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399) at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at 
com.sun.proxy.$Proxy23.addBlock(Unknown Source) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1532) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1349) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588) 16/06/08 19:13:46 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job Suggested solution. In a concurrent env, Spark should apply locks in order to prevent such operations. Locks are kept in Hive meta data table HIVE_LOCKS HTH Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>
RE: streaming example has error
Have you tried to “set spark.driver.allowMultipleContexts = true”? David Newberger From: Lee Ho Yeung [mailto:jobmatt...@gmail.com] Sent: Tuesday, June 14, 2016 8:34 PM To: user@spark.apache.org Subject: streaming example has error when simulate streaming with nc -lk got error below, then i try example, martin@ubuntu:~/Downloads$ /home/martin/Downloads/spark-1.6.1/bin/run-example streaming.NetworkWordCount localhost Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 16/06/14 18:33:06 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath. 16/06/14 18:33:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 16/06/14 18:33:06 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.157.134 instead (on interface eth0) 16/06/14 18:33:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 16/06/14 18:33:13 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes got error too. import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", ) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) ssc.start() ssc.awaitTermination() scala> import org.apache.spark._ import org.apache.spark._ scala> import org.apache.spark.streaming._ import org.apache.spark.streaming._ scala> import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.StreamingContext._ scala> val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@67bcaf<mailto:org.apache.spark.SparkConf@67bcaf> scala> val ssc = new StreamingContext(conf, Seconds(1)) 16/06/14 18:28:44 WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040<http://SelectChannelConnector@0.0.0.0:4040>: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:433) at sun.nio.ch.Net.bind(Net.java:425) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.server.Server.doStart(Server.java:293) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:252) at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262) at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1988) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at 
org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1979) at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:262) at org.apache.spark.ui.WebUI.bind(WebUI.scala:136) at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481) at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481) at scala.Option.foreach(Option.scala:236) at org.apache.spark.SparkContext.(SparkContext.scala:481) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874) at org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:81) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:36) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
RE: Handle empty kafka in Spark Streaming
If you're asking how to handle no messages in a batch window then I would add an isEmpty check like: dStream.foreachRDD(rdd => { if (!rdd.isEmpty()) { ... } }) Or something like that. David Newberger -Original Message- From: Yogesh Vyas [mailto:informy...@gmail.com] Sent: Wednesday, June 15, 2016 6:31 AM To: user Subject: Handle empty kafka in Spark Streaming Hi, Does anyone know how to handle empty Kafka while a Spark Streaming job is running? Regards, Yogesh - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Handle empty kafka in Spark Streaming
Hi Yogesh, I'm not sure if this is possible or not. I'd be interested in knowing. My gut thinks it would be an anti-pattern if it's possible to do something like this and that's why I handle it in either the foreachRDD or foreachPartition. The way I look at spark streaming is as an application which is always running and doing something like windowed batching or microbatching or whatever I'm trying to accomplish. IF an RDD I get from Kafka is empty then I don't run the rest of the job. IF the RDD I get from Kafka has some number of events then I'll process the RDD further. David Newberger -Original Message- From: Yogesh Vyas [mailto:informy...@gmail.com] Sent: Wednesday, June 15, 2016 8:30 AM To: David Newberger Subject: Re: Handle empty kafka in Spark Streaming I am looking for something which checks the JavaPairReceiverInputDStream before going further with any operations. For example, if I get a JavaPairReceiverInputDStream in the following manner: JavaPairReceiverInputDStream message=KafkaUtils.createStream(ssc, zkQuorum, group, topics, StorageLevel.MEMORY_AND_DISK_SER()); Then I would like to check whether message is empty or not. If it is not empty then go for further operations, else wait for some data in Kafka. On Wed, Jun 15, 2016 at 6:31 PM, David Newberger wrote: > If you're asking how to handle no messages in a batch window then I would add > an isEmpty check like: > > dStream.foreachRDD(rdd => { > if (!rdd.isEmpty()) > ... > } > > Or something like that. > > > David Newberger > > -Original Message- > From: Yogesh Vyas [mailto:informy...@gmail.com] > Sent: Wednesday, June 15, 2016 6:31 AM > To: user > Subject: Handle empty kafka in Spark Streaming > > Hi, > > Does anyone know how to handle empty Kafka while Spark Streaming job is > running ? > > Regards, > Yogesh > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For > additional commands, e-mail: user-h...@spark.apache.org >
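A fuller sketch of the foreachRDD/isEmpty approach, in Scala with the receiver-based API from the original question (ZooKeeper quorum, group and topic are made up):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("EmptyKafkaGuard")
val ssc = new StreamingContext(conf, Seconds(10))

val messages = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("myTopic" -> 1))

messages.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // only do the real work when this batch actually received something
    rdd.map(_._2).foreach(record => println(record))
  }
  // empty batch: fall through and wait for the next interval
}

ssc.start()
ssc.awaitTermination()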
RE: Limit pyspark.daemon threads
Have you tried setting spark.cores.max “When running on a standalone deploy cluster<http://spark.apache.org/docs/latest/spark-standalone.html> or a Mesos cluster in "coarse-grained" sharing mode<http://spark.apache.org/docs/latest/running-on-mesos.html#mesos-run-modes>, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will bespark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos.” David Newberger From: agateaaa [mailto:agate...@gmail.com] Sent: Wednesday, June 15, 2016 4:39 PM To: Gene Pang Cc: Sven Krasser; Carlile, Ken; user Subject: Re: Limit pyspark.daemon threads Thx Gene! But my concern is with CPU usage not memory. I want to see if there is anyway to control the number of pyspark.daemon processes that get spawned. We have some restriction on number of CPU's we can use on a node, and number of pyspark.daemon processes that get created dont seem to honor spark.executor.cores property setting Thanks! On Wed, Jun 15, 2016 at 1:53 PM, Gene Pang mailto:gene.p...@gmail.com>> wrote: As Sven mentioned, you can use Alluxio to store RDDs in off-heap memory, and you can then share that RDD across different jobs. If you would like to run Spark on Alluxio, this documentation can help: http://www.alluxio.org/documentation/master/en/Running-Spark-on-Alluxio.html Thanks, Gene On Tue, Jun 14, 2016 at 12:44 AM, agateaaa mailto:agate...@gmail.com>> wrote: Hi, I am seeing this issue too with pyspark (Using Spark 1.6.1). I have set spark.executor.cores to 1, but I see that whenever streaming batch starts processing data, see python -m pyspark.daemon processes increase gradually to about 5, (increasing CPU% on a box about 4-5 times, each pyspark.daemon takes up around 100 % CPU) After the processing is done 4 pyspark.daemon processes go away and we are left with one till the next batch run. Also sometimes the CPU usage for executor process spikes to about 800% even though spark.executor.core is set to 1 e.g. top output PID USER PR NI VIRT RES SHR S %CPU %MEMTIME+ COMMAND 19634 spark 20 0 8871420 1.790g 32056 S 814.1 2.9 0:39.33 /usr/lib/j+ <--EXECUTOR 13897 spark 20 0 46576 17916 6720 S 100.0 0.0 0:00.17 python -m + <--pyspark.daemon 13991 spark 20 0 46524 15572 4124 S 98.0 0.0 0:08.18 python -m + <--pyspark.daemon 14488 spark 20 0 46524 15636 4188 S 98.0 0.0 0:07.25 python -m + <--pyspark.daemon 14514 spark 20 0 46524 15636 4188 S 94.0 0.0 0:06.72 python -m + <--pyspark.daemon 14526 spark 20 0 48200 17172 4092 S 0.0 0.0 0:00.38 python -m + <--pyspark.daemon Is there any way to control the number of pyspark.daemon processes that get spawned ? Thank you Agateaaa On Sun, Mar 27, 2016 at 1:08 AM, Sven Krasser mailto:kras...@gmail.com>> wrote: Hey Ken, 1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap storage option using Alluxio, formerly Tachyon, with which I have no experience however.) 2. The worker memory setting is not a hard maximum unfortunately. What happens is that during aggregation the Python daemon will check its process size. If the size is larger than this setting, it will start spilling to disk. I've seen many occasions where my daemons grew larger. Also, you're relying on Python's memory management to free up space again once objects are evicted. In practice, leave this setting reasonably small but make sure there's enough free memory on the machine so you don't run into OOM conditions. 
If the lower memory setting causes strains for your users, make sure they increase the parallelism of their jobs (smaller partitions meaning less data is processed at a time). 3. I believe that is the behavior you can expect when setting spark.executor.cores. I've not experimented much with it and haven't looked at that part of the code, but what you describe also reflects my understanding. Please share your findings here, I'm sure those will be very helpful to others, too. One more suggestion for your users is to move to the Pyspark DataFrame API. Much of the processing will then happen in the JVM, and you will bump into fewer Python resource contention issues. Best, -Sven On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken mailto:carli...@janelia.hhmi.org>> wrote: This is extremely helpful! I’ll have to talk to my users about how the python memory limit should be adjusted and what their expectations are. I’m fairly certain we bumped it up in the dark past when jobs were failing because of insufficient memory for the python processes. So just to make sure I’m understanding correctly: * JVM memory (set by SPARK_EXECUTOR_MEMORY and/or SPARK_WORKER_MEMORY?) is wher
RE: streaming example has error
Try adding wordCounts.print() before ssc.start() David Newberger From: Lee Ho Yeung [mailto:jobmatt...@gmail.com] Sent: Wednesday, June 15, 2016 9:16 PM To: David Newberger Cc: user@spark.apache.org Subject: Re: streaming example has error got another error StreamingContext: Error starting the context, marking it as stopped /home/martin/Downloads/spark-1.6.1/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0 import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount").set("spark.driver.allowMultipleContexts", "true") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", ) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) ssc.start() ssc.awaitTermination() scala> val pairs = words.map(word => (word, 1)) pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@61a5e7<mailto:org.apache.spark.streaming.dstream.MappedDStream@61a5e7> scala> val wordCounts = pairs.reduceByKey(_ + _) wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@a522f1<mailto:org.apache.spark.streaming.dstream.ShuffledDStream@a522f1> scala> ssc.start() 16/06/15 19:14:10 ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute at scala.Predef$.require(Predef.scala:233) at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542) at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:44) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:46) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:48) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:50) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:52) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:54) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58) at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:60) at $line42.$read$$iwC$$iwC$$iwC$$iwC.(:62) at $line42.$read$$iwC$$iwC$$iwC.(:64) at $line42.$read$$iwC$$iwC.(:66) at $line42.$read$$iwC.(:68) at $line42.$read.(:70) at $line42.$read$.(:74) at $line42.$read$.() at $line42.$eval$.(:7) at $line42.$eval$.() at $line42.$eval.$print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org<http://org.apache.spark.repl.SparkILoop.org>$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$proces
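One way to avoid both errors when experimenting in spark-shell (an assumption on my part, not tested against this exact setup): reuse the shell's existing SparkContext sc instead of creating a second context, and register an output operation such as print() before calling start(). The port below is just an example.

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1)) // sc already exists in spark-shell
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

wordCounts.print() // an output operation must be registered before start()
ssc.start()
ssc.awaitTermination()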
RE: difference between dataframe and dataframwrite
DataFrame is a collection of data which is organized into named columns. DataFrame.write is an interface for saving the contents of a DataFrame to external storage. Hope this helps David Newberger From: pseudo oduesp [mailto:pseudo20...@gmail.com] Sent: Thursday, June 16, 2016 9:43 AM To: user@spark.apache.org Subject: difference between dataframe and dataframwrite hi, what is difference between dataframe and dataframwrite ?
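A minimal illustration in the shell (paths are made up): the DataFrame is the queryable data, and its write member hands back a DataFrameWriter used to persist it.

val df = sqlContext.read.json("/tmp/people.json") // DataFrame: data organized into named columns
df.select("name", "age").show()                   // query / transform it

df.write                                          // DataFrameWriter: save the contents out
  .mode("overwrite")
  .parquet("/tmp/people_parquet")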
RE: HBase-Spark Module
Hi Ben, This seems more like a question for community.cloudera.com. However, it would be in HBase, not Spark, I believe. https://repository.cloudera.com/artifactory/webapp/#/artifacts/browse/tree/General/cloudera-release-repo/org/apache/hbase/hbase-spark David Newberger -Original Message- From: Benjamin Kim [mailto:bbuil...@gmail.com] Sent: Friday, July 29, 2016 12:57 PM To: user@spark.apache.org Subject: HBase-Spark Module I would like to know if anyone has tried using the hbase-spark module? I tried to follow the examples in conjunction with CDH 5.8.0. I cannot find the HBaseTableCatalog class in the module or in any of the Spark jars. Can someone help? Thanks, Ben - To unsubscribe e-mail: user-unsubscr...@spark.apache.org - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
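If it helps, a sketch of pulling that artifact from the Cloudera repo with sbt (the version string is an assumption; browse the repository link above for the one matching CDH 5.8.0):

resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies += "org.apache.hbase" % "hbase-spark" % "1.2.0-cdh5.8.0"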
Need Advice: Spark-Streaming Setup
Hi, I'm currently working on a Spark/HBase setup which processes log files (~10 GB/day). These log files are persisted hourly on n > 10 application servers and copied to a 4-node HDFS. Our current Spark job aggregates single visits (based on a session-uuid) across all application servers on a daily basis. Visits are filtered (only about 1% of data remains) and stored in HBase for further processing. Currently there is no use of the Spark Streaming API, i.e. a cronjob runs every day and fires the visit calculation.
Questions
1) Is it really necessary to store the log files in HDFS, or can Spark somehow read the files from a local file system and distribute the data to the other nodes? Rationale: the data is (probably) only read once during the visit calculation, which defies the purpose of a DFS.
2) If the raw log files have to be in HDFS, I have to remove the files from HDFS after processing them, so COPY -> PROCESS -> REMOVE. Is this the way to go?
3) Before I can process a visit for an hour, I have to wait until all log files of all application servers have been copied to HDFS. It doesn't seem like StreamingContext.fileStream can wait for more sophisticated patterns, e.g. ("context*/logs-2016-08-01-15"). Do you guys have a recommendation to solve this problem? One possible solution: after the files have been copied, create an additional file that indicates to Spark that all files are available?
If you have any questions, please don't hesitate to ask. Thanks, David
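Regarding question 3, a rough sketch of filtering which files a stream picks up (directory, pattern and interval are made up; this is an untested assumption, not a recommendation): fileStream accepts a per-file filter, so files can be skipped until they match whatever naming or marker convention signals that an hour is complete.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf().setAppName("VisitAggregation")
val ssc = new StreamingContext(conf, Minutes(10))

// only pick up files whose path matches the hour of interest
val pattern = ".*logs-2016-08-01-15$".r
def wanted(p: Path): Boolean = pattern.findFirstIn(p.toString).isDefined

val logLines = ssc
  .fileStream[LongWritable, Text, TextInputFormat]("hdfs:///logs/incoming", wanted _, newFilesOnly = true)
  .map(_._2.toString)

logLines.print()
ssc.start()
ssc.awaitTermination()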
Spark on YARN multitenancy
Hello Spark experts, We are currently evaluating Spark on our cluster that already supports MRv2 over YARN. We have noticed a problem with running jobs concurrently, in particular that a running Spark job will not release its resources until the job is finished. Ideally, if two people run any combination of MRv2 and Spark jobs, the resources should be fairly distributed. I have noticed a feature called "dynamic resource allocation" in Spark 1.2, but this does not seem to solve the problem, because it releases resources only when Spark is IDLE, not while it's BUSY. What I am looking for is a similar approach to MapReduce, where a new user obtains a fair share of resources. I haven't been able to locate any further information on this matter; on the other hand, I feel this must be a pretty common issue for a lot of users. So,
1. What is your experience when dealing with a multitenant (multiple users) Spark cluster with YARN?
2. Is Spark architecturally able to release resources while it's busy? Is this a planned feature or is it something that conflicts with the idea of Spark executors?
Thanks
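For reference, a minimal sketch of the dynamic resource allocation settings discussed above (values are illustrative; the YARN external shuffle service must also be enabled on the NodeManagers, and, as noted, executors are only released once they go idle):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("SharedClusterJob")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60")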
RE: fishing for help!
Hi Eran, Based on the limited information, the first things that come to my mind are Processor, RAM, and Disk speed. David Newberger QA Analyst WAND - The Future of Restaurant Technology (W) www.wandcorp.com<http://www.wandcorp.com/> (E) david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com> (P) 952.361.6200 From: Eran Witkon [mailto:eranwit...@gmail.com] Sent: Monday, December 21, 2015 6:54 AM To: user Subject: fishing for help! Hi, I know it is a wide question but can you think of reasons why a pyspark job which runs on server 1 using user 1 will run faster than the same job when running on server 2 with user 1? Eran
Fat jar can't find jdbc
Hi Everyone, I'm building a prototype that fundamentally grabs data from a MySQL instance, crunches some numbers, and then moves it on down the pipeline. I've been using SBT with assembly tool to build a single jar for deployment. I've gone through the paces of stomping out many dependency problems and have come down to one last (hopefully) zinger. java.lang.ClassNotFoundException: Failed to load class for data source: > jdbc. > > at > org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67) > > at > org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87) > > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) > > at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1203) > > at her.recommender.getDataframe(her.recommender.scala:45) > > at her.recommender.getRecommendations(her.recommender.scala:60) > I'm assuming this has to do with mysql-connector because this is the problem I run into when I'm working with spark-shell and I forget to include my classpath with my mysql-connect jar file. I've tried: - Using different versions of mysql-connector-java in my build.sbt file - Copying the connector jar to my_project/src/main/lib - Copying the connector jar to my_project/lib <-- (this is where I keep my build.sbt) Everything loads fine and works, except my call that does "sqlContext.load("jdbc", myOptions)". I know this is a total newbie question but in my defense, I'm fairly new to Scala, and this is my first go at deploying a fat jar with sbt-assembly. Thanks for any advice! -- David Yerrington yerrington.net
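For reference, a sketch of the same load using the DataFrameReader API with the MySQL driver class named explicitly; all connection values are placeholders, and this does not necessarily address the root cause (the error is about the "jdbc" data source itself rather than the MySQL driver — see the merge-strategy note further down the thread):

    val df = sqlContext.read
      .format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")
      .option("dbtable", "ratings")
      .option("user", "dbuser")
      .option("password", "secret")
      .option("driver", "com.mysql.jdbc.Driver")  // name the driver explicitly
      .load()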
Re: Fat jar can't find jdbc
Sure here it is: import AssemblyKeys._ assemblySettings // assemblyJarName in assembly := "recommender.jar" test in assembly := {} organization := "com.example" version := "0.1" scalaVersion := "2.11.6" scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8") libraryDependencies ++= { val akkaV = "2.3.9" val sprayV = "1.3.3" Seq( "org.apache.spark" %% "spark-core" % "1.5.2", "org.apache.spark" %% "spark-sql" % "1.5.2", "org.apache.spark" %% "spark-hive" % "1.5.2", "org.apache.spark" %% "spark-streaming" % "1.5.2", "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2", "org.apache.spark" %% "spark-streaming-flume" % "1.5.2", "org.apache.spark" %% "spark-mllib" % "1.5.2", "org.apache.commons" % "commons-lang3" % "3.0", "io.spray"%% "spray-can" % sprayV, "io.spray"%% "spray-routing" % sprayV, "io.spray"%% "spray-testkit" % sprayV % "test", "io.spray"%% "spray-json"% "1.3.2", "com.typesafe.akka" %% "akka-actor"% akkaV, "com.typesafe.akka" %% "akka-testkit" % akkaV % "test", "com.zaxxer" % "HikariCP-java6"% "2.3.3", "com.typesafe.slick" %% "slick" % "3.1.0", "org.specs2" %% "specs2-core" % "2.3.11" % "test", "mysql" % "mysql-connector-java" % "5.1.35", "org.slf4j" % "slf4j-nop" % "1.6.4", "net.liftweb" %% "lift-json" % "2.6+", "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.4.0" ) } // For the jackson resolves business resolvers += "Sonatype OSS Snapshots" at " https://oss.sonatype.org/content/repositories/snapshots"; mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.startsWith("META-INF") => MergeStrategy.discard case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first case PathList("org", "apache", xs @ _*) => MergeStrategy.first case PathList("org", "jboss", xs @ _*) => MergeStrategy.first case "about.html" => MergeStrategy.rename case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first } } Revolver.settings Also, when I run "assembly", I see this, without warning / errors: ... [info] Including: datanucleus-rdbms-3.2.9.jar [info] Including: reactive-streams-1.0.0.jar *[info] Including: mysql-connector-java-5.1.35.jar* [info] Including: commons-pool-1.5.4.jar [info] Including: commons-dbcp-1.4.jar ... On Tue, Dec 22, 2015 at 12:04 AM, Vijay Kiran wrote: > Can you paste your libraryDependencies from build.sbt ? > > ./Vijay > > > On 22 Dec 2015, at 06:12, David Yerrington wrote: > > > > Hi Everyone, > > > > I'm building a prototype that fundamentally grabs data from a MySQL > instance, crunches some numbers, and then moves it on down the pipeline. > I've been using SBT with assembly tool to build a single jar for deployment. > > > > I've gone through the paces of stomping out many dependency problems and > have come down to one last (hopefully) zinger. > > > > java.lang.ClassNotFoundException: Failed to load class for data source: > jdbc. 
> > > > at > org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67) > > > > at > org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87) > > > > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) > > > > at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1203) > > > > at her.recommender.getDataframe(her.recommender.scala:45) > > > > at her.recommender.getRecommendations(her.recommender.scala:60) > > > > > > I'm assuming this has to do with mysql-connector because this is the > problem I run into when I'm working with spark-sh
Re: Fat jar can't find jdbc
Igor, I think it's available. After I extract the jar file, I see a directory with class files that look very relevant in "/com/mysql/jdbc". After reading this, I started to wonder if MySQL connector was really the problem. Perhaps it's something to do with SQLcontext? I just wired a test endpoint to run a very basic mysql query, outside of Spark, and it worked just fine (yay!). I copied and pasted this example to verify my MySQL connector availability, and it worked just fine: https://mkaz.github.io/2011/05/27/using-scala-with-jdbc-to-connect-to-mysql/ As far as the Maven manifest goes, I'm really not sure. I will research it though. Now I'm wondering if my mergeStrategy is to blame? I'm going to try there next. Thank you for the help! On Tue, Dec 22, 2015 at 1:18 AM, Igor Berman wrote: > David, can you verify that mysql connector classes indeed in your single > jar? > open it with zip tool available at your platform > > another options that might be a problem - if there is some dependency in > MANIFEST(not sure though this is the case of mysql connector) then it might > be broken after preparing single jar > so you need to verify that it's ok(in maven usually it's possible to > define merging policy for resources while creating single jar) > > On 22 December 2015 at 10:04, Vijay Kiran wrote: > >> Can you paste your libraryDependencies from build.sbt ? >> >> ./Vijay >> >> > On 22 Dec 2015, at 06:12, David Yerrington >> wrote: >> > >> > Hi Everyone, >> > >> > I'm building a prototype that fundamentally grabs data from a MySQL >> instance, crunches some numbers, and then moves it on down the pipeline. >> I've been using SBT with assembly tool to build a single jar for deployment. >> > >> > I've gone through the paces of stomping out many dependency problems >> and have come down to one last (hopefully) zinger. >> > >> > java.lang.ClassNotFoundException: Failed to load class for data source: >> jdbc. >> > >> > at >> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67) >> > >> > at >> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87) >> > >> > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) >> > >> > at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1203) >> > >> > at her.recommender.getDataframe(her.recommender.scala:45) >> > >> > at her.recommender.getRecommendations(her.recommender.scala:60) >> > >> > >> > I'm assuming this has to do with mysql-connector because this is the >> problem I run into when I'm working with spark-shell and I forget to >> include my classpath with my mysql-connect jar file. >> > >> > I've tried: >> > • Using different versions of mysql-connector-java in my >> build.sbt file >> > • Copying the connector jar to my_project/src/main/lib >> > • Copying the connector jar to my_project/lib <-- (this is where >> I keep my build.sbt) >> > Everything loads fine and works, except my call that does >> "sqlContext.load("jdbc", myOptions)". I know this is a total newbie >> question but in my defense, I'm fairly new to Scala, and this is my first >> go at deploying a fat jar with sbt-assembly. >> > >> > Thanks for any advice! >> > >> > -- >> > David Yerrington >> > yerrington.net >> >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> > -- David Yerrington yerrington.net
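One hedged observation on the merge-strategy suspicion: in Spark 1.5 the short name "jdbc" is resolved through Java's ServiceLoader, which reads META-INF/services/org.apache.spark.sql.sources.DataSourceRegister from the jars on the classpath. The build.sbt shown earlier discards everything under META-INF, which would throw those registration files away in the fat jar and can produce exactly a "Failed to load class for data source: jdbc" error. A possible fix, assuming sbt-assembly, is to concatenate service files before the blanket discard:

    mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => {
      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat  // keep ServiceLoader registrations
      case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
      case m if m.startsWith("META-INF") => MergeStrategy.discard
      case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
      case "reference.conf" => MergeStrategy.concat
      case _ => MergeStrategy.first
    } }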
Problem About Worker System.out
I have used Spark 1.4 for 6 months. Thanks to all the members of this community for your great work. I have a question about a logging issue which I hope can be solved. The program is running under this configuration: YARN cluster, yarn-client mode. In Scala, writing code like: rdd.map( a => println(a) ); will print the value of a in our console. However, in Java (1.7), writing rdd.map( new Function<Integer, Integer>(){ @Override public Integer call(Integer a) throws Exception { System.out.println(a); return a; }}); won't print the output in our console. The configuration is the same. I have tried this code but it does not work either: rdd.map( new Function<Integer, Integer>(){ @Override public Integer call(Integer a) throws Exception { org.apache.log4j.Logger log = Logger.getLogger(this.getClass()); log.info(a); log.warn(a); log.error(a); log.fatal(a); return a; }}); No output either: final org.apache.log4j.Logger log = Logger.getLogger(this.getClass()); rdd.map( new Function<Integer, Integer>(){ @Override public Integer call(Integer a) throws Exception { log.info(a); log.warn(a); log.error(a); log.fatal(a); return a; }}); It seems that stdout output on a worker isn't sent back to our driver. I wonder why it works in Scala but not in Java. Is there a simple way to make Java work like Scala? Thanks.
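A short note on where that output actually goes: anything printed inside a map or foreach runs on the executors, so it lands in each executor's stdout/stderr log (reachable from the executor links in the web UI), not in the driver console. On YARN the logs can also be pulled back afterwards with something like the following, where the application id is a placeholder:

    yarn logs -applicationId application_1450000000000_0001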
FW: Problem About Worker System.out
Thanks. Can we use a slf4j/log4j logger to transfer our messages from a worker to the driver? I saw some discussions saying that this code can be used to transfer messages: object Holder extends Serializable { @transient lazy val log = Logger.getLogger(getClass.getName) } val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element => Holder.log.info(element) } ref: http://stackoverflow.com/questions/29208844/apache-spark-logging-within-scala Is this the usual approach? Or does Spark provide a SocketAppender for developers? Date: Mon, 28 Dec 2015 17:52:17 +0800 Subject: Re: Problem About Worker System.out From: sai.sai.s...@gmail.com To: david_john_2...@outlook.com CC: user@spark.apache.org Stdout will not be sent back to the driver, no matter whether you use Scala or Java. You must be doing something wrong that makes you think it is the expected behavior. On Mon, Dec 28, 2015 at 5:33 PM, David John wrote: I have used Spark 1.4 for 6 months. Thanks to all the members of this community for your great work. I have a question about a logging issue which I hope can be solved. The program is running under this configuration: YARN cluster, yarn-client mode. In Scala, writing code like: rdd.map( a => println(a) ); will print the value of a in our console. However, in Java (1.7), writing rdd.map( new Function<Integer, Integer>(){ @Override public Integer call(Integer a) throws Exception { System.out.println(a); return a; }}); won't print the output in our console. The configuration is the same. I have tried this code but it does not work either: rdd.map( new Function<Integer, Integer>(){ @Override public Integer call(Integer a) throws Exception { org.apache.log4j.Logger log = Logger.getLogger(this.getClass()); log.info(a); log.warn(a); log.error(a); log.fatal(a); return a; }}); No output either: final org.apache.log4j.Logger log = Logger.getLogger(this.getClass()); rdd.map( new Function<Integer, Integer>(){ @Override public Integer call(Integer a) throws Exception { log.info(a); log.warn(a); log.error(a); log.fatal(a); return a; }}); It seems that stdout output on a worker isn't sent back to our driver. I wonder why it works in Scala but not in Java. Is there a simple way to make Java work like Scala? Thanks.
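For the Java side of the question above, a rough equivalent of that Scala Holder pattern is a class holding a static logger, so the logger is created inside each executor JVM rather than serialized from the driver; the messages then appear in the executor logs rather than on the driver. A sketch with illustrative names:

    import org.apache.log4j.Logger;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    public class ExecutorLogging {
      // Initialised when the class loads inside each executor JVM; never serialized.
      private static final Logger LOG = Logger.getLogger(ExecutorLogging.class);

      public static void logValues(JavaRDD<Integer> rdd) {
        rdd.foreach(new VoidFunction<Integer>() {
          @Override
          public void call(Integer a) {
            LOG.info("value = " + a);  // written to the executor's log, not the driver console
          }
        });
      }
    }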
Using Experimental Spark Features
Hi All, I've been looking at the Direct Approach for streaming Kafka integration (http://spark.apache.org/docs/latest/streaming-kafka-integration.html) because it looks like a good fit for our use cases. My concern is that the feature is experimental according to the documentation. Has anyone used this approach yet, and if so, what has your experience been with it? If it helps, we'd be looking to implement it using Scala. Secondly, in general, what has people's experience been with using experimental features in Spark? Cheers, David Newberger
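For anyone curious what the direct approach looks like in Scala, a minimal sketch against the 0.8 integration; broker addresses and the topic name are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(sc, Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("events")

    // Receiver-less direct stream: one RDD partition per Kafka partition, offsets tracked by Spark.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()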
Re: [discuss] dropping Python 2.6 support
FWIW, RHEL 6 still uses Python 2.6, although 2.7.8 and 3.3.2 are available through Red Hat Software Collections. See: https://www.softwarecollections.org/en/ I run an academic compute cluster on RHEL 6. We do, however, provide Python 2.7.x and 3.5.x via modulefiles. On Tue, Jan 5, 2016 at 8:45 AM, Nicholas Chammas wrote: > +1 > > Red Hat supports Python 2.6 on REHL 5 until 2020 > <https://alexgaynor.net/2015/mar/30/red-hat-open-source-community/>, but > otherwise yes, Python 2.6 is ancient history and the core Python developers > stopped supporting it in 2013. REHL 5 is not a good enough reason to > continue support for Python 2.6 IMO. > > We should aim to support Python 2.7 and Python 3.3+ (which I believe we > currently do). > > Nick > > On Tue, Jan 5, 2016 at 8:01 AM Allen Zhang wrote: > >> plus 1, >> >> we are currently using python 2.7.2 in production environment. >> >> >> >> >> >> 在 2016-01-05 18:11:45,"Meethu Mathew" 写道: >> >> +1 >> We use Python 2.7 >> >> Regards, >> >> Meethu Mathew >> >> On Tue, Jan 5, 2016 at 12:47 PM, Reynold Xin wrote: >> >>> Does anybody here care about us dropping support for Python 2.6 in Spark >>> 2.0? >>> >>> Python 2.6 is ancient, and is pretty slow in many aspects (e.g. json >>> parsing) when compared with Python 2.7. Some libraries that Spark depend on >>> stopped supporting 2.6. We can still convince the library maintainers to >>> support 2.6, but it will be extra work. I'm curious if anybody still uses >>> Python 2.6 to run Spark. >>> >>> Thanks. >>> >>> >>> >> -- David Chin, Ph.D. david.c...@drexel.eduSr. Systems Administrator, URCF, Drexel U. http://www.drexel.edu/research/urcf/ https://linuxfollies.blogspot.com/ +1.215.221.4747 (mobile) https://github.com/prehensilecode
ROSE: Spark + R on the JVM, now available.
Hi all, I'd like to share news of the recent release of a new Spark package, [ROSE](http://spark-packages.org/package/onetapbeyond/opencpu-spark-executor). ROSE is a Scala library offering access to the full scientific computing power of the R programming language to Apache Spark batch and streaming applications on the JVM. Where Apache SparkR lets data scientists use Spark from R, ROSE is designed to let Scala and Java developers use R from Spark. The project is available and documented [on GitHub](https://github.com/onetapbeyond/opencpu-spark-executor) and I would encourage you to [take a look](https://github.com/onetapbeyond/opencpu-spark-executor). Any feedback, questions etc very welcome. David "All that is gold does not glitter, Not all those who wander are lost."
Re: ROSE: Spark + R on the JVM.
Hi Corey, > Would you mind providing a link to the github? Sure, here is the github link you're looking for: https://github.com/onetapbeyond/opencpu-spark-executor David "All that is gold does not glitter, Not all those who wander are lost." Original Message Subject: Re: ROSE: Spark + R on the JVM. Local Time: January 12 2016 12:32 pm UTC Time: January 12 2016 5:32 pm From: cjno...@gmail.com To: themarchoffo...@protonmail.com CC: user@spark.apache.org,d...@spark.apache.org David, Thank you very much for announcing this! It looks like it could be very useful. Would you mind providing a link to the github? On Tue, Jan 12, 2016 at 10:03 AM, David wrote: Hi all, I'd like to share news of the recent release of a new Spark package, ROSE. ROSE is a Scala library offering access to the full scientific computing power of the R programming language to Apache Spark batch and streaming applications on the JVM. Where Apache SparkR lets data scientists use Spark from R, ROSE is designed to let Scala and Java developers use R from Spark. The project is available and documented on GitHub and I would encourage you to take a look. Any feedback, questions etc very welcome. David "All that is gold does not glitter, Not all those who wander are lost."
Re: ROSE: Spark + R on the JVM.
Hi Richard, > Would it be possible to access the session API from within ROSE, > to get for example the images that are generated by R / openCPU Technically it would be possible although there would be some potentially significant runtime costs per task in doing so, primarily those related to extracting image data from the R session, serializing and then moving that data across the cluster for each and every image. From a design perspective ROSE was intended to be used within Spark scale applications where R object data was seen as the primary task output. An output in a format that could be rapidly serialized and easily processed. Are there real world use cases where Spark scale applications capable of generating 10k, 100k, or even millions of image files would actually need to capture and store images? If so, how practically speaking, would these images ever be used? I'm just not sure. Maybe you could describe your own use case to provide some insights? > and the logging to stdout that is logged by R? If you are referring to the R console output (generated within the R session during the execution of an OCPUTask) then this data could certainly (optionally) be captured and returned on an OCPUResult. Again, can you provide any details for how you might use this console output in a real world application? As an aside, for simple standalone Spark applications that will only ever run on a single host (no cluster) you could consider using an alternative library called fluent-r. This library is also available under my GitHub repo, [see here](https://github.com/onetapbeyond/fluent-r). The fluent-r library already has support for the retrieval of R objects, R console output and R graphics device image/plots. However it is not as lightweight as ROSE and it not designed to work in a clustered environment. ROSE on the other hand is designed for scale. David "All that is gold does not glitter, Not all those who wander are lost." Original Message Subject: Re: ROSE: Spark + R on the JVM. Local Time: January 12 2016 6:56 pm UTC Time: January 12 2016 11:56 pm From: rsiebel...@gmail.com To: m...@vijaykiran.com CC: cjno...@gmail.com,themarchoffo...@protonmail.com,user@spark.apache.org,d...@spark.apache.org Hi, this looks great and seems to be very usable. Would it be possible to access the session API from within ROSE, to get for example the images that are generated by R / openCPU and the logging to stdout that is logged by R? thanks in advance, Richard On Tue, Jan 12, 2016 at 10:16 PM, Vijay Kiran wrote: I think it would be this: https://github.com/onetapbeyond/opencpu-spark-executor > On 12 Jan 2016, at 18:32, Corey Nolet wrote: > > David, > > Thank you very much for announcing this! It looks like it could be very > useful. Would you mind providing a link to the github? > > On Tue, Jan 12, 2016 at 10:03 AM, David > wrote: > Hi all, > > I'd like to share news of the recent release of a new Spark package, ROSE. > > ROSE is a Scala library offering access to the full scientific computing > power of the R programming language to Apache Spark batch and streaming > applications on the JVM. Where Apache SparkR lets data scientists use Spark > from R, ROSE is designed to let Scala and Java developers use R from Spark. > > The project is available and documented on GitHub and I would encourage you > to take a look. Any feedback, questions etc very welcome. > > David > > "All that is gold does not glitter, Not all those who wander are lost." 
> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ROSE: Spark + R on the JVM.
Hi Richard, Thanks for providing the background on your application. > the user types or copy-pastes his R code, > the system should then send this R code (using ROSE) to R Unfortunately this type of ad hoc R analysis is not supported. ROSE supports the execution of any R function or script within an existing R package on CRAN, Bioc, or github. It does not support the direct execution of arbitrary blocks of R code as you described. You may want to look at [DeployR](http://deployr.revolutionanalytics.com/), it's an open source R integration server that provides APIs in Java, JavaScript and .NET that can easily support your use case. The outputs of your DeployR integration could then become inputs to your data processing system. David "All that is gold does not glitter, Not all those who wander are lost." Original Message Subject: Re: ROSE: Spark + R on the JVM. Local Time: January 13 2016 3:18 am UTC Time: January 13 2016 8:18 am From: rsiebel...@gmail.com To: themarchoffo...@protonmail.com CC: m...@vijaykiran.com,cjno...@gmail.com,user@spark.apache.org,d...@spark.apache.org Hi David, the use case is that we're building a data processing system with an intuitive user interface where Spark is used as the data processing framework. We would like to provide a HTML user interface to R where the user types or copy-pastes his R code, the system should then send this R code (using ROSE) to R, process it and give the results back to the user. The RDD would be used so that the data can be further processed by the system but we would like to also show or be able to show the messages printed to STDOUT and also the images (plots) that are generated by R. The plots seems to be available in the OpenCPU API, see below Inline image 1 So the case is not that we're trying to process millions of images but rather that we would like to show the generated plots (like a regression plot) that's generated in R to the user. There could be several plots generated by the code, but certainly not thousands or even hundreds, only a few. Hope that this would be possible using ROSE because it seems a really good fit, thanks in advance, Richard On Wed, Jan 13, 2016 at 3:39 AM, David Russell wrote: Hi Richard, > Would it be possible to access the session API from within ROSE, > to get for example the images that are generated by R / openCPU Technically it would be possible although there would be some potentially significant runtime costs per task in doing so, primarily those related to extracting image data from the R session, serializing and then moving that data across the cluster for each and every image. From a design perspective ROSE was intended to be used within Spark scale applications where R object data was seen as the primary task output. An output in a format that could be rapidly serialized and easily processed. Are there real world use cases where Spark scale applications capable of generating 10k, 100k, or even millions of image files would actually need to capture and store images? If so, how practically speaking, would these images ever be used? I'm just not sure. Maybe you could describe your own use case to provide some insights? > and the logging to stdout that is logged by R? If you are referring to the R console output (generated within the R session during the execution of an OCPUTask) then this data could certainly (optionally) be captured and returned on an OCPUResult. Again, can you provide any details for how you might use this console output in a real world application? 
As an aside, for simple standalone Spark applications that will only ever run on a single host (no cluster) you could consider using an alternative library called fluent-r. This library is also available under my GitHub repo, [see here](https://github.com/onetapbeyond/fluent-r). The fluent-r library already has support for the retrieval of R objects, R console output and R graphics device image/plots. However it is not as lightweight as ROSE and it not designed to work in a clustered environment. ROSE on the other hand is designed for scale. David "All that is gold does not glitter, Not all those who wander are lost." Original Message Subject: Re: ROSE: Spark + R on the JVM. Local Time: January 12 2016 6:56 pm UTC Time: January 12 2016 11:56 pm From: rsiebel...@gmail.com To: m...@vijaykiran.com CC: cjno...@gmail.com,themarchoffo...@protonmail.com,user@spark.apache.org,d...@spark.apache.org Hi, this looks great and seems to be very usable. Would it be possible to access the session API from within ROSE, to get for example the images that are generated by R / openCPU and the logging to stdout that is logged by R? thanks in advance, Richard On Tue, Jan 12, 2016 at 10:16 PM, Vijay Kiran wrote: I think it would be this: https://github.com/onetapbeyond/opencpu-sp
Re: Kafka Streaming and partitioning
Yep that's exactly what we want. Thanks for all the info Cody. Dave. On 13 Jan 2016 18:29, "Cody Koeninger" wrote: > The idea here is that the custom partitioner shouldn't actually get used > for repartitioning the kafka stream (because that would involve a shuffle, > which is what you're trying to avoid). You're just assigning a partitioner > because you know how it already is partitioned. > > > On Wed, Jan 13, 2016 at 11:22 AM, Dave wrote: > >> So for case 1 below >> - subclass or modify the direct stream and kafkardd. They're private, so >> you'd need to rebuild just the external kafka project, not all of spark >> When the data is read from Kafka it will be partitioned correctly with >> the Custom Partitioner passed in to the new direct stream and kafka RDD >> implementations. >> >> For case 2 >> - write a wrapper subclass of rdd that takes a given custom partitioner >> and rdd in the constructor, overrides partitioner, and delegates every >> other method to the wrapped rdd. This should be possible without >> modification to any existing spark code. You'd use it something like >> Am I correct in saying that the data from Kafka will not be read into >> memory in the cluster (kafka server is not located on the Spark Cluster in >> my use case) until the following code is executed >> stream.transform { rdd => >> val wrapped = YourWrapper(cp, rdd) >> wrapped.join(reference) >> } >> In which case it will run through the partitioner of the wrapped RDD when >> it arrives in the cluster for the first time i.e. no shuffle. >> >> Thanks, >> Dave. >> >> >> >> On 13/01/16 17:00, Cody Koeninger wrote: >> >> In the case here of a kafkaRDD, the data doesn't reside on the cluster, >> it's not cached by default. If you're running kafka on the same nodes as >> spark, then data locality would play a factor, but that should be handled >> by the existing getPreferredLocations method. >> >> On Wed, Jan 13, 2016 at 10:46 AM, Dave wrote: >> >>> Thanks Cody, appreciate the response. >>> >>> With this pattern the partitioners will now match when the join is >>> executed. >>> However, does the wrapper RDD not need to set the partition meta data on >>> the wrapped RDD in order to allow Spark to know where the data for each >>> partition resides in the cluster. >>> >>> Thanks, >>> Dave. >>> >>> >>> On 13/01/16 16:21, Cody Koeninger wrote: >>> >>> If two rdds have an identical partitioner, joining should not involve a >>> shuffle. >>> >>> You should be able to override the partitioner without calling >>> partitionBy. >>> >>> Two ways I can think of to do this: >>> - subclass or modify the direct stream and kafkardd. They're private, >>> so you'd need to rebuild just the external kafka project, not all of spark >>> >>> - write a wrapper subclass of rdd that takes a given custom partitioner >>> and rdd in the constructor, overrides partitioner, and delegates every >>> other method to the wrapped rdd. This should be possible without >>> modification to any existing spark code. You'd use it something like >>> >>> val cp = YourCustomPartitioner(...) >>> val reference = YourReferenceRDD(cp, ...) >>> val stream = KafkaUtils >>> >>> stream.transform { rdd => >>> val wrapped = YourWrapper(cp, rdd) >>> wrapped.join(reference) >>> } >>> >>> >>> I haven't had reason to do either one of those approaches, so YMMV, but >>> I believe others have >>> >>> >>> >>> >>> On Wed, Jan 13, 2016 at 3:40 AM, ddav < >>> dave.davo...@gmail.com> wrote: >>> Hi, I have the following use case: 1. 
Reference data stored in an RDD that is persisted and partitioned using a simple custom partitioner. 2. Input stream from kafka that uses the same partitioner algorithm as the ref data RDD - this partitioning is done in kafka. I am using kafka direct streams so the number of kafka partitions map to the number of partitions in the spark RDD. From testing and the documentation I see Spark does not know anything about how the data has been partitioned in kafka. In my use case I need to join the reference data RDD and the input stream RDD. Due to the fact I have manually ensured the incoming data from kafka uses the same partitioning algorithm I know the data has been grouped correctly in the input stream RDD in Spark but I cannot do a join without a shuffle step due to the fact Spark has no knowledge of how the data has been partitioned. I have two ways to do this. 1. Explicitly call PartitionBy(CutomParitioner) on the input stream RDD followed by a join. This results in a shuffle of the input stream RDD and then the co-partitioned join to take place. 2. Call join on the reference data RDD passing in the input stream RDD. Spark will do a shuffle under the hood in this case and the join will take place. The join will do its best to run on a node that has l
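A rough sketch of the wrapper idea described above, with illustrative names: it reuses the parent's partitions and data unchanged but advertises the custom partitioner, so a later join against a reference RDD that uses the same partitioner should not trigger a shuffle.

    import scala.reflect.ClassTag
    import org.apache.spark.{Partition, Partitioner, TaskContext}
    import org.apache.spark.rdd.RDD

    class PartitionerAwareRDD[K: ClassTag, V: ClassTag](
        val parent: RDD[(K, V)],
        cp: Partitioner)
      extends RDD[(K, V)](parent) {

      // Advertise the custom partitioner without moving any data.
      override val partitioner: Option[Partitioner] = Some(cp)

      override protected def getPartitions: Array[Partition] = parent.partitions

      override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
        parent.iterator(split, context)
    }

    // usage, as in the snippet above:
    // stream.transform { rdd => new PartitionerAwareRDD(rdd, cp).join(reference) }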
Re: rdd.foreach return value
The foreach operation on RDD has a void (Unit) return type. See attached. So there is no return value to the driver. David "All that is gold does not glitter, Not all those who wander are lost." Original Message Subject: rdd.foreach return value Local Time: January 18 2016 10:34 pm UTC Time: January 19 2016 3:34 am From: charles.up...@gmail.com To: user@spark.apache.org code snippet the 'print' actually print info on the worker node, but I feel confused where the 'return' value goes to. for I get nothing on the driver node. -- -- a spark lover, a quant, a developer and a good man. http://github.com/litaotao foreach.png Description: Binary data - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
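A small sketch of the usual alternatives when the values are actually needed back on the driver; foreach itself always returns Unit:

    val rdd = sc.parallelize(1 to 5)

    // 1) Bring the data back explicitly (only sensible for small RDDs).
    rdd.collect().foreach(println)   // prints on the driver

    // 2) Or aggregate through an accumulator and read it on the driver.
    val acc = sc.accumulator(0)
    rdd.foreach(x => acc += x)
    println(acc.value)               // 15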
MLlib OneVsRest causing intermittent exceptions
Hi, I've run into an exception using MLlib OneVsRest with logistic regression (v1.6.0, but also in previous versions). The issue is intermittent. When running multiclass classification with K-fold cross validation, there are scenarios where the split does not contain instances for every target label. In such cases, an ArrayIndexOutOfBoundsException is generated. I've tried to reproduce the problem in a simple SBT project here: https://github.com/junglebarry/SparkOneVsRestTest I don't imagine this is typical - it first surfaced when running over a dataset with some very rare classes. I'm happy to look into patching the code, but I first wanted to confirm that the problem was real, and that I wasn't somehow misunderstanding how I should be using OneVsRest. Any guidance would be appreciated - I'm new to the list. Many thanks, David
Re: MLlib OneVsRest causing intermittent exceptions
Hi Ram, I didn't include an explicit label column in my reproduction as I thought it superfluous. However, in my original use-case, I was using a StringIndexer, where the labels were indexed across the entire dataset (training+validation+test). The (indexed) label column was then explicitly provided to the OneVsRest instance. Here's the abridged version: val textDocuments = ??? // real data here // Index labels, adding metadata to the label column. // Fit on whole dataset to include all labels in index. val labelIndexer = new StringIndexer() .setInputCol("label") .setOutputCol("labelIndexed") .fit(textDocuments) val lrClassifier = new LogisticRegression() val classifier = new OneVsRest() .setClassifier(lrClassifier) .setLabelCol(labelIndexer.getOutputCol) // ... There's an explicit reference to the label column, and when created, that column contains all possible values of the label (it's `fit` over all data). It looks to me like StringIndexer computes label metadata at that point (in `transform`) and attaches it to the column. This way, I'd hope that even once TrainValidationSplit returns a subset dataframe - which may not contain all labels - the metadata on the column should still contain all labels. Does my use of StringIndexer count as "metadata", here? If so, I still see the exception as before. I've pushed a new example using StringIndexer to my earlier repo, so you can see the code and issue. I'm happy to try a simpler method for providing column metadata, if one is available. Thanks, David On Mon, Jan 25, 2016 at 11:13 PM Ram Sriharsha wrote: > Hi David > > What happens if you provide the class labels via metadata instead of > letting OneVsRest determine the labels? > > Ram > > On Mon, Jan 25, 2016 at 3:06 PM, David Brooks wrote: > >> Hi, >> >> I've run into an exception using MLlib OneVsRest with logistic regression >> (v1.6.0, but also in previous versions). >> >> The issue is intermittent. When running multiclass classification with >> K-fold cross validation, there are scenarios where the split does not >> contain instances for every target label. In such cases, an >> ArrayIndexOutOfBoundsException is generated. >> >> I've tried to reproduce the problem in a simple SBT project here: >> >>https://github.com/junglebarry/SparkOneVsRestTest >> >> I don't imagine this is typical - it first surfaced when running over a >> dataset with some very rare classes. >> >> I'm happy to look into patching the code, but I first wanted to confirm >> that the problem was real, and that I wasn't somehow misunderstanding how I >> should be using OneVsRest. >> >> Any guidance would be appreciated - I'm new to the list. >> >> Many thanks, >> David >> > > > > -- > Ram Sriharsha > Architect, Spark and Data Science > Hortonworks, 2550 Great America Way, 2nd Floor > Santa Clara, CA 95054 > Ph: 408-510-8635 > email: har...@apache.org > > [image: https://www.linkedin.com/in/harsha340] > <https://www.linkedin.com/in/harsha340> <https://twitter.com/halfabrane> > <https://github.com/harsha2010/> > >
Re: MLlib OneVsRest causing intermittent exceptions
Hi again Ram, Sorry, I was too hasty in my previous response. I've done a bit more digging through the code, and StringIndexer does indeed provide metadata, as a NominalAttribute with a known number of class labels. I don't think the issue is related to the use of metadata, however. It seems to me to be caused by the interaction between OneVsRest and TrainValidationSplit. For rare target classes under OneVsRest, it seems quite possible for this random-split approach to select a training subset where all items belong to non-target classes - all of which are given the same class label by OneVsRest. In this case, we start training LogisticRegression on data of a single class, which seems odd. The exception stems from there. The cause looks to me to be that OneVsRest.fit runs binary classifications from 0 to numClasses (OneVsRest.scala:209), and this seems incompatible with the random split, which cannot guarantee training examples for all labels in the range. It might be preferable to iterate over the observed labels in the training set, rather than all labels in the range. I don't know the performance effects of that change, but it does look incompatible with using the label metadata as a shortcut. Do you agree that there is an issue here? Would you accept contributions to the code to remedy it? I'd gladly take a look if I can be of help. Many thanks, David On Tue, Jan 26, 2016 at 1:29 PM David Brooks wrote: > Hi Ram, > > I didn't include an explicit label column in my reproduction as I thought > it superfluous. However, in my original use-case, I was using a > StringIndexer, where the labels were indexed across the entire dataset > (training+validation+test). The (indexed) label column was then explicitly > provided to the OneVsRest instance. > > Here's the abridged version: > > val textDocuments = ??? // real data here > > // Index labels, adding metadata to the label column. > // Fit on whole dataset to include all labels in index. > val labelIndexer = new StringIndexer() > .setInputCol("label") > .setOutputCol("labelIndexed") > .fit(textDocuments) > > val lrClassifier = new LogisticRegression() > > val classifier = new OneVsRest() > .setClassifier(lrClassifier) > .setLabelCol(labelIndexer.getOutputCol) > > // ... > > > There's an explicit reference to the label column, and when created, that > column contains all possible values of the label (it's `fit` over all > data). It looks to me like StringIndexer computes label metadata at that > point (in `transform`) and attaches it to the column. This way, I'd hope > that even once TrainValidationSplit returns a subset dataframe - which > may not contain all labels - the metadata on the column should still > contain all labels. > > Does my use of StringIndexer count as "metadata", here? If so, I still > see the exception as before. > > I've pushed a new example using StringIndexer to my earlier repo, so you > can see the code and issue. I'm happy to try a simpler method for > providing column metadata, if one is available. > > Thanks, > David > > On Mon, Jan 25, 2016 at 11:13 PM Ram Sriharsha > wrote: > >> Hi David >> >> What happens if you provide the class labels via metadata instead of >> letting OneVsRest determine the labels? >> >> Ram >> >> On Mon, Jan 25, 2016 at 3:06 PM, David Brooks wrote: >> >>> Hi, >>> >>> I've run into an exception using MLlib OneVsRest with logistic >>> regression (v1.6.0, but also in previous versions). >>> >>> The issue is intermittent. 
When running multiclass classification with >>> K-fold cross validation, there are scenarios where the split does not >>> contain instances for every target label. In such cases, an >>> ArrayIndexOutOfBoundsException is generated. >>> >>> I've tried to reproduce the problem in a simple SBT project here: >>> >>>https://github.com/junglebarry/SparkOneVsRestTest >>> >>> I don't imagine this is typical - it first surfaced when running over a >>> dataset with some very rare classes. >>> >>> I'm happy to look into patching the code, but I first wanted to confirm >>> that the problem was real, and that I wasn't somehow misunderstanding how I >>> should be using OneVsRest. >>> >>> Any guidance would be appreciated - I'm new to the list. >>> >>> Many thanks, >>> David >>> >> >> >> >> -- >> Ram Sriharsha >> Architect, Spark and Data Science >> Hortonworks, 2550 Great America Way, 2nd Floor >> Santa Clara, CA 95054 >> Ph: 408-510-8635 >> email: har...@apache.org >> >> [image: https://www.linkedin.com/in/harsha340] >> <https://www.linkedin.com/in/harsha340> <https://twitter.com/halfabrane> >> <https://github.com/harsha2010/> >> >>
Re: MLlib OneVsRest causing intermittent exceptions
Hi Ram, Joseph, That's right, but I will clarify: (a) a random split can generate a training set that does not contain some rare class (b) when LogisticRegression is run over a dataframe where all instances have the same class label, it throws an ArrayIndexOutOfBoundsException. When (a) occurs, (b) is the consequence. The rare class is missing from the training set, so you would not expect OneVsRest to train a binary classifier on it; however, because OneVsRest trains binary classifiers on all class labels in the range (0 to numClasses), it *will* train a binary classifier on the missing class, which leads to the exception from (b). A concrete example: - class labels 0, 1, 2, 3 are present in dataset (*numClasses* = 4); - 0, 2, 3 are in the training set after random split (no *1*); - The range (0 to 4) is used to train binary classifiers on each of 0, *1*, 2, 3 - As soon as the classifier is trained on *1*, the exception is thrown I'd suggest: 1. In LogisticRegression, where numClasses == 1, thrown a more meaningful validation exception (avoiding the more cryptic ArrayIndexOutOfBoundsException) 2. Only run OneVsRest for class labels that appear in the dataframe, rather than all labels in the Range(0, numClasses). I created a few simple test cases for running from SBT, like this one <https://github.com/junglebarry/SparkOneVsRestTest/blob/master/src/main/scala/SparkOneVsRestTest_2_Errors.scala>, but I've turned them into gists now for spark-shell: - LogisticRegression throwing ArrayIndexOutOfBoundsException <https://gist.github.com/junglebarry/a7cedce6eaf978d7b9ee> - OneVsRest throwing ArrayIndexOutOfBoundsException <https://gist.github.com/junglebarry/66234edfebaad6254ebe> (with a simulated missing class from a Range) - OneVsRest throwing ArrayIndexOutOfBoundsException with random split <https://gist.github.com/junglebarry/6073aa474d89f3322063>. Only exceptions in 2/3 of cases, due to randomness. If these look good as test cases, I'll take a look at filing JIRAs and getting patches tomorrow morning. It's late here! Thanks for the swift response, David On Tue, Jan 26, 2016 at 11:09 PM Ram Sriharsha wrote: > Hi David > > If I am reading the email right, there are two problems here right? > a) for rare classes the random split will likely miss the rare class. > b) if it misses the rare class an exception is thrown > > I thought the exception stems from b), is that right?... i wouldn't expect > an exception to be thrown in the case the training dataset is missing the > rare class. > could you reproduce this in a simple snippet of code that we can quickly > test on the shell? > > > > > On Tue, Jan 26, 2016 at 3:02 PM, Ram Sriharsha > wrote: > >> Hey David, Yeah absolutely!, feel free to create a JIRA and attach your >> patch to it. We can help review it and pull in the fix... happy to accept >> contributions! >> ccing Joseph who is one of the maintainers of MLLib as well.. when >> creating the JIRA can you attach a simple test case? >> >> On Tue, Jan 26, 2016 at 2:59 PM, David Brooks wrote: >> >>> Hi again Ram, >>> >>> Sorry, I was too hasty in my previous response. I've done a bit more >>> digging through the code, and StringIndexer does indeed provide metadata, >>> as a NominalAttribute with a known number of class labels. I don't think >>> the issue is related to the use of metadata, however. >>> >>> It seems to me to be caused by the interaction between OneVsRest and >>> TrainValidationSplit. 
For rare target classes under OneVsRest, it seems >>> quite possible for this random-split approach to select a training subset >>> where all items belong to non-target classes - all of which are given the >>> same class label by OneVsRest. In this case, we start training >>> LogisticRegression on data of a single class, which seems odd. The >>> exception stems from there. >>> >>> The cause looks to me to be that OneVsRest.fit runs binary >>> classifications from 0 to numClasses (OneVsRest.scala:209), and this seems >>> incompatible with the random split, which cannot guarantee training >>> examples for all labels in the range. It might be preferable to iterate >>> over the observed labels in the training set, rather than all labels in the >>> range. I don't know the performance effects of that change, but it does >>> look incompatible with using the label metadata as a shortcut. >>> >>> Do you agree that there is an issue here? Would you accept >>> contributions to the code to remedy it? I'd gladly take a look if I can be >>> of hel
Re: MLlib OneVsRest causing intermittent exceptions
Hi Ram, Yes, I complete agree. An exception is poor way to handle this case, and training on a dataset of zero labels and no one labels should simply work without exceptions. Fortunately, it looks like someone else has recently patched the problem with LogisticRegression: https://github.com/apache/spark/commit/2388de51912efccaceeb663ac56fc500a79d2ceb This should resolve the issue I'm experiencing. I'll get hold of a build from source and try it out. Thanks for all your help! David On Wed, Jan 27, 2016 at 12:51 AM Ram Sriharsha wrote: > btw, OneVsRest is using the labels in the dataset that is fed to the fit > method, in case the metadata is missing. > So if the metadata contains a label, we expect that label to be present in > the dataset passed to the fit method. > If you want OneVsRest to compute the labels you can leave the label > metadata empty in which case we first compute the # of > labels in the training dataset. > > If the training dataset contains a given label, then logistic regression > should work fine regardless of the rarity of that label (performance might > be bad but it won't throw an exception afaik) > > if the training dataset does not contain a given label but the metadata > does, then we do end up training classifiers which will never see that > label. > But even here, what gets passed to the underlying classifier is a dataset > with only say zero labels and no one labels. > A classifier should be able to handle this... but if it cannot for some > reason, we can have a check in OneVsRest that doesn't train that classifier > > On Tue, Jan 26, 2016 at 4:33 PM, Ram Sriharsha > wrote: > >> Hey David >> >> In your scenario, OneVsRest is training a classifier for 1 vs not 1... >> and the input dataset for fit (or train) has labeled data for label 1 >> >> But the underlying binary classifier (LogisticRegression) uses sampling >> to determine the subset of data to sample during each iteration and it is >> possible that this sample does not include any examples with label 1 (ie >> numClasses = 1) >> >> So the examples it selects in that iteration only include 0 labeled data >> and nothing with label 1. >> >> But why should it throw an exception? if it does, then i would think we >> need to fix the issue in the underlying algorithm instead of the >> reduction somehow knowing that the binary classifier is sampling from the >> training dataset. >> >> Or am I misunderstanding the issue here? >> >> I'll take a look at the gist you linked when i get a chance , thanks! >> >> Ram >> >> On Tue, Jan 26, 2016 at 4:06 PM, David Brooks wrote: >> >>> Hi Ram, Joseph, >>> >>> That's right, but I will clarify: >>> >>> (a) a random split can generate a training set that does not contain >>> some rare class >>> (b) when LogisticRegression is run over a dataframe where all instances >>> have the same class label, it throws an ArrayIndexOutOfBoundsException. >>> >>> When (a) occurs, (b) is the consequence. The rare class is missing from >>> the training set, so you would not expect OneVsRest to train a binary >>> classifier on it; however, because OneVsRest trains binary classifiers on >>> all class labels in the range (0 to numClasses), it *will* train a >>> binary classifier on the missing class, which leads to the exception from >>> (b). 
>>> >>> A concrete example: >>> >>>- class labels 0, 1, 2, 3 are present in dataset (*numClasses* = 4); >>>- 0, 2, 3 are in the training set after random split (no *1*); >>>- The range (0 to 4) is used to train binary classifiers on each of >>>0, *1*, 2, 3 >>>- As soon as the classifier is trained on *1*, the exception is >>>thrown >>> >>> I'd suggest: >>> >>>1. In LogisticRegression, where numClasses == 1, thrown a more >>>meaningful validation exception (avoiding the more cryptic >>>ArrayIndexOutOfBoundsException) >>>2. Only run OneVsRest for class labels that appear in the dataframe, >>>rather than all labels in the Range(0, numClasses). >>> >>> I created a few simple test cases for running from SBT, like this one >>> <https://github.com/junglebarry/SparkOneVsRestTest/blob/master/src/main/scala/SparkOneVsRestTest_2_Errors.scala>, >>> but I've turned them into gists now for spark-shell: >>> >>>- LogisticRegression throwing ArrayIndexOutOfBoundsException >>><https://gist.github.com/junglebarry/
[ANNOUNCE] New SAMBA Package = Spark + AWS Lambda
Hi all, Just sharing news of the release of a newly available Spark package, SAMBA (https://github.com/onetapbeyond/lambda-spark-executor). SAMBA is an Apache Spark package offering seamless integration with the AWS Lambda (https://aws.amazon.com/lambda/) compute service for Spark batch and streaming applications on the JVM. Within traditional Spark deployments RDD tasks are executed using fixed compute resources on worker nodes within the Spark cluster. With SAMBA, application developers can delegate selected RDD tasks to execute using on-demand AWS Lambda compute infrastructure in the cloud. Not unlike the recently released ROSE package that extends the capabilities of traditional Spark applications with support for CRAN R analytics, SAMBA provides another (hopefully) useful extension for Spark application developers on the JVM. SAMBA Spark Package: https://github.com/onetapbeyond/lambda-spark-executor ROSE Spark Package: https://github.com/onetapbeyond/opencpu-spark-executor Questions, suggestions, feedback welcome. David -- "All that is gold does not glitter, Not all those who wander are lost."
Re: Guidelines for writing SPARK packages
Hi Praveen, The basic requirements for releasing a Spark package on spark-packages.org are as follows: 1. The package content must be hosted by GitHub in a public repo under the owner's account. 2. The repo name must match the package name. 3. The master branch of the repo must contain "README.md" and "LICENSE". Per the doc on spark-packages.org site an example package that meets those requirements can be found at https://github.com/databricks/spark-avro. My own recently released SAMBA package also meets these requirements: https://github.com/onetapbeyond/lambda-spark-executor. As you can see there is nothing in this list of requirements that demands the implementation of specific interfaces. What you'll need to implement will depend entirely on what you want to accomplish. If you want to register a release for your package you will also need to push the artifacts for your package to Maven central. David On Mon, Feb 1, 2016 at 7:03 AM, Praveen Devarao wrote: > Hi, > > Is there any guidelines or specs to write a Spark package? I would > like to implement a spark package and would like to know the way it needs to > be structured (implement some interfaces etc) so that it can plug into Spark > for extended functionality. > > Could any one help me point to docs or links on the above? > > Thanking You > > Praveen Devarao -- "All that is gold does not glitter, Not all those who wander are lost." - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
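Putting those requirements together, a minimal repository skeleton might look like the following; all names are placeholders:

    my-spark-package/            # repo name must match the package name, under your GitHub account
    ├── README.md                # required on the master branch
    ├── LICENSE                  # required on the master branch
    ├── build.sbt                # ordinary build; publish artifacts to Maven Central to register a release
    └── src/main/scala/...       # whatever functionality the package provides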
Re: [ANNOUNCE] New SAMBA Package = Spark + AWS Lambda
Hi Ben, > My company uses Lambda to do simple data moving and processing using python > scripts. I can see using Spark instead for the data processing would make it > into a real production level platform. That may be true. Spark has first class support for Python which should make your life easier if you do go this route. Once you've fleshed out your ideas I'm sure folks on this mailing list can provide helpful guidance based on their real world experience with Spark. > Does this pave the way into replacing > the need of a pre-instantiated cluster in AWS or bought hardware in a > datacenter? In a word, no. SAMBA is designed to extend, not replace, the traditional Spark computation and deployment model. At its most basic, the traditional Spark computation model distributes data and computations across worker nodes in the cluster. SAMBA simply allows some of those computations to be performed by AWS Lambda rather than locally on your worker nodes. There are, I believe, a number of potential benefits to using SAMBA in some circumstances: 1. It can help reduce some of the workload on your Spark cluster by moving that workload onto AWS Lambda, an on-demand compute service. 2. It allows Spark applications written in Java or Scala to make use of libraries and features offered by Python and JavaScript (Node.js) today, and potentially, more libraries and features offered by additional languages in the future as AWS Lambda language support evolves. 3. It provides a simple, clean API for integration with REST APIs that may be a benefit to Spark applications that form part of a broader data pipeline or solution. > If so, then this would be a great efficiency and make an easier > entry point for Spark usage. I hope the vision is to get rid of all cluster > management when using Spark. You might find one of the hosted Spark platform solutions such as Databricks or Amazon EMR, which handle cluster management for you, a good place to start. At least in my experience, they got me up and running without difficulty. David - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to add Kafka streaming jars when initialising the SparkContext in Python
I have no problems when submitting the task using spark-submit. The --jars option with the list of jars required is successful and I see in the output the jars being added:

16/02/10 11:14:24 INFO spark.SparkContext: Added JAR file:/usr/lib/spark/extras/lib/spark-streaming-kafka.jar at http://192.168.10.4:33820/jars/spark-streaming-kafka.jar with timestamp 1455102864058
16/02/10 11:14:24 INFO spark.SparkContext: Added JAR file:/opt/kafka/libs/scala-library-2.10.1.jar at http://192.168.10.4:33820/jars/scala-library-2.10.1.jar with timestamp 1455102864077
16/02/10 11:14:24 INFO spark.SparkContext: Added JAR file:/opt/kafka/libs/kafka_2.10-0.8.1.1.jar at http://192.168.10.4:33820/jars/kafka_2.10-0.8.1.1.jar with timestamp 1455102864085
16/02/10 11:14:24 INFO spark.SparkContext: Added JAR file:/opt/kafka/libs/metrics-core-2.2.0.jar at http://192.168.10.4:33820/jars/metrics-core-2.2.0.jar with timestamp 1455102864086
16/02/10 11:14:24 INFO spark.SparkContext: Added JAR file:/usr/share/java/mysql.jar at http://192.168.10.4:33820/jars/mysql.jar with timestamp 1455102864090

But when I try to create a context programmatically in Python (I want to set up some tests), I don't see this and I end up with class-not-found errors. To cover all bases, even though I suspect it's redundant when running local, I've tried:

spark_conf = SparkConf()
spark_conf.setMaster('local[4]')
spark_conf.set('spark.executor.extraLibraryPath',
               '/usr/lib/spark/extras/lib/spark-streaming-kafka.jar,'
               '/opt/kafka/libs/scala-library-2.10.1.jar,'
               '/opt/kafka/libs/kafka_2.10-0.8.1.1.jar,'
               '/opt/kafka/libs/metrics-core-2.2.0.jar,'
               '/usr/share/java/mysql.jar')
spark_conf.set('spark.executor.extraClassPath',
               '/usr/lib/spark/extras/lib/spark-streaming-kafka.jar,'
               '/opt/kafka/libs/scala-library-2.10.1.jar,'
               '/opt/kafka/libs/kafka_2.10-0.8.1.1.jar,'
               '/opt/kafka/libs/metrics-core-2.2.0.jar,'
               '/usr/share/java/mysql.jar')
spark_conf.set('spark.driver.extraClassPath',
               '/usr/lib/spark/extras/lib/spark-streaming-kafka.jar,'
               '/opt/kafka/libs/scala-library-2.10.1.jar,'
               '/opt/kafka/libs/kafka_2.10-0.8.1.1.jar,'
               '/opt/kafka/libs/metrics-core-2.2.0.jar,'
               '/usr/share/java/mysql.jar')
spark_conf.set('spark.driver.extraLibraryPath',
               '/usr/lib/spark/extras/lib/spark-streaming-kafka.jar,'
               '/opt/kafka/libs/scala-library-2.10.1.jar,'
               '/opt/kafka/libs/kafka_2.10-0.8.1.1.jar,'
               '/opt/kafka/libs/metrics-core-2.2.0.jar,'
               '/usr/share/java/mysql.jar')
self.spark_context = SparkContext(conf=spark_conf)

But I still get the same failure to find the same class:

Py4JJavaError: An error occurred while calling o30.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper

The class is certainly in spark-streaming-kafka.jar, and the jar is present in the filesystem at that location. I'm under the impression that if I were using Java I'd be able to use the addJars method on the conf to take care of this, but there doesn't appear to be anything that corresponds for Python. Hacking about, I found that adding:

spark_conf.set('spark.jars',
               '/usr/lib/spark/extras/lib/spark-streaming-kafka.jar,'
               '/opt/kafka/libs/scala-library-2.10.1.jar,'
               '/opt/kafka/libs/kafka_2.10-0.8.1.1.jar,'
               '/opt/kafka/libs/metrics-core-2.2.0.jar,'
               '/usr/share/java/mysql.jar')

got the logging to admit to adding the jars to the HTTP server (just as in the spark-submit output above), but whether I leave the other config options in place or remove them, the class is still not found. Is this not possible in Python?
Incidentally, I have tried SPARK_CLASSPATH (getting the message that it's deprecated and ignored anyway) and I cannot find anything else to try. Can anybody help? David K.
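A workaround that often comes up for this situation, offered here only as a sketch rather than a confirmed fix for every Spark 1.x release: set PYSPARK_SUBMIT_ARGS before the SparkContext is created, so the driver JVM is launched through the same machinery as spark-submit and gets the jars on its classpath. The jar paths are the ones from the post above; the trailing 'pyspark-shell' token is required on recent 1.x versions.

import os

jars = ",".join([
    "/usr/lib/spark/extras/lib/spark-streaming-kafka.jar",
    "/opt/kafka/libs/scala-library-2.10.1.jar",
    "/opt/kafka/libs/kafka_2.10-0.8.1.1.jar",
    "/opt/kafka/libs/metrics-core-2.2.0.jar",
    "/usr/share/java/mysql.jar",
])
# Must be set before the JVM gateway starts, i.e. before SparkContext() is called.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars {} pyspark-shell".format(jars)

from pyspark import SparkConf, SparkContext

spark_conf = SparkConf().setMaster("local[4]").setAppName("kafka-streaming-test")
sc = SparkContext(conf=spark_conf)

The same mechanism also accepts --packages, which pulls the Kafka assembly from Maven instead of pointing at local jar files.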
Re: How to avoid Spark shuffle spill memory?
Hi unk1102,

Try adding more memory to your nodes. Are you running Spark in the cloud? If so, increase the memory on your servers. Do you have default parallelism set (spark.default.parallelism)? If so, unset it, and let Spark decide how many partitions to allocate. You can also try refactoring your code to make it use less memory.

David

On Tue, Oct 6, 2015 at 3:19 PM, unk1102 wrote:
> Hi I have a Spark job which runs for around 4 hours and it shared
> SparkContext and runs many child jobs. When I see each job in UI I see
> shuffle spill of around 30 to 40 GB and because of that many times executors
> gets lost because of using physical memory beyond limits how do I avoid
> shuffle spill? I have tried almost all optimisations nothing is helping I
> dont cache anything I am using Spark 1.4.1 and also using tungsten,codegen
> etc I am using spark.shuffle.storage as 0.2 and spark.storage.memory as 0.2
> I tried to increase shuffle memory to 0.6 but then it halts in GC pause
> causing my executor to timeout and then getting lost eventually.
>
> Please guide. Thanks in advance.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-Spark-shuffle-spill-memory-tp24960.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

--
### Confidential e-mail, for recipient's (or recipients') eyes only, not for distribution. ###
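For reference, the knobs being discussed look roughly like this in code. This is only a sketch: the values are illustrative rather than recommendations, and the two memoryFraction properties apply to the pre-1.6 "legacy" memory manager used by the Spark 1.4.1 in the question.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("shuffle-tuning-sketch")
        # Give shuffle buffers more of the heap; since nothing is cached,
        # the storage fraction can be shrunk correspondingly.
        .set("spark.shuffle.memoryFraction", "0.4")
        .set("spark.storage.memoryFraction", "0.3"))

# Leaving spark.default.parallelism unset lets Spark derive partition
# counts from the parent RDDs and cluster instead of a fixed value.
sc = SparkContext(conf=conf)

# More, smaller partitions also shrink each task's shuffle footprint, e.g.:
# some_rdd = some_rdd.repartition(400)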
Install via directions in "Learning Spark". Exception when running bin/pyspark
Greetings all,

Excited to be learning Spark. I am working through the "Learning Spark" book and I am having trouble getting Spark installed and running. This is what I have done so far.

I installed Spark from here: http://spark.apache.org/downloads.html, selecting 1.5.1, prebuilt for Hadoop 2.6 and later, direct download. I untarred the download:

cd downloads
tar -xf spark-1.5.1-bin-hadoop2.6.tgz
cd spark-1.5.1-bin-hadoop2.6

Next I try running a shell. The example in the book claims we can run in local mode and there should be no need to install Hadoop / YARN / Mesos or anything else to get started. I have tried the following commands:

./bin/pyspark
bin/pyspark
./bin/spark-shell
bin/spark-shell

I am getting an error as follows:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/launcher/Main
Caused by: java.lang.ClassNotFoundException: org.apache.spark.launcher.Main
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)

About my system: I have a MacBook Pro running OS X Yosemite 10.10.5. I just downloaded and installed the latest Java from the Oracle website, I believe this was java8u60. I double-checked my Python version and it appears to be 2.7.10. I am familiar with the command line, and have a background in Hadoop, but this has me stumped.

Thanks in advance,
David Bess

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Install-via-directions-in-Learning-Spark-Exception-when-running-bin-pyspark-tp25043.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
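For anyone following along, once the shell does start cleanly, a quick local-mode sanity check along the lines of the book's first example is enough to confirm the install. A sketch, assuming it is run from inside the unpacked spark-1.5.1-bin-hadoop2.6 directory so that README.md is present:

from pyspark import SparkContext

sc = SparkContext("local[*]", "install-check")
lines = sc.textFile("README.md")
print(lines.count())                                    # total lines in the file
print(lines.filter(lambda l: "Spark" in l).count())     # lines mentioning Spark
sc.stop()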
Re: Install via directions in "Learning Spark". Exception when running bin/pyspark
Got it working! Thank you for confirming my suspicion that this issue was related to Java. When I dug deeper I found multiple versions and some other issues. I worked on it a while before deciding it would be easier to just uninstall all Java and reinstall clean JDK, and now it works perfectly. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Install-via-directions-in-Learning-Spark-Exception-when-running-bin-pyspark-tp25043p25049.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: OLAP query using spark dataframe with cassandra
>> *Subject:* Re: OLAP query using spark dataframe with cassandra
>>
>> Is there any distributor supporting these software components in
>> combination? If not, and your core business is not software, then you may want
>> to look for something else, because it might not make sense to build up
>> internal know-how in all of these areas.
>>
>> In any case - it all depends highly on your data and queries. You will
>> have to do your own experiments.
>>
>> On 09 Nov 2015, at 07:02, "fightf...@163.com" wrote:
>>
>> Hi, community
>>
>> We are especially interested in this integration based on some slides
>> from [1]. The SMACK stack (Spark+Mesos+Akka+Cassandra+Kafka) seems a good
>> implementation of the lambda architecture in the open-source world,
>> especially for non-Hadoop-based cluster environments. As we can see,
>> the advantages obviously consist of:
>>
>> 1. the feasibility and scalability of the Spark DataFrame API, which can also
>> make a perfect complement to Apache Cassandra's native CQL features.
>>
>> 2. both streaming and batch processing availability using the ALL-STACK
>> thing, cool.
>>
>> 3. we can achieve both compactness and usability for Spark with Cassandra,
>> including seamless integration with job scheduling and resource management.
>>
>> Only one concern goes to the OLAP query performance issue, which is mainly
>> caused by frequent aggregation work between daily increasing large tables, for
>> both Spark SQL and Cassandra. I can see that the [1] use case relies on
>> FiloDB to achieve columnar storage and query performance, but we have no
>> further knowledge of it.
>>
>> Question is: has anyone had such a use case, especially in a production
>> environment? We would be interested in your architecture for designing this
>> OLAP engine using Spark + Cassandra. What do you think of the comparison
>> between this scenario and a traditional OLAP cube design, like Apache Kylin
>> or Pentaho Mondrian?
>>
>> Best Regards,
>>
>> Sun.
>>
>> [1]
>> http://www.slideshare.net/planetcassandra/cassandra-summit-2014-interactive-olap-queries-using-apache-cassandra-and-spark
>>
>> --
>> fightf...@163.com
>>
>
--
David Morales de Frías :: +34 607 010 411 :: @dmoralesdf <https://twitter.com/dmoralesdf>
<http://www.stratio.com/>
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd <https://twitter.com/StratioBD>*
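To make the Spark-plus-Cassandra side of the discussion concrete, the access pattern in this kind of stack usually goes through the spark-cassandra-connector's DataFrame source. A minimal sketch only: the keyspace, table and column names are made up, and the DataStax connector package has to be on the classpath (e.g. via --packages or --jars).

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="olap-sketch")
sqlContext = SQLContext(sc)

# Hypothetical keyspace/table; rows come back as a Spark DataFrame.
events = (sqlContext.read
          .format("org.apache.spark.sql.cassandra")
          .options(keyspace="metrics", table="daily_events")
          .load())

# A typical aggregation that would back an OLAP-style query.
daily_totals = events.groupBy("day", "device_id").count()
daily_totals.show()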
RE: hdfs-ha on mesos - odd bug
I have verified that this error exists on my system as well, and the suggested workaround also works. Spark version: 1.5.1; 1.5.2 Mesos version: 0.21.1 CDH version: 4.7 I have set up the spark-env.sh to contain HADOOP_CONF_DIR pointing to the correct place, and I have also linked in the hdfs-site.xml file to $SPARK_HOME/conf. I agree that it should work, but it doesn't. I have also tried including the correct Hadoop configuration files in the application jar. Note: it works fine from spark-shell, but it doesn't work from spark-submit Dave -Original Message- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: Tuesday, September 15, 2015 7:47 PM To: Adrian Bridgett Cc: user Subject: Re: hdfs-ha on mesos - odd bug On Mon, Sep 14, 2015 at 6:55 AM, Adrian Bridgett wrote: > 15/09/14 13:00:25 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID > 0, > 10.1.200.245): java.lang.IllegalArgumentException: > java.net.UnknownHostException: nameservice1 > at > org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) > at > org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxie > s.java:310) This looks like you're trying to connect to an HA HDFS service but you have not provided the proper hdfs-site.xml for your app; then, instead of recognizing "nameservice1" as an HA nameservice, it thinks it's an actual NN address, tries to connect to it, and fails. If you provide the correct hdfs-site.xml to your app (by placing it in $SPARK_HOME/conf or setting HADOOP_CONF_DIR to point to the conf directory), it should work. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
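One more thing worth trying when executors refuse to pick up hdfs-site.xml, as described above, is to push the HA client settings through Spark's own configuration: any property prefixed with spark.hadoop. is copied into the Hadoop Configuration the executors use. This is only a sketch with made-up namenode hosts, and it assumes the nameservice really is called nameservice1.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("hdfs-ha-sketch")
        # Everything after "spark.hadoop." lands in the executors' Hadoop conf.
        .set("spark.hadoop.dfs.nameservices", "nameservice1")
        .set("spark.hadoop.dfs.ha.namenodes.nameservice1", "nn1,nn2")
        .set("spark.hadoop.dfs.namenode.rpc-address.nameservice1.nn1", "namenode1.example.com:8020")
        .set("spark.hadoop.dfs.namenode.rpc-address.nameservice1.nn2", "namenode2.example.com:8020")
        .set("spark.hadoop.dfs.client.failover.proxy.provider.nameservice1",
             "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"))

sc = SparkContext(conf=conf)
print(sc.textFile("hdfs://nameservice1/some/path").count())

The same key/value pairs can equally go in spark-defaults.conf or be passed with --conf on the spark-submit command line.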
RE: Save GraphX to disk
A graph is just vertices and edges. What else are you expecting to save/load? You could save/load the triplets, but that is actually more work to reconstruct the graph from than the vertices and edges saved separately.

Dave

From: Gaurav Kumar [mailto:gauravkuma...@gmail.com]
Sent: Friday, November 13, 2015 6:09 AM
To: user@spark.apache.org
Subject: Save GraphX to disk

Hi,

I was wondering how to save a graph to disk and load it back again. I know how to save vertices and edges to disk and construct the graph from them, not sure if there's any method to save the graph itself to disk.

Best Regards,
Gaurav Kumar
Big Data • Data Science • Photography • Music
+91 9953294125
Re: WARN LoadSnappy: Snappy native library not loaded
I ran into this recently. Turned out we had an old org-xerial-snappy.properties file in one of our conf directories that had the setting:

# Disables loading Snappy-Java native library bundled in the
# snappy-java-*.jar file forcing to load the Snappy-Java native
# library from the java.library.path.
#
org.xerial.snappy.disable.bundled.libs=true

When I switched that to false, it made the problem go away. May or may not be your problem of course, but worth a look.

HTH,
DR

On 11/17/2015 05:22 PM, Andy Davidson wrote:
I started a Spark POC. I created an EC2 cluster on AWS using spark-ec2. I have 3 slaves. In general I am running into trouble even with small workloads. I am using IPython notebooks running on my Spark cluster. Everything is painfully slow. I am using the standalone cluster manager.

I noticed that I am getting the following warnings on my driver console. Any idea what the problem might be?

15/11/17 22:01:59 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/11/17 22:03:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/17 22:03:05 WARN LoadSnappy: Snappy native library not loaded

Here is an overview of my POC app. I have a file on HDFS containing about 5000 Twitter status strings.

tweetStrings = sc.textFile(dataURL)
jTweets = (tweetStrings.map(lambda x: json.loads(x)).take(10))

Generated the following error: "error occurred while calling o78.partitions.: java.lang.OutOfMemoryError: GC overhead limit exceeded"

Any idea what we need to do to improve a new Spark user's out-of-the-box experience?

Kind regards
Andy

export PYSPARK_PYTHON=python3.4
export PYSPARK_DRIVER_PYTHON=python3.4
export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"
MASTER_URL=spark://ec2-55-218-207-122.us-west-1.compute.amazonaws.com:7077
numCores=2
$SPARK_ROOT/bin/pyspark --master $MASTER_URL --total-executor-cores $numCores $*

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Passing SPARK_CONF_DIR to slaves in standalone mode under Grid Engine job
Hi, all,

I am just setting up to run Spark in standalone mode, as a (Univa) Grid Engine job. I have been able to set up the appropriate environment variables such that the master launches correctly, etc. In my setup, I generate GE job-specific conf and log dirs.

However, I am finding that the SPARK_* environment variables are not passed to the worker processes on different physical nodes, since those are launched via ssh. I have added echo commands to the sbin/start-slaves.sh and sbin/slaves.sh scripts and verified that they have the appropriate SPARK_* environment variables set. Since I have a global installation of Spark, I would prefer not to have all Spark jobs write to $SPARK_HOME/work and logs.

I know that ssh can read environment variables from ~/.ssh/environment, but if a user qsubs 2 different Spark jobs, this won't handle it right. I appreciate any suggestions.

Thanks,
Dave Chin
Spark-Grid Engine light integration writeup
Hello, all:

I was able to get Spark 1.4.1 and 1.2.0 standalone to run within a Univa Grid Engine cluster, with some modifications to the appropriate sbin scripts. My write-up is at:

http://linuxfollies.blogspot.com/2015/08/apache-spark-integration-with-grid.html

I'll be glad to get comments from anyone who may be doing something similar.

Cheers,
Dave

--
David Chin, Ph.D.
david.c...@drexel.edu   Sr. Systems Administrator, URCF, Drexel U.
http://www.drexel.edu/research/urcf/
https://linuxfollies.blogspot.com/
215.221.4747 (mobile)
https://github.com/prehensilecode
Problem with take vs. takeSample in PySpark
Hi all,

I am getting some strange behavior with the RDD take function in PySpark while doing some interactive coding in an IPython notebook. I am running PySpark on Spark 1.2.0 in yarn-client mode, if that is relevant.

I am using sc.wholeTextFiles and pandas to load a collection of .csv files into an RDD of pandas dataframes. I create an RDD called train_rdd in which each row of the RDD contains a label and pandas dataframe pair:

import pandas as pd
from StringIO import StringIO

rdd = sc.wholeTextFiles(data_path, 1000)
train_rdd = rdd.map(lambda x: (x[0], pd.read_csv(StringIO(x[1]))))

In order to test out the next steps I want to take, I am trying to use take to select one of the dataframes and apply the desired modifications before writing out the Spark code to apply it to all of the dataframes in parallel. However, when I try to use take like this:

label, df = train_rdd.take(1)[0]

I get a spark.driver.maxResultSize error (stack trace included at the end of this message). Now, each of these dataframes is only about 100MB in size, so it should easily fit on the driver and not go over the maxResultSize limit of 1024MB. If I instead use takeSample, though, there is no problem:

label, df = train_rdd.takeSample(False, 1, seed=50)[0]

(Here, I have set the seed so that the element that is selected is the same one that the take function is trying to load, i.e., the first one, just to ensure that the error is not because the specific dataframe take is getting is too large.)

Does calling take result in a collect operation being performed before outputting the first item? That would explain to me why this error is occurring, but that seems like poor behavior for the take function. Clearly takeSample is behaving the way I want it to, but it would be nice if I could get this behavior with the take function, or at least without needing to choose an element randomly. I was able to get the behavior I wanted above by just changing the seed until I got the dataframe I wanted, but I don't think that is a good approach in general.

Any insight is appreciated.

Best,
David Montague

---
Py4JJavaError                             Traceback (most recent call last)
 in ()
      1 label_s, df_s = train_rdd.takeSample(False, 1, seed=50)[0]
----> 2 label, df = train_rdd.take(1)[0]

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/rdd.py in take(self, num)
   1109
   1110             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1111             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1112
   1113             items += res

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    816         # SparkContext#runJob.
    817         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 818         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
    819         return list(mappedRDD._collect_iterator_through_file(it))
    820

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536             answer = self.gateway_client.send_command(command)
    537             return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 177 tasks (1038.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at s
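Two ways around this that do not depend on takeSample's randomness, offered as a sketch rather than something tested against this exact setup: either raise spark.driver.maxResultSize, or pull a single element out of one known partition so that only that partition's result is shipped back to the driver. The helper below is hypothetical, not part of the PySpark API.

from pyspark import SparkConf, SparkContext

# Option 1: allow larger serialized results on the driver
# (must be set before the SparkContext is created).
conf = SparkConf().set("spark.driver.maxResultSize", "2g")
sc = SparkContext(conf=conf)

# Option 2: fetch one element from a single, known partition instead of
# letting take() pull results back from many partitions at once.
def first_element_of_partition(rdd, partition_index=0):
    """Return the first element of the given partition, or None if it is empty."""
    def grab(index, iterator):
        if index == partition_index:
            for item in iterator:
                yield item
                break
    matches = rdd.mapPartitionsWithIndex(grab).take(1)
    return matches[0] if matches else None

# Usage with the train_rdd from the post above:
# label, df = first_element_of_partition(train_rdd, 0)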
Re: graphx issue spark 1.3
The code below is taken from the Spark website and generates the error detailed.

Hi, using spark 1.3 and trying some sample code:

val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

When I run:

graph.numEdges

all works well, but with

graph.numVertices

it falls over and I get a whole heap of errors:

Failed to open file: /tmp/spark..shuffle_0_21_0.index
at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202)
at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
at org.apache.spark.network.server.Transpor
SLF4J: Class path contains multiple SLF4J bindings.

Is anyone else experiencing this? I've tried different graphs and always end up with the same results.

thanks

On Tue, 18 Aug 2015 at 12:15 am, Sonal Goyal wrote:
> I have been using graphx in production on 1.3 and 1.4 with no issues.
> What's the exception you see and what are you trying to do?
> On Aug 17, 2015 10:49 AM, "dizzy5112" wrote:
>
>> Hi using spark 1.3 and trying some sample code:
>>
>> when i run:
>>
>> all works well but with
>>
>> it falls over and i get a whole heap of errors:
>>
>> Is anyone else experiencing this? Ive tried different graphs and always end
>> up with the same results.
>>
>> thanks
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/grpah-x-issue-spark-1-3-tp24292.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
Re: submit_spark_job_to_YARN
Hi Ajay, Are you trying to save to your local file system or to HDFS? // This would save to HDFS under "/user/hadoop/counter" counter.saveAsTextFile("/user/hadoop/counter"); David On Sun, Aug 30, 2015 at 11:21 AM, Ajay Chander wrote: > Hi Everyone, > > Recently we have installed spark on yarn in hortonworks cluster. Now I am > trying to run a wordcount program in my eclipse and I > did setMaster("local") and I see the results that's as expected. Now I want > to submit the same job to my yarn cluster from my eclipse. In storm > basically I was doing the same by using StormSubmitter class and by passing > nimbus & zookeeper host to Config object. I was looking for something > exactly the same. > > When I went through the documentation online, it read that I am suppose to > "export HADOOP_HOME_DIR=path to the conf dir". So now I copied the conf > folder from one of sparks gateway node to my local Unix box. Now I did > export that dir... > > export HADOOP_HOME_DIR=/Users/user1/Documents/conf/ > > And I did the same in .bash_profile too. Now when I do echo > $HADOOP_HOME_DIR, I see the path getting printed in the command prompt. Now > my assumption is, in my program when I change setMaster("local") to > setMaster("yarn-client") my program should pick up the resource mangers i.e > yarn cluster info from the directory which I have exported and the job > should get submitted to resolve manager from my eclipse. But somehow it's > not happening. Please tell me if my assumption is wrong or if I am missing > anything here. > > I have attached the word count program that I was using. Any help is > highly appreciated. > > Thank you, > Ajay > > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > -- ### Confidential e-mail, for recipient's (or recipients') eyes only, not for distribution. ###
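To make the point about save destinations explicit, spelling out the full URI removes the ambiguity. A small sketch in PySpark for brevity; the namenode host and port are made up and would normally come from your Hadoop configuration.

from pyspark import SparkContext

sc = SparkContext(appName="save-target-sketch")
counter = sc.parallelize([("word", 1), ("count", 2)])

# Explicit HDFS destination (with no scheme, the path is resolved against the default FS).
counter.saveAsTextFile("hdfs://namenode.example.com:8020/user/hadoop/counter")

# Explicit local destination: written to the local filesystem of the machines running the tasks.
counter.saveAsTextFile("file:///tmp/counter")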
Event logging not working when worker machine terminated
Our Spark cluster is configured to write application history event logging to a directory on HDFS. This all works fine. (I've tested it with Spark shell.) However, on a large, long-running job that we ran tonight, one of our machines at the cloud provider had issues and had to be terminated and replaced in the middle of the job. The job completed correctly, and shows in state FINISHED in the "Completed Applications" section of the Spark GUI. However, when I try to look at the application's history, the GUI says "Application history not found" and "Application ... is still in progress". The reason appears to be the machine that was terminated. When I click on the executor list for that job, Spark is showing the executor from the terminated machine as still in state RUNNING. Any solution/workaround for this? BTW, I'm running Spark v1.3.0. Thanks, DR - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
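For context, the event-log setup being described usually amounts to a couple of application-side properties. This is only a sketch: the HDFS directory is made up, and the same values can live in spark-defaults.conf instead of code.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("event-log-sketch")
        .set("spark.eventLog.enabled", "true")
        # Directory the standalone master / history server reads application history from.
        .set("spark.eventLog.dir", "hdfs://namenode.example.com:8020/spark-history"))

sc = SparkContext(conf=conf)
# ... job runs here ...
sc.stop()  # a clean stop() is what marks the event log as completed rather than in-progress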
Re: Event logging not working when worker machine terminated
Standalone. On 09/08/2015 11:18 PM, Jeff Zhang wrote: What cluster mode do you use ? Standalone/Yarn/Mesos ? On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch wrote: Our Spark cluster is configured to write application history event logging to a directory on HDFS. This all works fine. (I've tested it with Spark shell.) However, on a large, long-running job that we ran tonight, one of our machines at the cloud provider had issues and had to be terminated and replaced in the middle of the job. The job completed correctly, and shows in state FINISHED in the "Completed Applications" section of the Spark GUI. However, when I try to look at the application's history, the GUI says "Application history not found" and "Application ... is still in progress". The reason appears to be the machine that was terminated. When I click on the executor list for that job, Spark is showing the executor from the terminated machine as still in state RUNNING. Any solution/workaround for this? BTW, I'm running Spark v1.3.0. Thanks, DR - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Event logging not working when worker machine terminated
Thanks for the info. Do you know if there's a ticket already open for this issue? If so, I'd like to monitor it.

Thanks,
DR

On 09/09/2015 11:50 AM, Charles Chao wrote:
I have encountered the same problem after migrating from 1.2.2 to 1.3.0. After some searching this appears to be a bug introduced in 1.3. Hopefully it's fixed in 1.4.

Thanks,
Charles

On 9/9/15, 7:30 AM, "David Rosenstrauch" wrote:
Standalone.

On 09/08/2015 11:18 PM, Jeff Zhang wrote:
What cluster mode do you use ? Standalone/Yarn/Mesos ?

On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch wrote:
Our Spark cluster is configured to write application history event logging to a directory on HDFS. This all works fine. (I've tested it with Spark shell.)

However, on a large, long-running job that we ran tonight, one of our machines at the cloud provider had issues and had to be terminated and replaced in the middle of the job. The job completed correctly, and shows in state FINISHED in the "Completed Applications" section of the Spark GUI. However, when I try to look at the application's history, the GUI says "Application history not found" and "Application ... is still in progress".

The reason appears to be the machine that was terminated. When I click on the executor list for that job, Spark is showing the executor from the terminated machine as still in state RUNNING.

Any solution/workaround for this? BTW, I'm running Spark v1.3.0.

Thanks,
DR

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming Suggestion
Hi there, This is exactly our goal in Stratio Sparkta, a real-time aggregation engine fully developed with spark streaming (and fully open source). Take a look at: - the docs: http://docs.stratio.com/modules/sparkta/development/ - the repository: https://github.com/Stratio/sparkta - and some slides explaining how sparkta was born and what it makes: http://www.slideshare.net/Stratio/strata-sparkta Feel free to ask us anything about the project. 2015-09-15 8:10 GMT+02:00 srungarapu vamsi : > The batch approach i had implemented takes about 10 minutes to complete > all the pre-computation tasks for the one hour worth of data. When i went > through my code, i figured out that most of the time consuming tasks are > the ones, which read data from cassandra and the places where i perform > sparkContex.union(Array[RDD]). > Now the ask is to get the pre computation tasks near real time. So i am > exploring the streaming approach. > > My pre computation tasks not only include just finding the unique numbers > for a given device every minute, every hour, every day but it also includes > the following tasks: > 1. Find the number of unique numbers across a set of devices every minute, > every hour, every day > 2. Find the number of unique numbers which are commonly occurring across a > set of devices every minute, every hour, every day > 3. Find (total time a number occurred across a set of devices)/(total > unique numbers occurred across the set of devices) > The above mentioned pre computation tasks are just a few of what i will be > needing and there are many more coming towards me :) > I see all these problems need more of data parallel approach and hence i > am interested to do this on the spark streaming end. > > > On Tue, Sep 15, 2015 at 11:04 AM, Jörn Franke > wrote: > >> Why did you not stay with the batch approach? For me the architecture >> looks very complex for a simple thing you want to achieve. Why don't you >> process the data already in storm ? >> >> Le mar. 15 sept. 2015 à 6:20, srungarapu vamsi >> a écrit : >> >>> I am pretty new to spark. Please suggest a better model for the >>> following use case. >>> >>> I have few (about 1500) devices in field which keep emitting about 100KB >>> of data every minute. The nature of data sent by the devices is just a list >>> of numbers. >>> As of now, we have Storm is in the architecture which receives this >>> data, sanitizes it and writes to cassandra. >>> Now, i have a requirement to process this data. The processing includes >>> finding unique numbers emitted by one or more devices for every minute, >>> every hour, every day, every month. >>> I had implemented this processing part as a batch job execution and now >>> i am interested in making it a streaming application. i.e calculating the >>> processed data as and when devices emit the data. >>> >>> I have the following two approaches: >>> 1. Storm writes the actual data to cassandra and writes a message on >>> Kafka bus that data corresponding to device D and minute M has been written >>> to cassandra >>> >>> Then Spark streaming reads this message from kafka , then reads the data >>> of Device D at minute M from cassandra and starts processing the data. >>> >>> 2. Storm writes the data to both cassandra and kafka, spark reads the >>> actual data from kafka , processes the data and writes to cassandra. 
>>> The second approach avoids additional hit of reading from cassandra >>> every minute , a device has written data to cassandra at the cost of >>> putting the actual heavy messages instead of light events on kafka. >>> >>> I am a bit confused among the two approaches. Please suggest which one >>> is better and if both are bad, how can i handle this use case? >>> >>> >>> -- >>> /Vamsi >>> >> > > > -- > /Vamsi > -- David Morales de Frías :: +34 607 010 411 :: @dmoralesdf <https://twitter.com/dmoralesdf> <http://www.stratio.com/> Vía de las dos Castillas, 33, Ática 4, 3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd <https://twitter.com/StratioBD>*
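For the unique-numbers-per-minute part of the question, the Spark Streaming side of approach 2 can stay fairly small. A rough sketch only: the Kafka input is replaced by a queueStream so the snippet is self-contained, whereas in practice KafkaUtils.createStream or createDirectStream would supply the (device, number) pairs, and the rollup intervals shown are just examples.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "unique-numbers-sketch")
ssc = StreamingContext(sc, 60)  # one batch per minute

# Stand-in for the Kafka stream of (device_id, number) pairs.
fake_batch = sc.parallelize([("dev1", 7), ("dev1", 7), ("dev1", 9), ("dev2", 7)])
pairs = ssc.queueStream([fake_batch])

# Unique numbers per device for each one-minute batch.
uniques_per_device = (pairs.transform(lambda rdd: rdd.distinct())
                           .map(lambda kv: (kv[0], 1))
                           .reduceByKey(lambda a, b: a + b))
uniques_per_device.pprint()

# Hourly or daily rollups can reuse the same pipeline over a window, e.g.:
# hourly = pairs.window(3600, 3600).transform(lambda rdd: rdd.distinct())

ssc.start()
ssc.awaitTermination()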