Do existing R packages work with SparkR data frames
Hello,

Is it possible for existing R machine learning packages that work with R data frames, such as bnlearn, to work with SparkR DataFrames? Or do I need to convert the SparkR DataFrames to R data frames first? Is collect() the function to do that conversion, or is there another way?

Many thanks,
Lan
ERROR EndpointWriter: AssociationError
Hello,

I'm new to Spark and tried to set up a Spark cluster of 1 master VM (SparkV1) and 1 worker VM (SparkV4); the error is the same if I have 2 workers. They are connected without a problem now. But when I submit a job (as in https://spark.apache.org/docs/latest/quick-start.html) at the master:

spark-submit --master spark://SparkV1:7077 examples/src/main/python/pi.py

it seems to run OK and returns "Pi is roughly...", but the worker logs the following error:

15/02/07 15:22:33 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@SparkV4:47986] <- [akka.tcp://sparkExecutor@SparkV4:46630]: Error [Shut down address: akka.tcp://sparkExecutor@SparkV4:46630] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkExecutor@SparkV4:46630
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]

More about the setup: each VM has only 4GB RAM, running Ubuntu, using spark-1.2.0 built for Hadoop 2.6.0. I have struggled with this error for a few days. Could anyone please tell me what the problem is and how to fix it?

Thanks,
Lan
Re: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down
Hi Alexey and Daniel,

I'm using Spark 1.2.0 and still getting the same error, as described below. Do you have any news on this? I'd really appreciate your responses.

"a Spark cluster of 1 master VM SparkV1 and 1 worker VM SparkV4 (the error is the same if I have 2 workers). They are connected without a problem now. But when I submit a job (as in https://spark.apache.org/docs/latest/quick-start.html) at the master:

spark-submit --master spark://SparkV1:7077 examples/src/main/python/pi.py

it seems to run ok and returns "Pi is roughly...", but the worker has the following error:

15/02/07 15:22:33 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@SparkV4:47986] <- [akka.tcp://sparkExecutor@SparkV4:46630]: Error [Shut down address: akka.tcp://sparkExecutor@SparkV4:46630] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkExecutor@SparkV4:46630
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]

More about the setup: each VM has only 4GB RAM, running Ubuntu, using spark-1.2.0, built for Hadoop 2.6.0 or 2.4.0."
Why is Columnar Parquet used as default for saving Row-based DataFrames/RDD?
Hello,

I have the above naive question, if anyone could help: why not use a row-based file format to save row-based DataFrames/RDDs?

Thanks,
Lan
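For reference, a minimal sketch of how the default can be overridden when a row-oriented format is preferred (Spark 1.3-era DataFrame API assumed; the paths and the spark-avro package are illustrative):

// Load with the default (Parquet) source, then save explicitly to row-based formats.
val df = sqlContext.load("hdfs:///input.parquet", "parquet")
df.save("hdfs:///out-json", "json")                      // plain row-based JSON
df.save("hdfs:///out-avro", "com.databricks.spark.avro") // Avro, via the spark-avro package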
getting WARN ReliableDeliverySupervisor
Hi experts,

Hadoop version: 2.4
Spark version: 1.3.1

I am running the SparkPi example application:

bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 2G lib/spark-examples-1.3.1-hadoop2.4.0.jar 2

The same command sometimes gets WARN ReliableDeliverySupervisor, sometimes does not. Some runs are successful even with the WARN.

bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 2G lib/spark-examples-1.3.1-hadoop2.4.0.jar 1
15/07/02 04:38:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Pi is roughly 3.141633956

bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 2G lib/spark-examples-1.3.1-hadoop2.4.0.jar 2
15/07/02 05:17:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 05:17:53 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@hostname:32544] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/07/02 05:18:01 ERROR YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:544)
        at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:28)
        at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 2G lib/spark-examples-1.3.1-hadoop2.4.0.jar 1
15/07/02 05:23:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 05:24:09 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@hostname:15959] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
Pi is roughly 3.141625776

Also, the Spark UI is only available when I set --master to local. What could have caused these issues?

Thanks,
Xiaohe
Re: getting WARN ReliableDeliverySupervisor
Changing the JDK from 1.8.0_45 to 1.7.0_79 solved this issue. I saw https://issues.apache.org/jira/browse/SPARK-6388, but that does not seem to be the problem here.

On Thu, Jul 2, 2015 at 1:30 PM, xiaohe lan wrote:
> [quotes the original "getting WARN ReliableDeliverySupervisor" message above]
MLLib + Streaming
Hi there,

I hope someone can clarify this for me. It seems that some of the MLlib algorithms, such as K-Means, Linear Regression and Logistic Regression, have a streaming version which can do online machine learning. But does that mean the other MLlib algorithms cannot be used in Spark Streaming applications, such as random forest, SVM, collaborative filtering, etc.?

DStreams are essentially a sequence of RDDs. We can use the DStream.transform() and DStream.foreachRDD() operations, which allow you to access the RDDs in a DStream and apply MLlib functions to them. So it looks like all MLlib algorithms should be able to run in a streaming application. Am I wrong?

Lan
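For illustration, a minimal sketch of the transform()/foreachRDD() pattern described above (the socket source, feature layout and model settings are all illustrative, and the model here is trained once up front rather than updated online):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Train a (non-streaming) MLlib model up front; in practice this would be loaded from HDFS.
val trainingData = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(0.0, 1.0)),
  LabeledPoint(1.0, Vectors.dense(1.0, 0.0))))
val model = RandomForest.trainClassifier(trainingData, numClasses = 2,
  categoricalFeaturesInfo = Map[Int, Int](), numTrees = 10,
  featureSubsetStrategy = "auto", impurity = "gini", maxDepth = 5, maxBins = 32)

// Apply the fixed model to every micro-batch of incoming feature vectors.
val ssc = new StreamingContext(sc, Seconds(10))
val features = ssc.socketTextStream("somehost", 9999)
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
features.foreachRDD { rdd =>
  val positives = model.predict(rdd).filter(_ == 1.0).count()
  println(s"positive predictions in this batch: $positives")
}
ssc.start()
ssc.awaitTermination()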
Spark ML and Streaming
Hi there,

I hope someone can clarify this for me. It seems that some of the MLlib algorithms, such as K-Means, Linear Regression and Logistic Regression, have a streaming version which can do online machine learning. But does that mean the other MLlib algorithms cannot be used in Spark Streaming applications, such as random forest, SVM, collaborative filtering, etc.?

DStreams are essentially a sequence of RDDs. We can use the DStream.transform() and DStream.foreachRDD() operations, which allow you to access the RDDs in a DStream and apply MLlib functions to them. So it looks like all MLlib algorithms should be able to run in a streaming application. Am I wrong?

Thanks in advance.

Lan
Re: Spark ML and Streaming
Sorry, accidentally sent again. My apologies.

> On Mar 6, 2016, at 1:22 PM, Lan Jiang wrote:
> [quotes the "Spark ML and Streaming" message above]
Re: MLLib + Streaming
Thanks, Guru. After reading the implementations of StreamingKMeans, StreamingLinearRegressionWithSGD and StreamingLogisticRegressionWithSGD, I reached the same conclusion. Unfortunately, this distinction between true online learning and offline learning is only implied in the documentation, and I was not sure whether my understanding was correct. Thanks for confirming it!

However, I have a different opinion on your last paragraph, that we cannot hold out test data during model training for online learning. Taking StreamingLinearRegressionWithSGD as an example, you can certainly split each micro-batch 70%/30% and do evaluation based on the RMSE. At the very beginning the RMSE will be large, but as more and more micro-batches arrive, you should see the RMSE become smaller as the weights approach the optimum. IMHO, I don't see much difference between online and offline learning with regard to holding out test data.

Lan

> On Mar 6, 2016, at 2:43 AM, Chris Miller wrote:
>
> Guru: This is a really great response. Thanks for taking the time to explain all of this. Helpful for me too.
>
> --
> Chris Miller
>
> On Sun, Mar 6, 2016 at 1:54 PM, Guru Medasani <gdm...@gmail.com> wrote:
> Hi Lan,
>
> Streaming K-Means, Linear Regression and Logistic Regression support online machine learning as you mentioned. Online machine learning is where the model is being trained and updated on every batch of streaming data. These models have trainOn() and predictOn() methods where you can simply pass in the DStreams you want to train the model on and the DStreams you want the model to predict on. So when the next batch of data arrives the model is trained and updated again. In this case the model weights are continually updated and hopefully the model performs better in terms of convergence and accuracy over time. What we are really doing in online learning is showing the model only a few examples of the data at a time (a stream of data) and updating the parameters in the case of Linear and Logistic Regression, or updating the centers in the case of K-Means. For Linear or Logistic Regression this is possible because of the optimizer chosen for minimizing the cost function, Stochastic Gradient Descent. This optimizer helps us move closer and closer to the optimal weights after every batch, and over time we will have a model that has learned how to represent our data and predicts well.
>
> In the scenario of using any MLlib algorithm and doing training with DStream.transform() and DStream.foreachRDD() operations, when the first batch of data arrives we build a model; let's call this model1. Once you have model1 you can make predictions on the same DStream or on a different DStream source. But if you follow the same procedure for the next batch and create another model, let's call it model2, this model2 can be significantly different from model1, depending on how different the data in the second DStream is from the first, because nothing is continually updating a single model. It's as if the weight vectors jump from one place to another on every batch, and we never know whether the algorithm is converging to the optimal weights. So I believe it is not possible to do true online learning with the other MLlib models in Spark Streaming. I am not sure if this is because those models don't generally support the streaming scenario or if the streaming versions simply haven't been implemented yet.
>
> Though technically you can use any of the MLlib algorithms in Spark Streaming with the procedure you mentioned and make predictions, it is important to figure out whether the model you are choosing can converge by being shown only a subset (batches - DStreams) of the data over time. Depending on the algorithm, certain optimizers won't necessarily be able to converge by seeing only individual data points and need to see the majority of the data to learn the optimal weights. In those cases, you can still do offline learning/training with Spark batch processing using any of the MLlib algorithms, save the models on HDFS, then start a streaming job, load the saved models into your streaming application and make predictions. This is traditional offline learning.
>
> In general, online learning is hard to evaluate, since we are not holding out any test data during model training. We are simply training the model and predicting. So in the initial batches the results can vary quite a bit and the predictions can have significant errors. So choosing online learning vs. offline learning depends on how much tolerance the application has towards wild predictions in the beginning.
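To make the trainOn()/predictOn() pattern discussed above concrete, here is a minimal sketch (directory paths, batch interval and feature dimension are illustrative; the test stream plays the role of the held-out data mentioned above):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

// Text files in LabeledPoint.parse format, e.g. "(1.0,[0.5,0.2,0.1])"
val trainingStream = ssc.textFileStream("hdfs:///streaming/train").map(LabeledPoint.parse)
val testStream     = ssc.textFileStream("hdfs:///streaming/test").map(LabeledPoint.parse)

val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(3))

model.trainOn(trainingStream)                     // weights are updated on every micro-batch
model.predictOnValues(testStream.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()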
Processing json document
Hi there,

Spark has provided a JSON processing feature for a long time. In most examples I see, each line in the sample file is one JSON object; that is the easiest case. But how can we process a JSON document that does not conform to this standard format (one JSON object per line)? Here is the document I am working on.

First of all, it is one single big JSON object spanning multiple lines. The real file can be as large as 20+ GB. Within that one JSON object, it contains many name/value pairs. The name is some kind of id value; the value is the actual JSON object that I would like to become part of a DataFrame. Is there any way to do that? I appreciate any input.

{
  "id1": {
    "Title":"title1",
    "Author":"Tom",
    "Source":{
      "Date":"20160506",
      "Type":"URL"
    },
    "Data":" blah blah"},
  "id2": {
    "Title":"title2",
    "Author":"John",
    "Source":{
      "Date":"20150923",
      "Type":"URL"
    },
    "Data":" blah blah "},
  "id3": {
    "Title":"title3",
    "Author":"John",
    "Source":{
      "Date":"20150902",
      "Type":"URL"
    },
    "Data":" blah blah "}
}
Re: Processing json document
Hi there,

Thank you all for your input.

@Hyukjin, as a matter of fact, I had read the blog link you posted before asking the question on the forum. As you pointed out, the link uses wholeTextFiles(), which is bad in my case because my JSON file can be as large as 20 GB+ and an OOM might occur. I am not sure how to extract the values using a textFile() call, since it creates an RDD of strings and treats each line without ordering, which destroys the JSON context.

Large multi-line JSON files with a parent node are very common in the real world. Take the common employees JSON example below: assuming we have millions of employees and it is a super large JSON document, how can Spark handle this? This should be a common pattern, shouldn't it? In the real world, JSON documents do not always come as cleanly formatted as the Spark example requires.

{
  "employees":[
    { "firstName":"John", "lastName":"Doe" },
    { "firstName":"Anna", "lastName":"Smith" },
    { "firstName":"Peter", "lastName":"Jones" }
  ]
}

On Thu, Jul 7, 2016 at 1:47 AM, Hyukjin Kwon wrote:
> The link uses the wholeTextFiles() API, which treats each file as one record.
>
> 2016-07-07 15:42 GMT+09:00 Jörn Franke:
>> This does not necessarily need to be the case. If you look at the Hadoop FileInputFormat architecture, you can even split large multi-line JSON without issues. I would need to have a look at it, but one large file does not mean one executor, independent of the underlying format.
>>
>> On 07 Jul 2016, at 08:12, Hyukjin Kwon wrote:
>>
>> There is a good link for this here: http://searchdatascience.com/spark-adventures-1-processing-multi-line-json-files
>>
>> If there are a lot of small files, then it would work pretty well in a distributed manner, but I am worried if it is a single large file. In this case, this would only work in a single executor, which I think will end up with an OutOfMemoryException.
>>
>> Spark's JSON data source does not support multi-line JSON as input due to the limitation of TextInputFormat and LineRecordReader. You may have to just extract the values after reading it with textFile.
>>
>> 2016-07-07 14:48 GMT+09:00 Lan Jiang:
>>> [quotes the original "Processing json document" message above]
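For reference, a minimal sketch of the wholeTextFiles() approach discussed above, pulling one row per id entry out of the parent object (this assumes json4s, which ships with Spark, and Spark 1.4+ for read.json on an RDD[String]; it only works while each JSON file fits comfortably in a single executor's memory, so it does not solve the 20 GB single-file case):

import org.json4s._
import org.json4s.jackson.JsonMethods._

// (fileName, fullFileContent) pairs; each file is parsed as one big JSON object
val perIdJson = sc.wholeTextFiles("hdfs:///path/to/json/")
  .flatMap { case (_, content) =>
    parse(content) match {
      // keep (id, innerObjectAsJsonString) so the id is not lost
      case JObject(fields) => fields.map { case (id, value) => (id, compact(render(value))) }
      case _ => Nil
    }
  }

val df = sqlContext.read.json(perIdJson.values)   // one DataFrame row per inner object
df.printSchema()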
Spark Yarn executor container memory
Hello,

My understanding is that the YARN executor container memory is based on spark.executor.memory + spark.yarn.executor.memoryOverhead. The first one is for heap memory and the second one is for off-heap memory. spark.executor.memory is passed to -Xmx to set the max heap size.

Now my question is: why does it not count the permgen size and the memory used by thread stacks? They are not part of the max heap size. IMHO, the YARN executor container memory should be set to: spark.executor.memory + [-XX:MaxPermSize] + number_of_threads * [-Xss] + spark.yarn.executor.memoryOverhead. What did I miss?

Lan
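For concreteness, a worked example of how the container request comes out under the two settings named above (numbers are illustrative; the 7% default is the Spark 1.3-era behaviour mentioned later in this thread): with spark.executor.memory=8g, the default overhead is max(8192 MB * 0.07, 384 MB) ≈ 573 MB, so Spark asks YARN for roughly 8192 + 573 ≈ 8765 MB per executor container, which YARN then rounds up to its allocation increment. Permgen, thread stacks and any other off-heap usage are expected to fit inside that overhead rather than being added as separate terms, which is why the overhead sometimes needs to be raised explicitly.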
Re: Scala VS Java VS Python
For a Spark data science project, Python might be a good choice. However, for Spark Streaming the Python API is still lagging. For example, for the Kafka no-receiver connector, according to the Spark 1.5.2 docs: "Spark 1.4 added a Python API, but it is not yet at full feature parity". Java does not have a REPL shell, which is a major drawback from my perspective.

Lan

> On Dec 16, 2015, at 3:46 PM, Stephen Boesch wrote:
>
> There are solid reasons to have built Spark on the JVM vs Python. The question for Daniel appears at this point to be Scala vs Java 8. For that there are many comparisons already available; but in the case of working with Spark there is the additional benefit on the Scala side that the core libraries are in that language.
>
> 2015-12-16 13:41 GMT-08:00 Darren Govoni <dar...@ontrenet.com>:
>> I use Python too. I'm actually surprised it's not the primary language, since it is by far more used in data science than Java and Scala combined.
>>
>> If I had a second choice of script language for general apps I'd want Groovy over Scala.
>>
>> Original message
>> From: Daniel Lopes <dan...@bankfacil.com.br>
>> Date: 12/16/2015 4:16 PM (GMT-05:00)
>> To: Daniel Valdivia <h...@danielvaldivia.com>
>> Cc: user <user@spark.apache.org>
>> Subject: Re: Scala VS Java VS Python
>>
>> For me Scala is better, since Spark is written in Scala, and I like Python because I have always used Python for data science. :)
>>
>> On Wed, Dec 16, 2015 at 5:54 PM, Daniel Valdivia <h...@danielvaldivia.com> wrote:
>>> Hello,
>>>
>>> This is more of a "survey" question for the community; you can reply to me directly so we don't flood the mailing list.
>>>
>>> I'm having a hard time learning Spark using Python since the API seems to be slightly incomplete, so I'm looking at my options to start doing all my apps in either Scala or Java. Being a Java developer, Java 1.8 looks like the logical way; however I'd like to ask here what's the most common (Scala or Java), since I'm observing mixed results in the social documentation, though Scala seems to be the predominant language for Spark examples.
>>>
>>> Thanks for the advice
Question about Spark Streaming checkpoint interval
I need some clarification about the documentation. According to the Spark docs, "the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set by using dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try."

My question is: does the checkpoint interval apply only to data checkpointing, or does it also apply to metadata checkpointing? The API says dstream.checkpoint() will "enable periodic checkpointing of RDDs of this DStream", implying it is only for data checkpointing.

My understanding is that metadata checkpointing is for driver failure. For example, with the Kafka direct API, the driver keeps track of the offset range of each partition. So if the metadata checkpoint is NOT done for each batch, then on a driver failure some messages in Kafka are going to be replayed. I cannot find anything in the documentation saying whether metadata checkpointing is done for each batch and whether the checkpoint interval setting applies to both types of checkpointing. Maybe I missed it. If anyone can point me to the right documentation, I would highly appreciate it.

Best regards,
Lan
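For reference, a minimal sketch of the two checkpoint knobs being discussed (the source, state function and interval values are illustrative):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("hdfs:///checkpoints/myapp")     // checkpoint directory, required for metadata/data checkpoints

val words = ssc.socketTextStream("somehost", 9999).flatMap(_.split(" "))
val counts = words.map(w => (w, 1L))
  .updateStateByKey[Long]((values: Seq[Long], state: Option[Long]) => Some(state.getOrElse(0L) + values.sum))

counts.checkpoint(Seconds(50))                  // data (RDD) checkpoint interval: 5 x the 10s batch interval
counts.print()

ssc.start()
ssc.awaitTermination()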
broadcast join in SparkSQL requires analyze table noscan
Hi there,

I am looking at the Spark SQL setting spark.sql.autoBroadcastJoinThreshold. According to the programming guide: "Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run."

My question is: is the NOSCAN option a must? If I execute the "ANALYZE TABLE <tableName> COMPUTE STATISTICS" command in the Hive shell, will the statistics be used by Spark SQL to decide on a broadcast join? Thanks.
Re: broadcast join in SparkSQL requires analyze table noscan
Michael,

Thanks for the reply.

On Wed, Feb 10, 2016 at 11:44 AM, Michael Armbrust wrote:
>> My question is: is the NOSCAN option a must? If I execute the "ANALYZE TABLE <tableName> COMPUTE STATISTICS" command in the Hive shell, will the statistics be used by Spark SQL to decide on a broadcast join?
>
> Yes, Spark SQL will only accept the simple no-scan version. However, as long as the sizeInBytes statistic is present, we will use it.
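To make the confirmed flow concrete, a minimal sketch (table names and the threshold value are illustrative; a HiveContext is assumed since ANALYZE TABLE goes through the Hive metastore):

// Populate sizeInBytes for the dimension table without scanning the data.
sqlContext.sql("ANALYZE TABLE my_small_dim COMPUTE STATISTICS noscan")

// Tables below this size (in bytes) are broadcast to all executors when joined.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

val joined = sqlContext.sql(
  "SELECT f.*, d.name FROM big_fact f JOIN my_small_dim d ON f.key = d.key")
joined.explain()   // should show a broadcast join for my_small_dim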
unintended consequence of using coalesce operation
Hi there,

I ran into an issue when using Spark (v1.3) to load an Avro file through Spark SQL. A code sample is below:

val df = sqlContext.load("path-to-avro-file", "com.databricks.spark.avro")
val myrdd = df.select("Key", "Name", "binaryfield").rdd
val results = myrdd.map(...)
val finalResults = results.filter(...)
finalResults.coalesce(1).toDF().saveAsParquetFile("path-to-parquet")

The Avro file is 645MB. The HDFS block size is 128MB, so the file spans 5 HDFS blocks, which means there should be 5 partitions. Please note that I use coalesce because I expect the preceding filter transformation to filter out almost all the data, and I would like to write a single Parquet file. The YARN cluster has 3 datanodes. I use the configuration below for spark-submit:

spark-submit --class ... --num-executors 3 --executor-cores 2 --executor-memory 8g --master yarn-client mytest.jar

I do see 3 executors being created, one on each data/worker node. However, there is only one task running within the cluster. After I remove the coalesce(1) call from the code, I can see 5 tasks generated, spread across the 3 executors.

I was surprised by the result. coalesce is usually thought to be a better choice than repartition when reducing the partition number. However, in this case it causes a performance issue, because Spark creates only one task since the final partition number was coalesced to 1. Thus there is only one thread reading the HDFS blocks instead of 5. Is my understanding correct? In this case, I think repartition is a better choice than coalesce.

Lan
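For reference, a hedged sketch of the alternative suggested above: force a shuffle at the end so the upstream stages keep all 5 input partitions and only the final write happens in a single partition (this reuses finalResults and the implicits from the snippet above):

// coalesce with shuffle = true inserts a shuffle boundary, so reading/filtering still runs with 5 tasks
finalResults.coalesce(1, shuffle = true).toDF().saveAsParquetFile("path-to-parquet")

// equivalently, repartition(1) is shorthand for coalesce(1, shuffle = true)
finalResults.repartition(1).toDF().saveAsParquetFile("path-to-parquet")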
How to access lost executor log file
Hi there,

When running a Spark job on YARN, 2 executors somehow got lost during the execution. The message on the history server GUI is "CANNOT find address". Two extra executors were launched by YARN and eventually finished the job. Usually I go to the "Executors" tab on the UI to check the executor stdout/stderr for troubleshooting. Now if I go to the "Executors" tab, I do not see the 2 executors that were lost; I can only see the remaining executors and the 2 new ones. Thus I cannot check the stdout/stderr of the lost executors. How can I access the log files of these lost executors to find out why they were lost?

Thanks,
Lan
Re: How to access lost executor log file
Ted,

Thanks for your reply. First of all, after sending the email to the mailing list, I used "yarn logs -applicationId <appId>" to retrieve the aggregated logs successfully, and found the exceptions I was looking for.

Now as to your suggestion: when I go to the YARN RM UI, I can only see the "Tracking URL" in the application overview section. When I click it, it brings me to the Spark history server UI, where I cannot find the lost executors. The only logs link I can find on the YARN RM site is the ApplicationMaster log, which is not what I need. Did I miss something?

Lan

On Thu, Oct 1, 2015 at 1:30 PM, Ted Yu wrote:
> Can you go to the YARN RM UI to find all the attempts for this Spark job?
>
> The two lost executors should be found there.
>
> On Thu, Oct 1, 2015 at 10:30 AM, Lan Jiang wrote:
>> [quotes the original "How to access lost executor log file" message above]
"java.io.IOException: Filesystem closed" on executors
Hi there,

Here is the problem I ran into when executing a Spark job (Spark 1.3). The Spark job loads a bunch of Avro files using the Spark SQL spark-avro 1.0.0 library, then does some filter/map transformations, repartitions to 1 partition and writes to HDFS. It creates 2 stages. The total HDFS block count is around 12000, thus it creates 12000 partitions and 12000 tasks for the first stage. I have 9 executors launched in total, with 5 threads each. The job ran fine until the very end: when it reached 19980/20000 tasks succeeded, it suddenly failed the last 20 tasks and I lost 2 executors. Spark did launch 2 new executors and finished the job eventually by reprocessing the 20 tasks.

I only run into this issue when I run the Spark application on the full dataset. When I run on 1/3 of the dataset, everything finishes fine without error.

Question 1: What is the root cause of this issue? It is similar to http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed and https://issues.apache.org/jira/browse/SPARK-3052, but the latter says the issue has been fixed since 1.2.

Question 2: I am a little surprised that after the 2 new executors were launched, replacing the two failed executors, they simply reprocessed the failed 20 tasks/partitions. What about the results for the other partitions processed by the 2 failed executors before? I assume the results of those partitions are stored on local disk and thus do not need to be recomputed by the new executors? When is that data stored locally? Is it configurable? This question is for my own understanding of the Spark framework.

The exception causing the executor failure is below:

org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
        at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
        at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
        at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
        at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at org.apache.avro.mapred.FsInput.read(FsInput.java:54)
        at org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
        at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.tryReadRaw(BinaryDecoder.java:839)
        at org.apache.avro.io.BinaryDecoder.isEnd(BinaryDecoder.java:444)
        at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:264)
Re: "java.io.IOException: Filesystem closed" on executors
I am still facing this issue. An executor dies due to:

org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
        at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
        at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
        at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
        ...
Caused by: java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
        at java.io.DataInputStream.read(DataInputStream.java:149)

Spark automatically launched new executors and the whole job completed fine. Does anyone have a clue what's going on?

The Spark job reads Avro files from a directory, does some basic map/filter, then repartitions to 1 and writes the result to HDFS. I use Spark 1.3 with spark-avro (1.0.0). The error only happens when running on the whole dataset; when running on 1/3 of the files, the same job completes without error.

On Thu, Oct 1, 2015 at 2:41 PM, Lan Jiang wrote:
> [quotes the original ""java.io.IOException: Filesystem closed" on executors" message above]
Spark cache memory storage
Hi there,

My understanding is that the cache storage is calculated as: executor heap size * spark.storage.safetyFraction * spark.storage.memoryFraction. The default value for safetyFraction is 0.9 and for memoryFraction it is 0.6.

When I start a Spark job on YARN, I set executor-memory to 6g, thus I expect the memory cache to be 6 * 0.9 * 0.6 = 3.24g. However, the Spark history server shows the reserved cache size for each executor as 3.1g, so it does not add up. What did I miss?

Lan
failed spark job reports on YARN as successful
Hi there,

I have a Spark batch job running on CDH 5.4 + Spark 1.3.0. The job is submitted in "yarn-client" mode. The job itself failed because YARN killed several executor containers for exceeding the memory limit imposed by YARN. However, when I went to the YARN Resource Manager site, it displayed the job as successful. I found an issue reported in JIRA, https://issues.apache.org/jira/browse/SPARK-3627, but it says it was fixed in Spark 1.2. On the Spark history server, the job shows as "Incomplete".

Is this still a bug, or is there something I need to do in the Spark application to report the correct job status to YARN?

Lan
Re: How to increase Spark partitions for the DataFrame?
The partition number should be the same as the HDFS block count, not the file count. Did you confirm from the Spark UI that only 12 partitions were created? What is your ORC orc.stripe.size?

Lan

> On Oct 8, 2015, at 1:13 PM, unk1102 wrote:
>
> Hi, I have the following code where I read ORC files from HDFS, and it loads a directory which contains 12 ORC files. Since the HDFS directory contains 12 files it creates 12 partitions by default. The directory is huge, and when the ORC files get decompressed it becomes around 10 GB. How do I increase the number of partitions for the code below, so that my Spark job runs faster and does not hang for a long time reading 10 GB through a shuffle in only 12 partitions? Please guide.
>
> DataFrame df = hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
> df.select().groupby(..)
Re: How to increase Spark partitions for the DataFrame?
Hmm, that's odd. You can always use repartition(n) to increase the partition number, but then there will be a shuffle. How large is your ORC file? Have you used the NameNode UI to check how many HDFS blocks each ORC file has?

Lan

> On Oct 8, 2015, at 2:08 PM, Umesh Kacha wrote:
>
> Hi Lan, thanks for the response. Yes, I know, and I have confirmed in the UI that it has only 12 partitions because of 12 HDFS blocks, and the Hive ORC file stripe size is 33554432.
>
> On Thu, Oct 8, 2015 at 11:55 PM, Lan Jiang <ljia...@gmail.com> wrote:
>> [quotes the previous reply and the original "How to increase Spark partitions for the DataFrame?" message above]
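For reference, a minimal sketch of the repartition() workaround mentioned above (the target partition count and column name are illustrative; the repartition itself costs one shuffle of the data):

val df = hiveContext.read.format("orc").load("/hdfs/path/to/orc/files/")
val repartitioned = df.repartition(48)          // downstream stages now run with 48 tasks instead of 12
repartitioned.groupBy("someColumn").count().show()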
Re: "java.io.IOException: Filesystem closed" on executors
Thank you, Akhil. Actually the problem was solved last week and I did not have time to report back. The error was caused by YARN killing the containers because the executors used more off-heap memory than they were assigned. There was nothing in the executor logs, but the AM log clearly states this is the problem. After I increased spark.yarn.executor.memoryOverhead, it worked fine.

I was using Spark 1.3, which has the default value executorMemory * 0.07, with a minimum of 384. In Spark 1.4 and later, the default value was changed to executorMemory * 0.10, with a minimum of 384.

Lan

On Mon, Oct 12, 2015 at 8:34 AM, Akhil Das wrote:
> Can you look a bit deeper in the executor logs? It could be filling up the memory and getting killed.
>
> Thanks
> Best Regards
>
> On Mon, Oct 5, 2015 at 8:55 PM, Lan Jiang wrote:
>> [quotes the ""java.io.IOException: Filesystem closed" on executors" thread above]
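For reference, a hedged sketch of the fix described above: give each executor more off-heap headroom so YARN does not kill its container (the value is in MB and purely illustrative; it needs to be set before the application is submitted, e.g. in the SparkConf or as a --conf flag to spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("avro-to-parquet")                          // hypothetical application name
  .set("spark.yarn.executor.memoryOverhead", "1024")      // extra off-heap MB per executor container
val sc = new SparkContext(conf)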
Re: Ahhhh... Spark creates >30000 partitions... What can I do?
As François pointed out, you are encountering a classic small-files anti-pattern. One solution I have used in the past is to wrap all these small binary files into a sequence file or Avro file. For example, the Avro schema can have two fields, filename: string and binaryname: bytes. The resulting file is splittable and will not create so many partitions.

Lan

> On Oct 20, 2015, at 8:03 AM, François Pelletier wrote:
>
> You should aggregate your files in larger chunks before doing anything else. HDFS is not fit for small files. It will bloat it and cause you a lot of performance issues. Target a few hundred MB per partition, save those files back to HDFS, and then delete the original ones. You can read, use coalesce and then saveAsXXX on the result.
>
> I had the same kind of problem once and solved it by bunching 100's of files together into larger ones. I used text files with bzip2 compression.
>
> Le 2015-10-20 08:42, Sean Owen a écrit :
>> coalesce without a shuffle? it shouldn't be an action. It just treats many partitions as one.
>>
>> On Tue, Oct 20, 2015 at 1:00 PM, t3l <t...@threelights.de> wrote:
>>> I have a dataset consisting of 50000 binary files (each between 500kb and 2MB). They are stored in HDFS on a Hadoop cluster. The datanodes of the cluster are also the workers for Spark. I open the files as an RDD using sc.binaryFiles("hdfs:///path_to_directory"). When I run the first action that involves this RDD, Spark spawns an RDD with more than 30000 partitions, and it takes ages to process these partitions even if you simply run "count". Performing a "repartition" directly after loading does not help, because Spark seems to insist on materializing the RDD created by binaryFiles first.
>>>
>>> How can I get around this?
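A minimal sketch of the repacking idea described above, using a Hadoop SequenceFile instead of Avro for brevity (paths and the target partition count are illustrative; each record keeps the original file name as its key):

// binaryFiles gives (path, PortableDataStream) pairs, roughly one partition per file
val packed = sc.binaryFiles("hdfs:///path/to/small/files")
  .map { case (path, stream) => (path, stream.toArray()) }    // materialize each file as a byte array

// coalesce into a manageable number of partitions and write one splittable container file set
packed.coalesce(64).saveAsSequenceFile("hdfs:///path/to/packed")

// later jobs read the packed data instead of tens of thousands of small files
val reloaded = sc.sequenceFile[String, Array[Byte]]("hdfs:///path/to/packed")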
Re: Ahhhh... Spark creates >30000 partitions... What can I do?
I think the data files are binary, per the original post, so in this case sc.binaryFiles should be used. However, I still recommend against keeping so many small binary files: 1. they are not good for batch I/O, and 2. they put too much memory pressure on the NameNode.

Lan

> On Oct 20, 2015, at 11:20 AM, Deenar Toraskar wrote:
>
> also check out wholeTextFiles
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html#wholeTextFiles(java.lang.String,%20int)
>
> On 20 October 2015 at 15:04, Lan Jiang <ljia...@gmail.com> wrote:
>> [quotes the previous reply and the original ">30000 partitions" message above]
specify yarn-client for --master from a laptop
Hi,

I have a Hadoop 2.4 cluster running on some remote VMs. Can I start spark-shell or spark-submit from my laptop? For example:

bin/spark-shell --master yarn-client

If this is possible, how can I do it? I have copied the same Hadoop distribution to my laptop (but I don't run Hadoop on my laptop), and I have also set HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop in spark-env.sh.

Thanks,
Xilan
Re: Protobuff 3.0 for Spark
I have used protobuf 3 successfully with Spark on CDH 5.4, even though Hadoop itself comes with protobuf 2.5. I think the steps apply to HDP too. You need to do the following:

1. Set the parameters below:

spark.executor.userClassPathFirst=true
spark.driver.userClassPathFirst=true

2. Include the protobuf 3 jar file, either through --jars during spark-submit or by packaging it into an uber jar together with your own classes.

Lan

> On Nov 4, 2015, at 4:07 PM, Cassa L wrote:
>
> Hi,
> Does Spark support protobuf 3.0? I used protobuf 2.5 with spark-1.4 built for HDP 2.3. Given that protobuf has compatibility issues, I want to know if Spark supports protobuf 3.0.
>
> LCassa
Re: Protobuff 3.0 for Spark
I have not run into any linkage problem, but maybe I was lucky. :-). The reason I wanted to use protobuf 3 is mainly for Map type support. On Thu, Nov 5, 2015 at 4:43 AM, Steve Loughran wrote: > > > On 5 Nov 2015, at 00:12, Lan Jiang wrote: > > > > I have used protobuf 3 successfully with Spark on CDH 5.4, even though > Hadoop itself comes with protobuf 2.5. I think the steps apply to HDP too. > You need to do the following > > > Protobuf.jar has been so brittle in the past that the entire hadoop stack > has effectively frozen @ 2.5, at least until another mass-coordinated > update across as many projects as possible. If you do update it locally, > you are highly likely to find linkage problems. I would strongly advise > staying with 2.5 unless there is some pressing need and you are happy to > take on all the pain yourself >
create a table for csv files
Hi, I have some CSV files in HDFS with headers like col1, col2, col3. I want to add a column named id, so that a record would become (id, col1, col2, col3). How can I do this using Spark SQL? Can id be auto increment? Thanks, Xiaohe
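One possible way, sketched for Spark 2.x where a CSV reader is built in (the path is a placeholder; note that monotonically_increasing_id produces unique but not consecutive values, so it is not a strict auto-increment):

import org.apache.spark.sql.functions.monotonically_increasing_id

val df = spark.read.option("header", "true").csv("hdfs:///path/to/csv_dir")   // columns col1, col2, col3
val withId = df.withColumn("id", monotonically_increasing_id())               // unique, but not consecutive
withId.createOrReplaceTempView("my_table")
spark.sql("SELECT id, col1, col2, col3 FROM my_table").show()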
SparkPi is getting java.lang.NoClassDefFoundError: scala/collection/Seq
Hi, I am trying to run SparkPi in IntelliJ and getting a NoClassDefFoundError. Has anyone else seen this issue before? Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/Seq at org.apache.spark.examples.SparkPi.main(SparkPi.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.lang.ClassNotFoundException: scala.collection.Seq at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 6 more Process finished with exit code 1 Thanks, Xiaohe
Re: SparkPi is getting java.lang.NoClassDefFoundError: scala/collection/Seq
Yeah, lots of libraries need to have their scope changed to compile in order to run the examples in IntelliJ. Thanks, Xiaohe On Mon, Aug 17, 2015 at 10:01 AM, Jeff Zhang wrote: > Check module example's dependency (right click examples and click Open > Modules Settings), by default scala-library is provided, you need to change > it to compile to run SparkPi in Intellij. As I remember, you also need to > change guava and jetty related library to compile too. > > On Mon, Aug 17, 2015 at 2:14 AM, xiaohe lan > wrote: > >> Hi, >> >> I am trying to run SparkPi in Intellij and getting NoClassDefFoundError. >> Anyone else saw this issue before ? >> >> Exception in thread "main" java.lang.NoClassDefFoundError: >> scala/collection/Seq >> at org.apache.spark.examples.SparkPi.main(SparkPi.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:497) >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) >> Caused by: java.lang.ClassNotFoundException: scala.collection.Seq >> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >> ... 6 more >> >> Process finished with exit code 1 >> >> Thanks, >> Xiaohe >> > > > > -- > Best Regards > > Jeff Zhang >
add external jar file to Spark shell vs. Scala Shell
Hi, there I ran into a problem when I try to pass external jar file to spark-shell. I have a uber jar file that contains all the java codes I created for protobuf and all its dependency. If I simply execute my code using Scala Shell, it works fine without error. I use -cp to pass the external uber jar file here ./scala -cp ~/workspace/protobuf/my-app/target/my-app-1.0-SNAPSHOT-jar-with-dependencies.jar scala> import com.test.proto.Tr.MyProto import com.test.proto.Tr.MyProto scala> import java.nio.file.{Files, Paths} import java.nio.file.{Files, Paths} scala> val byteArray = Files.readAllBytes(Paths.get("/Users/ljiang/workspace/protobuf/my-app/myproto.pro")) byteArray: Array[Byte] = Array(10, -62, -91, 2, -86, 6, -108, 65, 8, 10, 18, -113, 65, -54, 12, -85, 46, -22, 18, -30, 10, 10, 2, 73, 78, 18, -37, 10, -118, 25, -52, 10, -22, 43, 15, 10, 1, 49, 18, ... scala> val myProto = MyProto.parseFrom(byteArray) Now the weird thing is that if I launched the spark-shell instead and execute the same code (Please note that I do not even using any SparkContext, RDD), it does not work. I use --jars option to pass the external jar file to spark-shell spark-shell --jars ~/workspace/protobuf/my-app/target/my-app-1.0-SNAPSHOT-jar-with-dependencies.jar scala> import com.test.proto.Tr.MyProto import com.test.proto.Tr.MyProto scala> import java.nio.file.{Files, Paths} import java.nio.file.{Files, Paths} scala> val byteArray = Files.readAllBytes(Paths.get("/Users/ljiang/workspace/protobuf/my-app/myproto.pro")) byteArray: Array[Byte] = Array(10, -62, -91, 2, -86, 6, -108, 65, 8, 10, 18, -113, 65, -54, 12, -85, 46, -22, 18, -30, 10, 10, 2, 73, 78, 18, -37, 10, -118, 25, -52, 10, -22, 43, 15, 10, 1, 49, 18, ... scala> val myProto = MyProto.parseFrom(byteArray) java.lang.NoSuchFieldError: unknownFields at com.test.proto.Tr$MyProto.(Tr.java) at com.test.proto.Tr$MyProto.(Tr.java) at com.test.proto.Tr$MyProto$1.parsePartialFrom(Tr.java) at com.test.proto.Tr$MyProto$1.parsePartialFrom(Tr.java) at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) at com.test.proto.Tr$MyProto.parseFrom(Tr.java) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:23) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:28) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:30) at $iwC$$iwC$$iwC$$iwC$$iwC.(:32) at $iwC$$iwC$$iwC$$iwC.(:34) at $iwC$$iwC$$iwC.(:36) at $iwC$$iwC.(:38) at $iwC.(:40) at (:42) at .(:46) at .() at .(:7) at .() at $print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at 
org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMe
Change protobuf version or any other third party library version in Spark application
Hi there, I am using Spark 1.4.1. Protobuf 2.5 is included by Spark 1.4.1 by default. However, I would like to use Protobuf 3 in my spark application so that I can use some new features such as Map support. Is there any way to do that? Right now, if I build an uber jar with dependencies including protobuf 3 classes and pass it to spark-shell through the --jars option, I get the error java.lang.NoSuchFieldError: unknownFields during execution. Is there any way to use a different version of Protobuf other than the default one included in the Spark distribution? I guess I can generalize and extend the question to any third party library. How do I deal with version conflicts for any third party library included in the Spark distribution? Thanks! Lan
Re: Change protobuf version or any other third party library version in Spark application
Steve, Thanks for the input. You are absolutely right. When I used protobuf 2.6.1, I also ran into method-not-defined errors. You suggest using the Maven shading strategy, but I have already built the uber jar to package all my custom classes and their dependencies, including protobuf 3. The problem is how to configure spark-shell to use my uber jar first. java8964 -- appreciate the link and I will try the configuration. Looks promising. However, the "user classpath first" attribute does not apply to spark-shell, am I correct? Lan On Tue, Sep 15, 2015 at 8:24 AM, java8964 wrote: > It is a bad idea to use the major version change of protobuf, as it most > likely won't work. > > But you really want to give it a try, set the "user classpath first", so > the protobuf 3 coming with your jar will be used. > > The setting depends on your deployment mode, check this for the parameter: > > https://issues.apache.org/jira/browse/SPARK-2996 > > Yong > > -- > Subject: Re: Change protobuf version or any other third party library > version in Spark application > From: ste...@hortonworks.com > To: ljia...@gmail.com > CC: user@spark.apache.org > Date: Tue, 15 Sep 2015 09:19:28 + > > > > > On 15 Sep 2015, at 05:47, Lan Jiang wrote: > > Hi, there, > > I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by > default. However, I would like to use Protobuf 3 in my spark application so > that I can use some new features such as Map support. Is there anyway to > do that? > > Right now if I build a uber.jar with dependencies including protobuf 3 > classes and pass to spark-shell through --jars option, during the > execution, I got the error java.lang.NoSuchFieldError: unknownFields. > > > > protobuf is an absolute nightmare version-wise, as protoc generates > incompatible java classes even across point versions. Hadoop 2.2+ is and > will always be protobuf 2.5 only; that applies transitively to downstream > projects (the great protobuf upgrade of 2013 was actually pushed by the > HBase team, and required a co-ordinated change across multiple projects) > > > Is there anyway to use a different version of Protobuf other than the > default one included in the Spark distribution? I guess I can generalize > and extend the question to any third party libraries. How to deal with > version conflict for any third party libraries included in the Spark > distribution? > > > maven shading is the strategy. Generally it is less needed, though the > troublesome binaries are, across the entire apache big data stack: > > google protobuf > google guava > kryo > jackson > > you can generally bump up the other versions, at least by point releases. >
Re: Change protobuf version or any other third party library version in Spark application
I used the --conf spark.files.userClassPathFirst=true in the spark-shell option, it still gave me the eror: java.lang.NoSuchFieldError: unknownFields if I use protobuf 3. The output says spark.files.userClassPathFirst is deprecated and suggest using spark.executor.userClassPathFirst. I tried that and it did not work either. Lan > On Sep 15, 2015, at 10:31 AM, java8964 wrote: > > If you use Standalone mode, just start spark-shell like following: > > spark-shell --jars your_uber_jar --conf spark.files.userClassPathFirst=true > > Yong > > Date: Tue, 15 Sep 2015 09:33:40 -0500 > Subject: Re: Change protobuf version or any other third party library version > in Spark application > From: ljia...@gmail.com > To: java8...@hotmail.com > CC: ste...@hortonworks.com; user@spark.apache.org > > Steve, > > Thanks for the input. You are absolutely right. When I use protobuf 2.6.1, I > also ran into method not defined errors. You suggest using Maven sharding > strategy, but I have already built the uber jar to package all my custom > classes and its dependencies including protobuf 3. The problem is how to > configure spark shell to use my uber jar first. > > java8964 -- appreciate the link and I will try the configuration. Looks > promising. However, the "user classpath first" attribute does not apply to > spark-shell, am I correct? > > Lan > > On Tue, Sep 15, 2015 at 8:24 AM, java8964 <mailto:java8...@hotmail.com>> wrote: > It is a bad idea to use the major version change of protobuf, as it most > likely won't work. > > But you really want to give it a try, set the "user classpath first", so the > protobuf 3 coming with your jar will be used. > > The setting depends on your deployment mode, check this for the parameter: > > https://issues.apache.org/jira/browse/SPARK-2996 > <https://issues.apache.org/jira/browse/SPARK-2996> > > Yong > > Subject: Re: Change protobuf version or any other third party library version > in Spark application > From: ste...@hortonworks.com <mailto:ste...@hortonworks.com> > To: ljia...@gmail.com <mailto:ljia...@gmail.com> > CC: user@spark.apache.org <mailto:user@spark.apache.org> > Date: Tue, 15 Sep 2015 09:19:28 + > > > > > On 15 Sep 2015, at 05:47, Lan Jiang <mailto:ljia...@gmail.com>> wrote: > > Hi, there, > > I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by > default. However, I would like to use Protobuf 3 in my spark application so > that I can use some new features such as Map support. Is there anyway to do > that? > > Right now if I build a uber.jar with dependencies including protobuf 3 > classes and pass to spark-shell through --jars option, during the execution, > I got the error java.lang.NoSuchFieldError: unknownFields. > > > protobuf is an absolute nightmare version-wise, as protoc generates > incompatible java classes even across point versions. Hadoop 2.2+ is and will > always be protobuf 2.5 only; that applies transitively to downstream projects > (the great protobuf upgrade of 2013 was actually pushed by the HBase team, > and required a co-ordinated change across multiple projects) > > > Is there anyway to use a different version of Protobuf other than the default > one included in the Spark distribution? I guess I can generalize and extend > the question to any third party libraries. How to deal with version conflict > for any third party libraries included in the Spark distribution? > > maven shading is the strategy. 
Generally it is less needed, though the > troublesome binaries are, across the entire apache big data stack: > > google protobuf > google guava > kryo > jackson > > you can generally bump up the other versions, at least by point releases.
Re: Change protobuf version or any other third party library version in Spark application
I am happy to report that after setting spark.driver.userClassPathFirst, I can use protobuf 3 with spark-shell. Looks like the classloading issue was in the driver, not the executor. Marcelo, thank you very much for the tip! Lan > On Sep 15, 2015, at 1:40 PM, Marcelo Vanzin wrote: > > Hi, > > Just "spark.executor.userClassPathFirst" is not enough. You should > also set "spark.driver.userClassPathFirst". Also note that I don't > think this was really tested with the shell, but that should work with > regular apps started using spark-submit. > > If that doesn't work, I'd recommend shading, as others already have. > > On Tue, Sep 15, 2015 at 9:19 AM, Lan Jiang wrote: >> I used the --conf spark.files.userClassPathFirst=true in the spark-shell >> option, it still gave me the error: java.lang.NoSuchFieldError: unknownFields >> if I use protobuf 3. >> >> The output says spark.files.userClassPathFirst is deprecated and suggest >> using spark.executor.userClassPathFirst. I tried that and it did not work >> either. > > -- > Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
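Putting the pieces from this thread together, the working invocation looks roughly like this (the jar name is a placeholder): spark-shell --jars my-uber-with-protobuf3.jar --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true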
Spark Streaming proactive monitoring
Hi there, From the Spark UI, we can monitor the following two metrics: • Processing Time - the time to process each batch of data. • Scheduling Delay - the time a batch waits in a queue for the processing of previous batches to finish. However, what is the best way to monitor them proactively? For example, if the processing time or scheduling delay exceeds a certain threshold, send an alert to the admin/developer? Lan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
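One programmatic option is to register a StreamingListener and raise an alert when a batch exceeds a threshold. A minimal sketch, assuming ssc is the StreamingContext; the 30-second threshold and the println alert are placeholders for whatever alerting hook is actually used:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class DelayAlertListener(maxDelayMs: Long) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    val processing = info.processingDelay.getOrElse(0L)   // ms spent processing the batch
    val scheduling = info.schedulingDelay.getOrElse(0L)   // ms the batch waited in the queue
    if (processing > maxDelayMs || scheduling > maxDelayMs) {
      // placeholder: replace with an email/pager/metrics call
      println(s"ALERT: batch ${info.batchTime}: processing=${processing}ms scheduling=${scheduling}ms")
    }
  }
}

ssc.addStreamingListener(new DelayAlertListener(maxDelayMs = 30000))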
Does monotonically_increasing_id generates the same id even when executor fails or being evicted out of memory
Hi there, I am trying to generate a unique ID for each record in a dataframe, so that I can save the dataframe to a relational database table. My question is: when the dataframe is regenerated due to executor failure or being evicted out of cache, does the ID stay the same as before? According to the documentation: "The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number within each partition." I assume the partition id stays the same after the regeneration, but what about the record number within each partition? My code is like below:

import org.apache.spark.sql.functions._
val df1 = (1 to 1000).toDF.repartition(4)
val df2 = df1.withColumn("id", monotonically_increasing_id).cache
df2.show
df2.show

I executed it several times and it seems to generate the same ID for each specific record, but I am not sure that proves it will generate the same ID in every scenario. BTW, I am aware of the shortcoming of monotonically_increasing_id in Spark 1.6, explained in https://issues.apache.org/jira/browse/SPARK-14241, which is fixed in 2.0. Lan
BinaryClassificationMetrics only supports AreaUnderPR and AreaUnderROC?
I realized that in Spark ML, BinaryClassificationMetrics only supports AreaUnderPR and AreaUnderROC. Why is that? What if I need other metrics such as F-score or accuracy? I tried to use MulticlassClassificationEvaluator to evaluate other metrics such as accuracy for a binary classification problem and it seems to work, but I am not sure if there is any issue with using MulticlassClassificationEvaluator for a binary classification. According to the Spark ML documentation, "The Evaluator can be a RegressionEvaluator for regression problems, a BinaryClassificationEvaluator for binary data, or a MulticlassClassificationEvaluator for multiclass problems." https://spark.apache.org/docs/2.1.0/ml-tuning.html Can someone shed some light on this issue? Lan
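A sketch of that usage with the Spark 2.x ML API; predictions is assumed to be a DataFrame with label and prediction columns produced by a binary classifier:

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")

val accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)   // fraction of correct predictions
val f1 = evaluator.setMetricName("f1").evaluate(predictions)               // also defined when there are only two classes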
Re: Configuring logging properties for executor
Rename your log4j_special.properties file to log4j.properties and place it under the root of your jar file, and you should be fine. If you are using Maven to build your jar, place the log4j.properties in the src/main/resources folder. However, please note that if another dependency jar file in the classpath contains its own log4j.properties file, this might not work, since the first log4j.properties file that is loaded will be used. You can also do spark-submit --files log4j_special.properties … , which should transfer your log4j properties file to the worker nodes automatically without you copying it manually. Lan > On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev wrote: > > Hi all, > > I need to configure spark executor log4j.properties on a standalone cluster. > It looks like placing the relevant properties file in the spark > configuration folder and setting the spark.executor.extraJavaOptions from > my application code: > sparkConf.set("spark.executor.extraJavaOptions", > "-Dlog4j.configuration=log4j_special.properties"); > does the work, and the executor logs are written in the required place and > level. As far as I understand, it works, because the spark configuration > folder is on the class path, and passing parameter without path works here. > However, I would like to avoid deploying these properties to each worker > spark configuration folder. > I wonder, if I put the properties in my application jar, is there any way of > telling executor to load them? > > Thanks, > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
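For executor-side logging without touching each worker's conf directory, the two pieces can also be combined on the command line, roughly like this (file and jar names are placeholders): spark-submit --files log4j_special.properties --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j_special.properties" … my-app.jar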
Re: Configuring logging properties for executor
Each application gets its own executor processes, so there should be no problem running them in parallel. Lan > On Apr 20, 2015, at 10:25 AM, Michael Ryabtsev wrote: > > Hi Lan, > > Thanks for fast response. It could be a solution if it works. I have more > than one log4 properties file, for different run modes like debug/production, > for executor and for application core. I think I would like to keep them > separate. Then, I suppose I should give all other properties files a special > names and keep the executor configuration with the default name? Can I > conclude that going this way I will not be able to run several applications > on the same cluster in parallel? > > Regarding submit, I am not using it now, I submit from the code, but I think > I should consider this option. > > Thanks. > > On Mon, Apr 20, 2015 at 5:59 PM, Lan Jiang <mailto:ljia...@gmail.com>> wrote: > Rename your log4j_special.properties file as log4j.properties and place it > under the root of your jar file, you should be fine. > > If you are using Maven to build your jar, please the log4j.properties in the > src/main/resources folder. > > However, please note that if you have other dependency jar file in the > classpath that contains another log4j.properties file this way, it might not > work since the first log4j.properties file that is loaded will be used. > > You can also do spark-submit —file log4j_special.properties … ,which should > transfer your log4j property file to the worker nodes automatically without > you copying them manually. > > Lan > > > > On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev > <mailto:michael...@gmail.com>> wrote: > > > > Hi all, > > > > I need to configure spark executor log4j.properties on a standalone cluster. > > It looks like placing the relevant properties file in the spark > > configuration folder and setting the spark.executor.extraJavaOptions from > > my application code: > > sparkConf.set("spark.executor.extraJavaOptions", > > "-Dlog4j.configuration=log4j_special.properties"); > > does the work, and the executor logs are written in the required place and > > level. As far as I understand, it works, because the spark configuration > > folder is on the class path, and passing parameter without path works here. > > However, I would like to avoid deploying these properties to each worker > > spark configuration folder. > > I wonder, if I put the properties in my application jar, is there any way of > > telling executor to load them? > > > > Thanks, > > > > > > > > -- > > View this message in context: > > http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html > > > > <http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html> > > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > > > - > > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > > <mailto:user-unsubscr...@spark.apache.org> > > For additional commands, e-mail: user-h...@spark.apache.org > > <mailto:user-h...@spark.apache.org> > > > >
Re: HiveContext vs SQLContext
Daniel, HiveContext is a subclass of SQLContext and thus offers a superset of its functionality, including features not available in SQLContext such as access to Hive UDFs, Hive tables, Hive SerDes, etc. This does not change in 1.3.1. Quote from the 1.3.1 documentation: "… using HiveContext is recommended for the 1.3 release of Spark. Future releases will focus on bringing SQLContext up to feature parity with a HiveContext." Lan > On Apr 20, 2015, at 4:17 PM, Daniel Mahler wrote: > > Is HiveContext still preferred over SQLContext? > What are the current (1.3.1) differences between them? > > thanks > Daniel
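A minimal illustration for Spark 1.x, assuming an existing SparkContext sc; the Hive table name is only a placeholder:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new SQLContext(sc)     // basic DataFrame / SQL support
val hiveContext = new HiveContext(sc)   // adds HiveQL, Hive UDFs, SerDes and metastore tables

hiveContext.sql("SELECT count(*) FROM some_hive_table").show()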
Re: Scheduling across applications - Need suggestion
The YARN capacity scheduler supports hierarchical queues, to which you can assign cluster resources as percentages. Your Spark applications/shells can be submitted to different queues. Mesos supports a fine-grained mode, which allows the machines/cores used by each executor to ramp up and down. Lan On Wed, Apr 22, 2015 at 2:32 PM, yana wrote: > Yes. Fair scheduler only helps concurrency within an application. With > multiple shells you'd either need something like Yarn/Mesos or careful math > on resources as you said > > > Sent on the new Sprint Network from my Samsung Galaxy S®4. > > > Original message From: Arun Patel > Date:04/22/2015 6:28 AM (GMT-05:00) > To: user > Subject: Scheduling across applications - Need suggestion > > I believe we can use the properties like --executor-memory > --total-executor-cores to configure the resources allocated for each > application. But, in a multi user environment, shells and applications are > being submitted by multiple users at the same time. All users are > requesting resources with different properties. At times, some users are > not getting resources of the cluster. > > > How to control resource usage in this case? Please share any best > practices followed. > > > As per my understanding, Fair scheduler can be used for scheduling tasks > within an application but not across multiple applications. Is this > correct? > > > Regards, > > Arun >
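For example, submitting to a specific capacity-scheduler queue looks roughly like this (the queue name, resource sizes and jar are placeholders): spark-submit --master yarn --queue analytics --executor-memory 4g --num-executors 10 my-app.jar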
How to install spark in spark on yarn mode
Hi experts, I see Spark on YARN has yarn-client and yarn-cluster modes. I also have a 5-node hadoop cluster (hadoop 2.4). How do I install Spark if I want to try the Spark-on-YARN mode? Do I need to install Spark on each node of the hadoop cluster? Thanks, Xiaohe
Re: How to install spark in spark on yarn mode
Hi Madhvi, If I only install Spark on one node and use spark-submit to run an application, which nodes are the worker nodes? And where are the executors? Thanks, Xiaohe On Thu, Apr 30, 2015 at 12:52 PM, madhvi wrote: > Hi, > Follow the instructions to install on the following link: > http://mbonaci.github.io/mbo-spark/ > You don't need to install spark on every node. Just install it on one node, > or you can install it on a remote system also and make a spark cluster. > Thanks > Madhvi > > On Thursday 30 April 2015 09:31 AM, xiaohe lan wrote: > >> Hi experts, >> >> I see spark on yarn has yarn-client and yarn-cluster mode. I also have a >> 5 nodes hadoop cluster (hadoop 2.4). How to install spark if I want to try >> the spark on yarn mode. >> >> Do I need to install spark on the each node of hadoop cluster ? >> >> Thanks, >> Xiaohe >> > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
number of executors
Hi, I have a 5-node yarn cluster, and I used spark-submit to submit a simple app: spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5 I have set the number of executors to 5, but from the Spark UI I could see only two executors and it ran very slowly. What did I miss? Thanks, Xiaohe
println in spark-shell
Hi, When I start spark shell by passing yarn to master option, println does not print elements in RDD: bash-4.1$ spark-shell --master yarn 15/05/17 01:50:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.3.1 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45) Type in expressions to have them evaluated. Type :help for more information. Spark context available as sc. SQL context available as sqlContext. scala> val lines = sc.parallelize(List("hello world", "hi")) lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at :21 scala> lines.first() res1: String = hello world scala> lines.foreach(println) scala> If I start spark shell in local mode, the elements are printed. What's the difference here ? Thanks, Xiaohe
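One thing worth noting (a general Spark behaviour, sketched here rather than a confirmed diagnosis of this exact setup): the closure passed to foreach runs on the executors, so in yarn mode the println output goes to the executors' stdout logs instead of the driver's shell. In local mode the executor and the driver share the same JVM, which is why the output appears there. Bringing the data back to the driver first makes it show up in the console:

lines.collect().foreach(println)   // collect to the driver, then print locally
lines.take(10).foreach(println)    // safer for large RDDs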
Re: number of executors
bash-4.1$ ps aux | grep SparkSubmit xilan 1704 13.2 1.2 5275520 380244 pts/0 Sl+ 08:39 0:13 /scratch/xilan/jdk1.8.0_45/bin/java -cp /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5 --executor-cores 4 xilan 1949 0.0 0.0 103292 800 pts/1S+ 08:40 0:00 grep --color SparkSubmit When look at the sparkui, I see the following: Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 MB / 28089782host2:49970 ms00063.4 MB / 1810945 So executor 2 is not even assigned a task ? Maybe I have some problems in my setting, but I don't know what could be the possible settings I set wrong or have not set. Thanks, Xiaohe On Sun, May 17, 2015 at 11:16 PM, Akhil Das wrote: > Did you try --executor-cores param? While you submit the job, do a ps aux > | grep spark-submit and see the exact command parameters. > > Thanks > Best Regards > > On Sat, May 16, 2015 at 12:31 PM, xiaohe lan > wrote: > >> Hi, >> >> I have a 5 nodes yarn cluster, I used spark-submit to submit a simple app. >> >> spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar >> --class scala.SimpleApp --num-executors 5 >> >> I have set the number of executor to 5, but from sparkui I could see only >> two executors and it ran very slow. What did I miss ? >> >> Thanks, >> Xiaohe >> > >
Re: number of executors
Sorry, them both are assigned task actually. Aggregated Metrics by Executor Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6 MB304.8 MB On Sun, May 17, 2015 at 11:50 PM, xiaohe lan wrote: > bash-4.1$ ps aux | grep SparkSubmit > xilan 1704 13.2 1.2 5275520 380244 pts/0 Sl+ 08:39 0:13 > /scratch/xilan/jdk1.8.0_45/bin/java -cp > /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop > -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn > target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp > --num-executors 5 --executor-cores 4 > xilan 1949 0.0 0.0 103292 800 pts/1S+ 08:40 0:00 grep > --color SparkSubmit > > > When look at the sparkui, I see the following: > Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed > TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 MB > / 28089782host2:49970 ms00063.4 MB / 1810945 > > So executor 2 is not even assigned a task ? Maybe I have some problems in > my setting, but I don't know what could be the possible settings I set > wrong or have not set. > > > Thanks, > Xiaohe > > On Sun, May 17, 2015 at 11:16 PM, Akhil Das > wrote: > >> Did you try --executor-cores param? While you submit the job, do a ps aux >> | grep spark-submit and see the exact command parameters. >> >> Thanks >> Best Regards >> >> On Sat, May 16, 2015 at 12:31 PM, xiaohe lan >> wrote: >> >>> Hi, >>> >>> I have a 5 nodes yarn cluster, I used spark-submit to submit a simple >>> app. >>> >>> spark-submit --master yarn >>> target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp >>> --num-executors 5 >>> >>> I have set the number of executor to 5, but from sparkui I could see >>> only two executors and it ran very slow. What did I miss ? >>> >>> Thanks, >>> Xiaohe >>> >> >> >
Re: number of executors
Hi Sandy, Thanks for your information. Yes, spark-submit --master yarn --num-executors 5 --executor-cores 4 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp is working awesomely. Is there any documentations pointing to this ? Thanks, Xiaohe On Tue, May 19, 2015 at 12:07 AM, Sandy Ryza wrote: > Hi Xiaohe, > > The all Spark options must go before the jar or they won't take effect. > > -Sandy > > On Sun, May 17, 2015 at 8:59 AM, xiaohe lan > wrote: > >> Sorry, them both are assigned task actually. >> >> Aggregated Metrics by Executor >> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput >> Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle >> Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4 >> MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6 >> MB304.8 MB >> >> On Sun, May 17, 2015 at 11:50 PM, xiaohe lan >> wrote: >> >>> bash-4.1$ ps aux | grep SparkSubmit >>> xilan 1704 13.2 1.2 5275520 380244 pts/0 Sl+ 08:39 0:13 >>> /scratch/xilan/jdk1.8.0_45/bin/java -cp >>> /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop >>> -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn >>> target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp >>> --num-executors 5 --executor-cores 4 >>> xilan 1949 0.0 0.0 103292 800 pts/1S+ 08:40 0:00 grep >>> --color SparkSubmit >>> >>> >>> When look at the sparkui, I see the following: >>> Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed >>> TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 >>> MB / 28089782host2:49970 ms00063.4 MB / 1810945 >>> >>> So executor 2 is not even assigned a task ? Maybe I have some problems >>> in my setting, but I don't know what could be the possible settings I set >>> wrong or have not set. >>> >>> >>> Thanks, >>> Xiaohe >>> >>> On Sun, May 17, 2015 at 11:16 PM, Akhil Das >>> wrote: >>> >>>> Did you try --executor-cores param? While you submit the job, do a ps >>>> aux | grep spark-submit and see the exact command parameters. >>>> >>>> Thanks >>>> Best Regards >>>> >>>> On Sat, May 16, 2015 at 12:31 PM, xiaohe lan >>>> wrote: >>>> >>>>> Hi, >>>>> >>>>> I have a 5 nodes yarn cluster, I used spark-submit to submit a simple >>>>> app. >>>>> >>>>> spark-submit --master yarn >>>>> target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp >>>>> --num-executors 5 >>>>> >>>>> I have set the number of executor to 5, but from sparkui I could see >>>>> only two executors and it ran very slow. What did I miss ? >>>>> >>>>> Thanks, >>>>> Xiaohe >>>>> >>>> >>>> >>> >> >
Re: number of executors
Yeah, I read that page before, but it does not mention the options should come before the application jar. Actually, if I put the --class option before the application jar, I will get ClassNotFoundException. Anyway, thanks again Sandy. On Tue, May 19, 2015 at 11:06 AM, Sandy Ryza wrote: > Awesome! > > It's documented here: > https://spark.apache.org/docs/latest/submitting-applications.html > > -Sandy > > On Mon, May 18, 2015 at 8:03 PM, xiaohe lan > wrote: > >> Hi Sandy, >> >> Thanks for your information. Yes, spark-submit --master yarn >> --num-executors 5 --executor-cores 4 >> target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp is >> working awesomely. Is there any documentations pointing to this ? >> >> Thanks, >> Xiaohe >> >> On Tue, May 19, 2015 at 12:07 AM, Sandy Ryza >> wrote: >> >>> Hi Xiaohe, >>> >>> The all Spark options must go before the jar or they won't take effect. >>> >>> -Sandy >>> >>> On Sun, May 17, 2015 at 8:59 AM, xiaohe lan >>> wrote: >>> >>>> Sorry, them both are assigned task actually. >>>> >>>> Aggregated Metrics by Executor >>>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput >>>> Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle >>>> Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / >>>> 121007701630.4 >>>> MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / >>>> 109269121646.6 >>>> MB304.8 MB >>>> >>>> On Sun, May 17, 2015 at 11:50 PM, xiaohe lan >>>> wrote: >>>> >>>>> bash-4.1$ ps aux | grep SparkSubmit >>>>> xilan 1704 13.2 1.2 5275520 380244 pts/0 Sl+ 08:39 0:13 >>>>> /scratch/xilan/jdk1.8.0_45/bin/java -cp >>>>> /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop >>>>> -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn >>>>> target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp >>>>> --num-executors 5 --executor-cores 4 >>>>> xilan 1949 0.0 0.0 103292 800 pts/1S+ 08:40 0:00 grep >>>>> --color SparkSubmit >>>>> >>>>> >>>>> When look at the sparkui, I see the following: >>>>> Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed >>>>> TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 >>>>> MB / 28089782host2:49970 ms00063.4 MB / 1810945 >>>>> >>>>> So executor 2 is not even assigned a task ? Maybe I have some problems >>>>> in my setting, but I don't know what could be the possible settings I set >>>>> wrong or have not set. >>>>> >>>>> >>>>> Thanks, >>>>> Xiaohe >>>>> >>>>> On Sun, May 17, 2015 at 11:16 PM, Akhil Das < >>>>> ak...@sigmoidanalytics.com> wrote: >>>>> >>>>>> Did you try --executor-cores param? While you submit the job, do a ps >>>>>> aux | grep spark-submit and see the exact command parameters. >>>>>> >>>>>> Thanks >>>>>> Best Regards >>>>>> >>>>>> On Sat, May 16, 2015 at 12:31 PM, xiaohe lan >>>>>> wrote: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> I have a 5 nodes yarn cluster, I used spark-submit to submit a >>>>>>> simple app. >>>>>>> >>>>>>> spark-submit --master yarn >>>>>>> target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp >>>>>>> --num-executors 5 >>>>>>> >>>>>>> I have set the number of executor to 5, but from sparkui I could see >>>>>>> only two executors and it ran very slow. What did I miss ? 
>>>>>>> >>>>>>> Thanks, >>>>>>> Xiaohe >>>>>>> >>>>>> >>>>>> >>>>> >>>> >>> >> >
Why is Columnar Parquet used as default for saving Row-based DataFrames/RDD?
Hello, I have the above naive question, if anyone could help. Why not use a row-based file format to save row-based DataFrames/RDDs? Thanks, Lan
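For completeness, the output format is only a default and can be chosen explicitly. A small sketch for Spark 2.x, where df is an existing DataFrame and the paths are placeholders:

df.write.parquet("/tmp/out_parquet")   // columnar default
df.write.json("/tmp/out_json")         // row-oriented alternative
df.write.csv("/tmp/out_csv")           // row-oriented alternative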