Spark thrift server hiveStatement.getQueryLog returns empty
Spark thrift server hiveStatement.getQueryLog returns empty?
Re: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Hi,

The error message indicates that a StreamingContext object ends up among the fields of the closure that Spark tries to serialize. Could you show us the enclosing function and component? The workarounds proposed in the following Stack Overflow answer might help you fix the problem: http://stackoverflow.com/a/30094847

On Sat, Mar 11, 2017 at 3:10 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
> I think the vals you defined are only valid in the driver;
> you can try a broadcast variable.
>
> ---Original---
> *From:* "lk_spark"
> *Date:* 2017/2/27 11:14:23
> *To:* "user.spark";
> *Subject:* java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>
> hi, all:
> I want to extract some info from Kafka using Spark Streaming; my code looks like:
>
> val keyword = ""
> val system = "dmp"
> val datetime_idx = 0
> val datetime_length = 23
> val logLevelBeginIdx = datetime_length + 2 - 1
> val logLevelMaxLenght = 5
>
> val lines = messages.filter(record => record.value().matches("\\d{4} .*")).map(record => {
>   val assembly = record.topic()
>   val value = record.value
>   val datatime = value.substring(datetime_idx, datetime_length - 1)
>   val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + logLevelMaxLenght - 1)
>   (assembly, value, datatime, level)
> })
>
> I get this error:
>
> Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
> Serialization stack:
> - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@5a457aa1)
> - field (class: $iw, name: streamingContext, type: class org.apache.spark.streaming.StreamingContext)
> - object (class $iw, $iw@38eb2140)
> - field (class: $iw, name: $iw, type: class $iw)
> - object (class $iw, $iw@2a3ced3d)
> - field (class: $iw, name: $iw, type: class $iw)
> - object (class $iw, $iw@7c5dbca5)
>
> ==
> If I change the parameters to constants, I do not get the error:
>
> val lines = messages.filter(record => record.value().matches("\\d{4} .*")).map(record => {
>   val assembly = record.topic()
>   val value = record.value
>   val datatime = value.substring(0, 22)
>   val level = value.substring(24, 27)
>   (assembly, value, datatime, level)
> })
>
> How can I pass parameters to the map function?
>
> 2017-02-27
> --
> lk_spark
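For illustration, here is a minimal sketch of the kind of workaround the linked answer suggests, not the original poster's code: move the constants into local vals inside a method, so the map closure captures only those primitives and not the REPL wrapper object ($iw) that also holds the StreamingContext. The object and value names (LogLineParser, dtIdx, ...) are mine, and it assumes the Kafka 0.10 direct stream where records are ConsumerRecord[String, String].

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.DStream

object LogLineParser extends Serializable {
  // Copy the values the closure needs into local vals: the closure then captures
  // only these primitives, not an enclosing object holding the StreamingContext.
  def parse(messages: DStream[ConsumerRecord[String, String]]): DStream[(String, String, String, String)] = {
    val dtIdx = 0
    val dtLen = 23
    val lvlBegin = dtLen + 2 - 1
    val lvlMaxLen = 5

    messages
      .filter(record => record.value().matches("\\d{4} .*"))
      .map { record =>
        val assembly = record.topic()
        val value = record.value()
        val datatime = value.substring(dtIdx, dtLen - 1)
        val level = value.substring(lvlBegin, lvlBegin + lvlMaxLen - 1)
        (assembly, value, datatime, level)
      }
  }
}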
Re: PySpark Serialization/Deserialization (Pickling) Overhead
Yeoul,

I think one way to run a microbenchmark for PySpark serialization/deserialization would be to run withColumn plus a Python UDF that returns a constant and compare that with similar code in Scala. I am not sure there is a way to measure just the serialization code, because the PySpark API only lets you apply a Python function over the data frame, so that always involves running a for loop in Python over the data. You would probably need to do some hacking to make it do just the serialization. Maybe other people have more insights?

Li

On Tue, Mar 7, 2017 at 9:18 PM Yeoul Na wrote:

Hi all,

I am trying to analyze PySpark performance overhead. People just say PySpark is slower than Scala due to the serialization/deserialization overhead. I tried the example in this post: https://0x0fff.com/spark-dataframes-are-faster-arent-they/. This and many other articles say the straightforward Python implementation is the slowest due to serialization/deserialization overhead.

However, when I actually looked at the log in the Web UI, the serialization and deserialization times of PySpark do not seem to be any bigger than those of Scala. The main contributor was "Executor Computing Time". Thus, we cannot be sure whether this is due to serialization or because Python code is simply slower than Scala code.

So my question is: does "Task Deserialization Time" in the Spark Web UI actually include the serialization/deserialization time spent in PySpark? If not, how can I actually measure the serialization/deserialization overhead?

Thanks,
Yeoul

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Serialization-Deserialization-Pickling-Overhead-tp28468.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
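For reference, a minimal sketch of the Scala-side baseline Li describes (the object name, row count, and timing approach are made up for illustration): time withColumn with a trivial Scala UDF, then run the same shape of job in PySpark with a Python UDF that returns a constant, and attribute most of the difference to Python serialization/deserialization plus interpreter overhead.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UdfBaseline {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("udf-baseline").getOrCreate()

    val df = spark.range(0, 10000000L).toDF("id")
    val constUdf = udf(() => 1)   // the UDF does almost no work, so the timing is dominated by everything around it

    val start = System.nanoTime()
    df.withColumn("c", constUdf()).count()   // count() forces evaluation
    println(s"Scala UDF baseline: ${(System.nanoTime() - start) / 1e6} ms")

    spark.stop()
  }
}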
Re: Spark join over sorted columns of dataset.
I am not an expert on this, but here is what I think: Catalyst maintains information on whether a plan node is ordered. If your dataframe is the result of an ORDER BY, Catalyst will skip the sorting when it does a sort-merge join. If your dataframe is created from storage, for instance a ParquetRelation, then I am not sure there is an API that allows the user to tell Catalyst that the ParquetRelation is ordered on column x. If there isn't, it would probably be useful to add one.

Li

On Fri, Mar 3, 2017 at 11:23 AM Koert Kuipers wrote:
> For RDDs the shuffle is already skipped but the sort is not. In spark-sorted we track partitioning and sorting within partitions for key-value RDDs and can avoid the sort. See:
> https://github.com/tresata/spark-sorted
>
> For Dataset/DataFrame such optimizations are done automatically, however it's currently not always working for Dataset, see:
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-19468
>
> On Mar 3, 2017 11:06 AM, "Rohit Verma" wrote:
>
> Sending it to dev's.
> Can you please help me by providing some ideas for the below?
>
> Regards
> Rohit
>
> On Feb 23, 2017, at 3:47 PM, Rohit Verma wrote:
> >
> > Hi
> >
> > While joining two columns of different datasets, how can we optimize the join if both columns are pre-sorted within their datasets, so that when Spark does a sort-merge join the sorting phase can be skipped?
> >
> > Regards
> > Rohit
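One non-authoritative way to see whether the planner re-sorts already-sorted inputs is to compare physical plans. A sketch, assuming Spark 2.x; the column name and row counts are made up, and the broadcast threshold is disabled only so the toy join actually plans a sort-merge join:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SortedJoinPlan {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sorted-join").getOrCreate()
    // Disable broadcast joins only so this toy example uses a sort-merge join.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    val left = spark.range(0, 1000L).toDF("k")
    val right = spark.range(0, 1000L).toDF("k")

    // Baseline: expect Exchange + Sort on both sides of the SortMergeJoin.
    left.join(right, "k").explain()

    // Pre-partitioned and pre-sorted inputs: if the planner recognizes the existing
    // ordering, no extra Sort should appear above the join's children.
    val leftSorted = left.repartition(col("k")).sortWithinPartitions("k")
    val rightSorted = right.repartition(col("k")).sortWithinPartitions("k")
    leftSorted.join(rightSorted, "k").explain()

    spark.stop()
  }
}

This only covers dataframes sorted within the same job; whether the same holds for data read back from Parquet is exactly the open question above.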
Differences between scikit-learn and Spark.ml for regression toy problem
(This was also posted to Stack Overflow on 03/10.)

I am setting up a very simple logistic regression problem in scikit-learn and in spark.ml, and the results diverge: the models they learn are different, but I can't figure out why (the data is the same, the model type is the same, the regularization is the same...). No doubt I am missing some setting on one side or the other. Which setting? How should I set up either scikit or spark.ml to find the same model as its counterpart?

I give the sklearn code and the spark.ml code below. Both should be ready to cut-and-paste and run.

scikit-learn code:
--

import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge

X = np.array([
    [-0.7306653538519616, 0.0],
    [0.6750417712898752, -0.4232874171873786],
    [0.1863463229359709, -0.8163423997075965],
    [-0.6719842051493347, 0.0],
    [0.9699938346531928, 0.0],
    [0.22759406190283604, 0.0],
    [0.9688721028330911, 0.0],
    [0.5993795346650845, 0.0],
    [0.9219423508390701, -0.8972778242305388],
    [0.7006904841584055, -0.5607635619919824]
])

y = np.array([
    0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0
])

m, n = X.shape

# Add intercept term to simulate inputs to GameEstimator
X_with_intercept = np.hstack((X, np.ones(m)[:, np.newaxis]))

l = 0.3
e = LogisticRegression(
    fit_intercept=False,
    penalty='l2',
    C=1/l,
    max_iter=100,
    tol=1e-11)

e.fit(X_with_intercept, y)
print e.coef_
# => [[ 0.98662189  0.45571052 -0.23467255]]

# L2-regularized linear regression is called Ridge in sklearn
e = Ridge(
    fit_intercept=False,
    alpha=l,
    max_iter=100,
    tol=1e-11)

e.fit(X_with_intercept, y)
print e.coef_
# => [ 0.32155545  0.17904355  0.41222418]

spark.ml code:
---

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SQLContext

object TestSparkRegression {
  def main(args: Array[String]): Unit = {
    import org.apache.log4j.{Level, Logger}
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)

    val sparkTrainingData = new SQLContext(sc)
      .createDataFrame(Seq(
        LabeledPoint(0.0, Vectors.dense(-0.7306653538519616, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.6750417712898752, -0.4232874171873786)),
        LabeledPoint(1.0, Vectors.dense(0.1863463229359709, -0.8163423997075965)),
        LabeledPoint(0.0, Vectors.dense(-0.6719842051493347, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.9699938346531928, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.22759406190283604, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.9688721028330911, 0.0)),
        LabeledPoint(0.0, Vectors.dense(0.5993795346650845, 0.0)),
        LabeledPoint(0.0, Vectors.dense(0.9219423508390701, -0.8972778242305388)),
        LabeledPoint(0.0, Vectors.dense(0.7006904841584055, -0.5607635619919824))))
      .toDF("label", "features")

    val logisticModel = new LogisticRegression()
      .setRegParam(0.3)
      .setLabelCol("label")
      .setFeaturesCol("features")
      .fit(sparkTrainingData)

    println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
    // Spark logistic model coefficients: [0.5451588538376263,0.26740606573584713] Intercept: -0.13897955358689987

    val linearModel = new LinearRegression()
      .setRegParam(0.3)
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setSolver("l-bfgs")
      .fit(sparkTrainingData)

    println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
    // Spark linear model coefficients: [0.19852664861346023,0.11501200541407802] Intercept: 0.45464906876832323

    sc.stop()
  }
}

Thanks,
Frank
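Not a definitive answer, but a few convention differences that I believe account for at least part of the gap: spark.ml minimizes the mean loss plus regParam times the penalty, while sklearn's C (logistic) and alpha (Ridge) weight the sum of the losses, so the equivalent regParam is roughly the sklearn value divided by the number of rows; spark.ml also standardizes features before regularizing unless told otherwise; and spark.ml never penalizes the intercept, whereas the sklearn snippet folds the intercept into the penalized weights. A sketch of the settings I would try, reusing sparkTrainingData from the code above (I have not verified that the resulting coefficients match exactly):

// Sketch only: bring spark.ml's objective closer to the sklearn snippet's.
val n = sparkTrainingData.count()   // n = 10 for the toy data above

val logisticModel2 = new LogisticRegression()
  .setRegParam(0.3 / n)             // l / n instead of l, since sklearn scales the sum of losses
  .setStandardization(false)        // sklearn does not standardize before penalizing
  .setLabelCol("label")
  .setFeaturesCol("features")
  .fit(sparkTrainingData)

val linearModel2 = new LinearRegression()
  .setRegParam(0.3 / n)
  .setStandardization(false)
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setSolver("l-bfgs")
  .fit(sparkTrainingData)

The intercept treatment still differs between the two setups, so some residual discrepancy is expected.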
spark-streaming stopping
I am not able to stop a Spark Streaming job. Let me explain briefly:

* getting data from a Kafka topic
* splitting the data to create a JavaRDD
* mapping the JavaRDD to a JavaPairRDD to do a reduceByKey transformation
* writing the JavaPairRDD into the C* (Cassandra) DB  // something going wrong here

The messages in the Kafka topic are exhausted, but the program keeps running and stages keep being scheduled even though there is no data coming from Kafka. When I tried to kill the program manually, there was no output in the Cassandra database.
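A minimal sketch of one common way to stop a streaming job deliberately instead of killing it. This is Scala rather than the poster's Java, the StreamingContext API is the same, and the marker-file path, object name, and queueStream stand-in for the Kafka/Cassandra pipeline are all hypothetical: poll for an external stop signal and call stop with stopGracefully = true so in-flight batches finish and pending writes are flushed before the JVM exits.

import java.nio.file.{Files, Paths}
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulStop {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("graceful-stop")
      .setMaster("local[2]")
      // Lets an external kill signal (e.g. SIGTERM from the cluster manager) trigger a graceful stop too.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(conf, Seconds(5))

    // Stand-in for the real pipeline (Kafka -> split -> reduceByKey -> Cassandra write).
    val queue = mutable.Queue(ssc.sparkContext.parallelize(Seq(1, 2, 3)))
    ssc.queueStream(queue).print()

    ssc.start()

    // Instead of awaitTermination(), wake up periodically and check a stop marker (hypothetical path).
    val stopMarker = Paths.get("/tmp/stop_streaming_job")
    var stopped = false
    while (!stopped) {
      stopped = ssc.awaitTerminationOrTimeout(10000)   // true if the context has already stopped
      if (!stopped && Files.exists(stopMarker)) {
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        stopped = true
      }
    }
  }
}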