Re: Spark streaming with Kinesis broken?

2015-12-10 Thread Nick Pentreath
Yeah also the integration tests need to be specifically run - I would have 
thought the contributor would have run those tests and also tested the change 
themselves using live Kinesis :(



—
Sent from Mailbox

On Fri, Dec 11, 2015 at 6:18 AM, Burak Yavuz  wrote:

> I don't think the Kinesis tests specifically ran when that was merged into
> 1.5.2 :(
> https://github.com/apache/spark/pull/8957
> https://github.com/apache/spark/commit/883bd8fccf83aae7a2a847c9a6ca129fac86e6a3
> AFAIK pom changes don't trigger the Kinesis tests.
> Burak
> On Thu, Dec 10, 2015 at 8:09 PM, Nick Pentreath 
> wrote:
>> Yup, it also works for me on the master branch, as I've been testing DynamoDB
>> Streams integration. In fact it also works with the latest KCL 1.6.1, which I
>> was using.
>>
>> So the KCL version does seem like it could be the issue - somewhere along
>> the line an exception must be getting swallowed. Though the tests should
>> have picked this up? Will dig deeper.
>>
>> —
>> Sent from Mailbox 
>>
>>
>> On Thu, Dec 10, 2015 at 11:07 PM, Brian London 
>> wrote:
>>
>>> Yes, it worked in the 1.6 branch as of commit
>>> db5165246f2888537dd0f3d4c5a515875c7358ed.  That makes it much less
>>> serious of an issue, although it would be nice to know what the root cause
>>> is to avoid a regression.
>>>
>>> On Thu, Dec 10, 2015 at 4:03 PM Burak Yavuz  wrote:
>>>
 I've noticed this happening when there were some dependency conflicts,
 and it is super hard to debug.
 It seems that the KinesisClientLibrary version in Spark 1.5.2 is 1.3.0,
 but it is 1.2.1 in Spark 1.5.1.
 I feel like that seems to be the problem...

 Brian, did you verify that it works with the 1.6.0 branch?

 Thanks,
 Burak

 On Thu, Dec 10, 2015 at 11:45 AM, Brian London 
 wrote:

> Nick's symptoms sound identical to mine.  I should mention that I just
> pulled the latest version from github and it seems to be working there.
> To reproduce:
>
>    1. Download spark 1.5.2 from http://spark.apache.org/downloads.html
>    2. build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package
>    3. build/mvn -Pkinesis-asl -DskipTests clean package
>    4. Then run simultaneously:
>       1. bin/run-example streaming.KinesisWordCountASL [Kinesis app name]
>          [Kinesis stream name] [endpoint URL]
>       2. bin/run-example streaming.KinesisWordProducerASL [Kinesis
>          stream name] [endpoint URL] 100 10
>
>
> On Thu, Dec 10, 2015 at 2:05 PM Jean-Baptiste Onofré 
> wrote:
>
>> Hi Nick,
>>
>> Just to be sure: don't you see some ClassCastException in the log ?
>>
>> Thanks,
>> Regards
>> JB
>>
>> On 12/10/2015 07:56 PM, Nick Pentreath wrote:
>> > Could you provide an example / test case and more detail on what
>> issue
>> > you're facing?
>> >
>> > I've just tested a simple program reading from a dev Kinesis stream
>> and
>> > using stream.print() to show the records, and it works under 1.5.1
>> but
>> > doesn't appear to be working under 1.5.2.
>> >
>> > UI for 1.5.2:
>> >
>> > Inline image 1
>> >
>> > UI for 1.5.1:
>> >
>> > Inline image 2
>> >
>> > On Thu, Dec 10, 2015 at 5:50 PM, Brian London <
>> brianmlon...@gmail.com
>> > > wrote:
>> >
>> > Has anyone managed to run the Kinesis demo in Spark 1.5.2?  The
>> > Kinesis ASL that ships with 1.5.2 appears to not work for me
>> > although 1.5.1 is fine. I spent some time with Amazon earlier in
>> the
>> > week and the only thing we could do to make it work is to change
>> the
>> > version to 1.5.1.  Can someone please attempt to reproduce
>> before I
>> > open a JIRA issue for it?
>> >
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>

>>

Re: architecture thought experiment: what is the advantage of using kafka with spark streaming?

2015-12-10 Thread Cody Koeninger
Kafka provides buffering, ordering, and decoupling of producers from multiple
consumers.  So pretty much any time you have requirements for asynchronous
processing, fault tolerance, and/or a common view of the order of events
across multiple consumers, Kafka is worth a look.

Spark provides a much richer language for processing data than what you'd
get by writing Kafka consumers yourself.
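
As a rough consumer-side sketch of that combination (using the Spark 1.x direct
stream API; the broker list, topic name, and batch interval are placeholders, and
the spark-streaming-kafka artifact is assumed to be on the classpath):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCountSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-sketch"), Seconds(10))

    // Kafka supplies the buffering/ordering/decoupling; Spark supplies the processing language.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    // Any RDD-style transformation is available on the stream's message values.
    stream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1L))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}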

On Thu, Dec 10, 2015 at 8:00 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I noticed that many people are using Kafka and Spark Streaming. Can someone
> provide a couple of use cases?
>
> I imagine some possible use cases might be:
>
> Is the purpose of using Kafka to
>
>1. provide some buffering?
>2. implement some sort of load balancing for the overall system?
>3. provide filtering/sorting of data?
>4. simplify client connections? It is easy for thousands of producers to
>connect to Kafka, but probably hard to do with Spark Streaming.
>5. ???
>
> Kind regards
>
> Andy
>


Re: Spark Java.lang.NullPointerException

2015-12-10 Thread michael_han
Hi Sarala,
I found the reason: when Spark runs on Windows it still needs Hadoop
support. I think it's a bug in Spark that is still not fixed ;)

After I downloaded winutils.exe and followed the steps in the workaround
below, it works fine:
http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7
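
A minimal sketch of the usual winutils workaround (the C:\hadoop path is just a
placeholder and must contain bin\winutils.exe; set it before the SparkContext is
created):

// Point Hadoop at a local directory containing bin\winutils.exe before creating the SparkContext.
System.setProperty("hadoop.home.dir", "C:\\hadoop")
val sc = new org.apache.spark.SparkContext(
  new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("winutils-test"))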



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Java-lang-NullPointerException-tp25562p25687.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



org.apache.spark.SparkException: Task failed while writing rows.+ Spark output data to hive table

2015-12-10 Thread Divya Gehlot
Hi,

I am using HDP 2.3.2 with Spark 1.4.1 and trying to insert data into a Hive
table using HiveContext.

Below is the sample code:


spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

//Sample code
import org.apache.spark.sql.SQLContext
import sqlContext.implicits._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("/user/spark/people.txt")
val schemaString = "name age"
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
//Create hive context
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//Apply the schema to the RDD
val df = hiveContext.createDataFrame(rowRDD, schema);
val options = Map("path" -> "hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/personhivetable")
df.write.format("org.apache.spark.sql.hive.orc.DefaultSource").options(options).saveAsTable("personhivetable")

Getting the below error:


org.apache.spark.SparkException: Task failed while writing rows.
  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$writeRows$1(commands.scala:191)
  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$anonfun$insert$1.apply(commands.scala:160)
  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$anonfun$insert$1.apply(commands.scala:160)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
  at org.apache.spark.scheduler.Task.run(Task.scala:70)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
  at $line30.$read$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$anonfun$2.apply(:29)
  at $line30.$read$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$anonfun$2.apply(:29)
  at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$writeRows$1(commands.scala:182)
  ... 8 more

Is it a configuration issue?

When I googled it, I found out that an environment variable named HIVE_CONF_DIR
should be set in spark-env.sh.

Then I checked spark-env.sh in HDP 2.3.2, and I couldn't find the environment
variable named HIVE_CONF_DIR.

Do I need to add the above-mentioned variable to insert Spark output data into
Hive tables?

Would really appreciate pointers.

Thanks,

Divya



Spark assembly in Maven repo?

2015-12-10 Thread Xiaoyong Zhu
Hi Experts,

We have a project which has a dependency for the following jar

spark-assembly--hadoop.jar
for example:
spark-assembly-1.4.1.2.3.3.0-2983-hadoop2.7.1.2.3.3.0-2983.jar

Since this assembly might be updated in the future, I am not sure whether there is a
Maven repo that has the above Spark assembly jar. Or should we create & upload
it to Maven Central?

Thanks!

Xiaoyong



Re: memory leak when saving Parquet files in Spark

2015-12-10 Thread Cheng Lian
This is probably caused by schema merging. Were you using Spark 1.4 or 
earlier versions? Could you please try the following snippet to see 
whether it helps:


df.write
  .format("parquet")
  .option("mergeSchema", "false")
  .partitionBy(partitionCols: _*)
  .mode(saveMode)
  .save(targetPath)

In 1.5, we've disabled schema merging by default.

Cheng

On 12/11/15 5:33 AM, Matt K wrote:

Hi all,

I have a process that's continuously saving data as Parquet with 
Spark. The bulk of the saving logic simply looks like this:


  df.write
.format("parquet")
.partitionBy(partitionCols: _*)
.mode(saveMode).save(targetPath)

After running for a day or so, my process ran out of memory. I took a 
memory-dump. I see that a single thread is holding 32,189 
org.apache.parquet.hadoop.Footer objects, which in turn hold 
ParquetMetadata. This is highly suspicious, since each thread 
processes under 1GB of data at a time, and there's usually no more 
than 10 files in a single batch (no small file problem). So there may 
be a memory leak somewhere in the saveAsParquet code-path.


I've attached a screen-shot from Eclipse MemoryAnalyzer showing the 
above. Note 32,189 references.


A shot in the dark, but is there a way to disable ParquetMetadata file 
generation?


Thanks,
-Matt


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark assembly in Maven repo?

2015-12-10 Thread Jeff Zhang
I don't think making the assembly jar a dependency is good practice. You may
run into jar hell issues in that case.

On Fri, Dec 11, 2015 at 2:46 PM, Xiaoyong Zhu 
wrote:

> Hi Experts,
>
>
>
> We have a project which has a dependency for the following jar
>
>
>
> spark-assembly--hadoop.jar
>
> for example:
>
> spark-assembly-1.4.1.2.3.3.0-2983-hadoop2.7.1.2.3.3.0-2983.jar
>
>
>
> since this assembly might be updated in the future, I am not sure if there
> is a Maven repo that has the above spark assembly jar? Or should we create
> & upload it to Maven central?
>
>
>
> Thanks!
>
>
>
> Xiaoyong
>
>
>



-- 
Best Regards

Jeff Zhang


RE: Spark assembly in Maven repo?

2015-12-10 Thread Xiaoyong Zhu
Sorry – I didn’t make it clear. It’s actually not a “dependency” – we are
building a certain plugin for IntelliJ with which we want to distribute this
jar. But since the jar is updated frequently, we don't want to ship it together
with our plugin; we would like to download it via Maven instead.

In this case what’s the recommended way?

Xiaoyong

From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Thursday, December 10, 2015 11:03 PM
To: Xiaoyong Zhu 
Cc: user@spark.apache.org
Subject: Re: Spark assembly in Maven repo?

I don't think make the assembly jar as dependency a good practice. You may meet 
jar hell issue in that case.

On Fri, Dec 11, 2015 at 2:46 PM, Xiaoyong Zhu 
mailto:xiaoy...@microsoft.com>> wrote:
Hi Experts,

We have a project which has a dependency for the following jar

spark-assembly--hadoop.jar
for example:
spark-assembly-1.4.1.2.3.3.0-2983-hadoop2.7.1.2.3.3.0-2983.jar

since this assembly might be updated in the future, I am not sure if there is a 
Maven repo that has the above spark assembly jar? Or should we create & upload 
it to Maven central?

Thanks!

Xiaoyong




--
Best Regards

Jeff Zhang


Re: RE: Spark assembly in Maven repo?

2015-12-10 Thread fightf...@163.com
Using Maven to download the assembly jar is fine. I would recommend deploying
this assembly jar to your local Maven repo, i.e. a Nexus repo, or more likely a
snapshot repository.



fightf...@163.com
 
From: Xiaoyong Zhu
Date: 2015-12-11 15:10
To: Jeff Zhang
CC: user@spark.apache.org; Zhaomin Xu; Joe Zhang (SDE)
Subject: RE: Spark assembly in Maven repo?
Sorry – I didn’t make it clear. It’s actually not a “dependency” – it’s 
actually that we are building a certain plugin for IntelliJ where we want to 
distribute this jar. But since the jar is updated frequently we don't want to 
distribute it together with our plugin but we would like to download it via 
Maven.
 
In this case what’s the recommended way? 
 
Xiaoyong
 
From: Jeff Zhang [mailto:zjf...@gmail.com] 
Sent: Thursday, December 10, 2015 11:03 PM
To: Xiaoyong Zhu 
Cc: user@spark.apache.org
Subject: Re: Spark assembly in Maven repo?
 
I don't think make the assembly jar as dependency a good practice. You may meet 
jar hell issue in that case. 
 
On Fri, Dec 11, 2015 at 2:46 PM, Xiaoyong Zhu  wrote:
Hi Experts,
 
We have a project which has a dependency for the following jar
 
spark-assembly--hadoop.jar
for example:
spark-assembly-1.4.1.2.3.3.0-2983-hadoop2.7.1.2.3.3.0-2983.jar
 
since this assembly might be updated in the future, I am not sure if there is a 
Maven repo that has the above spark assembly jar? Or should we create & upload 
it to Maven central?
 
Thanks!
 
Xiaoyong
 


 
-- 
Best Regards

Jeff Zhang


Re: RE: Spark assembly in Maven repo?

2015-12-10 Thread Mark Hamstra
No, publishing a spark assembly jar is not fine.  See the doc attached to
https://issues.apache.org/jira/browse/SPARK-11157 and be aware that a
likely goal of Spark 2.0 will be the elimination of assemblies.
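
For the compile-time side of this, the usual alternative is to depend on the
individual Spark modules that are published to Maven Central rather than on any
assembly, for example (a build.sbt sketch; the version is only illustrative):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided"
)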

On Thu, Dec 10, 2015 at 11:19 PM, fightf...@163.com 
wrote:

> Using maven to download the assembly jar is fine. I would recommend to
> deploy this
> assembly jar to your local maven repo, i.e. nexus repo, Or more likey a
> snapshot repository
>
> --
> fightf...@163.com
>
>
> *From:* Xiaoyong Zhu 
> *Date:* 2015-12-11 15:10
> *To:* Jeff Zhang 
> *CC:* user@spark.apache.org; Zhaomin Xu ; Joe Zhang
> (SDE) 
> *Subject:* RE: Spark assembly in Maven repo?
>
> Sorry – I didn’t make it clear. It’s actually not a “dependency” – it’s
> actually that we are building a certain plugin for IntelliJ where we want
> to distribute this jar. But since the jar is updated frequently we don't
> want to distribute it together with our plugin but we would like to
> download it via Maven.
>
>
>
> In this case what’s the recommended way?
>
>
>
> Xiaoyong
>
>
>
> *From:* Jeff Zhang [mailto:zjf...@gmail.com]
> *Sent:* Thursday, December 10, 2015 11:03 PM
> *To:* Xiaoyong Zhu 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark assembly in Maven repo?
>
>
>
> I don't think make the assembly jar as dependency a good practice. You may
> meet jar hell issue in that case.
>
>
>
> On Fri, Dec 11, 2015 at 2:46 PM, Xiaoyong Zhu 
> wrote:
>
> Hi Experts,
>
>
>
> We have a project which has a dependency for the following jar
>
>
>
> spark-assembly--hadoop.jar
>
> for example:
>
> spark-assembly-1.4.1.2.3.3.0-2983-hadoop2.7.1.2.3.3.0-2983.jar
>
>
>
> since this assembly might be updated in the future, I am not sure if there
> is a Maven repo that has the above spark assembly jar? Or should we create
> & upload it to Maven central?
>
>
>
> Thanks!
>
>
>
> Xiaoyong
>
>
>
>
>
>
>
> --
>
> Best Regards
>
> Jeff Zhang
>
>


Re: How to use collections inside foreach block

2015-12-10 Thread Madabhattula Rajesh Kumar
Hi Rishi and Ted,

Thank you for the response. Now I'm using Accumulators and getting results.
I have another query: how to run the code in parallel.

Example :-

var listOfIds is a ListBuffer with 20000 records

I'm creating batches. Each batch size is 500, which means there are 40
batches in total.

listOfIds.grouped(500).foreach { x =>
  {
    val r = sc.parallelize(x).toDF()
    r.registerTempTable("r")
    val acc = sc.accumulator(0, "My Accumulator")
    var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id = t.id")
    result.foreach { y =>
      {
        acc += y
      }
    }
    acc.value.foreach(f => // saving values to other db)
  }
}

The above code runs the batches in sequence. I want to run these 40 batches in
parallel.

*How can I start these 40 batches in parallel instead of in sequence?*

Could you please help me to resolve this use case.

Regards,
Rajesh
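
One possible sketch (not from this thread) of running the batches in parallel:
Spark's scheduler accepts jobs submitted from multiple driver threads, so the
grouped batches can be driven from a Scala parallel collection. This assumes the
sqlContext, the implicits import needed for toDF(), and the already-registered
table t from the code above; the pool size of 8 is an arbitrary choice.

import scala.collection.parallel.ForkJoinTaskSupport

val batches = listOfIds.grouped(500).toSeq.par
// Cap how many batches are submitted concurrently.
batches.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(8))

batches.zipWithIndex.foreach { case (ids, i) =>
  val r = sc.parallelize(ids).toDF()
  val tableName = s"r_$i"          // unique temp table name per batch to avoid clashes
  r.registerTempTable(tableName)
  val result = sqlContext.sql(
    s"SELECT r.id, t.data FROM $tableName r, t WHERE r.id = t.id")
  result.foreachPartition { rows =>
    // open one DB connection per partition here and save each row
    println(s"batch $i: wrote ${rows.size} rows")
  }
}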


On Wed, Dec 9, 2015 at 4:46 PM, Ted Yu  wrote:

> To add onto what Rishi said, you can use foreachPartition() on result
> where you can save values to DB.
>
> Cheers
>
> On Wed, Dec 9, 2015 at 12:51 AM, Rishi Mishra 
> wrote:
>
>> Your list is defined on the driver, whereas function specified in forEach
>> will be evaluated on each executor.
>> You might want to add an accumulator or handle a Sequence of list from
>> each partition.
>>
>> On Wed, Dec 9, 2015 at 11:19 AM, Madabhattula Rajesh Kumar <
>> mrajaf...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a below query. Please help me to solve this
>>>
>>> I have 20000 ids. I want to join these ids to a table. This table
>>> contains some blob data, so I cannot join these 20000 ids to this table in
>>> one step.
>>>
>>> I'm planning to join this table in chunks. For example, in each step I
>>> will join 5000 ids.
>>>
>>> The below code is not working. I'm not able to add results to the ListBuffer.
>>> The result is always giving ZERO.
>>>
>>> *Code Block :-*
>>>
>>> var listOfIds is a ListBuffer with 20000 records
>>>
>>> listOfIds.grouped(5000).foreach { x =>
>>> {
>>> var v1 = new ListBuffer[String]()
>>> val r = sc.parallelize(x).toDF()
>>> r.registerTempTable("r")
>>> var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id =
>>> t.id")
>>>  result.foreach{ y =>
>>>  {
>>>  v1 += y
>>>   }
>>> }
>>> println(" SIZE OF V1 === "+ v1.size)  ==>
>>>
>>> *THIS VALUE PRINTING AS ZERO*
>>>
>>> *// Save v1 values to other db*
>>> }
>>>
>>> Please help me on this.
>>>
>>> Regards,
>>> Rajesh
>>>
>>
>>
>>
>> --
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>
>


Re: SparkStreaming variable scope

2015-12-10 Thread Pinela
exactly :)
thanks Harsh :D

On Thu, Dec 10, 2015 at 3:18 AM, Harsh J  wrote:

> > and then calling getRowID() in the lambda, because the function gets
> sent to the executor right?
>
> Yes, that is correct (vs. a one time evaluation, as was with your
> assignment earlier).
>
> On Thu, Dec 10, 2015 at 3:34 AM Pinela  wrote:
>
>> Hey Bryan,
>>
>> Thanks for the answer ;) I knew it was a basic python/spark-noob thing :)
>>
>> this also worked
>>
>> *def getRowID():*
>> * return datetime.now().strftime("%Y%m%d%H%M%S")*
>>
>>
>> and then calling getRowID() in the lambda, because the function gets sent
>> to the executor right?
>>
>> Thanks again for the quick reply :)
>>
>> All the best and Happy Holidays.
>> Jpinela.
>>
>>
>>
>> On Wed, Dec 9, 2015 at 8:22 PM, Bryan Cutler  wrote:
>>
>>> rowid from your code is a variable in the driver, so it will be
>>> evaluated once and then only the value is sent to words.map.  You probably
>>> want to have rowid be a lambda itself, so that it will get the value at the
>>> time it is evaluated.  For example if I have the following:
>>>
>>> >>> data = sc.parallelize([1,2,3])
>>> >>> from datetime import datetime
>>> >>> rowid = lambda: datetime.now().strftime("%Y%m%d%H%M%S")
>>> >>> data.map(lambda x: (rowid(), x))
>>> >>> mdata = data.map(lambda x: (rowid(), x))
>>> >>> mdata.collect()
>>> [('20151209121532', 1), ('20151209121532', 2), ('20151209121532', 3)]
>>> >>> mdata.collect()
>>> [('20151209121540', 1), ('20151209121540', 2), ('20151209121540', 3)]
>>>
>>> here rowid is evaluated whenever an action is called on the RDD, i.e.
>>> collect
>>>
>>> On Wed, Dec 9, 2015 at 10:23 AM, jpinela  wrote:
>>>
 Hi Guys,
 I am sure this is a simple question, but I can't find it in the docs
 anywhere.
 This reads from flume and writes to hbase (as you can see).
 But has a variable scope problem (I believe).
 I have the following code:

 *
 from pyspark.streaming import StreamingContext
 from pyspark.streaming.flume import FlumeUtils
 from datetime import datetime
 ssc = StreamingContext(sc, 5)
 conf = {"hbase.zookeeper.quorum": "ubuntu3",
 "hbase.mapred.outputtable": "teste2",
 "mapreduce.outputformat.class":
 "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
 "mapreduce.job.output.key.class":
 "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
 "mapreduce.job.output.value.class":
 "org.apache.hadoop.io.Writable"}


 keyConv =

 "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
 valueConv =
 "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

 lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
 words = lines.map(lambda line: line[1])
 rowid = datetime.now().strftime("%Y%m%d%H%M%S")
 outrdd= words.map(lambda x: (str(1),[rowid,"cf1desc","col1",x]))
 print("ok 1")
 outrdd.pprint()

 outrdd.foreachRDD(lambda x:

 x.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv))

 ssc.start()
 ssc.awaitTermination()*

 the issue is that the rowid variable always stays at the value from the point
 when the streaming began.
 How can I get around this? I tried a function, an application, and nothing
 worked.
 Thank you.
 jp



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkStreaming-variable-scope-tp25652.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>


Re: INotifyDStream - where to find it?

2015-12-10 Thread Harsh J
I couldn't spot it anywhere on the web so it doesn't look to be contributed
yet, but note that the HDFS APIs are already available per
https://issues.apache.org/jira/browse/HDFS-6634 (you can see the test case
for an implementation guideline in Java:
https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java#L107
)

On Wed, Dec 9, 2015 at 2:55 AM octagon blue 
wrote:

> Hi All,
>
> I am using pyspark streaming to ETL raw data files as they land on HDFS.
> While researching this topic I found this great presentation about Spark
> and Spark Streaming at Uber
> (http://www.slideshare.net/databricks/spark-meetup-at-uber), where they
> mention this INotifyDStream that sounds very interesting and like it may
> suit my use case well.
>
> Does anyone know if this code has been submitted to apache, or how I
> might otherwise come upon it?
>
> Reference: https://issues.apache.org/jira/browse/SPARK-10555 - Add
> INotifyDStream to Spark Streaming
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Unable to access hive table (created through hive context) in hive console

2015-12-10 Thread Harsh J
Are you certain you are providing Spark with the right Hive configuration?
Is there a valid HIVE_CONF_DIR defined in your spark-env.sh, with a
hive-site.xml detailing the location/etc. of the metastore service and/or
DB?

Without a valid metastore config, Hive may switch to using a local
(embedded) Derby DB and create metadata under that - thereby giving the
impression of things working, but not actually updating your true metastore
database.

You should be able to see your spark driver connect to your metastore if it
has been properly configured.

There's nothing incorrect with your program, it works just fine on my CDH5
Spark-on-YARN instance (and SHOW TABLES; inside Hive does show the
"persontable" created, after the program's completion).

On Tue, Dec 8, 2015 at 11:47 AM Divya Gehlot 
wrote:

> Hi,
>
> I am a newbie to Spark and am using HDP 2.2, which comes with Spark 1.3.1.
> I tried the following code example:
>
>> import org.apache.spark.sql.SQLContext
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>> import sqlContext.implicits._
>>
>> val personFile = "/user/hdfs/TestSpark/Person.csv"
>> val df = sqlContext.load(
>> "com.databricks.spark.csv",
>> Map("path" -> personFile, "header" -> "true", "inferSchema" ->
>> "true"))
>> df.printSchema()
>> val selectedData = df.select("Name", "Age")
>> selectedData.save("NewPerson.csv", "com.databricks.spark.csv")
>> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> hiveContext.sql("CREATE TABLE IF NOT EXISTS PersonTable (Name STRING, Age
>> STRING)")
>> hiveContext.sql("LOAD DATA  INPATH '/user/hdfs/NewPerson.csv' INTO TABLE
>> PersonTable")
>> hiveContext.sql("from PersonTable SELECT Name, Age
>> ").collect.foreach(println)
>
>
> I am able to access above table in HDFS
>
>> [hdfs@sandbox ~]$ hadoop fs -ls /user/hive/warehouse/persontable
>> Found 3 items
>> -rw-r--r--   1 hdfs hdfs  0 2015-12-08 04:40
>> /user/hive/warehouse/persontable/_SUCCESS
>> -rw-r--r--   1 hdfs hdfs 47 2015-12-08 04:40
>> /user/hive/warehouse/persontable/part-0
>> -rw-r--r--   1 hdfs hdfs 33 2015-12-08 04:40
>> /user/hive/warehouse/persontable/part-1
>
>
> But when I try show tables in hive console ,I couldnt find the table.
>
>> hive> use default ;
>> OK
>> Time taken: 0.864 seconds
>> hive> show tables;
>> OK
>> dataframe_test
>> sample_07
>> sample_08
>> Time taken: 0.521 seconds, Fetched: 3 row(s)
>> hive> use xademo ;
>> OK
>> Time taken: 0.791 seconds
>> hive> show tables;
>> OK
>> call_detail_records
>> customer_details
>> recharge_details
>> Time taken: 0.256 seconds, Fetched: 3 row(s)
>
>
> Can somebody guide me in the right direction, if something is wrong with the
> code or I am misunderstanding the concepts?
> Would really appreciate your help.
>
> Thanks,
> Divya
>
>


Help: Get Timeout error and FileNotFoundException when shuffling large files

2015-12-10 Thread kendal
Hi there,
My application is quite simple: it just reads huge files from HDFS with
textFile(), then maps them to tuples, then does a reduceByKey(), and finally
saveAsTextFile().

The problem is that with large inputs (2.5T), when the application enters the
2nd stage -- reduce by key -- it fails with a FileNotFoundException when trying
to fetch the temp files. I also see a Timeout (120s) error before that
exception. There is no other exception or error (OOM, too many files, etc.).

I have done a lot of Google searches, and tried to increase executor memory,
repartition the RDD into more splits, etc., but in vain.
I also found another post here:
http://permalink.gmane.org/gmane.comp.lang.scala.spark.user/5449
which has exactly the same problem as mine.

Any idea? Thanks so much for the help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Help-Get-Timeout-error-and-FileNotFoundException-when-shuffling-large-files-tp25662.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: About the bottleneck of parquet file reading in Spark

2015-12-10 Thread Cheng Lian
Cc Spark user list since this information is generally useful.

On Thu, Dec 10, 2015 at 3:31 PM, Lionheart <87249...@qq.com> wrote:

> Dear Cheng,
>  I'm a user of Spark. Our current Spark version is 1.4.1.
>  In our project, I find there is a bottleneck when loading a huge amount
> of parquet files. We tried to load more than 5 parquet files into
> Spark. The total size of the data is about 150 GB. We find that Spark
> spent more than 30 minutes to do
>  sqlContext.read.option("mergSchema","false") .parquet(filelist:_*)
>  During this time, the network, disk and CPU are not busy. And based
> on the profile, all the time is spent in FileSystem.globStatus(). Then I
> found the commit for SPARK-8125 by you, which speeds this up.
>  Then I updated Spark to 1.5.1. Based on a test, the driver spent 13
> minutes doing the parquet reading. But I think there is still some
> room to improve this speed.
>   Based on the profile and on reading the code, I find that the
> DataFrameReader.parquet method processes the paths in a serial manner.
> Do you think that if the parquet method were changed into a concurrent version,
> the performance would become much better, since there are many CPU cores in
> the driver node of Spark?
>

Usually there shouldn't be many distinct paths passed to
DataFrameReader.parquet(). For those data files living under the same
parent directory, you can pass the path of their parent directory instead
of paths of all data files. Then I think this won't be a huge bottleneck.
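
In other words (a sketch, where /data/events stands in for the common parent
directory and fileList for the long list of leaf paths):

// Many distinct leaf paths: each one has to be resolved and listed on the driver.
val slow = sqlContext.read.option("mergeSchema", "false").parquet(fileList: _*)

// One parent directory: a single listing, usually far cheaper.
val fast = sqlContext.read.option("mergeSchema", "false").parquet("/data/events")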


>   By the way, when will the issue SPARK-8824 be resolved? In my opinion,
> losing some precision with a warning message is better than throwing an exception
> and saying it is not supported.
>

This is a good question. For all those 4 data types:

   - DATE: It's actually already supported; I just resolved that JIRA
   ticket.
   - INTERVAL: We can start working on this since we've now finally got
   CalendarIntervalType.
   - TIMESTAMP_MILLIS: We can start working on supporting this on the read
   path and convert extracted millisecond timestamps to microsecond ones. For the
   write path, maybe we can have an option to indicate whether
   TIMESTAMP_MILLIS or INT96 should be used to store timestamp values. If the
   former is chosen, the microsecond part of the timestamp will be truncated.
   - TIMESTAMP_MICROS: Unfortunately this one depends on parquet-format and
   parquet-mr, which haven't added TIMESTAMP_MILLIS as an OriginalType.



>
> Sincerely,
> Zhizhou Li
>
>
>
>


RE: FileNotFoundException in appcache shuffle files

2015-12-10 Thread kendal
I have similar issues... The exception occurs only with very large data.
I tried doubling the memory and the partitions as suggested by some Google
searches, but in vain.
Any idea?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p25663.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark groupby and agg inconsistent and missing data

2015-12-10 Thread Kapil Raaj
Hi Folks,

I am also getting a similar issue:

(df.groupBy("email").agg(last("user_id") as "user_id").select("user_id").count,
 df.groupBy("email").agg(last("user_id") as "user_id").select("user_id").distinct.count)

When run on one computer it gives: (15123144,15123144)

When run on a cluster it gives: (15123144,24)

The first one is expected and looks correct, but the second one is horribly
wrong. One more observation - even if I change the data so that the total count
is more/less than 15123144, I still get distinct = 24 on the cluster. Any clue?
Or a JIRA ticket? Or what can be a fix for now?

On Thu, Oct 22, 2015 at 9:59 PM,  wrote:

> nevermind my last email. res2 is filtered so my test does not make sense.
> The issue is not reproduced there. I have the problem somwhere else.
>
>
>
> *From:* Ellafi, Saif A.
> *Sent:* Thursday, October 22, 2015 12:57 PM
> *To:* 'Xiao Li'
> *Cc:* user
> *Subject:* RE: Spark groupby and agg inconsistent and missing data
>
>
>
> Thanks, sorry I cannot share the data and not sure how much significant it
> will be for you.
>
> I am reproducing the issue on a smaller piece of the content and see
> wether I find a reason on the inconsistence.
>
>
>
> val res2 = data.filter($"closed" === $"ever_closed").groupBy("product",
> "band ", "aget", "vine", "time",
> "mm").agg(count($"account_id").as("N"), sum($"balance").as("balance"),
> sum($"spend").as("spend"), sum($"payment").as("payment")).persist()
>
>
>
> then I collect distinct values of “vine” (which is StringType) both from
> data and res2, and res2 is missing a lot of values:
>
>
>
> val t1 = res2.select("vine").distinct.collect
>
> scala> t1.size
>
> res10: Int = 617
>
>
>
> val t_real = data.select("vine").distinct.collect
>
> scala> t_real.size
>
> res9: Int = 639
>
>
>
>
>
> *From:* Xiao Li [mailto:gatorsm...@gmail.com ]
> *Sent:* Thursday, October 22, 2015 12:45 PM
> *To:* Ellafi, Saif A.
> *Cc:* user
> *Subject:* Re: Spark groupby and agg inconsistent and missing data
>
>
>
> Hi, Saif,
>
>
>
> Could you post your code here? It might help others reproduce the errors
> and give you a correct answer.
>
>
>
> Thanks,
>
>
>
> Xiao Li
>
>
>
> 2015-10-22 8:27 GMT-07:00 :
>
> Hello everyone,
>
>
>
> I am doing some analytics experiments under a 4 server stand-alone cluster
> in a spark shell, mostly involving a huge database with groupBy and
> aggregations.
>
>
>
> I am picking 6 groupBy columns and returning various aggregated results in
> a dataframe. GroupBy fields are of two types, most of them are StringType
> and the rest are LongType.
>
>
>
> The data source is a splitted json file dataframe,  once the data is
> persisted, the result is consistent. But if I unload the memory and reload
> the data, the groupBy action returns different content results, missing
> data.
>
>
>
> Could I be missing something? this is rather serious for my analytics, and
> not sure how to properly diagnose this situation.
>
>
>
> Thanks,
>
> Saif
>
>
>
>
>



-- 
-Kapil Rajak 


Re: FileNotFoundException in appcache shuffle files

2015-12-10 Thread Jiří Syrový
Usually there is another error or log message before FileNotFoundException.
Try to check your logs for something like that.

2015-12-10 10:47 GMT+01:00 kendal :

> I have similar issues... Exception only with very large data.
> And I tried to double the memory or partition as suggested by some google
> search, but in vain..
> any idea?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p25663.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is Spark History Server supported for Mesos?

2015-12-10 Thread Steve Loughran

On 9 Dec 2015, at 22:01, Kelvin Chu 
<2dot7kel...@gmail.com> wrote:

Spark on YARN can use History Server by setting the configuration 
spark.yarn.historyServer.address.


That's the stuff in SPARK-1537, which isn't actually built in yet.

But, I can't find similar config for Mesos. Is History Server supported by 
Spark on Mesos? Thanks.



you set up your jobs to log to HDFS and run the history server to poll and 
replay the logs that appear there.

see: http://spark.apache.org/docs/latest/monitoring.html
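
On the job side that is just the event-log settings, e.g. (a sketch; the HDFS
path is a placeholder, and the history server is started with
spark.history.fs.logDirectory pointing at the same directory):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("mesos-job-with-history")
  .set("spark.eventLog.enabled", "true")                          // write event logs
  .set("spark.eventLog.dir", "hdfs://namenode:8020/spark-events") // shared log directory
val sc = new SparkContext(conf)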




Apache spark Web UI on Amazon EMR not working

2015-12-10 Thread sonal sharma
Hi,

We are using Spark on Amazon EMR 4.1. To access Spark web UI, we are using
the link in yarn resource manager, but we are seeing a blank page on it.
Further, using Firefox debugging we noticed that we got a HTTP 500 error in
response.

We have tried configuring proxy settings for AWS and also replacing the
internal hostname with external hostname/ip address but are still not able
to access the web ui.

The same steps worked for us in the past on EMR 3.x version.

Any guidance would be helpful.

Many Thanks,
Sonal


Can't filter

2015-12-10 Thread Бобров Виктор
Hi, I can’t filter my rdd.

 

def filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean= {
  tp._1._2 > tp._2._2
}
val mail_rdd = sc.parallelize(A.toSeq).cache()
val step1 = mail_rdd.cartesian(mail_rdd)
val step2 = step1.filter(filter1)

 

I get the error “Class not found”. What am I doing wrong? Thanks for the help.

 

 

 



Re: Can't filter

2015-12-10 Thread Ndjido Ardo Bar
Please send your call stack with the full description of the exception .

> On 10 Dec 2015, at 12:10, Бобров Виктор  wrote:
> 
> Hi, I can’t filter my rdd.
>  
> def filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean= {
>   tp._1._2 > tp._2._2
> }
> val mail_rdd = sc.parallelize(A.toSeq).cache()
> val step1 = mail_rdd.cartesian(mail_rdd)
> val step2 = step1.filter(filter1)
>  
> Get error “Class not found”. What I’m doing wrong ? Thanks for help.
>  
>  
>  


RE: Can't filter

2015-12-10 Thread Бобров Виктор
0 = {StackTraceElement@7132} 
"com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.a(Unknown 
Source)"

1 = {StackTraceElement@7133} 
"com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.(Unknown
 Source)"

2 = {StackTraceElement@7134} 
"org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)"

3 = {StackTraceElement@7135} 
"org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:81)"

4 = {StackTraceElement@7136} 
"org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:187)"

5 = {StackTraceElement@7137} 
"org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)"

6 = {StackTraceElement@7138} 
"org.apache.spark.SparkContext.clean(SparkContext.scala:2030)"

7 = {StackTraceElement@7139} 
"org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:331)"

8 = {StackTraceElement@7140} 
"org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:330)"

9 = {StackTraceElement@7141} 
"org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)"

10 = {StackTraceElement@7142} 
"org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)"

11 = {StackTraceElement@7143} 
"org.apache.spark.rdd.RDD.withScope(RDD.scala:306)"

12 = {StackTraceElement@7144} "org.apache.spark.rdd.RDD.filter(RDD.scala:330)"

13 = {StackTraceElement@7145} 
"SimpleApp$GeneratedEvaluatorClass$44$1.invoke(FileToCompile0.scala:30)"

14 = {StackTraceElement@7146} "SimpleApp$.main(test1.scala:26)"

15 = {StackTraceElement@7147} "SimpleApp.main(test1.scala)"

 

From: Ndjido Ardo Bar [mailto:ndj...@gmail.com] 
Sent: Thursday, December 10, 2015 2:20 PM
To: Бобров Виктор 
Cc: user@spark.apache.org
Subject: Re: Can't filter

 

Please send your call stack with the full description of the exception .


On 10 Dec 2015, at 12:10, Бобров Виктор mailto:ma...@bk.ru> > 
wrote:

Hi, I can’t filter my rdd.

 

def filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean= {
  tp._1._2 > tp._2._2
}
val mail_rdd = sc.parallelize(A.toSeq).cache()
val step1 = mail_rdd.cartesian(mail_rdd)
val step2 = step1.filter(filter1)

 

Get error “Class not found”. What I’m doing wrong ? Thanks for help.

 

 

 



Re: Help: Get Timeout error and FileNotFoundException when shuffling large files

2015-12-10 Thread Sudhanshu Janghel
Can you please paste the stack trace.

Sudhanshu


Inverse of the matrix

2015-12-10 Thread Arunkumar Pillai
Hi

I need to find the inverse of the (X-transpose * X) matrix. I have found the X
transpose and the matrix multiplication.

Is there any way to find the inverse of the matrix?
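
One common approach (a sketch, not a built-in distributed inverse): if X is a
RowMatrix with a modest number of columns k, then X-transpose * X is only a k x k
local matrix, so it can be computed with computeGramianMatrix() and inverted on
the driver with Breeze, which ships with MLlib. rowMatrixX below is an assumed
variable.

import breeze.linalg.{inv, DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rowMatrixX: RowMatrix = ???              // your distributed X
val xtx = rowMatrixX.computeGramianMatrix()  // X^T * X as a local k x k matrix
// Both MLlib and Breeze dense matrices store values in column-major order.
val xtxInv = inv(new BDM(xtx.numRows, xtx.numCols, xtx.toArray))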



-- 
Thanks and Regards
Arun


example of querying LDA model

2015-12-10 Thread Olga Syrova
Dear Spark users,

I created an LDA model using Spark in Java and would like to do some similarity
queries now; I'm especially interested in a "query -> most similar docs" method.
I spent many hours looking for examples of how to map a query into the LDA space,
but didn't come up with any clear solution. I would be very grateful if you
could suggest some resources on that point.

String query = "java developer"

DistributedLDAModel ldaModel = DistributedLDAModel.load(sc.sc(), 
"test_lda_model");

// topic distribution over docs

JavaPairRDD 
topicDistributionsOverDocs = ldaModel.javaTopicDistributions();



// Inferred topics, where each topic is represented by a distribution over 
terms. k is the number of topics
Matrix topics = ldaModel.topicsMatrix();



The main question is how I can convert my query to LDA vector space..



Thank you and have a nice day!

Olga




Re: Can't filter

2015-12-10 Thread Harsh J
Are you sure you do not have any messages preceding the trace, such as one
quoting which class is found to be missing? That'd be helpful to see and
suggest what may (exactly) be going wrong. It appears similar to
https://issues.apache.org/jira/browse/SPARK-8368, but I cannot tell for
certain because I don't know if your code uses the SparkSQL features.

Also, what version is your Spark running?

I am able to run your program without a problem in Spark 1.5.x (with a
sample Seq).

On Thu, Dec 10, 2015 at 5:01 PM Бобров Виктор  wrote:

> 0 = {StackTraceElement@7132}
> "com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.a(Unknown
> Source)"
>
> 1 = {StackTraceElement@7133}
> "com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.(Unknown
> Source)"
>
> 2 = {StackTraceElement@7134}
> "org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)"
>
> 3 = {StackTraceElement@7135}
> "org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:81)"
>
> 4 = {StackTraceElement@7136}
> "org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:187)"
>
> 5 = {StackTraceElement@7137}
> "org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)"
>
> 6 = {StackTraceElement@7138}
> "org.apache.spark.SparkContext.clean(SparkContext.scala:2030)"
>
> 7 = {StackTraceElement@7139}
> "org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:331)"
>
> 8 = {StackTraceElement@7140}
> "org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:330)"
>
> 9 = {StackTraceElement@7141}
> "org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)"
>
> 10 = {StackTraceElement@7142}
> "org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)"
>
> 11 = {StackTraceElement@7143}
> "org.apache.spark.rdd.RDD.withScope(RDD.scala:306)"
>
> 12 = {StackTraceElement@7144}
> "org.apache.spark.rdd.RDD.filter(RDD.scala:330)"
>
> 13 = {StackTraceElement@7145}
> "SimpleApp$GeneratedEvaluatorClass$44$1.invoke(FileToCompile0.scala:30)"
>
> 14 = {StackTraceElement@7146} "SimpleApp$.main(test1.scala:26)"
>
> 15 = {StackTraceElement@7147} "SimpleApp.main(test1.scala)"
>
>
>
> *From:* Ndjido Ardo Bar [mailto:ndj...@gmail.com]
> *Sent:* Thursday, December 10, 2015 2:20 PM
> *To:* Бобров Виктор 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Can't filter
>
>
>
> Please send your call stack with the full description of the exception .
>
>
> On 10 Dec 2015, at 12:10, Бобров Виктор  wrote:
>
> Hi, I can’t filter my rdd.
>
>
>
> *def *filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean=
> {
>   tp._1._2 > tp._2._2
> }
> *val *mail_rdd = sc.parallelize(A.toSeq).cache()
> *val *step1 = mail_rdd.cartesian(mail_rdd)
> *val *step2 = step1.filter(filter1)
>
>
>
> Get error “Class not found”. What I’m doing wrong ? Thanks for help.
>
>
>
>
>
>
>
>


RE: Can't filter

2015-12-10 Thread Бобров Виктор
Spark – 1.5.1, ty for help.

 

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.io.Source


object SimpleApp {
def main(args: Array[String]) {
var A = scala.collection.mutable.Map[Array[String], Int]()
val filename = "C:\\Users\\bobrov\\IdeaProjects\\spark\\file\\spark1.txt"
for((line, i) <- Source.fromFile(filename).getLines().zipWithIndex){
  val lst = line.split(" ")
  A += (lst -> i)
}

def filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean= {
  tp._1._2 < tp._2._2
  }

val conf = new 
SparkConf().setMaster("spark://web01:7077").setAppName("Simple Application")
val sc = new SparkContext(conf)
val mail_rdd = sc.parallelize(A.toSeq).cache()
val step1 = mail_rdd.cartesian(mail_rdd)
val step2 = step1.filter(filter1)
//step1.collect().foreach(println)
  }
}

 

From: Harsh J [mailto:ha...@cloudera.com] 
Sent: Thursday, December 10, 2015 2:50 PM
To: Бобров Виктор ; Ndjido Ardo Bar 
Cc: user@spark.apache.org
Subject: Re: Can't filter

 

Are you sure you do not have any messages preceding the trace, such as one 
quoting which class is found to be missing? That'd be helpful to see and 
suggest what may (exactly) be going wrong. It appear similar to 
https://issues.apache.org/jira/browse/SPARK-8368, but I cannot tell for certain 
cause I don't know if your code uses the SparkSQL features.

 

Also, what version is your Spark running?

 

I am able to run your program without a problem in Spark 1.5.x (with a sample 
Seq).

 

On Thu, Dec 10, 2015 at 5:01 PM Бобров Виктор mailto:ma...@bk.ru> 
> wrote:

0 = {StackTraceElement@7132} 
"com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.a(Unknown 
Source)"

1 = {StackTraceElement@7133} 
"com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.(Unknown
 Source)"

2 = {StackTraceElement@7134} 
"org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)"

3 = {StackTraceElement@7135} 
"org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:81)"

4 = {StackTraceElement@7136} 
"org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:187)"

5 = {StackTraceElement@7137} 
"org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)"

6 = {StackTraceElement@7138} 
"org.apache.spark.SparkContext.clean(SparkContext.scala:2030)"

7 = {StackTraceElement@7139} 
"org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:331)"

8 = {StackTraceElement@7140} 
"org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:330)"

9 = {StackTraceElement@7141} 
"org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)"

10 = {StackTraceElement@7142} 
"org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)"

11 = {StackTraceElement@7143} 
"org.apache.spark.rdd.RDD.withScope(RDD.scala:306)"

12 = {StackTraceElement@7144} "org.apache.spark.rdd.RDD.filter(RDD.scala:330)"

13 = {StackTraceElement@7145} 
"SimpleApp$GeneratedEvaluatorClass$44$1.invoke(FileToCompile0.scala:30)"

14 = {StackTraceElement@7146} "SimpleApp$.main(test1.scala:26)"

15 = {StackTraceElement@7147} "SimpleApp.main(test1.scala)"

 

From: Ndjido Ardo Bar [mailto:ndj...@gmail.com  ] 
Sent: Thursday, December 10, 2015 2:20 PM
To: Бобров Виктор mailto:ma...@bk.ru> >
Cc: user@spark.apache.org  
Subject: Re: Can't filter

 

Please send your call stack with the full description of the exception .


On 10 Dec 2015, at 12:10, Бобров Виктор mailto:ma...@bk.ru> > 
wrote:

Hi, I can’t filter my rdd.

 

def filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean= {
  tp._1._2 > tp._2._2
}
val mail_rdd = sc.parallelize(A.toSeq).cache()
val step1 = mail_rdd.cartesian(mail_rdd)
val step2 = step1.filter(filter1)

 

Get error “Class not found”. What I’m doing wrong ? Thanks for help.

 

 

 


RE: Can't filter

2015-12-10 Thread Бобров Виктор
Build.sbt

 

name := "spark"

version := "1.0"

scalaVersion := "2.11.7"

 

libraryDependencies ++= Seq(
  "org.apache.spark"  % "spark-core_2.11"  % "1.5.1",
  "org.apache.spark"  % "spark-streaming_2.11" % "1.5.1",
  "org.apache.spark"  % "spark-mllib_2.11" % "1.5.1"
)

 

 

From: Бобров Виктор [mailto:ma...@bk.ru] 
Sent: Thursday, December 10, 2015 2:54 PM
To: 'Harsh J' 
Cc: user@spark.apache.org
Subject: RE: Can't filter

 

Spark – 1.5.1, ty for help.

 

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.io.Source


object SimpleApp {
def main(args: Array[String]) {
var A = scala.collection.mutable.Map[Array[String], Int]()
val filename = "C:\\Users\\bobrov\\IdeaProjects\\spark\\file\\spark1.txt"
for((line, i) <- Source.fromFile(filename).getLines().zipWithIndex){
  val lst = line.split(" ")
  A += (lst -> i)
}

def filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean= {
  tp._1._2 < tp._2._2
  }

val conf = new 
SparkConf().setMaster("spark://web01:7077").setAppName("Simple Application")
val sc = new SparkContext(conf)
val mail_rdd = sc.parallelize(A.toSeq).cache()
val step1 = mail_rdd.cartesian(mail_rdd)
val step2 = step1.filter(filter1)
//step1.collect().foreach(println)
  }
}

 

From: Harsh J [  mailto:ha...@cloudera.com] 
Sent: Thursday, December 10, 2015 2:50 PM
To: Бобров Виктор <  ma...@bk.ru>; Ndjido Ardo Bar < 
 ndj...@gmail.com>
Cc:   user@spark.apache.org
Subject: Re: Can't filter

 

Are you sure you do not have any messages preceding the trace, such as one 
quoting which class is found to be missing? That'd be helpful to see and 
suggest what may (exactly) be going wrong. It appear similar to 
https://issues.apache.org/jira/browse/SPARK-8368, but I cannot tell for certain 
cause I don't know if your code uses the SparkSQL features.

 

Also, what version is your Spark running?

 

I am able to run your program without a problem in Spark 1.5.x (with a sample 
Seq).

 

On Thu, Dec 10, 2015 at 5:01 PM Бобров Виктор mailto:ma...@bk.ru> 
> wrote:

0 = {StackTraceElement@7132} 
"com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.a(Unknown 
Source)"

1 = {StackTraceElement@7133} 
"com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.(Unknown
 Source)"

2 = {StackTraceElement@7134} 
"org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)"

3 = {StackTraceElement@7135} 
"org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:81)"

4 = {StackTraceElement@7136} 
"org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:187)"

5 = {StackTraceElement@7137} 
"org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)"

6 = {StackTraceElement@7138} 
"org.apache.spark.SparkContext.clean(SparkContext.scala:2030)"

7 = {StackTraceElement@7139} 
"org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:331)"

8 = {StackTraceElement@7140} 
"org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:330)"

9 = {StackTraceElement@7141} 
"org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)"

10 = {StackTraceElement@7142} 
"org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)"

11 = {StackTraceElement@7143} 
"org.apache.spark.rdd.RDD.withScope(RDD.scala:306)"

12 = {StackTraceElement@7144} "org.apache.spark.rdd.RDD.filter(RDD.scala:330)"

13 = {StackTraceElement@7145} 
"SimpleApp$GeneratedEvaluatorClass$44$1.invoke(FileToCompile0.scala:30)"

14 = {StackTraceElement@7146} "SimpleApp$.main(test1.scala:26)"

15 = {StackTraceElement@7147} "SimpleApp.main(test1.scala)"

 

From: Ndjido Ardo Bar [mailto:ndj...@gmail.com  ] 
Sent: Thursday, December 10, 2015 2:20 PM
To: Бобров Виктор mailto:ma...@bk.ru> >
Cc: user@spark.apache.org  
Subject: Re: Can't filter

 

Please send your call stack with the full description of the exception .


On 10 Dec 2015, at 12:10, Бобров Виктор mailto:ma...@bk.ru> > 
wrote:

Hi, I can’t filter my rdd.

 

def filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean= {
  tp._1._2 > tp._2._2
}
val mail_rdd = sc.parallelize(A.toSeq).cache()
val step1 = mail_rdd.cartesian(mail_rdd)
val step2 = step1.filter(filter1)

 

Get error “Class not found”. What I’m doing wrong ? Thanks for help.

 

 

 



Re: Sharing object/state accross transformations

2015-12-10 Thread JayKay
I solved the problem by passing the HLL object to the function, updating it,
and returning it as the new state. This was obviously a mental block... ;-)
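
For reference, the general shape of that pattern with updateStateByKey (a simple
running count stands in for the HLL object; events is an assumed
DStream[(String, Long)] and ssc an existing StreamingContext):

// The previous state comes in, and whatever is returned becomes the new state for the key.
def updateCount(newValues: Seq[Long], state: Option[Long]): Option[Long] =
  Some(state.getOrElse(0L) + newValues.sum)

ssc.checkpoint("/tmp/streaming-checkpoint")  // required for stateful transformations
val counts = events.updateStateByKey[Long](updateCount _)
counts.print()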



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Sharing-object-state-accross-transformations-tp25544p25665.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Error Handling approach for SparkSQL queries in Spark version 1.4

2015-12-10 Thread satish chandra j
HI All,
Any inputs on an error-handling approach for Spark SQL or DataFrames?

Thanks for all your valuable inputs in advance

Regards,
Satish Chandra


Re: Can't filter

2015-12-10 Thread Sudhanshu Janghel
Be sure to mention the class name using the *--class* parameter to
spark-submit.

I see no other reason for a "class not found" exception.


Sudhanshu

On Thu, Dec 10, 2015 at 11:50 AM, Harsh J  wrote:

> Are you sure you do not have any messages preceding the trace, such as one
> quoting which class is found to be missing? That'd be helpful to see and
> suggest what may (exactly) be going wrong. It appear similar to
> https://issues.apache.org/jira/browse/SPARK-8368, but I cannot tell for
> certain cause I don't know if your code uses the SparkSQL features.
>
> Also, what version is your Spark running?
>
> I am able to run your program without a problem in Spark 1.5.x (with a
> sample Seq).
>
> On Thu, Dec 10, 2015 at 5:01 PM Бобров Виктор  wrote:
>
>> 0 = {StackTraceElement@7132}
>> "com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.a(Unknown
>> Source)"
>>
>> 1 = {StackTraceElement@7133}
>> "com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.(Unknown
>> Source)"
>>
>> 2 = {StackTraceElement@7134}
>> "org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)"
>>
>> 3 = {StackTraceElement@7135}
>> "org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:81)"
>>
>> 4 = {StackTraceElement@7136}
>> "org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:187)"
>>
>> 5 = {StackTraceElement@7137}
>> "org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)"
>>
>> 6 = {StackTraceElement@7138}
>> "org.apache.spark.SparkContext.clean(SparkContext.scala:2030)"
>>
>> 7 = {StackTraceElement@7139}
>> "org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:331)"
>>
>> 8 = {StackTraceElement@7140}
>> "org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:330)"
>>
>> 9 = {StackTraceElement@7141}
>> "org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)"
>>
>> 10 = {StackTraceElement@7142}
>> "org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)"
>>
>> 11 = {StackTraceElement@7143}
>> "org.apache.spark.rdd.RDD.withScope(RDD.scala:306)"
>>
>> 12 = {StackTraceElement@7144}
>> "org.apache.spark.rdd.RDD.filter(RDD.scala:330)"
>>
>> 13 = {StackTraceElement@7145}
>> "SimpleApp$GeneratedEvaluatorClass$44$1.invoke(FileToCompile0.scala:30)"
>>
>> 14 = {StackTraceElement@7146} "SimpleApp$.main(test1.scala:26)"
>>
>> 15 = {StackTraceElement@7147} "SimpleApp.main(test1.scala)"
>>
>>
>>
>> *From:* Ndjido Ardo Bar [mailto:ndj...@gmail.com]
>> *Sent:* Thursday, December 10, 2015 2:20 PM
>> *To:* Бобров Виктор 
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Can't filter
>>
>>
>>
>> Please send your call stack with the full description of the exception .
>>
>>
>> On 10 Dec 2015, at 12:10, Бобров Виктор  wrote:
>>
>> Hi, I can’t filter my rdd.
>>
>>
>>
>> def filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean = {
>>   tp._1._2 > tp._2._2
>> }
>> val mail_rdd = sc.parallelize(A.toSeq).cache()
>> val step1 = mail_rdd.cartesian(mail_rdd)
>> val step2 = step1.filter(filter1)
>>
>>
>>
>> I get the error "Class not found". What am I doing wrong? Thanks for the help.
>>
>>
>>
>>
>>
>>
>>
>>
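
As for the quoted filter code itself, one thing worth trying, purely as a sketch and
assuming the compiled predicate class is simply not reaching the executors: inline the
predicate as a closure and submit the packaged jar with spark-submit --class, reusing
sc and A from the quoted snippet:

  val mail_rdd = sc.parallelize(A.toSeq).cache()
  val step1 = mail_rdd.cartesian(mail_rdd)
  // same predicate as filter1, written inline so no separately compiled helper is needed
  val step2 = step1.filter { case ((_, n1), (_, n2)) => n1 > n2 }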


Re: GLM in apache spark in MLlib

2015-12-10 Thread Yanbo Liang
Hi Arunkumar,

LinearRegression, LogisticRegression and AFTSurvivalRegression are part of the
GLM family and are already part of MLlib. GLM in SparkR actually calls MLlib as
its backend execution engine, but only the "gaussian" and "binomial" families are
supported currently. MLlib will continue to improve its GLM support.
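
For example, a minimal sketch with the DataFrame-based API in 1.5, assuming
"training" is a DataFrame with "label" and "features" columns and the parameter
values are arbitrary:

  import org.apache.spark.ml.classification.LogisticRegression

  val lr = new LogisticRegression()
    .setMaxIter(100)
    .setRegParam(0.01)
  val model = lr.fit(training)
  model.transform(training).select("label", "prediction").show()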

Yanbo

2015-12-10 14:54 GMT+08:00 Arunkumar Pillai :

> Hi
>
> I've started using Apache Spark 1.5.2. I'm able to see GLM using
> SparkR but it is not there in MLlib. Are there any plans or a roadmap for
> that?
>
>
>
> --
> Thanks and Regards
> Arun
>


Re: HELP! I get "java.lang.String cannot be cast to java.lang.Intege " for a long time.

2015-12-10 Thread Bonsen
I do it like this: "val secondData = rawData.flatMap(_.split("\t").take(3))"

and I get:
15/12/10 22:36:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
219.216.65.129): java.lang.ClassCastException: java.lang.String cannot be
cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.examples.SparkPi$$anonfun$1.apply(SparkPi.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
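
For what it's worth, split() yields Strings, so anything numeric has to be converted
explicitly. A minimal sketch, assuming rawData is an RDD[String] of tab-separated
lines whose third field is numeric:

  val fields   = rawData.map(_.split("\t").take(3))    // RDD[Array[String]]
  val thirdCol = fields.map(arr => arr(2).toInt)       // explicit String -> Int conversion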



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HELP-I-get-java-lang-String-cannot-be-cast-to-java-lang-Intege-for-a-long-time-tp25666p25668.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark on EMR: out-of-the-box solution for real-time application logs monitoring?

2015-12-10 Thread Roberto Coluccio
Hello,

I'm investigating a solution to monitor, in real time, the Spark logs produced by
my EMR cluster in order to collect statistics and trigger alarms. Being on
EMR, I found the CloudWatch Logs + Lambda combination pretty straightforward and, since
I'm on AWS, those services are pretty well integrated together... but I could
only find examples about using it on standalone EC2 instances.

In my use case, EMR 3.9 and Spark 1.4.1 drivers running on YARN (cluster
mode), I would like to be able to monitor Spark logs in real time, not just
after the processing ends and they are copied to S3. Is there any
out-of-the-box solution or best practice for accomplishing this goal when
running on EMR that I'm not aware of?

Spark logs are written on the Data Nodes (Core Instances) local file
systems as YARN containers logs, so probably installing the awslogs agent
on them and pointing to those logfiles would help pushing such logs on
CloudWatch, but I was wondering how the community real-time monitors
application logs when running Spark on YARN on EMR.

Or maybe I'm looking at a wrong solution. Maybe the correct way would be
using something like a CloudwatchSink so to make Spark (log4j) pushing logs
directly to the sink and the sink pushing them to CloudWatch (I do like the
out-of-the-box EMR logging experience and I want to keep the usual eventual
logs archiving on S3 when the EMR cluster is terminated).

Any ideas or experience about this problem?

Thank you.

Roberto


Spark Streaming Kinesis - DynamoDB Streams compatability

2015-12-10 Thread Nick Pentreath
Hi Spark users & devs

I was just wondering if anyone out there has interest in DynamoDB Streams (
http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)
as an input source for Spark Streaming Kinesis?

Because DynamoDB Streams provides an adaptor client that works with the
KCL, making this work is fairly straightforward, but would require a little
bit of work to add it to Spark Streaming Kinesis as an option. It also
requires updating the AWS SDK version.

For those using AWS heavily, there are other ways of achieving the same
outcome indirectly, the easiest of which I've found so far is using AWS
Lambdas to read from the DynamoDB Stream, (optionally) transform the
events, and write to a Kinesis stream, allowing one to just use the
existing Spark integration. Still, I'd like to know if there is sufficient
interest or demand for this among the user base to work on a PR adding
DynamoDB Streams support to Spark.

(At the same time, the implementation details happen to provide an
opportunity to address https://issues.apache.org/jira/browse/SPARK-10969,
though not sure how much need there is for that either?)

N


Spark streaming with Kinesis broken?

2015-12-10 Thread Brian London
Has anyone managed to run the Kinesis demo in Spark 1.5.2?  The Kinesis ASL
that ships with 1.5.2 appears to not work for me although 1.5.1 is fine. I
spent some time with Amazon earlier in the week and the only thing we could
do to make it work is to change the version to 1.5.1.  Can someone please
attempt to reproduce before I open a JIRA issue for it?


Spark job submission REST API

2015-12-10 Thread mvle
Hi,

I would like to use Spark as a service through REST API calls
for uploading and submitting a job, getting results, etc.

There is a project by the folks at Ooyala:
https://github.com/spark-jobserver/spark-jobserver

I also encountered some hidden job REST APIs in Spark:
http://arturmkrtchyan.com/apache-spark-hidden-rest-api
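
As an illustration only: that post describes the standalone REST submission server,
which listens on port 6066 by default, so checking a submission's status looks roughly
like the following (host and driver ID are placeholders):

  curl http://your-master:6066/v1/submissions/status/driver-20151210123456-0000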

To help determine which set of APIs to use, I would like to know
the plans for those hidden Spark APIs.
Will they be made public and supported at some point?

Thanks,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-submission-REST-API-tp25670.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Rule Engine for Spark

2015-12-10 Thread Luciano Resende
Adrian,

Do you have an example that would show how to change the filters at runtime
by going RDD -> DF -> SQL -> RDD? Non-working pseudo code would work.
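
Something along these lines, perhaps (a rough, untested sketch; Event, parseEvent
and currentFilterSql are placeholders):

  import org.apache.spark.sql.SQLContext

  case class Event(userId: String, value: Long)

  val filtered = stream.transform { rdd =>
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    val df = rdd.map(parseEvent).toDF()     // RDD -> DF
    df.registerTempTable("events")          // expose the micro-batch to SQL
    val sql = currentFilterSql()            // filter text (re)loaded at runtime
    sqlContext.sql(sql).rdd                 // SQL -> RDD
  }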


Thank you


On Wed, Nov 4, 2015 at 3:02 AM, Adrian Tanase  wrote:

> Another way to do it is to extract your filters as SQL code and load it in
> a transform, which allows you to change the filters at runtime.
>
> Inside the transform you could apply the filters by going RDD -> DF -> SQL
> -> RDD.
>
> Lastly, depending on how complex your filters are, you could skip SQL and
> create your own mini-DSL that runs inside transform. I'd definitely start
> here if the filter predicates are simple enough...
>
> -adrian
>
> From: Stefano Baghino
> Date: Wednesday, November 4, 2015 at 10:15 AM
> To: Cassa L
> Cc: user
> Subject: Re: Rule Engine for Spark
>
> Hi LCassa,
> unfortunately I don't have actual experience on this matter, however for a
> similar use case I have briefly evaluated Decision
>  (then called literally Streaming
> CEP Engine) and it looked interesting. I hope it may help.
>
> On Wed, Nov 4, 2015 at 1:42 AM, Cassa L  wrote:
>
>> Hi,
>>  Has anyone used rule engine with spark streaming? I have a case where
>> data is streaming from Kafka and I need to apply some rules on it (instead
>> of hard coding in a code).
>>
>> Thanks,
>> LCassa
>>
>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>



-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Warning: Master endpoint spark://ip:7077 was not a REST server. Falling back to legacy submission gateway instead.

2015-12-10 Thread Andy Davidson
Hi

I am using spark-1.5.1-bin-hadoop2.6. Any idea why I get this warning? My
job seems to run without any problem.

Kind regards 

Andy

+ /root/spark/bin/spark-submit --class com.pws.spark.streaming.IngestDriver
--master spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
--total-executor-cores 2 --deploy-mode cluster
hdfs:///home/ec2-user/build/ingest-all.jar --clusterMode --dirPath week_3

Running Spark using the REST application submission protocol.

15/12/10 16:46:33 WARN RestSubmissionClient: Unable to connect to server
spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077.

Warning: Master endpoint
ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077 was not a REST
server. Falling back to legacy submission gateway instead.
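
For what it's worth, the warning is usually benign: in standalone mode the REST
submission endpoint listens on spark.master.rest.port (6066 by default), while 7077 is
the legacy gateway. Assuming default ports, pointing --master at 6066 should keep the
submission on the REST path, e.g. with the port being the only change:

  /root/spark/bin/spark-submit --class com.pws.spark.streaming.IngestDriver \
    --master spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:6066 \
    --total-executor-cores 2 --deploy-mode cluster \
    hdfs:///home/ec2-user/build/ingest-all.jar --clusterMode --dirPath week_3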




How to make this Spark 1.5.2 code fast and shuffle less data

2015-12-10 Thread unk1102
Hi, I have a Spark job which reads Hive ORC data, processes it and generates a csv
file in the end. These ORC files are Hive partitions and I have around
2000 partitions to process every day. The total size of these Hive partitions is
around 800 GB in HDFS. I have the following method, which I call from a
thread spawned from the Spark driver. So in this case 2000 threads get processed,
and they run painfully slowly, around 12 hours, with huge data shuffling:
each executor shuffles around 50 GB of data. I am using 40 executors with 4
cores and 30 GB memory each. I am using Hadoop 2.6 and the Spark 1.5.2 release.

public void callThisFromThread() {
  DataFrame sourceFrame =
      hiveContext.read().format("orc").load("/path/in/hdfs");
  DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
  DataFrame frameToProcess = sourceFrame.except(filterFrame1);
  JavaRDD<Row> updatedRDD = frameToProcess.toJavaRDD().mapPartitions() {
  .
  }
  DataFrame updatedFrame =
      hiveContext.createDataFrame(updatedRDD, sourceFrame.schema());
  DataFrame selectFrame = updatedFrame.select("col1","col2...","col8");
  DataFrame groupFrame =
      selectFrame.groupBy("col1","col2","col8").agg("..");  // 8-column group by
  groupFrame.coalesce(1).save();  // save as csv, only one file, hence coalesce(1)
}

Please guide me on how I can optimize the above code. I can't avoid the group by,
which I know is evil; I have to group on the 8 fields mentioned above.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-this-Spark-1-5-2-code-fast-and-shuffle-less-data-tp25671.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Workflow manager for Spark and Spark SQL

2015-12-10 Thread Alexander Pivovarov
Hi Everyone

I'm curious what people usually use to build ETL workflows based on
DataFrames and Spark API?

In the Hadoop/Hive world people usually use Oozie. Is it different in the Spark
world?


Structured Vector Format

2015-12-10 Thread Hayri Volkan Agun
Hi everyone,

I have a machine learning problem with many numeric features. I need to
specify the attribute name of a specific vector index, for the readability of
my feature selection process.

Is it possible to name the DataFrame vector elements like a structured
expression? I am using pipeline models...

Thanks
Hayri Volkan Agun
PhD. Student - Anadolu University


Spark 1.3.1 - Does SparkConext in multi-threaded env requires SparkEnv.set(env) anymore

2015-12-10 Thread Nirav Patel
As the subject says, do we still need to set the static env in every thread that
accesses the SparkContext? I read some references here.

http://qnalist.com/questions/4956211/is-spark-context-in-local-mode-thread-safe

-- 





Re: Spark streaming with Kinesis broken?

2015-12-10 Thread Jean-Baptiste Onofré

Hi Nick,

Just to be sure: don't you see some ClassCastException in the log ?

Thanks,
Regards
JB

On 12/10/2015 07:56 PM, Nick Pentreath wrote:

Could you provide an example / test case and more detail on what issue
you're facing?

I've just tested a simple program reading from a dev Kinesis stream and
using stream.print() to show the records, and it works under 1.5.1 but
doesn't appear to be working under 1.5.2.

UI for 1.5.2:

Inline image 1

UI for 1.5.1:

Inline image 2

On Thu, Dec 10, 2015 at 5:50 PM, Brian London mailto:brianmlon...@gmail.com>> wrote:

Has anyone managed to run the Kinesis demo in Spark 1.5.2?  The
Kinesis ASL that ships with 1.5.2 appears to not work for me
although 1.5.1 is fine. I spent some time with Amazon earlier in the
week and the only thing we could do to make it work is to change the
version to 1.5.1.  Can someone please attempt to reproduce before I
open a JIRA issue for it?




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to make this Spark 1.5.2 code fast and shuffle less data

2015-12-10 Thread Benyi Wang
DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
DataFrame frameToProcess = sourceFrame.except(filterFrame1);

except is really expensive. Do you actually want this:

sourceFrame.filter(! col("col1").contains("xyz"))

​

On Thu, Dec 10, 2015 at 9:57 AM, unk1102  wrote:

> Hi I have spark job which reads Hive-ORC data and processes and generates
> csv
> file in the end. Now this ORC files are hive partitions and I have around
> 2000 partitions to process every day. These hive partitions size is around
> 800 GB in HDFS. I have the following method code which I call it from a
> thread spawn from spark driver. So in this case 2000 threads gets processed
> and those runs painfully slow around 12 hours making huge data shuffling
> each executor shuffles around 50 GB of data. I am using 40 executors of 4
> core and 30 GB memory each. I am using Hadoop 2.6 and Spark 1.5.2 release.
>
> public void callThisFromThread() {
> DataFrame sourceFrame =
> hiveContext.read().format("orc").load("/path/in/hdfs");
> DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
> DataFrame frameToProcess = sourceFrame.except(filterFrame1);
> JavaRDD updatedRDD = frameToProcess.toJavaRDD().mapPartitions() {
> .
> }
> DataFrame updatedFrame =
> hiveContext.createDataFrame(updatedRdd,sourceFrame.schema());
> DataFrame selectFrame = updatedFrame.select("col1","col2...","col8");
> DataFrame groupFrame =
> selectFrame.groupBy("col1","col2","col8").agg("..");//8 column
> group
> by
> groupFrame.coalesec(1).save();//save as csv only one file so coalesce(1)
> }
>
> Please guide me how can I optimize above code I cant avoid group by which
> is
> evil I know I have to do group on 8 fields mentioned above.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-this-Spark-1-5-2-code-fast-and-shuffle-less-data-tp25671.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to make this Spark 1.5.2 code fast and shuffle less data

2015-12-10 Thread Benyi Wang
I don't understand this: "I have the following method code which I call it
from a thread spawn from spark driver. So in this case 2000 threads ..."

Why do you call it from a thread?
Are you processing one partition in one thread?

On Thu, Dec 10, 2015 at 11:13 AM, Benyi Wang  wrote:

> DataFrame filterFrame1 = 
> sourceFrame.filter(col("col1").contains("xyz"));DataFrame frameToProcess = 
> sourceFrame.except(filterFrame1);
>
> except is really expensive. Do you actually want this:
>
> sourceFrame.filter(! col("col1").contains("xyz"))
>
> ​
>
> On Thu, Dec 10, 2015 at 9:57 AM, unk1102  wrote:
>
>> Hi I have spark job which reads Hive-ORC data and processes and generates
>> csv
>> file in the end. Now this ORC files are hive partitions and I have around
>> 2000 partitions to process every day. These hive partitions size is around
>> 800 GB in HDFS. I have the following method code which I call it from a
>> thread spawn from spark driver. So in this case 2000 threads gets
>> processed
>> and those runs painfully slow around 12 hours making huge data shuffling
>> each executor shuffles around 50 GB of data. I am using 40 executors of 4
>> core and 30 GB memory each. I am using Hadoop 2.6 and Spark 1.5.2 release.
>>
>> public void callThisFromThread() {
>> DataFrame sourceFrame =
>> hiveContext.read().format("orc").load("/path/in/hdfs");
>> DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
>> DataFrame frameToProcess = sourceFrame.except(filterFrame1);
>> JavaRDD updatedRDD = frameToProcess.toJavaRDD().mapPartitions() {
>> .
>> }
>> DataFrame updatedFrame =
>> hiveContext.createDataFrame(updatedRdd,sourceFrame.schema());
>> DataFrame selectFrame = updatedFrame.select("col1","col2...","col8");
>> DataFrame groupFrame =
>> selectFrame.groupBy("col1","col2","col8").agg("..");//8 column
>> group
>> by
>> groupFrame.coalesec(1).save();//save as csv only one file so coalesce(1)
>> }
>>
>> Please guide me how can I optimize above code I cant avoid group by which
>> is
>> evil I know I have to do group on 8 fields mentioned above.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-this-Spark-1-5-2-code-fast-and-shuffle-less-data-tp25671.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: HELP! I get "java.lang.String cannot be cast to java.lang.Intege " for a long time.

2015-12-10 Thread Jakob Odersky
Could you provide some more context? What is rawData?

On 10 December 2015 at 06:38, Bonsen  wrote:

> I do like this "val secondData = rawData.flatMap(_.split("\t").take(3))"
>
> and I find:
> 15/12/10 22:36:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> 219.216.65.129): java.lang.ClassCastException: java.lang.String cannot be
> cast to java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
> at
> org.apache.spark.examples.SparkPi$$anonfun$1.apply(SparkPi.scala:32)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/HELP-I-get-java-lang-String-cannot-be-cast-to-java-lang-Intege-for-a-long-time-tp25666p25668.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to make this Spark 1.5.2 code fast and shuffle less data

2015-12-10 Thread Umesh Kacha
Hi Benyi thanks for the reply yes I call each hive partition/ hdfs
directory in one thread so that I can make it faster if I dont use threads
then job is even more slow. Like I mentioned I have to process 2000 hive
partitions so 2000 hdfs direcotories containing ORC files right? If I dont
use threads then these 2000 directories will get processed one by one. By
using Executor Service threads I can make it faster by using thread pool of
20 jobs so that at a time 20 jobs are running in one main job.
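
For the archive, a minimal sketch (Scala) of the driver-side pattern described above;
listPartitionPaths and processPartition are placeholders for the existing per-directory
method:

  import java.util.concurrent.Executors
  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration.Duration

  // a pool of 20 threads, so at most 20 concurrent Spark jobs run inside the main job
  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(20))
  val partitionPaths: Seq[String] = listPartitionPaths()   // ~2000 HDFS directories
  val jobs = partitionPaths.map(p => Future { processPartition(p) })
  Await.result(Future.sequence(jobs), Duration.Inf)
  ec.shutdown()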

On Fri, Dec 11, 2015 at 12:49 AM, Benyi Wang  wrote:

> I don't understand this: "I have the following method code which I call it
> from a thread spawn from spark driver. So in this case 2000 threads ..."
>
> Why do you call it from a thread?
> Are you process one partition in one thread?
>
> On Thu, Dec 10, 2015 at 11:13 AM, Benyi Wang 
> wrote:
>
>> DataFrame filterFrame1 = 
>> sourceFrame.filter(col("col1").contains("xyz"));DataFrame frameToProcess = 
>> sourceFrame.except(filterFrame1);
>>
>> except is really expensive. Do you actually want this:
>>
>> sourceFrame.filter(! col("col1").contains("xyz"))
>>
>> ​
>>
>> On Thu, Dec 10, 2015 at 9:57 AM, unk1102  wrote:
>>
>>> Hi I have spark job which reads Hive-ORC data and processes and
>>> generates csv
>>> file in the end. Now this ORC files are hive partitions and I have around
>>> 2000 partitions to process every day. These hive partitions size is
>>> around
>>> 800 GB in HDFS. I have the following method code which I call it from a
>>> thread spawn from spark driver. So in this case 2000 threads gets
>>> processed
>>> and those runs painfully slow around 12 hours making huge data shuffling
>>> each executor shuffles around 50 GB of data. I am using 40 executors of 4
>>> core and 30 GB memory each. I am using Hadoop 2.6 and Spark 1.5.2
>>> release.
>>>
>>> public void callThisFromThread() {
>>> DataFrame sourceFrame =
>>> hiveContext.read().format("orc").load("/path/in/hdfs");
>>> DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
>>> DataFrame frameToProcess = sourceFrame.except(filterFrame1);
>>> JavaRDD updatedRDD = frameToProcess.toJavaRDD().mapPartitions() {
>>> .
>>> }
>>> DataFrame updatedFrame =
>>> hiveContext.createDataFrame(updatedRdd,sourceFrame.schema());
>>> DataFrame selectFrame = updatedFrame.select("col1","col2...","col8");
>>> DataFrame groupFrame =
>>> selectFrame.groupBy("col1","col2","col8").agg("..");//8 column
>>> group
>>> by
>>> groupFrame.coalesec(1).save();//save as csv only one file so coalesce(1)
>>> }
>>>
>>> Please guide me how can I optimize above code I cant avoid group by
>>> which is
>>> evil I know I have to do group on 8 fields mentioned above.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-this-Spark-1-5-2-code-fast-and-shuffle-less-data-tp25671.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Spark streaming with Kinesis broken?

2015-12-10 Thread Brian London
Nick's symptoms sound identical to mine.  I should mention that I just
pulled the latest version from github and it seems to be working there.  To
reproduce:


   1. Download spark 1.5.2 from http://spark.apache.org/downloads.html
   2. build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests
   clean package
   3. build/mvn -Pkinesis-asl -DskipTests clean package
   4. Then run simultaneously:
   1. bin/run-example streaming.KinesisWordCountASL [Kinesis app name]
  [Kinesis stream name] [endpoint URL]
  2.   bin/run-example streaming.KinesisWordProducerASL [Kinesis stream
  name] [endpoint URL] 100 10


On Thu, Dec 10, 2015 at 2:05 PM Jean-Baptiste Onofré 
wrote:

> Hi Nick,
>
> Just to be sure: don't you see some ClassCastException in the log ?
>
> Thanks,
> Regards
> JB
>
> On 12/10/2015 07:56 PM, Nick Pentreath wrote:
> > Could you provide an example / test case and more detail on what issue
> > you're facing?
> >
> > I've just tested a simple program reading from a dev Kinesis stream and
> > using stream.print() to show the records, and it works under 1.5.1 but
> > doesn't appear to be working under 1.5.2.
> >
> > UI for 1.5.2:
> >
> > Inline image 1
> >
> > UI for 1.5.1:
> >
> > Inline image 2
> >
> > On Thu, Dec 10, 2015 at 5:50 PM, Brian London  > > wrote:
> >
> > Has anyone managed to run the Kinesis demo in Spark 1.5.2?  The
> > Kinesis ASL that ships with 1.5.2 appears to not work for me
> > although 1.5.1 is fine. I spent some time with Amazon earlier in the
> > week and the only thing we could do to make it work is to change the
> > version to 1.5.1.  Can someone please attempt to reproduce before I
> > open a JIRA issue for it?
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


[mesos][docker] addFile doesn't work properly

2015-12-10 Thread PHELIPOT, REMY
Hello!

I'm using Apache Spark with Mesos, and I've launched a job with 
coarse-mode=true. In my job, I must download a file from the internet, so I'm 
using:

import org.apache.spark.SparkFiles
sc.addFile("http://samplecsvs.s3.amazonaws.com/Sacramentorealestatetransactions.csv";)
val path = SparkFiles.get("Sacramentorealestatetransactions.csv")
val textRDD = sc.textFile(path)
... some stuff

But the job failed with the following error:

Job aborted due to stage failure: Task 1 in stage 8.0 failed 4 times, most 
recent failure: Lost task 1.3 in stage 8.0 (TID 58, slave-1): 
java.io.FileNotFoundException: File 
file:/tmp/spark-5dde1847-b433-4282-a535-57ba5e2c9b81/userFiles-0885c136-9df1-44b9-a531-343268edfb6c/Sacramentorealestatetransactions.csv
 does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
at 
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.(ChecksumFileSystem.java:140)
at 
org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
at 
org.apache.hadoop.mapred.LineRecordReader.(LineRecordReader.java:108)
at 
org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:239)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Indeed, the file is not downloaded inside the executor containers. However, it is
downloaded in the driver container.

It seems Spark doesn't copy this file to the executor containers; can someone
confirm this issue? Am I doing something wrong?

Kind regards,

Rémy


This message and any attachments (the "message") are intended solely for the 
addressee(s). It contains confidential information, that may be privileged. If 
you receive this message in error, please notify the sender immediately and 
delete the message. Any use of the message in violation of its purpose, any 
dissemination or disclosure, either wholly or partially is strictly prohibited, 
unless it has been explicitly authorized by the sender. As its integrity cannot 
be secured on the internet, Atos and its subsidiaries decline any liability for 
the content of this message. Although the sender endeavors to maintain a 
computer virus-free network, the sender does not warrant that this transmission 
is virus-free and will not be liable for any damages resulting from any virus 
transmitted.


Replaying an RDD in spark streaming to update an accumulator

2015-12-10 Thread AliGouta
I am actually running out of options. In my Spark Streaming application I
want to keep state on some keys. I am getting events from Kafka. Then I
extract keys from the event, say userID. When there are no events coming from
Kafka I want to keep updating a counter for each user ID every 3
seconds, since I configured the batch duration of my StreamingContext to 3
seconds.

Now the way I am doing it might be ugly, but at least it works: I have an
accumulableCollection like this:

val userID = ssc.sparkContext.accumulableCollection(new
mutable.HashMap[String,Long]())

Then I create a "fake" event and keep pushing it to my spark streaming
context as the following:

val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
for ( i <- 1 to 100) {
  rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE"))
  Thread.sleep(3000)
}
val inputStream = ssc.queueStream(rddQueue)

inputStream.foreachRDD( UPDATE_MY_ACCUMULATOR )

This would let me access to my accumulatorCollection and update all the
counters of all userIDs. Up to now everything works fine, however when I
change my loop from:

for ( i <- 1 to 100) {}  // this is for testing

To:

while (true) {}  // this lets me access and update my accumulator through
the whole application life cycle

Then when I run my ./spark-submit, my application gets stuck at this stage:

15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager
slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1,
slave1.cluster.example, 38959)

Any clue on how to resolve this? Is there a straightforward way that
would allow me to update the values of my userIDs (rather than creating a
useless RDD and pushing it periodically to the queueStream)?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Replaying-an-RDD-in-spark-streaming-to-update-an-accumulator-tp25672.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Workflow manager for Spark and Spark SQL

2015-12-10 Thread Ali Tajeldin EDU
Hi Alexander,
  We developed SMV to address the exact issue you mentioned. While it is not a
workflow engine per se, it does allow for the creation of modules with
dependencies and automates the execution of these modules.  See 
https://github.com/TresAmigosSD/SMV/blob/master/docs/user/smv_intro.md for an 
introduction and https://github.com/TresAmigosSD/SMV for the source.

SmvModules provide the following:
* Grouping of Dataframe operations with dependency.
* Automatic versioning of module results so that subsequent runs do not require 
re-running of entire app.
* Automatic detection of module code changes.
* Grouping of modules into stages and publishing of stage results to allow 
large independent teams to work independently.
* Module level dependency graphs.  The higher level graphs tend to show intent 
of application better than low level Relation Algebra graphs.
--
Ali


On Dec 10, 2015, at 10:50 AM, Alexander Pivovarov  wrote:

> Hi Everyone
> 
> I'm curious what people usually use to build ETL workflows based on 
> DataFrames and Spark API?
> 
> In Hadoop/Hive world people usually use Oozie. Is it different in Spark 
> world? 



Re: Spark streaming with Kinesis broken?

2015-12-10 Thread Burak Yavuz
I've noticed this happening when there was some dependency conflicts, and
it is super hard to debug.
It seems that the KinesisClientLibrary version in Spark 1.5.2 is 1.3.0, but
it is 1.2.1 in Spark 1.5.1.
I feel like that seems to be the problem...

Brian, did you verify that it works with the 1.6.0 branch?

Thanks,
Burak
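
Side note, as a sketch only: one way to confirm which KCL version a given build pulls
in is Maven's dependency:tree on the kinesis-asl module; the module path and artifact
coordinates below are assumptions based on the 1.5.x source layout.

  build/mvn -Pkinesis-asl -pl extras/kinesis-asl -am dependency:tree \
    -Dincludes=com.amazonaws:amazon-kinesis-client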

On Thu, Dec 10, 2015 at 11:45 AM, Brian London 
wrote:

> Nick's symptoms sound identical to mine.  I should mention that I just
> pulled the latest version from github and it seems to be working there.  To
> reproduce:
>
>
>1. Download spark 1.5.2 from http://spark.apache.org/downloads.html
>2. build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests
>clean package
>3. build/mvn -Pkinesis-asl -DskipTests clean package
>4. Then run simultaneously:
>1. bin/run-example streaming.KinesisWordCountASL [Kinesis app name]
>   [Kinesis stream name] [endpoint URL]
>   2.   bin/run-example streaming.KinesisWordProducerASL [Kinesis
>   stream name] [endpoint URL] 100 10
>
>
> On Thu, Dec 10, 2015 at 2:05 PM Jean-Baptiste Onofré 
> wrote:
>
>> Hi Nick,
>>
>> Just to be sure: don't you see some ClassCastException in the log ?
>>
>> Thanks,
>> Regards
>> JB
>>
>> On 12/10/2015 07:56 PM, Nick Pentreath wrote:
>> > Could you provide an example / test case and more detail on what issue
>> > you're facing?
>> >
>> > I've just tested a simple program reading from a dev Kinesis stream and
>> > using stream.print() to show the records, and it works under 1.5.1 but
>> > doesn't appear to be working under 1.5.2.
>> >
>> > UI for 1.5.2:
>> >
>> > Inline image 1
>> >
>> > UI for 1.5.1:
>> >
>> > Inline image 2
>> >
>> > On Thu, Dec 10, 2015 at 5:50 PM, Brian London > > > wrote:
>> >
>> > Has anyone managed to run the Kinesis demo in Spark 1.5.2?  The
>> > Kinesis ASL that ships with 1.5.2 appears to not work for me
>> > although 1.5.1 is fine. I spent some time with Amazon earlier in the
>> > week and the only thing we could do to make it work is to change the
>> > version to 1.5.1.  Can someone please attempt to reproduce before I
>> > open a JIRA issue for it?
>> >
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: Replaying an RDD in spark streaming to update an accumulator

2015-12-10 Thread Cody Koeninger
I'm a little confused as to why you have fake events rather than just doing
foreachRDD or foreachPartition on your kafka stream and updating the
accumulator there.  I'd expect that to run each batch even if the batch had
0 kafka messages in it.
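
For the archive, a minimal sketch of that approach (direct Kafka API; extractUserId,
kafkaParams and topics are assumed to come from the existing application):

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils
  import scala.collection.mutable

  val userID = ssc.sparkContext.accumulableCollection(new mutable.HashMap[String, Long]())

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)

  stream.foreachRDD { rdd =>
    // this block runs on the driver once per batch, even when the batch is empty
    val ids = rdd.map(_._2).map(extractUserId).distinct().collect()
    ids.foreach(id => userID += (id -> (userID.value.getOrElse(id, 0L) + 1L)))
  }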


On Thu, Dec 10, 2015 at 2:05 PM, AliGouta  wrote:

> I am actually running out of options. In my spark streaming application. I
> want to keep a state on some keys. I am getting events from Kafka. Then I
> extract keys from the event, say userID. When there is no events coming
> from
> Kafka I want to keep updating a counter relative to each user ID each 3
> seconds, since I configured the batchduration of my StreamingContext with 3
> seconds.
>
> Now the way I am doing it might be ugly, but at least it works: I have an
> accumulableCollection like this:
>
> /al userID = ssc.sparkContext.accumulableCollection(new
> mutable.HashMap[String,Long]())/
>
> Then I create a "fake" event and keep pushing it to my spark streaming
> context as the following:
>
> /val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
> for ( i <- 1 to  100) {
>   rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE"))
>   Thread.sleep(3000)
> }
> val inputStream = ssc.queueStream(rddQueue)
>
> inputStream.foreachRDD( UPDATE_MY_ACCUMULATOR )/
>
> This would let me access to my accumulatorCollection and update all the
> counters of all userIDs. Up to now everything works fine, however when I
> change my loop from:
>
> /for ( i <- 1 to  100) {} #This is for test/
>
> To:
>
> /while (true) {} #This is to let me access and update my accumulator
> through
> the whole application life cycle/
>
> Then when I run my ./spark-submit, my application gets stuck on this stage:
>
> /15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager
> slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1,
> slave1.cluster.example, 38959)/
>
> Any clue on how to resolve this ? Is there a pretty straightforward way
> that
> would allow me updating the values of my userIDs (rather than creating an
> unuseful RDD and pushing it periodically to the queuestream)?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Replaying-an-RDD-in-spark-streaming-to-update-an-accumulator-tp25672.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark on EMR: out-of-the-box solution for real-time application logs monitoring?

2015-12-10 Thread Steve Loughran

> On 10 Dec 2015, at 14:52, Roberto Coluccio  wrote:
> 
> Hello,
> 
> I'm investigating on a solution to real-time monitor Spark logs produced by 
> my EMR cluster in order to collect statistics and trigger alarms. Being on 
> EMR, I found the CloudWatch Logs + Lambda pretty straightforward and, since 
> I'm on AWS, those service are pretty well integrated together..but I could 
> just find examples about it using on standalone EC2 instances.
> 
> In my use case, EMR 3.9 and Spark 1.4.1 drivers running on YARN (cluster 
> mode), I would like to be able to real-time monitor Spark logs, so not just 
> about when the processing ends and they are copied to S3. Is there any 
> out-of-the-box solution or best-practice for accomplish this goal when 
> running on EMR that I'm not aware of?
> 
> Spark logs are written on the Data Nodes (Core Instances) local file systems 
> as YARN containers logs, so probably installing the awslogs agent on them and 
> pointing to those logfiles would help pushing such logs on CloudWatch, but I 
> was wondering how the community real-time monitors application logs when 
> running Spark on YARN on EMR.
> 
> Or maybe I'm looking at a wrong solution. Maybe the correct way would be 
> using something like a CloudwatchSink so to make Spark (log4j) pushing logs 
> directly to the sink and the sink pushing them to CloudWatch (I do like the 
> out-of-the-box EMR logging experience and I want to keep the usual eventual 
> logs archiving on S3 when the EMR cluster is terminated).
> 
> Any ideas or experience about this problem?
> 
> Thank you.
> 
> Roberto


are you talking about event logs as used by the history server, or application 
logs?

The current Spark log server writes events to a file, but as the Hadoop s3 fs 
client doesn't write except in close(), they won't be pushed out while things 
are running. Someone (you?) could have a go at implementing a new event 
listener; some stuff that will come out in Spark 2.0 will make it easier to 
wire this up (SPARK-11314), which is coming as part of some work on Spark-YARN 
timeline server integration.

In Hadoop 2.7.1 the log4j logs can be regularly captured by the YARN 
NodeManagers and automatically copied out; look at 
yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds. For that to 
work you need to set up your log wildcard patterns for the NM to locate 
(i.e. have rolling logs with the right extensions)... the details escape me 
right now.

In earlier versions, you can use "yarn logs" to grab them and pull them down.
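
For reference, a sketch of the yarn-site.xml settings referred to above (the one-hour
interval is just an example value):

  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
    <value>3600</value>
  </property>

and on earlier versions the manual pull is along the lines of (placeholder application id):

  yarn logs -applicationId application_1449700000000_0001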

I don't know anything about cloudwatch integration, sorry

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming with Kinesis broken?

2015-12-10 Thread Brian London
Yes, it worked in the 1.6 branch as of commit
db5165246f2888537dd0f3d4c5a515875c7358ed.  That makes it much less serious
of an issue, although it would be nice to know what the root cause is to
avoid a regression.

On Thu, Dec 10, 2015 at 4:03 PM Burak Yavuz  wrote:

> I've noticed this happening when there was some dependency conflicts, and
> it is super hard to debug.
> It seems that the KinesisClientLibrary version in Spark 1.5.2 is 1.3.0,
> but it is 1.2.1 in Spark 1.5.1.
> I feel like that seems to be the problem...
>
> Brian, did you verify that it works with the 1.6.0 branch?
>
> Thanks,
> Burak
>
> On Thu, Dec 10, 2015 at 11:45 AM, Brian London 
> wrote:
>
>> Nick's symptoms sound identical to mine.  I should mention that I just
>> pulled the latest version from github and it seems to be working there.  To
>> reproduce:
>>
>>
>>1. Download spark 1.5.2 from http://spark.apache.org/downloads.html
>>2. build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests
>>clean package
>>3. build/mvn -Pkinesis-asl -DskipTests clean package
>>4. Then run simultaneously:
>>1. bin/run-example streaming.KinesisWordCountASL [Kinesis app name]
>>   [Kinesis stream name] [endpoint URL]
>>   2.   bin/run-example streaming.KinesisWordProducerASL [Kinesis
>>   stream name] [endpoint URL] 100 10
>>
>>
>> On Thu, Dec 10, 2015 at 2:05 PM Jean-Baptiste Onofré 
>> wrote:
>>
>>> Hi Nick,
>>>
>>> Just to be sure: don't you see some ClassCastException in the log ?
>>>
>>> Thanks,
>>> Regards
>>> JB
>>>
>>> On 12/10/2015 07:56 PM, Nick Pentreath wrote:
>>> > Could you provide an example / test case and more detail on what issue
>>> > you're facing?
>>> >
>>> > I've just tested a simple program reading from a dev Kinesis stream and
>>> > using stream.print() to show the records, and it works under 1.5.1 but
>>> > doesn't appear to be working under 1.5.2.
>>> >
>>> > UI for 1.5.2:
>>> >
>>> > Inline image 1
>>> >
>>> > UI for 1.5.1:
>>> >
>>> > Inline image 2
>>> >
>>> > On Thu, Dec 10, 2015 at 5:50 PM, Brian London >> > > wrote:
>>> >
>>> > Has anyone managed to run the Kinesis demo in Spark 1.5.2?  The
>>> > Kinesis ASL that ships with 1.5.2 appears to not work for me
>>> > although 1.5.1 is fine. I spent some time with Amazon earlier in
>>> the
>>> > week and the only thing we could do to make it work is to change
>>> the
>>> > version to 1.5.1.  Can someone please attempt to reproduce before I
>>> > open a JIRA issue for it?
>>> >
>>> >
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Re: Spark 1.3.1 - Does SparkConext in multi-threaded env requires SparkEnv.set(env) anymore

2015-12-10 Thread Josh Rosen
Nope, you shouldn't have to do that anymore. As of
https://github.com/apache/spark/pull/2624, which is in Spark 1.2.0+,
SparkEnv's thread-local stuff was removed and replaced by a simple global
variable (since it was used in an *effectively* global way before (see my
comments on that PR)). As a result, there shouldn't really be any need for
you to call SparkEnv.set(env) in your user threads anymore.

On Thu, Dec 10, 2015 at 11:03 AM, Nirav Patel  wrote:

> As subject says, do we still need to use static env in every thread that
> access sparkContext? I read some ref here.
>
>
> http://qnalist.com/questions/4956211/is-spark-context-in-local-mode-thread-safe
>
>
>


Re: Replaying an RDD in spark streaming to update an accumulator

2015-12-10 Thread Ali Gouta
Indeed, you are right! I felt like I was missing or misunderstanding
something.
Thank you so much!

Ali Gouta.

On Thu, Dec 10, 2015 at 10:04 PM, Cody Koeninger  wrote:

> I'm a little confused as to why you have fake events rather than just
> doing foreachRDD or foreachPartition on your kafka stream and updating the
> accumulator there.  I'd expect that to run each batch even if the batch had
> 0 kafka messages in it.
>
>
> On Thu, Dec 10, 2015 at 2:05 PM, AliGouta  wrote:
>
>> I am actually running out of options. In my spark streaming application. I
>> want to keep a state on some keys. I am getting events from Kafka. Then I
>> extract keys from the event, say userID. When there is no events coming
>> from
>> Kafka I want to keep updating a counter relative to each user ID each 3
>> seconds, since I configured the batchduration of my StreamingContext with
>> 3
>> seconds.
>>
>> Now the way I am doing it might be ugly, but at least it works: I have an
>> accumulableCollection like this:
>>
>> /al userID = ssc.sparkContext.accumulableCollection(new
>> mutable.HashMap[String,Long]())/
>>
>> Then I create a "fake" event and keep pushing it to my spark streaming
>> context as the following:
>>
>> /val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
>> for ( i <- 1 to  100) {
>>   rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE"))
>>   Thread.sleep(3000)
>> }
>> val inputStream = ssc.queueStream(rddQueue)
>>
>> inputStream.foreachRDD( UPDATE_MY_ACCUMULATOR )/
>>
>> This would let me access to my accumulatorCollection and update all the
>> counters of all userIDs. Up to now everything works fine, however when I
>> change my loop from:
>>
>> /for ( i <- 1 to  100) {} #This is for test/
>>
>> To:
>>
>> /while (true) {} #This is to let me access and update my accumulator
>> through
>> the whole application life cycle/
>>
>> Then when I run my ./spark-submit, my application gets stuck on this
>> stage:
>>
>> /15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager
>> slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1,
>> slave1.cluster.example, 38959)/
>>
>> Any clue on how to resolve this ? Is there a pretty straightforward way
>> that
>> would allow me updating the values of my userIDs (rather than creating an
>> unuseful RDD and pushing it periodically to the queuestream)?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Replaying-an-RDD-in-spark-streaming-to-update-an-accumulator-tp25672.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: DataFrame creation delay?

2015-12-10 Thread Isabelle Phan
Hi Michael,

We have just upgraded to Spark 1.5.0 (actually 1.5.0_cdh-5.5 since we are
on Cloudera) and to Parquet-formatted tables. I turned on
spark.sql.hive.metastorePartitionPruning=true, but DataFrame creation still
takes a long time.
Is there any other configuration to consider?


Thanks a lot for your help,

Isabelle
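
For reference, a minimal sketch of the temp-table workaround Michael suggests below;
"log_cached" is an arbitrary name and "temp.log" is the table from the original question:

  // register once: the partition lookup is paid here, not on every query
  sqlContext.table("temp.log").registerTempTable("log_cached")
  val logs = sqlContext.sql("select * from log_cached where dt = '2015-12-10'")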

On Fri, Sep 4, 2015 at 1:42 PM, Michael Armbrust 
wrote:

> If you run sqlContext.table("...").registerTempTable("...") that
> temptable will cache the lookup of partitions.
>
> On Fri, Sep 4, 2015 at 1:16 PM, Isabelle Phan  wrote:
>
>> Hi Michael,
>>
>> Thanks a lot for your reply.
>>
>> This table is stored as text file with tab delimited columns.
>>
>> You are correct, the problem is because my table has too many partitions
>> (1825 in total). Since I am on Spark 1.4, I think I am hitting bug 6984
>> .
>>
>> Not sure when my company can move to 1.5. Would you know some workaround
>> for this bug?
>> If I cannot find workaround for this, will have to change our schema
>> design to reduce number of partitions.
>>
>>
>> Thanks,
>>
>> Isabelle
>>
>>
>>
>> On Fri, Sep 4, 2015 at 12:56 PM, Michael Armbrust > > wrote:
>>
>>> Also, do you mean two partitions or two partition columns?  If there are
>>> many partitions it can be much slower.  In Spark 1.5 I'd consider setting 
>>> spark.sql.hive.metastorePartitionPruning=true
>>> if you have predicates over the partition columns.
>>>
>>> On Fri, Sep 4, 2015 at 12:54 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 What format is this table.  For parquet and other optimized formats we
 cache a bunch of file metadata on first access to make interactive queries
 faster.

 On Thu, Sep 3, 2015 at 8:17 PM, Isabelle Phan 
 wrote:

> Hello,
>
> I am using SparkSQL to query some Hive tables. Most of the time, when
> I create a DataFrame using sqlContext.sql("select * from table") command,
> DataFrame creation is less than 0.5 second.
> But I have this one table with which it takes almost 12 seconds!
>
> scala>  val start = scala.compat.Platform.currentTime; val logs =
> sqlContext.sql("select * from temp.log"); val execution =
> scala.compat.Platform.currentTime - start
> 15/09/04 12:07:02 INFO ParseDriver: Parsing command: select * from
> temp.log
> 15/09/04 12:07:02 INFO ParseDriver: Parse Completed
> start: Long = 1441336022731
> logs: org.apache.spark.sql.DataFrame = [user_id: string, option: int,
> log_time: string, tag: string, dt: string, test_id: int]
> execution: Long = *11567*
>
> This table has 3.6 B rows, and 2 partitions (on dt and test_id
> columns).
> I have created DataFrames on even larger tables and do not see such
> delay.
> So my questions are:
> - What can impact DataFrame creation time?
> - Is it related to the table partitions?
>
>
> Thanks much your help!
>
> Isabelle
>


>>>
>>
>


RE: Graph visualization tool for GraphX

2015-12-10 Thread Lin, Hao
Hi Andy, quick question: does Spark Notebook include its own Spark engine, or do I 
need to install Spark separately and point to it from Spark Notebook? Thanks

From: Lin, Hao [mailto:hao@finra.org]
Sent: Tuesday, December 08, 2015 7:01 PM
To: andy petrella; Jörn Franke
Cc: user@spark.apache.org
Subject: RE: Graph visualization tool for GraphX

Thanks Andy, I certainly will give a try to your suggestion.

From: andy petrella [mailto:andy.petre...@gmail.com]
Sent: Tuesday, December 08, 2015 1:21 PM
To: Lin, Hao; Jörn Franke
Cc: user@spark.apache.org
Subject: Re: Graph visualization tool for GraphX

Hello Lin,

This is indeed a tough scenario when you have many vertices and (even worse) 
many edges...

So two-fold answer:
First, technically, there is graph plotting support in the Spark Notebook 
(https://github.com/andypetrella/spark-notebook/ → check this notebook: 
https://github.com/andypetrella/spark-notebook/blob/master/notebooks/viz/Graph%20Plots.snb).
 You can plot a graph from Scala, which will be converted to D3 with a force-layout 
force field.
The number of points which you will plot is "sampled" using a `Sampler` 
that you can provide yourself. Which leads to the second fold of this answer.

Plotting a large graph is rather tough because there is no real notion of 
dimension... there is always the option to dig into topological analysis theory 
to find a good homeomorphism... but that won't be that efficient ;-D.
Best is to find a good approach to generalize/summarize the information, there 
are many many techniques (that you can find in mainly geospatial viz and 
biology viz theories...)
Best is to check what will match your need the fastest.
There are quick techniques like using unsupervised clustering models and then 
plot a voronoi diagram (which can be approached using force layout).

In general terms I might say that multiscaling is intuitively what you want 
first: this is an interesting paper presenting the foundations: 
https://www.cs.ubc.ca/~tmm/courses/533-07/readings/auberIV03Seattle.pdf

Oh and BTW, to end this longish mail, while looking for new papers on that, I 
fell upon this one: 
http://vacommunity.org/egas2015/papers/IGAS2015-ScottLangevin.pdf
 which is using
1. Spark !!!
2. a tile based approach (~ to tiling + pyramids in geospatial)

HTH

PS regarding the Spark Notebook, you can always come and discuss on gitter: 
https://gitter.im/andypetrella/spark-notebook


On Tue, Dec 8, 2015 at 6:30 PM Lin, Hao 
mailto:hao@finra.org>> wrote:
Hello Jorn,

Thank you for the reply and for being tolerant of my oversimplified question. I 
should've been more specific. Though it is ~TB of data, there will be about billions 
of records (edges) and 100,000 nodes. We need to visualize the social network 
graph like what can be done with Gephi, which has scalability limitations for 
handling such an amount of data. There will be dozens of users accessing it and the 
response time is also critical. We would like to run the visualization tool on 
a remote EC2 server, where a web tool could be a good choice for us.

Please let me know if I need to be more specific :). Thanks
hao

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Tuesday, December 08, 2015 11:31 AM
To: Lin, Hao
Cc: user@spark.apache.org
Subject: Re: Graph visualization tool for GraphX

I am not sure about your use case. How should a human interpret many terabytes 
of data in one lar

Re: [mesos][docker] addFile doesn't work properly

2015-12-10 Thread PhuDuc Nguyen
Have you tried setting the spark.mesos.uris property, like:

val conf = new SparkConf().set("spark.mesos.uris", ...)
val sc = new SparkContext(conf)
...

http://spark.apache.org/docs/latest/running-on-mesos.html

HTH,
Duc







On Thu, Dec 10, 2015 at 1:04 PM, PHELIPOT, REMY 
wrote:

> Hello!
>
> I'm using Apache Spark with Mesos, and I've launched a job with
> coarse-mode=true. In my job, I must download a file from the internet, so
> I'm using:
>
> import org.apache.spark.SparkFiles
> sc.addFile("
> http://samplecsvs.s3.amazonaws.com/Sacramentorealestatetransactions.csv";)
> val path = SparkFiles.get("Sacramentorealestatetransactions.csv")
> val textRDD = sc.textFile(path)
> ... some stuff
>
> But the job failed with the following error:
>
> Job aborted due to stage failure: Task 1 in stage 8.0 failed 4 times, most 
> recent failure: Lost task 1.3 in stage 8.0 (TID 58, slave-1): 
> java.io.FileNotFoundException: File 
> file:/tmp/spark-5dde1847-b433-4282-a535-57ba5e2c9b81/userFiles-0885c136-9df1-44b9-a531-343268edfb6c/Sacramentorealestatetransactions.csv
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>   at 
> org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.(ChecksumFileSystem.java:140)
>   at 
> org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
>   at 
> org.apache.hadoop.mapred.LineRecordReader.(LineRecordReader.java:108)
>   at 
> org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:239)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
> Indeed, the file is not downloaded inside the executor container. However it 
> is downloaded in the driver container.
>
> It seems spark doesn't copy this file on executor containers, can someone 
> confirm this issue? Am I doing something wrong?
>
> Kind regards,
>
> Rémy
>
> This message and any attachments (the "message") are intended solely for
> the addressee(s). It contains confidential information, that may be
> privileged. If you receive this message in error, please notify the sender
> immediately and delete the message. Any use of the message in violation of
> its purpose, any dissemination or disclosure, either wholly or partially is
> strictly prohibited, unless it has been explicitly authorized by the
> sender. As its integrity cannot be secured on the internet, Atos and its
> subsidiaries decline any liability for the content of this message.
> Although the sender endeavors to maintain a computer virus-free network,
> the sender does not warrant that this transmission is virus-free and will
> not be liable for any dam

Re: StackOverflowError when writing dataframe to table

2015-12-10 Thread Jakob Odersky
Can you give us some more info about the dataframe and caching? Ideally, a
set of steps to reproduce the issue.


On 9 December 2015 at 14:59, apu mishra . rr  wrote:

> The command
>
> mydataframe.write.saveAsTable(name="tablename")
>
> sometimes results in java.lang.StackOverflowError (see below for fuller
> error message).
>
> This is after I am able to successfully run cache() and show() methods on
> mydataframe.
>
> The issue is not deterministic, i.e. the same code sometimes works fine,
> sometimes not.
>
> I am running PySpark with:
>
> spark-submit --master local[*] --driver-memory 24g --executor-memory 24g
>
> Any help understanding this issue would be appreciated!
>
> Thanks, Apu
>
> Fuller error message:
>
> Exception in thread "dag-scheduler-event-loop" java.lang.StackOverflowError
>
> at
> java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2281)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1428)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>
> at
> scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
>
> at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:497)
>
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>
> at
> scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
>
> at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:497)
>
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStrea

Re: Warning: Master endpoint spark://ip:7077 was not a REST server. Falling back to legacy submission gateway instead.

2015-12-10 Thread Jakob Odersky
Is there any other process using port 7077?

On 10 December 2015 at 08:52, Andy Davidson 
wrote:

> Hi
>
> I am using spark-1.5.1-bin-hadoop2.6. Any idea why I get this warning? My
> job seems to run without any problem.
>
> Kind regards
>
> Andy
>
> + /root/spark/bin/spark-submit --class
> com.pws.spark.streaming.IngestDriver --master spark://
> ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
> --total-executor-cores 2 --deploy-mode cluster
> hdfs:///home/ec2-user/build/ingest-all.jar --clusterMode --dirPath week_3
>
> Running Spark using the REST application submission protocol.
>
> 15/12/10 16:46:33 WARN RestSubmissionClient: Unable to connect to server
> spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077.
>
> Warning: Master endpoint
> ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077 was not a REST
> server. Falling back to legacy submission gateway instead.
>


architecture thought experiment: what is the advantage of using kafka with spark streaming?

2015-12-10 Thread Andy Davidson
I noticed that many people are using Kafka with Spark Streaming. Can someone
provide a couple of use cases?

I imagine some possible use cases might be as follows.

Is the purpose of using Kafka to:
1. Provide some buffering?
2. Implement some sort of load balancing for the overall system?
3. Provide filtering/sorting of data?
4. Simplify client connections? It is easy for thousands of producers to connect
to Kafka, and probably hard to do with Spark Streaming alone.
5. ???
Kind regards

Andy




Re: Warning: Master endpoint spark://ip:7077 was not a REST server. Falling back to legacy submission gateway instead.

2015-12-10 Thread Andy Davidson
Hi Jakob

The cluster was set up using the spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2
script

Given my limited knowledge I think this looks okay?

Thanks

Andy

$ sudo netstat -peant | grep 7077

tcp   0   0 :::172-31-30-51:7077   :::*                     LISTEN       0   3116414   27355/java

tcp   0   0 :::172-31-30-51:7077   :::172-31-30-51:57311    ESTABLISHED  0   3115919   27355/java

tcp   0   0 :::172-31-30-51:7077   :::172-31-30-51:42333    ESTABLISHED  0   3736664   27355/java

tcp   0   0 :::172-31-30-51:7077   :::172-31-30-51:49796    ESTABLISHED  0   3115925   27355/java

tcp   0   0 :::172-31-30-51:7077   :::172-31-30-51:42290    ESTABLISHED  0   3115923   27355/java



$ ps -aux | grep 27355

Warning: bad syntax, perhaps a bogus '-'? See
/usr/share/doc/procps-3.2.8/FAQ

ec2-user 23867  0.0  0.0 110404   872 pts/0S+   02:06   0:00 grep 27355

root 27355  0.5  6.7 3679096 515836 ?  Sl   Nov26 107:04
/usr/java/latest/bin/java -cp
/root/spark/sbin/../conf/:/root/spark/lib/spark-assembly-1.5.1-hadoop1.2.1.j
ar:/root/spark/lib/datanucleus-api-jdo-3.2.6.jar:/root/spark/lib/datanucleus
-rdbms-3.2.9.jar:/root/spark/lib/datanucleus-core-3.2.10.jar:/root/ephemeral
-hdfs/conf/ -Xms1g -Xmx1g org.apache.spark.deploy.master.Master --ip
ec2-54-215-217-122.us-west-1.compute.amazonaws.com --port 7077 --webui-port
8080


From:  Jakob Odersky 
Date:  Thursday, December 10, 2015 at 5:55 PM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: Warning: Master endpoint spark://ip:7077 was not a REST
server. Falling back to legacy submission gateway instead.

> Is there any other process using port 7077?
> 
> On 10 December 2015 at 08:52, Andy Davidson 
> wrote:
>> Hi
>> 
>> I am using spark-1.5.1-bin-hadoop2.6. Any idea why I get this warning. My job
>> seems to run with out any problem.
>> 
>> Kind regards 
>> 
>> Andy
>> 
>> + /root/spark/bin/spark-submit --class com.pws.spark.streaming.IngestDriver
>> --master spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
>> 
>> --total-executor-cores 2 --deploy-mode cluster
>> hdfs:///home/ec2-user/build/ingest-all.jar --clusterMode --dirPath week_3
>> 
>> Running Spark using the REST application submission protocol.
>> 
>> 15/12/10 16:46:33 WARN RestSubmissionClient: Unable to connect to server
>> spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
>>  .
>> 
>> Warning: Master endpoint
>> ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
>>   was not a
>> REST server. Falling back to legacy submission gateway instead.
> 




Re: How to make this Spark 1.5.2 code fast and shuffle less data

2015-12-10 Thread manasdebashiskar
Have you tried persisting sourceFrame with MEMORY_AND_DISK?
Maybe you can cache updatedRDD, which gets used in the next two lines.

Are you sure you are paying the performance penalty because of shuffling
only?
Yes, group by is a killer. How much time does your code spend in GC?

I can't tell whether your group by is actually unavoidable, but there are times
when the data is temporal and an operation needs just one element before or
after each record; zipWithIndex and reduce may be used to avoid the group by call.
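One variant of that idea, sketched with a placeholder sourceRdd and a self-join on the index rather than a reduce:

// Pair each element with its successor without a groupBy, for temporal data where
// only the neighbouring record matters. sourceRdd is a placeholder for your RDD.
val indexed = sourceRdd.zipWithIndex().map { case (v, i) => (i, v) }
val shifted = indexed.map { case (i, v) => (i - 1, v) }   // re-key each element to its predecessor's index
val adjacentPairs = indexed.join(shifted).values          // RDD[(current, next)]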


..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-this-Spark-1-5-2-code-fast-and-shuffle-less-data-tp25671p25673.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Warning: Master endpoint spark://ip:7077 was not a REST server. Falling back to legacy submission gateway instead.

2015-12-10 Thread Andrew Or
Hi Andy,

You must be running in cluster mode. The Spark Master accepts client mode
submissions on port 7077 and cluster mode submissions on port 6066. This is
because standalone cluster mode uses a REST API to submit applications by
default. If you submit to port 6066 instead the warning should go away.

-Andrew


2015-12-10 18:13 GMT-08:00 Andy Davidson :

> Hi Jakob
>
> The cluster was set up using the spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2
> script
>
> Given my limited knowledge I think this looks okay?
>
> Thanks
>
> Andy
>
> $ sudo netstat -peant | grep 7077
>
> tcp   0   0 :::172-31-30-51:7077   :::*                     LISTEN       0   3116414   27355/java
>
> tcp   0   0 :::172-31-30-51:7077   :::172-31-30-51:57311    ESTABLISHED  0   3115919   27355/java
>
> tcp   0   0 :::172-31-30-51:7077   :::172-31-30-51:42333    ESTABLISHED  0   3736664   27355/java
>
> tcp   0   0 :::172-31-30-51:7077   :::172-31-30-51:49796    ESTABLISHED  0   3115925   27355/java
>
> tcp   0   0 :::172-31-30-51:7077   :::172-31-30-51:42290    ESTABLISHED  0   3115923   27355/java
>
>
> $ ps -aux | grep 27355
>
> Warning: bad syntax, perhaps a bogus '-'? See
> /usr/share/doc/procps-3.2.8/FAQ
>
> ec2-user 23867  0.0  0.0 110404   872 pts/0S+   02:06   0:00 grep 27355
>
> root 27355  0.5  6.7 3679096 515836 ?  Sl   Nov26 107:04
> /usr/java/latest/bin/java -cp
> /root/spark/sbin/../conf/:/root/spark/lib/spark-assembly-1.5.1-hadoop1.2.1.jar:/root/spark/lib/datanucleus-api-jdo-3.2.6.jar:/root/spark/lib/datanucleus-rdbms-3.2.9.jar:/root/spark/lib/datanucleus-core-3.2.10.jar:/root/ephemeral-hdfs/conf/
> -Xms1g -Xmx1g org.apache.spark.deploy.master.Master --ip
> ec2-54-215-217-122.us-west-1.compute.amazonaws.com --port 7077
> --webui-port 8080
>
> From: Jakob Odersky 
> Date: Thursday, December 10, 2015 at 5:55 PM
> To: Andrew Davidson 
> Cc: "user @spark" 
> Subject: Re: Warning: Master endpoint spark://ip:7077 was not a REST
> server. Falling back to legacy submission gateway instead.
>
> Is there any other process using port 7077?
>
> On 10 December 2015 at 08:52, Andy Davidson  > wrote:
>
>> Hi
>>
>> I am using spark-1.5.1-bin-hadoop2.6. Any idea why I get this warning.
>> My job seems to run with out any problem.
>>
>> Kind regards
>>
>> Andy
>>
>> + /root/spark/bin/spark-submit --class
>> com.pws.spark.streaming.IngestDriver --master spark://
>> ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
>> --total-executor-cores 2 --deploy-mode cluster
>> hdfs:///home/ec2-user/build/ingest-all.jar --clusterMode --dirPath week_3
>>
>> Running Spark using the REST application submission protocol.
>>
>> 15/12/10 16:46:33 WARN RestSubmissionClient: Unable to connect to server
>> spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077.
>>
>> Warning: Master endpoint
>> ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077 was not a REST
>> server. Falling back to legacy submission gateway instead.
>>
>
>


Re: Spark job submission REST API

2015-12-10 Thread Andrew Or
Hello,

The hidden API was implemented for use internally and there are no plans to
make it public at this point. It was originally introduced to provide
backward compatibility in submission protocol across multiple versions of
Spark. A full-fledged stable REST API for submitting applications would
require a detailed design consensus among the community.

-Andrew

2015-12-10 8:26 GMT-08:00 mvle :

> Hi,
>
> I would like to use Spark as a service through REST API calls
> for uploading and submitting a job, getting results, etc.
>
> There is a project by the folks at Ooyala:
> https://github.com/spark-jobserver/spark-jobserver
>
> I also encountered some hidden job REST APIs in Spark:
> http://arturmkrtchyan.com/apache-spark-hidden-rest-api
>
> To help determine which set of APIs to use, I would like to know
> the plans for those hidden Spark APIs.
> Will they be made public and supported at some point?
>
> Thanks,
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-submission-REST-API-tp25670.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark job submission REST API

2015-12-10 Thread Harsh J
You could take a look at Livy also:
https://github.com/cloudera/livy#welcome-to-livy-the-rest-spark-server

On Fri, Dec 11, 2015 at 8:17 AM Andrew Or  wrote:

> Hello,
>
> The hidden API was implemented for use internally and there are no plans
> to make it public at this point. It was originally introduced to provide
> backward compatibility in submission protocol across multiple versions of
> Spark. A full-fledged stable REST API for submitting applications would
> require a detailed design consensus among the community.
>
> -Andrew
>
> 2015-12-10 8:26 GMT-08:00 mvle :
>
>> Hi,
>>
>> I would like to use Spark as a service through REST API calls
>> for uploading and submitting a job, getting results, etc.
>>
>> There is a project by the folks at Ooyala:
>> https://github.com/spark-jobserver/spark-jobserver
>>
>> I also encountered some hidden job REST APIs in Spark:
>> http://arturmkrtchyan.com/apache-spark-hidden-rest-api
>>
>> To help determine which set of APIs to use, I would like to know
>> the plans for those hidden Spark APIs.
>> Will they be made public and supported at some point?
>>
>> Thanks,
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-submission-REST-API-tp25670.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark job submission REST API

2015-12-10 Thread manasdebashiskar
We use the Ooyala job server. It is great. It has a good set of APIs to cancel
jobs, create ad hoc or persistent contexts, etc.
It also has good support for remote deploys and tests, which helps with faster
coding.

The current version is missing a job progress bar, but I could not find the
same in the hidden Spark APIs either.

In any case, I think the job server is better than the hidden APIs, because it is
not hidden.

..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-submission-REST-API-tp25670p25674.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Help: Get Timeout error and FileNotFoundException when shuffling large files

2015-12-10 Thread manasdebashiskar
Is that the only kind of error you are getting?
Is it possible that something else dies and gets buried in other messages?
Try repairing HDFS (fsck etc.) to find out whether the blocks are intact.

A few things to check:
1) Do you have too many small files?
2) Is your system complaining about too many inodes, etc.?
3) Try a smaller set while increasing the data set size, to make sure it is a
data-volume-related problem.
4) If you have monitoring turned on, see what the CPU and disk I/O on your
driver and worker machines look like.
5) Have you tried increasing driver memory? (More partitions means the driver
needs more memory to keep the metadata.)

..Manas





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Help-Get-Timeout-error-and-FileNotFoundException-when-shuffling-large-files-tp25662p25675.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame: Compare each row to every other row?

2015-12-10 Thread manasdebashiskar
You can use the evil "group by key" and use a conventional method to compare
each row against the others within that iterable.
If your similarity function produces n-1 results for n inputs, then you can use
a flatMap to do all that work on the worker side.
Spark also has a cartesian product that might help in your case, though for
500M records it won't be performant.
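A rough sketch of the cartesian approach (df and the similarity function are placeholders, and as noted above this will not scale to 500M rows):

import org.apache.spark.sql.Row

// Placeholder similarity function; substitute your own comparison logic.
val similarity = (a: Row, b: Row) => if (a == b) 1.0 else 0.0

val indexedRows = df.rdd.zipWithIndex()                   // (Row, index)
val scores = indexedRows.cartesian(indexedRows)
  .filter { case ((_, i), (_, j)) => i < j }              // keep each unordered pair once
  .map { case ((a, _), (b, _)) => similarity(a, b) }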

..Manas





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-Compare-each-row-to-every-other-row-tp25639p25676.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Comparisons between Ganglia and Graphite for monitoring the Streaming Cluster?

2015-12-10 Thread manasdebashiskar
We use Graphite monitoring.
Currently we miss having email notifications for alerts; not sure whether
Ganglia has the same caveat.

..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Comparisons-between-Ganglia-and-Graphite-for-monitoring-the-Streaming-Cluster-tp25635p25677.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame creation delay?

2015-12-10 Thread Harsh J
The option "spark.sql.hive.metastorePartitionPruning=true" will not work
unless you have a partition-column predicate in your query, and your query
"select * from temp.log" does not have one. The slowdown appears to be due
to the need to load all of the partition metadata.

Have you also tried to see if Michael's temp-table suggestion helps you
cache the expensive partition lookup? (re-quoted below)

"""
If you run sqlContext.table("...").registerTempTable("...") that temptable
will cache the lookup of partitions [the first time is slow, but subsequent
lookups will be faster].
""" - X-Ref: Permalink


Also, do you absolutely need to use "select * from temp.log"? Adding a
where clause to the query with a partition condition will help Spark prune
the request to just the required partitions (vs. all, which is proving
expensive).
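Putting both suggestions together, roughly (the table name comes from the thread; the dt value is made up):

// Cache the partition lookup once, then push a partition-column filter so only
// the matching partitions are read.
sqlContext.table("temp.log").registerTempTable("log_cached")
val oneDay = sqlContext.sql("select * from log_cached where dt = '2015-09-01'")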

On Fri, Dec 11, 2015 at 3:59 AM Isabelle Phan  wrote:

> Hi Michael,
>
> We have just upgraded to Spark 1.5.0 (actually 1.5.0_cdh-5.5 since we are
> on cloudera), and Parquet formatted tables. I turned on
> spark.sql.hive.metastorePartitionPruning=true, but DataFrame creation still
> takes a long time.
> Is there any other configuration to consider?
>
>
> Thanks a lot for your help,
>
> Isabelle
>
> On Fri, Sep 4, 2015 at 1:42 PM, Michael Armbrust 
> wrote:
>
>> If you run sqlContext.table("...").registerTempTable("...") that
>> temptable will cache the lookup of partitions.
>>
>> On Fri, Sep 4, 2015 at 1:16 PM, Isabelle Phan  wrote:
>>
>>> Hi Michael,
>>>
>>> Thanks a lot for your reply.
>>>
>>> This table is stored as text file with tab delimited columns.
>>>
>>> You are correct, the problem is because my table has too many partitions
>>> (1825 in total). Since I am on Spark 1.4, I think I am hitting bug 6984
>>> .
>>>
>>> Not sure when my company can move to 1.5. Would you know some workaround
>>> for this bug?
>>> If I cannot find workaround for this, will have to change our schema
>>> design to reduce number of partitions.
>>>
>>>
>>> Thanks,
>>>
>>> Isabelle
>>>
>>>
>>>
>>> On Fri, Sep 4, 2015 at 12:56 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 Also, do you mean two partitions or two partition columns?  If there
 are many partitions it can be much slower.  In Spark 1.5 I'd consider
 setting spark.sql.hive.metastorePartitionPruning=true if you have
 predicates over the partition columns.

 On Fri, Sep 4, 2015 at 12:54 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> What format is this table.  For parquet and other optimized formats we
> cache a bunch of file metadata on first access to make interactive queries
> faster.
>
> On Thu, Sep 3, 2015 at 8:17 PM, Isabelle Phan 
> wrote:
>
>> Hello,
>>
>> I am using SparkSQL to query some Hive tables. Most of the time, when
>> I create a DataFrame using sqlContext.sql("select * from table") command,
>> DataFrame creation is less than 0.5 second.
>> But I have this one table with which it takes almost 12 seconds!
>>
>> scala>  val start = scala.compat.Platform.currentTime; val logs =
>> sqlContext.sql("select * from temp.log"); val execution =
>> scala.compat.Platform.currentTime - start
>> 15/09/04 12:07:02 INFO ParseDriver: Parsing command: select * from
>> temp.log
>> 15/09/04 12:07:02 INFO ParseDriver: Parse Completed
>> start: Long = 1441336022731
>> logs: org.apache.spark.sql.DataFrame = [user_id: string, option: int,
>> log_time: string, tag: string, dt: string, test_id: int]
>> execution: Long = *11567*
>>
>> This table has 3.6 B rows, and 2 partitions (on dt and test_id
>> columns).
>> I have created DataFrames on even larger tables and do not see such
>> delay.
>> So my questions are:
>> - What can impact DataFrame creation time?
>> - Is it related to the table partitions?
>>
>>
>> Thanks much your help!
>>
>> Isabelle
>>
>
>

>>>
>>
>


Re: Kryo Serialization in Spark

2015-12-10 Thread manasdebashiskar
Are you sure you are using Kryo serialization?
You are getting a Java serialization error.
Are you setting up your SparkContext with Kryo serialization enabled?
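For reference, a minimal sketch of turning Kryo on explicitly (MyRecord is a placeholder for your own classes):

import org.apache.spark.{SparkConf, SparkContext}

case class MyRecord(id: Long, value: String)   // placeholder class

val conf = new SparkConf()
  .setAppName("kryo-check")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyRecord]))   // optional, avoids writing full class names
val sc = new SparkContext(conf)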




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-Serialization-in-Spark-tp25628p25678.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark sql random number or sequence numbers ?

2015-12-10 Thread manasdebashiskar
Use zipWithIndex to achieve the same behavior.
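A small sketch (df is a placeholder DataFrame) that adds a sequence-number column by going through the underlying RDD:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Append the zipWithIndex value to each row, then rebuild the DataFrame with an
// extra LongType column.
val withIdx = df.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
val schema = StructType(df.schema.fields :+ StructField("seq_no", LongType, nullable = false))
val dfWithSeq = sqlContext.createDataFrame(withIdx, schema)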



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-random-number-or-sequence-numbers-tp25623p25679.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to change StreamingContext batch duration after loading from checkpoint

2015-12-10 Thread manasdebashiskar
Not sure what your requirement is there, but if you have a 2-second streaming
batch, you can create a 4-second stream out of it; the other way around is not
possible.
Basically, you can create one stream out of another stream.
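For illustration (stream is a placeholder DStream built on a 2-second batch interval), a coarser 4-second stream can be derived by windowing:

import org.apache.spark.streaming.Seconds

// Both window length and slide must be multiples of the original batch interval.
val fourSecondStream = stream.window(Seconds(4), Seconds(4))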

..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-change-StreamingContext-batch-duration-after-loading-from-checkpoint-tp25624p25680.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: State management in spark-streaming

2015-12-10 Thread manasdebashiskar
Have you taken a look at trackStateByKey in Spark Streaming (coming in Spark
1.6)?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/State-management-in-spark-streaming-tp25608p25681.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming Shuffle to Disk

2015-12-10 Thread manasdebashiskar
How often do you checkpoint?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Shuffle-to-Disk-tp25567p25682.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: "Address already in use" after many streams on Kafka

2015-12-10 Thread manasdebashiskar
You can set the Spark UI port when creating your context: spark.ui.port
can be set to a different port.
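A minimal sketch (the app name and port number are arbitrary):

import org.apache.spark.SparkConf

// Pin this application's UI to its own port so concurrent contexts on one host
// don't collide on the default 4040.
val conf = new SparkConf().setAppName("kafka-stream").set("spark.ui.port", "4050")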

..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Address-already-in-use-after-many-streams-on-Kafka-tp25545p25683.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does Spark SQL have to scan all the columns of a table in text format?

2015-12-10 Thread manasdebashiskar
Yes,
a text file is schema-less: Spark does not know what to skip, so it will read
everything.
Parquet, as you have stated, is capable of taking advantage of predicate
pushdown.
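For example (the path and column name are made up), once the same data is written as Parquet, a narrow projection only reads the selected column rather than whole lines:

df.write.parquet("/tmp/events_parquet")
val oneColumn = sqlContext.read.parquet("/tmp/events_parquet").select("event_type")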

..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-SQL-have-to-scan-all-the-columns-of-a-table-in-text-format-tp25505p25684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to control number of parquet files generated when using partitionBy

2015-12-10 Thread manasdebashiskar
partitionBy is a suggestive setting.
If your value is bigger than what Spark calculates (based on what you stated
above), your value will be used.
But repartition is a forced shuffle ("give me exactly the required number of
partitions") operation.
You might have noticed that repartition causes a bit of delay (due to the
shuffling).
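A short sketch (df, the partition column, the count, and the path are placeholders):

// Repartition before the partitioned write so the number of tasks, and hence the
// number of files written into each output directory, stays bounded.
df.repartition(8)
  .write
  .partitionBy("dt")
  .parquet("hdfs:///tmp/controlled_output")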

..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-number-of-parquet-files-generated-when-using-partitionBy-tp25436p25685.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming with Kinesis broken?

2015-12-10 Thread Nick Pentreath
Yup also works for me on master branch as I've been testing DynamoDB Streams 
integration. In fact works with latest KCL 1.6.1 also which I was using.




So theKCL version does seem like it could be the issue - somewhere along the 
line an exception must be getting swallowed. Though the tests should have 
picked this up? Will dig deeper.




—
Sent from Mailbox

On Thu, Dec 10, 2015 at 11:07 PM, Brian London 
wrote:

> Yes, it worked in the 1.6 branch as of commit
> db5165246f2888537dd0f3d4c5a515875c7358ed.  That makes it much less serious
> of an issue, although it would be nice to know what the root cause is to
> avoid a regression.
> On Thu, Dec 10, 2015 at 4:03 PM Burak Yavuz  wrote:
>> I've noticed this happening when there was some dependency conflicts, and
>> it is super hard to debug.
>> It seems that the KinesisClientLibrary version in Spark 1.5.2 is 1.3.0,
>> but it is 1.2.1 in Spark 1.5.1.
>> I feel like that seems to be the problem...
>>
>> Brian, did you verify that it works with the 1.6.0 branch?
>>
>> Thanks,
>> Burak
>>
>> On Thu, Dec 10, 2015 at 11:45 AM, Brian London 
>> wrote:
>>
>>> Nick's symptoms sound identical to mine.  I should mention that I just
>>> pulled the latest version from github and it seems to be working there.  To
>>> reproduce:
>>>
>>>
>>>1. Download spark 1.5.2 from http://spark.apache.org/downloads.html
>>>2. build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests
>>>clean package
>>>3. build/mvn -Pkinesis-asl -DskipTests clean package
>>>4. Then run simultaneously:
>>>1. bin/run-example streaming.KinesisWordCountASL [Kinesis app name]
>>>   [Kinesis stream name] [endpoint URL]
>>>   2.   bin/run-example streaming.KinesisWordProducerASL [Kinesis
>>>   stream name] [endpoint URL] 100 10
>>>
>>>
>>> On Thu, Dec 10, 2015 at 2:05 PM Jean-Baptiste Onofré 
>>> wrote:
>>>
 Hi Nick,

 Just to be sure: don't you see some ClassCastException in the log ?

 Thanks,
 Regards
 JB

 On 12/10/2015 07:56 PM, Nick Pentreath wrote:
 > Could you provide an example / test case and more detail on what issue
 > you're facing?
 >
 > I've just tested a simple program reading from a dev Kinesis stream and
 > using stream.print() to show the records, and it works under 1.5.1 but
 > doesn't appear to be working under 1.5.2.
 >
 > UI for 1.5.2:
 >
 > Inline image 1
 >
 > UI for 1.5.1:
 >
 > Inline image 2
 >
 > On Thu, Dec 10, 2015 at 5:50 PM, Brian London >>> > > wrote:
 >
 > Has anyone managed to run the Kinesis demo in Spark 1.5.2?  The
 > Kinesis ASL that ships with 1.5.2 appears to not work for me
 > although 1.5.1 is fine. I spent some time with Amazon earlier in
 the
 > week and the only thing we could do to make it work is to change
 the
 > version to 1.5.1.  Can someone please attempt to reproduce before I
 > open a JIRA issue for it?
 >
 >

 --
 Jean-Baptiste Onofré
 jbono...@apache.org
 http://blog.nanthrax.net
 Talend - http://www.talend.com

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>

Re: Spark streaming with Kinesis broken?

2015-12-10 Thread Burak Yavuz
I don't think the Kinesis tests specifically ran when that was merged into
1.5.2 :(
https://github.com/apache/spark/pull/8957
https://github.com/apache/spark/commit/883bd8fccf83aae7a2a847c9a6ca129fac86e6a3

AFAIK pom changes don't trigger the Kinesis tests.

Burak

On Thu, Dec 10, 2015 at 8:09 PM, Nick Pentreath 
wrote:

> Yup also works for me on master branch as I've been testing DynamoDB
> Streams integration. In fact works with latest KCL 1.6.1 also which I was
> using.
>
> So theKCL version does seem like it could be the issue - somewhere along
> the line an exception must be getting swallowed. Though the tests should
> have picked this up? Will dig deeper.
>
> —
> Sent from Mailbox 
>
>
> On Thu, Dec 10, 2015 at 11:07 PM, Brian London 
> wrote:
>
>> Yes, it worked in the 1.6 branch as of commit
>> db5165246f2888537dd0f3d4c5a515875c7358ed.  That makes it much less
>> serious of an issue, although it would be nice to know what the root cause
>> is to avoid a regression.
>>
>> On Thu, Dec 10, 2015 at 4:03 PM Burak Yavuz  wrote:
>>
>>> I've noticed this happening when there was some dependency conflicts,
>>> and it is super hard to debug.
>>> It seems that the KinesisClientLibrary version in Spark 1.5.2 is 1.3.0,
>>> but it is 1.2.1 in Spark 1.5.1.
>>> I feel like that seems to be the problem...
>>>
>>> Brian, did you verify that it works with the 1.6.0 branch?
>>>
>>> Thanks,
>>> Burak
>>>
>>> On Thu, Dec 10, 2015 at 11:45 AM, Brian London 
>>> wrote:
>>>
 Nick's symptoms sound identical to mine.  I should mention that I just
 pulled the latest version from github and it seems to be working there.  To
 reproduce:


1. Download spark 1.5.2 from http://spark.apache.org/downloads.html
2. build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests
clean package
3. build/mvn -Pkinesis-asl -DskipTests clean package
4. Then run simultaneously:
1. bin/run-example streaming.KinesisWordCountASL [Kinesis app name]
   [Kinesis stream name] [endpoint URL]
   2.   bin/run-example streaming.KinesisWordProducerASL [Kinesis
   stream name] [endpoint URL] 100 10


 On Thu, Dec 10, 2015 at 2:05 PM Jean-Baptiste Onofré 
 wrote:

> Hi Nick,
>
> Just to be sure: don't you see some ClassCastException in the log ?
>
> Thanks,
> Regards
> JB
>
> On 12/10/2015 07:56 PM, Nick Pentreath wrote:
> > Could you provide an example / test case and more detail on what
> issue
> > you're facing?
> >
> > I've just tested a simple program reading from a dev Kinesis stream
> and
> > using stream.print() to show the records, and it works under 1.5.1
> but
> > doesn't appear to be working under 1.5.2.
> >
> > UI for 1.5.2:
> >
> > Inline image 1
> >
> > UI for 1.5.1:
> >
> > Inline image 2
> >
> > On Thu, Dec 10, 2015 at 5:50 PM, Brian London <
> brianmlon...@gmail.com
> > > wrote:
> >
> > Has anyone managed to run the Kinesis demo in Spark 1.5.2?  The
> > Kinesis ASL that ships with 1.5.2 appears to not work for me
> > although 1.5.1 is fine. I spent some time with Amazon earlier in
> the
> > week and the only thing we could do to make it work is to change
> the
> > version to 1.5.1.  Can someone please attempt to reproduce
> before I
> > open a JIRA issue for it?
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>>>
>