ClassNotFoundException: org.apache.parquet.hadoop.ParquetOutputCommitter

2016-07-07 Thread kevin
Hi all,
I built Spark with:

./make-distribution.sh --name "hadoop2.7.1" --tgz
"-Pyarn,hadoop-2.6,parquet-provided,hive,hive-thriftserver" -DskipTests
-Dhadoop.version=2.7.1

I can run this example:
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master spark://master1:7077 \
--driver-memory 1g \
--executor-memory 512m \
--executor-cores 1 \
lib/spark-examples*.jar \
10

but I can't run this example:
org.apache.spark.examples.sql.RDDRelation

*I got this error:*
16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
app-20160707182845-0003/2 is now RUNNING
16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
app-20160707182845-0003/4 is now RUNNING
16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
app-20160707182845-0003/3 is now RUNNING
16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
app-20160707182845-0003/0 is now RUNNING
16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
app-20160707182845-0003/1 is now RUNNING
16/07/07 18:28:45 INFO client.AppClient$ClientEndpoint: Executor updated:
app-20160707182845-0003/5 is now RUNNING
16/07/07 18:28:46 INFO cluster.SparkDeploySchedulerBackend:
SchedulerBackend is ready for scheduling beginning after reached
minRegisteredResourcesRatio: 0.0
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/ParquetOutputCommitter
at org.apache.spark.sql.SQLConf$.<init>(SQLConf.scala:319)
at org.apache.spark.sql.SQLConf$.<clinit>(SQLConf.scala)
at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:85)
at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:77)
at main.RDDRelation$.main(RDDRelation.scala:13)
at main.RDDRelation.main(RDDRelation.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.hadoop.ParquetOutputCommitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 15 more
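
A likely cause, offered as a hedged note: the build command above enables the parquet-provided profile, which marks the Parquet jars as "provided", so they are not bundled into the distribution and have to be on the classpath at runtime. The sketch below is a small diagnostic for that situation. ClasspathCheck and isLoadable are hypothetical names used only for illustration; they are not part of Spark.

```scala
// Hypothetical helper, not part of Spark: reports whether a class is
// visible to the class loader that loaded this object.
object ClasspathCheck {
  def isLoadable(className: String): Boolean =
    try {
      // initialize = false: only locate the class, do not run its static initializers
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      // A missing class, or a class whose own dependencies are missing
      case _: ClassNotFoundException | _: NoClassDefFoundError => false
    }

  def main(args: Array[String]): Unit = {
    val name = "org.apache.parquet.hadoop.ParquetOutputCommitter"
    println(s"$name loadable: ${isLoadable(name)}")
  }
}
```

If this reports false when submitted with the same options as the failing job, the Parquet jars need to be supplied, for example with --jars or --driver-class-path, or Spark can be rebuilt without parquet-provided.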


where I can find spark-streaming-kafka for spark2.0

2016-07-24 Thread kevin
Hi all,
I tried to run the example org.apache.spark.examples.streaming.KafkaWordCount
and got this error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
at org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
at org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 11 more

So where can I find spark-streaming-kafka for Spark 2.0?


Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread kevin
I have compiled it from source code.



spark2.0 can't run SqlNetworkWordCount

2016-07-25 Thread kevin
Hi all,
I downloaded the pre-built Spark 2.0 package. I can run the SqlNetworkWordCount example with:
bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount
master1 

But when I take the Spark 2.0 example source SqlNetworkWordCount.scala,
build it into a jar with dependencies (JDK 1.8 and Scala 2.10),
and run it with spark-submit, I get this error:


16/07/25 17:28:30 INFO scheduler.JobScheduler: Starting job streaming job 146943891 ms.0 from job set of time 146943891 ms
Exception in thread "streaming-job-executor-2" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
at main.SqlNetworkWordCount$$anonfun$main$1.apply(SqlNetworkWordCount.scala:67)
at main.SqlNetworkWordCount$$anonfun$main$1.apply(SqlNetworkWordCount.scala:61)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread kevin
Thank you. I couldn't find a spark-streaming-kafka_2.10 jar for Spark 2 on Maven
Central, so I tried version 1.6.2. It did not work: it needs the class
org.apache.spark.Logging, which does not exist in Spark 2. So I built the
spark-streaming-kafka_2.10 jar for Spark 2 from the source code. It works now.

2016-07-26 2:12 GMT+08:00 Cody Koeninger :

> For 2.0, the kafka dstream support is in two separate subprojects
> depending on which version of Kafka you are using
>
> spark-streaming-kafka-0-10
> or
> spark-streaming-kafka-0-8
>
> corresponding to brokers that are version 0.10+ or 0.8+
>
> On Mon, Jul 25, 2016 at 12:29 PM, Reynold Xin  wrote:
> > The presentation at Spark Summit SF was probably referring to Structured
> > Streaming. The existing Spark Streaming (dstream) in Spark 2.0 has the
> same
> > production stability level as Spark 1.6. There is also Kafka 0.10
> support in
> > dstream.
> >
> > On July 25, 2016 at 10:26:49 AM, Andy Davidson
> > (a...@santacruzintegration.com) wrote:
> >
> > Hi Kevin
> >
> > Just a heads up at the recent spark summit in S.F. There was a
> presentation
> > about streaming in 2.0. They said that streaming was not going to
> production
> > ready in 2.0.
> >
> > I am not sure if the older 1.6.x version will be supported. My project
> will
> > not be able to upgrade with streaming support. We also use kafka
> >
> > Andy
> >
> > From: Marco Mistroni 
> > Date: Monday, July 25, 2016 at 2:33 AM
> > To: kevin 
> > Cc: "user @spark" , "dev.spark"
> > 
> > Subject: Re: where I can find spark-streaming-kafka for spark2.0
> >
> > Hi Kevin
> >   you should not need to rebuild everything.
> > Instead, i believe you should launch spark-submit by specifying the kafka
> > jar file in your --packages... i had to follow same when integrating
> spark
> > streaming with flume
> >
> >   have you checked this link ?
> > https://spark.apache.org/docs/latest/streaming-kafka-integration.html
> >
> >
> > hth
> >
> >
> >
> > On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:
> >>
> >> I have compile it from source code
> >>
> >>
> >>
> >
>
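
Following Cody's reply above, the Kafka DStream integration for Spark 2.0 is published as separate artifacts rather than under the old spark-streaming-kafka name. A minimal sbt sketch, assuming Spark 2.0.0 built for Scala 2.11 and a Kafka 0.8 broker (the project name and all version numbers here are assumptions to adjust to the actual cluster):

```scala
// build.sbt (sketch; name and versions are assumptions, match them to your cluster)
name := "kafka-word-count"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // Already present in the Spark installation at runtime
  "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided",
  // Not part of the Spark distribution: bundle it in the application jar
  // or supply it at submit time
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0"
)
```

Alternatively, as Marco suggests, the same artifact can be supplied at submit time with --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0, which avoids rebuilding anything from source.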


Re: Odp.: spark2.0 can't run SqlNetworkWordCount

2016-07-25 Thread kevin
Thanks a lot. After changing to Scala 2.11, it works.

2016-07-25 17:40 GMT+08:00 Tomasz Gawęda :

> Hi,
>
> Please change the Scala version to 2.11. As far as I know, Spark packages are
> now built with Scala 2.11, and you've got the other (2.10) version.
>
>
>
> ------
> *From:* kevin 
> *Sent:* 25 July 2016 11:33
> *To:* user.spark; dev.spark
> *Subject:* spark2.0 can't run SqlNetworkWordCount
>
>
>
>
>
>
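
A NoSuchMethodError inside scala.reflect is a typical symptom of the mismatch Tomasz describes: application code compiled against Scala 2.10 running on a Spark build that ships Scala 2.11. As a hedged sketch, the following prints the Scala version actually on the runtime classpath. ScalaVersionCheck and binaryVersion are hypothetical names used for illustration only.

```scala
import scala.util.Properties

// Hypothetical helper: derive the binary version ("2.11") from a full
// Scala version string ("2.11.8").
object ScalaVersionCheck {
  def binaryVersion(full: String): String =
    full.split('.').take(2).mkString(".")

  def main(args: Array[String]): Unit = {
    // Version of the scala-library that is actually loaded at runtime
    val runtime = Properties.versionNumberString
    println(s"Scala runtime: $runtime (binary ${binaryVersion(runtime)})")
  }
}
```

The binary version printed when the job runs under spark-submit should match the _2.10 or _2.11 suffix of every Spark artifact the application was compiled against.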


spark2.0 how to use sparksession and StreamingContext same time

2016-07-25 Thread kevin
Hi all,
I want to read data from Kafka, register it as a table, and then join it with a JDBC table.
My sample looks like this:

val spark = SparkSession
  .builder
  .config(sparkConf)
  .getOrCreate()

val jdbcDF = spark.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://master1:3306/demo",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "i_user",
  "user" -> "root",
  "password" -> "passok")).load()
jdbcDF.cache().createOrReplaceTempView("black_book")
val df = spark.sql("select * from black_book")
df.show()

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))

*I got this error:*

16/07/26 11:18:07 WARN AbstractHandler: No Server set for
org.spark_project.jetty.server.handler.ErrorHandler@6f0ca692
+--------------------+--------+--------+
|                  id|username|password|
+--------------------+--------+--------+
|e6faca36-8766-4dc...|       a|       a|
|699285a3-a108-457...|   admin|     123|
|e734752d-ac98-483...|    test|    test|
|c0245226-128d-487...|   test2|   test2|
|4f1bbdb2-89d1-4cc...|     119|     911|
|16a9a360-13ee-4b5...|    1215|    1215|
|bf7d6a0d-2949-4c3...|   demo3|   demo3|
|de30747c-c466-404...|     why|     why|
|644741c9-8fd7-4a5...|   scala|       p|
|cda1e44d-af4b-461...|     123|     231|
|6e409ed9-c09b-4e7...|     798|      23|
+--------------------+--------+--------+

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:749)
main.POC$.main(POC.scala:43)
main.POC.main(POC.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2211)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2207)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2207)
at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2277)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:91)
at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:837)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:84)
at main.POC$.main(POC.scala:50)
at main.POC.main(POC.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Re: spark2.0 how to use sparksession and StreamingContext same time

2016-07-25 Thread kevin
Thanks a lot, Terry.

2016-07-26 12:03 GMT+08:00 Terry Hoo :

> Kevin,
>
> Try creating the StreamingContext as follows:
>
> val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
>
>
>
>
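
Terry's one-line fix works because SparkSession.getOrCreate() has already started a SparkContext, and building a StreamingContext from a SparkConf tries to start a second one. A minimal sketch of the shared-context pattern, assuming Spark 2.x with spark-sql and spark-streaming on the classpath; SharedContextSketch, streamingContextFor and the local[2] master are illustrative assumptions, not taken from the thread:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SharedContextSketch {
  // Build the StreamingContext on the session's existing SparkContext
  // instead of letting it create a second one from a SparkConf.
  def streamingContextFor(spark: SparkSession, batchSeconds: Long): StreamingContext =
    new StreamingContext(spark.sparkContext, Seconds(batchSeconds))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("shared-context-sketch")
      .master("local[2]") // assumption: local mode for a quick check
      .getOrCreate()

    val ssc = streamingContextFor(spark, 2)
    // Both handles refer to the same underlying SparkContext
    println(ssc.sparkContext eq spark.sparkContext)

    ssc.stop(stopSparkContext = true)
  }
}
```

With this arrangement the JDBC table registered through the session and the DStreams created from ssc run in the same context, so the temp view stays visible to SQL issued while processing each batch.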


dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread kevin
Hi all,
I don't quite understand the difference between dataframe.foreach and
dataframe.collect().foreach. When should I use dataframe.foreach?

I use Spark 2.0 and want to iterate over a DataFrame to get one column's value.

This works:

blacklistDF.collect().foreach { x =>
  println(s">>>getString(0)" + x.getAs[String]("uid"))
  val put = new Put(Bytes.toBytes(x.getAs[String]("uid")))
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("uid"),
    Bytes.toBytes(x.getAs[String]("uid")))
  hrecords.add(put)
}

If I use blacklistDF.foreach {} instead, I get nothing.


Re: dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread kevin
Thank you, Chanh.

2016-07-26 15:34 GMT+08:00 Chanh Le :

> Hi Ken,
>
> *blacklistDF is just a DataFrame.*
> Spark is lazy: until you call something like *collect, take, write*, it
> does not execute the whole process, *such as the map or filter you apply
> before you collect*.
> That means that until you call collect, Spark *does nothing*, so your df
> would not have any data and you can't call foreach.
> Calling collect executes the process, fetches the data, and then foreach
> is OK.
>
>
> On Jul 26, 2016, at 2:30 PM, kevin  wrote:
>
>  blacklistDF.collect()
>
>
>
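
One further point, offered with some caution: foreach on a DataFrame is itself an action, but its function runs on the executors. Any println output therefore lands in the executor logs, and additions to a driver-side collection such as hrecords are generally made to a serialized copy rather than to the original. collect() first brings the rows back to the driver, which is why the loop in the question behaves as expected. A minimal sketch, assuming Spark 2.x in local mode; CollectColumnSketch, uids and the sample values are hypothetical:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CollectColumnSketch {
  // Bring one string column back to the driver as a local array.
  // Only appropriate when the result fits in driver memory.
  def uids(df: DataFrame): Array[String] =
    df.select("uid").collect().map(_.getString(0))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("collect-column-sketch")
      .master("local[2]") // assumption: local mode for illustration
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for blacklistDF
    val blacklistDF = Seq("u1", "u2", "u3").toDF("uid")

    // Driver side: the array lives in this JVM, so local code can use it
    uids(blacklistDF).foreach(uid => println(s"uid = $uid"))

    spark.stop()
  }
}
```

For data too large to collect, the usual alternative is to do the writes on the executors, for example inside foreachPartition, rather than accumulating Put objects on the driver.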


tpcds for spark2.0

2016-07-27 Thread kevin
Hi all,
I want to run the 99 TPC-DS SQL queries on Spark 2.0.
I use https://github.com/databricks/spark-sql-perf

With the master version, when I run val tpcds = new TPCDS (sqlContext =
sqlContext), I get this error:

scala> val tpcds = new TPCDS (sqlContext = sqlContext)
error: missing or invalid dependency detected while loading class file
'Benchmarkable.class'.
Could not access term typesafe in package com,
because it (or its dependencies) are missing. Check your build definition
for
missing or conflicting dependencies. (Re-run with -Ylog-classpath to see
the problematic classpath.)
A full rebuild may help if 'Benchmarkable.class' was compiled against an
incompatible version of com.
error: missing or invalid dependency detected while loading class file
'Benchmarkable.class'.
Could not access term scalalogging in value com.typesafe,
because it (or its dependencies) are missing. Check your build definition
for
missing or conflicting dependencies. (Re-run with -Ylog-classpath to see
the problematic classpath.)
A full rebuild may help if 'Benchmarkable.class' was compiled against an
incompatible version of com.typesafe.

With spark-sql-perf-0.4.3, when I run
tables.genData("hdfs://master1:9000/tpctest", "parquet", true, false,
false, false, false), I get this error:

Generating table catalog_sales in database to
hdfs://master1:9000/tpctest/catalog_sales with save mode Overwrite.
16/07/27 18:59:59 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
slave1): java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD


spark.read.format("jdbc")

2016-07-31 Thread kevin
Hi all,
   I tried to load data from a JDBC data source, but I got this error:
java.lang.RuntimeException: Multiple sources found for jdbc
(org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider,
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource), please
specify the fully qualified class name.

The Spark version is 2.0.


Re: spark.read.format("jdbc")

2016-07-31 Thread kevin
Maybe there is another version of Spark on the classpath?



Re: spark.read.format("jdbc")

2016-08-01 Thread kevin
I fixed it with:
val jdbcDF = spark.read
  .format("org.apache.spark.sql.execution.datasources.jdbc.DefaultSource")
  .options(Map(
    "url" -> s"jdbc:mysql://${mysqlhost}:3306/test",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "i_user",
    "user" -> "root",
    "password" -> "123456"))
  .load()

org.apache.spark.sql.execution.datasources.jdbc.DefaultSource and
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider both
have the same short name: jdbc.

2016-08-01 15:24 GMT+08:00 Nikolay Zhebet :

> You should specify the classpath for your JDBC connection.
> For example, if you want to connect to Impala, you can try this snippet:
>
>
>
> import java.util.Properties
> import org.apache.spark._
> import org.apache.spark.sql.SQLContext
> import java.sql.Connection
> import java.sql.DriverManager
> Class.forName("com.cloudera.impala.jdbc41.Driver")
>
> var conn: java.sql.Connection = null
> conn = DriverManager.getConnection("jdbc:impala://127.0.0.1:21050/default;auth=noSasl", "", "")
> val statement = conn.createStatement()
>
> val result = statement.executeQuery("SELECT * FROM users limit 10")
> result.next()
> result.getString("user_id")
> val sql_insert = "INSERT INTO users VALUES('user_id','email','gender')"
> statement.executeUpdate(sql_insert)
>
>
> You should also specify the path to your JDBC jar file with the
> --driver-class-path option when you run spark-submit or spark-shell:
>
> spark-shell --master "local[2]" --driver-class-path /opt/cloudera/parcels/CDH/jars/ImpalaJDBC41.jar
>
>
>
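
The two class names in the error look like the same JDBC source under its older and newer name, which is consistent with kevin's guess that jars from more than one Spark version are on the classpath. Spark SQL discovers short names such as jdbc through service registration files, so one way to investigate is to list every classpath location that carries such a file. A hedged sketch using only the JDK and the Scala standard library; DataSourceRegistrations and locations are hypothetical names:

```scala
import scala.collection.JavaConverters._

object DataSourceRegistrations {
  // Service file through which Spark SQL data sources register short names
  val ServiceFile = "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister"

  // Return the URL of every copy of `resource` visible to the context class loader.
  def locations(resource: String): List[String] = {
    val loader = Thread.currentThread().getContextClassLoader
    loader.getResources(resource).asScala.map(_.toString).toList
  }

  def main(args: Array[String]): Unit =
    locations(ServiceFile).foreach(println)
}
```

If the output, when run with the same classpath as the failing job, lists Spark SQL jars of two different versions, removing the stale one is likely a cleaner fix than naming the provider class explicitly.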


Re: tpcds for spark2.0

2016-08-01 Thread kevin
In the end I used spark-sql-perf-0.4.3:
./bin/spark-shell --jars
/home/dcos/spark-sql-perf-0.4.3/target/scala-2.11/spark-sql-perf_2.11-0.4.3.jar
--executor-cores 4 --executor-memory 10G --master spark://master1:7077
If I don't use "--jars", I get the error I mentioned.

2016-07-29 21:17 GMT+08:00 Olivier Girardot :

> I have the same kind of issue (not using spark-sql-perf), just trying to
> deploy 2.0.0 on mesos.
> I'll keep you posted as I investigate
>
>
>
>
> *Olivier Girardot* | Associé
> o.girar...@lateral-thoughts.com
> +33 6 24 09 17 94
>


how to build spark with out hive

2016-01-25 Thread kevin
Hi all,
I need to test Hive on Spark, using Spark as Hive's execution engine.
I downloaded the Spark 1.5.2 source from the Apache website.
I have installed Maven 3.3.9 and Scala 2.10.6, so I changed
make-distribution.sh
to point to my mvn installation.

Then I ran the command:
./make-distribution.sh --name "hadoop2-without-hive" --tgz
"-Pyarn,hadoop-2.7,hadoop-provided,parquet-provided" -DskipTests
-Dhadoop.version=2.7.1

Is this right? When I start the Spark cluster, I get this error:

Spark Command: /usr/lib/jdk/bin/java -cp
/dcos/spark/sbin/../conf/:/dcos/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar:/dcos/hadoop/etc/hadoop/
-Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip
10.1.3.107 --port 7077 --webui-port 8080

Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/Logger
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2615)
at java.lang.Class.getMethod0(Class.java:2856)
at java.lang.Class.getMethod(Class.java:1668)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: org.slf4j.Logger
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 6 more


I need some advice.


hive1.2.1 on spark 1.5.2

2016-01-26 Thread kevin
Hi all,
   I tried Hive on Spark with Hive 1.2.1 and Spark 1.5.2. I built Spark
without -Phive, and I tested the standalone Spark cluster with spark-submit;
it works.
   But when I use Hive, I can see the Hive on Spark application in the Spark
web UI, and then I finally get this error:


16/01/26 16:23:42 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/01/26 16:23:42 INFO Remoting: Starting remoting
16/01/26 16:23:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@10.1.3.116:42307]
16/01/26 16:23:42 INFO util.Utils: Successfully started service 'driverPropsFetcher' on port 42307.
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.1.3.107:34725/), Path(/user/CoarseGrainedScheduler)]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:935)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



*Could anyone tell me whether this is caused by mismatched Hive and Spark
versions? Which versions work together, or is there some other reason?*


Keep state inside map function

2014-07-30 Thread Kevin
Hi,

Is it possible to maintain state inside a Spark map function? With Hadoop
MapReduce, Mappers and Reducers are classes that can have their own state
using instance variables. Can this be done with Spark? Are there any
examples?

Most examples I have seen perform a simple operation on the value passed into
the map function and then pass it along to the reduce function.

Thanks in advance.

-Kevin


Re: Keep state inside map function

2014-07-30 Thread Kevin
Thanks to both of you for your input. It looks like I'll start with the
mapPartitions function to port MapReduce algorithms to Spark.


On Wed, Jul 30, 2014 at 1:23 PM, Sean Owen  wrote:

> Really, the analog of a Mapper is not map(), but mapPartitions(). Instead
> of:
>
> rdd.map(yourFunction)
>
> ... you can run setup code before mapping a bunch of records, and
> after, like so:
>
> rdd.mapPartitions { partition =>
>   // Some setup code here
>   partition.map(yourFunction)
>   // Some cleanup code here
> }
>
> You couldn't share state across Mappers, or Mappers and Reducers in
> Hadoop. (At least there was no direct way.) Same here. But you can
> maintain state across many map calls.
>
> On Wed, Jul 30, 2014 at 6:07 PM, Kevin  wrote:
> > Hi,
> >
> > Is it possible to maintain state inside a Spark map function? With Hadoop
> > MapReduce, Mappers and Reducers are classes that can have their own state
> > using instance variables. Can this be done with Spark? Are there any
> > examples?
> >
> > Most examples I have seen do a simple operating on the value passed into
> the
> > map function and then pass it along to the reduce function.
> >
> > Thanks in advance.
> >
> > -Kevin
>
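
The per-partition state idea from Sean's reply can be sketched in plain Python, without Spark. This is only an illustration under stated assumptions: map_partitions is a hypothetical local stand-in for rdd.mapPartitions, and running_total is an invented example function. In PySpark, a generator function of this shape could be passed to rdd.mapPartitions. Because the iterator is consumed lazily, any cleanup placed after the loop runs only once the whole partition has been read.

```python
from typing import Callable, Iterable, Iterator, List


def running_total(partition: Iterable[int]) -> Iterator[int]:
    """Yield the running sum of one partition, keeping state between records."""
    total = 0                      # setup: state local to this partition
    for value in partition:
        total += value             # state survives across records of the partition
        yield total
    # cleanup code would go here; it runs after the partition is fully consumed


def map_partitions(partitions: List[List[int]],
                   func: Callable[[Iterable[int]], Iterator[int]]) -> List[List[int]]:
    """Hypothetical local stand-in for rdd.mapPartitions(func)."""
    return [list(func(iter(p))) for p in partitions]


result = map_partitions([[1, 2, 3], [10, 20]], running_total)
print(result)  # [[1, 3, 6], [10, 30]]
```

Each partition starts with its own fresh state, which matches the point above that state is kept across many map calls but not shared between partitions.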


Re: [Spark Core] Vectorizing very high-dimensional data sourced in long format

2020-11-01 Thread kevin chen
Adding a random number (a salt) to the entity_id column may help avoid the
errors (exhausting executor and driver memory) when you solve the issue with
Patrick's approach.
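
As a rough illustration of the aggregate-instead-of-pivot idea quoted below, here is a minimal sketch in plain Python rather than Spark. The function name to_sparse_vectors and the sample rows are made up for illustration. It groups long-format rows by entity_id and keeps only the attributes that are present, as sorted (indices, values) pairs, so no ~130-million-column wide frame is ever materialized. It keeps the first event_count seen for each attribute, loosely mirroring F.first in the quoted pivot.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Row = Tuple[int, int, int]  # (entity_id, attribute_id, event_count)


def to_sparse_vectors(rows: Iterable[Row]) -> Dict[int, Tuple[List[int], List[int]]]:
    """Group long-format rows into one sparse (indices, values) pair per entity."""
    grouped: Dict[int, Dict[int, int]] = defaultdict(dict)
    for entity_id, attribute_id, event_count in rows:
        # Keep the first value seen for each (entity, attribute) pair.
        grouped[entity_id].setdefault(attribute_id, event_count)
    vectors = {}
    for entity_id, attrs in grouped.items():
        indices = sorted(attrs)  # sparse vectors usually want ascending indices
        vectors[entity_id] = (indices, [attrs[i] for i in indices])
    return vectors


rows = [(1, 5, 2), (1, 3, 7), (2, 5, 1), (1, 5, 9)]
vectors = to_sparse_vectors(rows)
print(vectors)  # {1: ([3, 5], [7, 2]), 2: ([5], [1])}
```

In PySpark, the same shape could presumably be reached with a groupBy on entity_id, a collect_list of (attribute_id, event_count) pairs, and a UDF that builds a sparse vector; that variant is an untested assumption, not something confirmed in this thread.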

On Sat, Oct 31, 2020 at 12:42 AM, Daniel Chalef wrote:

> Yes, the resulting matrix would be sparse. Thanks for the suggestion. Will
> explore ways of doing this using an agg and UDF.
>
> On Fri, Oct 30, 2020 at 6:26 AM Patrick McCarthy
>  wrote:
>
>> That's a very large vector. Is it sparse? Perhaps you'd have better luck
>> performing an aggregate instead of a pivot, and assembling the vector using
>> a UDF.
>>
>> On Thu, Oct 29, 2020 at 10:19 PM Daniel Chalef
>>  wrote:
>>
>>> Hello,
>>>
>>> I have a very large long-format dataframe (several billion rows) that
>>> I'd like to pivot and vectorize (using the VectorAssembler), with the aim
>>> to reduce dimensionality using something akin to TF-IDF. Once pivoted, the
>>> dataframe will have ~130 million columns.
>>>
>>> The source, long-format schema looks as follows:
>>>
>>> root
>>>  |-- entity_id: long (nullable = false)
>>>  |-- attribute_id: long (nullable = false)
>>>  |-- event_count: integer (nullable = true)
>>>
>>> Pivoting as per the following fails, exhausting executor and driver
>>> memory. I am unsure whether increasing memory limits would be successful
>>> here as my sense is that pivoting and then using a VectorAssembler isn't
>>> the right approach to solving this problem.
>>>
>>> wide_frame = (
>>>     long_frame.groupBy("entity_id")
>>>     .pivot("attribute_id")
>>>     .agg(F.first("event_count"))
>>> )
>>>
>>> Are there other Spark patterns that I should attempt in order to achieve
>>> my end goal of a vector of attributes for every entity?
>>>
>>> Thanks, Daniel
>>>
>>
>>
>> --
>>
>>
>> *Patrick McCarthy  *
>>
>> Senior Data Scientist, Machine Learning Engineering
>>
>> Dstillery
>>
>> 470 Park Ave South, 17th Floor, NYC 10016
>>
>


Re: spark-submit parameters about two keytab files to yarn and kafka

2020-11-01 Thread kevin chen
Hi,

I hope the following method solves the issue:

*step 1 : *
create a kafka kerberos config named kafka_client_jaas.conf:

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="./kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="kafka"
   principal="kafka/x...@example.com";
};


*step 2:*
spark-submit command :

/usr/local/spark/bin/spark-submit \
--files ./kafka_client_jaas.conf,./kafka.service.keytab \
--driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--conf spark.yarn.keytab=./hadoop.service.keytab \
--conf spark.yarn.principal=hadoop/EXAMPLE.COM \

.

*step 3:*

change security.protocol in kafka client config  to SASL_PLAINTEXT, if your
spark version is 1.6.


*note:*
my test env:  spark 2.0.2  kafka 0.10

references
1. using-spark-streaming
<https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.6.0/bk_spark-component-guide/content/using-spark-streaming.html>


-- 

Best,

Kevin Pis

On Wed, Oct 28, 2020 at 5:25 PM, Gabor Somogyi wrote:

> Hi,
>
> Cross-realm trust must be configured. One can find several docs on how to
> do that.
>
> BR,
> G
>
>
> On Wed, Oct 28, 2020 at 8:21 AM big data  wrote:
>
>> Hi,
>>
>> We want to submit spark streaming job to YARN and consume Kafka topic.
>>
>> YARN and Kafka are in two different clusters, and they have the
>> different kerberos authentication.
>>
>> We have two keytab files for YARN and Kafka.
>>
>> And my questions is how to add parameter for spark-submit command for
>> this situation?
>>
>> Thanks.
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


kafka_client_jaas.conf
Description: Binary data


Re: Spark streaming with Kafka

2020-11-03 Thread Kevin Pis
Hi,

Here is my word count demo: https://github.com/kevincmchen/wordcount

On Wed, Nov 4, 2020 at 3:32 AM, MohitAbbi wrote:

> Hi,
>
> Can you please share the correct versions of JAR files which you used to
> resolve the issue. I'm also facing the same issue.
>
> Thanks
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 

Best,

Kevin Pis


Re: Using two WriteStreams in same spark structured streaming job

2020-11-08 Thread Kevin Pis
Do you mean sparkSession.streams.awaitAnyTermination()? Could you share your
code? Otherwise, you can compare it with the following.


My demo code:


    // Hourly aggregation, written by the first streaming query.
    val hourDevice = beginTimeDevice
      .groupBy($"subsId", $"eventBeginHour", $"serviceType")
      .agg("duration" -> "sum")
      .withColumnRenamed("sum(duration)", "durationForHour")

    hourDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    // Daily aggregation, written by a second streaming query.
    val dayDevice = beginTimeDevice
      .groupBy($"subsId", $"eventBeginDay", $"serviceType")
      .agg("duration" -> "sum")
      .withColumnRenamed("sum(duration)", "durationForDay")

    dayDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    // Block until any of the started queries terminates.
    sparkSession.streams.awaitAnyTermination()

sparkSession.streams.awaitAnyTermination() works here, so the problem is
probably somewhere else in your code.

On Thu, Nov 5, 2020 at 11:45 AM, act_coder wrote:

> If I use for each function, then I may need to use custom Kafka stream
> writer
> right ?!
>
> And I might not be able to use default writestream.format(Kafka) method ?!
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 

Best,

Kevin Pis


Re: how to manage HBase connections in Executors of Spark Streaming ?

2020-11-25 Thread chen kevin
  1.  On Kerberos ticket expiry:
     *   You usually don't need to worry about it; you can use the local keytab
         on every node in the Hadoop cluster.
     *   If the keytab is not available on the nodes of your Hadoop cluster, you
         will need to update the keytab on every executor periodically.
  2.  On best practices for managing HBase connections with Kerberos
      authentication: the attached demo.java shows how to get the HBase
      connection.




From: big data 
Date: Tuesday, November 24, 2020 at 1:58 PM
To: "user@spark.apache.org" 
Subject: how to manage HBase connections in Executors of Spark Streaming ?


Hi,

Are there any best practices for managing HBase connections with Kerberos 
authentication in a Spark Streaming (YARN) environment?

I want to know how executors manage HBase connections: how to create them, 
close them, and refresh them when Kerberos tickets expire.

Thanks.


demo.java
Description: demo.java


Re: Stream-static join : Refreshing subset of static data / Connection pooling

2020-11-29 Thread chen kevin
Hi,

You can use Debezium to capture row-level changes in PostgreSQL in real time, 
stream them to Kafka, and then ETL and write the data to HBase with Flink or 
Spark Streaming. You can then join against the data in HBase directly. For a 
particularly big table, scan performance in HBase is much better than in 
PostgreSQL.


From: German Schiavon 
Date: Friday, November 27, 2020 at 12:09 AM
To: Geervan Hayatnagarkar 
Cc: fanxin , User 
Subject: Re: Stream-static join : Refreshing subset of static data / Connection 
pooling

So that's it, no? You can push down the IN filter in the query with the IDs and 
only retrieve those rows.

On Thu, 26 Nov 2020 at 16:49, Geervan Hayatnagarkar 
<pande.a...@gmail.com> wrote:
Yes we used collect to get all IDs in forEachBatch

No, the static table is huge and is updated frequently by other systems

On Thu, Nov 26, 2020, 9:01 PM fanxin 
<fanxin...@gmail.com> wrote:
Hi,
If the static table is not particularly big and the modify frequency is low, 
you can load the whole table as a DataFrame and persist it in the memory. You 
may also need to repartition the DataFrame


On 11/26/2020 21:44, Geervan Hayatnagarkar wrote:
Hi

We intend to do a stream-static join where kafka is a streaming source and 
RDBMS is a static source.

e.g. User activity data is coming in as a stream from Kafka source and we need 
to pull User personal details from PostgreSQL.

Because PostgreSQL is a static source, the entire "User-Personal-Details" table 
is being reloaded into spark memory for every microbatch.

Is there a way to optimise this? For example we should be able to pull user-ids 
from every microbatch and then make a query as below ?

select * from user-personal-details where user-id in 
()

While we could not find a clean way to do this, we chose to make a JDBC 
connection for every microbatch and achieved the above optimisation. But that 
is still suboptimal solution because JDBC connection is being created for every 
micro-batch. Is there a way to pool JDBC connection in Structured streaming?

Thanks & regards,
Arti Pande


Re: Stream-static join : Refreshing subset of static data / Connection pooling

2020-11-29 Thread chen kevin


  1.  I think it should not cause memory issues if you configure Kafka, 
      Spark/Flink and HBase appropriately.
     *   We use this method in our scenario, where the data reaches about 
         80~150Tb a day. Does your scenario generate more data than that? I 
         think it's the best method for dealing with a particularly big table 
         that has to be joined.
     *   I think frequent I/O actions such as select may cause memory or I/O 
         issues.
  2.  You can use a PostgreSQL connection pool to avoid creating connections 
      frequently.
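
To make the connection pool suggestion concrete, here is a minimal sketch in plain Python. Everything in it is an assumption for illustration: the ConnectionPool class, the fetch_users helper, and the user_personal_details table and column names are hypothetical, and an in-memory SQLite database stands in for PostgreSQL so the example runs on its own. The idea is that connections are created once per process and handed out for each micro-batch, and each batch queries only the user IDs it actually saw.

```python
import queue
import sqlite3
from contextlib import contextmanager
from typing import Callable, Iterator, List, Sequence


class ConnectionPool:
    """Tiny fixed-size pool; connections are created once and then reused."""

    def __init__(self, factory: Callable[[], sqlite3.Connection], size: int) -> None:
        self._idle: "queue.Queue[sqlite3.Connection]" = queue.Queue()
        for _ in range(size):
            self._idle.put(factory())

    @contextmanager
    def connection(self) -> Iterator[sqlite3.Connection]:
        conn = self._idle.get()        # blocks if all connections are in use
        try:
            yield conn
        finally:
            self._idle.put(conn)       # return to the pool instead of closing


def fetch_users(pool: ConnectionPool, user_ids: Sequence[int]) -> List[tuple]:
    """Look up only the users seen in one micro-batch (parameterized IN query)."""
    if not user_ids:
        return []
    placeholders = ",".join("?" for _ in user_ids)
    sql = ("SELECT user_id, name FROM user_personal_details "
           f"WHERE user_id IN ({placeholders}) ORDER BY user_id")
    with pool.connection() as conn:
        return conn.execute(sql, list(user_ids)).fetchall()


created = []  # records every connection the factory opens


def make_connection() -> sqlite3.Connection:
    """Stand-in for opening a PostgreSQL connection; seeds a small demo table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE user_personal_details (user_id INTEGER, name TEXT)")
    conn.executemany("INSERT INTO user_personal_details VALUES (?, ?)",
                     [(1, "ann"), (2, "bob"), (3, "cat")])
    created.append(conn)
    return conn


pool = ConnectionPool(make_connection, size=1)
batch1 = fetch_users(pool, [1, 3])   # first micro-batch
batch2 = fetch_users(pool, [2])      # second micro-batch reuses the same connection
print(batch1, batch2, len(created))
```

In a Spark job the pool would have to live wherever the queries run (for example, one pool per JVM or Python worker process), since connections cannot be serialized and shipped with tasks.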


--
Best,
Kevin Chen


From: Geervan Hayatnagarkar 
Date: Sunday, November 29, 2020 at 6:20 PM
To: chen kevin 
Cc: German Schiavon , fanxin , 
User 
Subject: Re: Stream-static join : Refreshing subset of static data / Connection 
pooling

The real question is two fold:

1) we had to do collect on each microbatch. In high velocity streams this could 
result in millions of records causing memory issue. Also it appears that we are 
manually doing the real join by selecting  only matching rows from static 
source. Is there a better way to do this?

2) can we avoid making JDBC connection per microbatch? Can we pool it?



Fwd: Fail to run benchmark in Github Action

2021-06-26 Thread Kevin Su
-- Forwarded message -
From: Kevin Su 
Date: Fri, Jun 25, 2021 at 8:23 PM
Subject: Fail to run benchmark in Github Action
To: 


Hi all,

I tried to run a benchmark test with GitHub Actions in my fork, and I hit the
error below.
https://github.com/pingsutw/spark/runs/2867617238?check_suite_focus=true
java.lang.AssertionError: assertion failed: spark.test.home is not set!
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.deploy.worker.Worker.(Worker.scala:148)
at org.apache.spark.deploy.worker.Worker$.startRpcEnvAndEndpoint(Worker.scala:954)
at org.apache.spark.deploy.LocalSparkCluster.$anonfun$start$2(LocalSparkCluster.scala:68)
at org.apache.spark.deploy.LocalSparkCluster.$anonfun$start$2$adapted(LocalSparkCluster.scala:65)
at scala.collection.immutable.Range.foreach(Range.scala:158)

After I added "--driver-java-options
"-Dspark.test.home=$GITHUB_WORKSPACE" \" to benchmark.yml
<https://github.com/pingsutw/spark/blob/3256bfc2948f70e46af7d06421d15e43ff128bba/.github/workflows/benchmark.yml#L88>,
I still got the error below.
https://github.com/pingsutw/spark/runs/2911027350?check_suite_focus=true
Do I need to set something up in my fork?
after 1900, vec on, rebase EXCEPTION 7474 7511 58 13.4 74.7 2.7X
after 1900, vec on, rebase LEGACY 9228 9296 60 10.8 92.3 2.2X
after 1900, vec on, rebase CORRECTED 7553 7678 128 13.2 75.5 2.7X
before 1900, vec off, rebase LEGACY 23280 23362 71 4.3 232.8 0.9X
before 1900, vec off, rebase CORRECTED 20548 20630 119 4.9 205.5 1.0X
before 1900, vec on, rebase LEGACY 12210 12239 37 8.2 122.1 1.7X
before 1900, vec on, rebase CORRECTED 7486 7489 2 13.4 74.9 2.7X

Running benchmark: Save TIMESTAMP_MICROS to parquet
Running case: after 1900, noop
Stopped after 1 iterations, 4003 ms
Running case: before 1900, noop
Stopped after 1 iterations, 3965 ms
Running case: after 1900, rebase EXCEPTION
Stopped after 1 iterations, 18339 ms
Running case: after 1900, rebase LEGACY
Stopped after 1 iterations, 18375 ms
Running case: after 1900, rebase CORRECTED
Stopped after 1 iterations, 18716 ms
Running case: before 1900, rebase LEGACY
Error: The operation was canceled.


How to run spark benchmark on standalone cluster?

2021-07-02 Thread Kevin Su
Hi all,

I want to run the Spark benchmarks on a standalone cluster, so I changed
DataSourceReadBenchmark.scala to remove the "spark.master" setting:

--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -48,7 +48,6 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
     val conf = new SparkConf()
       .setAppName("DataSourceReadBenchmark")
       // Since `spark.master` always exists, overrides this value
-      .set("spark.master", "local[1]")
       .setIfMissing("spark.driver.memory", "3g")
       .setIfMissing("spark.executor.memory", "3g")

I ran the benchmark using the command below:

bin/spark-submit \
  --driver-memory 16g \
  --master spark://kobe-pc:7077 \
  --class org.apache.spark.benchmark.Benchmarks \
  --jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
  "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
  "org.apache.spark.sql.execution.datasources.*"

I got the error below:

Driver stacktrace:
21/07/02 22:35:13 INFO DAGScheduler: Job 0 failed: apply at BenchmarkBase.scala:42, took 1.374943 s
21/07/02 22:35:13 ERROR FileFormatWriter: Aborting job a6ceeb0c-5f9d-44ca-a896-65d4a7b8b948.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 7) (192.168.103.14 executor 0): java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.execution.datasources.csv.CSVBenchmark$
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)


determine week of month from date in spark3

2022-02-11 Thread Appel, Kevin
Previously, in Spark 2, we could use the Spark function date_format with the "W" 
flag to get the week of month for a date. In Spark 3, trying this returns an 
error:


* org.apache.spark.SparkUpgradeException: You may get a different 
result due to the upgrading of Spark 3.0: Fail to recognize 'W' pattern in the 
DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY 
to restore the behavior before Spark 3.0. 2) You can form a valid datetime 
pattern with the guide from 
https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html


* Caused by: java.lang.IllegalArgumentException: All week-based 
patterns are unsupported since Spark 3.0, detected: W, Please use the SQL 
function EXTRACT instead

If I use the first solution and set the policy to LEGACY (it is currently 
EXCEPTION), the code runs through:

from datetime import date

from pyspark.sql import functions as F

# Assumes an existing SparkSession named `spark`.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

df1 = spark.createDataFrame(
    [
        (1, date(2014, 3, 7)),
        (2, date(2014, 3, 8)),
        (3, date(2014, 3, 30)),
        (4, date(2014, 3, 31)),
        (5, date(2015, 3, 7)),
        (6, date(2015, 3, 8)),
        (7, date(2015, 3, 30)),
        (8, date(2015, 3, 31)),
    ],
    schema="a long, b date",
)
# "W" is the legacy week-of-month pattern letter.
df1 = df1.withColumn("WEEKOFMONTH", F.date_format(F.col("b"), "W"))
df1.show()

+---+----------+-----------+
|  a|         b|WEEKOFMONTH|
+---+----------+-----------+
|  1|2014-03-07|          2|
|  2|2014-03-08|          2|
|  3|2014-03-30|          6|
|  4|2014-03-31|          6|
|  5|2015-03-07|          1|
|  6|2015-03-08|          2|
|  7|2015-03-30|          5|
|  8|2015-03-31|          5|
+---+----------+-----------+

Trying to explore the latter options, in both the EXTRACT and the datetime 
patterns that are listed, I don't see that there is the "W" option or 
equivalent way to produce this.
The datetime link mentions: In Spark 3.0, we define our own pattern strings in 
Datetime Patterns for Formatting and 
Parsing<https://spark.apache.org/docs/3.2.1/sql-ref-datetime-pattern.html>, 
which is implemented via 
DateTimeFormatter<https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html>
 under the hood.
If I follow the link into the DateTimeFormatter then I see the W existing there:
   W   week-of-month   number   4

My first question: with Spark 3 and without setting the policy to LEGACY, is 
there no longer a way to compute this using Spark's built-in functions?

The second question: if we do set the policy to LEGACY, are there any 
caveats or items to be aware of that might affect us later? For example, is 
this option going to be deprecated in a future Spark 3.3.X?

We ran into this during our Spark 2 to Spark 3 conversion and are trying 
to see how best to handle it.

Thanks for your feedback,

Kevin







RE: determine week of month from date in spark3

2022-02-11 Thread Appel, Kevin
The output I sent originally with WEEKOFMONTHF was produced with LEGACY set. 
With EXCEPTION set, this is the result, which is also different:

+---+----------+------------+
|  a|         b|WEEKOFMONTHF|
+---+----------+------------+
|  1|2014-03-07|           7|
|  2|2014-03-08|           1|
|  3|2014-03-30|           2|
|  4|2014-03-31|           3|
|  5|2015-03-07|           7|
|  6|2015-03-08|           1|
|  7|2015-03-30|           2|
|  8|2015-03-31|           3|
+---+----------+------------+

From: Appel, Kevin
Sent: Friday, February 11, 2022 2:35 PM
To: user@spark.apache.org; 'Sean Owen' 
Subject: RE: determine week of month from date in spark3

Thanks for the reply. That looks to be along the lines of what is going on, 
and that item mentions that W is banned, which matches the error I saw. 
However, F does not give the same result as W.

+---+----------+------------+------------+
|  a|         b|WEEKOFMONTHW|WEEKOFMONTHF|
+---+----------+------------+------------+
|  1|2014-03-07|           2|           1|
|  2|2014-03-08|           2|           2|
|  3|2014-03-30|           6|           5|
|  4|2014-03-31|           6|           5|
|  5|2015-03-07|           1|           1|
|  6|2015-03-08|           2|           2|
|  7|2015-03-30|           5|           5|
|  8|2015-03-31|           5|           5|
+---+----------+------------+------------+

The best way to explain what W gives: on a printed calendar for March 2014, 
March 30 and March 31 fall on row 6, which is week 6, whereas on the calendar 
for March 2015 they fall on row 5, which is week 5.
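
The calendar-row reading of W can be written down as simple arithmetic. The sketch below is plain Python, not Spark, and it assumes weeks start on Sunday, which is what the W column above is consistent with; the function name week_of_month is made up for illustration. It takes the column that the first day of the month falls in, then counts how many full rows precede the given day.

```python
from datetime import date


def week_of_month(d: date) -> int:
    """Calendar row of `d` within its month, with weeks starting on Sunday."""
    first = d.replace(day=1)
    # Python's weekday(): Monday=0 ... Sunday=6; shift so Sunday=0 ... Saturday=6.
    first_col = (first.weekday() + 1) % 7
    return (d.day + first_col - 1) // 7 + 1


dates = [date(2014, 3, 7), date(2014, 3, 8), date(2014, 3, 30), date(2014, 3, 31),
         date(2015, 3, 7), date(2015, 3, 8), date(2015, 3, 30), date(2015, 3, 31)]
weeks = [week_of_month(d) for d in dates]
print(weeks)  # [2, 2, 6, 6, 1, 2, 5, 5]
```

The same arithmetic could presumably be expressed without LEGACY using Spark's built-in dayofmonth and dayofweek functions (dayofweek returns 1 for Sunday), for example floor((dayofmonth(b) + dayofweek(trunc(b, 'MM')) - 2) / 7) + 1. That expression is an untested suggestion and was not confirmed in this thread.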

From: Sean Owen [mailto:sro...@gmail.com]
Sent: Friday, February 11, 2022 2:11 PM
To: Appel, Kevin <kevin.ap...@bofa.com>
Cc: user@spark.apache.org
Subject: Re: determine week of month from date in spark3

Here is some back-story: 
https://issues.apache.org/jira/browse/SPARK-32683
I think the answer may be: use "F"?

On Fri, Feb 11, 2022 at 12:43 PM Appel, Kevin 
mailto:kevin.ap...@bofa.com.invalid>> wrote:
Previously in Spark2 we could use the spark function date_format with the “W” 
flag and it will provide back the week of month of that date.  In Spark3 when 
trying this there is an error back:


• org.apache.spark.SparkUpgradeException: You may get a different 
result due to the upgrading of Spark 3.0: Fail to recognize 'W' pattern in the 
DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY 
to restore the behavior before Spark 3.0. 2) You can form a valid datetime 
pattern with the guide from 
https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html


• Caused by: java.lang.IllegalArgumentException: All week-based 
patterns are unsupported since Spark 3.0, detected: W, Please use the SQL 
function EXTRACT instead

If I use the first solution and set the policy to LEGACY (it is currently 
EXCEPTION), then the code runs through:

from datetime import date

from pyspark.sql import functions as F

# Assumes an existing SparkSession named `spark`.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

df1 = spark.createDataFrame(
    [
        (1, date(2014, 3, 7)),
        (2, date(2014, 3, 8)),
        (3, date(2014, 3, 30)),
        (4, date(2014, 3, 31)),
        (5, date(2015, 3, 7)),
        (6, date(2015, 3, 8)),
        (7, date(2015, 3, 30)),
        (8, date(2015, 3, 31)),
    ],
    schema="a long, b date",
)
# "W" (week of month) is only accepted under the LEGACY parser policy.
df1 = df1.withColumn("WEEKOFMONTH", F.date_format(F.col("b"), "W"))
df1.show()

+---+----------+-----------+
|  a|         b|WEEKOFMONTH|
+---+----------+-----------+
|  1|2014-03-07|          2|
|  2|2014-03-08|          2|
|  3|2014-03-30|          6|
|  4|2014-03-31|          6|
|  5|2015-03-07|          1|
|  6|2015-03-08|          2|
|  7|2015-03-30|          5|
|  8|2015-03-31|          5|
+---+----------+-----------+

Trying to explore the latter options, in both the EXTRACT and the datetime 
patterns that are listed, I don’t see that there is the “W” option or 
equivalent way to produce this.
The datetime link mentions: In Spark 3.0, we define our own pattern strings in 
Datetime Patterns for Formatting and Parsing 
<https://spark.apache.org/docs/3.2.1/sql-ref-datetime-pattern.html>, 
which is implemented via DateTimeFormatter 
<https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html> 
under the 

Unsubscribe

2023-07-27 Thread Kevin Wang
Unsubscribe please!


RE: How to add MaxDOP option in spark mssql JDBC

2024-04-24 Thread Appel, Kevin
You might be able to leverage the prepareQuery option, which is documented at 
https://spark.apache.org/docs/3.5.1/sql-data-sources-jdbc.html#data-source-option
. It was introduced in Spark 3.4.0 to handle temp table queries and CTE 
queries against MSSQL server, since what you send in is not actually what gets 
sent: some items get wrapped around it.

There is more technical detail in 
https://issues.apache.org/jira/browse/SPARK-37259, which links the PRs that 
contain the fix.
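
To see why the OPTION clause fails, it helps to look at the shape of the statement that gets sent. The Spark JDBC documentation describes the `query` option as being wrapped into a subquery of the form SELECT <columns> FROM (<user_specified_query>) spark_gen_alias. The sketch below only builds that string in Python; wrap_query is a hypothetical helper, not Spark code, and the exact alias and column list Spark generates may differ.

```python
def wrap_query(user_query: str, alias: str = "spark_gen_alias") -> str:
    """Mimic the documented shape: SELECT <columns> FROM (<query>) <alias>."""
    return f"SELECT * FROM ({user_query}) {alias}"


user_query = (
    "SELECT TOP 10 * FROM dbo.Demo with (nolock) "
    "WHERE Id = 1 option (maxdop 1)"
)
wrapped = wrap_query(user_query)
print(wrapped)
# The OPTION clause now sits inside the parentheses of a derived table.
# SQL Server only accepts OPTION at the end of the outermost statement,
# which matches the "Incorrect syntax near the keyword 'option'" error.
```

Whether prepareQuery helps for a trailing hint like this one is not confirmed in this thread, since that option adds a prefix rather than a suffix.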


From: Elite 
Sent: Tuesday, April 23, 2024 10:28 PM
To: user@spark.apache.org
Subject: How to add MaxDOP option in spark mssql JDBC

[QUESTION] How to pass MAXDOP option * Issue #2395 * microsoft/mssql-jdbc 
(github.com)

Hi team,

I was advised to ask the Spark community for help.

We suspect Spark rewrites the query before passing it to MS SQL, and that this 
leads to a syntax error.
Is there any workaround to make my code work?


spark.read()
    .format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://xxx.database.windows.net;databaseName=")
    .option("query", "SELECT TOP 10 * FROM dbo.Demo with (nolock) WHERE Id = 1 option (maxdop 1)")
    .load()
    .show();

com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near the keyword 'option'.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:270)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1778)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:697)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:616)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7775)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:4397)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:293)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:263)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeQuery(SQLServerPreparedStatement.java:531)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:61)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)



IPv6 support

2015-05-20 Thread Kevin Liu
Hello, I have to work with IPv6-only servers and when I installed the
1.3.1 hadoop 2.6 build, I couldn't get the example to run due to IPv6
issues (errors below). I tried to add the
-Djava.net.preferIPv6Addresses=true setting but it still doesn't work. A
search on Spark's support for IPv6 is inconclusive. Can someone help
clarify the current status for IPv6?

Thanks
Kevin


-- errors --

15/05/20 10:17:30 INFO Executor: Fetching http://2401:db00:2030:709b:face:0:9:0:51453/jars/spark-examples-1.3.1-hadoop2.6.0.jar with timestamp 1432142250197
15/05/20 10:17:30 INFO Executor: Fetching http://2401:db00:2030:709b:face:0:9:0:51453/jars/spark-examples-1.3.1-hadoop2.6.0.jar with timestamp 1432142250197
15/05/20 10:17:30 ERROR Executor: Exception in task 5.0 in stage 0.0 (TID 5)
java.net.MalformedURLException: For input string: "db00:2030:709b:face:0:9:0:51453"
at java.net.URL.<init>(URL.java:620)
at java.net.URL.<init>(URL.java:483)
at java.net.URL.<init>(URL.java:432)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:603)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:431)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:366)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "db00:2030:709b:face:0:9:0:51453"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at java.net.URLStreamHandler.parseURL(URLStreamHandler.java:216)
at java.net.URL.<init>(URL.java:615)
... 18 more
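
The MalformedURLException above is consistent with an IPv6 literal being placed into a URL without square brackets, so that everything after the first colon is read as the port. RFC 3986 requires IPv6 literals in URLs to be bracketed. The Python sketch below shows the same failure mode and the bracketed form; format_host_port is a hypothetical helper for illustration and says nothing about how Spark itself builds its URLs.

```python
import ipaddress
from urllib.parse import urlsplit


def format_host_port(host: str, port: int) -> str:
    """Return host:port, bracketing the host when it is an IPv6 literal."""
    try:
        is_v6 = ipaddress.ip_address(host).version == 6
    except ValueError:
        is_v6 = False  # not an IP literal at all, e.g. a hostname
    return f"[{host}]:{port}" if is_v6 else f"{host}:{port}"


host, port = "2401:db00:2030:709b:face:0:9:0", 51453

# Bracketed form: host and port are separated cleanly.
good = urlsplit(f"http://{format_host_port(host, port)}/jars/app.jar")
print(good.hostname, good.port)

# Unbracketed form: the parser cannot turn the "port" into an integer.
bad = urlsplit(f"http://{host}:{port}/jars/app.jar")
try:
    bad.port
except ValueError as exc:
    print("unbracketed form fails:", exc)
```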






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to implement ALS with csv file? getting error while calling Rating class

2016-03-07 Thread Kevin Mellott
If you are using DataFrames, then you can also specify the schema when
loading as an alternative solution. I've found Spark-CSV to be a very useful
library when working with CSV data.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader


On Mon, Mar 7, 2016 at 1:10 AM, Nick Pentreath 
wrote:

> As you've pointed out, Rating requires user and item ids in Int form. So
> you will need to map String user ids to integers.
>
> See this thread for example:
> https://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAJgQjQ9GhGqpg1=hvxpfrs+59elfj9f7knhp8nyqnh1ut_6...@mail.gmail.com%3E
> .
>
> There is a DeveloperApi method
> in org.apache.spark.ml.recommendation.ALS that takes Rating with generic
> type (can be String) for user id and item id. However that is a little more
> involved, and for larger scale data will be a lot less efficient.
>
> Something like this for example:
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.ml.recommendation.ALS
> import org.apache.spark.ml.recommendation.ALS.Rating
>
> val conf = new SparkConf().setAppName("ALSWithStringID").setMaster("local[4]")
> val sc = new SparkContext(conf)
> // Name,Value1,Value2.
> val rdd = sc.parallelize(Seq(
>   Rating[String]("foo", "1", 4.0f),
>   Rating[String]("foo", "2", 2.0f),
>   Rating[String]("bar", "1", 5.0f),
>   Rating[String]("bar", "3", 1.0f)
> ))
> val (userFactors, itemFactors) = ALS.train(rdd)
>
>
> As you can see, you just get the factor RDDs back, and if you want an
> ALSModel you will have to construct it yourself.
>
>
> On Sun, 6 Mar 2016 at 18:23 Shishir Anshuman 
> wrote:
>
>> I am new to apache Spark, and I want to implement the Alternating Least
>> Squares algorithm. The data set is stored in a csv file in the format:
>> *Name,Value1,Value2*.
>>
>> When I read the csv file, I get
>> *java.lang.NumberFormatException.forInputString* error because the
>> Rating class needs the parameters in the format: *(user: Int, product:
>> Int, rating: Double)* and the first column of my file contains *Name*.
>>
>> Please suggest a way to overcome this issue.
>>
>
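
The id-mapping step Nick mentions in the quoted reply can be sketched without Spark. The helper below assigns a dense integer to each distinct string id and keeps the reverse map so results can be translated back. build_index and the sample rows are hypothetical; on a real cluster the same idea would need a distributed equivalent (for example RDD zipWithIndex plus joins), which is not shown here.

```python
def build_index(values):
    """Map each distinct value to a dense int, in first-seen order."""
    index = {}
    for v in values:
        if v not in index:
            index[v] = len(index)
    return index


# Rows in the Name,Value1,Value2 layout from the question (made-up values).
rows = [("foo", "1", 4.0), ("foo", "2", 2.0), ("bar", "1", 5.0), ("bar", "3", 1.0)]

user_index = build_index(name for name, _, _ in rows)
# (user: Int, product: Int, rating: Double) triples, as Rating expects.
ratings = [(user_index[name], int(item), rating) for name, item, rating in rows]
# Reverse map, to turn integer ids in the results back into names.
id_to_user = {i: name for name, i in user_index.items()}

print(ratings)
print(id_to_user)
```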


Re: RDD recomputation

2016-03-10 Thread Kevin Mellott
I've had very good success troubleshooting this type of thing by using the
Spark Web UI, which will depict a breakdown of all tasks. This also
includes the RDDs being used, as well as any cached data. Additional
information about this tool can be found at
http://spark.apache.org/docs/latest/monitoring.html.

On Thu, Mar 10, 2016 at 1:31 PM, souri datta 
wrote:

> Hi,
>  Currently I am trying to optimize my spark application and in that
> process, I am trying to figure out if at any stage in the code, I am
> recomputing a large RDD (so that I can optimize it by
> persisting/checkpointing it).
>
> Is there any indication in the event logs that tells us about an RDD being
> computed?
> If anyone has done similar analysis, can you please share how you went
> about it.
>
> Thanks in advance,
> Souri
>


Re: How to convert Parquet file to a text file.

2016-03-15 Thread Kevin Mellott
I'd recommend reading the parquet file into a DataFrame object, and then
using spark-csv to write to a CSV file.

On Tue, Mar 15, 2016 at 3:34 PM, Shishir Anshuman  wrote:

> I need to convert the parquet file generated by the spark to a text (csv
> preferably) file. I want to use the data model outside spark.
>
> Any suggestion on how to proceed?
>


java.lang.OutOfMemoryError: Direct buffer memory when using broadcast join

2016-03-21 Thread Dai, Kevin
Hi,  All


I'm joining a small table (about 200m) with a huge table using a broadcast join; 
however, Spark throws the following exception:


16/03/20 22:32:06 WARN TransportChannelHandler: Exception in connection from
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:658)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
at io.netty.buffer.CompositeByteBuf.allocBuffer(CompositeByteBuf.java:1345)
at io.netty.buffer.CompositeByteBuf.consolidateIfNeeded(CompositeByteBuf.java:276)
at io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:116)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:82)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

Can anyone tell me what's wrong and how to fix it?

Best Regards,
Kevin.



Re: println not appearing in libraries when running job using spark-submit --master local

2016-03-28 Thread Kevin Peng
Ted,

What triggerAndWait does is perform a REST call to a specified URL and then
wait until the status message returned by that URL, in a JSON field, says
complete.  The issue is that I put a println at the very top of the
method and it doesn't get printed. I know the println isn't
causing a problem, because there is an exception that I throw further down
the line, and that exception is what I am currently getting, but none of the
printlns along the way are showing:


  def triggerAndWait(url: String, pollInterval: Int = 1000 * 30,
      timeOut: Int = 1000 * 60 * 60, connectTimeout: Int = 3,
      readTimeout: Int = 3, requestMethod: String = "GET"): Boolean = {
    println("Entering triggerAndWait function - url: " + url +
      " pollInterval: " + pollInterval.toString() + " timeOut: " +
      timeOut.toString() + " connectionTimeout: " +
      connectTimeout.toString() + " readTimeout: " + readTimeout.toString() +
      " requestMethod: " + requestMethod)


.


Thanks,


KP

On Mon, Mar 28, 2016 at 1:52 PM, Ted Yu  wrote:

> Can you describe what gets triggered by triggerAndWait ?
>
> Cheers
>
> On Mon, Mar 28, 2016 at 1:39 PM, kpeng1  wrote:
>
>> Hi All,
>>
>> I am currently trying to debug a spark application written in scala.  I
>> have
>> a main method:
>>   def main(args: Array[String]) {
>> ...
>>  SocialUtil.triggerAndWait(triggerUrl)
>> ...
>>
>> The SocialUtil object is included in a separate jar.  I launched the
>> spark-submit command using --jars passing the SocialUtil jar.  Inside the
>> triggerAndWait function I have a println statement that is the first thing
>> in the method, but it doesn't seem to be coming out.  All println that
>> happen inside the main function directly are appearing though.  I was
>> wondering if anyone knows what is going on in this situation and how I can
>> go about making the println in the SocialUtil object appear.
>>
>> Thanks,
>>
>> KP
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/println-not-appearing-in-libraries-when-running-job-using-spark-submit-master-local-tp26617.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: Run a self-contained Spark app on a Spark standalone cluster

2016-04-12 Thread Kevin Eid
Hi,

Thanks for your emails. I tried running your command, but it returned: "No
such file or directory".
So I definitely need to move my local .py files to the cluster. I tried
logging in (before sshing) but could not find the master:
./spark-ec2 -k key -i key.pem  login weather-cluster
Then, after sshing: copy-dir is located in the spark-ec2 directory, but to
replicate my files across all nodes I first need to get them into the root
folder on the Spark EC2 cluster:
./spark-ec2/copy-dir /root/spark/myfiles

I used this guide: http://spark.apache.org/docs/latest/ec2-scripts.html.

Do you have any suggestions about how to move those files from local to the
cluster?
Thanks in advance,
Kevin

On 12 April 2016 at 12:19, Sun, Rui  wrote:

> Which py file is your main file (primary py file)? Zip the other two py
> files. Leave the main py file alone. Don't copy them to S3 because it seems
> that only local primary and additional py files are supported.
>
> ./bin/spark-submit --master spark://... --py-files <zip file> <main py file>
>
> -Original Message-
> From: kevllino [mailto:kevin.e...@mail.dcu.ie]
> Sent: Tuesday, April 12, 2016 5:07 PM
> To: user@spark.apache.org
> Subject: Run a self-contained Spark app on a Spark standalone cluster
>
> Hi,
>
> I need to know how to run a self-contained Spark app  (3 python files) in
> a Spark standalone cluster. Can I move the .py files to the cluster, or
> should I store them locally, on HDFS or S3? I tried the following locally
> and on S3 with a zip of my .py files as suggested  here <
> http://spark.apache.org/docs/latest/submitting-applications.html>  :
>
> ./bin/spark-submit --master
> spark://ec2-54-51-23-172.eu-west-1.compute.amazonaws.com:5080
> --py-files
> s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@mubucket
> //weather_predict.zip
>
> But get: “Error: Must specify a primary resource (JAR or Python file)”
>
> Best,
> Kevin
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Run-a-self-contained-Spark-app-on-a-Spark-standalone-cluster-tp26753.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
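
Rui's suggestion in the quoted reply (zip the helper modules, leave the main file alone) can be scripted. Below is a minimal sketch using Python's zipfile module; the file names helpers_a.py, helpers_b.py and deps.zip are placeholders, not the files from this thread.

```python
import os
import tempfile
import zipfile


def zip_helpers(helper_paths, zip_path):
    """Write the helper modules into a flat zip suitable for --py-files."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in helper_paths:
            # Store each file at the archive root so `import helpers_a` works.
            zf.write(path, arcname=os.path.basename(path))
    return zip_path


# Create two throwaway helper modules so the sketch is self-contained.
workdir = tempfile.mkdtemp()
helpers = []
for name in ("helpers_a.py", "helpers_b.py"):
    path = os.path.join(workdir, name)
    with open(path, "w") as fh:
        fh.write("VALUE = 1\n")
    helpers.append(path)

archive = zip_helpers(helpers, os.path.join(workdir, "deps.zip"))
with zipfile.ZipFile(archive) as zf:
    print(sorted(zf.namelist()))
```

The archive would then be passed as --py-files deps.zip, with the main .py file given last as the primary resource.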


-- 
Kevin EID
M.Sc. in Computing, Data Analytics
<https://fr.linkedin.com/pub/kevin-eid/85/689/b01>


Re: Run a self-contained Spark app on a Spark standalone cluster

2016-04-16 Thread Kevin Eid
One last email to announce that I've fixed all of the issues. Don't
hesitate to contact me if you encounter the same. I'd be happy to help.

Regards,
Kevin
On 14 Apr 2016 12:39 p.m., "Kevin Eid"  wrote:

> Hi all,
>
> I managed to copy my .py files from local to the cluster using SCP, and I
> managed to run my Spark app on the cluster against a small dataset.
>
> However, when I iterate over a dataset of 5GB I get the following:
> org.apache.spark.shuffle.MetadataFetchFailedException (please see the
> attached screenshots).
>
> I am deploying 3*m3.xlarge and using the following parameters while
> submitting the app: --executor-memory 50g --driver-memory 20g
> --executor-cores 4 --num-executors 3.
>
> Can you recommend other configurations (driver executors number memory) or
> do I have to deploy more and larger instances  in order to run my app on
> 5GB ? Or do I need to add more partitions while reading the file?
>
> Best,
> Kevin
>
> On 12 April 2016 at 12:19, Sun, Rui  wrote:
>
>> Which py file is your main file (primary py file)? Zip the other two py
>> files. Leave the main py file alone. Don't copy them to S3 because it seems
>> that only local primary and additional py files are supported.
>>
>> ./bin/spark-submit --master spark://... --py-files <zip file> <main py file>
>>
>> -Original Message-
>> From: kevllino [mailto:kevin.e...@mail.dcu.ie]
>> Sent: Tuesday, April 12, 2016 5:07 PM
>> To: user@spark.apache.org
>> Subject: Run a self-contained Spark app on a Spark standalone cluster
>>
>> Hi,
>>
>> I need to know how to run a self-contained Spark app  (3 python files) in
>> a Spark standalone cluster. Can I move the .py files to the cluster, or
>> should I store them locally, on HDFS or S3? I tried the following locally
>> and on S3 with a zip of my .py files as suggested  here <
>> http://spark.apache.org/docs/latest/submitting-applications.html>  :
>>
>> ./bin/spark-submit --master
>> spark://ec2-54-51-23-172.eu-west-1.compute.amazonaws.com:5080
>> --py-files
>> s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@mubucket
>> //weather_predict.zip
>>
>> But get: “Error: Must specify a primary resource (JAR or Python file)”
>>
>> Best,
>> Kevin
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Run-a-self-contained-Spark-app-on-a-Spark-standalone-cluster-tp26753.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>
>
> --
> Kevin EID
> M.Sc. in Computing, Data Analytics
> <https://fr.linkedin.com/pub/kevin-eid/85/689/b01>
>
>


Re: Weird results with Spark SQL Outer joins

2016-05-02 Thread Kevin Peng
Gourav,

Apologies.  I edited my post with this information:
Spark version: 1.6
Result from spark shell
OS: Linux version 2.6.32-431.20.3.el6.x86_64 (
mockbu...@c6b9.bsys.dev.centos.org) (gcc version 4.4.7 20120313 (Red Hat
4.4.7-4) (GCC) ) #1 SMP Thu Jun 19 21:14:45 UTC 2014

Thanks,

KP

On Mon, May 2, 2016 at 11:05 AM, Gourav Sengupta 
wrote:

> Hi,
>
> As always, can you please write down details regarding your SPARK cluster
> - the version, OS, IDE used, etc?
>
> Regards,
> Gourav Sengupta
>
> On Mon, May 2, 2016 at 5:58 PM, kpeng1  wrote:
>
>> Hi All,
>>
>> I am running into a weird result with Spark SQL Outer joins.  The results
>> for all of them seem to be the same, which does not make sense due to the
>> data.  Here are the queries that I am running with the results:
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03'AND d.date >=
>> '2016-01-03'").count()
>> RESULT:23747
>>
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03'AND d.date >=
>> '2016-01-03'").count()
>> RESULT:23747
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03'AND d.date >=
>> '2016-01-03'").count()
>> RESULT: 23747
>>
>> Was wondering if someone had encountered this issues before.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: Weird results with Spark SQL Outer joins

2016-05-02 Thread Kevin Peng
Gourav,

I wish that were the case, but I have done a select count on each of the two
tables individually and they return different numbers of rows:

dps.registerTempTable("dps_pin_promo_lt")
swig.registerTempTable("swig_pin_promo_lt")

dps.count()
RESULT: 42632

swig.count()
RESULT: 42034

On Mon, May 2, 2016 at 11:55 AM, Gourav Sengupta 
wrote:

> This shows that both the tables have matching records and no mismatches.
> Therefore obviously you have the same results irrespective of whether you
> use right or left join.
>
> I think that there is no problem here, unless I am missing something.
>
> Regards,
> Gourav
>
> On Mon, May 2, 2016 at 7:48 PM, kpeng1  wrote:
>
>> Also, the results of the inner query produced the same results:
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03'AND d.date >=
>> '2016-01-03'").count()
>> RESULT:23747
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861p26863.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: Weird results with Spark SQL Outer joins

2016-05-02 Thread Kevin Peng
Yong,

Sorry, let me explain my deduction; it is going to be difficult to get sample
data out since the dataset I am using is proprietary.

From the above set of queries (the ones mentioned in the comments above), both
inner and outer joins are producing the same counts.  They are basically
pulling out selected columns from the query, but there is no roll-up happening
or anything else that would suggest any difference besides the type of join.
The tables are matched on three keys that are present in both tables (ad,
account, and date); on top of this they are filtered by date being on or after
2016-01-03.  Since all the joins are producing the same counts, the natural
suspicion is that the tables are identical, but when I run the following two
queries:

scala> sqlContext.sql("select * from swig_pin_promo_lt where date >='2016-01-03'").count
res14: Long = 34158

scala> sqlContext.sql("select * from dps_pin_promo_lt where date >='2016-01-03'").count
res15: Long = 42693


The above two queries filter the data on the same date used by the joins
(2016-01-03), and you can see that the row counts of the two tables are
different. That is why I suspect something is wrong with the outer
joins in Spark SQL: in this situation the right and full outer joins may
produce the same results, but they should not equal the left join and
definitely not the inner join, unless I am missing something.
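
The expectation above can be made concrete with a toy example. Assuming each join key appears at most once per table and no filter is applied after the join, the row counts follow directly from set sizes; the keys below are made up.

```python
# Join keys present in each (toy) table; each key is assumed unique per table.
left_keys = {"k1", "k2", "k3", "k4"}
right_keys = {"k3", "k4", "k5", "k6", "k7"}

expected = {
    "inner": len(left_keys & right_keys),  # only matching keys
    "left": len(left_keys),                # every left row survives
    "right": len(right_keys),              # every right row survives
    "full": len(left_keys | right_keys),   # every row from both sides
}
print(expected)
```

Under those assumptions, identical counts for all four join types are only expected when every row on both sides has a match, or when something after the join removes the unmatched rows.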


Side note: In my hasty response above I posted the wrong count for
dps.count; the real value is res16: Long = 42694


Thanks,


KP



On Mon, May 2, 2016 at 12:50 PM, Yong Zhang  wrote:

> We are still not sure what is the problem, if you cannot show us with some
> example data.
>
> For dps with 42632 rows and swig with 42034 rows: dps full outer join
> with swig on 3 columns, with additional filters, gets the same resultSet row
> count as dps left outer join with swig on 3 columns, with additional
> filters, which again gets the same resultSet row count as dps right outer
> join with swig on 3 columns, with the same additional filters.
>
> Without knowing your data, I cannot see the reason that has to be a bug in
> the spark.
>
> Am I misunderstanding your bug?
>
> Yong
>
> --
> From: kpe...@gmail.com
> Date: Mon, 2 May 2016 12:11:18 -0700
> Subject: Re: Weird results with Spark SQL Outer joins
> To: gourav.sengu...@gmail.com
> CC: user@spark.apache.org
>
>
> Gourav,
>
> I wish that were the case, but I have done a select count on each of the two
> tables individually and they return different numbers of rows:
>
>
> dps.registerTempTable("dps_pin_promo_lt")
> swig.registerTempTable("swig_pin_promo_lt")
>
>
> dps.count()
> RESULT: 42632
>
>
> swig.count()
> RESULT: 42034
>
> On Mon, May 2, 2016 at 11:55 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
> This shows that both the tables have matching records and no mismatches.
> Therefore obviously you have the same results irrespective of whether you
> use right or left join.
>
> I think that there is no problem here, unless I am missing something.
>
> Regards,
> Gourav
>
> On Mon, May 2, 2016 at 7:48 PM, kpeng1  wrote:
>
> Also, the results of the inner query produced the same results:
> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
> AS
> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
> =
> d.ad) WHERE s.date >= '2016-01-03'AND d.date >= '2016-01-03'").count()
> RESULT:23747
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861p26863.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
>
>


Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Davies,

Here is the code that I am typing into the spark-shell along with the
results (my question is at the bottom):

val dps = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("file:///home/ltu/dps_csv/")
val swig = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("file:///home/ltu/swig_csv/")

dps.count
res0: Long = 42694

swig.count
res1: Long = 42034


dps.registerTempTable("dps_pin_promo_lt")
swig.registerTempTable("swig_pin_promo_lt")

sqlContext.sql("select * from dps_pin_promo_lt where date >
'2016-01-03'").count
res4: Long = 42666

sqlContext.sql("select * from swig_pin_promo_lt where date >
'2016-01-03'").count
res5: Long = 34131

sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
where date > '2016-01-03'").count
res6: Long = 42533

sqlContext.sql("select distinct date, account, ad from swig_pin_promo_lt
where date > '2016-01-03'").count
res7: Long = 34131


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res9: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res10: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res11: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res12: Long = 23809



From my results above, the counts of distinct values based on the join
criteria and filter criteria for each individual table are in res6 and
res7.  My question is why the outer join is producing fewer rows than the
smallest table; if there are no matches, it should still bring in that row
as part of the outer join.  For the full and right outer joins I expect to
see a minimum of res6 rows, but I get fewer.  Is there something specific
that I am missing here?  I expect the full outer join to give me the union
of the two table sets, so I expect at least 42533 rows, not 23809.


Gourav,

I just ran this result set on a new session with slightly newer data...
still seeing those results.



Thanks,

KP


On Mon, May 2, 2016 at 11:16 PM, Davies Liu  wrote:

> as @Gourav said, all the join with different join type show the same
> results,
> which meant that all the rows from left could match at least one row from
> right,
> all the rows from right could match at least one row from left, even
> the number of row from left does not equal that of right.
>
> This is correct result.
>
> On Mon, May 2, 2016 at 2:13 PM, Kevin Peng  wrote:
> > Yong,
> >
> > Sorry, let me explain my deduction; it is going to be difficult to get
> > sample data out since the dataset I am using is proprietary.
> >
> > From the above set of queries (the ones mentioned in the comments above),
> > both inner and outer joins are producing the same counts.  They are
> > basically pulling out selected columns from the query, but there is no
> > roll-up happening or anything that would possibly make it suspicious that
> > there is any difference besides the type of join.  The tables are matched
> > based on three keys that are present in both tables (ad, account, and
> > date); on top of this they are filtered by date being above 2016-01-03.
> > Since all the joins are producing the same counts, the natural suspicion
> > is that the tables are identical, but when I run the following two queries:
> >

Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Davies,

What exactly do you mean by Spark 2.0 turning these joins into an inner
join?  Does this mean that Spark SQL won't support where clauses in outer
joins?


Cesar & Gourav,

When running the queries without the where clause it works as expected.  I
am pasting my results below:
val dps =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/dps_csv/")
val swig =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/swig_csv/")

dps.count
res0: Long = 42694

swig.count
res1: Long = 42034


dps.registerTempTable("dps_pin_promo_lt")
swig.registerTempTable("swig_pin_promo_lt")


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res5: Long = 60919


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res6: Long = 42034


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res7: Long = 42694

Thanks,

KP


On Tue, May 3, 2016 at 10:42 AM, Davies Liu  wrote:

> Bingo, the two predicate s.date >= '2016-01-03' AND d.date >=
> '2016-01-03' is the root cause,
>  which will filter out all the nulls from outer join, will have same
> result as inner join.
>
> In Spark 2.0, we turn these join into inner join actually.
>
> On Tue, May 3, 2016 at 9:50 AM, Cesar Flores  wrote:
> > Hi
> >
> > Have you tried the joins without the where clause? When you use them you
> are
> > filtering all the rows with null columns in those fields. In other words
> you
> > are doing a inner join in all your queries.
> >
> > On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com>
> > wrote:
> >>
> >> Hi Kevin,
> >>
> >> Having given it a first look I do think that you have hit something here
> >> and this does not look quite fine. I have to work on the multiple AND
> >> conditions in ON and see whether that is causing any issues.
> >>
> >> Regards,
> >> Gourav Sengupta

Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Mike,

It looks like you are right.  The results seem to be fine.  It looks like I
messed up the filtering clause.

sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE (s.date >= '2016-01-03' OR s.date IS NULL) AND (d.date >=
'2016-01-03' OR d.date IS NULL)").count()
res2: Long = 53042

Davies, Cesar, Gourav,

Thanks for the support.

KP
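
The effect discussed in this thread can be reproduced on a tiny dataset. The following is a minimal sketch, not the original Spark session: it uses Python's built-in sqlite3 module instead of Spark SQL, two made-up tables named s and d, and a LEFT OUTER JOIN (older SQLite builds lack FULL OUTER JOIN). It assumes only standard SQL null semantics, where a comparison against NULL is never true.

```python
import sqlite3

# In-memory database with two tiny, made-up tables (not the thread's data).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE s (date TEXT, account TEXT);
    CREATE TABLE d (date TEXT, account TEXT);
    INSERT INTO s VALUES ('2016-01-05', 'a'), ('2016-01-06', 'b');
    INSERT INTO d VALUES ('2016-01-05', 'a'), ('2016-01-07', 'c');
""")

def count(sql):
    """Run a COUNT(*) query and return the single number."""
    return conn.execute(sql).fetchone()[0]

join = ("SELECT COUNT(*) FROM s LEFT OUTER JOIN d "
        "ON s.date = d.date AND s.account = d.account")

# No WHERE clause: the unmatched row from s survives with NULLs for d.
plain_outer = count(join)

# Filtering on d.date drops the NULL-padded row, so the result equals
# the inner join.
filtered = count(join + " WHERE s.date >= '2016-01-03'"
                        " AND d.date >= '2016-01-03'")

# Allowing NULL on the padded side keeps the outer-join rows.
null_safe = count(join + " WHERE s.date >= '2016-01-03'"
                         " AND (d.date >= '2016-01-03' OR d.date IS NULL)")

inner = count("SELECT COUNT(*) FROM s INNER JOIN d "
              "ON s.date = d.date AND s.account = d.account")

print(plain_outer, filtered, null_safe, inner)
```

The filtered outer join and the inner join return the same count, while the null-safe predicate returns the same count as the join with no WHERE clause.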

On Tue, May 3, 2016 at 11:26 AM, Michael Segel 
wrote:

> Silly question?
>
> If you change the predicate to
> ( s.date >= ‘2016-01-03’ OR s.date IS NULL )
> AND
> (d.date >= ‘2016-01-03’ OR d.date IS NULL)
>
> What do you get?
>
> Sorry if the syntax isn’t 100% correct. The idea is to not drop null
> values from the query.
> I would imagine that this shouldn’t kill performance since its most likely
> a post join filter on the result set?
> (Or is that just a Hive thing?)
>
> -Mike

Re: Alternative to groupByKey() + mapValues() for non-commutative, non-associative aggregate?

2016-05-03 Thread Kevin Mellott
If you put this into a dataframe then you may be able to use one hot
encoding and treat these as categorical features. I believe that the ml
pipeline components use project tungsten so the performance will be very
fast. After you process the result on the dataframe you would then need to
assemble your desired format.
On May 3, 2016 4:29 PM, "Bibudh Lahiri"  wrote:

> Hi,
>   I have multiple procedure codes that a patient has undergone, in an RDD
> with a different row for each combination of patient and procedure. I am
> trying to convert this data to the LibSVM format, so that the result
> looks as follows:
>
>   "0 1:1 2:0 3:1 29:1 30:1 32:1 110:1"
>
>   where 1, 2, 3, 29, 30, 32, 110 are numeric procedure codes a given
> patient has undergone. Note that Spark needs these codes to be one-based
> and in ascending order, so I am using a combination of groupByKey() and
> mapValues() to do this conversion as follows:
>
> procedures_rdd = procedures_rdd.groupByKey().mapValues(combine_procedures)
>
> where combine_procedures() is defined as:
>
> def combine_procedures(l_procs):
>   ascii_procs = map(lambda x: int(custom_encode(x)), l_procs)
>   return ' '.join([str(x) + ":1" for x in sorted(ascii_procs)])
>
>   Note that this reduction is neither commutative nor associative, since
> combining "29:1 30:1" and "32:1 110:1" to "32:1 110:1 29:1 30:1" is not
> going to work.
>   Can someone suggest some faster alternative to the combination
> of groupByKey() and mapValues() for this?
>
> Thanks
>Bibudh
>
>
> --
> Bibudh Lahiri
> Senior Data Scientist, Impetus Technolgoies
> 720 University Avenue, Suite 130
> Los Gatos, CA 95129
> http://knowthynumbers.blogspot.com/
>
>
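
One possible way around the ordering concern in the quoted question is to merge unordered sets of codes and sort only once at the end; set union is commutative and associative, so partial results can be combined in any order. The sketch below is plain Python with no Spark dependency. The names seq_op, comb_op and to_libsvm are hypothetical, the codes are assumed to be integers already (the custom_encode step from the question is left out), and the two lists stand in for partitions. In PySpark the same two functions could presumably be passed to aggregateByKey, followed by mapValues(to_libsvm); whether that is faster than groupByKey for a given dataset is not something this sketch shows.

```python
from functools import reduce

def seq_op(codes, code):
    """Fold one procedure code into a partition-local set."""
    codes.add(int(code))
    return codes

def comb_op(left, right):
    """Merge two partial sets; the order of the arguments does not matter."""
    left |= right
    return left

def to_libsvm(codes):
    """Sort once at the end and emit 'code:1' pairs in ascending order."""
    return " ".join(f"{c}:1" for c in sorted(codes))

# Two hypothetical partitions holding codes for the same patient.
partition_a = [30, 29]
partition_b = [110, 32, 29]

partial_a = reduce(seq_op, partition_a, set())
partial_b = reduce(seq_op, partition_b, set())

# Combining in either order gives the same line.
line_ab = to_libsvm(comb_op(set(partial_a), set(partial_b)))
line_ba = to_libsvm(comb_op(set(partial_b), set(partial_a)))
print(line_ab)
```

Because the set also removes duplicates, a code that appears twice for one patient is emitted only once.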


Compute the global rank of the column

2016-05-31 Thread Dai, Kevin
Hi, All


I want to compute the rank of some column in a table.

Currently, I use a window function to do it. However, all the data ends up
in one partition.

Is there a better way to do it?


Regards,

Kevin.
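
One idea that avoids a single partition is to sort the data across partitions, count the rows in each partition, and add the running total of earlier partitions to each row's local position. The following is a minimal sketch in plain Python, not Spark code; global_positions is a hypothetical helper and the nested lists stand in for range-partitioned, sorted partitions. It assigns consecutive positions (ties get distinct numbers), so it is closer to row_number than to SQL RANK. In Spark, sorting an RDD and calling zipWithIndex follows a similar two-pass idea.

```python
from itertools import accumulate

def global_positions(sorted_partitions):
    """Assign a 1-based global position to every value.

    Assumes the partitions are already range-partitioned and sorted, i.e.
    every value in partition i is <= every value in partition i + 1.
    """
    sizes = [len(p) for p in sorted_partitions]
    # Offset of a partition = number of rows in all earlier partitions.
    offsets = [0] + list(accumulate(sizes))[:-1]
    return [
        [(value, offset + i + 1) for i, value in enumerate(part)]
        for part, offset in zip(sorted_partitions, offsets)
    ]

# Hypothetical data: three sorted partitions of a numeric column.
parts = [[3, 7], [10, 12, 15], [21]]
ranked = global_positions(parts)
print(ranked)
```

Only the per-partition counts need to be gathered in one place; the rows themselves stay in their partitions.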


Classpath hell and Elasticsearch 2.3.2...

2016-06-02 Thread Kevin Burton
I'm trying to get Spark 1.6.1 to work with Elasticsearch 2.3.2... needless
to say it's not super easy.

I wish there was an easier way to get this stuff to work.. Last time I
tried to use spark more I was having similar problems with classpath setup
and Cassandra.

Seems a huge opportunity to make this easier for new developers.  This
stuff isn't rocket science but it can (needlessly) waste a ton of time.

... anyway... I have since figured out I have to specify *specific* jars
from the elasticsearch-hadoop distribution and use those.

Right now I'm using :

SPARK_CLASSPATH=/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-2.3.2.jar:/usr/share/elasticsearch-hadoop/lib/elasticsearch-spark_2.11-2.3.2.jar:/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-mr-2.3.2.jar:/usr/share/apache-spark/lib/*

... but I'm getting:

java.lang.NoClassDefFoundError: Could not initialize class
org.elasticsearch.hadoop.util.Version
at
org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:376)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at
org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at
org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

... but I think it's caused by this:

16/06/03 00:26:48 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
localhost): java.lang.Error: Multiple ES-Hadoop versions detected in the
classpath; please use only one
jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-2.3.2.jar
jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-spark_2.11-2.3.2.jar
jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-mr-2.3.2.jar

at org.elasticsearch.hadoop.util.Version.(Version.java:73)
at
org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:376)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at
org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at
org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

.. still tracking this down but was wondering if there is something obvious
I'm doing wrong.  I'm going to take out elasticsearch-hadoop-2.3.2.jar and
try again.

Lots of trial and error here :-/

Kevin
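
The "Multiple ES-Hadoop versions detected" error above names three jars from the same classpath. A quick pre-flight check can list such entries before a job is launched. This is a rough sketch only: es_hadoop_jars is a hypothetical helper, and it matches on file names, which is an assumption and not how ES-Hadoop itself detects duplicates.

```python
import re

# File-name heuristic (an assumption): any jar whose name starts with
# elasticsearch-hadoop or elasticsearch-spark counts as an ES-Hadoop artifact.
ES_HADOOP_JAR = re.compile(r"elasticsearch-(hadoop|spark)[^/]*\.jar$")

def es_hadoop_jars(classpath):
    """Return the classpath entries that look like ES-Hadoop jars."""
    return [entry for entry in classpath.split(":")
            if ES_HADOOP_JAR.search(entry)]

# The SPARK_CLASSPATH value quoted in the message above.
classpath = (
    "/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-2.3.2.jar:"
    "/usr/share/elasticsearch-hadoop/lib/elasticsearch-spark_2.11-2.3.2.jar:"
    "/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-mr-2.3.2.jar:"
    "/usr/share/apache-spark/lib/*"
)

found = es_hadoop_jars(classpath)
if len(found) > 1:
    print("More than one ES-Hadoop jar on the classpath:")
    for jar in found:
        print("  " + jar)
```

With the classpath from the message, all three Elasticsearch jars are reported, matching the list in the error.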

-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
<https://plus.google.com/102718274791889610666/posts>


Re: Classpath hell and Elasticsearch 2.3.2...

2016-06-02 Thread Kevin Burton
Thanks.

I'm trying to run it in a standalone cluster with an existing / large 100
node ES install.

I'm using the standard 1.6.1 -2.6 distribution with
elasticsearch-hadoop-2.3.2...

I *think* I'm only supposed to use the
elasticsearch-spark_2.11-2.3.2.jar with it...

but now I get the following exception:


java.lang.NoSuchMethodError:
scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:52)
at
org.elasticsearch.spark.package$SparkRDDFunctions.saveToEs(package.scala:37)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:40)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:57)
at $iwC$$iwC$$iwC$$iwC.(:59)
at $iwC$$iwC$$iwC.(:61)
at $iwC$$iwC.(:63)
at $iwC.(:65)
at (:67)
at .(:71)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:875)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


On Thu, Jun 2, 2016 at 3:45 PM, Nick Pentreath 
wrote:

> Hey there
>
> When I used es-hadoop, I just pulled in the dependency into my pom.xml,
> with spark as a "provided" dependency, and built a fat jar with assembly.
>
> Then with spark-submit use the --jars option to include your assembly jar
> (IIRC I sometimes also needed to use --driver-classpath too, but perhaps
> not with recent Spark versions).
>

Re: Classpath hell and Elasticsearch 2.3.2...

2016-06-02 Thread Kevin Burton
Yeah.. thanks Nick. Figured that out since your last email... I deleted the
2.10 by accident but then put 2+2 together.

Got it working now.

Still sticking to my story that it's somewhat complicated to set up :)

Kevin
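
The mismatch in this thread (a _2.11 jar on a Spark build that Nick guessed was compiled against Scala 2.10) can be spotted from the artifact name, since Scala libraries conventionally carry the Scala binary version as a suffix. Below is a small sketch under that naming assumption; scala_binary_version and matches_spark are hypothetical helpers, and the Spark-side Scala version has to be supplied by the caller.

```python
import re

# Conventional Scala artifact naming: <name>_<scalaBinaryVersion>-<version>.jar
SCALA_SUFFIX = re.compile(r"_(\d+\.\d+)-")

def scala_binary_version(jar_name):
    """Return the Scala binary version in the jar name, or None if absent."""
    match = SCALA_SUFFIX.search(jar_name)
    return match.group(1) if match else None

def matches_spark(jar_name, spark_scala_version):
    """True if the jar has no Scala suffix or the suffix matches Spark's."""
    suffix = scala_binary_version(jar_name)
    return suffix is None or suffix == spark_scala_version

# Jar name from the thread, checked against an assumed Scala 2.10 Spark build.
jar = "elasticsearch-spark_2.11-2.3.2.jar"
print(jar, "compatible:", matches_spark(jar, "2.10"))
```

A jar without a suffix, such as a pure Java artifact, is treated as compatible because the name carries no Scala version to compare.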

On Thu, Jun 2, 2016 at 3:59 PM, Nick Pentreath 
wrote:

> Which Scala version is Spark built against? I'd guess it's 2.10 since
> you're using spark-1.6, and you're using the 2.11 jar for es-hadoop.
>

Re: rdd join very slow when rdd created from data frame

2016-01-12 Thread Kevin Mellott
Can you please provide the high-level schema of the entities that you are
attempting to join? I think that you may be able to use a more efficient
technique to join these together; perhaps by registering the Dataframes as
temp tables and constructing a Spark SQL query.

Also, which version of Spark are you using?

On Tue, Jan 12, 2016 at 4:16 PM, Koert Kuipers  wrote:

> we are having a join of 2 rdds thats fast (< 1 min), and suddenly it
> wouldn't even finish overnight anymore. the change was that the rdd was now
> derived from a dataframe.
>
> so the new code that runs forever is something like this:
> dataframe.rdd.map(row => (Row(row(0)), row)).join(...)
>
> any idea why?
> i imagined it had something to do with recomputing parts of the data
> frame, but even a small change like this makes the issue go away:
> dataframe.rdd.map(row => Row.fromSeq(row.toSeq)).map(row => (Row(row(0)),
> row)).join(...)
>


Re: yarn-client: SparkSubmitDriverBootstrapper not found in yarn client mode (1.6.0)

2016-01-13 Thread Kevin Mellott
Lin - if you add "--verbose" to your original *spark-submit* command, it
will let you know the location in which Spark is running. As Marcelo
pointed out, this will likely indicate version 1.3, which may help you
confirm if this is your problem.

On Wed, Jan 13, 2016 at 12:06 PM, Marcelo Vanzin 
wrote:

> SparkSubmitDriverBootstrapper was removed back in Spark 1.4, so it
> seems you have a mixbag of 1.3 / 1.6 in your path / classpath and
> things are failing because of that.
>
> On Wed, Jan 13, 2016 at 9:31 AM, Lin Zhao  wrote:
> > My job runs fine in yarn cluster mode but I have reason to use client
> mode
> > instead. But I'm hitting this error when submitting:
> >> spark-submit --class com.exabeam.martini.scripts.SparkStreamingTest
> >> --master yarn --deploy-mode client --executor-memory 90G
> --num-executors 3
> >> --executor-cores 14 Martini-assembly-0.1.jar yarn-client
> >
> > Error: Could not find or load main class
> > org.apache.spark.deploy.SparkSubmitDriverBootstrapper
> >
> >
> >  If I replace deploy-mode to cluster the job is submitted successfully.
> Is
> > there a dependency missing from my project? Right now only one I
> included is
> > spark-streaming 1.6.0.
>
>
>
> --
> Marcelo
>
>
>


Re: Hive is unable to avro file written by spark avro

2016-01-13 Thread Kevin Mellott
Hi Sivakumar,

I have run into this issue in the past, and we were able to fix it by using
an explicit schema when saving the DataFrame to the Avro file. This schema
was an exact match to the one associated with the metadata on the Hive
database table, which allowed the Hive queries to work even after updating
the underlying Avro file via Spark.

We are using Spark 1.3.0, and I was hoping to find a better solution to
this problem once we upgrade to Spark 1.5.0 (we manage versions via CDH).
This one works, but the coding involved can be a little tedious based on
the complexity of your data.

If memory serves correctly, the explicit schema was necessary because our
data structure contained optional nested properties. The DataFrame writer
will automatically create a schema for you, but ours was differing based on
the data being saved (i.e. whether it did or did not contain a nested
element).

- Kevin
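
To illustrate why a data-driven schema can drift while an explicit one does not, here is a small sketch in plain Python. It does not use spark-avro; the Customer and Address records, the field names, and the helpers inferred_fields and conform are all made up for illustration. The optional nested element is declared as an Avro union with "null" and a null default, so every record maps onto the same schema whether or not the nested element is present.

```python
import json

# Hypothetical explicit Avro schema: "address" is an optional nested record.
EXPLICIT_SCHEMA = {
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "address", "default": None, "type": [
            "null",
            {"type": "record", "name": "Address",
             "fields": [{"name": "city", "type": "string"}]},
        ]},
    ],
}

def inferred_fields(record):
    """Field names a purely data-driven schema would see for this record."""
    return sorted(record)

def conform(record, schema=EXPLICIT_SCHEMA):
    """Return the record with every schema field present, using defaults."""
    return {f["name"]: record.get(f["name"], f.get("default"))
            for f in schema["fields"]}

with_nested = {"id": 1, "address": {"city": "Austin"}}
without_nested = {"id": 2}

# The inferred field lists differ; the conformed records share one shape.
print(inferred_fields(with_nested), inferred_fields(without_nested))
print(conform(with_nested), conform(without_nested))

# The same schema serialized as JSON, e.g. for an .avsc file.
schema_json = json.dumps(EXPLICIT_SCHEMA, indent=2)
```

The JSON text could then serve as the single schema definition shared by the writer and the Hive table.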

On Wed, Jan 13, 2016 at 7:20 PM, Siva  wrote:

> Hi Everyone,
>
> Avro data written by a DataFrame to HDFS cannot be read by Hive. I am
> saving the data in Avro format with the statement below.
>
> df.save("com.databricks.spark.avro", SaveMode.Append, Map("path" -> path))
>
> I created a Hive Avro external table, and when reading it I see all nulls.
> Has anyone faced a similar issue? What is the best way to write data in
> Avro format from Spark so that it is also readable by Hive?
>
> Thanks,
> Sivakumar Bhavanari.
>


Re: sqlContext.cacheTable("tableName") vs dataFrame.cache()

2016-01-15 Thread Kevin Mellott
Hi George,

I believe that sqlContext.cacheTable("tableName") is to be used when you
want to cache the data that is being used within a Spark SQL query. For
example, take a look at the code below.


val myData = sqlContext.load("com.databricks.spark.csv", Map("path" ->
"hdfs://somepath/file", "header" -> "false")).toDF("col1", "col2")

myData.registerTempTable("myData")


Here, the usage of *cache()* will affect ONLY the *myData.select* query.

myData.cache()

myData.select("col1", "col2").show()


Here, the usage of *cacheTable* will affect ONLY the *sqlContext.sql* query.

sqlContext.cacheTable("myData")

sqlContext.sql("SELECT col1, col2 FROM myData").show()


Thanks,
Kevin

On Fri, Jan 15, 2016 at 7:00 AM, George Sigletos 
wrote:

> According to the documentation they are exactly the same, but in my
> queries
>
> dataFrame.cache()
>
> results in much faster execution times vs doing
>
> sqlContext.cacheTable("tableName")
>
> Is there any explanation about this? I am not caching the RDD prior to
> creating the dataframe. Using Pyspark on Spark 1.5.2
>
> Kind regards,
> George
>


Re: Multi tenancy, REST and MLlib

2016-01-15 Thread Kevin Mellott
It sounds like you may be interested in a solution that implements the
Lambda Architecture, such as Oryx2. At a high level, this gives you the
ability to request and receive information immediately (serving layer),
generating the responses using a pre-built model (speed layer). Meanwhile,
that model is constantly being updated in the background as new information
becomes available (batch layer).

An example of a pre-built model in this scenario may be a predictive model
that predicts the class of an incoming piece of data (e.g. does this email
look like spam or not).

On Fri, Jan 15, 2016 at 5:00 PM, feribg  wrote:

> I'm fairly new to Spark and Mllib, but i'm doing some research into multi
> tenancy of mllib based app. The idea is to provide ability to train models
> on demand with certain constraints (executor size) and then allow to serve
> predictions from those models via a REST layer.
>
> So far from my research I've gathered the following:
>
> 1) It's fairly easy to schedule training jobs and define the size of the
> executor of the job with something like spark job server or via cmd. I'd
> imagine you need separate contexts here anyways, because if theres one big
> context shared amongst different tenants, it wont allow training different
> models in parallel for the most part. So the solution here seems a context
> per tenant and training via Spark Job Server.
>
> 2) Second part seems a bit more tricky as it must expose the results of the
> trained models to the outside world via some form of API. So far I've been
> able to create a new context inside of a simple Spring REST application,
> load the persisted model and be able to call predict and return results.
>
> My main problem with this approach is that now I need to load the whole
> spark context for each single model instance and a single tenant can
> potentially have a bunch, which also means at least a JVM per tenant and
> this is quite wasteful. It seems the actual prediction part is fairly
> simple
> and I was wondering if there was a way to share multiple models to predict
> from on the same context. Would that allow parallel predictions (ie model B
> doesnt have to wait for a prediction of model A to complete in order to
> return).
>
> Given this simple scenario do you see a better approach to architect that,
> maybe I'm missing certain features of Spark that would facilitate it in a
> cleaner and more efficient manner.
>
> Thanks!
>


Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem

2016-01-20 Thread Kevin Mellott
Hi Andy,

According to the API documentation for DataFrame
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame>,
you should have access to *sqlContext* as a property off of the DataFrame
instance. In your example, you could then do something like:

df.sqlContext.udf.register(...)

Thanks,
Kevin

On Wed, Jan 20, 2016 at 6:15 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> For clarity, callUDF() is not defined on DataFrames. It is defined on
> org.apache.spark.sql.functions. Strange that the class name starts with a
> lower-case letter. I have not figured out how to use the functions class.
>
>
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html
>
> Andy
>
> From: Andrew Davidson 
> Date: Wednesday, January 20, 2016 at 4:05 PM
> To: "user @spark" 
> Subject: trouble implementing complex transformer in java that can be
> used with Pipeline. Scala to Java porting problem
>
> I am using 1.6.0. I am having trouble implementing a custom transformer
> derived from org.apache.spark.ml.Transformer in Java that I can use in
> a PipeLine.
>
> So far the only way I figure out how to implement any kind of complex
> functionality and have it applied to a DataFrame is to implement a UDF. For
> example
>
>
> class StemmerUDF implements UDF1<String, List<String>>, Serializable {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public List<String> call(String text) throws Exception {
>         List<String> ret = stemText(text); // call org.apache.lucene
>         return ret;
>     }
> }
>
>
> Before I can use the UDF it needs to be registered. This requires the
> sqlContext. *The problem is sqlContext is not available during
> pipeline.load()*
>
> void registerUDF(SQLContext sqlContext) {
>     if (udf == null) {
>         udf = new StemmerUDF();
>         DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
>         sqlContext.udf().register(udfName, udf, returnType);
>     }
> }
>
>
> Our transformer needs to implement transform(). For it to be able to use
> the registered UDF we need the sqlContext. *The problem is the sqlContext
> is not part of the signature of transform.* My current hack is to pass
> the sqlContext to the constructor and not to use pipelines
>
> @Override
> public DataFrame transform(DataFrame df) {
>     String fmt = "%s(%s) as %s";
>     String stmt = String.format(fmt, udfName, inputCol, outputCol);
>     logger.info("\nstmt: {}", stmt);
>     DataFrame ret = df.selectExpr("*", stmt);
>     return ret;
> }
>
>
> Is there a way to do something like df.callUDF(myUDF);
>
>
> *The following Scala code looks like it is close to what I need. I have not
> been able to figure out how to do something like this in Java 8. callUDF does
> not seem to be available.*
>
>
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
>
> @DeveloperApi
>
> abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT,
> T]]
>
>   extends Transformer with HasInputCol with HasOutputCol with Logging {
>
> . . .
>
>
>  override def transform(dataset: DataFrame): DataFrame = {
>
> transformSchema(dataset.schema, logging = true)
>
> dataset.withColumn($(outputCol),
>
>   callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
>
>   }
>
>
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
>
>
> class Tokenizer(override val uid: String)
>
>   extends UnaryTransformer[String, Seq[String], Tokenizer] with
> DefaultParamsWritable {
>
>
> . . .
>
>   override protected def createTransformFunc: String => Seq[String] = {
>
> _.toLowerCase.split("\\s")
>
>   }
>
> . . .
>
> }
>
>
> Kind regards
>
>
> Andy
>
>
>


Re: Passing binding variable in query used in Data Source API

2016-01-21 Thread Kevin Mellott
Another alternative that you can consider is to use Sqoop to move your data
from PostgreSQL to HDFS, and then just load it into your DataFrame without
needing to use JDBC drivers. I've had success using this approach, and
depending on your setup you can easily manage/schedule this type of workflow
using a tool like Oozie.
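
If you stay with JDBC, the variable has to be substituted into the dbtable
string before Spark sees it; in the snippet from the question quoted below,
DeptNbr is just literal text inside the quotes. A minimal Python sketch of
building that subquery string (the helper name dept_subquery is made up for
illustration; the table and column names come from the question):

```python
def dept_subquery(dept_nbr):
    """Build the parenthesised subquery passed as the JDBC "dbtable" option."""
    # int() raises ValueError for strings such as "10 or 1=1", so only a
    # number ever ends up in the query text.
    dept = int(dept_nbr)
    return "(select * from schema.table1 where dept_number = {0}) as table1".format(dept)


# The loop stands in for the iterative use described in the question.
for nbr in (10, 20):
    print(dept_subquery(nbr))
```

The resulting string would then be passed as the "dbtable" value in place of
the hard-coded query.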

On Thu, Jan 21, 2016 at 8:34 AM, Todd Nist  wrote:

> Hi Satish,
>
> You should be able to do something like this:
>
>    val props = new java.util.Properties()
>    props.put("user", username)
>    props.put("password", pwd)
>    props.put("driver", "org.postgresql.Driver")
>    val deptNo = 10
>    val where = Some(s"dept_number = $deptNo")
>    val df = sqlContext.read.jdbc(
>      "jdbc:postgresql://10.00.00.000:5432/db_test?user=username&password=password",
>      "schema.table1", Array(where.getOrElse("")), props)
>
> or just add the filter to your query like this, and I believe it should
> get pushed down:
>
>    val df = sqlContext.read
>      .format("jdbc")
>      .option("url", "jdbc:postgresql://10.00.00.000:5432/db_test?user=username&password=password")
>      .option("user", username)
>      .option("password", pwd)
>      .option("driver", "org.postgresql.Driver")
>      .option("dbtable", "schema.table1")
>      .load().filter('dept_number === deptNo)
>
> This is from the top of my head and the code has not been tested or
> compiled.
>
> HTH.
>
> -Todd
>
>
> On Thu, Jan 21, 2016 at 6:02 AM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> Hi All,
>>
>> We have requirement to fetch data from source PostgreSQL database as per
>> a condition, hence need to pass a binding variable in query used in Data
>> Source API as below:
>>
>>
>> var DeptNbr = 10
>>
>> val dataSource_dF=cc.load("jdbc",Map("url"->"jdbc:postgresql://
>> 10.00.00.000:5432/db_test?user=username&password=password","driver"->"org.postgresql.Driver","dbtable"->"(select*
>> from schema.table1 where dept_number=DeptNbr) as table1"))
>>
>>
>> But it errors saying expected ';' but found '='
>>
>>
>> Note: As it is an iterative approach hence cannot use constants but need
>> to pass variable to query
>>
>>
>> If anybody had a similar implementation to pass binding variable while
>> fetching data from source database using Data Source than please provide
>> details on the same
>>
>>
>> Regards,
>>
>> Satish Chandra
>>
>
>


Re: [Spark] Reading avro file in Spark 1.3.0

2016-01-25 Thread Kevin Mellott
I think that you may be looking at documentation pertaining to the more
recent versions of Spark. Try looking at the examples linked below, which
apply to the Spark 1.3 version. There aren't many Java examples, but the
code should be very similar to the Scala ones (i.e. using "load" instead of
"read" on the SQLContext).

https://github.com/databricks/spark-avro/tree/branch-1.0

On Mon, Jan 25, 2016 at 4:38 AM, diplomatic Guru 
wrote:

> Hello guys,
>
> I've been trying to read avro file using Spark's DataFrame but it's
> throwing this error:
> java.lang.NoSuchMethodError:
> org.apache.spark.sql.SQLContext.read()Lorg/apache/spark/sql/DataFrameReader;
>
> This is what I've done so far:
>
> I've added the dependency to pom.xml:
>
> <dependency>
>   <groupId>com.databricks</groupId>
>   <artifactId>spark-avro_2.10</artifactId>
>   <version>1.0.0</version>
> </dependency>
>
> Java code:
>
> JavaSparkContext sc = new JavaSparkContext(sparkConf);
> SQLContext sqlContext = new SQLContext(sc);
> DataFrame df =
> sqlContext.read().format("com.databricks.spark.avro").load(args[0]);
>
> Could you please let me know what am I doing wrong?
>
> Thanks.
>
>
>
>
>


Re: Spark SQL . How to enlarge output rows ?

2016-01-27 Thread Kevin Mellott
I believe that *show* should work if you provide it with both the number of
rows and the truncate flag.

ex: df.show(10, False)   (PySpark; in Scala or Java the flag is lowercase false)

http://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show


On Wed, Jan 27, 2016 at 2:39 AM, Akhil Das 
wrote:

> Why would you want to print all rows? You can try the following:
>
> sqlContext.sql("select day_time from my_table limit
> 10").collect().foreach(println)
>
>
>
> Thanks
> Best Regards
>
> On Sun, Jan 24, 2016 at 5:58 PM, Eli Super  wrote:
>
>> Unfortunately still getting error when use .show() with `false` or
>> `False` or `FALSE`
>>
>> Py4JError: An error occurred while calling o153.showString. Trace:
>> py4j.Py4JException: Method showString([class java.lang.String, class 
>> java.lang.Boolean]) does not exist
>>  at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>>  at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>>  at py4j.Gateway.invoke(Gateway.java:252)
>>  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>  at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>  at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>
>> On Thu, Jan 21, 2016 at 4:54 PM, Spencer, Alex (Santander) <
>> alex.spen...@santander.co.uk> wrote:
>>
>>> I forgot to add this is (I think) from 1.5.0.
>>>
>>>
>>>
>>> And yeah that looks like a Python – I’m not hot with Python but it may
>>> be capitalised as False or FALSE?
>>>
>>>
>>>
>>>
>>>
>>> *From:* Eli Super [mailto:eli.su...@gmail.com]
>>> *Sent:* 21 January 2016 14:48
>>> *To:* Spencer, Alex (Santander)
>>> *Cc:* user@spark.apache.org
>>> *Subject:* Re: Spark SQL . How to enlarge output rows ?
>>>
>>>
>>>
>>> Thanks Alex
>>>
>>>
>>>
>>> I get NameError
>>>
>>> NameError: name 'false' is not defined
>>>
>>> Is it because of PySpark ?
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Jan 14, 2016 at 3:34 PM, Spencer, Alex (Santander) <
>>> alex.spen...@santander.co.uk> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> Try …..show(*false*)
>>>
>>>
>>>
>>> public void show(int numRows,
>>>
>>> boolean truncate)
>>>
>>>
>>>
>>>
>>>
>>> Kind Regards,
>>>
>>> Alex.
>>>
>>>
>>>
>>> *From:* Eli Super [mailto:eli.su...@gmail.com]
>>> *Sent:* 14 January 2016 13:09
>>> *To:* user@spark.apache.org
>>> *Subject:* Spark SQL . How to enlarge output rows ?
>>>
>>>
>>>
>>>
>>>
>>> Hi
>>>
>>>
>>>
>>> After executing sql
>>>
>>>
>>>
>>> sqlContext.sql("select day_time from my_table limit 10").show()
>>>
>>>
>>>
>>> my output looks like  :
>>>
>>> +--------------------+
>>> |            day_time|
>>> +--------------------+
>>> |2015/12/15 15:52:...|
>>> |2015/12/15 15:53:...|
>>> |2015/12/15 15:52:...|
>>> |2015/12/15 15:52:...|
>>> |2015/12/15 15:52:...|
>>> |2015/12/15 15:52:...|
>>> |2015/12/15 15:51:...|
>>> |2015/12/15 15:52:...|
>>> |2015/12/15 15:52:...|
>>> |2015/12/15 15:53:...|
>>> +--------------------+
>>>
>>>
>>>
>>> I'd like to get full rows
>>>
>>> Thanks !
>>>

Re: Spark Distribution of Small Dataset

2016-01-28 Thread Kevin Mellott
Hi Phil,

The short answer is that there is a driver machine (which handles the
distribution of tasks and data) and a number of worker nodes (which receive
data and perform the actual tasks). That being said, certain tasks need to
be performed on the driver, because they require all of the data.

I'd recommend taking a look at the video below, which will explain this
concept in much greater detail. It also goes through an example and shows
you how to use the logging tools to understand what is happening within
your program.

https://www.youtube.com/watch?v=dmL0N3qfSc8

Thanks,
Kevin

On Thu, Jan 28, 2016 at 4:41 AM, Philip Lee  wrote:

> Hi,
>
> Simple Question about Spark Distribution of Small Dataset.
>
> Let's say I have 8 machine with 48 cores and 48GB of RAM as a cluster.
> Dataset  (format is ORC by Hive) is so small like 1GB, but I copied it to
> HDFS.
>
> 1) if spark-sql run the dataset distributed on HDFS in each machine, what
> happens to the job? I meant one machine handles the dataset because it is
> so small?
>
> 2) but the thing is dataset is already distributed in each machine.
> or each machine handles the distributed dataset and send it to the Master
> Node?
>
> Could you explain about this in detail in a distributed way?
>
> Best,
> Phil
>
>
>
>


Re: unsubscribe email

2016-02-01 Thread Kevin Mellott
Take a look at the first section on http://spark.apache.org/community.html.
You basically just need to send an email from the aliased email to
user-unsubscr...@spark.apache.org. If you cannot log into that email
directly, then I'd recommend using a mail client that allows for the
"send-as" functionality (such as Gmail).

On Mon, Feb 1, 2016 at 4:38 PM, Eduardo Costa Alfaia  wrote:

> Hi Guys,
> How could I unsubscribe the email e.costaalf...@studenti.unibs.it, that
> is an alias from my email e.costaalf...@unibs.it and it is registered in
> the mail list .
>
> Thanks
>
> *Eduardo Costa Alfaia*
> *PhD Student Telecommunication Engineering*
> *Università degli Studi di Brescia-UNIBS*
>
>
> Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: how to covert millisecond time to SQL timeStamp

2016-02-01 Thread Kevin Mellott
I've had pretty good success using Joda-Time for date/time manipulations
within Spark applications. You may be able to use the *DateTime* constructor
below, if you are starting with milliseconds.

DateTime

public DateTime(long instant)

Constructs an instance set to the milliseconds from 1970-01-01T00:00:00Z
using ISOChronology in the default time zone.
Parameters:instant - the milliseconds from 1970-01-01T00:00:00Z
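
Judging by the output quoted below, casting a long to a timestamp treats the
value as seconds, which is why the millisecond value lands in the year 48110.
Dividing by 1000 first gives the expected instant. A small plain-Python sketch
of the arithmetic (no Spark involved; millis_to_utc is a made-up helper name):

```python
from datetime import datetime, timezone


def millis_to_utc(millis):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    # Split into whole seconds and leftover milliseconds so that the
    # sub-second part is kept exactly instead of going through a float.
    seconds, remainder = divmod(millis, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=remainder * 1000)


print(millis_to_utc(1456050620000))  # 2016-02-21 10:30:20+00:00
```

In a DataFrame, something like (df.start_time / 1000).cast("timestamp") should
behave the same way, though that line is untested here.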

On Mon, Feb 1, 2016 at 5:51 PM, Andy Davidson  wrote:

> What little I know about working with timestamps is based on
> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html
>
> Using the example of dates formatted into human friend strings ->
> timeStamps I was able to figure out how to convert Epoch times to
> timestamps. The same trick did not work for millisecond times.
>
> Any suggestions would be greatly appreciated.
>
>
> Andy
>
> Working with epoch times
> 
>
> ref: http://www.epochconverter.com/
>
> Epoch timestamp: 1456050620
>
> Timestamp in milliseconds: 145605062
>
> Human time (GMT): Sun, 21 Feb 2016 10:30:20 GMT
>
> Human time (your time zone): 2/21/2016, 2:30:20 AM
>
>
> # Epoch time stamp example
> data = [
>   ("1456050620", "1456050621", 1),
>   ("1456050622", "14560506203", 2),
>   ("14560506204", "14560506205", 3)]
> df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
>
> # convert epoch time strings in to spark timestamps
> df = df.select(
>   df.start_time.cast("long").alias("start_time"),
>   df.end_time.cast("long").alias("end_time"),
>   df.id)
> df.printSchema()
> df.show(truncate=False)
>
> # convert longs to timestamps
> df = df.select(
>   df.start_time.cast("timestamp").alias("start_time"),
>   df.end_time.cast("timestamp").alias("end_time"),
>   df.id)
> df.printSchema()
> df.show(truncate=False)
>
> root
>  |-- start_time: long (nullable = true)
>  |-- end_time: long (nullable = true)
>  |-- id: long (nullable = true)
>
> +-----------+-----------+---+
> |start_time |end_time   |id |
> +-----------+-----------+---+
> |1456050620 |1456050621 |1  |
> |1456050622 |14560506203|2  |
> |14560506204|14560506205|3  |
> +-----------+-----------+---+
>
> root
>  |-- start_time: timestamp (nullable = true)
>  |-- end_time: timestamp (nullable = true)
>  |-- id: long (nullable = true)
>
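> +---------------------+---------------------+---+
> |start_time           |end_time             |id |
> +---------------------+---------------------+---+
> |2016-02-21 02:30:20.0|2016-02-21 02:30:21.0|1  |
> |2016-02-21 02:30:22.0|2431-05-28 02:03:23.0|2  |
> |2431-05-28 02:03:24.0|2431-05-28 02:03:25.0|3  |
> +---------------------+---------------------+---+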
>
>
> In [21]:
>
> # working with millisecond times
> data = [
>   ("1456050620000", "1456050620000", 1)]
> df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
>
> # convert epoch time strings in to spark timestamps
> df = df.select(
>   df.start_time.cast("long").alias("start_time"),
>   df.end_time.cast("long").alias("end_time"),
>   df.id)
> df.printSchema()
> df.show(truncate=False)
>
> # convert longs to timestamps
> df = df.select(
>   df.start_time.cast("timestamp").alias("start_time"),
>   df.end_time.cast("timestamp").alias("end_time"),
>   df.id)
> df.printSchema()
> df.show(truncate=False)
>
> root
>  |-- start_time: long (nullable = true)
>  |-- end_time: long (nullable = true)
>  |-- id: long (nullable = true)
>
> +-------------+-------------+---+
> |start_time   |end_time     |id |
> +-------------+-------------+---+
> |1456050620000|1456050620000|1  |
> +-------------+-------------+---+
>
> root
>  |-- start_time: timestamp (nullable = true)
>  |-- end_time: timestamp (nullable = true)
>  |-- id: long (nullable = true)
>
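> +----------------------+----------------------+---+
> |start_time            |end_time              |id |
> +----------------------+----------------------+---+
> |48110-05-29 10:33:20.0|48110-05-29 10:33:20.0|1  |
> +----------------------+----------------------+---+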
>
>
>


Best way to bring up Spark with Cassandra (and Elasticsearch) in production.

2016-02-14 Thread Kevin Burton
Afternoon.

About 6 months ago I tried (and failed) to get Spark and Cassandra working
together in production due to dependency hell.

I'm going to give it another try!

Here's my general strategy.

I'm going to create a maven module for my code... with spark dependencies.

Then I'm going to get that to run and have unit tests for reading from
files and writing the data back out the way I want via spark jobs.

Then I'm going to setup cassandra unit to embed cassandra in my project.
Then I'm going to point Spark to Cassandra and have the same above code
work with Cassandra but instead of reading from a file it reads/writes to
C*.

Then once testing is working I'm going to setup spark in cluster mode with
the same dependencies.

Does this sound like a reasonable strategy?

Kevin

-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
<https://plus.google.com/102718274791889610666/posts>


Re: Using functional programming rather than SQL

2016-02-22 Thread Kevin Mellott
In your example, the *rs* instance should be a DataFrame object. In other
words, the result of *HiveContext.sql* is a DataFrame that you can
manipulate using *filter, map, *etc.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext


On Mon, Feb 22, 2016 at 5:16 PM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

> Hi,
>
> I have data stored in Hive tables that I want to do simple manipulation.
>
> Currently in Spark I perform the following with getting the result set
> using SQL from Hive tables, registering as a temporary table in Spark
>
> Now Ideally I can get the result set into a DF and work on DF to slice and
> dice the data using functional programming with filter, map. split etc.
>
> I wanted to get some ideas on how to go about it.
>
> thanks
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND   s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> *rs.registerTempTable("tmp")*
>
>
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL
> """).collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC
> """).collect.foreach(println)
>
>
> --
>
> Dr Mich Talebzadeh
>
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
>
>
>


Re: How to get progress information of an RDD operation

2016-02-23 Thread Kevin Mellott
Have you considered using the Spark Web UI to view progress on your job? It
does a very good job of showing the progress of the overall job, and it also
lets you drill into the individual tasks and server activity.

On Tue, Feb 23, 2016 at 12:53 PM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

> How can I get progress information of a RDD operation? For example
>
>
>
> *val *lines = sc.textFile(*"c:/temp/input.txt"*)  // a RDD of millions of
> line
> lines.foreach(line => {
> handleLine(line)
> })
>
> The input.txt contains millions of lines. The entire operation take 6
> hours. I want to print out how many lines are processed every 1 minute so
> user know the progress. How can I do that?
>
>
>
> One way I am thinking of is to use accumulator, e.g.
>
>
>
>
>
> *val *lines = sc.textFile(*"c:/temp/input.txt"*)
> *val *acCount = sc.accumulator(0L)
> lines.foreach(line => {
> handleLine(line)
> acCount += 1
> })
>
> However, how can I print out acCount every minute?
>
>
>
>
>
> Ningjun
>
>
>


Re: Network Spark Streaming from multiple remote hosts

2016-02-23 Thread Kevin Mellott
Hi Vinti,

That example is (in my opinion) more of a tutorial and not necessarily the
way you'd want to set it up for a "real world" application. I'd recommend
using something like Apache Kafka, which will allow the various hosts to
publish messages to a queue. Your Spark Streaming application is then
receiving messages from the queue and performing whatever processing you'd
like.

http://kafka.apache.org/documentation.html#introduction

Thanks,
Kevin

On Tue, Feb 23, 2016 at 3:13 PM, Vinti Maheshwari 
wrote:

> Hi All
>
> I wrote program for Spark Streaming in Scala. In my program, i passed
> 'remote-host' and 'remote port' under socketTextStream.
>
> And in the remote machine, i have one perl script who is calling system
> command:
>
> echo 'data_str' | nc  <>
>
> In that way, my spark program is able to get data, but it seems little bit
> confusing as i have multiple remote machines which needs to send data to
> spark machine. I wanted to know the right way of doing it. Infact, how will
> i deal with data coming from multiple hosts?
>
> For Reference, My current program:
>
> def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName("HBaseStream")
> val sc = new SparkContext(conf)
>
> val ssc = new StreamingContext(sc, Seconds(2))
>
> val inputStream = ssc.socketTextStream(, )
> ---
> ---
>
> ssc.start()
> // Wait for the computation to terminate
> ssc.awaitTermination()
>
>   }}
>
> Thanks in advance.
>
> Regards,
> ~Vinti
>


Re: Spark SQL partitioned tables - check for partition

2016-02-25 Thread Kevin Mellott
Once you have loaded information into a DataFrame, you can use the
*mapPartitions* or *foreachPartition* operations to both identify the
partitions and operate against them.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame


On Thu, Feb 25, 2016 at 9:24 AM, Deenar Toraskar 
wrote:

> Hi
>
> How does one check for the presence of a partition in a Spark SQL
> partitioned table (save using dataframe.write.partitionedBy("partCol") not
> hive compatible tables), other than physically checking the directory on
> HDFS or doing a count(*)  with the partition cols in the where clause ?
>
>
> Regards
> Deenar
>


Re: Spark SQL partitioned tables - check for partition

2016-02-25 Thread Kevin Mellott
If you want to see which partitions exist on disk (without manually
checking), you could write code against the Hadoop FileSystem library to
check. Is that what you are asking?

https://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/fs/package-summary.html
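
Partitioned output is laid out as one directory per value, named
partCol=value, under the table path, so the check comes down to testing for a
directory. A rough Python sketch against a local or mounted path (for HDFS
itself the same logic would go through the Hadoop FileSystem API; the function
names and the example values are made up):

```python
import os
import tempfile


def list_partitions(table_path, part_col):
    """Return the values of part_col that have a directory under table_path."""
    prefix = part_col + "="
    return sorted(
        name[len(prefix):]
        for name in os.listdir(table_path)
        if name.startswith(prefix)
        and os.path.isdir(os.path.join(table_path, name))
    )


def has_partition(table_path, part_col, value):
    """True if the directory part_col=value exists under table_path."""
    return os.path.isdir(
        os.path.join(table_path, "{0}={1}".format(part_col, value)))


# Demo on a throwaway directory that mimics the on-disk layout.
demo = tempfile.mkdtemp()
for day in ("2016-02-24", "2016-02-25"):
    os.makedirs(os.path.join(demo, "partCol=" + day))
print(list_partitions(demo, "partCol"))
print(has_partition(demo, "partCol", "2016-02-26"))
```

This only looks at directory names, so it avoids both a count(*) and a scan of
the data files.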


On Thu, Feb 25, 2016 at 10:54 AM, Deenar Toraskar  wrote:

> Kevin
>
> I meant the partitions on disk/hdfs not the inmemory RDD/Dataframe
> partitions. If I am right mapPartitions or forEachPartitions would identify
> and operate on the in memory partitions.
>
> Deenar
>
> On 25 February 2016 at 15:28, Kevin Mellott 
> wrote:
>
>> Once you have loaded information into a DataFrame, you can use the
>> *mapPartitions* or *foreachPartition* operations to both identify the
>> partitions and operate against them.
>>
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
>>
>>
>> On Thu, Feb 25, 2016 at 9:24 AM, Deenar Toraskar <
>> deenar.toras...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> How does one check for the presence of a partition in a Spark SQL
>>> partitioned table (save using dataframe.write.partitionedBy("partCol") not
>>> hive compatible tables), other than physically checking the directory on
>>> HDFS or doing a count(*)  with the partition cols in the where clause ?
>>>
>>>
>>> Regards
>>> Deenar
>>>
>>
>>
>


Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java

2016-02-29 Thread Kevin Mellott
You can use the constructor that accepts a BoostingStrategy object, which
will allow you to set the tree strategy (and other hyperparameters as well).

*GradientBoostedTrees*(BoostingStrategy boostingStrategy)

On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru 
wrote:

> Hello guys,
>
> I think the default Loss algorithm is Squared Error for regression, but
> how do I change that to Absolute Error in Java.
>
> Could you please show me an example?
>
>
>


Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java

2016-02-29 Thread Kevin Mellott
I believe that you can instantiate an instance of the AbsoluteError class
for the *Loss* object, since that object implements the Loss interface. For
example.

val loss = new AbsoluteError()
boostingStrategy.setLoss(loss)

On Mon, Feb 29, 2016 at 9:33 AM, diplomatic Guru 
wrote:

> Hi Kevin,
>
> Yes, I've set the bootingStrategy like that using the example. But I'm not
> sure how to create and pass the Loss object.
>
> e.g
>
> boostingStrategy.setLoss(..);
>
> Not sure how to pass the selected Loss.
>
> How do I set the  Absolute Error in setLoss() function?
>
>
>
>
> On 29 February 2016 at 15:26, Kevin Mellott 
> wrote:
>
>> You can use the constructor that accepts a BoostingStrategy object, which
>> will allow you to set the tree strategy (and other hyperparameters as well).
>>
>> *GradientBoostedTrees
>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/GradientBoostedTrees.html#GradientBoostedTrees(org.apache.spark.mllib.tree.configuration.BoostingStrategy)>*
>> (BoostingStrategy
>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/configuration/BoostingStrategy.html>
>>  boostingStrategy)
>>
>> On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru <
>> diplomaticg...@gmail.com> wrote:
>>
>>> Hello guys,
>>>
>>> I think the default Loss algorithm is Squared Error for regression, but
>>> how do I change that to Absolute Error in Java.
>>>
>>> Could you please show me an example?
>>>
>>>
>>>
>>
>


Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java

2016-02-29 Thread Kevin Mellott
Looks like it should be present in 1.3 at
org.apache.spark.mllib.tree.loss.AbsoluteError

spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/tree/loss/AbsoluteError.html

On Mon, Feb 29, 2016 at 9:46 AM, diplomatic Guru 
wrote:

> AbsoluteError() constructor is undefined.
>
> I'm using Spark 1.3.0, maybe it is not ready for this version?
>
>
>
> On 29 February 2016 at 15:38, Kevin Mellott 
> wrote:
>
>> I believe that you can instantiate an instance of the AbsoluteError class
>> for the *Loss* object, since that object implements the Loss interface.
>> For example.
>>
>> val loss = new AbsoluteError()
>> boostingStrategy.setLoss(loss)
>>
>> On Mon, Feb 29, 2016 at 9:33 AM, diplomatic Guru <
>> diplomaticg...@gmail.com> wrote:
>>
>>> Hi Kevin,
>>>
>>> Yes, I've set the bootingStrategy like that using the example. But I'm
>>> not sure how to create and pass the Loss object.
>>>
>>> e.g
>>>
>>> boostingStrategy.setLoss(..);
>>>
>>> Not sure how to pass the selected Loss.
>>>
>>> How do I set the  Absolute Error in setLoss() function?
>>>
>>>
>>>
>>>
>>> On 29 February 2016 at 15:26, Kevin Mellott 
>>> wrote:
>>>
>>>> You can use the constructor that accepts a BoostingStrategy object,
>>>> which will allow you to set the tree strategy (and other hyperparameters as
>>>> well).
>>>>
>>>> *GradientBoostedTrees
>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/GradientBoostedTrees.html#GradientBoostedTrees(org.apache.spark.mllib.tree.configuration.BoostingStrategy)>*
>>>> (BoostingStrategy
>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/configuration/BoostingStrategy.html>
>>>>  boostingStrategy)
>>>>
>>>> On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru <
>>>> diplomaticg...@gmail.com> wrote:
>>>>
>>>>> Hello guys,
>>>>>
>>>>> I think the default Loss algorithm is Squared Error for regression,
>>>>> but how do I change that to Absolute Error in Java.
>>>>>
>>>>> Could you please show me an example?
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java

2016-02-29 Thread Kevin Mellott
I found a helper class that I think should do the trick. Take a look at
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Losses.scala

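When passing the Loss, you should be able to do something like:

Losses.fromString("leastSquaresError")

That string selects squared error; for absolute error, the name accepted by
the same helper is "leastAbsoluteError".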

On Mon, Feb 29, 2016 at 10:03 AM, diplomatic Guru 
wrote:

> It's strange. You are correct, the doc does state it, but it's still
> complaining about the constructor.
>
> When I clicked on the org.apache.spark.mllib.tree.loss.AbsoluteError
> class, this is what I see:
>
>
> @Since("1.2.0")
> @DeveloperApi
> object AbsoluteError extends Loss {
>
>   /**
>    * Method to calculate the gradients for the gradient boosting calculation for least
>    * absolute error calculation.
>    * The gradient with respect to F(x) is: sign(F(x) - y)
>    * @param prediction Predicted label.
>    * @param label True label.
>    * @return Loss gradient
>    */
>   @Since("1.2.0")
>   override def gradient(prediction: Double, label: Double): Double = {
>     if (label - prediction < 0) 1.0 else -1.0
>   }
>
>   override private[mllib] def computeError(prediction: Double, label: Double): Double = {
>     val err = label - prediction
>     math.abs(err)
>   }
> }
>
>
> On 29 February 2016 at 15:49, Kevin Mellott 
> wrote:
>
>> Looks like it should be present in 1.3 at
>> org.apache.spark.mllib.tree.loss.AbsoluteError
>>
>>
>> spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/tree/loss/AbsoluteError.html
>>
>> On Mon, Feb 29, 2016 at 9:46 AM, diplomatic Guru <
>> diplomaticg...@gmail.com> wrote:
>>
>>> AbsoluteError() constructor is undefined.
>>>
>>> I'm using Spark 1.3.0, maybe it is not ready for this version?
>>>
>>>
>>>
>>> On 29 February 2016 at 15:38, Kevin Mellott 
>>> wrote:
>>>
>>>> I believe that you can instantiate an instance of the AbsoluteError
>>>> class for the *Loss* object, since that object implements the Loss
>>>> interface. For example.
>>>>
>>>> val loss = new AbsoluteError()
>>>> boostingStrategy.setLoss(loss)
>>>>
>>>> On Mon, Feb 29, 2016 at 9:33 AM, diplomatic Guru <
>>>> diplomaticg...@gmail.com> wrote:
>>>>
>>>>> Hi Kevin,
>>>>>
>>>>> Yes, I've set the boostingStrategy like that using the example. But I'm
>>>>> not sure how to create and pass the Loss object.
>>>>>
>>>>> e.g
>>>>>
>>>>> boostingStrategy.setLoss(..);
>>>>>
>>>>> Not sure how to pass the selected Loss.
>>>>>
>>>>> How do I set the  Absolute Error in setLoss() function?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 29 February 2016 at 15:26, Kevin Mellott wrote:
>>>>>
>>>>>> You can use the constructor that accepts a BoostingStrategy object,
>>>>>> which will allow you to set the tree strategy (and other hyperparameters
>>>>>> as well).
>>>>>>
>>>>>> *GradientBoostedTrees
>>>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/GradientBoostedTrees.html#GradientBoostedTrees(org.apache.spark.mllib.tree.configuration.BoostingStrategy)>*
>>>>>> (BoostingStrategy
>>>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/configuration/BoostingStrategy.html>
>>>>>>  boostingStrategy)
>>>>>>
>>>>>> On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru <
>>>>>> diplomaticg...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello guys,
>>>>>>>
>>>>>>> I think the default Loss algorithm is Squared Error for regression,
>>>>>>> but how do I change that to Absolute Error in Java.
>>>>>>>
>>>>>>> Could you please show me an example?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Flattening Data within DataFrames

2016-02-29 Thread Kevin Mellott
Fellow Sparkers,

I'm trying to "flatten" my view of data within a DataFrame, and am having
difficulties doing so. The DataFrame contains product information, which
includes multiple levels of categories (primary, secondary, etc).

*Example Data (Raw):*
*Name          Category   Level*
Baked Code    Food       1
Baked Code    Seafood    2
Baked Code    Fish       3
Hockey Stick  Sports     1
Hockey Stick  Hockey     2
Hockey Stick  Equipment  3

*Desired Data:*
*Name          Category1  Category2  Category3*
Baked Code    Food       Seafood    Fish
Hockey Stick  Sports     Hockey     Equipment

*Approach:*
After parsing the "raw" information into two separate DataFrames (called
*products* and *categories*) and registering them as Spark SQL tables, I
attempted the following query to flatten this all into the "desired data"
(depicted above).

products.registerTempTable("products")
categories.registerTempTable("categories")

val productList = sqlContext.sql(
  " SELECT p.Name, " +
  " c1.Description AS Category1, " +
  " c2.Description AS Category2, " +
  " c3.Description AS Category3 " +
  " FROM products AS p " +
  "   JOIN categories AS c1 " +
  " ON c1.Name = p.Name AND c1.Level = '1' " +
  "   JOIN categories AS c2 " +
  " ON c2.Name = p.Name AND c2.Level = '2' " +
  "   JOIN categories AS c3 " +
  " ON c3.Name = p.Name AND c3.Level = '3' ")

*Issue:*
I get an error when running my query above, because I am not able to JOIN
the *categories* table more than once. Has anybody dealt with this type of
use case before, and if so how did you achieve the desired behavior?
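
For readers following along, the reshaping described above can be sketched
in plain Python. This is only an illustration of the pivot logic, not Spark
code: the row layout (name, category, level) is taken from the example data,
while the function name flatten and the output keys Category1..Category3 are
assumptions made here. Spark 1.6 added a pivot operation on grouped
DataFrames that performs the same kind of reshaping on a cluster.

```python
from collections import defaultdict

# Rows shaped like the raw example data: (name, category, level).
raw_rows = [
    ("Baked Code", "Food", 1),
    ("Baked Code", "Seafood", 2),
    ("Baked Code", "Fish", 3),
    ("Hockey Stick", "Sports", 1),
    ("Hockey Stick", "Hockey", 2),
    ("Hockey Stick", "Equipment", 3),
]


def flatten(rows):
    """Pivot (name, category, level) rows into one dict per name."""
    by_name = defaultdict(dict)
    for name, category, level in rows:
        # The level decides which output column the category lands in.
        by_name[name]["Category%d" % level] = category
    return [dict(Name=name, **cols) for name, cols in sorted(by_name.items())]


flat = flatten(raw_rows)
for row in flat:
    print(row)
```

Each product name ends up as a single record with one column per category
level, which is the "desired data" layout.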

Thank you in advance for your thoughts.

Kevin


Re: Flattening Data within DataFrames

2016-02-29 Thread Kevin Mellott
Thanks Michal - this is exactly what I need.

On Mon, Feb 29, 2016 at 11:40 AM, Michał Zieliński <
zielinski.mich...@gmail.com> wrote:

> Hi Kevin,
>
> This should help:
>
> https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-spark.html
>
> On 29 February 2016 at 16:54, Kevin Mellott 
> wrote:
>
>> Fellow Sparkers,
>>
>> I'm trying to "flatten" my view of data within a DataFrame, and am having
>> difficulties doing so. The DataFrame contains product information, which
>> includes multiple levels of categories (primary, secondary, etc).
>>
>> *Example Data (Raw):*
>> *Name          Category   Level*
>> Baked Code    Food       1
>> Baked Code    Seafood    2
>> Baked Code    Fish       3
>> Hockey Stick  Sports     1
>> Hockey Stick  Hockey     2
>> Hockey Stick  Equipment  3
>>
>> *Desired Data:*
>> *Name          Category1  Category2  Category3*
>> Baked Code    Food       Seafood    Fish
>> Hockey Stick  Sports     Hockey     Equipment
>>
>> *Approach:*
>> After parsing the "raw" information into two separate DataFrames (called
>> *products* and *categories*) and registering them as Spark SQL tables, I was
>> attempting to perform the following query to flatten this all into the
>> "desired data" (depicted above).
>>
>> products.registerTempTable("products")
>> categories.registerTempTable("categories")
>>
>> val productList = sqlContext.sql(
>>   " SELECT p.Name, " +
>>   " c1.Description AS Category1, " +
>>   " c2.Description AS Category2, " +
>>   " c3.Description AS Category3 " +
>>   " FROM products AS p " +
>>   "   JOIN categories AS c1 " +
>>   " ON c1.Name = p.Name AND c1.Level = '1' " +
>>   "   JOIN categories AS c2 " +
>>   " ON c2.Name = p.Name AND c2.Level = '2' " +
>>   "   JOIN categories AS c3 " +
>>   " ON c3.Name = p.Name AND c3.Level = '3' ")
>>
>> *Issue:*
>> I get an error when running my query above, because I am not able to JOIN
>> the *categories* table more than once. Has anybody dealt with this type
>> of use case before, and if so how did you achieve the desired behavior?
>>
>> Thank you in advance for your thoughts.
>>
>> Kevin
>>
>
>


GenericRowWithSchema is too heavy

2015-07-27 Thread Kevin Jung
Hi all,

Spark SQL usually creates a DataFrame of GenericRowWithSchema rows (is that
right?). 'Row' is a superclass of GenericRow and GenericRowWithSchema; the
only difference is that GenericRowWithSchema carries its schema information
as a StructType. But one DataFrame has only one schema, so each row should
not have to store the schema itself, because StructType is very heavy and
most RDDs have many rows. To test this, I did the following:
1) create a DataFrame and call rdd (RDD[Row]) <= GenericRowWithSchema
2) dataframe.map(row => Row(row.toSeq)) <= GenericRow
3) dataframe.map(row => row.toSeq) <= underlying sequence of a row
4) saveAsObjectFile or use org.apache.spark.util.SizeEstimator.estimate
And my results are (DataFrame with 5 columns):
GenericRowWithSchema => 13 GB
GenericRow => 8.2 GB
Seq => 7 GB

Best regards
Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GenericRowWithSchema-is-too-heavy-tp24018.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: What is the optimal approach to do Secondary Sort in Spark?

2015-08-11 Thread Kevin Jung
You should use a tuple as the key. In your case, RDD[((id, timeStamp),
value)] is the proper way to do it.
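
A minimal plain-Python sketch of the composite-key idea, assuming
hypothetical records of the form (id, timestamp, value); it is not Spark
code. Once the timestamp is part of the key, an ordinary sort by key gives
the secondary ordering, because tuples compare element by element. In Spark
the same composite key can then be handed to a key-based sort such as
sortByKey.

```python
# Hypothetical input records: (id, timestamp, value).
records = [("b", 3, "x"), ("a", 2, "y"), ("a", 1, "z"), ("b", 1, "w")]

# Move the timestamp out of the value and into the key: ((id, timestamp), value).
keyed = [((rec_id, ts), value) for rec_id, ts, value in records]

# Sorting by the tuple key orders by id first and by timestamp second.
sorted_pairs = sorted(keyed, key=lambda kv: kv[0])

for key, value in sorted_pairs:
    print(key, value)
```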

Kevin

--- Original Message ---
Sender : swetha
Date : 2015-08-12 09:37 (GMT+09:00)
Title : What is the optimal approach to do Secondary Sort in Spark?

Hi,

What is the optimal approach to do Secondary sort in Spark? I have to first
Sort by an Id in the key and further sort it by timeStamp which is present
in the value.

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-optimal-approach-to-do-Secondary-Sort-in-Spark-tp24219.html
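This e-mail is intended only for the designated recipient and may contain
trade secrets, industrial technology, confidential information, personal
information, and other material protected under applicable laws, including
the Unfair Competition Prevention and Trade Secret Protection Act and the
Personal Information Protection Act. Copying or using all or part of the
information in this document without authorization, or disclosing,
distributing, or providing it to a third party, is strictly prohibited. If
this e-mail was sent to you in error, please notify the sender or the company
and delete it immediately.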
The contents of this e-mail message and any attachments are confidential and 
are intended solely for addressee.
 The information may also be legally privileged. This transmission is sent in 
trust, for the sole purpose of delivery
 to the intended recipient. If you have received this transmission in error, 
any use, reproduction or dissemination of
 this transmission is strictly prohibited. If you are not the intended 
recipient, please immediately notify the sender
 by reply e-mail or phone and delete this message and its attachments, if any.

Can't find directory after resetting REPL state

2015-08-15 Thread Kevin Jung
The Spark shell can't find the base directory of the class server after
running the ":reset" command.
scala> :reset
scala> 1
uncaught exception during compilation: java.lang.AssertionError
java.lang.AssertionError: assertion failed: Tried to find '$line33' in
'/tmp/spark-f47f3917-ac31-4138-bf1a-a8cefd094ac3' but it is not a directory
~~~no further commands can be entered~~~
I found that the reset() method in SparkIMain tries to delete
virtualDirectory and then create it again, but virtualDirectory.create()
makes a file, not a directory.
Has anyone else faced the same problem under Spark 1.4.0?

Kevin

Re: Can't find directory after resetting REPL state

2015-08-16 Thread Kevin Jung
Thanks Ted, it may be a bug. Here is the JIRA ticket:
https://issues.apache.org/jira/browse/SPARK-10039 

Kevin

--- Original Message ---
Sender : Ted Yu
Date : 2015-08-16 11:29 (GMT+09:00)
Title : Re: Can't find directory after resetting REPL state

I tried with master branch and got the following:


http://pastebin.com/2nhtMFjQ



FYI


On Sat, Aug 15, 2015 at 1:03 AM, Kevin Jung  wrote:

Spark shell can't find base directory of class server after running ":reset" 
command.
scala> :reset
scala> 1
uncaught exception during compilation: java.lang.AssertionError
java.lang.AssertionError: assertion failed: Tried to find '$line33' in
'/tmp/spark-f47f3917-ac31-4138-bf1a-a8cefd094ac3' but it is not a directory
~~~impossible to command anymore~~~
I figure out reset() method in SparkIMain try to delete virtualDirectory and 
then create again. But virtualDirectory.create() makes a file, not a directory.
Does anyone face a same problem under spark 1.4.0?

Kevin





SaveAsTable changes the order of rows

2015-08-18 Thread Kevin Jung
I have a simple RDD with Key/Value and do

val partitioned = rdd.partitionBy(new HashPartitioner(400))
val row = partitioned.first

The returned row has the key "G2726". This first row is located in
partition #0 because "G2726".hashCode is 67114000 and 67114000 % 400 is 0.
But the order of keys changes when I save the RDD as a table with
saveAsTable. After doing this and calling sqlContext.table, the key of the
first row is "G265". Does the DataFrame forget its parent's partitioner, or
does the Parquet format always rearrange the order of the original data? In
my case the order is not important, but some users may want to keep their
keys ordered.
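
The partition arithmetic above can be checked with a short plain-Python
sketch. It emulates Java's String.hashCode and a non-negative modulo, which
is the usual way a hash partitioner picks a partition; the helper names
java_string_hash and hash_partition are made up for this illustration and
are not Spark APIs.

```python
def java_string_hash(s):
    """Emulate Java's String.hashCode() using 32-bit signed arithmetic.

    Assumes every character is in the Basic Multilingual Plane, so that one
    Python character corresponds to one Java char.
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed int, as Java would.
    return h - 0x100000000 if h >= 0x80000000 else h


def hash_partition(key, num_partitions):
    """Pick a partition as the non-negative remainder of the key's hash."""
    # Python's % already returns a non-negative result for a positive divisor.
    return java_string_hash(key) % num_partitions


print(java_string_hash("G2726"))     # 67114000
print(hash_partition("G2726", 400))  # 0
print(hash_partition("G265", 400))   # 138
```

Under these assumptions "G2726" lands in partition 0 and "G265" in partition
138, so a row keyed "G265" showing up first means the rows are no longer
being read back in the original partition order.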


Kevin





Drop table and Hive warehouse

2015-08-24 Thread Kevin Jung
When I store a DataFrame as a table with "saveAsTable" and then execute
"DROP TABLE" in Spark SQL, it doesn't actually delete the files in the Hive
warehouse. The table disappears from the table list but the data files are
still there. Because of this, I can't saveAsTable with the same name again,
even after dropping the table.
Is this normal? If it is, I will delete the files manually ;)

Kevin




Re: Drop table and Hive warehouse

2015-08-24 Thread Kevin Jung
Thanks, Michael.
I figured it out myself. In the end, it was not a Spark bug.
I have two HDFS clusters. Hive uses hive.metastore.warehouse.dir +
fs.defaultFS (HDFS1) for saving internal tables, but it also references the
default database URI (HDFS2) from the "DBS" table in the metastore.
This is not a problem if the URI of the default database is the same as
fs.defaultFS, but a few people may have set their default database URI to
another HDFS, as I did.
I copied hive-site.xml into the Spark conf directory so that Hive and Spark
had the same metastore configuration.
But the table produced by "saveAsTable" has its metadata in HDFS1 and its
data in HDFS2.
"DESCRIBE FORMATTED " shows the difference between the Location of the table
(HDFS1) and the Path in Storage Desc Params (HDFS2), even though the table is
of type MANAGED_TABLE.
That is why "DROP TABLE" deletes only the metadata in HDFS1 and does NOT
delete the data files in HDFS2.
So I could not recreate a table with the same location and the same name.
After I updated the DBS table in the metastore DB to set the default database
URI to HDFS1, it works perfectly.


Kevin

--- Original Message ---
Sender : Michael Armbrust
Date : 2015-08-25 00:43 (GMT+09:00)
Title : Re: Drop table and Hive warehouse

That's not the expected behavior.  What version of Spark?


On Mon, Aug 24, 2015 at 1:32 AM, Kevin Jung  wrote:

When I store DataFrame as table with command "saveAsTable" and then execute 
"DROP TABLE" in SparkSQL, it doesn't actually delete files in hive warehouse.
The table disappears from a table list but the data files are still alive.
Because of this, I can't saveAsTable with a same name before dropping table.
Is it a normal situation? If it is, I will delete files manually ;)

Kevin





Re: Unable to build Spark 1.5, is build broken or can anyone successfully build?

2015-08-30 Thread Kevin Jung
I expect this is because your versions are not in the range defined in
pom.xml. You should upgrade Maven to 3.3.3 and the JDK to 1.7.
The Spark team already knows about this issue, so you can find more
information on the developers' community board.

Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-build-Spark-1-5-is-build-broken-or-can-anyone-successfully-build-tp24513p24515.html



What happens to this RDD? OutOfMemoryError

2015-09-04 Thread Kevin Mandich
Hi All,

I'm using PySpark to create a corpus of labeled data points. I create an
RDD called corpus, and then join to this RDD each newly-created feature RDD
as I go. My code repeats something like this for each feature:

# create feature RDD
feature = raw_data_rdd.map(...).reduceByKey(...).map(...)
# "append" new feature to existing corpus
corpus = corpus.join(feature).map(lambda x: (x[0], x[1][0] + (x[1][1],)))
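
To make the shape of the data concrete, the join-and-append step can be
emulated in plain Python on small lists. This is a sketch only: the join
helper below is a hypothetical stand-in for an inner join on (key, value)
pairs, and the labels and feature values are made-up sample data.

```python
def join(left, right):
    """Inner join of two lists of (key, value) pairs.

    Yields (key, (left_value, right_value)) for every matching pair of keys.
    """
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]


# Made-up sample data: label -> tuple of features collected so far.
corpus = [("label_a", (1.0, 2.0)), ("label_b", (3.0, 4.0))]
# Made-up new feature: label -> single feature value.
feature = [("label_a", 0.5), ("label_b", 0.7)]

# Same lambda body as in the message: extend the feature tuple by one element.
corpus = [(x[0], x[1][0] + (x[1][1],)) for x in join(corpus, feature)]
print(corpus)
```

Each pass makes the value tuple one element longer while the number of keys
stays the same, as long as every key is unique on both sides of the join.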

The corpus RDD holds key-value tuples, where the key is the label and the
value is a tuple of the features. I repeat the above for the 6 features I'm
working with. It looks like I'm running into a memory error when performing
the join on the last feature. Here's some relevant information:

- raw_data_rdd has ~ 50 million entries, while feature and corpus have ~
450k after the map-reduce operations
- The driver and each of the 6 executor nodes have 6GB memory available
- I'm kicking off the script using the following:
pyspark --driver-memory 2G --executor-memory 2G --conf
spark.akka.frameSize=64 create_corpus.py

My question is: why would I be running out of memory when joining the
relatively small feature and corpus RDDs? Also, what happens to the "old"
corpus RDD when I join it and point corpus to the new, larger RDD? Does
it stay in memory, and could this be the reason why I'm running into the
issue? If so, is there a better way of "appending" to my corpus RDD? Should
I be persisting raw_data_rdd? The full error is shown below.

Please let me know if I'm missing something obvious. Thank you!

Kevin Mandich


Exception in thread "refresh progress" Exception in thread "SparkListenerBus"
[2015-09-04 20:43:14,385] {bash_operator.py:58} INFO - Exception:
java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in
thread "SparkListenerBus"
[2015-09-04 20:43:30,999] {bash_operator.py:58} INFO - Exception in
thread "qtp268929808-35" java.lang.OutOfMemoryError: Java heap space
[2015-09-04 20:43:30,999] {bash_operator.py:58} INFO - at
java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter(AbstractQueuedSynchronizer.java:606)
[2015-09-04 20:43:30,999] {bash_operator.py:58} INFO - at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:883)
[2015-09-04 20:43:31,000] {bash_operator.py:58} INFO - at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
[2015-09-04 20:43:32,562] {bash_operator.py:58} INFO - at
java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
[2015-09-04 20:43:32,562] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:333)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at
java.lang.Thread.run(Thread.java:745)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO -
java.lang.OutOfMemoryError: Java heap space
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - Exception in
thread "qtp1514449570-77" java.lang.OutOfMemoryError: Java heap space
[2015-09-04 20:43:37,366] {bash_operator.py:58} INFO - at
java.util.concurrent.ConcurrentHashMap$KeySet.iterator(ConcurrentHashMap.java:1428)
[2015-09-04 20:43:37,366] {bash_operator.py:58} INFO - at
org.spark-project.jetty.io.nio.SelectorManager$SelectSet$1.run(SelectorManager.java:712)
[2015-09-04 20:43:37,366] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
[2015-09-04 20:43:41,458] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
[2015-09-04 20:43:41,459] {bash_operator.py:58} INFO - at
java.lang.Thread.run(Thread.java:745)
[2015-09-04 20:55:04,411] {bash_operator.py:58} INFO - Exception in
thread "qtp1514449570-72"
[2015-09-04 20:55:04,412] {bash_operator.py:58} INFO - Exception:
java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in
thread "qtp1514449570-72"
[2015-09-04 20:58:25,671] {bash_operator.py:58} INFO - Exception in
thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java
heap space


Spark summit Asia

2015-09-07 Thread Kevin Jung
Is there any plan to hold Spark summit in Asia?
I'm very much looking forward to it.

Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-summit-Asia-tp24598.html



Write parquet file from Spark Streaming

2016-08-27 Thread Kevin Tran
Hi Everyone,

Does anyone know how to write a Parquet file after parsing data in Spark
Streaming?



Thanks,

Kevin.


Spark StringType could hold how many characters ?

2016-08-28 Thread Kevin Tran
Hi,
I wrote to a Parquet file as follows:

++
|word|
++
|THIS IS MY CHARACTERS ...|
|// ANOTHER LINE OF CHAC...|
++

These lines are not the full text; they are being truncated.
Does anyone know how many characters StringType can hold?

In the Spark code:
org.apache.spark.sql.types.StringType
/**
   * The default size of a value of the StringType is 4096 bytes.
   */
  override def defaultSize: Int = 4096


Thanks,
Kevin.


Best practises to storing data in Parquet files

2016-08-28 Thread Kevin Tran
Hi,
Does anyone know the best practices for storing data in Parquet files?
Does a Parquet file have a size limit (1 TB)?
Should we use SaveMode.Append for a long-running streaming app?
How should we organize the data in HDFS (directory structure, ...)?

Thanks,
Kevin.


Re: Best practises to storing data in Parquet files

2016-08-28 Thread Kevin Tran
Hi Mich,
My stack is as follows:

Data sources:
 * IBM MQ
 * Oracle database

Kafka stores all messages from the data sources.
Spark Streaming fetches messages from Kafka, does a bit of transformation,
and writes Parquet files to HDFS.
Hive / SparkSQL / Impala will query the Parquet files.

Do you have any reference architecture that HBase is a part of?

Please share any best practices you know of or your favourite designs.

Thanks,
Kevin.









On Mon, Aug 29, 2016 at 5:18 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> Can you explain your particular stack?
>
> For example, what is the source of the streaming data, and what role does
> Spark play?
>
> Are you dealing with real time and batch, and why Parquet and not something
> like HBase to ingest data in real time?
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 28 August 2016 at 15:43, Kevin Tran  wrote:
>
>> Hi,
>> Does anyone know the best practices for storing data in Parquet files?
>> Does a Parquet file have a size limit (1 TB)?
>> Should we use SaveMode.Append for a long-running streaming app?
>> How should we organize the data in HDFS (directory structure, ...)?
>>
>> Thanks,
>> Kevin.
>>
>
>


Best ID Generator for ID field in parquet ?

2016-09-04 Thread Kevin Tran
Hi everyone,
Please give me your opinions on the best ID generator for an ID field in
Parquet:

UUID.randomUUID();
AtomicReference<Long> currentTime =
    new AtomicReference<>(System.currentTimeMillis());
AtomicLong counter = new AtomicLong(0);
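
The two candidate styles can be compared with a plain-Python sketch. It is an
illustration under stated assumptions, not a recommendation from the thread:
the class name TimestampCounterIds and the "millis-sequence" string format
are invented here, and the counter only guarantees uniqueness inside a single
process, so several JVMs or executors would still need something extra (for
example a host or partition prefix) to avoid collisions.

```python
import itertools
import threading
import time
import uuid


class TimestampCounterIds:
    """Hypothetical generator: millisecond timestamp plus a per-process sequence."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counter = itertools.count()

    def next_id(self):
        # The lock makes the sequence number unique across threads in this process.
        with self._lock:
            seq = next(self._counter)
        return "%d-%d" % (int(time.time() * 1000), seq)


gen = TimestampCounterIds()
ids = [gen.next_id() for _ in range(1000)]

# Random UUIDs need no coordination between processes at all.
random_id = str(uuid.uuid4())

print(ids[0], ids[-1], random_id)
```

The timestamp alone would repeat for IDs generated within the same
millisecond; it is the sequence number that keeps them distinct.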


Thanks,
Kevin.



https://issues.apache.org/jira/browse/SPARK-8406 (Race condition when
writing Parquet files)
https://github.com/apache/spark/pull/6864/files


Re: Best ID Generator for ID field in parquet ?

2016-09-04 Thread Kevin Tran
Hi Mich,
Thank you for your input.
Does a monotonically increasing ID protect against race conditions, or can it
produce duplicate IDs at some point with multiple threads, multiple
instances, ...?

Can even System.currentTimeMillis() still produce duplicates?
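
As background to these questions, Spark's documentation for
monotonically_increasing_id describes the generated value as a 64-bit number
with the partition ID in the upper 31 bits and the record number within the
partition in the lower 33 bits. The plain-Python sketch below only mimics
that documented layout; the function name monotonic_id and its arguments are
assumptions for illustration. It suggests why the values are unique within
one DataFrame but not consecutive, and why two independent jobs would both
start again from 0.

```python
PARTITION_BITS = 31  # upper bits: partition index
RECORD_BITS = 33     # lower bits: record number inside the partition


def monotonic_id(partition_index, row_in_partition):
    """Combine a partition index and a row offset into one 64-bit style ID."""
    assert 0 <= partition_index < (1 << PARTITION_BITS)
    assert 0 <= row_in_partition < (1 << RECORD_BITS)
    return (partition_index << RECORD_BITS) | row_in_partition


first_partition = [monotonic_id(0, i) for i in range(3)]
second_partition = [monotonic_id(1, i) for i in range(3)]

print(first_partition)   # [0, 1, 2]
print(second_partition)  # [8589934592, 8589934593, 8589934594]
```

A bare millisecond timestamp, by contrast, returns the same value for any two
calls that fall within the same millisecond, so on its own it cannot rule out
duplicates.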

Cheers,
Kevin.

On Mon, Sep 5, 2016 at 12:30 AM, Mich Talebzadeh 
wrote:

> You can create a monotonically incrementing ID column on your table
>
> scala> val ll_18740868 = spark.table("accounts.ll_18740868")
> scala> val startval = 1
> scala> val df = ll_18740868.withColumn("id",
> *monotonically_increasing_id()+* startval).show (2)
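> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
> |transactiondate|transactiontype| sortcode|accountnumber|transactiondescription|debitamount|creditamount|balance| id|
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
> |     2011-12-30|            DEB|'30-64-72|     18740868|  WWW.GFT.COM CD 4628 |       50.0|        null| 304.89|  1|
> |     2011-12-30|            DEB|'30-64-72|     18740868|  TDA.CONFECC.D.FRE...|      19.01|        null| 354.89|  2|
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+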
>
>
> Now you have a new ID column
>
> HTH
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 4 September 2016 at 12:43, Kevin Tran  wrote:
>
>> Hi everyone,
>> Please give me your opinions on what is the best ID Generator for ID
>> field in parquet ?
>>
>> UUID.randomUUID();
>> AtomicReference<Long> currentTime =
>>     new AtomicReference<>(System.currentTimeMillis());
>> AtomicLong counter = new AtomicLong(0);
>> 
>>
>> Thanks,
>> Kevin.
>>
>>
>> 
>> https://issues.apache.org/jira/browse/SPARK-8406 (Race condition when
>> writing Parquet files)
>> https://github.com/apache/spark/pull/6864/files
>>
>
>


call() function being called 3 times

2016-09-07 Thread Kevin Tran
Hi Everyone,
Does anyone know why the call() function is being called *3 times* for each
message that arrives?

JavaDStream<String> message = messagesDStream.map(
    new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

message.foreachRDD(rdd -> {
  logger.debug("---> New RDD with " + rdd.partitions().size()
      + " partitions and " + rdd.count() + " records");   // <== 1
  SQLContext sqlContext = new SQLContext(rdd.context());

  JavaRDD<JavaBean> rowRDD = rdd.map(new Function<String, JavaBean>() {
    public JavaBean call(String record) {   // <== being called 3 times

What I tried:
 * *cache()*
 * cleaning up *checkpoint dir*

Thanks,
Kevin.


Re: call() function being called 3 times

2016-09-07 Thread Kevin Tran
It turns out that the call() function runs in different stages:

...
2016-09-07 20:37:21,086 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Running task 0.0 in stage 11.0 (TID
11)
2016-09-07 20:37:21,087 [Executor task launch worker-0] DEBUG
org.apache.spark.executor.Executor - Task 11's epoch is 0
...
2016-09-07 20:37:21,096 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Finished task 0.0 in stage 11.0 (TID
11). 2412 bytes result sent to driver
...
<=== call() called here !!

2016-09-07 20:37:22,341 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Running task 0.0 in stage 12.0 (TID
12)
2016-09-07 20:37:22,343 [Executor task launch worker-0] DEBUG
org.apache.spark.executor.Executor - Task 12's epoch is 0

<=== call() called here !!

2016-09-07 20:37:22,362 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Finished task 0.0 in stage 12.0 (TID
12). 2518 bytes result sent to driver


Does anyone have any ideas?
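
One possible explanation, which nobody in this thread confirms: Spark
transformations such as map() are lazy, so every action that runs on an
un-persisted RDD (or on something built from it) re-executes the map function,
and each action shows up as its own job and stage. The sketch below is only a
plain-Java analogy using java.util.stream, not Spark API. The class
LazyRecompute and its methods are hypothetical names, and parse() stands in
for the call() method from the original post.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyRecompute {
    /** Counts how often the mapping function runs. */
    static final AtomicInteger calls = new AtomicInteger();

    /** Hypothetical stand-in for call(String record) in the thread. */
    static String parse(String record) {
        calls.incrementAndGet();
        return record.trim();
    }

    /** Runs three "actions" on a lazy pipeline; returns the number of parse() calls. */
    static int withoutCache(List<String> records) {
        calls.set(0);
        // Each get() rebuilds the pipeline, much like an un-persisted lineage.
        Supplier<Stream<String>> mapped = () -> records.stream().map(LazyRecompute::parse);
        for (int action = 0; action < 3; action++) {
            mapped.get().collect(Collectors.toList());
        }
        return calls.get();
    }

    /** Materializes the mapped data once, then runs three "actions" on the result. */
    static int withCache(List<String> records) {
        calls.set(0);
        List<String> cached = records.stream()
            .map(LazyRecompute::parse)
            .collect(Collectors.toList());
        for (int action = 0; action < 3; action++) {
            cached.forEach(r -> { });
        }
        return calls.get();
    }
}
```

With two input records, withoutCache runs parse() six times and withCache runs
it twice. In Spark the analogous step would be persisting the mapped RDD before
the first action touches it. The original post says cache() was tried but does
not show where it was applied, so this may or may not be the cause here.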






Selecting the top 100 records per group by?

2016-09-10 Thread Kevin Burton
I'm trying to figure out a way to group by and return the top 100 records
in that group.

Something like:

SELECT TOP(100, user_id) FROM posts GROUP BY user_id;

But I can't really figure out the best way to do this...

There are FIRST and LAST aggregate functions, but each only returns one
column.

I could do something like:

SELECT * FROM posts WHERE user_id IN ( /* select top users here */ ) LIMIT
100;

But that limit is applied across ALL the records, not to each individual user.

The only other thing I can think of is to do a manual map reduce and then
have the reducer only return the top 100 each time...

Would LOVE some advice here...
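
The "manual map reduce" idea above can be sketched in plain Java as a bounded
min-heap per key: each key keeps at most n records and evicts its lowest one
when a better record arrives. This is a minimal sketch, not Spark code. The
Post class, its userId and score fields, and the choice of ranking by score
are all assumptions, since the post does not say what "top" is ordered by.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

final class TopNPerKey {
    /** Hypothetical record type: one post with its author and a score to rank by. */
    static final class Post {
        final long userId;
        final long score;

        Post(long userId, long score) {
            this.userId = userId;
            this.score = score;
        }
    }

    /** Returns, for each user, at most n posts with the highest scores, best first. */
    static Map<Long, List<Post>> topN(List<Post> posts, int n) {
        Comparator<Post> byScore = Comparator.comparingLong((Post p) -> p.score);
        Map<Long, PriorityQueue<Post>> heaps = new HashMap<>();
        for (Post p : posts) {
            PriorityQueue<Post> heap =
                heaps.computeIfAbsent(p.userId, k -> new PriorityQueue<>(byScore));
            heap.add(p);
            if (heap.size() > n) {
                heap.poll(); // evict the lowest score so the heap never exceeds n
            }
        }
        Map<Long, List<Post>> result = new HashMap<>();
        for (Map.Entry<Long, PriorityQueue<Post>> e : heaps.entrySet()) {
            List<Post> sorted = new ArrayList<>(e.getValue());
            sorted.sort(byScore.reversed());
            result.put(e.getKey(), sorted);
        }
        return result;
    }
}
```

Memory per key stays bounded by n, which matters when a few users have very
many posts. In a distributed job the same per-key logic would run after the
records have been grouped by user_id.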

-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile



Re: Selecting the top 100 records per group by?

2016-09-10 Thread Kevin Burton
Ah.. might actually. I'll have to mess around with that.

On Sat, Sep 10, 2016 at 6:06 PM, Karl Higley  wrote:

> Would `topByKey` help?
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42
>
> Best,
> Karl
>




Re: Selecting the top 100 records per group by?

2016-09-10 Thread Kevin Burton
Looks like you can do it with dense_rank functions.

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

I set up some basic records and it seems to do the right thing.

Now time to throw 50TB and 100 spark nodes at this problem and see what
happens :)





"Too many elements to create a power set" on Elasticsearch

2016-09-11 Thread Kevin Burton
1.6.1 and 1.6.2 don't work on our Elasticsearch setup because we use daily
indexes.

We get the error:

"Too many elements to create a power set"

It works on SINGLE indexes.. but if I specify content_* then I get this
error.

I don't see this documented anywhere.  Is this a known issue?

Is there a potential workaround?

The code works properly when I specify an explicit daily index.




Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Kevin Burton
I'm rather confused here as to what to do about creating a new SparkContext.

Spark 2.0 prevents it... (exception included below)

yet a TON of examples I've seen basically tell you to create a new
SparkContext as standard practice:

http://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("CountingSheep")
val sc = new SparkContext(conf)


I'm specifically running into a problem in that ES hadoop won't work with
its settings and I think it's related to this problem.

Do we have to call sc.stop() first and THEN create a new spark context?

That works, but I can't find any documentation anywhere telling us the
right course of action.



scala> val sc = new SparkContext();
org.apache.spark.SparkException: Only one SparkContext may be running in
this JVM (see SPARK-2243). To ignore this error, set
spark.driver.allowMultipleContexts = true. The currently running
SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
(:15)
(:31)
(:33)
.(:37)
.()
.$print$lzycompute(:7)
.$print(:6)
$print()
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
  at
org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2221)
  at
org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2217)
  at scala.Option.foreach(Option.scala:257)
  at
org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2217)
  at
org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2290)
  at org.apache.spark.SparkContext.(SparkContext.scala:89)
  at org.apache.spark.SparkContext.(SparkContext.scala:121)
  ... 48 elided





Re: Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Kevin Burton
The problem is that without a new spark context, with a custom conf,
elasticsearch-hadoop is refusing to read in settings about the ES setup...

If I call sc.stop() and then create a new one, it seems to work fine.

But it isn't really documented anywhere and all the existing documentation
is now invalid because you get an exception when you try to create a new
spark context.

On Tue, Sep 13, 2016 at 11:13 AM, Mich Talebzadeh  wrote:

> I think this works in a shell but you need to allow multiple spark contexts
>
> Spark context Web UI available at http://50.140.197.217:5
> Spark context available as 'sc' (master = local, app id =
> local-1473789661846).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
>       /_/
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_77)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext
> scala> val conf = new SparkConf().setMaster("local[2]").setAppName("CountingSheep").set("spark.driver.allowMultipleContexts", "true")
> conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@bb5f9d
> scala> val sc = new SparkContext(conf)
> sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4888425d
>
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 September 2016 at 18:57, Sean Owen  wrote:
>
>> But you're in the shell there, which already has a SparkContext for you
>> as sc.
>>

Re: Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Kevin Burton
I sort of agree but the problem is that some of this should be code.

Some of our ES indexes have 100-200 columns.

Defining which ones are arrays on the command line is going to get ugly
fast.



On Tue, Sep 13, 2016 at 11:50 AM, Sean Owen  wrote:

> You would generally use --conf to set this on the command line if using
> the shell.
>
>
