Spark Thrift server hiveStatement.getQueryLog returns empty

2017-03-12 Thread 李斌松
Spark Thrift server hiveStatement.getQueryLog returns empty?


Re: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

2017-03-12 Thread Lysiane Bouchard
Hi,

The error message indicates that a StreamingContext object ends up in the
fields of the closure that Spark tries to serialize.

Could you show us the enclosing function and component?

The workarounds proposed in the following Stack Overflow reply might help
you fix the problem:
  http://stackoverflow.com/a/30094847
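
To illustrate the capture mechanism described above, here is a minimal sketch in plain Python (not Spark code). `ReplWrapper` is a hypothetical stand-in for the `$iw` wrapper object that appears in the serialization stack, and `streaming_context` is just a placeholder object. The sketch only shows that a function reading a field through its enclosing object captures the whole object, while a function reading a local copy captures only the value. The analogous workaround in the Scala shell would be to copy the needed values into local variables before using them inside `map`; that mapping to Spark is an assumption, not something confirmed in this thread.

```python
class ReplWrapper:
    """Hypothetical stand-in for the REPL wrapper object ($iw in the stack trace)."""

    def __init__(self):
        self.streaming_context = object()  # placeholder for a non-serializable context
        self.datetime_length = 23

    def extractor_capturing_wrapper(self):
        # The lambda reads the field through `self`, so `self` (and with it
        # the context it holds) becomes part of the lambda's closure.
        return lambda value: value[0:self.datetime_length - 1]

    def extractor_capturing_copy(self):
        # Copy the field into a local first; the lambda then closes over
        # the plain integer only.
        datetime_length = self.datetime_length
        return lambda value: value[0:datetime_length - 1]


def captured_objects(fn):
    """Return the objects held in a function's closure cells."""
    return [cell.cell_contents for cell in (fn.__closure__ or ())]


wrapper = ReplWrapper()
bad = wrapper.extractor_capturing_wrapper()
good = wrapper.extractor_capturing_copy()

print(captured_objects(bad))   # contains the ReplWrapper instance
print(captured_objects(good))  # contains only the integer length
```

Both functions compute the same substring; they differ only in what would have to be shipped along with them.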



On Sat, Mar 11, 2017 at 3:10 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> I think the vals you defined are only valid in the driver;
> you can try a broadcast variable.
>
> ---Original---
> *From:* "lk_spark"
> *Date:* 2017/2/27 11:14:23
> *To:* "user.spark";
> *Subject:* java.io.NotSerializableException: org.apache.spark.streaming.
> StreamingContext
>
> Hi all,
> I want to extract some info from Kafka using Spark Streaming. My code
> looks like:
>
> val keyword = ""
> val system = "dmp"
> val datetime_idx = 0
> val datetime_length = 23
> val logLevelBeginIdx = datetime_length + 2 - 1
> val logLevelMaxLenght = 5
>
> val lines = messages.filter(record => record.value().matches("\\d{4}
> .*")).map(record => {
>   val assembly = record.topic()
>   val value = record.value
>   val datatime = value.substring(datetime_idx, datetime_length - 1)
>   val level = value.substring(logLevelBeginIdx, logLevelBeginIdx +
> logLevelMaxLenght - 1)
>   (assembly,value,datatime,level)
> })
>
> I get this error:
> Caused by: java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
> Serialization stack:
>  - object not serializable (class: 
> org.apache.spark.streaming.StreamingContext,
> value: org.apache.spark.streaming.StreamingContext@5a457aa1)
>  - field (class: $iw, name: streamingContext, type: class
> org.apache.spark.streaming.StreamingContext)
>  - object (class $iw, $iw@38eb2140)
>  - field (class: $iw, name: $iw, type: class $iw)
>  - object (class $iw, $iw@2a3ced3d)
>  - field (class: $iw, name: $iw, type: class $iw)
>  - object (class $iw, $iw@7c5dbca5)
> 
> 
> ==
> If I change the parameters to constants, I do not get the error:
>
>   val lines = messages.filter(record => record.value().matches("\\d{4}
> .*")).map(record => {
>   val assembly = record.topic()
>   val value = record.value
>   val datatime = value.substring(0, 22)
>   val level = value.substring(24, 27)
>   (assembly,value,datatime,level)
>
> })
>
> How can I pass parameters to the map function?
>
> 2017-02-27
> --
> lk_spark
>


Re: PySpark Serialization/Deserialization (Pickling) Overhead

2017-03-12 Thread Li Jin
Yeoul,

I think one way to run a microbenchmark for PySpark
serialization/deserialization would be to run a withColumn with a Python UDF
that returns a constant, and compare that with similar code in
Scala.

I am not sure if there is a way to measure just the serialization code,
because the PySpark API only allows you to apply a Python function over the
data frame, so that always involves running a for loop in Python over the data.
You probably need to do some hacking to make it do just the serialization.
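
As a rough starting point for that kind of hacking, the sketch below times pickling in isolation, outside Spark entirely. It assumes that pickling a batch of row-like tuples is a reasonable proxy for the serialization work; the row shape, batch size, and the helper names (`make_rows`, `time_call`, `benchmark`) are made up for illustration, and the real PySpark path also involves batching and moving data between the JVM and the Python worker, which this does not measure.

```python
import pickle
import time


def make_rows(n):
    """Build n hypothetical row tuples (int, float, str)."""
    return [(i, float(i), "row-%d" % i) for i in range(n)]


def time_call(fn, *args):
    """Run fn(*args) once and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


def benchmark(n_rows=100000):
    rows = make_rows(n_rows)
    payload, t_dumps = time_call(pickle.dumps, rows, pickle.HIGHEST_PROTOCOL)
    restored, t_loads = time_call(pickle.loads, payload)
    # A constant-returning "UDF" applied in a Python loop, for comparison.
    _, t_udf = time_call(lambda rs: [1 for _ in rs], restored)
    return {
        "bytes": len(payload),
        "serialize_s": t_dumps,
        "deserialize_s": t_loads,
        "constant_udf_s": t_udf,
        "round_trip_ok": restored == rows,
    }


if __name__ == "__main__":
    for name, value in benchmark().items():
        print(name, value)
```

Comparing `serialize_s` plus `deserialize_s` against `constant_udf_s` gives a first impression of how much of the Python-side time could plausibly be pickling rather than the loop itself.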

Maybe other people have more insights?

Li


On Tue, Mar 7, 2017 at 9:18 PM Yeoul Na  wrote:


Hi all,

I am trying to analyze PySpark performance overhead. People just say PySpark
is slower than Scala due to the Serialization/Deserialization overhead. I
tried with the example in this post:
https://0x0fff.com/spark-dataframes-are-faster-arent-they/. This and many
articles say that a straightforward Python implementation is the slowest due to
the serialization/deserialization overhead.

However, when I actually looked at the log in the Web UI, the serialization and
deserialization times of PySpark did not seem to be any bigger than those of
Scala. The main contributor was "Executor Computing Time". Thus, we cannot
be sure whether this is due to serialization or because Python code is
simply slower than Scala code.

So my question is: does "Task Deserialization Time" in the Spark Web UI
actually include serialization/deserialization times in PySpark? If this is
not the case, how can I actually measure the serialization/deserialization
overhead?

Thanks,
Yeoul





Re: Spark join over sorted columns of dataset.

2017-03-12 Thread Li Jin
I am not an expert on this but here is what I think:

Catalyst maintains information on whether a plan node is ordered. If your
DataFrame is the result of an order by, Catalyst will skip the sorting when it
does a sort-merge join. If your DataFrame is created from storage, for
instance a ParquetRelation, then I am not sure if there is an API that
allows the user to tell Catalyst that the ParquetRelation is ordered on column
x. If there isn't, it would probably be useful to add.
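
To make concrete why known ordering lets the sort be skipped, here is a small sketch of the merge phase of a sort-merge join in plain Python. It is not Catalyst's implementation; `merge_join` is a hypothetical helper that assumes both inputs are lists of (key, value) pairs already sorted by key, and it performs an inner join in a single forward pass over each side.

```python
def merge_join(left, right):
    """Inner-join two lists of (key, value) pairs that are already sorted by key."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of rows on the right that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left) and left[i][0] == left_key:
                for k in range(j, j_end):
                    out.append((left_key, left[i][1], right[k][1]))
                i += 1
            j = j_end
    return out


left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (2, "y"), (3, "z"), (4, "w")]
print(merge_join(left, right))
```

If either input were not sorted, the pointers could skip past matching keys, which is why an engine has to sort first unless it can prove the ordering already holds.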

Li
On Fri, Mar 3, 2017 at 11:23 AM Koert Kuipers  wrote:

> For RDD the shuffle is already skipped but the sort is not. In
> spark-sorted we track partitioning and sorting within partitions for
> key-value RDDs and can avoid the sort. See:
> https://github.com/tresata/spark-sorted
>
> For Dataset/DataFrame such optimizations are done automatically, however
> it's currently not always working for Dataset, see:
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-19468
>
> On Mar 3, 2017 11:06 AM, "Rohit Verma"  wrote:
>
> Sending this to the dev list.
> Could you please help me with some ideas for the question below?
>
> Regards
> Rohit
> > On Feb 23, 2017, at 3:47 PM, Rohit Verma 
> wrote:
> >
> > Hi
> >
> > While joining two columns of different datasets, how can I optimize the
> > join if both columns are pre-sorted within each dataset, so that when
> > Spark does a sort-merge join the sorting phase can be skipped?
> >
> > Regards
> > Rohit
>
>
>


Differences between scikit-learn and Spark.ml for regression toy problem

2017-03-12 Thread Frank Astier
(this was also posted to stackoverflow on 03/10)

I am setting up a very simple logistic regression problem in scikit-learn
and in spark.ml, and the results diverge: the models they learn are
different, but I can't figure out why (data is the same, model type is the
same, regularization is the same...).

No doubt I am missing some setting on one side or the other. Which setting?
How should I set up either scikit or spark.ml to find the same model as its
counterpart?

I give the sklearn code and spark.ml code below. Both should be ready to
cut-and-paste and run.

scikit-learn code:
--

import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge

X = np.array([
[-0.7306653538519616, 0.0],
[0.6750417712898752, -0.4232874171873786],
[0.1863463229359709, -0.8163423997075965],
[-0.6719842051493347, 0.0],
[0.9699938346531928, 0.0],
[0.22759406190283604, 0.0],
[0.9688721028330911, 0.0],
[0.5993795346650845, 0.0],
[0.9219423508390701, -0.8972778242305388],
[0.7006904841584055, -0.5607635619919824]
])

y = np.array([
0.0,
1.0,
1.0,
0.0,
1.0,
1.0,
1.0,
0.0,
0.0,
0.0
])

m, n = X.shape

# Add intercept term to simulate inputs to GameEstimator
X_with_intercept = np.hstack((X, np.ones(m)[:,np.newaxis]))

l = 0.3
e = LogisticRegression(
fit_intercept=False,
penalty='l2',
C=1/l,
max_iter=100,
tol=1e-11)

e.fit(X_with_intercept, y)

print(e.coef_)
# => [[ 0.98662189  0.45571052 -0.23467255]]

# Linear regression is called Ridge in sklearn
e = Ridge(
fit_intercept=False,
alpha=l,
max_iter=100,
tol=1e-11)

e.fit(X_with_intercept, y)

print(e.coef_)
# =>[ 0.32155545  0.17904355  0.41222418]

spark.ml code:
---

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SQLContext

object TestSparkRegression {
  def main(args: Array[String]): Unit = {
    import org.apache.log4j.{Level, Logger}

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)

    val sparkTrainingData = new SQLContext(sc)
      .createDataFrame(Seq(
        LabeledPoint(0.0, Vectors.dense(-0.7306653538519616, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.6750417712898752, -0.4232874171873786)),
        LabeledPoint(1.0, Vectors.dense(0.1863463229359709, -0.8163423997075965)),
        LabeledPoint(0.0, Vectors.dense(-0.6719842051493347, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.9699938346531928, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.22759406190283604, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.9688721028330911, 0.0)),
        LabeledPoint(0.0, Vectors.dense(0.5993795346650845, 0.0)),
        LabeledPoint(0.0, Vectors.dense(0.9219423508390701, -0.8972778242305388)),
        LabeledPoint(0.0, Vectors.dense(0.7006904841584055, -0.5607635619919824))))
      .toDF("label", "features")

    val logisticModel = new LogisticRegression()
      .setRegParam(0.3)
      .setLabelCol("label")
      .setFeaturesCol("features")
      .fit(sparkTrainingData)

    println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
    // Spark logistic model coefficients: [0.5451588538376263,0.26740606573584713] Intercept: -0.13897955358689987

    val linearModel = new LinearRegression()
      .setRegParam(0.3)
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setSolver("l-bfgs")
      .fit(sparkTrainingData)

    println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
    // Spark linear model coefficients: [0.19852664861346023,0.11501200541407802] Intercept: 0.45464906876832323

    sc.stop()
  }
}

Thanks,

Frank
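
One possible lead for the question above, offered as an assumption rather than a confirmed answer: the two libraries may scale the penalty differently. The sketch below assumes a scikit-learn-style objective of C * sum(log-loss) + 0.5 * ||w||^2 and a spark.ml-style objective of mean(log-loss) + (regParam / 2) * ||w||^2. Under those assumed forms, the same model is obtained when C = 1 / (n * regParam), not C = 1 / regParam. The code is plain Python with hand-written gradient descent (not either library's solver), and it ignores other possible differences such as feature standardization and whether the intercept is penalized.

```python
import math

# The ten training rows from the message, with a constant 1.0 appended as intercept column.
X = [
    [-0.7306653538519616, 0.0, 1.0],
    [0.6750417712898752, -0.4232874171873786, 1.0],
    [0.1863463229359709, -0.8163423997075965, 1.0],
    [-0.6719842051493347, 0.0, 1.0],
    [0.9699938346531928, 0.0, 1.0],
    [0.22759406190283604, 0.0, 1.0],
    [0.9688721028330911, 0.0, 1.0],
    [0.5993795346650845, 0.0, 1.0],
    [0.9219423508390701, -0.8972778242305388, 1.0],
    [0.7006904841584055, -0.5607635619919824, 1.0],
]
y = [0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0]


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def loss_gradient_sum(w):
    """Gradient of the summed log-loss over all rows, without any penalty."""
    grad = [0.0] * len(w)
    for row, label in zip(X, y):
        err = sigmoid(sum(a * b for a, b in zip(w, row))) - label
        for k, a in enumerate(row):
            grad[k] += err * a
    return grad


def minimize(grad_fn, dim, step, iters):
    """Plain fixed-step gradient descent starting from zero."""
    w = [0.0] * dim
    for _ in range(iters):
        g = grad_fn(w)
        w = [wk - step * gk for wk, gk in zip(w, g)]
    return w


def fit_sum_scaled(C, step=0.05, iters=5000):
    """Assumed scikit-learn-style objective: C * sum(loss) + 0.5 * ||w||^2."""
    def grad_fn(w):
        return [C * gk + wk for gk, wk in zip(loss_gradient_sum(w), w)]
    return minimize(grad_fn, 3, step, iters)


def fit_mean_scaled(lam, step=1.0, iters=2000):
    """Assumed spark.ml-style objective: mean(loss) + (lam / 2) * ||w||^2."""
    n = len(X)
    def grad_fn(w):
        return [gk / n + lam * wk for gk, wk in zip(loss_gradient_sum(w), w)]
    return minimize(grad_fn, 3, step, iters)


lam, n = 0.3, len(X)
w_mean = fit_mean_scaled(lam)
w_naive = fit_sum_scaled(1.0 / lam)          # C = 1 / lambda, as in the posted code
w_matched = fit_sum_scaled(1.0 / (n * lam))  # C = 1 / (n * lambda)

print("mean-scaled        :", w_mean)
print("sum-scaled, naive  :", w_naive)
print("sum-scaled, matched:", w_matched)
```

If the assumed objective forms hold, the matched setting reproduces the mean-scaled weights while the naive setting regularizes roughly n times less strongly.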


spark-streaming stopping

2017-03-12 Thread sathyanarayanan mudhaliyar
I am not able to stop my Spark Streaming job.
Let me explain briefly. The job is:
* getting data from a Kafka topic
* splitting the data to create a JavaRDD
* mapping the JavaRDD to a JavaPairRDD to do a reduceByKey transformation
* writing the JavaPairRDD into the C* DB   // something is going wrong here
The messages in the Kafka topic are exhausted, but the program is still
running; the staging is still happening even though there is no data from
Kafka. When I tried to kill the program manually, there was no output in the
C* database.