ROSE: Spark + R on the JVM.

2016-01-12 Thread David
Hi all,

I'd like to share news of the recent release of a new Spark package, 
[ROSE](http://spark-packages.org/package/onetapbeyond/opencpu-spark-executor).

ROSE is a Scala library offering access to the full scientific computing power 
of the R programming language to Apache Spark batch and streaming applications 
on the JVM. Where Apache SparkR lets data scientists use Spark from R, ROSE is 
designed to let Scala and Java developers use R from Spark.

The project is available and documented [on 
GitHub](https://github.com/onetapbeyond/opencpu-spark-executor) and I would 
encourage you to [take a 
look](https://github.com/onetapbeyond/opencpu-spark-executor). Any feedback, 
questions, etc. are very welcome.

David

"All that is gold does not glitter, Not all those who wander are lost."

saveAsTextFile hangs with hdfs

2014-08-19 Thread David
I have a simple spark job that seems to hang when saving to hdfs.  When
looking at the spark web ui, the job reached 97 of 100 tasks completed. I
need some help determining why the job appears to hang.  The job hangs on
the "saveAsTextFile()" call.



https://www.dropbox.com/s/fdp7ck91hhm9w68/Screenshot%202014-08-19%2010.53.24.png


The job is pretty simple:

JavaRDD<String> analyticsLogs = context
    .textFile(Joiner.on(",").join(hdfs.glob("/spark-dfs", ".*\\.log$")), 4);

JavaRDD<AnalyticsLogFlyweight> flyweights = analyticsLogs
    .map(line -> {
        try {
            AnalyticsLog log = GSON.fromJson(line, AnalyticsLog.class);
            AnalyticsLogFlyweight flyweight = new AnalyticsLogFlyweight();
            flyweight.ipAddress = log.getIpAddress();
            flyweight.time = log.getTime();
            flyweight.trackingId = log.getTrackingId();
            return flyweight;
        } catch (Exception e) {
            LOG.error("error parsing json", e);
            return null;
        }
    });

JavaRDD<AnalyticsLogFlyweight> filtered = flyweights
    .filter(log -> log != null);

JavaPairRDD<String, AnalyticsLogFlyweight> partitioned = filtered
    .mapToPair((AnalyticsLogFlyweight log) -> new Tuple2<>(log.trackingId, log))
    .partitionBy(new HashPartitioner(100)).cache();

Ordering<AnalyticsLogFlyweight> ordering =
    Ordering.natural().nullsFirst().onResultOf(new Function<AnalyticsLogFlyweight, Long>() {
        public Long apply(AnalyticsLogFlyweight log) {
            return log.time;
        }
    });

JavaPairRDD<String, Iterable<AnalyticsLogFlyweight>> stringIterableJavaPairRDD =
    partitioned.groupByKey();
JavaPairRDD<String, Integer> stringIntegerJavaPairRDD =
    stringIterableJavaPairRDD.mapToPair((log) -> {
        List<AnalyticsLogFlyweight> sorted = Lists.newArrayList(log._2());
        sorted.forEach(l -> LOG.info("sorted {}", l));
        return new Tuple2<>(log._1(), sorted.size());
    });

String outputPath = "/summarized/groupedByTrackingId4";
hdfs.rm(outputPath, true);
stringIntegerJavaPairRDD.saveAsTextFile(String.format("%s/%s", hdfs.getUrl(), outputPath));


Thanks in advance, David


sortByKey trouble

2014-09-24 Thread david
Hi,

  Does anybody know how to use sortByKey in Scala on an RDD like:

  val rddToSave = file.map(l => l.split("\\|")).map(r => (r(34)+"-"+r(3),
r(4), r(10), r(12)))

  because I received an error "sortByKey is not a member of
org.apache.spark.rdd.RDD[(String,String,String,String)]".

What I am trying to do is sort on the first element.
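For reference, a minimal sketch of one way to do this, assuming the same split logic as above: sortByKey is only defined on RDDs of key/value pairs, so the sort field has to become the key first.

  // key the tuple, then sort by that key
  val rddToSave = file.map(l => l.split("\\|"))
    .map(r => (r(34) + "-" + r(3), (r(4), r(10), r(12))))
    .sortByKey()

  // or keep the flat 4-tuple and sort on its first element
  val sorted = file.map(l => l.split("\\|"))
    .map(r => (r(34) + "-" + r(3), r(4), r(10), r(12)))
    .sortBy(_._1)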


Thanks

  





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sortByKey-trouble-tp14989.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: sortByKey trouble

2014-09-24 Thread david
Thanks,

 I've already tried this solution but it does not compile (in Eclipse).

  I'm surprised to see that in the Spark shell, sortByKey works fine on these 2
forms:

   (String,String,String,String)
   (String,(String,String,String))





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sortByKey-trouble-tp14989p15002.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



foreachPartition: write to multiple files

2014-10-08 Thread david
Hi,

  I want to write my RDD to multiple files based on a key value. So I
used groupByKey and iterate over the partitions. Here is the code:


rdd.map(f => (f.substring(0,4), f)).groupByKey().foreachPartition(iterator =>
   iterator.map { case (key, values) =>
      val fs: FileSystem = FileSystem.get(new Configuration())
      val outputFile = fs.create(new Path("/my_path/" + key + ".txt"))
      values.foreach { x => outputFile.write(x.getBytes()) }
      outputFile.close()
   }
)

I don't see any error in the spark-shell log, but no file is written.

Does anybody know where I missed something?
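A likely explanation, offered as a hedged aside: Iterator.map is lazy and its result is never consumed above, so the write body never executes. A minimal sketch of the same loop using foreach instead, with the Hadoop imports added for completeness:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

rdd.map(f => (f.substring(0, 4), f)).groupByKey().foreachPartition { iterator =>
  iterator.foreach { case (key, values) =>
    // foreach forces evaluation, unlike the lazy iterator.map
    val fs: FileSystem = FileSystem.get(new Configuration())
    val outputFile = fs.create(new Path("/my_path/" + key + ".txt"))
    values.foreach(x => outputFile.write(x.getBytes()))
    outputFile.close()
  }
}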

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/foreachPartition-write-to-multiple-files-tp15925.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: foreachPartition: write to multiple files

2014-10-08 Thread david
Hi,

 I finally found a solution after reading the post :

http://apache-spark-user-list.1001560.n3.nabble.com/how-to-split-RDD-by-key-and-save-to-different-path-td11887.html#a11983





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/foreachPartition-write-to-multiple-files-tp15925p15937.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Key-Value decomposition

2014-11-03 Thread david
Hi,

  I'm a newbie in Spark and face the following use case:

   val data = Array ( "A", "1;2;3")
   val rdd = sc.parallelize(data)

// Something here to produce an RDD of (key, value) pairs:
// ("A", "1"), ("A", "2"), ("A", "3")
  
Does anybody know how to do this?

Thanks

   



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Key-Value-decomposition-tp17966.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Key-Value decomposition

2014-11-03 Thread david
Hi,

 But I've only one RDD. Here is a more complete example:

my rdd is something like   ("A", "1;2;3"),  ("B", "2;5;6"), ("C", "3;2;1") 

And i expect to have the following result :

 ("A",1) , ("A",2) , ("A",3) , ("B",2) , ("B",5) , ("B",6) , ("C",3) ,
("C",2) , ("C",1)


Any idea about how I can achieve this?

Thanks
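For reference, a minimal sketch of one way to get exactly that result, assuming a pair RDD built as above (flatMapValues emits one pair per split element):

val rdd = sc.parallelize(Seq(("A", "1;2;3"), ("B", "2;5;6"), ("C", "3;2;1")))

// Split each value on ";" and emit one (key, element) pair per element.
val exploded = rdd.flatMapValues(_.split(";"))
// exploded: ("A","1"), ("A","2"), ("A","3"), ("B","2"), ("B","5"), ("B","6"), ...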



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Key-Value-decomposition-tp17966p18036.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Key-Value decomposition

2014-11-04 Thread david
Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Key-Value-decomposition-tp17966p18050.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL (1.0)

2014-11-24 Thread david
Hi,

 I build 2 tables from files. Table F1 is joined with table F2 on c5 = d4.

 F1 has 46730613 rows
 F2 has   3386740 rows

All d4 keys exist in F1.c5, so I expect to retrieve 46730613 rows. But it
returns only 3437 rows.

// --- begin code ---

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD


val rddFile = sc.textFile("hdfs://referential/F1/part-*")
case class F1(c1: String, c2: String, c3: Double, c4: String, c5: String)
val stkrdd = rddFile.map(x => x.split("|")).map(f =>
F1(f(44),f(3),f(10).toDouble, "",f(2)))
stkrdd.registerAsTable("F1")
sqlContext.cacheTable("F1")


val prdfile = sc.textFile("hdfs://referential/F2/part-*")
case class F2(d1: String, d2:String, d3:String,d4:String)
val productrdd = prdfile.map(x => x.split("|")).map(f =>
F2(f(0),f(2),f(101),f(3)))
productrdd.registerAsTable("F2")
sqlContext.cacheTable("F2")

val resrdd = sqlContext.sql("Select count(*) from F1, F2 where F1.c5 = F2.d4
").collect()

// --- end of code ---


Does anybody know what I missed?
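One detail worth double-checking, offered as a hedged aside rather than a confirmed root cause: String.split takes a regular expression, and "|" is regex alternation (it matches the empty string), so split("|") breaks the line into single characters instead of pipe-delimited fields. A minimal sketch, with `line` standing in for one input row:

// Escape the pipe so it is treated as a literal delimiter...
val fields = line.split("\\|")
// ...or use the Char overload, which does not go through the regex engine.
val fields2 = line.split('|')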

Thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-0-tp19651.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL Join returns fewer rows than expected

2014-11-25 Thread david
Hi,

 I have 2 files which come from csv import of 2 Oracle tables. 

 F1 has 46730613 rows
 F2 has   3386740 rows

I build 2 tables with spark.

Table F1 join with table F2 on c1=d1.


All F2.d1 keys exist in F1.c1, so I expect to retrieve 46730613 rows. But
it returns only 3437 rows.

// --- begin code ---

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD


val rddFile = sc.textFile("hdfs://referential/F1/part-*")
case class F1(c1: String, c2: String, c3: Double, c4: String, c5: String)
val stkrdd = rddFile.map(x => x.split("|")).map(f =>
F1(f(44),f(3),f(10).toDouble, "",f(2)))
stkrdd.registerAsTable("F1")
sqlContext.cacheTable("F1")


val prdfile = sc.textFile("hdfs://referential/F2/part-*")
case class F2(d1: String, d2:String, d3:String,d4:String)
val productrdd = prdfile.map(x => x.split("|")).map(f =>
F2(f(0),f(2),f(101),f(3)))
productrdd.registerAsTable("F2")
sqlContext.cacheTable("F2")

val resrdd = sqlContext.sql("Select count(*) from F1, F2 where F1.c1 = F2.d1
").count()

// --- end of code ---


Does anybody know what I missed?

Thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Join-returns-less-rows-that-expected-tp19731.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark streaming kafka best practices?

2014-12-05 Thread david
hi,

  What is the best way to process a batch window in Spark Streaming:

kafkaStream.foreachRDD(rdd => {
  rdd.collect().foreach(event => {
// process the event
process(event)
  })
})


Or 

kafkaStream.foreachRDD(rdd => {
  rdd.map(event => {
// process the event
process(event)
  }).collect()
})
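For comparison, a third pattern that keeps the processing on the executors instead of collecting everything to the driver; a minimal sketch, reusing the same process function from above:

kafkaStream.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    // runs on the executors, one partition at a time; nothing is shipped to the driver
    partition.foreach(event => process(event))
  })
})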


Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafa-best-practices-tp20470.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark streaming: works with collect() but not without collect()

2014-12-11 Thread david
Hi,

  We use the following Spark Streaming code to collect and process Kafka
events:

kafkaStream.foreachRDD(rdd => {
  rdd.collect().foreach(event => {
  process(event._1, event._2)
  })
})

This works fine.

But without the /collect()/ call, the following exception is raised for the call
to the process function:
*Loss was due to java.lang.ExceptionInInitializerError*


  We attempted to rewrite it like this, but the same exception is raised:

kafkaStream.foreachRDD(rdd => {
  rdd.foreachPartition(iter =>
    iter.foreach(event => {
      process(event._1, event._2)
    })
  )
})


Can anybody explain to us why, and how to solve this issue?
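A hedged sketch of one common workaround for this kind of executor-side initialization error: build any heavyweight or non-serializable state inside foreachPartition so it is constructed on the executor JVM rather than captured from the driver. Processor below is purely illustrative, not from the code above.

kafkaStream.foreachRDD(rdd => {
  rdd.foreachPartition(iter => {
    val processor = new Processor()   // hypothetical helper, created on the executor, never serialized
    iter.foreach(event => processor.process(event._1, event._2))
  })
})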

Thanks

Regards






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-steaming-work-with-collect-but-not-without-collect-tp20622.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Job is not able to perform Broadcast Join

2020-10-06 Thread David Edwards
After adding the sequential ids you might need a repartition? I've found
before, when using monotonically_increasing_id, that the df goes to a single
partition. It usually becomes clear in the Spark UI though.

On Tue, 6 Oct 2020, 20:38 Sachit Murarka,  wrote:

> Yes, Even I tried the same first. Then I moved to join method because
> shuffle spill was happening because row num without partition happens on
> single task. Instead of processing the entire dataframe on a single task, I have
> broken down that into df1 and df2 and joining.
> Because df2 is having very less data set since it has 2 cols only.
>
> Thanks
> Sachit
>
> On Wed, 7 Oct 2020, 01:04 Eve Liao,  wrote:
>
>> Try to avoid broadcast. Thought this:
>> https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6
>> could be helpful.
>>
>> On Tue, Oct 6, 2020 at 12:18 PM Sachit Murarka 
>> wrote:
>>
>>> Thanks Eve for response.
>>>
>>> Yes I know we can use broadcast for smaller datasets,I increased the
>>> threshold (4Gb) for the same then also it did not work. and the df3 is
>>> somewhat greater than 2gb.
>>>
>>> Trying by removing broadcast as well.. Job is running since 1 hour. Will
>>> let you know.
>>>
>>>
>>> Thanks
>>> Sachit
>>>
>>> On Wed, 7 Oct 2020, 00:41 Eve Liao,  wrote:
>>>
 How many rows does df3 have? Broadcast joins are a great way to append
 data stored in relatively *small* single source of truth data files to
 large DataFrames. DataFrames up to 2GB can be broadcasted so a data file
 with tens or even hundreds of thousands of rows is a broadcast candidate.
 Your broadcast variable is probably too large.

 On Tue, Oct 6, 2020 at 11:37 AM Sachit Murarka 
 wrote:

> Hello Users,
>
> I am facing an issue in spark job where I am doing row number()
> without partition by clause because I need to add sequential increasing 
> IDs.
> But to avoid the large spill I am not doing row number() over the
> complete data frame.
>
> Instead I am applying monotonically_increasing_id on the actual data set,
> then create a new data frame from the original data frame which will have
> just the monotonically_increasing_id.
>
> So DF1 = All columns + monotonically_increasing_id
> DF2 = monotonically_increasing_id
>
> Now I am applying row_number() on DF2 since this is a smaller
> dataframe.
>
> DF3 = monotonically_increasing_id + Row_Number_ID
>
> Df.join(broadcast(DF3))
>
> This will give me sequential increment id in the original Dataframe.
>
> But below is the stack trace.
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o180.parquet.
> : org.apache.spark.SparkException: Job aborted.
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:28

Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Hello,

I have an issue with my Pyspark job related to checkpoint.

Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
java.lang.IllegalStateException: Error reading delta file
file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
HDFSStateStoreProvider[id = (op=0,part=3),dir =
file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]:
*file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
does not exist*

This job is based on Spark 3.0.1 and Structured Streaming
This Spark cluster (1 driver and 6 executors) works without hdfs. And we
don't want to manage an hdfs cluster if possible.
Is it necessary to have a distributed filesystem ? What are the different
solutions/workarounds ?

Thanks in advance
David


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Thanks.
My Spark applications run on nodes based on docker images but this is a
standalone mode (1 driver - n workers)
Can we use S3 directly with consistency addon like s3guard (s3a) or AWS
Consistent view
<https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html>
 ?

On Wed, Dec 23, 2020 at 17:48, Lalwani, Jayesh wrote:

> Yes. It is necessary to have a distributed file system because all the
> workers need to read/write to the checkpoint. The distributed file system
> has to be immediately consistent: When one node writes to it, the other
> nodes should be able to read it immediately
>
> The solutions/workarounds depend on where you are hosting your Spark
> application.
>
>
>
> *From: *David Morin 
> *Date: *Wednesday, December 23, 2020 at 11:08 AM
> *To: *"user@spark.apache.org" 
> *Subject: *[EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Hello,
>
>
>
> I have an issue with my Pyspark job related to checkpoint.
>
>
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
> java.lang.IllegalStateException: Error reading delta file
> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
> HDFSStateStoreProvider[id = (op=0,part=3),dir =
> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
> *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
> does not exist*
>
>
>
> This job is based on Spark 3.0.1 and Structured Streaming
>
> This Spark cluster (1 driver and 6 executors) works without hdfs. And we
> don't want to manage an hdfs cluster if possible.
>
> Is it necessary to have a distributed filesystem ? What are the different
> solutions/workarounds ?
>
>
>
> Thanks in advance
>
> David
>


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Does it work with the standard AWS S3 solution and its new consistency model
<https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/>
?

On Wed, Dec 23, 2020 at 18:48, David Morin wrote:

> Thanks.
> My Spark applications run on nodes based on docker images but this is a
> standalone mode (1 driver - n workers)
> Can we use S3 directly with consistency addon like s3guard (s3a) or AWS
> Consistent view
> <https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html>
>  ?
>
> On Wed, Dec 23, 2020 at 17:48, Lalwani, Jayesh wrote:
>
>> Yes. It is necessary to have a distributed file system because all the
>> workers need to read/write to the checkpoint. The distributed file system
>> has to be immediately consistent: When one node writes to it, the other
>> nodes should be able to read it immediately
>>
>> The solutions/workarounds depend on where you are hosting your Spark
>> application.
>>
>>
>>
>> *From: *David Morin 
>> *Date: *Wednesday, December 23, 2020 at 11:08 AM
>> *To: *"user@spark.apache.org" 
>> *Subject: *[EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail
>>
>>
>>
>> *CAUTION*: This email originated from outside of the organization. Do
>> not click links or open attachments unless you can confirm the sender and
>> know the content is safe.
>>
>>
>>
>> Hello,
>>
>>
>>
>> I have an issue with my Pyspark job related to checkpoint.
>>
>>
>>
>> Caused by: org.apache.spark.SparkException: Job aborted due to stage
>> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
>> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
>> java.lang.IllegalStateException: Error reading delta file
>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
>> HDFSStateStoreProvider[id = (op=0,part=3),dir =
>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
>> *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
>> does not exist*
>>
>>
>>
>> This job is based on Spark 3.0.1 and Structured Streaming
>>
>> This Spark cluster (1 driver and 6 executors) works without hdfs. And we
>> don't want to manage an hdfs cluster if possible.
>>
>> Is it necessary to have a distributed filesystem ? What are the different
>> solutions/workarounds ?
>>
>>
>>
>> Thanks in advance
>>
>> David
>>
>


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Thanks Jungtaek
Ok I got it. I'll test it and check if the loss of efficiency is acceptable.


On Wed, Dec 23, 2020 at 23:29, Jungtaek Lim wrote:

> Please refer my previous answer -
> https://lists.apache.org/thread.html/r7dfc9e47cd9651fb974f97dde756013fd0b90e49d4f6382d7a3d68f7%40%3Cuser.spark.apache.org%3E
> Probably we may want to add it in the SS guide doc. We didn't need it as
> it just didn't work with eventually consistent model, and now it works
> anyway but is very inefficient.
>
>
> On Thu, Dec 24, 2020 at 6:16 AM David Morin 
> wrote:
>
>> Does it work with the standard AWS S3 solution and its new
>> consistency model
>> <https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/>
>> ?
>>
>> On Wed, Dec 23, 2020 at 18:48, David Morin wrote:
>>
>>> Thanks.
>>> My Spark applications run on nodes based on docker images but this is a
>>> standalone mode (1 driver - n workers)
>>> Can we use S3 directly with consistency addon like s3guard (s3a) or AWS
>>> Consistent view
>>> <https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html>
>>>  ?
>>>
>>> On Wed, Dec 23, 2020 at 17:48, Lalwani, Jayesh wrote:
>>>
>>>> Yes. It is necessary to have a distributed file system because all the
>>>> workers need to read/write to the checkpoint. The distributed file system
>>>> has to be immediately consistent: When one node writes to it, the other
>>>> nodes should be able to read it immediately
>>>>
>>>> The solutions/workarounds depend on where you are hosting your Spark
>>>> application.
>>>>
>>>>
>>>>
>>>> *From: *David Morin 
>>>> *Date: *Wednesday, December 23, 2020 at 11:08 AM
>>>> *To: *"user@spark.apache.org" 
>>>> *Subject: *[EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints
>>>> fail
>>>>
>>>>
>>>>
>>>> *CAUTION*: This email originated from outside of the organization. Do
>>>> not click links or open attachments unless you can confirm the sender and
>>>> know the content is safe.
>>>>
>>>>
>>>>
>>>> Hello,
>>>>
>>>>
>>>>
>>>> I have an issue with my Pyspark job related to checkpoint.
>>>>
>>>>
>>>>
>>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage
>>>> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
>>>> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
>>>> java.lang.IllegalStateException: Error reading delta file
>>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
>>>> HDFSStateStoreProvider[id = (op=0,part=3),dir =
>>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
>>>> *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
>>>> does not exist*
>>>>
>>>>
>>>>
>>>> This job is based on Spark 3.0.1 and Structured Streaming
>>>>
>>>> This Spark cluster (1 driver and 6 executors) works without hdfs. And
>>>> we don't want to manage an hdfs cluster if possible.
>>>>
>>>> Is it necessary to have a distributed filesystem ? What are the
>>>> different solutions/workarounds ?
>>>>
>>>>
>>>>
>>>> Thanks in advance
>>>>
>>>> David
>>>>
>>>


S3a Committer

2021-02-02 Thread David Morin
Hi,

I have some issues at the moment with S3 API of Openstack Swift (S3a).
This one is eventually consistent and it causes lots of issues with my
distributed jobs in Spark.
Is the S3A committer able to fix that? Or is an "S3Guard-like" implementation
the only way?

David


Re: S3a Committer

2021-02-02 Thread David Morin
Yes, that's true but this is not (yet) the case of the Openstack Swift S3
API

On Tue, Feb 2, 2021 at 21:41, Henoc wrote:

> S3 is strongly consistent now
> https://aws.amazon.com/s3/consistency/
>
> Regards,
> Henoc
>
> On Tue, Feb 2, 2021, 10:27 PM David Morin 
> wrote:
>
>> Hi,
>>
>> I have some issues at the moment with S3 API of Openstack Swift (S3a).
>> This one is eventually consistent and it causes lots of issues with my
>> distributed jobs in Spark.
>> Is the S3A committer able to fix that ? Or an "S3guard like"
>> implementation is the only way ?
>>
>> David
>>
>


Missing stack function from SQL functions API

2021-06-14 Thread david . szakallas
I noticed that the stack SQL function is missing from the functions
API. Could we add it?


Re: Performance of PySpark jobs on the Kubernetes cluster

2021-08-11 Thread David Diebold
Hi Mich,

I don't quite understand why the driver node is using so much CPU, but it
may be unrelated to your executors being underused.
About your executors being underused, I would check that your job generated
enough tasks.
Then I would check the spark.executor.cores and spark.task.cpus parameters to
see if I can give more work to the executors.

Cheers,
David



On Tue, Aug 10, 2021 at 12:20, Khalid Mammadov wrote:

> Hi Mich
>
> I think you need to check your code.
> If code does not use PySpark API effectively you may get this. I.e. if you
> use pure Python/pandas api rather than Pyspark i.e.
> transform->transform->action. e.g df.select(..).withColumn(...)...count()
>
> Hope this helps to put you on right direction.
>
> Cheers
> Khalid
>
>
>
>
> On Mon, 9 Aug 2021, 20:20 Mich Talebzadeh, 
> wrote:
>
>> Hi,
>>
>> I have a basic question to ask.
>>
>> I am running a Google k8s cluster (AKA GKE) with three nodes each having
>> configuration below
>>
>> e2-standard-2 (2 vCPUs, 8 GB memory)
>>
>>
>> spark-submit is launched from another node (actually a data proc single
>> node that I have just upgraded to e2-custom (4 vCPUs, 8 GB mem). We call
>> this the launch node
>>
>> OK I know that the cluster is not much but Google was complaining about
>> the launch node hitting 100% cpus. So I added two more cpus to it.
>>
>> It appears that despite using k8s as the computational cluster, the
>> burden falls upon the launch node!
>>
>> The cpu utilisation for launch node shown below
>>
>> [image: image.png]
>> The dip is when 2 more cpus were added to  it so it had to reboot. so
>> around %70 usage
>>
>> The combined cpu usage for GKE nodes is shown below:
>>
>> [image: image.png]
>>
>> Never goes above 20%!
>>
>> I can see that the drive and executors as below:
>>
>> k get pods -n spark
>> NAME READY   STATUSRESTARTS
>>  AGE
>> pytest-c958c97b2c52b6ed-driver   1/1 Running   0
>> 69s
>> randomdatabigquery-e68a8a7b2c52f468-exec-1   1/1 Running   0
>> 51s
>> randomdatabigquery-e68a8a7b2c52f468-exec-2   1/1 Running   0
>> 51s
>> randomdatabigquery-e68a8a7b2c52f468-exec-3   0/1 Pending   0
>> 51s
>>
>> It is a PySpark 3.1.1 image using java 8 and pushing random data
>> generated into Google BigQuery data warehouse. The last executor (exec-3)
>> seems to be just pending. The spark-submit is as below:
>>
>> spark-submit --verbose \
>>--properties-file ${property_file} \
>>--master k8s://https://$KUBERNETES_MASTER_IP:443 \
>>--deploy-mode cluster \
>>--name pytest \
>>--conf
>> spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>>--py-files $CODE_DIRECTORY/DSBQ.zip \
>>--conf spark.kubernetes.namespace=$NAMESPACE \
>>--conf spark.executor.memory=5000m \
>>--conf spark.network.timeout=300 \
>>--conf spark.executor.instances=3 \
>>--conf spark.kubernetes.driver.limit.cores=1 \
>>--conf spark.driver.cores=1 \
>>--conf spark.executor.cores=1 \
>>--conf spark.executor.memory=2000m \
>>--conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>>--conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>>--conf spark.kubernetes.container.image=${IMAGEGCP} \
>>--conf
>> spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>>--conf
>> spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>--conf
>> spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>> \
>>--conf spark.sql.execution.arrow.pyspark.enabled="true" \
>>$CODE_DIRECTORY/${APPLICATION}
>>
>> Aren't the driver and executors running on K8s cluster? So why is the
>> launch node heavily used but k8s cluster is underutilized?
>>
>> Thanks
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Trying to hash cross features with mllib

2021-10-01 Thread David Diebold
Hello everyone,

In MLLib, I’m trying to rely essentially on pipelines to create features
out of the Titanic dataset, and show-case the power of feature hashing. I
want to:

-  Apply bucketization on some columns (QuantileDiscretizer is fine)

-  Then I want to cross all my columns with each other to have
cross features.

-  Then I would like to hash all of these cross features into a
vector.

-  Then give it to a logistic regression.

Looking at the documentation, it looks like the only way to hash features
is the *FeatureHasher* transformation. It takes multiple columns as input,
type can be numeric, bool, string (but no vector/array).

But now I’m left wondering how I can create my cross-feature columns. I’m
looking at a transformation that could take two columns as input, and
return a numeric, bool, or string. I didn't manage to find anything that
does the job. There are multiple transformations, such as VectorAssembler,
that operate on vectors, but this is not a type accepted by the FeatureHasher.

Of course, I could try to combine columns directly in my dataframe (before
the pipeline kicks-in), but then I would not be able to benefit any more
from QuantileDiscretizer and other cool functions.


Am I missing something in the transformation API? Or is my approach to
hashing wrong? Or should we consider extending the API somehow?
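A hedged sketch of one possible pipeline-only route: a SQLTransformer stage between the discretizer and the hasher can build a string cross feature, so the hasher still sees the bucketized column. The Titanic column names below are purely illustrative assumptions.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{FeatureHasher, QuantileDiscretizer, SQLTransformer}

// Bucketize a numeric column first (illustrative columns: Age, Sex, Pclass, Survived).
val ageBuckets = new QuantileDiscretizer()
  .setInputCol("Age").setOutputCol("AgeBucket").setNumBuckets(5)

// Build a string cross feature inside the pipeline; __THIS__ refers to the stage's input dataset.
val cross = new SQLTransformer().setStatement(
  "SELECT *, CONCAT(CAST(AgeBucket AS STRING), '_', Sex) AS AgeBucket_x_Sex FROM __THIS__")

// Hash the (string) cross feature together with other columns into one vector.
val hasher = new FeatureHasher()
  .setInputCols(Array("AgeBucket_x_Sex", "Pclass", "Sex"))
  .setOutputCol("features")
  .setNumFeatures(1 << 18)

val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("Survived")

val pipeline = new Pipeline().setStages(Array(ageBuckets, cross, hasher, lr))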



Thank you, kind regards,

David


Re: Trying to hash cross features with mllib

2021-10-04 Thread David Diebold
Hello Sean,

Thank you for the heads-up !
Interaction transform won't help for my use case as it returns a vector
that I won't be able to hash.
I will definitely dig further into custom transformations though.

Thanks !
David

On Fri, Oct 1, 2021 at 15:49, Sean Owen wrote:

> Are you looking for
> https://spark.apache.org/docs/latest/ml-features.html#interaction ?
> That's the closest built in thing I can think of.  Otherwise you can make
> custom transformations.
>
> On Fri, Oct 1, 2021, 8:44 AM David Diebold 
> wrote:
>
>> Hello everyone,
>>
>> In MLLib, I’m trying to rely essentially on pipelines to create features
>> out of the Titanic dataset, and show-case the power of feature hashing. I
>> want to:
>>
>> -  Apply bucketization on some columns (QuantileDiscretizer is
>> fine)
>>
>> -  Then I want to cross all my columns with each other to have
>> cross features.
>>
>> -  Then I would like to hash all of these cross features into a
>> vector.
>>
>> -  Then give it to a logistic regression.
>>
>> Looking at the documentation, it looks like the only way to hash features
>> is the *FeatureHasher* transformation. It takes multiple columns as
>> input, type can be numeric, bool, string (but no vector/array).
>>
>> But now I’m left wondering how I can create my cross-feature columns. I’m
>> looking at a transformation that could take two columns as input, and
>> return a numeric, bool, or string. I didn't manage to find anything that
>> does the job. There are multiple transformations such as VectorAssembler,
>> that operate on vector, but this is not a typeaccepted by the FeatureHasher.
>>
>> Of course, I could try to combine columns directly in my dataframe
>> (before the pipeline kicks-in), but then I would not be able to benefit any
>> more from QuantileDiscretizer and other cool functions.
>>
>>
>> Am I missing something in the transformation api ? Or is my approach to
>> hashing wrong ? Or should we consider to extend the api somehow ?
>>
>>
>>
>> Thank you, kind regards,
>>
>> David
>>
>


question about data skew and memory issues

2021-12-14 Thread David Diebold
Hello all,

I was wondering if it is possible to encounter out-of-memory exceptions on
Spark executors when doing an aggregation, when a dataset is skewed.
Let's say we have a dataset with two columns:
- key : int
- value : float
And I want to aggregate values by key.
Let's say that we have a ton of keys equal to 0.

Is it possible to encounter an out-of-memory exception after the shuffle?
My expectation would be that the executor responsible for aggregating the
'0' partition could indeed hit an OOM exception if it tries to put all
the files of this partition in memory before processing them.
But why would it need to put them in memory when doing an aggregation? It
looks to me that aggregation can be performed in a streaming fashion, so I
would not expect any OOM at all.
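To make the intuition concrete, a small hedged sketch, assuming df has the key and value columns described above: reduce-style aggregates use partial aggregation and stream through a hot key, whereas anything that has to materialize all values of a key is where an OOM becomes realistic.

import org.apache.spark.sql.functions.{collect_list, sum}

// Partial (map-side) aggregation: each task folds values as it reads them,
// so a heavily skewed key mostly costs CPU and shuffle time, not memory.
val totals = df.groupBy("key").agg(sum("value"))

// collect_list must hold every value of a key on one task, so a key that
// carries a huge share of the data is a realistic out-of-memory risk.
val gathered = df.groupBy("key").agg(collect_list("value"))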

Thank you in advance for your lights :)
David


Re: Pyspark debugging best practices

2022-01-03 Thread David Diebold
Hello Andy,

Are you sure you want to perform lots of join operations, and not simple
unions ?
Are you doing inner joins or outer joins ?
Can you provide us with a rough amount of your list size plus each
individual dataset size ?
Having a look at the execution plan would help; maybe the high number of join
operations makes the execution plan too complicated at the end of the day.
Checkpointing could help there?

Cheers,
David


On Thu, Dec 30, 2021 at 16:56, Andrew Davidson wrote:

> Hi Gourav
>
> I will give databricks a try.
>
> Each data gets loaded into a data frame.
> I select one column from the data frame
> I join the column to the  accumulated joins from previous data frames in
> the list
>
> To debug, I think I am going to put an action and log statement after each
> join. I do not think it will change the performance. I believe the physical
> plan will be the same; however, hopefully it will shed some light.
>
> At the very least I will know if it is making progress or not. And hopefully
> where it is breaking
>
> Happy new year
>
> Andy
>
> On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta 
> wrote:
>
>> Hi Andrew,
>>
>> Any chance you might give Databricks a try in GCP?
>>
>> The above transformations look complicated to me, why are you adding
>> dataframes to a list?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>> On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson 
>> wrote:
>>
>>> Hi
>>>
>>>
>>>
>>> I am having trouble debugging my driver. It runs correctly on smaller
>>> data set but fails on large ones.  It is very hard to figure out what the
>>> bug is. I suspect it may have something do with the way spark is installed
>>> and configured. I am using google cloud platform dataproc pyspark
>>>
>>>
>>>
>>> The log messages are not helpful. The error message will be something
>>> like
>>> "User application exited with status 1"
>>>
>>>
>>>
>>> And
>>>
>>>
>>>
>>> jsonPayload: {
>>>
>>> class: "server.TThreadPoolServer"
>>>
>>> filename: "hive-server2.log"
>>>
>>> message: "Error occurred during processing of message."
>>>
>>> thread: "HiveServer2-Handler-Pool: Thread-40"
>>>
>>> }
>>>
>>>
>>>
>>> I am able to access the spark history server however it does not capture
>>> anything if the driver crashes. I am unable to figure out how to access
>>> spark web UI.
>>>
>>>
>>>
>>> My driver program looks something like the pseudo code bellow. A long
>>> list of transforms with a single action, (i.e. write) at the end. Adding
>>> log messages is not helpful because of lazy evaluations. I am tempted to
>>> add something like
>>>
>>>
>>>
>>> Logger.warn( “DEBUG df.count():{}”.format( df.count() )” to try and
>>> inline some sort of diagnostic message.
>>>
>>>
>>>
>>> What do you think?
>>>
>>>
>>>
>>> Is there a better way to debug this?
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>> def run():
>>>
>>> listOfDF = []
>>>
>>> for filePath in listOfFiles:
>>>
>>> df = spark.read.load( filePath, ...)
>>>
>>> listOfDF.append(df)
>>>
>>>
>>>
>>>
>>>
>>> list2OfDF = []
>>>
>>> for df in listOfDF:
>>>
>>> df2 = df.select(  )
>>>
>>> list2OfDF.append( df2 )
>>>
>>>
>>>
>>> # will setting list to None free cache?
>>>
>>> # or just driver memory
>>>
>>> listOfDF = None
>>>
>>>
>>>
>>>
>>>
>>> df3 = list2OfDF[0]
>>>
>>>
>>>
>>> for i in range( 1, len(list2OfDF) ):
>>>
>>> df = list2OfDF[i]
>>>
>>> df3 = df3.join(df ...)
>>>
>>>
>>>
>>> # will setting to list to None free cache?
>>>
>>> # or just driver memory
>>>
>>> List2OfDF = None
>>>
>>>
>>>
>>>
>>>
>>> lots of narrow transformations on d3
>>>
>>>
>>>
>>> return df3
>>>
>>>
>>>
>>> def main() :
>>>
>>> df = run()
>>>
>>> df.write()
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>


Re: groupMapReduce

2022-01-14 Thread David Diebold
Hello,

In the RDD API, you must be looking for reduceByKey.
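A minimal sketch of the correspondence, assuming a SparkContext sc as in the shell: groupMapReduce(key)(map)(reduce) on a Scala collection maps roughly onto map plus reduceByKey on an RDD of pairs.

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))

// Scala collections: words.groupMapReduce(identity)(_ => 1)(_ + _)
// RDD equivalent: build (key, value) pairs, then merge values per key.
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
// counts: ("a", 3), ("b", 2), ("c", 1)

// aggregateByKey or combineByKey play the same role when the merged type
// differs from the value type.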

Cheers

On Fri, Jan 14, 2022 at 11:56, frakass wrote:

> Is there a RDD API which is similar to Scala's groupMapReduce?
> https://blog.genuine.com/2019/11/scalas-groupmap-and-groupmapreduce/
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Question about spark.sql min_by

2022-02-21 Thread David Diebold
Hello all,

I'm trying to use the spark.sql min_by aggregation function with pyspark.
I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2

I have a dataframe made of these columns:
- productId : int
- sellerId : int
- price : double

For each product, I want to get the seller who sells the product for the
cheapest price.

Naive approach would be to do this, but I would expect two shuffles:

from pyspark.sql import functions as F
cheapest_prices_df  =
df.groupby('productId').agg(F.min('price').alias('price'))
cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId', 'price'])

I would had loved to do this instead :

from pyspark.sql import functions as F
cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
F.min_by('sellerId', 'price'))

Unfortunately min_by does not seem available in pyspark sql functions,
whereas I can see it in the doc :
https://spark.apache.org/docs/latest/api/sql/index.html

I have managed to use min_by with this approach but it looks slow (maybe
because of temp table creation ?):

df.createOrReplaceTempView("table")
cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId,
min(price) from table group by productId")

Is there a way I can rely on min_by directly in groupby ?
Is there some code missing in pyspark wrapper to make min_by visible
somehow ?

Thank you in advance for your help.

Cheers
David


Re: Question about spark.sql min_by

2022-02-21 Thread David Diebold
Thank you for your answers.
Indeed windowing should help there.
Also, I just realized that maybe I can try to create a struct column with both
price and sellerId and apply min() on it; the ordering would consider price
first (https://stackoverflow.com/a/52669177/2015762).

Cheers!

On Mon, Feb 21, 2022 at 16:12, ayan guha wrote:

> Why this can not be done by window function? Or is min by is just a short
> hand?
>
> On Tue, 22 Feb 2022 at 12:42 am, Sean Owen  wrote:
>
>> From the source code, looks like this function was added to pyspark in
>> Spark 3.3, up for release soon. It exists in SQL. You can still use it in
>> SQL with `spark.sql(...)` in Python though, not hard.
>>
>> On Mon, Feb 21, 2022 at 4:01 AM David Diebold 
>> wrote:
>>
>>> Hello all,
>>>
>>> I'm trying to use the spark.sql min_by aggregation function with pyspark.
>>> I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2
>>>
>>> I have a dataframe made of these columns:
>>> - productId : int
>>> - sellerId : int
>>> - price : double
>>>
>>> For each product, I want to get the seller who sells the product for the
>>> cheapest price.
>>>
>>> Naive approach would be to do this, but I would expect two shuffles:
>>>
>>> from pyspark.sql import functions as F
>>> cheapest_prices_df  =
>>> df.groupby('productId').agg(F.min('price').alias('price'))
>>> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId',
>>> 'price'])
>>>
>>> I would had loved to do this instead :
>>>
>>> from pyspark.sql import functions as F
>>> cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
>>> F.min_by('sellerId', 'price'))
>>>
>>> Unfortunately min_by does not seem available in pyspark sql functions,
>>> whereas I can see it in the doc :
>>> https://spark.apache.org/docs/latest/api/sql/index.html
>>>
>>> I have managed to use min_by with this approach but it looks slow (maybe
>>> because of temp table creation ?):
>>>
>>> df.createOrReplaceTempView("table")
>>> cheapest_sellers_df = spark.sql("select min_by(sellerId, price)
>>> sellerId, min(price) from table group by productId")
>>>
>>> Is there a way I can rely on min_by directly in groupby ?
>>> Is there some code missing in pyspark wrapper to make min_by visible
>>> somehow ?
>>>
>>> Thank you in advance for your help.
>>>
>>> Cheers
>>> David
>>>
>> --
> Best Regards,
> Ayan Guha
>


Question about bucketing and custom partitioners

2022-04-11 Thread David Diebold
Hello,

I have a few questions related to bucketing and custom partitioning in
dataframe api.

I am considering bucketing to perform a join that is shuffle-free on one side in
incremental jobs, but there is one thing that I'm not happy with.
Data is likely to grow/skew over time. At some point, I would need to
change the number of buckets, which would provoke a shuffle.

Instead of this, I would like to use a custom partitioner that would
replace the shuffle with a narrow transformation.
That is something that was feasible with the RDD developer API. For example, I
could use a partitioning scheme such as:
partition_id = (nb_partitions - 1) * (hash(column) - Int.MinValue) /
(Int.MaxValue - Int.MinValue)
When I multiply the number of partitions by 2, each new partition depends on
only one parent partition (=> narrow transformation).
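A minimal sketch of that scheme for the RDD API, assuming hashCode as the hash function; doubling numPartitions keeps each child partition dependent on a single parent partition, which is what makes the repartition narrow:

import org.apache.spark.Partitioner

class RangeHashPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    // Map hashCode from [Int.MinValue, Int.MaxValue] onto [0, numPartitions - 1].
    val shifted = key.hashCode.toLong - Int.MinValue.toLong
    ((shifted * numPartitions) / (Int.MaxValue.toLong - Int.MinValue.toLong + 1)).toInt
  }
}

// Usage on a pair RDD:
// val repartitioned = pairRdd.partitionBy(new RangeHashPartitioner(64))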

So, here are my questions:

1/ Is it possible to use a custom partitioner when saving a dataframe with
bucketing?
2/ Still with the DataFrame API, is it possible to apply a custom partitioner
to a dataframe?
    Is it possible to repartition the dataframe with a narrow
transformation, like what could be done with an RDD?
    Is there some sort of DataFrame developer API? Do you have any
pointers on this?

Thanks !
David


Writing protobuf RDD to parquet

2023-01-20 Thread David Diebold
Hello,

I'm trying to write some RDD[T], where T is a protobuf message, to Parquet
in Scala.
I am wondering what the best option is to do this, and I would be
interested in your insights.
So far, I see two possibilities:
- use the PairRDD method *saveAsNewAPIHadoopFile*, and I guess I need to
call *ParquetOutputFormat.setWriteSupportClass* and
*ProtoParquetOutputFormat.setProtobufClass* before. But in that case,
I'm not sure I have much control over how to partition files into different
folders on the file system.
- or convert the RDD to a dataframe and then use *write.parquet*; in that case,
I have more control, since I can rely on *partitionBy* to arrange the files in
different folders. But I'm not sure there is a built-in way to convert an
RDD of protobuf messages to a dataframe in Spark? I would need to rely on this:
https://github.com/saurfang/sparksql-protobuf.

What do you think ?
Kind regards,
David


RDD boundaries and triggering processing using tags in the data

2015-05-27 Thread David Webber
Hi All,

I'm new to Spark and I'd like some help understanding if a particular use
case would be a good fit for Spark Streaming.

I have an imaginary stream of sensor data consisting of integers 1-10. 
Every time the sensor reads "10" I'd like to average all the numbers that
were received since the last "10"

example input: 10 5 8 4 6 2 1 2 8 8 8 1 6 9 1 3 10 1 3 10 ...
desired output: 4.8, 2.0

I'm confused about what happens if sensor readings fall into different RDDs.  

RDD1:  10 5 8 4 6 2 1 2 8 8 8
RDD2:  1 6 9 1 3 10 1 3 10
output: ???, 2.0

My imaginary sensor doesn't read at fixed time intervals, so breaking the
stream into RDDs by time interval won't ensure the data is packaged
properly.  Additionally, multiple sensors are writing to the same stream
(though I think flatMap can parse the origin stream into streams for
individual sensors, correct?).  

My best guess for processing goes like
1) flatMap() to break out individual sensor streams
2) Custom parser to accumulate input data until "10" is found, then create a
new output RDD for each sensor and data grouping
3) average the values from step 2

I would greatly appreciate pointers to some specific documentation or
examples if you have seen something like this before.
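For illustration, a hedged sketch along the lines of the guess above, using updateStateByKey to carry per-sensor readings across batches. The state shape and names are illustrative rather than a tested solution, and it assumes checkpointing is enabled and that the in-batch ordering of readings is acceptable.

import org.apache.spark.streaming.dstream.DStream

case class SensorState(buffer: Seq[Int], lastAverage: Option[Double])

// Fold new readings into the per-sensor buffer; every "10" closes a group
// and records that group's average for this batch.
def updateSensor(newReadings: Seq[Int], state: Option[SensorState]): Option[SensorState] = {
  var buffer = state.map(_.buffer).getOrElse(Seq.empty)
  var lastAvg: Option[Double] = None
  for (r <- newReadings) {
    if (r == 10) {
      if (buffer.nonEmpty) lastAvg = Some(buffer.sum.toDouble / buffer.size)
      buffer = Seq.empty
    } else {
      buffer = buffer :+ r
    }
  }
  Some(SensorState(buffer, lastAvg))
}

// readings: (sensorId, value) pairs; requires ssc.checkpoint(...) to be set.
def averages(readings: DStream[(String, Int)]): DStream[(String, Double)] =
  readings
    .updateStateByKey(updateSensor _)
    .flatMapValues(_.lastAverage.toSeq)  // only sensors that completed a group this batch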

Thanks,
David



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-boundaries-and-triggering-processing-using-tags-in-the-data-tp23060.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark sql - reading data from sql tables having space in column names

2015-06-02 Thread David Mitchell
I am having the same problem reading JSON.  There does not seem to be a way
of selecting a field that has a space, "Executor Info" from the Spark logs.

I suggest that we open a JIRA ticket to address this issue.
 On Jun 2, 2015 10:08 AM, "ayan guha"  wrote:

> I would think the easiest way would be to create a view in DB with column
> names with no space.
>
> In fact, you can "pass" a sql in place of a real table.
>
> From documentation: "The JDBC table that should be read. Note that
> anything that is valid in a `FROM` clause of a SQL query can be used. For
> example, instead of a full table you could also use a subquery in
> parentheses."
>
> Kindly let the community know if this works
>
> On Tue, Jun 2, 2015 at 6:43 PM, Sachin Goyal 
> wrote:
>
>> Hi,
>>
>> We are using spark sql (1.3.1) to load data from Microsoft sql server
>> using jdbc (as described in
>> https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
>> ).
>>
>> It is working fine except when there is a space in column names (we can't
>> modify the schemas to remove space as it is a legacy database).
>>
>> Sqoop is able to handle such scenarios by enclosing column names in '[ ]'
>> - the recommended method from microsoft sql server. (
>> https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java
>> - line no 319)
>>
>> Is there a way to handle this in spark sql?
>>
>> Thanks,
>> sachin
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Spark performance

2015-07-11 Thread David Mitchell
You can certainly query over 4 TB of data with Spark.  However, you will
get an answer in minutes or hours, not in milliseconds or seconds.  OLTP
databases are used for web applications, and typically return responses in
milliseconds.  Analytic databases tend to operate on large data sets, and
return responses in seconds, minutes or hours.  When running batch jobs
over large data sets, Spark can be a replacement for analytic databases
like Greenplum or Netezza.



On Sat, Jul 11, 2015 at 8:53 AM, Roman Sokolov  wrote:

> Hello. Had the same question. What if I need to store 4-6 Tb and do
> queries? Can't find any clue in documentation.
> Am 11.07.2015 03:28 schrieb "Mohammed Guller" :
>
>>  Hi Ravi,
>>
>> First, Neither Spark nor Spark SQL is a database. Both are compute
>> engines, which need to be paired with a storage system. Seconds, they are
>> designed for processing large distributed datasets. If you have only
>> 100,000 records or even a million records, you don’t need Spark. A RDBMS
>> will perform much better for that volume of data.
>>
>>
>>
>> Mohammed
>>
>>
>>
>> *From:* Ravisankar Mani [mailto:rrav...@gmail.com]
>> *Sent:* Friday, July 10, 2015 3:50 AM
>> *To:* user@spark.apache.org
>> *Subject:* Spark performance
>>
>>
>>
>> Hi everyone,
>>
>> I have planned to move mssql server to spark?.  I have using around
>> 50,000 to 1l records.
>>
>>  The spark performance is slow when compared to mssql server.
>>
>>
>>
>> What is the best data base(Spark or sql) to store or retrieve data around
>> 50,000 to 1l records ?
>>
>> regards,
>>
>> Ravi
>>
>>
>>
>


-- 
### Confidential e-mail, for recipient's (or recipients') eyes only, not
for distribution. ###


Re: No. of Task vs No. of Executors

2015-07-18 Thread David Mitchell
This is likely due to data skew.  If you are using key-value pairs, one key
has a lot more records, than the other keys.  Do you have any groupBy
operations?

David


On Tue, Jul 14, 2015 at 9:43 AM, shahid  wrote:

> hi
>
> I have a 10 node cluster  i loaded the data onto hdfs, so the no. of
> partitions i get is 9. I am running a spark application , it gets stuck on
> one of tasks, looking at the UI it seems application is not using all nodes
> to do calculations. attached is the screen shot of tasks, it seems tasks
> are
> put on each node more then once. looking at tasks 8 tasks get completed
> under 7-8 minutes and one task takes around 30 minutes so causing the delay
> in results.
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23824/Screen_Shot_2015-07-13_at_9.png
> >
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/No-of-Task-vs-No-of-Executors-tp23824.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
### Confidential e-mail, for recipient's (or recipients') eyes only, not
for distribution. ###


Unable to Limit UI to localhost interface

2016-03-28 Thread David O'Gwynn
Greetings to all,

I've searched around the mailing list, but it would seem that (nearly?)
everyone has the opposite problem from mine. I made a stab at looking in the
source for an answer, but I figured I might as well see if anyone else has
run into the same problem as I have.

I'm trying to limit my Master/Worker UI to run only on localhost. As it
stands, I have the following two environment variables set in my
spark-env.sh:

SPARK_LOCAL_IP=127.0.0.1
SPARK_MASTER_IP=127.0.0.1

and my slaves file contains one line: 127.0.0.1

The problem is that when I run "start-all.sh", I can nmap my box's public
interface and get the following:

PORT STATE SERVICE
22/tcp   open  ssh
8080/tcp open  http-proxy
8081/tcp open  blackice-icecap

Furthermore, I can go to my box's public IP at port 8080 in my browser and
get the master node's UI. The UI even reports the URL/REST URLs as
127.0.0.1:

Spark Master at spark://127.0.0.1:7077
URL: spark://127.0.0.1:7077
REST URL: spark://127.0.0.1:6066 (cluster mode)

I'd rather not have spark available in any way to the outside world without
an explicit SSH tunnel.

There are variables to do with setting the Web UI port, but I'm not
concerned with the port, only the network interface to which the Web UI
binds.

Any help would be greatly appreciated.


Re: Unable to Limit UI to localhost interface

2016-03-29 Thread David O'Gwynn
/etc/hosts

127.0.0.1 localhost

conf/slaves
127.0.0.1


On Mon, Mar 28, 2016 at 5:36 PM, Mich Talebzadeh 
wrote:

> in your /etc/hosts what do you have for localhost
>
> 127.0.0.1 localhost.localdomain localhost
>
> conf/slave should have one entry in your case
>
> cat slaves
> # A Spark Worker will be started on each of the machines listed below.
> localhost
> ...
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 28 March 2016 at 15:32, David O'Gwynn  wrote:
>
>> Greetings to all,
>>
>> I've search around the mailing list, but it would seem that (nearly?)
>> everyone has the opposite problem as mine. I made a stab at looking in the
>> source for an answer, but I figured I might as well see if anyone else has
>> run into the same problem as I.
>>
>> I'm trying to limit my Master/Worker UI to run only on localhost. As it
>> stands, I have the following two environment variables set in my
>> spark-env.sh:
>>
>> SPARK_LOCAL_IP=127.0.0.1
>> SPARK_MASTER_IP=127.0.0.1
>>
>> and my slaves file contains one line: 127.0.0.1
>>
>> The problem is that when I run "start-all.sh", I can nmap my box's public
>> interface and get the following:
>>
>> PORT STATE SERVICE
>> 22/tcp   open  ssh
>> 8080/tcp open  http-proxy
>> 8081/tcp open  blackice-icecap
>>
>> Furthermore, I can go to my box's public IP at port 8080 in my browser
>> and get the master node's UI. The UI even reports that the URL/REST URLs to
>> be 127.0.0.1:
>>
>> Spark Master at spark://127.0.0.1:7077
>> URL: spark://127.0.0.1:7077
>> REST URL: spark://127.0.0.1:6066 (cluster mode)
>>
>> I'd rather not have spark available in any way to the outside world
>> without an explicit SSH tunnel.
>>
>> There are variables to do with setting the Web UI port, but I'm not
>> concerned with the port, only the network interface to which the Web UI
>> binds.
>>
>> Any help would be greatly appreciated.
>>
>>
>


Re: Unable to Limit UI to localhost interface

2016-03-30 Thread David O'Gwynn
Thanks much, Akhil. iptables is certainly a bandaid, but from an OpSec
perspective, it's troubling.

Is there any way to limit which interfaces the WebUI listens on? Is there a
Jetty configuration that I'm missing?

Thanks again for your help,
David

On Wed, Mar 30, 2016 at 2:25 AM, Akhil Das 
wrote:

> In your case, you will be able to see the webui (unless restricted with
> iptables) but you won't be able to submit jobs to that machine from a
> remote machine since the spark master is spark://127.0.0.1:7077
>
> Thanks
> Best Regards
>
> On Tue, Mar 29, 2016 at 8:12 PM, David O'Gwynn  wrote:
>
>> /etc/hosts
>>
>> 127.0.0.1 localhost
>>
>> conf/slaves
>> 127.0.0.1
>>
>>
>> On Mon, Mar 28, 2016 at 5:36 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> in your /etc/hosts what do you have for localhost
>>>
>>> 127.0.0.1 localhost.localdomain localhost
>>>
>>> conf/slave should have one entry in your case
>>>
>>> cat slaves
>>> # A Spark Worker will be started on each of the machines listed below.
>>> localhost
>>> ...
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 28 March 2016 at 15:32, David O'Gwynn  wrote:
>>>
>>>> Greetings to all,
>>>>
>>>> I've searched around the mailing list, but it would seem that (nearly?)
>>>> everyone has the opposite problem to mine. I made a stab at looking in the
>>>> source for an answer, but I figured I might as well see if anyone else has
>>>> run into the same problem as I have.
>>>>
>>>> I'm trying to limit my Master/Worker UI to run only on localhost. As it
>>>> stands, I have the following two environment variables set in my
>>>> spark-env.sh:
>>>>
>>>> SPARK_LOCAL_IP=127.0.0.1
>>>> SPARK_MASTER_IP=127.0.0.1
>>>>
>>>> and my slaves file contains one line: 127.0.0.1
>>>>
>>>> The problem is that when I run "start-all.sh", I can nmap my box's
>>>> public interface and get the following:
>>>>
>>>> PORT STATE SERVICE
>>>> 22/tcp   open  ssh
>>>> 8080/tcp open  http-proxy
>>>> 8081/tcp open  blackice-icecap
>>>>
>>>> Furthermore, I can go to my box's public IP at port 8080 in my browser
>>>> and get the master node's UI. The UI even reports that the URL/REST URLs to
>>>> be 127.0.0.1:
>>>>
>>>> Spark Master at spark://127.0.0.1:7077
>>>> URL: spark://127.0.0.1:7077
>>>> REST URL: spark://127.0.0.1:6066 (cluster mode)
>>>>
>>>> I'd rather not have spark available in any way to the outside world
>>>> without an explicit SSH tunnel.
>>>>
>>>> There are variables to do with setting the Web UI port, but I'm not
>>>> concerned with the port, only the network interface to which the Web UI
>>>> binds.
>>>>
>>>> Any help would be greatly appreciated.
>>>>
>>>>
>>>
>>
>


RE: DStream how many RDD's are created by batch

2016-04-12 Thread David Newberger
Hi,

Time is usually the criterion, if I’m understanding your question correctly. An RDD is 
created for each batch interval. If your interval is 500ms then an RDD would be 
created every 500ms. If it’s 2 seconds then an RDD is created every 2 seconds.
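
For instance, here's a minimal sketch (the app name, host and port below are placeholders, not values from your job):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("BatchIntervalSketch")   // placeholder name
val ssc = new StreamingContext(conf, Seconds(2))               // batch interval = 2 seconds
val lines = ssc.socketTextStream("localhost", 9999)            // placeholder source
// Each call below receives exactly the one RDD built for that 2-second batch.
lines.foreachRDD { rdd => println(s"records in this batch: ${rdd.count()}") }
ssc.start()
ssc.awaitTermination()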

Cheers,

David

From: Natu Lauchande [mailto:nlaucha...@gmail.com]
Sent: Tuesday, April 12, 2016 7:09 AM
To: user@spark.apache.org
Subject: DStream how many RDD's are created by batch

Hi,
What's the criterion for the number of RDDs created for each micro batch 
iteration?

Thanks,
Natu


RE: DStream how many RDD's are created by batch

2016-04-12 Thread David Newberger
Hi Natu,

I believe you are correct; one RDD would be created for each file.
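
Roughly, the setup you're describing would look like this (the bucket path and app name are placeholders, and credentials handling is left out):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf().setAppName("S3FileStreamSketch")      // placeholder name
val ssc = new StreamingContext(conf, Minutes(2))                 // 2-minute batch interval
// textFileStream monitors the path and picks up files that appear after the job starts.
val logs = ssc.textFileStream("s3n://my-bucket/incoming/")       // hypothetical bucket path
logs.foreachRDD { rdd => println(s"new records this batch: ${rdd.count()}") }
ssc.start()
ssc.awaitTermination()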

Cheers,

David

From: Natu Lauchande [mailto:nlaucha...@gmail.com]
Sent: Tuesday, April 12, 2016 1:48 PM
To: David Newberger
Cc: user@spark.apache.org
Subject: Re: DStream how many RDD's are created by batch

Hi David,
Thanks for you answer.
I have a follow up question :
I am using textFileStream and listening on an S3 bucket for new files to 
process. Files are created every 5 minutes and my batch interval is 2 minutes.

Does it mean that each file will become one RDD?

Thanks,
Natu

On Tue, Apr 12, 2016 at 7:46 PM, David Newberger 
mailto:david.newber...@wandcorp.com>> wrote:
Hi,

Time is usually the criterion, if I’m understanding your question correctly. An RDD is 
created for each batch interval. If your interval is 500ms then an RDD would be 
created every 500ms. If it’s 2 seconds then an RDD is created every 2 seconds.

Cheers,

David

From: Natu Lauchande [mailto:nlaucha...@gmail.com<mailto:nlaucha...@gmail.com>]
Sent: Tuesday, April 12, 2016 7:09 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: DStream how many RDD's are created by batch

Hi,
What's the criterion for the number of RDDs created for each micro batch 
iteration?

Thanks,
Natu



RE: Spark replacing Hadoop

2016-04-14 Thread David Newberger
Can we assume your question is “Will Spark replace Hadoop MapReduce?” or do you 
literally mean replacing the whole of Hadoop?

David

From: Ashok Kumar [mailto:ashok34...@yahoo.com.INVALID]
Sent: Thursday, April 14, 2016 2:13 PM
To: User
Subject: Spark replacing Hadoop

Hi,

I hear some saying that Hadoop is getting old and out of date and will be 
replaced by Spark!

Does this make sense and if so how accurate is it?

Best


RE: Can not set spark dynamic resource allocation

2016-05-20 Thread David Newberger
Hi All,

The error you are seeing looks really similar to SPARK-13514 to me. I could be 
wrong, though.

https://issues.apache.org/jira/browse/SPARK-13514

Can you check yarn.nodemanager.local-dirs  in your YARN configuration for 
"file://"


Cheers!
David Newberger

-Original Message-
From: Cui, Weifeng [mailto:weife...@a9.com] 
Sent: Friday, May 20, 2016 4:26 PM
To: Marcelo Vanzin
Cc: Ted Yu; Rodrick Brown; user; Zhao, Jun; Aulakh, Sahib; Song, Yiwei
Subject: Re: Can not set spark dynamic resource allocation

Sorry, here is the node-manager log. application_1463692924309_0002 is my test. 
Hope this will help.
http://pastebin.com/0BPEcgcW



On 5/20/16, 2:09 PM, "Marcelo Vanzin"  wrote:

>Hi Weifeng,
>
>That's the Spark event log, not the YARN application log. You get the 
>latter using the "yarn logs" command.
>
>On Fri, May 20, 2016 at 1:14 PM, Cui, Weifeng  wrote:
>> Here is the application log for this spark job.
>>
>> http://pastebin.com/2UJS9L4e
>>
>>
>>
>> Thanks,
>> Weifeng
>>
>>
>>
>>
>>
>> From: "Aulakh, Sahib" 
>> Date: Friday, May 20, 2016 at 12:43 PM
>> To: Ted Yu 
>> Cc: Rodrick Brown , Cui Weifeng 
>> , user , "Zhao, Jun"
>> 
>> Subject: Re: Can not set spark dynamic resource allocation
>>
>>
>>
>> Yes it is yarn. We have configured spark shuffle service w yarn node 
>> manager but something must be off.
>>
>>
>>
>> We will send u app log on paste bin.
>>
>> Sent from my iPhone
>>
>>
>> On May 20, 2016, at 12:35 PM, Ted Yu  wrote:
>>
>> Since yarn-site.xml was cited, I assume the cluster runs YARN.
>>
>>
>>
>> On Fri, May 20, 2016 at 12:30 PM, Rodrick Brown 
>>  wrote:
>>
>> Is this Yarn or Mesos? For the later you need to start an external 
>> shuffle service.
>>
>> Get Outlook for iOS
>>
>>
>>
>>
>>
>> On Fri, May 20, 2016 at 11:48 AM -0700, "Cui, Weifeng" 
>> 
>> wrote:
>>
>> Hi guys,
>>
>>
>>
>> Our team has a hadoop 2.6.0 cluster with Spark 1.6.1. We want to set 
>> dynamic resource allocation for spark and we followed the following 
>> link. After the changes, all spark jobs failed.
>>
>> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-reso
>> urce-allocation
>>
>> This test was on a test cluster which has 1 master machine (running 
>> namenode, resourcemanager and hive server), 1 worker machine (running 
>> datanode and nodemanager) and 1 machine as client( running spark shell).
>>
>>
>>
>> What I updated in config :
>>
>>
>>
>> 1. Update in spark-defaults.conf
>>
>> spark.dynamicAllocation.enabled true
>>
>> spark.shuffle.service.enabledtrue
>>
>>
>>
>> 2. Update yarn-site.xml
>>
>> <property>
>>   <name>yarn.nodemanager.aux-services</name>
>>   <value>mapreduce_shuffle,spark_shuffle</value>
>> </property>
>>
>> <property>
>>   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
>>   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
>> </property>
>>
>> <property>
>>   <name>spark.shuffle.service.enabled</name>
>>   <value>true</value>
>> </property>
>>
>> 3. Copy  spark-1.6.1-yarn-shuffle.jar to yarn.application.classpath
>> ($HADOOP_HOME/share/hadoop/yarn/*) in python code
>>
>> 4. Restart namenode, datanode, resourcemanager, nodemanger... retart 
>> everything
>>
>> 5. The config will update in all machines, resourcemanager and nodemanager.
>> We update the config in one place and copy to all machines.
>>
>>
>>
>> What I tested:
>>
>>
>>
>> 1. I started a scala spark shell and check its environment variables, 
>> spark.dynamicAllocation.enabled is true.
>>
>> 2. I used the following code:
>>
>> scala > val line =
>> sc.textFile("/spark-events/application_1463681113470_0006")
>>
>> line: org.apache.spark.rdd.RDD[String] =
>> /spark-events/application_1463681113470_0006 MapPartitionsRDD[1] at 
>> textFile at :27
>>
>> scala > line.count # This command just stuck here
>>
>>
>>
>> 3. In the beginning, there is only 1 executor(this is for driver) and 
>> after line.count, I could see 3 executors, then dropped to 1.
>>
>> 4. Several jobs were launched and all of them failed.   Tasks (for all
>&

RE: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image

2016-05-31 Thread David Newberger
Is 
https://github.com/alonsoir/awesome-recommendation-engine/blob/master/build.sbt 
  the build.sbt you are using?

David Newberger
QA Analyst
WAND  -  The Future of Restaurant Technology
(W)  www.wandcorp.com<http://www.wandcorp.com/>
(E)   david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com>
(P)   952.361.6200

From: Alonso [mailto:alons...@gmail.com]
Sent: Tuesday, May 31, 2016 11:11 AM
To: user@spark.apache.org
Subject: About a problem when mapping a file located within a HDFS vmware 
cdh-5.7 image


I have a VMware Cloudera image, cdh-5.7, running CentOS 6.8. I am using OS X 
as my development machine and the CDH image to run the code; I upload the code 
using git to the CDH image. I have modified my /etc/hosts file located in the 
CDH image with a line like this:

127.0.0.1   quickstart.cloudera quickstart  localhost   
localhost.domain



192.168.30.138   quickstart.cloudera quickstart  localhost   
localhost.domain

The cloudera version that i am running is:

[cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties



# Autogenerated build properties

version=2.6.0-cdh5.7.0

git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a

cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a

cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1

cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76

cloudera.base-branch=cdh5-base-2.6.0

cloudera.build-branch=cdh5-2.6.0_5.7.0

cloudera.pkg.version=2.6.0+cdh5.7.0+1280

cloudera.pkg.release=1.cdh5.7.0.p0.92

cloudera.cdh.release=cdh5.7.0

cloudera.build.time=2016.03.23-18:30:29GMT

I can do a ls command in the vmware machine:

[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv

-rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 
/user/cloudera/ratings.csv

I can read its content:

[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l

568454

The code is quite simple, just trying to map its content:

val ratingFile="hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv"



case class AmazonRating(userId: String, productId: String, rating: Double)



val NumRecommendations = 10

val MinRecommendationsPerUser = 10

val MaxRecommendationsPerUser = 20

val MyUsername = "myself"

val NumPartitions = 20





println("Using this ratingFile: " + ratingFile)

  // first create an RDD out of the rating file

val rawTrainingRatings = sc.textFile(ratingFile).map {

line =>

  val Array(userId, productId, scoreStr) = line.split(",")

  AmazonRating(userId, productId, scoreStr.toDouble)

}



  // only keep users that have rated between MinRecommendationsPerUser and 
MaxRecommendationsPerUser products

val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => 
MinRecommendationsPerUser <= r._2.size  && r._2.size < 
MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache()



println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of 
${rawTrainingRatings.count()}")

I am getting this message:

Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 0 
ratings out of 568454

because if i run the exact code within the spark-shell, i got this message:

Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 73279 
ratings out of 568454

Why is it working fine within the spark-shell but it is not running fine 
programmatically  in the vmware image?

I am running the code using sbt-pack plugin to generate unix commands and run 
them within the vmware image which has the spark pseudocluster,

This is the code i use to instantiate the sparkconf:

val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")

   
.setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true")

val sc = new SparkContext(sparkConf)

val sqlContext = new SQLContext(sc)

val ssc = new StreamingContext(sparkConf, Seconds(2))

//this checkpointdir should be in a conf file, for now it is hardcoded!

val streamingCheckpointDir = 
"/home/cloudera/my-recommendation-spark-engine/checkpoint"

ssc.checkpoint(streamingCheckpointDir)

I have tried to use this way of setting spark master, but an exception raises, 
i suspect that this is symptomatic of my problem.  
//.setMaster("spark://quickstart.cloudera:7077")

The exception when i try to use the fully qualified domain name:

.setMaster("spark://quickstart.cloudera:7077")



java.io.IOException: Failed to connect to 
quickstart.cloudera/127.0.0.1:7077<http://127.0.0.1:7077>

at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)

at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)

at 
org.a

RE: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image

2016-05-31 Thread David Newberger
Have you tried it without either of the setMaster lines?

Also, CDH 5.7 uses Spark 1.6.0 with some patches. I would recommend using the 
Cloudera repo for the Spark dependencies in build.sbt. I’d also check the other 
dependencies in build.sbt to see if there are CDH-specific versions.
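
As a sketch of what I mean in build.sbt (the exact CDH artifact versions below are an assumption on my part, so please check the Cloudera repo for the right ones):

resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.6.0-cdh5.7.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.0-cdh5.7.0" % "provided",
  "org.apache.spark" %% "spark-mllib"     % "1.6.0-cdh5.7.0" % "provided"
)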

David Newberger

From: Alonso Isidoro Roman [mailto:alons...@gmail.com]
Sent: Tuesday, May 31, 2016 1:23 PM
To: David Newberger
Cc: user@spark.apache.org
Subject: Re: About a problem when mapping a file located within a HDFS vmware 
cdh-5.7 image

Hi David, the one on the develop branch. I think it should be the same, but 
I'm actually not sure...

Regards

Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2016-05-31 19:40 GMT+02:00 David Newberger 
mailto:david.newber...@wandcorp.com>>:
Is 
https://github.com/alonsoir/awesome-recommendation-engine/blob/master/build.sbt 
  the build.sbt you are using?

David Newberger
QA Analyst
WAND  -  The Future of Restaurant Technology
(W)  www.wandcorp.com<http://www.wandcorp.com/>
(E)   david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com>
(P)   952.361.6200

From: Alonso [mailto:alons...@gmail.com<mailto:alons...@gmail.com>]
Sent: Tuesday, May 31, 2016 11:11 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: About a problem when mapping a file located within a HDFS vmware 
cdh-5.7 image


I have a VMware Cloudera image, cdh-5.7, running CentOS 6.8. I am using OS X 
as my development machine and the CDH image to run the code; I upload the code 
using git to the CDH image. I have modified my /etc/hosts file located in the 
CDH image with a line like this:

127.0.0.1   quickstart.cloudera quickstart  localhost   
localhost.domain



192.168.30.138   quickstart.cloudera quickstart  localhost   
localhost.domain

The cloudera version that i am running is:

[cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties



# Autogenerated build properties

version=2.6.0-cdh5.7.0

git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a

cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a

cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1

cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76

cloudera.base-branch=cdh5-base-2.6.0

cloudera.build-branch=cdh5-2.6.0_5.7.0

cloudera.pkg.version=2.6.0+cdh5.7.0+1280

cloudera.pkg.release=1.cdh5.7.0.p0.92

cloudera.cdh.release=cdh5.7.0

cloudera.build.time=2016.03.23-18:30:29GMT

I can do a ls command in the vmware machine:

[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv

-rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 
/user/cloudera/ratings.csv

I can read its content:

[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l

568454

The code is quite simple, just trying to map its content:

val ratingFile="hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv"



case class AmazonRating(userId: String, productId: String, rating: Double)



val NumRecommendations = 10

val MinRecommendationsPerUser = 10

val MaxRecommendationsPerUser = 20

val MyUsername = "myself"

val NumPartitions = 20





println("Using this ratingFile: " + ratingFile)

  // first create an RDD out of the rating file

val rawTrainingRatings = sc.textFile(ratingFile).map {

line =>

  val Array(userId, productId, scoreStr) = line.split(",")

  AmazonRating(userId, productId, scoreStr.toDouble)

}



  // only keep users that have rated between MinRecommendationsPerUser and 
MaxRecommendationsPerUser products

val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => 
MinRecommendationsPerUser <= r._2.size  && r._2.size < 
MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache()



println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of 
${rawTrainingRatings.count()}")

I am getting this message:

Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 0 
ratings out of 568454

because if i run the exact code within the spark-shell, i got this message:

Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 73279 
ratings out of 568454

Why is it working fine within the spark-shell but it is not running fine 
programmatically  in the vmware image?

I am running the code using sbt-pack plugin to generate unix commands and run 
them within the vmware image which has the spark pseudocluster,

This is the code i use to instantiate the sparkconf:

val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")

   
.setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true")

val sc = new SparkContext(sparkConf)

val sqlContext = new SQLContext(sc)

val ssc = new StreamingContext(sparkConf, Seconds(2))

//this checkpointdir should be in a conf file

RE: About a problem running a spark job in a cdh-5.7.0 vmware image.

2016-06-03 Thread David Newberger
Alonso,

The CDH VM uses YARN and the default deploy mode is client. I’ve been able to 
use the CDH VM for many learning scenarios.


http://www.cloudera.com/documentation/enterprise/latest.html
http://www.cloudera.com/documentation/enterprise/latest/topics/spark.html

David Newberger

From: Alonso [mailto:alons...@gmail.com]
Sent: Friday, June 3, 2016 5:39 AM
To: user@spark.apache.org
Subject: About a problem running a spark job in a cdh-5.7.0 vmware image.

Hi, I am developing a project that needs to use Kafka, Spark Streaming and 
Spark MLlib; this is the GitHub 
project<https://github.com/alonsoir/awesome-recommendation-engine/tree/develop>.

I am using a VMware cdh-5.7.0 image with 4 cores and 8 GB of RAM, and the file 
that I want to use is only 16 MB, yet I think I am running into problems related 
to resources, because the process outputs this message:

   .set("spark.driver.allowMultipleContexts", 
"true")


16/06/03 11:58:09 WARN TaskSchedulerImpl: Initial job has not accepted any 
resources; check your cluster UI to ensure that workers are registered and have 
sufficient resources



when i go to spark-master page, i can see this:


Spark Master at spark://192.168.30.137:7077

URL: spark://192.168.30.137:7077
REST URL: spark://192.168.30.137:6066 (cluster mode)
Alive Workers: 0
Cores in use: 0 Total, 0 Used
Memory in use: 0.0 B Total, 0.0 B Used
Applications: 2 Running, 0 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE

Workers
Worker Id Address State Cores Memory
Running Applications
Application ID Name Cores Memory per Node Submitted Time User State Duration
app-20160603115752-0001
(kill)
AmazonKafkaConnector 0 1024.0 MB 2016/06/03 11:57:52 cloudera WAITING 2.0 min
app-20160603115751-
(kill)
AmazonKafkaConnector 0 1024.0 MB 2016/06/03 11:57:51 cloudera WAITING 2.0 min


And this is the spark-worker output:

Spark Worker at 192.168.30.137:7078

ID: worker-20160603115937-192.168.30.137-7078
Master URL:
Cores: 4 (0 Used)
Memory: 6.7 GB (0.0 B Used)

Back to Master
Running Executors (0)
ExecutorID Cores State Memory Job Details Logs

It is weird, isn't it? The master URL is not set up and there isn't any ExecutorID, 
Cores, and so on and so forth...

If i do a ps xa | grep spark, this is the output:

[cloudera@quickstart bin]$ ps xa | grep spark
 6330 ?Sl 0:11 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp 
/usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
 -Dspark.deploy.defaultCores=4 -Xms1g -Xmx1g -XX:MaxPermSize=256m 
org.apache.spark.deploy.master.Master

 6674 ?Sl 0:12 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp 
/etc/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
 -Dspark.history.fs.logDirectory=hdfs:///user/spark/applicationHistory 
-Dspark.history.ui.port=18088 -Xms1g -Xmx1g -XX:MaxPermSize=256m 
org.apache.spark.deploy.history.HistoryServer

 8153 pts/1Sl+0:14 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp 
/home/cloudera/awesome-recommendation-engine/target/pack/lib/* 
-Dprog.home=/home/cloudera/awesome-recommendation-engine/target/pack 
-Dprog.version=1.0-SNAPSHOT example.spark.AmazonKafkaConnector 
192.168.1.35:9092 amazonRatingsTopic

 8413 ?Sl 0:04 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp 
/usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker 
spark://quickstart.cloudera:7077

 8619 pts/3S+ 0:00 grep spark

The master is set up with four cores and 1 GB, and the worker has no dedicated core 
and is using 1 GB; that is weird, isn't it? I have configured the VMware image 
with 4 cores (from eight) 

RE: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-06-03 Thread David Newberger
What does your processing time look like? Is it consistently within that 20-second 
micro-batch window?

David Newberger

From: Adrian Tanase [mailto:atan...@adobe.com]
Sent: Friday, June 3, 2016 8:14 AM
To: user@spark.apache.org
Cc: Cosmin Ciobanu
Subject: [REPOST] Severe Spark Streaming performance degradation after 
upgrading to 1.6.1

Hi all,

Trying to repost this question from a colleague on my team, somehow his 
subscription is not active:
http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html

Appreciate any thoughts,
-adrian


RE: About a problem running a spark job in a cdh-5.7.0 vmware image.

2016-06-03 Thread David Newberger
Alonso, I could totally be misunderstanding something or missing a piece of the 
puzzle; however, try removing .setMaster. If you do that, it will run with whatever 
the CDH VM is set up for, which in the out-of-the-box default case is YARN and client mode.

val sparkConf = new SparkConf().setAppName(“Some App thingy thing”)

From the Spark 1.6.0 Scala API Documentation:
https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.SparkConf


“
Configuration for a Spark application. Used to set various Spark parameters as 
key-value pairs.

Most of the time, you would create a SparkConf object with new SparkConf(), 
which will load values from any spark.* Java system properties set in your 
application as well. In this case, parameters you set directly on the SparkConf 
object take priority over system properties.

For unit tests, you can also call new SparkConf(false) to skip loading external 
settings and get the same configuration no matter what the system properties 
are.

All setter methods in this class support chaining. For example, you can write 
new SparkConf().setMaster("local").setAppName("My app").

Note that once a SparkConf object is passed to Spark, it is cloned and can no 
longer be modified by the user. Spark does not support modifying the 
configuration at runtime.
“
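
As a minimal sketch of that (reusing your app name; everything else then comes from spark-submit or the CDH defaults rather than being hard-coded):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")  // note: no setMaster here
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))                      // reuse the same SparkContext

Reusing the one SparkContext when building the StreamingContext should also mean you no longer need spark.driver.allowMultipleContexts.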

David Newberger

From: Alonso Isidoro Roman [mailto:alons...@gmail.com]
Sent: Friday, June 3, 2016 10:37 AM
To: David Newberger
Cc: user@spark.apache.org
Subject: Re: About a problem running a spark job in a cdh-5.7.0 vmware image.

Thank you David. So I would have to change the way that I am creating the 
SparkConf object, wouldn't I?

I can see in this 
link<http://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_running_spark_on_yarn.html#concept_ysw_lnp_h5>
 that the way to run a spark job using YARN is using this kind of command:


spark-submit --class org.apache.spark.examples.SparkPi --master yarn \

--deploy-mode client SPARK_HOME/lib/spark-examples.jar 10

Can I use this approach programmatically, maybe by changing setMaster to something 
like setMaster("yarn:quickstart.cloudera:8032")?

I have seen the port in this guide: 
http://www.cloudera.com/documentation/enterprise/5-6-x/topics/cdh_ig_ports_cdh5.html






RE: Spark Streaming - long garbage collection time

2016-06-03 Thread David Newberger
Have you tried UseG1GC in place of UseConcMarkSweepGC? This article really 
helped me with GC a few short weeks ago

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html
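
For example, a sketch of the switch (the extra flags are illustrative, not tuned values, and can equally go in spark-defaults.conf or on spark-submit):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:+PrintGCDetails")  // G1 instead of CMS
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")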
 

David Newberger

-Original Message-
From: Marco1982 [mailto:marco.plata...@yahoo.it] 
Sent: Friday, June 3, 2016 2:19 PM
To: user@spark.apache.org
Subject: Spark Streaming - long garbage collection time

Hi all,

I'm running a Spark Streaming application with 1-hour batches to join two data 
feeds and write the output to disk. The total size of one data feed is about 40 
GB per hour (split in multiple files), while the size of the second data feed 
is about 600-800 MB per hour (also split in multiple files). Due to application 
constraints, I may not be able to run smaller batches.
Currently, it takes about 20 minutes to produce the output in a cluster with
140 cores and 700 GB of RAM. I'm running 7 workers and 28 executors, each with 
5 cores and 22 GB of RAM.

I execute mapToPair(), filter(), and reduceByKeyAndWindow(1 hour batch) on the 
40 GB data feed. Most of the computation time is spent on these operations. 
What worries me is the Garbage Collection (GC) execution time per executor, 
which goes from 25 secs to 9.2 mins. I attach two screenshots
below: one lists the GC time and one prints out GC comments for a single 
executor. I anticipate that the executor that spends 9.2 mins in doing garbage 
collection is eventually killed by the Spark driver.

I think these numbers are too high. Do you have any suggestions about keeping GC 
time low? I'm already using the Kryo serializer, -XX:+UseConcMarkSweepGC, and 
spark.rdd.compress=true.

Is there anything else that would help?

Thanks
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n27087/gc_time.png>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n27087/executor_16.png>
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-long-garbage-collection-time-tp27087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread David Newberger
I was going to ask if you had 2 jobs running. If the checkpointing for both is 
set up to look at the same location, I could see an error like this happening. Do 
both Spark jobs have a reference to a checkpointing dir?

David Newberger

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, June 3, 2016 3:20 PM
To: user @spark
Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp 
(inode 806125): File does not exist.

OK

I was running two spark streaming jobs, one using streaming data from Kafka and 
another from twitter in local mode on the same node.

It is possible that the directory /user/hduser/checkpoint/temp is  shared by 
both spark streaming jobs

any experience on this please?

Thanks


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>



On 3 June 2016 at 20:48, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:
Hi,

Just started seeing these errors:

16/06/03 20:30:01 ERROR DFSClient: Failed to close inode 806125
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. 
[Lease.  Holder: DFSClient_NONMAPREDUCE_-907736468_1, pendingcreates: 1]
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)


Sounds like a connection is left open but cannot establish why!

Thanks


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>





RE: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread David Newberger
Hi Mich,

My gut says you are correct that each application should have its own 
checkpoint directory. Though honestly I’m a bit fuzzy on checkpointing still as 
I’ve not worked with it much yet.
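
Something like this is what I have in mind, i.e. one directory per application (the paths and app names below are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// In the Kafka streaming application:
val kafkaConf = new SparkConf().setAppName("KafkaJob")                 // placeholder name
val kafkaSsc  = new StreamingContext(kafkaConf, Seconds(2))
kafkaSsc.checkpoint("hdfs://namenode:9000/checkpoint/kafka_job")       // hypothetical path

// In the Twitter streaming application (a separate driver):
val twitterConf = new SparkConf().setAppName("TwitterJob")             // placeholder name
val twitterSsc  = new StreamingContext(twitterConf, Seconds(2))
twitterSsc.checkpoint("hdfs://namenode:9000/checkpoint/twitter_job")   // hypothetical path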

Cheers,

David Newberger

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, June 3, 2016 3:40 PM
To: David Newberger
Cc: user @spark
Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp 
(inode 806125): File does not exist.

Hi David

yes they do

The  first streaming job does

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

And the twitter does

  /** Returns the HDFS URL */
  def getCheckpointDirectory(): String = {
try {
  val name : String = Seq("bash", "-c", "curl -s 
http://169.254.169.254/latest/meta-data/hostname";) !! ;
  println("Hostname = " + name)
  "hdfs://" + name.trim + ":9000/checkpoint/"
} catch {
  case e: Exception => {
"./checkpoint/"
  }
}

I need to change one of these.

Actually a better alternative would be that each application has its own 
checkpoint?

THanks




Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>



On 3 June 2016 at 21:23, David Newberger 
mailto:david.newber...@wandcorp.com>> wrote:
I was going to ask if you had 2 jobs running. If the checkpointing for both is 
set up to look at the same location, I could see an error like this happening. Do 
both Spark jobs have a reference to a checkpointing dir?

David Newberger

From: Mich Talebzadeh 
[mailto:mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>]
Sent: Friday, June 3, 2016 3:20 PM
To: user @spark
Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp 
(inode 806125): File does not exist.

OK

I was running two spark streaming jobs, one using streaming data from Kafka and 
another from twitter in local mode on the same node.

It is possible that the directory /user/hduser/checkpoint/temp is  shared by 
both spark streaming jobs

any experience on this please?

Thanks


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>



On 3 June 2016 at 20:48, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:
Hi,

Just started seeing these errors:

16/06/03 20:30:01 ERROR DFSClient: Failed to close inode 806125
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. 
[Lease.  Holder: DFSClient_NONMAPREDUCE_-907736468_1, pendingcreates: 1]
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)


Sounds like a connection is left open but cannot establish why!

Thanks


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>






RE: Creating a Hive table through Spark and potential locking issue (a bug)

2016-06-08 Thread David Newberger
Could you be looking at 2 jobs trying to use the same file and one getting to 
it before the other and finally removing it?

David Newberger

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Wednesday, June 8, 2016 1:33 PM
To: user; user @spark
Subject: Creating a Hive table through Spark and potential locking issue (a bug)


Hi,

I noticed an issue with Spark creating and populating a Hive table.

The process as I see is as follows:


  1.  Spark creates the Hive table. In this case an ORC table in a Hive Database
  2.  Spark uses JDBC connection to get data out from an Oracle
  3.  I create a temp table in Spark through (registerTempTable)
  4.  Spark populates that table. That table is actually created in

   hdfs dfs -ls /tmp/hive/hduser
   drwx--   - hduser supergroup
   /tmp/hive/hduser/b1ea6829-790f-4b37-a0ff-3ed218388059



  1.  However, The original table itself does not have any locking on it!
  2.  I log in into Hive and drop that table

3. hive> drop table dummy;
OK


  1.   That table is dropped OK
  2.  Spark crashes with message
Started at
[08/06/2016 18:37:53.53]
16/06/08 19:13:46 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on 
/user/hive/warehouse/oraclehadoop.db/dummy/.hive-staging_hive_2016-06-08_18-38-08_804_3299712811201460314-1/-ext-1/_temporary/0/_temporary/attempt_201606081838_0001_m_00_0/part-0
 (inode 831621): File does not exist. Holder 
DFSClient_NONMAPREDUCE_-1836386597_1 does not have any open files.
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)
at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1532)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1349)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588)
16/06/08 19:13:46 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; 
aborting job

Suggested solution.
In a concurrent env, Spark should apply locks in order to prevent such 
operations. Locks are kept in Hive meta data table HIVE_LOCKS

HTH
Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>




RE: streaming example has error

2016-06-15 Thread David Newberger
Have you tried to “set spark.driver.allowMultipleContexts = true”?

David Newberger

From: Lee Ho Yeung [mailto:jobmatt...@gmail.com]
Sent: Tuesday, June 14, 2016 8:34 PM
To: user@spark.apache.org
Subject: streaming example has error

When I simulate streaming with nc -lk 
I got the error below,
then I tried the example:

martin@ubuntu:~/Downloads$ /home/martin/Downloads/spark-1.6.1/bin/run-example 
streaming.NetworkWordCount localhost 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/14 18:33:06 INFO StreamingExamples: Setting log level to [WARN] for 
streaming example. To override add a custom log4j.properties to the classpath.
16/06/14 18:33:06 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
16/06/14 18:33:06 WARN Utils: Your hostname, ubuntu resolves to a loopback 
address: 127.0.1.1; using 192.168.157.134 instead (on interface eth0)
16/06/14 18:33:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another 
address
16/06/14 18:33:13 WARN SizeEstimator: Failed to check whether UseCompressedOops 
is set; assuming yes

I got an error there too.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", )
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
ssc.start()
ssc.awaitTermination()



scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._

scala> val conf = new 
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
conf: org.apache.spark.SparkConf = 
org.apache.spark.SparkConf@67bcaf<mailto:org.apache.spark.SparkConf@67bcaf>

scala> val ssc = new StreamingContext(conf, Seconds(1))
16/06/14 18:28:44 WARN AbstractLifeCycle: FAILED 
SelectChannelConnector@0.0.0.0:4040<http://SelectChannelConnector@0.0.0.0:4040>:
 java.net.BindException: Address already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at 
org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at 
org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at 
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at 
org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:252)
at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262)
at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1988)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1979)
at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:262)
at org.apache.spark.ui.WebUI.bind(WebUI.scala:136)
at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481)
at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext.(SparkContext.scala:481)
at 
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874)
at 
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:81)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:36)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
  

RE: Handle empty kafka in Spark Streaming

2016-06-15 Thread David Newberger
If you're asking how to handle no messages in a batch window then I would add 
an isEmpty check like:

dStream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // process this batch's RDD here
  }
})

Or something like that. 


David Newberger

-Original Message-
From: Yogesh Vyas [mailto:informy...@gmail.com] 
Sent: Wednesday, June 15, 2016 6:31 AM
To: user
Subject: Handle empty kafka in Spark Streaming

Hi,

Does anyone knows how to handle empty Kafka while Spark Streaming job is 
running ?

Regards,
Yogesh

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: Handle empty kafka in Spark Streaming

2016-06-15 Thread David Newberger
Hi Yogesh,

I'm not sure if this is possible or not. I'd be interested in knowing. My gut 
thinks it would be an anti-pattern if it's possible to do something like this 
and that's why I handle it in either the foreachRDD or foreachPartition. The 
way I look at spark streaming is as an application which is always running and 
doing something like windowed batching or microbatching or whatever I'm trying 
to accomplish. If an RDD I get from Kafka is empty then I don't run the rest of 
the job. If the RDD I get from Kafka has some number of events then I'll 
process the RDD further. 
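
A rough sketch of that pattern (it assumes ssc, zkQuorum, group and the topics map are already defined as in your job):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topics,
  StorageLevel.MEMORY_AND_DISK_SER)
messages.foreachRDD { rdd =>
  if (rdd.isEmpty()) {
    // nothing arrived in this batch window, so skip the heavy work
  } else {
    println(s"processing ${rdd.count()} events")   // do the real work here
  }
}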

David Newberger

-Original Message-
From: Yogesh Vyas [mailto:informy...@gmail.com] 
Sent: Wednesday, June 15, 2016 8:30 AM
To: David Newberger
Subject: Re: Handle empty kafka in Spark Streaming

I am looking for something which checks the JavaPairReceiverInputDStream before 
going further with any operations.
For example, if I have get JavaPairReceiverInputDStream in following
manner:

JavaPairReceiverInputDStream 
message=KafkaUtils.createStream(ssc, zkQuorum, group, topics, 
StorageLevel.MEMORY_AND_DISK_SER());

Then I would like to check whether the message stream is empty or not. If it is not 
empty then go on to further operations, else wait for some data in Kafka.

On Wed, Jun 15, 2016 at 6:31 PM, David Newberger  
wrote:
> If you're asking how to handle no messages in a batch window then I would add 
> an isEmpty check like:
>
> dStream.foreachRDD(rdd => {
>   if (!rdd.isEmpty()) {
>     // process this batch's RDD here
>   }
> })
>
> Or something like that.
>
>
> David Newberger
>
> -Original Message-
> From: Yogesh Vyas [mailto:informy...@gmail.com]
> Sent: Wednesday, June 15, 2016 6:31 AM
> To: user
> Subject: Handle empty kafka in Spark Streaming
>
> Hi,
>
> Does anyone knows how to handle empty Kafka while Spark Streaming job is 
> running ?
>
> Regards,
> Yogesh
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
>


RE: Limit pyspark.daemon threads

2016-06-15 Thread David Newberger
Have you tried setting spark.cores.max

“When running on a standalone deploy 
cluster <http://spark.apache.org/docs/latest/spark-standalone.html> or a Mesos 
cluster in "coarse-grained" sharing 
mode <http://spark.apache.org/docs/latest/running-on-mesos.html#mesos-run-modes>, 
the maximum amount of CPU cores to request for the application from across the 
cluster (not from each machine). If not set, the default will 
be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite 
(all available cores) on Mesos.”
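
For example, a sketch (the numbers are illustrative only):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("CoreCappedApp")          // placeholder name
  .set("spark.cores.max", "4")          // total cores for the app across the cluster
  .set("spark.executor.cores", "1")     // cores per executor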

David Newberger

From: agateaaa [mailto:agate...@gmail.com]
Sent: Wednesday, June 15, 2016 4:39 PM
To: Gene Pang
Cc: Sven Krasser; Carlile, Ken; user
Subject: Re: Limit pyspark.daemon threads

Thx Gene! But my concern is with CPU usage, not memory. I want to see if there 
is any way to control the number of pyspark.daemon processes that get spawned. 
We have some restrictions on the number of CPUs we can use on a node, and the 
number of pyspark.daemon processes that get created doesn't seem to honor the 
spark.executor.cores property setting.
Thanks!

On Wed, Jun 15, 2016 at 1:53 PM, Gene Pang 
mailto:gene.p...@gmail.com>> wrote:
As Sven mentioned, you can use Alluxio to store RDDs in off-heap memory, and 
you can then share that RDD across different jobs. If you would like to run 
Spark on Alluxio, this documentation can help: 
http://www.alluxio.org/documentation/master/en/Running-Spark-on-Alluxio.html

Thanks,
Gene

On Tue, Jun 14, 2016 at 12:44 AM, agateaaa 
mailto:agate...@gmail.com>> wrote:
Hi,
I am seeing this issue too with pyspark (Using Spark 1.6.1).  I have set 
spark.executor.cores to 1, but I see that whenever streaming batch starts 
processing data, see python -m pyspark.daemon processes increase gradually to 
about 5, (increasing CPU% on a box about 4-5 times, each pyspark.daemon takes 
up around 100 % CPU)
After the processing is done 4 pyspark.daemon processes go away and we are left 
with one till the next batch run. Also sometimes the  CPU usage for executor 
process spikes to about 800% even though spark.executor.core is set to 1
e.g. top output
PID USER  PR   NI  VIRT  RES  SHR S   %CPU %MEMTIME+  COMMAND
19634 spark 20   0 8871420 1.790g  32056 S 814.1  2.9   0:39.33 /usr/lib/j+ 
<--EXECUTOR

13897 spark 20   0   46576  17916   6720 S   100.0  0.0   0:00.17 python -m 
+ <--pyspark.daemon
13991 spark 20   0   46524  15572   4124 S   98.0  0.0   0:08.18 python -m 
+ <--pyspark.daemon
14488 spark 20   0   46524  15636   4188 S   98.0  0.0   0:07.25 python -m 
+ <--pyspark.daemon
14514 spark 20   0   46524  15636   4188 S   94.0  0.0   0:06.72 python -m 
+ <--pyspark.daemon
14526 spark 20   0   48200  17172   4092 S   0.0  0.0   0:00.38 python -m + 
<--pyspark.daemon


Is there any way to control the number of pyspark.daemon processes that get 
spawned ?
Thank you
Agateaaa

On Sun, Mar 27, 2016 at 1:08 AM, Sven Krasser 
mailto:kras...@gmail.com>> wrote:
Hey Ken,

1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap 
storage option using Alluxio, formerly Tachyon, with which I have no experience 
however.)

2. The worker memory setting is not a hard maximum unfortunately. What happens 
is that during aggregation the Python daemon will check its process size. If 
the size is larger than this setting, it will start spilling to disk. I've seen 
many occasions where my daemons grew larger. Also, you're relying on Python's 
memory management to free up space again once objects are evicted. In practice, 
leave this setting reasonably small but make sure there's enough free memory on 
the machine so you don't run into OOM conditions. If the lower memory setting 
causes strains for your users, make sure they increase the parallelism of their 
jobs (smaller partitions meaning less data is processed at a time).

3. I believe that is the behavior you can expect when setting 
spark.executor.cores. I've not experimented much with it and haven't looked at 
that part of the code, but what you describe also reflects my understanding. 
Please share your findings here, I'm sure those will be very helpful to others, 
too.

One more suggestion for your users is to move to the Pyspark DataFrame API. 
Much of the processing will then happen in the JVM, and you will bump into 
fewer Python resource contention issues.

Best,
-Sven


On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken 
mailto:carli...@janelia.hhmi.org>> wrote:
This is extremely helpful!

I’ll have to talk to my users about how the python memory limit should be 
adjusted and what their expectations are. I’m fairly certain we bumped it up in 
the dark past when jobs were failing because of insufficient memory for the 
python processes.

So just to make sure I’m understanding correctly:


  *   JVM memory (set by SPARK_EXECUTOR_MEMORY and/or SPARK_WORKER_MEMORY?) is 
wher

RE: streaming example has error

2016-06-16 Thread David Newberger
Try adding wordCounts.print() before ssc.start()
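
i.e. roughly this, reusing the vals from your snippet:

val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()      // registers an output operation so the context has work to execute
ssc.start()
ssc.awaitTermination()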


David Newberger

From: Lee Ho Yeung [mailto:jobmatt...@gmail.com]
Sent: Wednesday, June 15, 2016 9:16 PM
To: David Newberger
Cc: user@spark.apache.org
Subject: Re: streaming example has error

I got another error: StreamingContext: Error starting the context, marking it as 
stopped

/home/martin/Downloads/spark-1.6.1/bin/spark-shell --packages 
com.databricks:spark-csv_2.11:1.4.0
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new 
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount").set("spark.driver.allowMultipleContexts",
 "true")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", )
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
ssc.start()
ssc.awaitTermination()



scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = 
org.apache.spark.streaming.dstream.MappedDStream@61a5e7<mailto:org.apache.spark.streaming.dstream.MappedDStream@61a5e7>

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = 
org.apache.spark.streaming.dstream.ShuffledDStream@a522f1<mailto:org.apache.spark.streaming.dstream.ShuffledDStream@a522f1>

scala> ssc.start()
16/06/15 19:14:10 ERROR StreamingContext: Error starting the context, marking 
it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations 
registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at 
org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:44)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:46)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:48)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:50)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:52)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:54)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
at $line42.$read$$iwC$$iwC$$iwC$$iwC.(:62)
at $line42.$read$$iwC$$iwC$$iwC.(:64)
at $line42.$read$$iwC$$iwC.(:66)
at $line42.$read$$iwC.(:68)
at $line42.$read.(:70)
at $line42.$read$.(:74)
at $line42.$read$.()
at $line42.$eval$.(:7)
at $line42.$eval$.()
at $line42.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at 
org.apache.spark.repl.SparkILoop.org<http://org.apache.spark.repl.SparkILoop.org>$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$proces

RE: difference between dataframe and dataframwrite

2016-06-16 Thread David Newberger
DataFrame is a collection of data which is organized into named columns.
DataFrame.write is an interface for saving the contents of a DataFrame to 
external storage.
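
For example, a small sketch (the paths and format are placeholders, and it assumes an existing SparkContext sc):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("/tmp/people.json")   // DataFrame: data organized into named columns
df.write                                            // DataFrameWriter: the interface for saving
  .mode("overwrite")
  .parquet("/tmp/people_parquet")                   // writes the contents out to external storage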

Hope this helps

David Newberger


From: pseudo oduesp [mailto:pseudo20...@gmail.com]
Sent: Thursday, June 16, 2016 9:43 AM
To: user@spark.apache.org
Subject: difference between dataframe and dataframwrite

Hi,

What is the difference between DataFrame and DataFrame.write?



RE: HBase-Spark Module

2016-07-29 Thread David Newberger
Hi Ben,

This seems more like a question for community.cloudera.com. However, it would 
be under HBase, not Spark, I believe. 

https://repository.cloudera.com/artifactory/webapp/#/artifacts/browse/tree/General/cloudera-release-repo/org/apache/hbase/hbase-spark

David Newberger


-Original Message-
From: Benjamin Kim [mailto:bbuil...@gmail.com] 
Sent: Friday, July 29, 2016 12:57 PM
To: user@spark.apache.org
Subject: HBase-Spark Module

I would like to know if anyone has tried using the hbase-spark module? I tried 
to follow the examples in conjunction with CDH 5.8.0. I cannot find the 
HBaseTableCatalog class in the module or in any of the Spark jars. Can someone 
help?

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Need Advice: Spark-Streaming Setup

2016-08-01 Thread David Kaufman
Hi,

I'm currently working on a Spark/HBase setup which processes log files (~10
GB/day). These log files are persisted hourly on n > 10 application servers
and copied to a 4-node HDFS.

Our current Spark job aggregates single visits (based on a session UUID)
across all application servers on a daily basis. Visits are filtered (only
about 1% of the data remains) and stored in HBase for further processing.

Currently there is no use of the Spark-Streaming API, i.e. a cronjob runs
every day and fires the visit calculation.

Questions
1) Is it really necessary to store the log files in HDFS, or can Spark
somehow read the files from a local file system and distribute the data to
the other nodes? Rationale: The data is (probably) only read once during
the visit calculation which defies the purpose of a dfs.

2) If the raw log files have to be in the HDFS, I have to remove the files
from the HDFS after processing them, so COPY -> PROCESS -> REMOVE. Is this
the way to go?

3) Before I can process a visit for an hour. I have to wait until all log
files of all application servers have been copied to the HDFS. It doesn't
seem like StreamingContext.fileStream can wait for more sophisticated
patterns, e.g. ("context*/logs-2016-08-01-15"). Do you guys have a
recommendation for solving this problem? One possible solution: after the
files have been copied, create an additional marker file that signals to Spark
that all files are available?
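
A minimal sketch of that last idea, assuming a plain Hadoop FileSystem check
before the hourly/daily job runs (the paths and marker name are hypothetical):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("visit-aggregation"))
val fs = FileSystem.get(sc.hadoopConfiguration)

// One directory per hour; the marker is written only after every
// application server has finished copying its files for that hour.
val hourDir = new Path("/logs/2016-08-01-15")
val marker  = new Path(hourDir, "_ALL_SERVERS_DONE")

if (fs.exists(marker)) {
  val logs = sc.textFile(hourDir.toString + "/*.log")
  // ... filter down to visits and write the result to HBase ...
}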

If you have any questions, please don't hesitate to ask.

Thanks,
David


Spark on YARN multitenancy

2015-12-15 Thread David Fox
 Hello Spark experts,

We are currently evaluating Spark on our cluster that already supports MRv2
over YARN.

We have noticed a problem with running jobs concurrently, in particular
that a running Spark job will not release its resources until the job is
finished. Ideally, if two people run any combination of MRv2 and Spark
jobs, the resources should be fairly distributed.

I have noticed a feature called "dynamic resource allocation" in Spark 1.2,
but this does not seem to solve the problem, because it releases resources
only when Spark is IDLE, not while it's BUSY. What I am looking for is an
approach similar to MapReduce, where a new user obtains a fair share of the
resources.
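
For reference, this is roughly how dynamic allocation is switched on (the
executor bounds are hypothetical, and it also needs the external shuffle
service enabled on the NodeManagers):

import org.apache.spark.SparkConf

// Dynamic allocation releases executors that sit idle and requests more under
// load; it does not preempt executors from a job that is still busy, which is
// the limitation described above.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")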

I haven't been able to locate any further information on this matter. On
the other hand, I feel this must be pretty common issue for a lot of users.

So,

   1. What is your experience when dealing with multitenant (multiple
   users) Spark cluster with YARN?
   2. Is Spark architecturally able to support releasing resources while
   it's busy? Is this a planned feature or is it something that conflicts with
   the idea of Spark executors?

Thanks


RE: fishing for help!

2015-12-21 Thread David Newberger
Hi Eran,

Based on the limited information, the first things that come to mind are 
processor, RAM, and disk speed.

David Newberger
QA Analyst

WAND - The Future of Restaurant Technology
(W) www.wandcorp.com<http://www.wandcorp.com/>
(E) david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com>
(P) 952.361.6200

From: Eran Witkon [mailto:eranwit...@gmail.com]
Sent: Monday, December 21, 2015 6:54 AM
To: user
Subject: fishing for help!

Hi,
I know it is a broad question, but can you think of reasons why a pyspark job 
run from server 1 as user 1 would be faster than the same job run from server 2 
as user 1?
Eran


Fat jar can't find jdbc

2015-12-21 Thread David Yerrington
Hi Everyone,

I'm building a prototype that fundamentally grabs data from a MySQL
instance, crunches some numbers, and then moves it on down the pipeline.
I've been using SBT with assembly tool to build a single jar for deployment.

I've gone through the paces of stomping out many dependency problems and
have come down to one last (hopefully) zinger.

java.lang.ClassNotFoundException: Failed to load class for data source:
> jdbc.
>
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
>
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
>
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
>
> at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1203)
>
> at her.recommender.getDataframe(her.recommender.scala:45)
>
> at her.recommender.getRecommendations(her.recommender.scala:60)
>

I'm assuming this has to do with mysql-connector because this is the
problem I run into when I'm working with spark-shell and I forget to
include my mysql-connector jar on the classpath.

I've tried:

   - Using different versions of mysql-connector-java in my build.sbt file
   - Copying the connector jar to my_project/src/main/lib
   - Copying the connector jar to my_project/lib <-- (this is where I keep
   my build.sbt)

Everything loads fine and works, except my call that does
"sqlContext.load("jdbc", myOptions)".  I know this is a total newbie
question but in my defense, I'm fairly new to Scala, and this is my first
go at deploying a fat jar with sbt-assembly.
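
For comparison, a minimal sketch of the equivalent DataFrameReader form, which
also names the driver class explicitly (the connection details are hypothetical):

val df = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb")
  .option("dbtable", "recommendations")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()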

Thanks for any advice!

-- 
David Yerrington
yerrington.net


Re: Fat jar can't find jdbc

2015-12-22 Thread David Yerrington
Sure here it is:

import AssemblyKeys._

assemblySettings

// assemblyJarName in assembly := "recommender.jar"

test in assembly := {}

organization  := "com.example"

version   := "0.1"

scalaVersion  := "2.11.6"

scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")

libraryDependencies ++= {
  val akkaV = "2.3.9"
  val sprayV = "1.3.3"
  Seq(
"org.apache.spark" %% "spark-core" % "1.5.2",
"org.apache.spark" %% "spark-sql" % "1.5.2",
"org.apache.spark" %% "spark-hive" % "1.5.2",
"org.apache.spark" %% "spark-streaming" % "1.5.2",
"org.apache.spark" %% "spark-streaming-kafka" % "1.5.2",
"org.apache.spark" %% "spark-streaming-flume" % "1.5.2",
"org.apache.spark" %% "spark-mllib" % "1.5.2",
"org.apache.commons" % "commons-lang3" % "3.0",
"io.spray"%%  "spray-can" % sprayV,
"io.spray"%%  "spray-routing" % sprayV,
"io.spray"%%  "spray-testkit" % sprayV  % "test",
"io.spray"%%  "spray-json"% "1.3.2",
"com.typesafe.akka"   %%  "akka-actor"% akkaV,
"com.typesafe.akka"   %%  "akka-testkit"  % akkaV   % "test",
"com.zaxxer"  %   "HikariCP-java6"% "2.3.3",
"com.typesafe.slick"  %%  "slick" % "3.1.0",
"org.specs2"  %%  "specs2-core"   % "2.3.11" % "test",
"mysql"   %   "mysql-connector-java" % "5.1.35",
"org.slf4j"   %   "slf4j-nop" % "1.6.4",
"net.liftweb" %%  "lift-json" % "2.6+",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.4.0"
  )
}

// For the jackson resolves business
resolvers += "Sonatype OSS Snapshots" at "
https://oss.sonatype.org/content/repositories/snapshots";

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.startsWith("META-INF") => MergeStrategy.discard
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", xs @ _*) => MergeStrategy.first
case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
case "about.html"  => MergeStrategy.rename
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
  }
}

Revolver.settings




Also, when I run "assembly", I see this, without warning / errors:

...

[info] Including: datanucleus-rdbms-3.2.9.jar
[info] Including: reactive-streams-1.0.0.jar
*[info] Including: mysql-connector-java-5.1.35.jar*
[info] Including: commons-pool-1.5.4.jar
[info] Including: commons-dbcp-1.4.jar

...


On Tue, Dec 22, 2015 at 12:04 AM, Vijay Kiran  wrote:

> Can you paste your libraryDependencies from build.sbt ?
>
> ./Vijay
>
> > On 22 Dec 2015, at 06:12, David Yerrington  wrote:
> >
> > Hi Everyone,
> >
> > I'm building a prototype that fundamentally grabs data from a MySQL
> instance, crunches some numbers, and then moves it on down the pipeline.
> I've been using SBT with assembly tool to build a single jar for deployment.
> >
> > I've gone through the paces of stomping out many dependency problems and
> have come down to one last (hopefully) zinger.
> >
> > java.lang.ClassNotFoundException: Failed to load class for data source:
> jdbc.
> >
> > at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
> >
> > at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
> >
> > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
> >
> > at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1203)
> >
> > at her.recommender.getDataframe(her.recommender.scala:45)
> >
> > at her.recommender.getRecommendations(her.recommender.scala:60)
> >
> >
> > I'm assuming this has to do with mysql-connector because this is the
> problem I run into when I'm working with spark-sh

Re: Fat jar can't find jdbc

2015-12-22 Thread David Yerrington
Igor, I think it's available.  After I extract the jar file, I see a
directory with class files that look very relevant in "/com/mysql/jdbc".

After reading this, I started to wonder if MySQL connector was really the
problem.  Perhaps it's something to do with SQLcontext?  I just wired a
test endpoint to run a very basic mysql query, outside of Spark, and it
worked just fine (yay!).  I copied and pasted this example to verify my
MySQL connector availability, and it worked just fine:
https://mkaz.github.io/2011/05/27/using-scala-with-jdbc-to-connect-to-mysql/

As far as the Maven manifest goes, I'm really not sure.  I will research it
though.  Now I'm wondering if my mergeStrategy is to blame?  I'm going to
try there next.
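
One thing that may be worth checking on the mergeStrategy front, sketched
against the build.sbt posted earlier in this thread: data source short names
like "jdbc", and JDBC drivers themselves, are typically discovered through
files under META-INF/services, so a rule that discards everything under
META-INF can break that lookup inside a fat jar. Keeping the service files
would look roughly like:

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
    // Keep service registrations (e.g. java.sql.Driver, Spark data sources)
    case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
    case m if m.startsWith("META-INF") => MergeStrategy.discard
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
    case "about.html"  => MergeStrategy.rename
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  }
}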

Thank you for the help!

On Tue, Dec 22, 2015 at 1:18 AM, Igor Berman  wrote:

> David, can you verify that mysql connector classes indeed in your single
> jar?
> open it with zip tool available at your platform
>
> another options that might be a problem - if there is some dependency in
> MANIFEST(not sure though this is the case of mysql connector) then it might
> be broken after preparing single jar
> so you need to verify that it's ok(in maven usually it's possible to
> define merging policy for resources while creating single jar)
>
> On 22 December 2015 at 10:04, Vijay Kiran  wrote:
>
>> Can you paste your libraryDependencies from build.sbt ?
>>
>> ./Vijay
>>
>> > On 22 Dec 2015, at 06:12, David Yerrington 
>> wrote:
>> >
>> > Hi Everyone,
>> >
>> > I'm building a prototype that fundamentally grabs data from a MySQL
>> instance, crunches some numbers, and then moves it on down the pipeline.
>> I've been using SBT with assembly tool to build a single jar for deployment.
>> >
>> > I've gone through the paces of stomping out many dependency problems
>> and have come down to one last (hopefully) zinger.
>> >
>> > java.lang.ClassNotFoundException: Failed to load class for data source:
>> jdbc.
>> >
>> > at
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
>> >
>> > at
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
>> >
>> > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
>> >
>> > at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1203)
>> >
>> > at her.recommender.getDataframe(her.recommender.scala:45)
>> >
>> > at her.recommender.getRecommendations(her.recommender.scala:60)
>> >
>> >
>> > I'm assuming this has to do with mysql-connector because this is the
>> problem I run into when I'm working with spark-shell and I forget to
>> include my classpath with my mysql-connect jar file.
>> >
>> > I've tried:
>> >   • Using different versions of mysql-connector-java in my
>> build.sbt file
>> >   • Copying the connector jar to my_project/src/main/lib
>> >   • Copying the connector jar to my_project/lib <-- (this is where
>> I keep my build.sbt)
>> > Everything loads fine and works, except my call that does
>> "sqlContext.load("jdbc", myOptions)".  I know this is a total newbie
>> question but in my defense, I'm fairly new to Scala, and this is my first
>> go at deploying a fat jar with sbt-assembly.
>> >
>> > Thanks for any advice!
>> >
>> > --
>> > David Yerrington
>> > yerrington.net
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
David Yerrington
yerrington.net


Problem About Worker System.out

2015-12-28 Thread David John
I have used Spark 1.4 for 6 months. Thanks to all the members of this
community for your great work. I have a question about a logging issue that I
hope can be solved.

The program is running under this configuration: YARN cluster, YARN-client mode.

In Scala, writing code like:

rdd.map( a => println(a) );

will print the value of a to our console.

However, in Java (1.7), writing:

rdd.map( new Function<Integer, Integer>() {
    @Override public Integer call(Integer a) throws Exception {
        System.out.println(a);
        return a;
    }
});

won't print the output to our console. The configuration is the same.

I have tried this code, but it does not work either:

rdd.map( new Function<Integer, Integer>() {
    @Override public Integer call(Integer a) throws Exception {
        org.apache.log4j.Logger log = Logger.getLogger(this.getClass());
        log.info(a);
        log.warn(a);
        log.error(a);
        log.fatal(a);
        return a;
    }
});

No output from this either:

final org.apache.log4j.Logger log = Logger.getLogger(this.getClass());
rdd.map( new Function<Integer, Integer>() {
    @Override public Integer call(Integer a) throws Exception {
        log.info(a);
        log.warn(a);
        log.error(a);
        log.fatal(a);
        return a;
    }
});

It seems that stdout on a worker is not sent back to our driver. I wonder why
this works in Scala but not in Java. Is there a simple way to make Java behave
like Scala here?

Thanks.
  

FW: Problem About Worker System.out

2015-12-28 Thread David John




Thanks.

Can we use an slf4j/log4j logger to transfer our messages from a worker to the
driver? I saw some discussions saying that we can use code like this:

object Holder extends Serializable {
   @transient lazy val log = Logger.getLogger(getClass.getName)
}

val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element =>
   Holder.log.info(element)
}

ref:
http://stackoverflow.com/questions/29208844/apache-spark-logging-within-scala

Is this the usual way, or does Spark have a SocketAppender for developers?

Date: Mon, 28 Dec 2015 17:52:17 +0800
Subject: Re: Problem About Worker System.out
From: sai.sai.s...@gmail.com
To: david_john_2...@outlook.com
CC: user@spark.apache.org

Stdout will not be sent back to the driver, no matter whether you use Scala or
Java. You must be doing something wrong that makes you think it is expected
behavior.

Using Experimental Spark Features

2015-12-30 Thread David Newberger
Hi All,

I've been looking at the Direct Approach for streaming Kafka integration 
(http://spark.apache.org/docs/latest/streaming-kafka-integration.html) because 
it looks like a good fit for our use cases. My concern is the feature is 
experimental according to the documentation. Has anyone used this approach yet 
and if so, what has your experience been with using it? If it helps, we'd be 
looking to implement it using Scala. Secondly, in general, what has people's 
experience been with using experimental features in Spark?
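
For context, the direct approach itself is only a few lines in Scala; a minimal
sketch (the broker addresses and topic name are hypothetical):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("direct-kafka"), Seconds(10))

// No receivers: each Kafka partition maps directly to an RDD partition, and
// offsets are tracked by the stream itself rather than by ZooKeeper.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))

stream.map(_._2).count().print()

ssc.start()
ssc.awaitTermination()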

Cheers,

David Newberger



Re: [discuss] dropping Python 2.6 support

2016-01-11 Thread David Chin
FWIW, RHEL 6 still uses Python 2.6, although 2.7.8 and 3.3.2 are available
through Red Hat Software Collections. See:
https://www.softwarecollections.org/en/

I run an academic compute cluster on RHEL 6. We do, however, provide Python
2.7.x and 3.5.x via modulefiles.

On Tue, Jan 5, 2016 at 8:45 AM, Nicholas Chammas  wrote:

> +1
>
> Red Hat supports Python 2.6 on RHEL 5 until 2020
> <https://alexgaynor.net/2015/mar/30/red-hat-open-source-community/>, but
> otherwise yes, Python 2.6 is ancient history and the core Python developers
> stopped supporting it in 2013. RHEL 5 is not a good enough reason to
> continue support for Python 2.6 IMO.
>
> We should aim to support Python 2.7 and Python 3.3+ (which I believe we
> currently do).
>
> Nick
>
> On Tue, Jan 5, 2016 at 8:01 AM Allen Zhang  wrote:
>
>> plus 1,
>>
>> we are currently using python 2.7.2 in production environment.
>>
>>
>>
>>
>>
>> 在 2016-01-05 18:11:45,"Meethu Mathew"  写道:
>>
>> +1
>> We use Python 2.7
>>
>> Regards,
>>
>> Meethu Mathew
>>
>> On Tue, Jan 5, 2016 at 12:47 PM, Reynold Xin  wrote:
>>
>>> Does anybody here care about us dropping support for Python 2.6 in Spark
>>> 2.0?
>>>
>>> Python 2.6 is ancient, and is pretty slow in many aspects (e.g. json
>>> parsing) when compared with Python 2.7. Some libraries that Spark depend on
>>> stopped supporting 2.6. We can still convince the library maintainers to
>>> support 2.6, but it will be extra work. I'm curious if anybody still uses
>>> Python 2.6 to run Spark.
>>>
>>> Thanks.
>>>
>>>
>>>
>>


-- 
David Chin, Ph.D.
david.c...@drexel.edu
Sr. Systems Administrator, URCF, Drexel U.
http://www.drexel.edu/research/urcf/
https://linuxfollies.blogspot.com/
+1.215.221.4747 (mobile)
https://github.com/prehensilecode


ROSE: Spark + R on the JVM, now available.

2016-01-12 Thread David Russell
Hi all,

I'd like to share news of the recent release of a new Spark package, 
[ROSE](http://spark-packages.org/package/onetapbeyond/opencpu-spark-executor).

ROSE is a Scala library offering access to the full scientific computing power 
of the R programming language to Apache Spark batch and streaming applications 
on the JVM. Where Apache SparkR lets data scientists use Spark from R, ROSE is 
designed to let Scala and Java developers use R from Spark.

The project is available and documented [on 
GitHub](https://github.com/onetapbeyond/opencpu-spark-executor) and I would 
encourage you to [take a 
look](https://github.com/onetapbeyond/opencpu-spark-executor). Any feedback, 
questions etc very welcome.

David

"All that is gold does not glitter, Not all those who wander are lost."

Re: ROSE: Spark + R on the JVM.

2016-01-12 Thread David Russell
Hi Corey,

> Would you mind providing a link to the github?

Sure, here is the github link you're looking for:

https://github.com/onetapbeyond/opencpu-spark-executor

David

"All that is gold does not glitter, Not all those who wander are lost."



 Original Message 
Subject: Re: ROSE: Spark + R on the JVM.
Local Time: January 12 2016 12:32 pm
UTC Time: January 12 2016 5:32 pm
From: cjno...@gmail.com
To: themarchoffo...@protonmail.com
CC: user@spark.apache.org,d...@spark.apache.org



David,
Thank you very much for announcing this! It looks like it could be very useful. 
Would you mind providing a link to the github?



On Tue, Jan 12, 2016 at 10:03 AM, David  wrote:

Hi all,

I'd like to share news of the recent release of a new Spark package, ROSE.

ROSE is a Scala library offering access to the full scientific computing power 
of the R programming language to Apache Spark batch and streaming applications 
on the JVM. Where Apache SparkR lets data scientists use Spark from R, ROSE is 
designed to let Scala and Java developers use R from Spark.

The project is available and documented on GitHub and I would encourage you to 
take a look. Any feedback, questions etc very welcome.

David

"All that is gold does not glitter, Not all those who wander are lost."

Re: ROSE: Spark + R on the JVM.

2016-01-12 Thread David Russell
Hi Richard,

> Would it be possible to access the session API from within ROSE,
> to get for example the images that are generated by R / openCPU

Technically it would be possible although there would be some potentially 
significant runtime costs per task in doing so, primarily those related to 
extracting image data from the R session, serializing and then moving that data 
across the cluster for each and every image.

From a design perspective ROSE was intended to be used within Spark scale 
applications where R object data was seen as the primary task output. An output 
in a format that could be rapidly serialized and easily processed. Are there 
real world use cases where Spark scale applications capable of generating 10k, 
100k, or even millions of image files would actually need to capture and store 
images? If so, how practically speaking, would these images ever be used? I'm 
just not sure. Maybe you could describe your own use case to provide some 
insights?

> and the logging to stdout that is logged by R?

If you are referring to the R console output (generated within the R session 
during the execution of an OCPUTask) then this data could certainly 
(optionally) be captured and returned on an OCPUResult. Again, can you provide 
any details for how you might use this console output in a real world 
application?

As an aside, for simple standalone Spark applications that will only ever run 
on a single host (no cluster) you could consider using an alternative library 
called fluent-r. This library is also available under my GitHub repo, [see 
here](https://github.com/onetapbeyond/fluent-r). The fluent-r library already 
has support for the retrieval of R objects, R console output and R graphics 
device image/plots. However it is not as lightweight as ROSE and it not 
designed to work in a clustered environment. ROSE on the other hand is designed 
for scale.

David

"All that is gold does not glitter, Not all those who wander are lost."



 Original Message 
Subject: Re: ROSE: Spark + R on the JVM.
Local Time: January 12 2016 6:56 pm
UTC Time: January 12 2016 11:56 pm
From: rsiebel...@gmail.com
To: m...@vijaykiran.com
CC: 
cjno...@gmail.com,themarchoffo...@protonmail.com,user@spark.apache.org,d...@spark.apache.org



Hi,

this looks great and seems to be very usable.
Would it be possible to access the session API from within ROSE, to get for 
example the images that are generated by R / openCPU and the logging to stdout 
that is logged by R?

thanks in advance,
Richard



On Tue, Jan 12, 2016 at 10:16 PM, Vijay Kiran  wrote:

I think it would be this: https://github.com/onetapbeyond/opencpu-spark-executor

> On 12 Jan 2016, at 18:32, Corey Nolet  wrote:
>


> David,
>
> Thank you very much for announcing this! It looks like it could be very 
> useful. Would you mind providing a link to the github?
>
> On Tue, Jan 12, 2016 at 10:03 AM, David  
> wrote:
> Hi all,
>
> I'd like to share news of the recent release of a new Spark package, ROSE.
>
> ROSE is a Scala library offering access to the full scientific computing 
> power of the R programming language to Apache Spark batch and streaming 
> applications on the JVM. Where Apache SparkR lets data scientists use Spark 
> from R, ROSE is designed to let Scala and Java developers use R from Spark.
>
> The project is available and documented on GitHub and I would encourage you 
> to take a look. Any feedback, questions etc very welcome.
>
> David
>
> "All that is gold does not glitter, Not all those who wander are lost."
>




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: ROSE: Spark + R on the JVM.

2016-01-13 Thread David Russell
Hi Richard,

Thanks for providing the background on your application.

> the user types or copy-pastes his R code,
> the system should then send this R code (using ROSE) to R

Unfortunately this type of ad hoc R analysis is not supported. ROSE supports 
the execution of any R function or script within an existing R package on CRAN, 
Bioc, or github. It does not support the direct execution of arbitrary blocks 
of R code as you described.

You may want to look at [DeployR](http://deployr.revolutionanalytics.com/), 
it's an open source R integration server that provides APIs in Java, JavaScript 
and .NET that can easily support your use case. The outputs of your DeployR 
integration could then become inputs to your data processing system.

David

"All that is gold does not glitter, Not all those who wander are lost."



 Original Message 
Subject: Re: ROSE: Spark + R on the JVM.
Local Time: January 13 2016 3:18 am
UTC Time: January 13 2016 8:18 am
From: rsiebel...@gmail.com
To: themarchoffo...@protonmail.com
CC: 
m...@vijaykiran.com,cjno...@gmail.com,user@spark.apache.org,d...@spark.apache.org


Hi David,

the use case is that we're building a data processing system with an intuitive 
user interface where Spark is used as the data processing framework.
We would like to provide a HTML user interface to R where the user types or 
copy-pastes his R code, the system should then send this R code (using ROSE) to 
R, process it and give the results back to the user. The RDD would be used so 
that the data can be further processed by the system but we would like to also 
show or be able to show the messages printed to STDOUT and also the images 
(plots) that are generated by R. The plots seems to be available in the OpenCPU 
API, see below

[inline image: the OpenCPU session API listing the generated plots]

So the case is not that we're trying to process millions of images but rather 
that we would like to show the generated plots (like a regression plot) that's 
generated in R to the user. There could be several plots generated by the code, 
but certainly not thousands or even hundreds, only a few.

Hope that this would be possible using ROSE because it seems a really good fit,
thanks in advance,
Richard



On Wed, Jan 13, 2016 at 3:39 AM, David Russell  
wrote:

Hi Richard,


> Would it be possible to access the session API from within ROSE,
> to get for example the images that are generated by R / openCPU

Technically it would be possible although there would be some potentially 
significant runtime costs per task in doing so, primarily those related to 
extracting image data from the R session, serializing and then moving that data 
across the cluster for each and every image.

From a design perspective ROSE was intended to be used within Spark scale 
applications where R object data was seen as the primary task output. An output 
in a format that could be rapidly serialized and easily processed. Are there 
real world use cases where Spark scale applications capable of generating 10k, 
100k, or even millions of image files would actually need to capture and store 
images? If so, how practically speaking, would these images ever be used? I'm 
just not sure. Maybe you could describe your own use case to provide some 
insights?


> and the logging to stdout that is logged by R?

If you are referring to the R console output (generated within the R session 
during the execution of an OCPUTask) then this data could certainly 
(optionally) be captured and returned on an OCPUResult. Again, can you provide 
any details for how you might use this console output in a real world 
application?

As an aside, for simple standalone Spark applications that will only ever run 
on a single host (no cluster) you could consider using an alternative library 
called fluent-r. This library is also available under my GitHub repo, [see 
here](https://github.com/onetapbeyond/fluent-r). The fluent-r library already 
has support for the retrieval of R objects, R console output and R graphics 
device image/plots. However it is not as lightweight as ROSE and it not 
designed to work in a clustered environment. ROSE on the other hand is designed 
for scale.


David

"All that is gold does not glitter, Not all those who wander are lost."




 Original Message 
Subject: Re: ROSE: Spark + R on the JVM.


Local Time: January 12 2016 6:56 pm
UTC Time: January 12 2016 11:56 pm
From: rsiebel...@gmail.com
To: m...@vijaykiran.com
CC: 
cjno...@gmail.com,themarchoffo...@protonmail.com,user@spark.apache.org,d...@spark.apache.org



Hi,

this looks great and seems to be very usable.
Would it be possible to access the session API from within ROSE, to get for 
example the images that are generated by R / openCPU and the logging to stdout 
that is logged by R?

thanks in advance,
Richard



On Tue, Jan 12, 2016 at 10:16 PM, Vijay Kiran  wrote:

I think it would be this: https://github.com/onetapbeyond/opencpu-sp

Re: Kafka Streaming and partitioning

2016-01-13 Thread David D
Yep that's exactly what we want. Thanks for all the info Cody.
Dave.
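
For anyone finding this thread later, a minimal sketch of the wrapper idea
discussed below, assuming the standard RDD API (the class name is hypothetical):

import scala.reflect.ClassTag

import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Wraps an existing RDD without moving any data: it simply advertises the
// partitioner the Kafka topic was already partitioned with, so a join with a
// co-partitioned reference RDD no longer needs a shuffle.
class PartitionerAwareWrapper[T: ClassTag](parent: RDD[T], cp: Partitioner)
  extends RDD[T](parent) {

  override val partitioner: Option[Partitioner] = Some(cp)

  override protected def getPartitions: Array[Partition] = parent.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    parent.iterator(split, context)
}

It would then be used inside stream.transform exactly as in the snippet quoted
below.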
On 13 Jan 2016 18:29, "Cody Koeninger"  wrote:

> The idea here is that the custom partitioner shouldn't actually get used
> for repartitioning the kafka stream (because that would involve a shuffle,
> which is what you're trying to avoid).  You're just assigning a partitioner
> because you know how it already is partitioned.
>
>
> On Wed, Jan 13, 2016 at 11:22 AM, Dave  wrote:
>
>> So for case 1 below
>> - subclass or modify the direct stream and kafkardd.  They're private, so
>> you'd need to rebuild just the external kafka project, not all of spark
>> When the data is read from Kafka it will be partitioned correctly with
>> the Custom Partitioner passed in to the new direct stream and kafka RDD
>> implementations.
>>
>> For case 2
>> - write a wrapper subclass of rdd that takes a given custom partitioner
>> and rdd in the constructor, overrides partitioner, and delegates every
>> other method to the wrapped rdd.  This should be possible without
>> modification to any existing spark code.  You'd use it something like 
>> Am I correct in saying that the data from Kafka will not be read into
>> memory in the cluster (kafka server is not located on the Spark Cluster in
>> my use case) until the following code is executed
>> stream.transform { rdd =>
>>   val wrapped = YourWrapper(cp, rdd)
>>   wrapped.join(reference)
>> }
>> In which case it will run through the partitioner of the wrapped RDD when
>> it arrives in the cluster for the first time i.e. no shuffle.
>>
>> Thanks,
>> Dave.
>>
>>
>>
>> On 13/01/16 17:00, Cody Koeninger wrote:
>>
>> In the case here of a kafkaRDD, the data doesn't reside on the cluster,
>> it's not cached by default.  If you're running kafka on the same nodes as
>> spark, then data locality would play a factor, but that should be handled
>> by the existing getPreferredLocations method.
>>
>> On Wed, Jan 13, 2016 at 10:46 AM, Dave  wrote:
>>
>>> Thanks Cody, appreciate the response.
>>>
>>> With this pattern the partitioners will now match when the join is
>>> executed.
>>> However, does the wrapper RDD not need to set the partition meta data on
>>> the wrapped RDD in order to allow Spark to know where the data for each
>>> partition resides in the cluster.
>>>
>>> Thanks,
>>> Dave.
>>>
>>>
>>> On 13/01/16 16:21, Cody Koeninger wrote:
>>>
>>> If two rdds have an identical partitioner, joining should not involve a
>>> shuffle.
>>>
>>> You should be able to override the partitioner without calling
>>> partitionBy.
>>>
>>> Two ways I can think of to do this:
>>> - subclass or modify the direct stream and kafkardd.  They're private,
>>> so you'd need to rebuild just the external kafka project, not all of spark
>>>
>>> - write a wrapper subclass of rdd that takes a given custom partitioner
>>> and rdd in the constructor, overrides partitioner, and delegates every
>>> other method to the wrapped rdd.  This should be possible without
>>> modification to any existing spark code.  You'd use it something like
>>>
>>> val cp = YourCustomPartitioner(...)
>>> val reference = YourReferenceRDD(cp, ...)
>>> val stream = KafkaUtils
>>>
>>> stream.transform { rdd =>
>>>   val wrapped = YourWrapper(cp, rdd)
>>>   wrapped.join(reference)
>>> }
>>>
>>>
>>> I haven't had reason to do either one of those approaches, so YMMV, but
>>> I believe others have
>>>
>>>
>>>
>>>
>>> On Wed, Jan 13, 2016 at 3:40 AM, ddav < 
>>> dave.davo...@gmail.com> wrote:
>>>
 Hi,

 I have the following use case:

 1. Reference data stored in an RDD that is persisted and partitioned
 using a
 simple custom partitioner.
 2. Input stream from kafka that uses the same partitioner algorithm as
 the
 ref data RDD - this partitioning is done in kafka.

 I am using kafka direct streams so the number of kafka partitions map
 to the
 number of partitions in the spark RDD. From testing and the
 documentation I
 see Spark does not know anything about how the data has been
 partitioned in
 kafka.

 In my use case I need to join the reference data RDD and the input
 stream
 RDD.  Due to the fact I have manually ensured the incoming data from
 kafka
 uses the same partitioning algorithm I know the data has been grouped
 correctly in the input stream RDD in Spark but I cannot do a join
 without a
 shuffle step due to the fact Spark has no knowledge of how the data has
 been
 partitioned.

 I have two ways to do this.
 1. Explicitly call PartitionBy(CutomParitioner) on the input stream RDD
 followed by a join. This results in a shuffle of the input stream RDD
 and
 then the co-partitioned join to take place.
 2. Call join on the reference data RDD passing in the input stream RDD.
 Spark will do a shuffle under the hood in this case and the join will
 take
 place. The join will do its best to run on a node that has l

Re: rdd.foreach return value

2016-01-18 Thread David Russell
The foreach operation on RDD has a void (Unit) return type. See attached. So 
there is no return value to the driver.
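
A minimal sketch of the distinction: println inside foreach runs on the
executors and lands in their stdout logs, so to see values at the driver they
have to be brought back explicitly.

val rdd = sc.parallelize(1 to 5)

// Runs on the executors; output lands in each executor's stdout log.
rdd.foreach(x => println(x))

// Returns the transformed values to the driver, then prints them there.
rdd.map(_ * 2).collect().foreach(println)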

David

"All that is gold does not glitter, Not all those who wander are lost."



 Original Message 
Subject: rdd.foreach return value
Local Time: January 18 2016 10:34 pm
UTC Time: January 19 2016 3:34 am
From: charles.up...@gmail.com
To: user@spark.apache.org


code snippet




the 'print' actually prints info on the worker node, but I feel confused about
where the 'return' value goes, since I get nothing on the driver node.
--


--
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao

foreach.png
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

MLlib OneVsRest causing intermittent exceptions

2016-01-25 Thread David Brooks
Hi,

I've run into an exception using MLlib OneVsRest with logistic regression
(v1.6.0, but also in previous versions).

The issue is intermittent.  When running multiclass classification with
K-fold cross validation, there are scenarios where the split does not
contain instances for every target label.  In such cases, an
ArrayIndexOutOfBoundsException is generated.

I've tried to reproduce the problem in a simple SBT project here:

   https://github.com/junglebarry/SparkOneVsRestTest

I don't imagine this is typical - it first surfaced when running over a
dataset with some very rare classes.

I'm happy to look into patching the code, but I first wanted to confirm
that the problem was real, and that I wasn't somehow misunderstanding how I
should be using OneVsRest.

Any guidance would be appreciated - I'm new to the list.

Many thanks,
David


Re: MLlib OneVsRest causing intermittent exceptions

2016-01-26 Thread David Brooks
Hi Ram,

I didn't include an explicit label column in my reproduction as I thought
it superfluous.  However, in my original use-case, I was using a
StringIndexer, where the labels were indexed across the entire dataset
(training+validation+test).  The (indexed) label column was then explicitly
provided to the OneVsRest instance.

Here's the abridged version:

val textDocuments = ??? // real data here

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("labelIndexed")
  .fit(textDocuments)

val lrClassifier = new LogisticRegression()

val classifier = new OneVsRest()
  .setClassifier(lrClassifier)
  .setLabelCol(labelIndexer.getOutputCol)

// ...


There's an explicit reference to the label column, and when created, that
column contains all possible values of the label (it's `fit` over all
data).  It looks to me like StringIndexer computes label metadata at that
point (in `transform`) and attaches it to the column.  This way, I'd hope
that even once TrainValidationSplit returns a subset dataframe - which may
not contain all labels - the metadata on the column should still contain
all labels.

Does my use of StringIndexer count as "metadata", here?  If so, I still see
the exception as before.

I've pushed a new example using StringIndexer to my earlier repo, so you
can see the code and issue.  I'm happy to try a simpler method for
providing column metadata, if one is available.
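
For what it's worth, one way to attach the label metadata directly with the ML
attribute API would be roughly the following (this assumes the indexed column
already exists; the label values below are hypothetical):

import org.apache.spark.ml.attribute.NominalAttribute

val labelMeta = NominalAttribute.defaultAttr
  .withName("labelIndexed")
  .withValues("classA", "classB", "classC", "classD")
  .toMetadata()

val withMeta = textDocuments.withColumn(
  "labelIndexed",
  textDocuments("labelIndexed").as("labelIndexed", labelMeta))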

Thanks,
David

On Mon, Jan 25, 2016 at 11:13 PM Ram Sriharsha 
wrote:

> Hi David
>
> What happens if you provide the class labels via metadata instead of
> letting OneVsRest determine the labels?
>
> Ram
>
> On Mon, Jan 25, 2016 at 3:06 PM, David Brooks  wrote:
>
>> Hi,
>>
>> I've run into an exception using MLlib OneVsRest with logistic regression
>> (v1.6.0, but also in previous versions).
>>
>> The issue is intermittent.  When running multiclass classification with
>> K-fold cross validation, there are scenarios where the split does not
>> contain instances for every target label.  In such cases, an
>> ArrayIndexOutOfBoundsException is generated.
>>
>> I've tried to reproduce the problem in a simple SBT project here:
>>
>>https://github.com/junglebarry/SparkOneVsRestTest
>>
>> I don't imagine this is typical - it first surfaced when running over a
>> dataset with some very rare classes.
>>
>> I'm happy to look into patching the code, but I first wanted to confirm
>> that the problem was real, and that I wasn't somehow misunderstanding how I
>> should be using OneVsRest.
>>
>> Any guidance would be appreciated - I'm new to the list.
>>
>> Many thanks,
>> David
>>
>
>
>
> --
> Ram Sriharsha
> Architect, Spark and Data Science
> Hortonworks, 2550 Great America Way, 2nd Floor
> Santa Clara, CA 95054
> Ph: 408-510-8635
> email: har...@apache.org
>
> [image: https://www.linkedin.com/in/harsha340]
> <https://www.linkedin.com/in/harsha340> <https://twitter.com/halfabrane>
> <https://github.com/harsha2010/>
>
>


Re: MLlib OneVsRest causing intermittent exceptions

2016-01-26 Thread David Brooks
Hi again Ram,

Sorry, I was too hasty in my previous response.  I've done a bit more
digging through the code, and StringIndexer does indeed provide metadata,
as a NominalAttribute with a known number of class labels.  I don't think
the issue is related to the use of metadata, however.

It seems to me to be caused by the interaction between OneVsRest and
TrainValidationSplit.  For rare target classes under OneVsRest, it seems
quite possible for this random-split approach to select a training subset
where all items belong to non-target classes - all of which are given the
same class label by OneVsRest.  In this case, we start training
LogisticRegression on data of a single class, which seems odd.  The
exception stems from there.

The cause looks to me to be that OneVsRest.fit runs binary classifications
from 0 to numClasses (OneVsRest.scala:209), and this seems incompatible
with the random split, which cannot guarantee training examples for all
labels in the range.  It might be preferable to iterate over the observed
labels in the training set, rather than all labels in the range.  I don't
know the performance effects of that change, but it does look incompatible
with using the label metadata as a shortcut.

Do you agree that there is an issue here?  Would you accept contributions
to the code to remedy it?  I'd gladly take a look if I can be of help.

Many thanks,
David



On Tue, Jan 26, 2016 at 1:29 PM David Brooks  wrote:

> Hi Ram,
>
> I didn't include an explicit label column in my reproduction as I thought
> it superfluous.  However, in my original use-case, I was using a
> StringIndexer, where the labels were indexed across the entire dataset
> (training+validation+test).  The (indexed) label column was then explicitly
> provided to the OneVsRest instance.
>
> Here's the abridged version:
>
> val textDocuments = ??? // real data here
>
> // Index labels, adding metadata to the label column.
> // Fit on whole dataset to include all labels in index.
> val labelIndexer = new StringIndexer()
>   .setInputCol("label")
>   .setOutputCol("labelIndexed")
>   .fit(textDocuments)
>
> val lrClassifier = new LogisticRegression()
>
> val classifier = new OneVsRest()
>   .setClassifier(lrClassifier)
>   .setLabelCol(labelIndexer.getOutputCol)
>
> // ...
>
>
> There's an explicit reference to the label column, and when created, that
> column contains all possible values of the label (it's `fit` over all
> data).  It looks to me like StringIndexer computes label metadata at that
> point (in `transform`) and attaches it to the column.  This way, I'd hope
> that even once TrainValidationSplit returns a subset dataframe - which
> may not contain all labels - the metadata on the column should still
> contain all labels.
>
> Does my use of StringIndexer count as "metadata", here?  If so, I still
> see the exception as before.
>
> I've pushed a new example using StringIndexer to my earlier repo, so you
> can see the code and issue.  I'm happy to try a simpler method for
> providing column metadata, if one is available.
>
> Thanks,
> David
>
> On Mon, Jan 25, 2016 at 11:13 PM Ram Sriharsha 
> wrote:
>
>> Hi David
>>
>> What happens if you provide the class labels via metadata instead of
>> letting OneVsRest determine the labels?
>>
>> Ram
>>
>> On Mon, Jan 25, 2016 at 3:06 PM, David Brooks  wrote:
>>
>>> Hi,
>>>
>>> I've run into an exception using MLlib OneVsRest with logistic
>>> regression (v1.6.0, but also in previous versions).
>>>
>>> The issue is intermittent.  When running multiclass classification with
>>> K-fold cross validation, there are scenarios where the split does not
>>> contain instances for every target label.  In such cases, an
>>> ArrayIndexOutOfBoundsException is generated.
>>>
>>> I've tried to reproduce the problem in a simple SBT project here:
>>>
>>>https://github.com/junglebarry/SparkOneVsRestTest
>>>
>>> I don't imagine this is typical - it first surfaced when running over a
>>> dataset with some very rare classes.
>>>
>>> I'm happy to look into patching the code, but I first wanted to confirm
>>> that the problem was real, and that I wasn't somehow misunderstanding how I
>>> should be using OneVsRest.
>>>
>>> Any guidance would be appreciated - I'm new to the list.
>>>
>>> Many thanks,
>>> David
>>>
>>
>>
>>
>> --
>> Ram Sriharsha
>> Architect, Spark and Data Science
>> Hortonworks, 2550 Great America Way, 2nd Floor
>> Santa Clara, CA 95054
>> Ph: 408-510-8635
>> email: har...@apache.org
>>
>> [image: https://www.linkedin.com/in/harsha340]
>> <https://www.linkedin.com/in/harsha340> <https://twitter.com/halfabrane>
>> <https://github.com/harsha2010/>
>>
>>


Re: MLlib OneVsRest causing intermittent exceptions

2016-01-26 Thread David Brooks
Hi Ram, Joseph,

That's right, but I will clarify:

(a) a random split can generate a training set that does not contain some
rare class
(b) when LogisticRegression is run over a dataframe where all instances
have the same class label, it throws an ArrayIndexOutOfBoundsException.

When (a) occurs, (b) is the consequence.  The rare class is missing from
the training set, so you would not expect OneVsRest to train a binary
classifier on it; however, because OneVsRest trains binary classifiers on
all class labels in the range (0 to numClasses), it *will* train a binary
classifier on the missing class, which leads to the exception from (b).

A concrete example:

   - class labels 0, 1, 2, 3 are present in dataset (*numClasses* = 4);
   - 0, 2, 3 are in the training set after random split (no *1*);
   - The range (0 to 4) is used to train binary classifiers on each of 0,
   *1*, 2, 3
   - As soon as the classifier is trained on *1*, the exception is thrown

I'd suggest:

   1. In LogisticRegression, where numClasses == 1, throw a more
   meaningful validation exception (avoiding the more cryptic
   ArrayIndexOutOfBoundsException)
   2. Only run OneVsRest for class labels that appear in the dataframe,
   rather than all labels in the Range(0, numClasses).

I created a few simple test cases for running from SBT, like this one
<https://github.com/junglebarry/SparkOneVsRestTest/blob/master/src/main/scala/SparkOneVsRestTest_2_Errors.scala>,
but I've turned them into gists now for spark-shell:

   - LogisticRegression throwing ArrayIndexOutOfBoundsException
   <https://gist.github.com/junglebarry/a7cedce6eaf978d7b9ee>
   - OneVsRest throwing ArrayIndexOutOfBoundsException
   <https://gist.github.com/junglebarry/66234edfebaad6254ebe> (with a
   simulated missing class from a Range)
   - OneVsRest throwing ArrayIndexOutOfBoundsException with random split
   <https://gist.github.com/junglebarry/6073aa474d89f3322063>.  Only
   exceptions in 2/3 of cases, due to randomness.

If these look good as test cases, I'll take a look at filing JIRAs and
getting patches tomorrow morning.  It's late here!

Thanks for the swift response,
David


On Tue, Jan 26, 2016 at 11:09 PM Ram Sriharsha 
wrote:

> Hi David
>
> If I am reading the email right, there are two problems here right?
> a) for rare classes the random split will likely miss the rare class.
> b) if it misses the rare class an exception is thrown
>
> I thought the exception stems from b), is that right?... i wouldn't expect
> an exception to be thrown in the case the training dataset is missing the
> rare class.
> could you reproduce this in a simple snippet of code that we can quickly
> test on the shell?
>
>
>
>
> On Tue, Jan 26, 2016 at 3:02 PM, Ram Sriharsha 
> wrote:
>
>> Hey David, Yeah absolutely!, feel free to create a JIRA and attach your
>> patch to it. We can help review it and pull in the fix... happy to accept
>> contributions!
>> ccing Joseph who is one of the maintainers of MLLib as well.. when
>> creating the JIRA can you attach a simple test case?
>>
>> On Tue, Jan 26, 2016 at 2:59 PM, David Brooks  wrote:
>>
>>> Hi again Ram,
>>>
>>> Sorry, I was too hasty in my previous response.  I've done a bit more
>>> digging through the code, and StringIndexer does indeed provide metadata,
>>> as a NominalAttribute with a known number of class labels.  I don't think
>>> the issue is related to the use of metadata, however.
>>>
>>> It seems to me to be caused by the interaction between OneVsRest and
>>> TrainValidationSplit.  For rare target classes under OneVsRest, it seems
>>> quite possible for this random-split approach to select a training subset
>>> where all items belong to non-target classes - all of which are given the
>>> same class label by OneVsRest.  In this case, we start training
>>> LogisticRegression on data of a single class, which seems odd.  The
>>> exception stems from there.
>>>
>>> The cause looks to me to be that OneVsRest.fit runs binary
>>> classifications from 0 to numClasses (OneVsRest.scala:209), and this seems
>>> incompatible with the random split, which cannot guarantee training
>>> examples for all labels in the range.  It might be preferable to iterate
>>> over the observed labels in the training set, rather than all labels in the
>>> range.  I don't know the performance effects of that change, but it does
>>> look incompatible with using the label metadata as a shortcut.
>>>
>>> Do you agree that there is an issue here?  Would you accept
>>> contributions to the code to remedy it?  I'd gladly take a look if I can be
>>> of hel

Re: MLlib OneVsRest causing intermittent exceptions

2016-01-27 Thread David Brooks
Hi Ram,

Yes, I completely agree.  An exception is a poor way to handle this case, and
training on a dataset with only zero labels and no one labels should simply work
without exceptions.

Fortunately, it looks like someone else has recently patched the problem
with LogisticRegression:


https://github.com/apache/spark/commit/2388de51912efccaceeb663ac56fc500a79d2ceb

This should resolve the issue I'm experiencing.  I'll get hold of a build
from source and try it out.

Thanks for all your help!

David

On Wed, Jan 27, 2016 at 12:51 AM Ram Sriharsha 
wrote:

> btw, OneVsRest is using the labels in the dataset that is fed to the fit
> method, in case the metadata is missing.
> So if the metadata contains a label, we expect that label to be present in
> the dataset passed to the fit method.
> If you want OneVsRest to compute the labels you can leave the label
> metadata empty in which case we first compute the # of
> labels in the training dataset.
>
> If the training dataset contains a given label, then logistic regression
> should work fine regardless of the rarity of that label (performance might
> be bad but it won't throw an exception afaik)
>
> if the training dataset does not contain a given label but the metadata
> does, then we do end up training classifiers which will never see that
> label.
> But even here, what gets passed to the underlying classifier is a dataset
> with only say zero labels and no one labels.
> A classifier should be able to handle this... but if it cannot for some
> reason, we can have a check in OneVsRest that doesn't train that classifier
>
> On Tue, Jan 26, 2016 at 4:33 PM, Ram Sriharsha 
> wrote:
>
>> Hey David
>>
>> In your scenario, OneVsRest is training a classifier for 1 vs not 1...
>> and the input dataset for fit (or train) has labeled data for label 1
>>
>> But the underlying binary classifier (LogisticRegression) uses sampling
>> to determine the subset of data to sample during each iteration and it is
>> possible that this sample does not include any examples with label 1 (ie
>> numClasses = 1)
>>
>> So the examples it selects in that iteration only include 0 labeled data
>> and nothing with label 1.
>>
>> But why should it throw an exception? if it does, then i would think we
>> need to fix the issue in the underlying algorithm instead of the
>> reduction somehow knowing that the binary classifier is sampling from the
>> training dataset.
>>
>> Or am I misunderstanding the issue here?
>>
>> I'll take a look at the gist you linked when i get a chance , thanks!
>>
>> Ram
>>
>> On Tue, Jan 26, 2016 at 4:06 PM, David Brooks  wrote:
>>
>>> Hi Ram, Joseph,
>>>
>>> That's right, but I will clarify:
>>>
>>> (a) a random split can generate a training set that does not contain
>>> some rare class
>>> (b) when LogisticRegression is run over a dataframe where all instances
>>> have the same class label, it throws an ArrayIndexOutOfBoundsException.
>>>
>>> When (a) occurs, (b) is the consequence.  The rare class is missing from
>>> the training set, so you would not expect OneVsRest to train a binary
>>> classifier on it; however, because OneVsRest trains binary classifiers on
>>> all class labels in the range (0 to numClasses), it *will* train a
>>> binary classifier on the missing class, which leads to the exception from
>>> (b).
>>>
>>> A concrete example:
>>>
>>>- class labels 0, 1, 2, 3 are present in dataset (*numClasses* = 4);
>>>- 0, 2, 3 are in the training set after random split (no *1*);
>>>- The range (0 to 4) is used to train binary classifiers on each of
>>>0, *1*, 2, 3
>>>- As soon as the classifier is trained on *1*, the exception is
>>>thrown
>>>
>>> I'd suggest:
>>>
>>>1. In LogisticRegression, where numClasses == 1, thrown a more
>>>meaningful validation exception (avoiding the more cryptic
>>>ArrayIndexOutOfBoundsException)
>>>2. Only run OneVsRest for class labels that appear in the dataframe,
>>>rather than all labels in the Range(0, numClasses).
>>>
>>> I created a few simple test cases for running from SBT, like this one
>>> <https://github.com/junglebarry/SparkOneVsRestTest/blob/master/src/main/scala/SparkOneVsRestTest_2_Errors.scala>,
>>> but I've turned them into gists now for spark-shell:
>>>
>>>- LogisticRegression throwing ArrayIndexOutOfBoundsException
>>><https://gist.github.com/junglebarry/

[ANNOUNCE] New SAMBA Package = Spark + AWS Lambda

2016-02-01 Thread David Russell
Hi all,

Just sharing news of the release of a newly available Spark package, SAMBA
<https://github.com/onetapbeyond/lambda-spark-executor>.
<http://spark-packages.org/package/onetapbeyond/opencpu-spark-executor>

https://github.com/onetapbeyond/lambda-spark-executor

SAMBA is an Apache Spark package offering seamless integration with the AWS
Lambda <https://aws.amazon.com/lambda/> compute service for Spark batch and
streaming applications on the JVM.

Within traditional Spark deployments RDD tasks are executed using fixed
compute resources on worker nodes within the Spark cluster. With SAMBA,
application developers can delegate selected RDD tasks to execute using
on-demand AWS Lambda compute infrastructure in the cloud.

Not unlike the recently released ROSE
<https://github.com/onetapbeyond/opencpu-spark-executor> package that
extends the capabilities of traditional Spark applications with support for
CRAN R analytics, SAMBA provides another (hopefully) useful extension for
Spark application developers on the JVM.

SAMBA Spark Package: https://github.com/onetapbeyond/lambda-spark-executor
<https://github.com/onetapbeyond/lambda-spark-executor>
ROSE Spark Package: https://github.com/onetapbeyond/opencpu-spark-executor
<https://github.com/onetapbeyond/opencpu-spark-executor>

Questions, suggestions, feedback welcome.

David

-- 
"*All that is gold does not glitter,** Not all those who wander are lost."*


Re: Guidelines for writing SPARK packages

2016-02-01 Thread David Russell
Hi Praveen,

The basic requirements for releasing a Spark package on
spark-packages.org are as follows:

1. The package content must be hosted by GitHub in a public repo under
the owner's account.
2. The repo name must match the package name.
3. The master branch of the repo must contain "README.md" and "LICENSE".

Per the doc on spark-packages.org site an example package that meets
those requirements can be found at
https://github.com/databricks/spark-avro. My own recently released
SAMBA package also meets these requirements:
https://github.com/onetapbeyond/lambda-spark-executor.

As you can see there is nothing in this list of requirements that
demands the implementation of specific interfaces. What you'll need to
implement will depend entirely on what you want to accomplish. If you
want to register a release for your package you will also need to push
the artifacts for your package to Maven central.
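
Concretely, the minimum layout is just (the owner and package name below are
hypothetical):

myorg/my-spark-package        <- public GitHub repo; repo name = package name
├── README.md                 <- required on the master branch
├── LICENSE                   <- required on the master branch
└── src/...                   <- whatever the package itself needs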

David


On Mon, Feb 1, 2016 at 7:03 AM, Praveen Devarao  wrote:
> Hi,
>
> Is there any guidelines or specs to write a Spark package? I would
> like to implement a spark package and would like to know the way it needs to
> be structured (implement some interfaces etc) so that it can plug into Spark
> for extended functionality.
>
> Could any one help me point to docs or links on the above?
>
> Thanking You
>
> Praveen Devarao



-- 
"All that is gold does not glitter, Not all those who wander are lost."

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [ANNOUNCE] New SAMBA Package = Spark + AWS Lambda

2016-02-02 Thread David Russell
Hi Ben,

> My company uses Lamba to do simple data moving and processing using python
> scripts. I can see using Spark instead for the data processing would make it
> into a real production level platform.

That may be true. Spark has first class support for Python which
should make your life easier if you do go this route. Once you've
fleshed out your ideas I'm sure folks on this mailing list can provide
helpful guidance based on their real world experience with Spark.

> Does this pave the way into replacing
> the need of a pre-instantiated cluster in AWS or bought hardware in a
> datacenter?

In a word, no. SAMBA is designed to extend-not-replace the traditional
Spark computation and deployment model. At its most basic, the
traditional Spark computation model distributes data and computations
across worker nodes in the cluster.

SAMBA simply allows some of those computations to be performed by AWS
Lambda rather than locally on your worker nodes. There are, I believe, a
number of potential benefits to using SAMBA in some circumstances:

1. It can help reduce some of the workload on your Spark cluster by
moving that workload onto AWS Lambda, an infrastructure on-demand
compute service.

2. It allows Spark applications written in Java or Scala to make use
of libraries and features offered by Python and JavaScript (Node.js)
today, and potentially, more libraries and features offered by
additional languages in the future as AWS Lambda language support
evolves.

3. It provides a simple, clean API for integration with REST APIs that
may be a benefit to Spark applications that form part of a broader
data pipeline or solution.

> If so, then this would be a great efficiency and make an easier
> entry point for Spark usage. I hope the vision is to get rid of all cluster
> management when using Spark.

You might find one of the hosted Spark platform solutions such as
Databricks or Amazon EMR that handle cluster management for you a good
place to start. At least in my experience, they got me up and running
without difficulty.

David

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



add kafka streaming jars when initialising the sparkcontext in python

2016-02-10 Thread David Kennedy
I have no problems when submitting the task using spark-submit.  The --jars
option with the list of jars required is successful and I see in the output
the jars being added:

16/02/10 11:14:24 INFO spark.SparkContext: Added JAR
file:/usr/lib/spark/extras/lib/spark-streaming-kafka.jar at
http://192.168.10.4:33820/jars/spark-streaming-kafka.jar with timestamp
1455102864058
16/02/10 11:14:24 INFO spark.SparkContext: Added JAR
file:/opt/kafka/libs/scala-library-2.10.1.jar at
http://192.168.10.4:33820/jars/scala-library-2.10.1.jar with timestamp
1455102864077
16/02/10 11:14:24 INFO spark.SparkContext: Added JAR
file:/opt/kafka/libs/kafka_2.10-0.8.1.1.jar at
http://192.168.10.4:33820/jars/kafka_2.10-0.8.1.1.jar with timestamp
1455102864085
16/02/10 11:14:24 INFO spark.SparkContext: Added JAR
file:/opt/kafka/libs/metrics-core-2.2.0.jar at
http://192.168.10.4:33820/jars/metrics-core-2.2.0.jar with timestamp
1455102864086
16/02/10 11:14:24 INFO spark.SparkContext: Added JAR
file:/usr/share/java/mysql.jar at http://192.168.10.4:33820/jars/mysql.jar
with timestamp 1455102864090

But when I try to programmatically create a context in python (I want to
set up some tests) I don't see this and I end up with class not found
errors.

Trying to cover all bases even though I suspect that it's redundant when
running local I've tried:

from pyspark import SparkConf, SparkContext  # imports needed by this snippet

spark_conf = SparkConf()
spark_conf.setMaster('local[4]')
spark_conf.set('spark.executor.extraLibraryPath',
   '/usr/lib/spark/extras/lib/spark-streaming-kafka.jar,'
   '/opt/kafka/libs/scala-library-2.10.1.jar,'
   '/opt/kafka/libs/kafka_2.10-0.8.1.1.jar,'
   '/opt/kafka/libs/metrics-core-2.2.0.jar,'
   '/usr/share/java/mysql.jar')
spark_conf.set('spark.executor.extraClassPath',
   '/usr/lib/spark/extras/lib/spark-streaming-kafka.jar,'
   '/opt/kafka/libs/scala-library-2.10.1.jar,'
   '/opt/kafka/libs/kafka_2.10-0.8.1.1.jar,'
   '/opt/kafka/libs/metrics-core-2.2.0.jar,'
   '/usr/share/java/mysql.jar')
spark_conf.set('spark.driver.extraClassPath',
   '/usr/lib/spark/extras/lib/spark-streaming-kafka.jar,'
   '/opt/kafka/libs/scala-library-2.10.1.jar,'
   '/opt/kafka/libs/kafka_2.10-0.8.1.1.jar,'
   '/opt/kafka/libs/metrics-core-2.2.0.jar,'
   '/usr/share/java/mysql.jar')
spark_conf.set('spark.driver.extraLibraryPath',
   '/usr/lib/spark/extras/lib/spark-streaming-kafka.jar,'
   '/opt/kafka/libs/scala-library-2.10.1.jar,'
   '/opt/kafka/libs/kafka_2.10-0.8.1.1.jar,'
   '/opt/kafka/libs/metrics-core-2.2.0.jar,'
   '/usr/share/java/mysql.jar')
self.spark_context = SparkContext(conf=spark_conf)

But still I get the same failure to find the same class:

Py4JJavaError: An error occurred while calling o30.loadClass.
: java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper

The class is certainly in the spark_streaming_kafka.jar and is present in
the filesystem at that location.

I'm under the impression that were I using java I'd be able to use the
addJars method on the conf to take care of this but there doesn't appear to
be anything that corresponds for python.

Hacking about I found that adding:


spark_conf.set('spark.jars',
   '/usr/lib/spark/extras/lib/spark-streaming-kafka.jar,'
   '/opt/kafka/libs/scala-library-2.10.1.jar,'
   '/opt/kafka/libs/kafka_2.10-0.8.1.1.jar,'
   '/opt/kafka/libs/metrics-core-2.2.0.jar,'
   '/usr/share/java/mysql.jar')

got the logging to admit to adding the jars to the http server (just as in
the spark-submit output above), but whether I leave the other config options in
place or remove them, the class is still not found.

Is this not possible in python?

Incidentally, I have tried SPARK_CLASSPATH (getting the message that it's
deprecated and ignored anyway) and I cannot find anything else to try.

Can anybody help?

David K.


Re: How to avoid Spark shuffle spill memory?

2015-10-06 Thread David Mitchell
Hi unk1102,

Try adding more memory to your nodes.  Are you running Spark in the cloud?
If so, increase the memory on your servers.
Do you have default parallelism set (spark.default.parallelism)?  If so,
unset it, and let Spark decide how many partitions to allocate.
You can also try refactoring your code to make it use less memory.
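If it helps, the knobs involved look roughly like this when set
programmatically (a sketch with illustrative values only; tune them for your
own cluster, and note the fraction settings apply to the pre-1.6 memory
manager):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-heavy-job")
  .set("spark.executor.memory", "8g")          // more heap per executor
  .set("spark.shuffle.memoryFraction", "0.4")  // shuffle buffer share (Spark 1.4.x)
  .set("spark.storage.memoryFraction", "0.3")  // cache share; lower it if you cache nothing
// Deliberately not setting spark.default.parallelism, so Spark derives
// partition counts from the input data and parent RDDs.
val sc = new SparkContext(conf)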

David

On Tue, Oct 6, 2015 at 3:19 PM, unk1102  wrote:

> Hi I have a Spark job which runs for around 4 hours and it shared
> SparkContext and runs many child jobs. When I see each job in UI I see
> shuffle spill of around 30 to 40 GB and because of that many times
> executors
> gets lost because of using physical memory beyond limits how do I avoid
> shuffle spill? I have tried almost all optimisations nothing is helping I
> dont cache anything I am using Spark 1.4.1 and also using tungsten,codegen
> etc  I am using spark.shuffle.storage as 0.2 and spark.storage.memory as
> 0.2
> I tried to increase shuffle memory to 0.6 but then it halts in GC pause
> causing my executor to timeout and then getting lost eventually.
>
> Please guide. Thanks in advance.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-Spark-shuffle-spill-memory-tp24960.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
### Confidential e-mail, for recipient's (or recipients') eyes only, not
for distribution. ###


Install via directions in "Learning Spark". Exception when running bin/pyspark

2015-10-12 Thread David Bess
Greetings all,

Excited to be learning spark.  I am working through the "Learning Spark"
book and I am having trouble getting Spark installed and running.  

This is what I have done so far.  

I installed Spark from here: 

http://spark.apache.org/downloads.html

selecting 1.5.1, prebuilt for hadoop 2.6 and later, direct download.

I untarred the download

cd downloads
tar -xf spark-1.5.1-bin-hadoop2.6.tgz
cd spark-1.5.1-bin-hadoop2.6

Next I try running a shell; the example in the book claims we can run in
local mode and there should be no need to install hadoop / yarn / mesos or
anything else to get started.  

I have tried the following commands

./bin/pyspark
bin/pyspark
./bin/spark-shell
bin/spark-shell

I am getting an error as follows:

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/launcher/Main
Caused by: java.lang.ClassNotFoundException: org.apache.spark.launcher.Main
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)

About my system.  

I have a macbook pro OS X Yosemite 10.10.5
I just downloaded and installed the latest Java from Oracle website, I
believe this was java8u60
I double checked my python version and it appears to be 2.7.10

I am familiar with command line, and have background in hadoop, but this has
me stumped.  

Thanks in advance,

David Bess






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Install-via-directions-in-Learning-Spark-Exception-when-running-bin-pyspark-tp25043.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Install via directions in "Learning Spark". Exception when running bin/pyspark

2015-10-13 Thread David Bess
Got it working!  Thank you for confirming my suspicion that this issue was
related to Java.  When I dug deeper I found multiple versions and some other
issues.  I worked on it a while before deciding it would be easier to just
uninstall all Java and reinstall clean JDK, and now it works perfectly.  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Install-via-directions-in-Learning-Spark-Exception-when-running-bin-pyspark-tp25043p25049.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: OLAP query using spark dataframe with cassandra

2015-11-10 Thread David Morales
>> *Subject:* Re: OLAP query using spark dataframe with cassandra
>>
>> Is there any distributor supporting these software components in
>> combination? If no and your core business is not software then you may want
>> to look for something else, because it might not make sense to build up
>> internal know-how in all of these areas.
>>
>> In any case - it depends all highly on your data and queries. You will
>> have to do your own experiments.
>>
>> On 09 Nov 2015, at 07:02, "fightf...@163.com"  wrote:
>>
>> Hi, community
>>
>> We are specially interested about this featural integration according to
>> some slides from [1]. The SMACK(Spark+Mesos+Akka+Cassandra+Kafka)
>>
>> seems good implementation for lambda architecure in the open-source
>> world, especially non-hadoop based cluster environment. As we can see,
>>
>> the advantages obviously consist of :
>>
>> 1 the feasibility and scalability of spark datafram api, which can also
>> make a perfect complement for Apache Cassandra native cql feature.
>>
>> 2 both streaming and batch process availability using the ALL-STACK
>> thing, cool.
>>
>> 3 we can both achieve compacity and usability for spark with cassandra,
>> including seemlessly integrating with job scheduling and resource
>> management.
>>
>> Only one concern goes to the OLAP query performance issue, which mainly
>> caused by frequent aggregation work between daily increased large tables,
>> for
>>
>> both spark sql and cassandra. I can see that the [1] use case facilitates
>> FiloDB to achieve columnar storage and query performance, but we had
>> nothing more
>>
>> knowledge.
>>
>> Question is : Any guy had such use case for now, especially using in your
>> production environment ? Would be interested in your architeture for
>> designing this
>>
>> OLAP engine using spark +  cassandra. What do you think the comparison
>> between the scenario with traditional OLAP cube design? Like Apache Kylin
>> or
>>
>> pentaho mondrian ?
>>
>> Best Regards,
>>
>> Sun.
>>
>>
>> [1]
>> <http://www.slideshare.net/planetcassandra/cassandra-summit-2014-interactive-olap-queries-using-apache-cassandra-and-spark>
>> http://www.slideshare.net/planetcassandra/cassandra-summit-2014-interactive-olap-queries-using-apache-cassandra-and-spark
>>
>> --
>> fightf...@163.com
>>
>>
>>
>


-- 

David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf
<https://twitter.com/dmoralesdf>


<http://www.stratio.com/>
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
<https://twitter.com/StratioBD>*


RE: hdfs-ha on mesos - odd bug

2015-11-11 Thread Buttler, David

I have verified that this error exists on my system as well, and the suggested 
workaround also works.
Spark version: 1.5.1; 1.5.2
Mesos version: 0.21.1
CDH version: 4.7

I have set up the spark-env.sh to contain HADOOP_CONF_DIR pointing to the 
correct place, and I have also linked in the hdfs-site.xml file to 
$SPARK_HOME/conf.  I agree that it should work, but it doesn't.

I have also tried including the correct Hadoop configuration files in the 
application jar.

Note: it works fine from spark-shell, but it doesn't work from spark-submit

Dave

-Original Message-
From: Marcelo Vanzin [mailto:van...@cloudera.com] 
Sent: Tuesday, September 15, 2015 7:47 PM
To: Adrian Bridgett
Cc: user
Subject: Re: hdfs-ha on mesos - odd bug

On Mon, Sep 14, 2015 at 6:55 AM, Adrian Bridgett  wrote:
> 15/09/14 13:00:25 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 
> 0,
> 10.1.200.245): java.lang.IllegalArgumentException:
> java.net.UnknownHostException: nameservice1
> at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
> at
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxie
> s.java:310)

This looks like you're trying to connect to an HA HDFS service but you have not 
provided the proper hdfs-site.xml for your app; then, instead of recognizing 
"nameservice1" as an HA nameservice, it thinks it's an actual NN address, tries 
to connect to it, and fails.

If you provide the correct hdfs-site.xml to your app (by placing it in 
$SPARK_HOME/conf or setting HADOOP_CONF_DIR to point to the conf directory), it 
should work.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: Save GraphX to disk

2015-11-13 Thread Buttler, David
A graph is vertices and edges.  What else are you expecting to save/load?  You 
could save/load the triplets, but reconstructing the graph from them is actually 
more work than reconstructing it from the vertices and edges saved separately.
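For example, something along these lines works (a sketch assuming a
Graph[String, String] and made-up HDFS paths):

import org.apache.spark.graphx._

// Save the two underlying RDDs.
graph.vertices.saveAsObjectFile("hdfs:///graphs/g1/vertices")
graph.edges.saveAsObjectFile("hdfs:///graphs/g1/edges")

// Rebuild the graph later from what was saved.
val vertices = sc.objectFile[(VertexId, String)]("hdfs:///graphs/g1/vertices")
val edges    = sc.objectFile[Edge[String]]("hdfs:///graphs/g1/edges")
val restored = Graph(vertices, edges)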

Dave


From: Gaurav Kumar [mailto:gauravkuma...@gmail.com]
Sent: Friday, November 13, 2015 6:09 AM
To: user@spark.apache.org
Subject: Save GraphX to disk

Hi,

I was wondering how to save a graph to disk and load it back again. I know how 
to save vertices and edges to disk and construct the graph from them, not sure 
if there's any method to save the graph itself to disk.
Best Regards,
Gaurav Kumar
Big Data • Data Science • Photography • Music
+91 9953294125


Re: WARN LoadSnappy: Snappy native library not loaded

2015-11-19 Thread David Rosenstrauch
I ran into this recently.  Turned out we had an old 
org-xerial-snappy.properties file in one of our conf directories that 
had the setting:


# Disables loading Snappy-Java native library bundled in the
# snappy-java-*.jar file forcing to load the Snappy-Java native
# library from the java.library.path.
#
org.xerial.snappy.disable.bundled.libs=true

When I switched that to false, it made the problem go away.

May or may not be your problem of course, but worth a look.

HTH,

DR

On 11/17/2015 05:22 PM, Andy Davidson wrote:

I started a spark POC. I created an ec2 cluster on AWS using spark-ec2. I have
3 slaves. In general I am running into trouble even with small work loads. I
am using IPython notebooks running on my spark cluster. Everything is
painfully slow. I am using the standAlone cluster manager. I noticed that I
am getting the following warning on my driver console. Any idea what the
problem might be?



15/11/17 22:01:59 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.

15/11/17 22:03:05 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

15/11/17 22:03:05 WARN LoadSnappy: Snappy native library not loaded



Here is an overview of my POS app. I have a file on hdfs containing about
5000 twitter status strings.

tweetStrings = sc.textFile(dataURL)
jTweets = (tweetStrings.map(lambda x: json.loads(x)).take(10))

Generated the following error "error occurred while calling o78.partitions.:
java.lang.OutOfMemoryError: GC overhead limit exceeded"

Any idea what we need to do to improve new spark user's out of the box
experience?

Kind regards

Andy

export PYSPARK_PYTHON=python3.4

export PYSPARK_DRIVER_PYTHON=python3.4

export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"

MASTER_URL=spark://ec2-55-218-207-122.us-west-1.compute.amazonaws.com:7077


numCores=2

$SPARK_ROOT/bin/pyspark --master $MASTER_URL --total-executor-cores
$numCores $*








-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Passing SPARK_CONF_DIR to slaves in standalone mode under Grid Engine job

2015-07-29 Thread David Chin
Hi, all,

I am just setting up to run Spark in standalone mode, as a (Univa) Grid
Engine job. I have been able to set up the appropriate environment
variables such that the master launches correctly, etc. In my setup, I
generate GE job-specific conf and log dirs.

However, I am finding that the SPARK_* environment variables are not passed
to the worker processes on different physical nodes since they are launched
via ssh. I have added echo commands to sbin/start-slaves.sh and
sbin/slaves.sh scripts and verified that they the appropriate SPARK_*
environment variables set.

Since I have a global installation of Spark, I would like not to have all
Spark jobs write to $SPARK_HOME/work and logs.

I know that ssh can read environment variables from ~/.ssh/environment, but
if a user qsubs 2 different Spark jobs, this won't handle it right.

I appreciate any suggestions.

Thanks,
Dave Chin


Spark-Grid Engine light integration writeup

2015-08-06 Thread David Chin
Hello, all:

I was able to get Spark 1.4.1 and 1.2.0 standalone to run within a Univa
Grid Engine cluster, with some modification to the appropriate sbin
scripts. My write-up is at:


http://linuxfollies.blogspot.com/2015/08/apache-spark-integration-with-grid.html

I'll be glad to get comments from anyone who may be doing something similar.

Cheers,
Dave

-- 
David Chin, Ph.D.
david.c...@drexel.eduSr. Systems Administrator, URCF, Drexel U.
http://www.drexel.edu/research/urcf/
https://linuxfollies.blogspot.com/
215.221.4747 (mobile)
https://github.com/prehensilecode


Problem with take vs. takeSample in PySpark

2015-08-10 Thread David Montague
Hi all,

I am getting some strange behavior with the RDD take function in PySpark
while doing some interactive coding in an IPython notebook.  I am running
PySpark on Spark 1.2.0 in yarn-client mode if that is relevant.

I am using sc.wholeTextFiles and pandas to load a collection of .csv files
into an RDD of pandas dataframes. I create an RDD called train_rdd for
which each row of the RDD contains a label and pandas dataframe pair:

import pandas as pd
from StringIO import StringIO

rdd = sc.wholeTextFiles(data_path, 1000)
train_rdd = rdd.map(lambda x: (x[0], pd.read_csv(StringIO(x[1]))))

In order to test out the next steps I want to take, I am trying to use take to
select one of the dataframes and apply the desired modifications before
writing out the Spark code to apply it to all of the dataframes in parallel.

However, when I try to use take like this:

label, df = train_rdd.take(1)[0]

I get a spark.driver.maxResultSize error (stack trace included at the end
of this message). Now, each of these dataframes is only about 100MB in
size, so should easily fit on the driver and not go over the maxResultSize
limit of 1024MB.

If I instead use takeSample, though, there is no problem:

label, df = train_rdd.takeSample(False, 1, seed=50)[0]

(Here, I have set the seed so that the RDD that is selected is the same one
that the take function is trying to load (i.e., the first one), just to
ensure that the error is not caused by the specific dataframe that take is
fetching being too large.)

Does calling take result in a collect operation being performed before
outputting the first item? That would explain to me why this error is
occurring, but that seems like poor behavior for the take function. Clearly
takeSample is behaving the way I want it to, but it would be nice if I
could get this behavior with the take function, or at least without needing
to choose an element randomly. I was able to get the behavior I wanted
above by just changing the seed until I got the dataframe I wanted, but I
don't think that is a good approach in general.

Any insight is appreciated.

Best,
David Montague




---
Py4JJavaError Traceback (most recent call last)
 in ()
  1 label_s, df_s = train_rdd.takeSample(False, 1, seed=50)[0]
> 2 label, df = train_rdd.take(1)[0]

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/rdd.py
in take(self, num)
   1109
   1110 p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
->  res = self.context.runJob(self, takeUpToNumLeft, p,
True)
   1112
   1113 items += res

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/context.py
in runJob(self, rdd, partitionFunc, partitions, allowLocal)
816 # SparkContext#runJob.
817 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 818 it = self._jvm.PythonRDD.runJob(self._jsc.sc(),
mappedRDD._jrdd, javaPartitions, allowLocal)
819 return list(mappedRDD._collect_iterator_through_file(it))
820

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total
size of serialized results of 177 tasks (1038.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at s

Re: grpah x issue spark 1.3

2015-08-17 Thread David Zeelen
the code below is taken from the spark website and generates the error
detailed

Hi, using Spark 1.3 and trying some sample code:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)


When I run:
graph.numEdges
all works well, but with
graph.numVertices
it falls over and I get a whole heap of errors:
Failed to open file: /tmp/spark..shuffle_0_21_0.index
at
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202)
at
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112)
at
org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
at org.apache.spark.network.server.TransporSLF4J: Class path contains
multiple SLF4J bindings.

Is anyone else experiencing this? I've tried different graphs and always end
up with the same results.

thanks

On Tue, 18 Aug 2015 at 12:15 am, Sonal Goyal  wrote:

> I have been using graphx in production on 1.3 and 1.4 with no issues.
> What's the  exception you see and what are you trying to do?
> On Aug 17, 2015 10:49 AM, "dizzy5112"  wrote:
>
>> Hi using spark 1.3 and trying some sample code:
>>
>>
>> when i run:
>>
>> all works well but with
>>
>> it falls over and i get a whole heap of errors:
>>
>> Is anyone else experiencing this? Ive tried different graphs and always
>> end
>> up with the same results.
>>
>> thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/grpah-x-issue-spark-1-3-tp24292.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: submit_spark_job_to_YARN

2015-08-30 Thread David Mitchell
Hi Ajay,

Are you trying to save to your local file system or to HDFS?

// This would save to HDFS under "/user/hadoop/counter"
counter.saveAsTextFile("/user/hadoop/counter");
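You can also make the target filesystem explicit with a fully-qualified URI
(the namenode host and paths below are placeholders):

// Explicit HDFS URI
counter.saveAsTextFile("hdfs://namenode:8020/user/hadoop/counter")

// Local filesystem; note that each executor writes its partitions to its own
// local disk, not to a single machine
counter.saveAsTextFile("file:///tmp/counter")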

David


On Sun, Aug 30, 2015 at 11:21 AM, Ajay Chander  wrote:

> Hi Everyone,
>
> Recently we have installed spark on yarn in hortonworks cluster. Now I am
> trying to run a wordcount program in my eclipse and I
> did setMaster("local") and I see the results that's as expected. Now I want
> to submit the same job to my yarn cluster from my eclipse. In storm
> basically I was doing the same by using StormSubmitter class and by passing
> nimbus & zookeeper host to Config object. I was looking for something
> exactly the same.
>
> When I went through the documentation online, it read that I am suppose to
> "export HADOOP_HOME_DIR=path to the conf dir". So now I copied the conf
> folder from one of sparks gateway node to my local Unix box. Now I did
> export that dir...
>
> export HADOOP_HOME_DIR=/Users/user1/Documents/conf/
>
> And I did the same in .bash_profile too. Now when I do echo
> $HADOOP_HOME_DIR, I see the path getting printed in the command prompt. Now
> my assumption is, in my program when I change setMaster("local") to
> setMaster("yarn-client") my program should pick up the resource mangers i.e
> yarn cluster info from the directory which I have exported and the job
> should get submitted to resolve manager from my eclipse. But somehow it's
> not happening. Please tell me if my assumption is wrong or if I am missing
> anything here.
>
> I have attached the word count program that I was using. Any help is
> highly appreciated.
>
> Thank you,
> Ajay
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
### Confidential e-mail, for recipient's (or recipients') eyes only, not
for distribution. ###


Event logging not working when worker machine terminated

2015-09-08 Thread David Rosenstrauch
Our Spark cluster is configured to write application history event 
logging to a directory on HDFS.  This all works fine.  (I've tested it 
with Spark shell.)


However, on a large, long-running job that we ran tonight, one of our 
machines at the cloud provider had issues and had to be terminated and 
replaced in the middle of the job.


The job completed correctly, and shows in state FINISHED in the 
"Completed Applications" section of the Spark GUI.  However, when I try 
to look at the application's history, the GUI says "Application history 
not found" and "Application ... is still in progress".


The reason appears to be the machine that was terminated.  When I click 
on the executor list for that job, Spark is showing the executor from 
the terminated machine as still in state RUNNING.


Any solution/workaround for this?  BTW, I'm running Spark v1.3.0.

Thanks,

DR

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Event logging not working when worker machine terminated

2015-09-09 Thread David Rosenstrauch

Standalone.

On 09/08/2015 11:18 PM, Jeff Zhang wrote:

What cluster mode do you use ? Standalone/Yarn/Mesos ?


On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch 
wrote:


Our Spark cluster is configured to write application history event logging
to a directory on HDFS.  This all works fine.  (I've tested it with Spark
shell.)

However, on a large, long-running job that we ran tonight, one of our
machines at the cloud provider had issues and had to be terminated and
replaced in the middle of the job.

The job completed correctly, and shows in state FINISHED in the "Completed
Applications" section of the Spark GUI.  However, when I try to look at the
application's history, the GUI says "Application history not found" and
"Application ... is still in progress".

The reason appears to be the machine that was terminated.  When I click on
the executor list for that job, Spark is showing the executor from the
terminated machine as still in state RUNNING.

Any solution/workaround for this?  BTW, I'm running Spark v1.3.0.

Thanks,

DR

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org








-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Event logging not working when worker machine terminated

2015-09-09 Thread David Rosenstrauch
Thanks for the info.  Do you know if there's a ticket already open for 
this issue?  If so, I'd like to monitor it.


Thanks,

DR

On 09/09/2015 11:50 AM, Charles Chao wrote:

I have encountered the same problem after migrating from 1.2.2 to 1.3.0.
After some searching this appears to be a bug introduced in 1.3. Hopefully
it's fixed in 1.4.

Thanks,

Charles





On 9/9/15, 7:30 AM, "David Rosenstrauch"  wrote:


Standalone.

On 09/08/2015 11:18 PM, Jeff Zhang wrote:

What cluster mode do you use ? Standalone/Yarn/Mesos ?


On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch 
wrote:


Our Spark cluster is configured to write application history event
logging
to a directory on HDFS.  This all works fine.  (I've tested it with
Spark
shell.)

However, on a large, long-running job that we ran tonight, one of our
machines at the cloud provider had issues and had to be terminated and
replaced in the middle of the job.

The job completed correctly, and shows in state FINISHED in the
"Completed
Applications" section of the Spark GUI.  However, when I try to look
at the
application's history, the GUI says "Application history not found" and
"Application ... is still in progress".

The reason appears to be the machine that was terminated.  When I
click on
the executor list for that job, Spark is showing the executor from the
terminated machine as still in state RUNNING.

Any solution/workaround for this?  BTW, I'm running Spark v1.3.0.

Thanks,

DR

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org








-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming Suggestion

2015-09-15 Thread David Morales
Hi there,

This is exactly our goal in Stratio Sparkta, a real-time aggregation engine
fully developed with spark streaming (and fully open source).

Take a look at:


   - the docs: http://docs.stratio.com/modules/sparkta/development/
   - the repository: https://github.com/Stratio/sparkta
   - and some slides explaining how sparkta was born and what it makes:
   http://www.slideshare.net/Stratio/strata-sparkta


Feel free to ask us anything about the project.
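Under the hood this boils down to ordinary Spark Streaming windowing. Purely
to illustrate the per-minute unique-number aggregation you describe, here is a
minimal plain Spark Streaming sketch (a made-up socket source stands in for
your Kafka topic; this is not Sparkta code):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object UniquePerMinute {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("unique-numbers-per-minute")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Assumed input: lines of "deviceId,number" arriving on a socket.
    val pairs = ssc.socketTextStream("localhost", 9999)
      .map(_.split(","))
      .map(a => (a(0), a(1)))

    // Count distinct numbers per device over each one-minute window.
    val uniquePerMinute = pairs
      .window(Minutes(1), Minutes(1))
      .transform(rdd => rdd.distinct()
        .map { case (device, _) => (device, 1L) }
        .reduceByKey(_ + _))

    uniquePerMinute.print()
    ssc.start()
    ssc.awaitTermination()
  }
}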








2015-09-15 8:10 GMT+02:00 srungarapu vamsi :

> The batch approach i had implemented takes about 10 minutes to complete
> all the pre-computation tasks for the one hour worth of data. When i went
> through my code, i figured out that most of the time consuming tasks are
> the ones, which read data from cassandra and the places where i perform
> sparkContex.union(Array[RDD]).
> Now the ask is to get the pre computation tasks near real time. So i am
> exploring the streaming approach.
>
> My pre computation tasks not only include just finding the unique numbers
> for a given device every minute, every hour, every day but it also includes
> the following tasks:
> 1. Find the number of unique numbers across a set of devices every minute,
> every hour, every day
> 2. Find the number of unique numbers which are commonly occurring across a
> set of devices every minute, every hour, every day
> 3. Find (total time a number occurred across a set of devices)/(total
> unique numbers occurred across the set of devices)
> The above mentioned pre computation tasks are just a few of what i will be
> needing and there are many more coming towards me :)
> I see all these problems need more of data parallel approach and hence i
> am interested to do this on the spark streaming end.
>
>
> On Tue, Sep 15, 2015 at 11:04 AM, Jörn Franke 
> wrote:
>
>> Why did you not stay with the batch approach? For me the architecture
>> looks very complex for a simple thing you want to achieve. Why don't you
>> process the data already in storm ?
>>
>> Le mar. 15 sept. 2015 à 6:20, srungarapu vamsi 
>> a écrit :
>>
>>> I am pretty new to spark. Please suggest a better model for the
>>> following use case.
>>>
>>> I have few (about 1500) devices in field which keep emitting about 100KB
>>> of data every minute. The nature of data sent by the devices is just a list
>>> of numbers.
>>> As of now, we have Storm is in the architecture which receives this
>>> data, sanitizes it and writes to cassandra.
>>> Now, i have a requirement to process this data. The processing includes
>>> finding unique numbers emitted by one or more devices for every minute,
>>> every hour, every day, every month.
>>> I had implemented this processing part as a batch job execution and now
>>> i am interested in making it a streaming application. i.e calculating the
>>> processed data as and when devices emit the data.
>>>
>>> I have the following two approaches:
>>> 1. Storm writes the actual data to cassandra and writes a message on
>>> Kafka bus that data corresponding to device D and minute M has been written
>>> to cassandra
>>>
>>> Then Spark streaming reads this message from kafka , then reads the data
>>> of Device D at minute M from cassandra and starts processing the data.
>>>
>>> 2. Storm writes the data to both cassandra and  kafka, spark reads the
>>> actual data from kafka , processes the data and writes to cassandra.
>>> The second approach avoids additional hit of reading from cassandra
>>> every minute , a device has written data to cassandra at the cost of
>>> putting the actual heavy messages instead of light events on  kafka.
>>>
>>> I am a bit confused among the two approaches. Please suggest which one
>>> is better and if both are bad, how can i handle this use case?
>>>
>>>
>>> --
>>> /Vamsi
>>>
>>
>
>
> --
> /Vamsi
>



-- 

David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf
<https://twitter.com/dmoralesdf>


<http://www.stratio.com/>
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
<https://twitter.com/StratioBD>*

