Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Antony Mayi
spark.cleaner.ttl is not the right way - seems to be really designed for 
streaming. although it keeps the disk usage under control it also causes loss 
of rdds and broadcasts that are required later leading to crash.
is there any other way?thanks,Antony. 

 On Sunday, 15 February 2015, 21:42, Antony Mayi  
wrote:
   
 

 spark.cleaner.ttl ? 

 On Sunday, 15 February 2015, 18:23, Antony Mayi  
wrote:
   
 

 Hi,
I am running bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - ALS is using about 
3 billions of ratings and I am doing several trainImplicit() runs in loop 
within one spark session. I have four node cluster with 3TB disk space on each. 
before starting the job there is less then 8% of the disk space used. while the 
ALS is running I can see the disk usage rapidly growing mainly because of files 
being stored under 
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. 
after about 10 hours the disk usage hits 90% and yarn kills the particular 
containers.
am I missing doing some cleanup somewhere while looping over the several 
trainImplicit() calls? taking 4*3TB of disk space seems immense.
thanks for any help,Antony. 

 


 
   

Re: Loading tables using parquetFile vs. loading tables from Hive metastore with Parquet serde

2015-02-16 Thread Cheng Lian
Hi Jianshi,

When accessing a Hive table with Parquet SerDe, Spark SQL tries to convert
it into Spark SQL's native Parquet support for better performance. And yes,
predicate push-down, column pruning are applied here. In 1.3.0, we'll also
cover the write path except for writing partitioned table.

Cheng

On Sun Feb 15 2015 at 9:22:15 AM Jianshi Huang 
wrote:

> Hi,
>
> If I have a table in Hive metastore saved as Parquet, and I want to use it
> in Spark. It seems Spark will use Hive's Parquet serde to load the actual
> data.
>
> So is there any difference here? Will predicate pushdown, pruning and
> future Parquet optimizations in SparkSQL work for using Hive serde?
>
> Loading tables using parquetFile vs. loading tables from Hive metastore
> with Parquet serde
>
>
> Thanks,
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>


Re: Array in broadcast can't be serialized

2015-02-16 Thread Tao Xiao
Thanks Ted

After searching for a whole day, I still don't know how to let spark use
twitter chill serialization - there are very few documents about how to
integrate twitter chill into Spark for serialization. I tried the
following, but an exception of "java.lang.ClassCastException:
com.twitter.chill.WrappedArraySerializer cannot be cast to
org.apache.spark.serializer.Serializer" was thrown:

val conf = new SparkConf()
   .setAppName("Test Serialization")
   .set("spark.serializer",
"com.twitter.chill.WrappedArraySerializer")


Well, what is the correct way of configuring Spark to use the twitter chill
serialization framework ?







2015-02-15 22:23 GMT+08:00 Ted Yu :

> I was looking at https://github.com/twitter/chill
>
> It seems this would achieve what you want:
> chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala
>
> Cheers
>
> On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao 
> wrote:
>
>> I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
>> serialized by Kryo but *Array[ImmutableBytesWritable] *can't be
>> serialized even when I registered both of them in Kryo.
>>
>> The code is as follows:
>>
>>val conf = new SparkConf()
>> .setAppName("Hello Spark")
>> .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>> .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>>
>> val sc = new SparkContext(conf)
>>
>> val rdd = sc.parallelize(List(
>> (new ImmutableBytesWritable(Bytes.toBytes("AAA")),
>> new KeyValue()),
>> (new ImmutableBytesWritable(Bytes.toBytes("BBB")),
>> new KeyValue()),
>> (new ImmutableBytesWritable(Bytes.toBytes("CCC")),
>> new KeyValue()),
>> (new ImmutableBytesWritable(Bytes.toBytes("DDD")),
>> new KeyValue())), 4)
>>
>> // snippet 1:  a single object of *ImmutableBytesWritable* can
>> be serialized in broadcast
>> val partitioner = new SingleElementPartitioner(sc.broadcast(new
>> ImmutableBytesWritable(Bytes.toBytes(3
>> val ret = rdd.aggregateByKey(List[KeyValue](),
>> partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>>  (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
>> println("\n\n\ret.count = " + ret.count + ",  partition size = "
>> + ret.partitions.size)
>>
>> // snippet 2: an array of *ImmutableBytesWritable* can not be
>> serialized in broadcast
>> val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new
>> ImmutableBytesWritable(Bytes.toBytes(2)), new
>> ImmutableBytesWritable(Bytes.toBytes(3)))
>> val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>> val ret1 = rdd.aggregateByKey(List[KeyValue](),
>> newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>>  (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
>> println("\n\n\nrdd2.count = " + ret1.count)
>>
>> sc.stop
>>
>>
>>   // the following are kryo registrator and partitioners
>>class MyKryoRegistrator extends KryoRegistrator {
>> override def registerClasses(kryo: Kryo): Unit = {
>>  kryo.register(classOf[ImmutableBytesWritable])   //
>> register ImmutableBytesWritable
>>  kryo.register(classOf[Array[ImmutableBytesWritable]])
>>  // register Array[ImmutableBytesWritable]
>> }
>>}
>>
>>class SingleElementPartitioner(bc:
>> Broadcast[ImmutableBytesWritable]) extends Partitioner {
>> override def numPartitions: Int = 5
>> def v = Bytes.toInt(bc.value.get)
>> override def getPartition(key: Any): Int =  v - 1
>>}
>>
>>
>> class ArrayPartitioner(bc:
>> Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
>> val arr = bc.value
>> override def numPartitions: Int = arr.length
>> override def getPartition(key: Any): Int =
>> Bytes.toInt(arr(0).get)
>> }
>>
>>
>>
>> In the code above, snippet 1 can work as expected. But snippet 2 throws
>> "Task not serializable: java.io.NotSerializableException:
>> org.apache.hadoop.hbase.io.ImmutableBytesWritable"  .
>>
>>
>> So do I have to implement a Kryo serializer for Array[T] if it is used in
>> broadcast ?
>>
>> Thanks
>>
>>
>>
>>
>>
>


Which OutputCommitter to use for S3?

2015-02-16 Thread Mingyu Kim
HI all,

The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to 
require moving files at the commit step, which is not a constant operation in 
S3, as discussed in 
http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
 People seem to develop their own NullOutputCommitter implementation or use 
DirectFileOutputCommitter (as mentioned in 
SPARK-3595), but I wanted to 
check if there is a de facto standard, publicly available OutputCommitter to 
use for S3 in conjunction with Spark.

Thanks,
Mingyu


Spark Streaming and SQL checkpoint error: (java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf)

2015-02-16 Thread Haopu Wang
I have a streaming application which registered temp table on a
HiveContext for each batch duration.

The application runs well in Spark 1.1.0. But I get below error from
1.1.1.

Do you have any suggestions to resolve it? Thank you!

 

java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf

- field (class "scala.Tuple2", name: "_1", type: "class
java.lang.Object")

- object (class "scala.Tuple2", (Configuration: core-default.xml,
core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml,
yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23,org.apa
che.hadoop.hive.ql.session.SessionState@49b6eef9))

- field (class "org.apache.spark.sql.hive.HiveContext", name: "x$3",
type: "class scala.Tuple2")

- object (class "org.apache.spark.sql.hive.HiveContext",
org.apache.spark.sql.hive.HiveContext@4e6e66a4)

- field (class
"com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$regi
sterTempTable$2", name: "sqlContext$1", type: "class
org.apache.spark.sql.SQLContext")

   - object (class
"com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$regi
sterTempTable$2", )

- field (class
"org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1",
name: "foreachFunc$1", type: "interface scala.Function1")

- object (class
"org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1",
)

- field (class "org.apache.spark.streaming.dstream.ForEachDStream",
name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc",
type: "interface scala.Function2")

- object (class "org.apache.spark.streaming.dstream.ForEachDStream",
org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)

- element of array (index: 0)

- array (class "[Ljava.lang.Object;", size: 16)

- field (class "scala.collection.mutable.ArrayBuffer", name:
"array", type: "class [Ljava.lang.Object;")

- object (class "scala.collection.mutable.ArrayBuffer",
ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20))

- field (class "org.apache.spark.streaming.DStreamGraph", name:
"outputStreams", type: "class scala.collection.mutable.ArrayBuffer")

- custom writeObject data (class
"org.apache.spark.streaming.DStreamGraph")

- object (class "org.apache.spark.streaming.DStreamGraph",
org.apache.spark.streaming.DStreamGraph@776ae7da)

- field (class "org.apache.spark.streaming.Checkpoint", name:
"graph", type: "class org.apache.spark.streaming.DStreamGraph")

- root object (class "org.apache.spark.streaming.Checkpoint",
org.apache.spark.streaming.Checkpoint@5eade065)

at java.io.ObjectOutputStream.writeObject0(Unknown Source)

 



Re: Writing to HDFS from spark Streaming

2015-02-16 Thread Sean Owen
PS this is the real fix to this issue:

https://issues.apache.org/jira/browse/SPARK-5795

I'd like to merge it as I don't think it breaks the API; it actually
fixes it to work as intended.

On Mon, Feb 16, 2015 at 3:25 AM, Bahubali Jain  wrote:
> I used the latest assembly jar and the below as suggested by Akhil to fix
> this problem...
> temp.saveAsHadoopFiles("DailyCSV",".txt", String.class, String.class,(Class)
> TextOutputFormat.class);
>
> Thanks All for the help !
>
> On Wed, Feb 11, 2015 at 1:38 PM, Sean Owen  wrote:
>>
>> That kinda dodges the problem by ignoring generic types. But it may be
>> simpler than the 'real' solution, which is a bit ugly.
>>
>> (But first, to double check, are you importing the correct
>> TextOutputFormat? there are two versions. You use .mapred. with the
>> old API and .mapreduce. with the new API.)
>>
>> Here's how I've formally casted around it in similar code:
>>
>> @SuppressWarnings
>> Class> outputFormatClass =
>> (Class>) (Class)
>> TextOutputFormat.class;
>>
>> and then pass that as the final argument.
>>
>> On Wed, Feb 11, 2015 at 6:35 AM, Akhil Das 
>> wrote:
>> > Did you try :
>> >
>> > temp.saveAsHadoopFiles("DailyCSV",".txt", String.class,
>> > String.class,(Class)
>> > TextOutputFormat.class);
>> >
>> > Thanks
>> > Best Regards
>> >
>> > On Wed, Feb 11, 2015 at 9:40 AM, Bahubali Jain 
>> > wrote:
>> >>
>> >> Hi,
>> >> I am facing issues while writing data from a streaming rdd to hdfs..
>> >>
>> >> JavaPairDstream temp;
>> >> ...
>> >> ...
>> >> temp.saveAsHadoopFiles("DailyCSV",".txt", String.class,
>> >> String.class,TextOutputFormat.class);
>> >>
>> >>
>> >> I see compilation issues as below...
>> >> The method saveAsHadoopFiles(String, String, Class, Class,
>> >> Class> >> extends OutputFormat>) in the type JavaPairDStream
>> >> is
>> >> not applicable for the arguments (String, String, Class,
>> >> Class, Class)
>> >>
>> >> I see same kind of problem even with saveAsNewAPIHadoopFiles API .
>> >>
>> >> Thanks,
>> >> Baahu
>> >
>> >
>
>
>
>
> --
> Twitter:http://twitter.com/Baahu
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
Hello,

I have an application in Java that uses Spark Streaming 1.2.1 in the
following manner:

 - Listen to the input directory.
 - If a new file is copied to that input directory process it.
 - Process: contact a RESTful web service (running also locally and
responsive), send the contents of the file, receive the response from the
web service, write the results as a new file into the output directory
 - batch interval : 30 seconds
 - checkpoint interval: 150 seconds

When I test the application locally with 1 or 2 files, it works perfectly
fine as expected. I run it like:

spark-submit --class myClass --verbose --master local[4]
--deploy-mode client myApp.jar /in file:///out

But then I've realized something strange when I copied 20 files to the
INPUT directory: Spark Streaming detects all of the files, but it ends up
processing *only 16 files*. And the remaining 4 are not processed at all.

I've tried it with 19, 18, and then 17 files. Same result, only 16 files
end up in the output directory.

Then I've tried it by copying 16 files at once to the input directory, and
it can process all of the 16 files. That's why I call it magic number 16.

When I mean it detects all of the files, I mean that in the logs I see the
following lines when I copy 17 files:

===
2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: "1G"
2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves to
a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0)
2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to bind
to another address
2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable
2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
file:/tmp/receivedBlockMetadata
2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Reading from the logs:
file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
---
Time: 142408626 ms
---

2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 142408596:
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 142408596
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 142408596:
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 142408596
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 142408599:
2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 142408599

---

Time: 142408629 ms
---
===

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
I've managed to solve this, but I still don't know exactly why my solution
works:

In my code I was trying to force the Spark to output via:

  jsonIn.print();

jsonIn being a JavaDStream.

When removed the code above, and added the code below to force the output
operation, hence the execution:

jsonIn.foreachRDD(new Function, Void>() {
  @Override
  public Void call(JavaRDD stringJavaRDD) throws Exception {
stringJavaRDD.collect();
return null;
  }
});

It works as I expect, processing all of the 20 files I give to it, instead
of stopping at 16.

--
Emre


On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc  wrote:

> Hello,
>
> I have an application in Java that uses Spark Streaming 1.2.1 in the
> following manner:
>
>  - Listen to the input directory.
>  - If a new file is copied to that input directory process it.
>  - Process: contact a RESTful web service (running also locally and
> responsive), send the contents of the file, receive the response from the
> web service, write the results as a new file into the output directory
>  - batch interval : 30 seconds
>  - checkpoint interval: 150 seconds
>
> When I test the application locally with 1 or 2 files, it works perfectly
> fine as expected. I run it like:
>
> spark-submit --class myClass --verbose --master local[4]
> --deploy-mode client myApp.jar /in file:///out
>
> But then I've realized something strange when I copied 20 files to the
> INPUT directory: Spark Streaming detects all of the files, but it ends up
> processing *only 16 files*. And the remaining 4 are not processed at all.
>
> I've tried it with 19, 18, and then 17 files. Same result, only 16 files
> end up in the output directory.
>
> Then I've tried it by copying 16 files at once to the input directory, and
> it can process all of the 16 files. That's why I call it magic number 16.
>
> When I mean it detects all of the files, I mean that in the logs I see the
> following lines when I copy 17 files:
>
>
> ===
> 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: "1G"
> 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves
> to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
> eth0)
> 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
> bind to another address
> 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
> 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
> native-hadoop library for your platform... using builtin-java classes where
> applicable
> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
> file:/tmp/receivedBlockMetadata
> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Reading from the logs:
> file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
> file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
> ---
> Time: 142408626 ms
> ---
>
> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
> file:/tmp/receivedBlockMetadata older than 142408596:
> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Cleared log files in
> file:/tmp/receivedBlockMetadata older than 142408596
> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
> file:/tmp/receivedBlockMetadata older than 142408596:
> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Cleared log files in
> file:/tmp/receivedBlockMetadata older than 142408596
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input pat

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Sean Owen
How are you deciding whether files are processed or not? It doesn't seem
possible from this code. Maybe it just seems so.
On Feb 16, 2015 12:51 PM, "Emre Sevinc"  wrote:

> I've managed to solve this, but I still don't know exactly why my solution
> works:
>
> In my code I was trying to force the Spark to output via:
>
>   jsonIn.print();
>
> jsonIn being a JavaDStream.
>
> When removed the code above, and added the code below to force the output
> operation, hence the execution:
>
> jsonIn.foreachRDD(new Function, Void>() {
>   @Override
>   public Void call(JavaRDD stringJavaRDD) throws Exception {
> stringJavaRDD.collect();
> return null;
>   }
> });
>
> It works as I expect, processing all of the 20 files I give to it, instead
> of stopping at 16.
>
> --
> Emre
>
>
> On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc 
> wrote:
>
>> Hello,
>>
>> I have an application in Java that uses Spark Streaming 1.2.1 in the
>> following manner:
>>
>>  - Listen to the input directory.
>>  - If a new file is copied to that input directory process it.
>>  - Process: contact a RESTful web service (running also locally and
>> responsive), send the contents of the file, receive the response from the
>> web service, write the results as a new file into the output directory
>>  - batch interval : 30 seconds
>>  - checkpoint interval: 150 seconds
>>
>> When I test the application locally with 1 or 2 files, it works perfectly
>> fine as expected. I run it like:
>>
>> spark-submit --class myClass --verbose --master local[4]
>> --deploy-mode client myApp.jar /in file:///out
>>
>> But then I've realized something strange when I copied 20 files to the
>> INPUT directory: Spark Streaming detects all of the files, but it ends up
>> processing *only 16 files*. And the remaining 4 are not processed at all.
>>
>> I've tried it with 19, 18, and then 17 files. Same result, only 16 files
>> end up in the output directory.
>>
>> Then I've tried it by copying 16 files at once to the input directory,
>> and it can process all of the 16 files. That's why I call it magic number
>> 16.
>>
>> When I mean it detects all of the files, I mean that in the logs I see
>> the following lines when I copy 17 files:
>>
>>
>> ===
>> 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: "1G"
>> 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves
>> to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
>> eth0)
>> 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
>> bind to another address
>> 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
>> 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
>> native-hadoop library for your platform... using builtin-java classes where
>> applicable
>> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
>> file:/tmp/receivedBlockMetadata
>> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Reading from the logs:
>> file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
>> file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
>> ---
>> Time: 142408626 ms
>> ---
>>
>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>> file:/tmp/receivedBlockMetadata older than 142408596:
>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>> file:/tmp/receivedBlockMetadata older than 142408596
>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>> file:/tmp/receivedBlockMetadata older than 142408596:
>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>> file:/tmp/receivedBlockMetadata older than 142408596
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
Hello Sean,

I did not understand your question very well, but what I do is checking the
output directory (and I have various logger outputs at various stages
showing the contents of an input file being processed, the response from
the web service, etc.).

By the way, I've already solved my problem by using foreachRDD instead of
print (see my second message in this thread). Apparently forcing Spark to
materialize DAG via print() is not the way to go. (My interpretation might
be wrong, but this is what I've just seen in my case).

--
Emre




On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen  wrote:

> How are you deciding whether files are processed or not? It doesn't seem
> possible from this code. Maybe it just seems so.
> On Feb 16, 2015 12:51 PM, "Emre Sevinc"  wrote:
>
>> I've managed to solve this, but I still don't know exactly why my
>> solution works:
>>
>> In my code I was trying to force the Spark to output via:
>>
>>   jsonIn.print();
>>
>> jsonIn being a JavaDStream.
>>
>> When removed the code above, and added the code below to force the output
>> operation, hence the execution:
>>
>> jsonIn.foreachRDD(new Function, Void>() {
>>   @Override
>>   public Void call(JavaRDD stringJavaRDD) throws Exception {
>> stringJavaRDD.collect();
>> return null;
>>   }
>> });
>>
>> It works as I expect, processing all of the 20 files I give to it,
>> instead of stopping at 16.
>>
>> --
>> Emre
>>
>>
>> On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc 
>> wrote:
>>
>>> Hello,
>>>
>>> I have an application in Java that uses Spark Streaming 1.2.1 in the
>>> following manner:
>>>
>>>  - Listen to the input directory.
>>>  - If a new file is copied to that input directory process it.
>>>  - Process: contact a RESTful web service (running also locally and
>>> responsive), send the contents of the file, receive the response from the
>>> web service, write the results as a new file into the output directory
>>>  - batch interval : 30 seconds
>>>  - checkpoint interval: 150 seconds
>>>
>>> When I test the application locally with 1 or 2 files, it works
>>> perfectly fine as expected. I run it like:
>>>
>>> spark-submit --class myClass --verbose --master local[4]
>>> --deploy-mode client myApp.jar /in file:///out
>>>
>>> But then I've realized something strange when I copied 20 files to the
>>> INPUT directory: Spark Streaming detects all of the files, but it ends up
>>> processing *only 16 files*. And the remaining 4 are not processed at all.
>>>
>>> I've tried it with 19, 18, and then 17 files. Same result, only 16 files
>>> end up in the output directory.
>>>
>>> Then I've tried it by copying 16 files at once to the input directory,
>>> and it can process all of the 16 files. That's why I call it magic number
>>> 16.
>>>
>>> When I mean it detects all of the files, I mean that in the logs I see
>>> the following lines when I copy 17 files:
>>>
>>>
>>> ===
>>> 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
>>> "1G"
>>> 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves
>>> to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
>>> eth0)
>>> 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
>>> bind to another address
>>> 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
>>> 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
>>> native-hadoop library for your platform... using builtin-java classes where
>>> applicable
>>> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
>>> ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
>>> file:/tmp/receivedBlockMetadata
>>> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
>>> ReceivedBlockHandlerMaster:59 - Reading from the logs:
>>> file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
>>> file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
>>> ---
>>> Time: 142408626 ms
>>> ---
>>>
>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>> file:/tmp/receivedBlockMetadata older than 142408596:
>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>> file:/tmp/receivedBlockMetadata older than 142408596
>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>> file:/tmp/receivedBlockMetadata older than 142408596:
>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>> file:/tmp/receivedBlockMetadata older than 142408596
>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>> process 

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Sean Owen
Materialization shouldn't be relevant. The collect by itself doesn't let
you detect whether it happened. Print should print some results to the
console but on different machines, so may not be a reliable way to see what
happened.

Yes I understand your real process uses foreachRDD and that's what you
should use. It sounds like that works. But you must always have been using
that right? What do you mean that you changed to use it?

Basically I'm not clear on what the real code does and what about the
output of that code tells you only 16 files were processed.
On Feb 16, 2015 1:18 PM, "Emre Sevinc"  wrote:

> Hello Sean,
>
> I did not understand your question very well, but what I do is checking
> the output directory (and I have various logger outputs at various stages
> showing the contents of an input file being processed, the response from
> the web service, etc.).
>
> By the way, I've already solved my problem by using foreachRDD instead of
> print (see my second message in this thread). Apparently forcing Spark to
> materialize DAG via print() is not the way to go. (My interpretation might
> be wrong, but this is what I've just seen in my case).
>
> --
> Emre
>
>
>
>
> On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen  wrote:
>
>> How are you deciding whether files are processed or not? It doesn't seem
>> possible from this code. Maybe it just seems so.
>> On Feb 16, 2015 12:51 PM, "Emre Sevinc"  wrote:
>>
>>> I've managed to solve this, but I still don't know exactly why my
>>> solution works:
>>>
>>> In my code I was trying to force the Spark to output via:
>>>
>>>   jsonIn.print();
>>>
>>> jsonIn being a JavaDStream.
>>>
>>> When removed the code above, and added the code below to force the
>>> output operation, hence the execution:
>>>
>>> jsonIn.foreachRDD(new Function, Void>() {
>>>   @Override
>>>   public Void call(JavaRDD stringJavaRDD) throws Exception {
>>> stringJavaRDD.collect();
>>> return null;
>>>   }
>>> });
>>>
>>> It works as I expect, processing all of the 20 files I give to it,
>>> instead of stopping at 16.
>>>
>>> --
>>> Emre
>>>
>>>
>>> On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc 
>>> wrote:
>>>
 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works
 perfectly fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16
 files end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory,
 and it can process all of the 16 files. That's why I call it magic number
 16.

 When I mean it detects all of the files, I mean that in the logs I see
 the following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
 "1G"
 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu
 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on
 interface eth0)
 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
 bind to another address
 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
 native-hadoop library for your platform... using builtin-java classes where
 applicable
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
 file:/tmp/receivedBlockMetadata
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Reading from the logs:
 file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
 ---
 Time: 142408626 ms
 ---

 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerM

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Akhil Das
Instead of print you should do jsonIn.count().print(). Straight forward
approach is to use foreachRDD :)

Thanks
Best Regards

On Mon, Feb 16, 2015 at 6:48 PM, Emre Sevinc  wrote:

> Hello Sean,
>
> I did not understand your question very well, but what I do is checking
> the output directory (and I have various logger outputs at various stages
> showing the contents of an input file being processed, the response from
> the web service, etc.).
>
> By the way, I've already solved my problem by using foreachRDD instead of
> print (see my second message in this thread). Apparently forcing Spark to
> materialize DAG via print() is not the way to go. (My interpretation might
> be wrong, but this is what I've just seen in my case).
>
> --
> Emre
>
>
>
>
> On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen  wrote:
>
>> How are you deciding whether files are processed or not? It doesn't seem
>> possible from this code. Maybe it just seems so.
>> On Feb 16, 2015 12:51 PM, "Emre Sevinc"  wrote:
>>
>>> I've managed to solve this, but I still don't know exactly why my
>>> solution works:
>>>
>>> In my code I was trying to force the Spark to output via:
>>>
>>>   jsonIn.print();
>>>
>>> jsonIn being a JavaDStream.
>>>
>>> When removed the code above, and added the code below to force the
>>> output operation, hence the execution:
>>>
>>> jsonIn.foreachRDD(new Function, Void>() {
>>>   @Override
>>>   public Void call(JavaRDD stringJavaRDD) throws Exception {
>>> stringJavaRDD.collect();
>>> return null;
>>>   }
>>> });
>>>
>>> It works as I expect, processing all of the 20 files I give to it,
>>> instead of stopping at 16.
>>>
>>> --
>>> Emre
>>>
>>>
>>> On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc 
>>> wrote:
>>>
 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works
 perfectly fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16
 files end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory,
 and it can process all of the 16 files. That's why I call it magic number
 16.

 When I mean it detects all of the files, I mean that in the logs I see
 the following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
 "1G"
 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu
 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on
 interface eth0)
 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
 bind to another address
 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
 native-hadoop library for your platform... using builtin-java classes where
 applicable
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
 file:/tmp/receivedBlockMetadata
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Reading from the logs:
 file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
 ---
 Time: 142408626 ms
 ---

 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file

hive-thriftserver maven artifact

2015-02-16 Thread Marco
Hi,

I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive
Thriftserver Maven Artifact). Can somebody point me (URL) to the artifact
in a public repository ? I have not found it @Maven Central.

Thanks,
Marco


Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
Sean,

In this case, I've been testing the code on my local machine and using
Spark locally, so I all the log output was available on my terminal. And
I've used the .print() method to have an output operation, just to force
Spark execute.

And I was not using foreachRDD, I was only using print() method on a
JavaDStream object, and it was working fine for a few files, up to 16 (and
without print() it did not do anything because there were no output
operations).

To sum it up, in my case:

 - Initially, use .print() and no foreachRDD: processes up to 16 files and
does not do anything for the remaining 4.
 - Remove .print() and use foreachRDD: processes all of the 20 files.

Maybe, as in Akhil Das's suggestion, using .count.print() might also have
fixed my problem, but I'm satisfied with foreachRDD approach for now.
(Though it is still a mystery to me why using .print() had a difference,
maybe my mental model of Spark is wrong, I thought no matter what output
operation I used, the number of files processed by Spark would be
independent of that because the processing is done in a different method,
.print() is only used to force Spark execute that processing, am I wrong?).

--
Emre


On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen  wrote:

> Materialization shouldn't be relevant. The collect by itself doesn't let
> you detect whether it happened. Print should print some results to the
> console but on different machines, so may not be a reliable way to see what
> happened.
>
> Yes I understand your real process uses foreachRDD and that's what you
> should use. It sounds like that works. But you must always have been using
> that right? What do you mean that you changed to use it?
>
> Basically I'm not clear on what the real code does and what about the
> output of that code tells you only 16 files were processed.
> On Feb 16, 2015 1:18 PM, "Emre Sevinc"  wrote:
>
>> Hello Sean,
>>
>> I did not understand your question very well, but what I do is checking
>> the output directory (and I have various logger outputs at various stages
>> showing the contents of an input file being processed, the response from
>> the web service, etc.).
>>
>> By the way, I've already solved my problem by using foreachRDD instead of
>> print (see my second message in this thread). Apparently forcing Spark to
>> materialize DAG via print() is not the way to go. (My interpretation might
>> be wrong, but this is what I've just seen in my case).
>>
>> --
>> Emre
>>
>>
>>
>>
>> On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen  wrote:
>>
>>> How are you deciding whether files are processed or not? It doesn't seem
>>> possible from this code. Maybe it just seems so.
>>> On Feb 16, 2015 12:51 PM, "Emre Sevinc"  wrote:
>>>
 I've managed to solve this, but I still don't know exactly why my
 solution works:

 In my code I was trying to force the Spark to output via:

   jsonIn.print();

 jsonIn being a JavaDStream.

 When removed the code above, and added the code below to force the
 output operation, hence the execution:

 jsonIn.foreachRDD(new Function, Void>() {
   @Override
   public Void call(JavaRDD stringJavaRDD) throws Exception {
 stringJavaRDD.collect();
 return null;
   }
 });

 It works as I expect, processing all of the 20 files I give to it,
 instead of stopping at 16.

 --
 Emre


 On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc 
 wrote:

> Hello,
>
> I have an application in Java that uses Spark Streaming 1.2.1 in the
> following manner:
>
>  - Listen to the input directory.
>  - If a new file is copied to that input directory process it.
>  - Process: contact a RESTful web service (running also locally and
> responsive), send the contents of the file, receive the response from the
> web service, write the results as a new file into the output directory
>  - batch interval : 30 seconds
>  - checkpoint interval: 150 seconds
>
> When I test the application locally with 1 or 2 files, it works
> perfectly fine as expected. I run it like:
>
> spark-submit --class myClass --verbose --master local[4]
> --deploy-mode client myApp.jar /in file:///out
>
> But then I've realized something strange when I copied 20 files to the
> INPUT directory: Spark Streaming detects all of the files, but it ends up
> processing *only 16 files*. And the remaining 4 are not processed at all.
>
> I've tried it with 19, 18, and then 17 files. Same result, only 16
> files end up in the output directory.
>
> Then I've tried it by copying 16 files at once to the input directory,
> and it can process all of the 16 files. That's why I call it magic number
> 16.
>
> When I mean it detects all of the files, I mean that in the logs I see
> the following lines when I copy 17 files:
>
>

Re: hive-thriftserver maven artifact

2015-02-16 Thread Ted Yu
I searched for 'spark-hive-thriftserver_2.10' on this page:
http://mvnrepository.com/artifact/org.apache.spark

Looks like it is not published.

On Mon, Feb 16, 2015 at 5:44 AM, Marco  wrote:

> Hi,
>
> I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive
> Thriftserver Maven Artifact). Can somebody point me (URL) to the artifact
> in a public repository ? I have not found it @Maven Central.
>
> Thanks,
> Marco
>
>


Re: Extract hour from Timestamp in Spark SQL

2015-02-16 Thread Wush Wu
Dear Cheng Hao,

You are right!

After using the HiveContext, the issue is solved.

Thanks,
Wush

2015-02-15 10:42 GMT+08:00 Cheng, Hao :

>  Are you using the SQLContext? I think the HiveContext is recommended.
>
>
>
> Cheng Hao
>
>
>
> *From:* Wush Wu [mailto:w...@bridgewell.com]
> *Sent:* Thursday, February 12, 2015 2:24 PM
> *To:* u...@spark.incubator.apache.org
> *Subject:* Extract hour from Timestamp in Spark SQL
>
>
>
> Dear all,
>
> I am new to Spark SQL and have no experience of Hive.
>
> I tried to use the built-in Hive Function to extract the hour from
> timestamp in spark sql, but got : "java.util.NoSuchElementException: key
> not found: hour"
>
> How should I extract the hour from timestamp?
>
> And I am very confusing about which functions I could use in Spark SQL. Is
> there any list of available functions except
> http://spark.apache.org/docs/1.2.0/sql-programming-guide.html#compatibility-with-apache-hive
> ?
>
> Thanks,
> Wush
>
>
>
>
>


MLib usage on Spark Streaming

2015-02-16 Thread Spico Florin
Hello!
  I'm newbie to Spark and I have the following case study:
1. Client sending at 100ms the following data:
  {uniqueId, timestamp, measure1, measure2 }
2. Each 30 seconds I would like to correlate the data collected in the
window, with some predefined double vector pattern for each given key. The
predefined pattern has 300 records. The data should be also sorted by
timestamp.
3. When the correlation is greater than a predefined threshold (e.g 0.9) I
would like to emit an new message containing {uniqueId,
doubleCorrelationValue}
4. For the correlation I would like to use MLlib
5. As a programming language I would like to muse Java 7.

Can you please give me some suggestions on how to create the skeleton for
the above scenario?

Thanks.
 Regards,
 Florin


Re: hive-thriftserver maven artifact

2015-02-16 Thread Marco
Ok, so will it be only available for the next version (1.30)?

2015-02-16 15:24 GMT+01:00 Ted Yu :

> I searched for 'spark-hive-thriftserver_2.10' on this page:
> http://mvnrepository.com/artifact/org.apache.spark
>
> Looks like it is not published.
>
> On Mon, Feb 16, 2015 at 5:44 AM, Marco  wrote:
>
>> Hi,
>>
>> I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive
>> Thriftserver Maven Artifact). Can somebody point me (URL) to the artifact
>> in a public repository ? I have not found it @Maven Central.
>>
>> Thanks,
>> Marco
>>
>>
>


-- 
Viele Grüße,
Marco


Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Emre Sevinc
Hello,

I'm using Spark 1.2.1 and have a module.properties file, and in it I have
non-Spark properties, as well as Spark properties, e.g.:

   job.output.dir=file:///home/emre/data/mymodule/out

I'm trying to pass it to spark-submit via:

   spark-submit --class com.myModule --master local[4] --deploy-mode client
--verbose --properties-file /home/emre/data/mymodule.properties
mymodule.jar

And I thought I could read the value of my non-Spark property, namely,
job.output.dir by using:

SparkConf sparkConf = new SparkConf();
final String validatedJSONoutputDir = sparkConf.get("job.output.dir");

But it gives me an exception:

Exception in thread "main" java.util.NoSuchElementException:
job.output.dir

Is it not possible to mix Spark and non-Spark properties in a single
.properties file, then pass it via --properties-file and then get the
values of those non-Spark properties via SparkConf?

Or is there another object / method to retrieve the values for those
non-Spark properties?


-- 
Emre Sevinç


Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Sean Owen
Since SparkConf is only for Spark properties, I think it will in
general only pay attention to and preserve "spark.*" properties. You
could experiment with that. In general I wouldn't rely on Spark
mechanisms for your configuration, and you can use any config
mechanism you like to retain your own properties.

On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc  wrote:
> Hello,
>
> I'm using Spark 1.2.1 and have a module.properties file, and in it I have
> non-Spark properties, as well as Spark properties, e.g.:
>
>job.output.dir=file:///home/emre/data/mymodule/out
>
> I'm trying to pass it to spark-submit via:
>
>spark-submit --class com.myModule --master local[4] --deploy-mode client
> --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar
>
> And I thought I could read the value of my non-Spark property, namely,
> job.output.dir by using:
>
> SparkConf sparkConf = new SparkConf();
> final String validatedJSONoutputDir = sparkConf.get("job.output.dir");
>
> But it gives me an exception:
>
> Exception in thread "main" java.util.NoSuchElementException:
> job.output.dir
>
> Is it not possible to mix Spark and non-Spark properties in a single
> .properties file, then pass it via --properties-file and then get the values
> of those non-Spark properties via SparkConf?
>
> Or is there another object / method to retrieve the values for those
> non-Spark properties?
>
>
> --
> Emre Sevinç

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Emre Sevinc
Sean,

I'm trying this as an alternative to what I currently do. Currently I have
my module.properties file for my module in the resources directory, and
that file is put inside the über JAR file when I build my application with
Maven, and then when I submit it using spark-submit, I can read that
module.properties file via the traditional method:


properties.load(MyModule.class.getClassLoader().getResourceAsStream("module.properties"));

and everything works fine. The disadvantage is that in order to make any
changes to that .properties file effective, I have to re-build my
application. Therefore I'm trying to find a way to be able to send that
module.properties file via spark-submit and read the values in iy, so that
I will not be forced to build my application every time I want to make a
change in the module.properties file.

I've also checked the "--files" option of spark-submit, but I see that it
is for sending the listed files to executors (correct me if I'm wrong),
what I'm after is being able to pass dynamic properties (key/value pairs)
to the Driver program of my Spark application. And I still could not find
out how to do that.

--
Emre





On Mon, Feb 16, 2015 at 4:28 PM, Sean Owen  wrote:

> Since SparkConf is only for Spark properties, I think it will in
> general only pay attention to and preserve "spark.*" properties. You
> could experiment with that. In general I wouldn't rely on Spark
> mechanisms for your configuration, and you can use any config
> mechanism you like to retain your own properties.
>
> On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc 
> wrote:
> > Hello,
> >
> > I'm using Spark 1.2.1 and have a module.properties file, and in it I have
> > non-Spark properties, as well as Spark properties, e.g.:
> >
> >job.output.dir=file:///home/emre/data/mymodule/out
> >
> > I'm trying to pass it to spark-submit via:
> >
> >spark-submit --class com.myModule --master local[4] --deploy-mode
> client
> > --verbose --properties-file /home/emre/data/mymodule.properties
> mymodule.jar
> >
> > And I thought I could read the value of my non-Spark property, namely,
> > job.output.dir by using:
> >
> > SparkConf sparkConf = new SparkConf();
> > final String validatedJSONoutputDir =
> sparkConf.get("job.output.dir");
> >
> > But it gives me an exception:
> >
> > Exception in thread "main" java.util.NoSuchElementException:
> > job.output.dir
> >
> > Is it not possible to mix Spark and non-Spark properties in a single
> > .properties file, then pass it via --properties-file and then get the
> values
> > of those non-Spark properties via SparkConf?
> >
> > Or is there another object / method to retrieve the values for those
> > non-Spark properties?
> >
> >
> > --
> > Emre Sevinç
>



-- 
Emre Sevinc


Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Sean Owen
How about system properties? or something like Typesafe Config which
lets you at least override something in a built-in config file on the
command line, with props or other files.

On Mon, Feb 16, 2015 at 3:38 PM, Emre Sevinc  wrote:
> Sean,
>
> I'm trying this as an alternative to what I currently do. Currently I have
> my module.properties file for my module in the resources directory, and that
> file is put inside the über JAR file when I build my application with Maven,
> and then when I submit it using spark-submit, I can read that
> module.properties file via the traditional method:
>
>
> properties.load(MyModule.class.getClassLoader().getResourceAsStream("module.properties"));
>
> and everything works fine. The disadvantage is that in order to make any
> changes to that .properties file effective, I have to re-build my
> application. Therefore I'm trying to find a way to be able to send that
> module.properties file via spark-submit and read the values in iy, so that I
> will not be forced to build my application every time I want to make a
> change in the module.properties file.
>
> I've also checked the "--files" option of spark-submit, but I see that it is
> for sending the listed files to executors (correct me if I'm wrong), what
> I'm after is being able to pass dynamic properties (key/value pairs) to the
> Driver program of my Spark application. And I still could not find out how
> to do that.
>
> --
> Emre
>
>
>
>
>
> On Mon, Feb 16, 2015 at 4:28 PM, Sean Owen  wrote:
>>
>> Since SparkConf is only for Spark properties, I think it will in
>> general only pay attention to and preserve "spark.*" properties. You
>> could experiment with that. In general I wouldn't rely on Spark
>> mechanisms for your configuration, and you can use any config
>> mechanism you like to retain your own properties.
>>
>> On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc 
>> wrote:
>> > Hello,
>> >
>> > I'm using Spark 1.2.1 and have a module.properties file, and in it I
>> > have
>> > non-Spark properties, as well as Spark properties, e.g.:
>> >
>> >job.output.dir=file:///home/emre/data/mymodule/out
>> >
>> > I'm trying to pass it to spark-submit via:
>> >
>> >spark-submit --class com.myModule --master local[4] --deploy-mode
>> > client
>> > --verbose --properties-file /home/emre/data/mymodule.properties
>> > mymodule.jar
>> >
>> > And I thought I could read the value of my non-Spark property, namely,
>> > job.output.dir by using:
>> >
>> > SparkConf sparkConf = new SparkConf();
>> > final String validatedJSONoutputDir =
>> > sparkConf.get("job.output.dir");
>> >
>> > But it gives me an exception:
>> >
>> > Exception in thread "main" java.util.NoSuchElementException:
>> > job.output.dir
>> >
>> > Is it not possible to mix Spark and non-Spark properties in a single
>> > .properties file, then pass it via --properties-file and then get the
>> > values
>> > of those non-Spark properties via SparkConf?
>> >
>> > Or is there another object / method to retrieve the values for those
>> > non-Spark properties?
>> >
>> >
>> > --
>> > Emre Sevinç
>
>
>
>
> --
> Emre Sevinc

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Charles Feduke
I haven't actually tried mixing non-Spark settings into the Spark
properties. Instead I package my properties into the jar and use the
Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
specific) to get at my properties:

Properties file: src/main/resources/integration.conf

(below $ENV might be set to either "integration" or "prod"[3])

ssh -t root@$HOST "/root/spark/bin/spark-shell --jars /root/$JAR_NAME \
--conf 'config.resource=$ENV.conf' \
--conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'"

Since the properties file is packaged up with the JAR I don't have to worry
about sending the file separately to all of the slave nodes. Typesafe
Config is written in Java so it will work if you're not using Scala. (The
Typesafe Config also has the advantage of being extremely easy to integrate
with code that is using Java Properties today.)

If you instead want to send the file separately from the JAR and you use
the Typesafe Config library, you can specify "config.file" instead of
".resource"; though I'd point you to [3] below if you want to make your
development life easier.

1. https://github.com/typesafehub/config
2. https://github.com/ceedubs/ficus
3.
http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/



On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc  wrote:

> Hello,
>
> I'm using Spark 1.2.1 and have a module.properties file, and in it I have
> non-Spark properties, as well as Spark properties, e.g.:
>
>job.output.dir=file:///home/emre/data/mymodule/out
>
> I'm trying to pass it to spark-submit via:
>
>spark-submit --class com.myModule --master local[4] --deploy-mode
> client --verbose --properties-file /home/emre/data/mymodule.properties
> mymodule.jar
>
> And I thought I could read the value of my non-Spark property, namely,
> job.output.dir by using:
>
> SparkConf sparkConf = new SparkConf();
> final String validatedJSONoutputDir = sparkConf.get("job.output.dir");
>
> But it gives me an exception:
>
> Exception in thread "main" java.util.NoSuchElementException:
> job.output.dir
>
> Is it not possible to mix Spark and non-Spark properties in a single
> .properties file, then pass it via --properties-file and then get the
> values of those non-Spark properties via SparkConf?
>
> Or is there another object / method to retrieve the values for those
> non-Spark properties?
>
>
> --
> Emre Sevinç
>


Use of nscala-time within spark-shell

2015-02-16 Thread Hammam CHAMSI
Hi All,


Thanks in advance for your help. I have timestamp which I need 
to convert to datetime using scala. A folder contains the three needed 
jar files: "joda-convert-1.5.jar  joda-time-2.4.jar 
 nscala-time_2.11-1.8.0.jar"

Using scala REPL and adding the jars: scala -classpath "*.jar"

I can use nscala-time like following:


scala> import com.github.nscala_time.time.Imports._

import com.github.nscala_time.time.Imports._


scala> import org.joda._

import org.joda._


scala> DateTime.now

res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00


But when i try to use spark-shell:

ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar
 /usr/local/spark/bin/spark-shell --master local --driver-memory 2g 
--executor-memory 2g --executor-cores 1


It successfully imports the jars:


scala> import com.github.nscala_time.time.Imports._

import com.github.nscala_time.time.Imports._


scala> import org.joda._

import org.joda._


but fails using them

scala> DateTime.now

java.lang.NoSuchMethodError: 
scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;

at 
com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69)

at 
com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20)

at 
com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61)

at com.github.nscala_time.time.Imports$.(Imports.scala:20)

at com.github.nscala_time.time.Imports$.(Imports.scala)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:17)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:22)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:24)

at $iwC$$iwC$$iwC$$iwC$$iwC.(:26)

at $iwC$$iwC$$iwC$$iwC.(:28)

at $iwC$$iwC$$iwC.(:30)

at $iwC$$iwC.(:32)

at $iwC.(:34)

at (:36)

at .(:40)

at .()

at .(:7)

at .()

at $print()

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)

at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)

at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)

at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)

at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)

at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)

at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)

at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)

at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)

at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)

at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)

at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)

at org.apache.spark.repl.Main$.main(Main.scala:31)

at org.apache.spark.repl.Main.main(Main.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Your help is very aappreciated,


Regards,


Hammam




  

How to retreive the value from sql.row by column name

2015-02-16 Thread Eric Bell
Is it possible to reference a column from a SchemaRDD using the column's 
name instead of its number?


For example, let's say I've created a SchemaRDD from an avro file:

val sqlContext = new SQLContext(sc)
import sqlContext._
val caper=sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
caper.registerTempTable("caper")

scala> caper
res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at 
SchemaRDD.scala:108

== Query Plan ==
== Physical Plan ==
PhysicalRDD 
[ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...

scala>

Now I want to access fields, and of course the normal thing to do is to 
use a field name, not a field number.


scala> val kv = caper.map(r => (r.ran_id, r))
:23: error: value ran_id is not a member of 
org.apache.spark.sql.Row

   val kv = caper.map(r => (r.ran_id, r))

How do I do this?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Corey Nolet
We've been using commons configuration to pull our properties out of
properties files and system properties (prioritizing system properties over
others) and we add those properties to our spark conf explicitly and we use
ArgoPartser to get the command line argument for which property file to
load. We also implicitly added an extra parse args method to our SparkConf.
In our main method, we do something like this:

val sparkConf = SparkConfFactory.newSparkConf.parseModuleArts(args)
val sparkContext = new SparkContext(sparkConf)

Now all of our externally parsed properties are in the same spark conf so
we can pull them off anywhere in the program that has access to an
rdd/sparkcontext or the spark conf directly.

On Mon, Feb 16, 2015 at 10:42 AM, Sean Owen  wrote:

> How about system properties? or something like Typesafe Config which
> lets you at least override something in a built-in config file on the
> command line, with props or other files.
>
> On Mon, Feb 16, 2015 at 3:38 PM, Emre Sevinc 
> wrote:
> > Sean,
> >
> > I'm trying this as an alternative to what I currently do. Currently I
> have
> > my module.properties file for my module in the resources directory, and
> that
> > file is put inside the über JAR file when I build my application with
> Maven,
> > and then when I submit it using spark-submit, I can read that
> > module.properties file via the traditional method:
> >
> >
> >
> properties.load(MyModule.class.getClassLoader().getResourceAsStream("module.properties"));
> >
> > and everything works fine. The disadvantage is that in order to make any
> > changes to that .properties file effective, I have to re-build my
> > application. Therefore I'm trying to find a way to be able to send that
> > module.properties file via spark-submit and read the values in iy, so
> that I
> > will not be forced to build my application every time I want to make a
> > change in the module.properties file.
> >
> > I've also checked the "--files" option of spark-submit, but I see that
> it is
> > for sending the listed files to executors (correct me if I'm wrong), what
> > I'm after is being able to pass dynamic properties (key/value pairs) to
> the
> > Driver program of my Spark application. And I still could not find out
> how
> > to do that.
> >
> > --
> > Emre
> >
> >
> >
> >
> >
> > On Mon, Feb 16, 2015 at 4:28 PM, Sean Owen  wrote:
> >>
> >> Since SparkConf is only for Spark properties, I think it will in
> >> general only pay attention to and preserve "spark.*" properties. You
> >> could experiment with that. In general I wouldn't rely on Spark
> >> mechanisms for your configuration, and you can use any config
> >> mechanism you like to retain your own properties.
> >>
> >> On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc 
> >> wrote:
> >> > Hello,
> >> >
> >> > I'm using Spark 1.2.1 and have a module.properties file, and in it I
> >> > have
> >> > non-Spark properties, as well as Spark properties, e.g.:
> >> >
> >> >job.output.dir=file:///home/emre/data/mymodule/out
> >> >
> >> > I'm trying to pass it to spark-submit via:
> >> >
> >> >spark-submit --class com.myModule --master local[4] --deploy-mode
> >> > client
> >> > --verbose --properties-file /home/emre/data/mymodule.properties
> >> > mymodule.jar
> >> >
> >> > And I thought I could read the value of my non-Spark property, namely,
> >> > job.output.dir by using:
> >> >
> >> > SparkConf sparkConf = new SparkConf();
> >> > final String validatedJSONoutputDir =
> >> > sparkConf.get("job.output.dir");
> >> >
> >> > But it gives me an exception:
> >> >
> >> > Exception in thread "main" java.util.NoSuchElementException:
> >> > job.output.dir
> >> >
> >> > Is it not possible to mix Spark and non-Spark properties in a single
> >> > .properties file, then pass it via --properties-file and then get the
> >> > values
> >> > of those non-Spark properties via SparkConf?
> >> >
> >> > Or is there another object / method to retrieve the values for those
> >> > non-Spark properties?
> >> >
> >> >
> >> > --
> >> > Emre Sevinç
> >
> >
> >
> >
> > --
> > Emre Sevinc
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming and SQL checkpoint error: (java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf)

2015-02-16 Thread Michael Armbrust
You probably want to mark the HiveContext as @transient as its not valid to
use it on the slaves anyway.

On Mon, Feb 16, 2015 at 1:58 AM, Haopu Wang  wrote:

>  I have a streaming application which registered temp table on a
> HiveContext for each batch duration.
>
> The application runs well in Spark 1.1.0. But I get below error from 1.1.1.
>
> Do you have any suggestions to resolve it? Thank you!
>
>
>
> *java.io.NotSerializableException*: org.apache.hadoop.hive.conf.HiveConf
>
> - field (class "scala.Tuple2", name: "_1", type: "class
> java.lang.Object")
>
> - object (class "scala.Tuple2", (Configuration: core-default.xml,
> core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml,
> yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
> org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23
> ,org.apache.hadoop.hive.ql.session.SessionState@49b6eef9))
>
> - field (class "org.apache.spark.sql.hive.HiveContext", name: "x$3",
> type: "class scala.Tuple2")
>
> - object (class "org.apache.spark.sql.hive.HiveContext",
> org.apache.spark.sql.hive.HiveContext@4e6e66a4)
>
> - field (class
> "com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$registerTempTable$2",
> name: "sqlContext$1", type: "class org.apache.spark.sql.SQLContext")
>
>- object (class
> "com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$registerTempTable$2",
> )
>
> - field (class
> "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1", name:
> "foreachFunc$1", type: "interface scala.Function1")
>
> - object (class
> "org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1",
> )
>
> - field (class "org.apache.spark.streaming.dstream.ForEachDStream",
> name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc",
> type: "interface scala.Function2")
>
> - object (class "org.apache.spark.streaming.dstream.ForEachDStream",
> org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)
>
> - element of array (index: 0)
>
> - array (class "[Ljava.lang.Object;", size: 16)
>
> - field (class "scala.collection.mutable.ArrayBuffer", name: "array",
> type: "class [Ljava.lang.Object;")
>
> - object (class "scala.collection.mutable.ArrayBuffer",
> ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20))
>
> - field (class "org.apache.spark.streaming.DStreamGraph", name:
> "outputStreams", type: "class scala.collection.mutable.ArrayBuffer")
>
> - custom writeObject data (class
> "org.apache.spark.streaming.DStreamGraph")
>
> - object (class "org.apache.spark.streaming.DStreamGraph",
> org.apache.spark.streaming.DStreamGraph@776ae7da)
>
> - field (class "org.apache.spark.streaming.Checkpoint", name: "graph",
> type: "class org.apache.spark.streaming.DStreamGraph")
>
> - root object (class "org.apache.spark.streaming.Checkpoint",
> org.apache.spark.streaming.Checkpoint@5eade065)
>
> at java.io.ObjectOutputStream.writeObject0(Unknown Source)
>
>
>


Re: How to retreive the value from sql.row by column name

2015-02-16 Thread Michael Armbrust
For efficiency the row objects don't contain the schema so you can't get
the column by name directly.  I usually do a select followed by pattern
matching. Something like the following:

caper.select('ran_id).map { case Row(ranId: String) => }

On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell  wrote:

> Is it possible to reference a column from a SchemaRDD using the column's
> name instead of its number?
>
> For example, let's say I've created a SchemaRDD from an avro file:
>
> val sqlContext = new SQLContext(sc)
> import sqlContext._
> val caper=sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
> caper.registerTempTable("caper")
>
> scala> caper
> res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at
> SchemaRDD.scala:108
> == Query Plan ==
> == Physical Plan ==
> PhysicalRDD [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,
> APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#
> 11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,
> CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,
> CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,
> CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,
> CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#
> 38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,
> CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,
> CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,
> CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,
> CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...
> scala>
>
> Now I want to access fields, and of course the normal thing to do is to
> use a field name, not a field number.
>
> scala> val kv = caper.map(r => (r.ran_id, r))
> :23: error: value ran_id is not a member of
> org.apache.spark.sql.Row
>val kv = caper.map(r => (r.ran_id, r))
>
> How do I do this?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to retreive the value from sql.row by column name

2015-02-16 Thread Eric Bell
I am just learning scala so I don't actually understand what your code 
snippet is doing but thank you, I will learn more so I can figure it out.


I am new to all of this and still trying to make the mental shift from 
normal programming to distributed programming, but it seems to me that 
the row object would know its own schema object that it came from and be 
able to ask its schema to transform a name to a column number. Am I 
missing something or is this just a matter of time constraints and this 
one just hasn't gotten into the queue yet?


Baring that, do the schema classes provide methods for doing this? I've 
looked and didn't see anything.


I've just discovered that the python implementation for SchemaRDD does 
in fact allow for referencing by name and column. Why is this provided 
in the python implementation but not scala or java implementations?


Thanks,

--eric


On 02/16/2015 10:46 AM, Michael Armbrust wrote:
For efficiency the row objects don't contain the schema so you can't 
get the column by name directly.  I usually do a select followed by 
pattern matching. Something like the following:


caper.select('ran_id).map { case Row(ranId: String) => }

On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell > wrote:


Is it possible to reference a column from a SchemaRDD using the
column's name instead of its number?

For example, let's say I've created a SchemaRDD from an avro file:

val sqlContext = new SQLContext(sc)
import sqlContext._
val
caper=sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
caper.registerTempTable("caper")

scala> caper
res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at
SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD

[ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...
scala>

Now I want to access fields, and of course the normal thing to do
is to use a field name, not a field number.

scala> val kv = caper.map(r => (r.ran_id, r))
:23: error: value ran_id is not a member of
org.apache.spark.sql.Row
   val kv = caper.map(r => (r.ran_id, r))

How do I do this?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org







Re: Array in broadcast can't be serialized

2015-02-16 Thread Ted Yu
Is it possible to port WrappedArraySerializer.scala to your app ?

Pardon me for not knowing how to integrate Chill with Spark.

Cheers

On Mon, Feb 16, 2015 at 12:31 AM, Tao Xiao  wrote:

> Thanks Ted
>
> After searching for a whole day, I still don't know how to let spark use
> twitter chill serialization - there are very few documents about how to
> integrate twitter chill into Spark for serialization. I tried the
> following, but an exception of "java.lang.ClassCastException:
> com.twitter.chill.WrappedArraySerializer cannot be cast to
> org.apache.spark.serializer.Serializer" was thrown:
>
> val conf = new SparkConf()
>.setAppName("Test Serialization")
>.set("spark.serializer",
> "com.twitter.chill.WrappedArraySerializer")
>
>
> Well, what is the correct way of configuring Spark to use the twitter
> chill serialization framework ?
>
>
>
>
>
>
>
> 2015-02-15 22:23 GMT+08:00 Ted Yu :
>
>> I was looking at https://github.com/twitter/chill
>>
>> It seems this would achieve what you want:
>> chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala
>>
>> Cheers
>>
>> On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao 
>> wrote:
>>
>>> I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
>>> serialized by Kryo but *Array[ImmutableBytesWritable] *can't be
>>> serialized even when I registered both of them in Kryo.
>>>
>>> The code is as follows:
>>>
>>>val conf = new SparkConf()
>>> .setAppName("Hello Spark")
>>> .set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>> .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>>>
>>> val sc = new SparkContext(conf)
>>>
>>> val rdd = sc.parallelize(List(
>>> (new ImmutableBytesWritable(Bytes.toBytes("AAA")),
>>> new KeyValue()),
>>> (new ImmutableBytesWritable(Bytes.toBytes("BBB")),
>>> new KeyValue()),
>>> (new ImmutableBytesWritable(Bytes.toBytes("CCC")),
>>> new KeyValue()),
>>> (new ImmutableBytesWritable(Bytes.toBytes("DDD")),
>>> new KeyValue())), 4)
>>>
>>> // snippet 1:  a single object of *ImmutableBytesWritable* can
>>> be serialized in broadcast
>>> val partitioner = new SingleElementPartitioner(sc.broadcast(new
>>> ImmutableBytesWritable(Bytes.toBytes(3
>>> val ret = rdd.aggregateByKey(List[KeyValue](),
>>> partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>>>  (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
>>> println("\n\n\ret.count = " + ret.count + ",  partition size = "
>>> + ret.partitions.size)
>>>
>>> // snippet 2: an array of *ImmutableBytesWritable* can not be
>>> serialized in broadcast
>>> val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)),
>>> new ImmutableBytesWritable(Bytes.toBytes(2)), new
>>> ImmutableBytesWritable(Bytes.toBytes(3)))
>>> val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>>> val ret1 = rdd.aggregateByKey(List[KeyValue](),
>>> newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>>>  (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
>>> println("\n\n\nrdd2.count = " + ret1.count)
>>>
>>> sc.stop
>>>
>>>
>>>   // the following are kryo registrator and partitioners
>>>class MyKryoRegistrator extends KryoRegistrator {
>>> override def registerClasses(kryo: Kryo): Unit = {
>>>  kryo.register(classOf[ImmutableBytesWritable])   //
>>> register ImmutableBytesWritable
>>>  kryo.register(classOf[Array[ImmutableBytesWritable]])
>>>  // register Array[ImmutableBytesWritable]
>>> }
>>>}
>>>
>>>class SingleElementPartitioner(bc:
>>> Broadcast[ImmutableBytesWritable]) extends Partitioner {
>>> override def numPartitions: Int = 5
>>> def v = Bytes.toInt(bc.value.get)
>>> override def getPartition(key: Any): Int =  v - 1
>>>}
>>>
>>>
>>> class ArrayPartitioner(bc:
>>> Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
>>> val arr = bc.value
>>> override def numPartitions: Int = arr.length
>>> override def getPartition(key: Any): Int =
>>> Bytes.toInt(arr(0).get)
>>> }
>>>
>>>
>>>
>>> In the code above, snippet 1 can work as expected. But snippet 2 throws
>>> "Task not serializable: java.io.NotSerializableException:
>>> org.apache.hadoop.hbase.io.ImmutableBytesWritable"  .
>>>
>>>
>>> So do I have to implement a Kryo serializer for Array[T] if it is used
>>> in broadcast ?
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>>
>>
>


Re: How to retreive the value from sql.row by column name

2015-02-16 Thread Michael Armbrust
I can unpack the code snippet a bit:

caper.select('ran_id) is the same as saying "SELECT ran_id FROM table" in
SQL.  Its always a good idea to explicitly request the columns you need
right before using them.  That way you are tolerant of any changes to the
schema that might happen upstream.

The next part .map { case Row(ranId: String) => ... } is doing an
extraction to pull out the values of the row into typed variables.  This is
the same as doing .map(row => row(0).asInstanceOf[String]) or .map(row =>
row.getString(0)), but I find this syntax easier to read since it lines up
nicely with the select clause that comes right before it.  It's also less
verbose especially when pulling out a bunch of columns.

Regarding the differences between python and java/scala, part of this is
just due to the nature of these language.  Since java/scala are statically
typed, you will always have to explicitly say the type of the column you
are extracting (the bonus here is they are much faster than python due to
optimizations this strictness allows).  However, since its already a little
more verbose, we decided not to have the more expensive ability to look up
columns in a row by name, and instead go with a faster ordinal based API.
We could revisit this, but its not currently something we are planning to
change.

Michael

On Mon, Feb 16, 2015 at 11:04 AM, Eric Bell  wrote:

>  I am just learning scala so I don't actually understand what your code
> snippet is doing but thank you, I will learn more so I can figure it out.
>
> I am new to all of this and still trying to make the mental shift from
> normal programming to distributed programming, but it seems to me that the
> row object would know its own schema object that it came from and be able
> to ask its schema to transform a name to a column number. Am I missing
> something or is this just a matter of time constraints and this one just
> hasn't gotten into the queue yet?
>
> Baring that, do the schema classes provide methods for doing this? I've
> looked and didn't see anything.
>
> I've just discovered that the python implementation for SchemaRDD does in
> fact allow for referencing by name and column. Why is this provided in the
> python implementation but not scala or java implementations?
>
> Thanks,
>
> --eric
>
>
>
> On 02/16/2015 10:46 AM, Michael Armbrust wrote:
>
> For efficiency the row objects don't contain the schema so you can't get
> the column by name directly.  I usually do a select followed by pattern
> matching. Something like the following:
>
>  caper.select('ran_id).map { case Row(ranId: String) => }
>
> On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell  wrote:
>
>> Is it possible to reference a column from a SchemaRDD using the column's
>> name instead of its number?
>>
>> For example, let's say I've created a SchemaRDD from an avro file:
>>
>> val sqlContext = new SQLContext(sc)
>> import sqlContext._
>> val caper=sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
>> caper.registerTempTable("caper")
>>
>> scala> caper
>> res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at
>> SchemaRDD.scala:108
>> == Query Plan ==
>> == Physical Plan ==
>> PhysicalRDD
>> [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...
>> scala>
>>
>> Now I want to access fields, and of course the normal thing to do is to
>> use a field name, not a field number.
>>
>> scala> val kv = caper.map(r => (r.ran_id, r))
>> :23: error: value ran_id is not a member of
>> org.apache.spark.sql.Row
>>val kv = caper.map(r => (r.ran_id, r))
>>
>> How do I do this?
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Tathagata Das
Correct, brute force clean up is not useful. Since Spark 1.0, Spark can do
automatic cleanup of files based on which RDDs are used/garbage collected
by JVM. That would be the best way, but depends on the JVM GC
characteristics. If you force a GC periodically in the driver that might
help you get rid of files in the workers that are not needed.

TD

On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi 
wrote:

> spark.cleaner.ttl is not the right way - seems to be really designed for
> streaming. although it keeps the disk usage under control it also causes
> loss of rdds and broadcasts that are required later leading to crash.
>
> is there any other way?
> thanks,
> Antony.
>
>
>   On Sunday, 15 February 2015, 21:42, Antony Mayi 
> wrote:
>
>
>
> spark.cleaner.ttl ?
>
>
>   On Sunday, 15 February 2015, 18:23, Antony Mayi 
> wrote:
>
>
>
> Hi,
>
> I am running bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - ALS is using
> about 3 billions of ratings and I am doing several trainImplicit() runs in
> loop within one spark session. I have four node cluster with 3TB disk space
> on each. before starting the job there is less then 8% of the disk space
> used. while the ALS is running I can see the disk usage rapidly growing
> mainly because of files being stored
> under 
> yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA.
> after about 10 hours the disk usage hits 90% and yarn kills the particular
> containers.
>
> am I missing doing some cleanup somewhere while looping over the several
> trainImplicit() calls? taking 4*3TB of disk space seems immense.
>
> thanks for any help,
> Antony.
>
>
>
>
>
>


Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Hello all,

Trying the example code from this package (
https://github.com/Parsely/pyspark-cassandra) , I always get this error...

Can you see what I am doing wrong? from googling arounf it seems to be that
the jar is not found somehow...  The spark log shows the JAR was processed
at least.

Thank you so much.

am using spark-1.2.1-bin-hadoop2.4.tgz

test2.py is simply:

from pyspark.context import SparkConf
from pyspark_cassandra import CassandraSparkContext, saveToCassandra
conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
conf.set("spark.cassandra.connection.host", "devzero")
sc = CassandraSparkContext(conf=conf)

[root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash
-c "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py
--jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
/spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py"
...
15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
15/02/16 05:58:45 INFO Remoting: Starting remoting
15/02/16 05:58:45 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkDriver@devzero:38917]
15/02/16 05:58:45 INFO Utils: Successfully started service
'sparkDriver' on port 38917.
15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
/tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
server' on port 56642.
15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI'
on port 4040.
15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
15/02/16 05:58:46 INFO SparkContext: Added JAR
file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
timestamp 1424066326632
15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
/tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py
at http://10.212.55.42:56642/files/test2.py with timestamp
1424066326633
15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
/tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
15/02/16 05:58:46 INFO SparkContext: Added file
file:/spark/pyspark_cassandra.py at
http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
1424066326642
15/02/16 05:58:46 INFO Executor: Starting executor ID  on host localhost
15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register BlockManager
15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block
manager localhost:32895 with 265.4 MB RAM, BlockManagerId(,
localhost, 32895)
15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at http://devzero:4040
15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler
15/02/16 05:58:48 INFO MapOutputTrackerMasterActor:
MapOutputTrackerActor stopped!
15/02/16 05:58:48 INFO MemoryStore: MemoryStore cleared
15/02/16 05:58:48 INFO BlockManager: BlockManager stopped
15/02/16 05:58:48 INFO BlockManagerMaster: BlockManagerMaster stopped
15/02/16 05:58:48 INFO SparkContext: Successfully stopped SparkContext
15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator:
Shutting down remote daemon.
15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator:
Remote daemon shut down; proceeding with flushing remote transports.
15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator:
Remoting shut down.
Traceback (most recent call last):
  File "/spark/test2.py", line 5, in 
sc = CassandraSparkContext(conf=conf)
  File "/spark/python/pyspark/context.py", line 105, in __init__
conf, jsc)
  File "/spark/pyspark_cassandra.py", line 17, in _do_init
self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
  File "/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.


Spark newbie desires feedback on first program

2015-02-16 Thread Eric Bell
I'm a spark newbie working on his first attempt to do write an ETL 
program. I could use some feedback to make sure I'm on the right path. 
I've written a basic proof of concept that runs without errors and seems 
to work, although I might be missing some issues when this is actually 
run on more than a single node.


I am working with data about people (actually healthcare patients). I 
have an RDD that contains multiple rows per person. My overall goal is 
to create a single Person object for each person in my data. In this 
example, I am serializing to JSON, mostly because this is what I know 
how to do at the moment.


Other than general feedback, is my use of the groupByKey() and 
mapValues() methods appropriate?


Thanks!


import json

class Person:
def __init__(self):
self.mydata={}
self.cpts = []
self.mydata['cpt']=self.cpts
def addRowData(self, dataRow):
# Get the CPT codes
cpt = dataRow.CPT_1
if cpt:
self.cpts.append(cpt)
def serializeToJSON(self):
return json.dumps(self.mydata)

def makeAPerson(rows):
person = Person()
for row in rows:
person.addRowData(row)
return person.serializeToJSON()

peopleRDD = caper_kv.groupByKey().mapValues(lambda personDataRows: 
makeAPerson(personDataRows))

peopleRDD.saveAsTextFile("hdfs://localhost:9000/sma/processJSON/people")


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Davies Liu
It seems that the jar for cassandra is not loaded, you should have
them in the classpath.

On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
 wrote:
> Hello all,
>
> Trying the example code from this package
> (https://github.com/Parsely/pyspark-cassandra) , I always get this error...
>
> Can you see what I am doing wrong? from googling arounf it seems to be that
> the jar is not found somehow...  The spark log shows the JAR was processed
> at least.
>
> Thank you so much.
>
> am using spark-1.2.1-bin-hadoop2.4.tgz
>
> test2.py is simply:
>
> from pyspark.context import SparkConf
> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
> conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
> conf.set("spark.cassandra.connection.host", "devzero")
> sc = CassandraSparkContext(conf=conf)
>
> [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash -c
> "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
> /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
> /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py"
> ...
> 15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
> 15/02/16 05:58:45 INFO Remoting: Starting remoting
> 15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@devzero:38917]
> 15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver' on
> port 38917.
> 15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
> 15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
> 15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
> /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
> 15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity 265.4
> MB
> 15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
> 15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
> 15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
> server' on port 56642.
> 15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on port
> 4040.
> 15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
> 15/02/16 05:58:46 INFO SparkContext: Added JAR
> file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
> http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
> timestamp 1424066326632
> 15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
> /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
> 15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at
> http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
> 15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
> /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
> 15/02/16 05:58:46 INFO SparkContext: Added file
> file:/spark/pyspark_cassandra.py at
> http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
> 1424066326642
> 15/02/16 05:58:46 INFO Executor: Starting executor ID  on host
> localhost
> 15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
> akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
> 15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
> 15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register BlockManager
> 15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block manager
> localhost:32895 with 265.4 MB RAM, BlockManagerId(, localhost,
> 32895)
> 15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
> 15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at http://devzero:4040
> 15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler
> 15/02/16 05:58:48 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor
> stopped!
> 15/02/16 05:58:48 INFO MemoryStore: MemoryStore cleared
> 15/02/16 05:58:48 INFO BlockManager: BlockManager stopped
> 15/02/16 05:58:48 INFO BlockManagerMaster: BlockManagerMaster stopped
> 15/02/16 05:58:48 INFO SparkContext: Successfully stopped SparkContext
> 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
> down remote daemon.
> 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Remote
> daemon shut down; proceeding with flushing remote transports.
> 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
> shut down.
> Traceback (most recent call last):
>   File "/spark/test2.py", line 5, in 
> sc = CassandraSparkContext(conf=conf)
>   File "/spark/python/pyspark/context.py", line 105, in __init__
> conf, jsc)
>   File "/spark/pyspark_cassandra.py", line 17, in _do_init
> self._jcsc = self._jvm.CassandraJavaUtil

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Yes, am sure the system cant find the jar.. but how do I fix that... my
submit command includes the jar:

/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
/spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
/spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py

and the spark output seems to indicate it is handling it:

15/02/16 05:58:46 INFO SparkContext: Added JAR
file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
timestamp 1424066326632


I don't really know what else I could try any suggestions highly
appreciated.

Thanks,
Mohamed.


On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu  wrote:

> It seems that the jar for cassandra is not loaded, you should have
> them in the classpath.
>
> On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
>  wrote:
> > Hello all,
> >
> > Trying the example code from this package
> > (https://github.com/Parsely/pyspark-cassandra) , I always get this
> error...
> >
> > Can you see what I am doing wrong? from googling arounf it seems to be
> that
> > the jar is not found somehow...  The spark log shows the JAR was
> processed
> > at least.
> >
> > Thank you so much.
> >
> > am using spark-1.2.1-bin-hadoop2.4.tgz
> >
> > test2.py is simply:
> >
> > from pyspark.context import SparkConf
> > from pyspark_cassandra import CassandraSparkContext, saveToCassandra
> > conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
> > conf.set("spark.cassandra.connection.host", "devzero")
> > sc = CassandraSparkContext(conf=conf)
> >
> > [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash -c
> > "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py"
> > ...
> > 15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
> > 15/02/16 05:58:45 INFO Remoting: Starting remoting
> > 15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses
> > :[akka.tcp://sparkDriver@devzero:38917]
> > 15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver'
> on
> > port 38917.
> > 15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
> > 15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
> > 15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
> >
> /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
> > 15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity
> 265.4
> > MB
> > 15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
> > library for your platform... using builtin-java classes where applicable
> > 15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
> >
> /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
> > 15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
> > 15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
> > server' on port 56642.
> > 15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on
> port
> > 4040.
> > 15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
> > 15/02/16 05:58:46 INFO SparkContext: Added JAR
> > file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
> > http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
> > timestamp 1424066326632
> > 15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
> >
> /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
> > 15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at
> > http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
> > 15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
> >
> /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
> > 15/02/16 05:58:46 INFO SparkContext: Added file
> > file:/spark/pyspark_cassandra.py at
> > http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
> > 1424066326642
> > 15/02/16 05:58:46 INFO Executor: Starting executor ID  on host
> > localhost
> > 15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
> > akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
> > 15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
> > 15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register
> BlockManager
> > 15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block manager
> > localhost:32895 with 265.4 MB RAM, BlockManagerId(, localhost,
> > 32895)
> > 15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
> > 15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at
> http://devzero:4040
> > 15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler
> > 15/02/16 05:58:48 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor
> > stopped!
> > 15/02/16 

Re: Spark newbie desires feedback on first program

2015-02-16 Thread Charles Feduke
I cannot comment about the correctness of Python code. I will assume your
caper_kv is keyed on something that uniquely identifies all the rows that
make up the person's record so your group by key makes sense, as does the
map. (I will also assume all of the rows that comprise a single person's
record will always fit in memory. If not you will need another approach.)

You should be able to get away with removing the "localhost:9000" from your
HDFS URL, i.e., "hdfs:///sma/processJSON/people" and let your HDFS
configuration for Spark supply the missing pieces.

On Mon Feb 16 2015 at 3:38:31 PM Eric Bell  wrote:

> I'm a spark newbie working on his first attempt to do write an ETL
> program. I could use some feedback to make sure I'm on the right path.
> I've written a basic proof of concept that runs without errors and seems
> to work, although I might be missing some issues when this is actually
> run on more than a single node.
>
> I am working with data about people (actually healthcare patients). I
> have an RDD that contains multiple rows per person. My overall goal is
> to create a single Person object for each person in my data. In this
> example, I am serializing to JSON, mostly because this is what I know
> how to do at the moment.
>
> Other than general feedback, is my use of the groupByKey() and
> mapValues() methods appropriate?
>
> Thanks!
>
>
> import json
>
> class Person:
>  def __init__(self):
>  self.mydata={}
>  self.cpts = []
>  self.mydata['cpt']=self.cpts
>  def addRowData(self, dataRow):
>  # Get the CPT codes
>  cpt = dataRow.CPT_1
>  if cpt:
>  self.cpts.append(cpt)
>  def serializeToJSON(self):
>  return json.dumps(self.mydata)
>
> def makeAPerson(rows):
>  person = Person()
>  for row in rows:
>  person.addRowData(row)
>  return person.serializeToJSON()
>
> peopleRDD = caper_kv.groupByKey().mapValues(lambda personDataRows:
> makeAPerson(personDataRows))
> peopleRDD.saveAsTextFile("hdfs://localhost:9000/sma/processJSON/people")
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark newbie desires feedback on first program

2015-02-16 Thread Eric Bell
Thanks Charles. I just realized a few minutes ago that I neglected to 
show the step where I generated the key on the person ID. Thanks for the 
pointer on the HDFS URL.


Next step is to process data from multiple RDDS. My data originates from 
7 tables in a MySQL database. I used sqoop to create avro files from 
these tables, and in turn created RDDs using SparkSQL from the avro 
files. Since the groupByKey only operates on a single RDD, I'm not quite 
sure yet how I'm going to process 7 tables as a transformation to get 
all the data I need into my objects.


I'm vascillating on whether I should be doing it this way, or if it 
would be a lot simpler to query MySQL to get all the Person IDs, 
parallelize them, and have my Person class make queries directly to the 
MySQL database. Since in theory I only have to do this once, I'm not 
sure there's much to be gained in moving the data from MySQL to Spark first.


I have yet to find any non-trivial examples of ETL logic on the web ... 
it seems like it's mostly word count map-reduce replacements.


On 02/16/2015 01:32 PM, Charles Feduke wrote:
I cannot comment about the correctness of Python code. I will assume 
your caper_kv is keyed on something that uniquely identifies all the 
rows that make up the person's record so your group by key makes 
sense, as does the map. (I will also assume all of the rows that 
comprise a single person's record will always fit in memory. If not 
you will need another approach.)


You should be able to get away with removing the "localhost:9000" from 
your HDFS URL, i.e., "hdfs:///sma/processJSON/people" and let your 
HDFS configuration for Spark supply the missing pieces.





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Unable to broadcast dimension tables with Spark SQL

2015-02-16 Thread Sunita Arvind
Hi Experts,

I have a large table with 54 million records (fact table), being joined
with 6 small tables (dimension tables). The size on disk of small tables is
within 5k and the record count is in the range of 4 - 200
All the worker nodes have RAM of 32GB allocated for spark. I have tried the
below approaches and looks like the small tables are not being broadcast,
which is causing timeouts as expected and failure of the job.
The reason for this, AFAIK is, the small table is also getting shuffled and
is fitting into a single node's partition. Then the large table is made to
flow to the same node which stays busy while all other nodes are idle.

Note: The spark version in use on cluster as well as my local setup is
1.1.0. I also tried with Spark 1.2.0 in the local setup, however the
queryPlan showed no change.

1. Broadcast the RDD before registering as table:
 val k = sqlContext.parquetFile(p.fileName)
 val t = sc.broadcast(k)
 t.value.registerTempTable(p.tableName)

2. Set the variable
 sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold","1")


3. Added limit to each small table before registering as table. I guess
this gives optimizer a way compute statistics and determine that the other
table is small enough for broadcast:
   sqlContext.sql("select * from a_nolim limit 7").registerTempTable("edu")

  also tried DSL style:

 a.limit(7).registerTempTable("edu")

   Tried explicit broadcasting of the tables as below:

   sc.broadcast(sqlContext.sql("select * from edu_nolim limit
7")).value.registerTempTable("edu")

   and tried dsl style with broadcast done on the rdd as well

4. Used DSL style of join:
   val try2 = a1.join(cdemo,LeftOuter,Some("dem.key1".attr ===
"ed.key1".attr ))

5. Ran the below commad in hive for all small tables:
   ANALYZE TABLE  tableName COMPUTE STATISTICS noscan

   Please note, the application uses SQLContext and not hive context. Hence
I ran the compute statistics out of the application from hue -> hive
editor. I am assuming the statistics are available in the metastore,
however, not sure
if spark can fetch these statistics since I am not using hive context
within the application.

6. Not sure if these are valid flags, but tried with them set anyways:
  sqlContext.setConf("spark.sql.planner.dimensionJoin","true")
  sqlContext.setConf("spark.sql.planner.generatedDimensionJoin","true")
  sqlContext.setConf("multiWayJoin","true")
  sqlContext.setConf("turbo", "true")

7. Tried CacheTable for all small tables. This changes the query execution
to InMemoryRelation instead of ParquetTableScan, however, shuffle -
 Exchange (HashPartitioning [i1_education_cust_demo#29], 200) remains.

8. Reduced the shuffle partition number with this parameter -
sqlContext.setConf("spark.sql.shuffle.partitions","8"). But this did not
help.

With all these attempts, the small tables are still getting shuffled I
guess. Below are the queryExecutions printed on every attempt and they have
remained almost same on every attempt:

DSL Style execution plan(i.e.
rdd1.join(rdd2,LeftOuter,Some("rdd1.key".attr === "rdd2.key".attr))
-
DSL Style execution plan --> HashOuterJoin [education#18],
[i1_education_cust_demo#29], LeftOuter, None
 Exchange (HashPartitioning [education#18], 200)
  ParquetTableScan [education#18,education_desc#19], (ParquetRelation
C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_education,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), []
 Exchange (HashPartitioning [i1_education_cust_demo#29], 200)
  ParquetTableScan
[customer_id_cust_demo#20,age_dt_cust_demo#21,gndr_cd_cust_demo#22,hh_income_cust_demo#23,marital_status_cust_demo#24,ethnicity_cust_demo#25,length_of_residence_cust_demo#26,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,i1_education_cust_demo#29],
(ParquetRelation
C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_cust_demo_dm_sample,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), []


SQL Style execution plan (i.e sqlContext.sql("select a,b,c,d,e from t1 left
outer join t2 on t1.a = t2.a")
--
Project
[customer_id_cust_demo#20,i1_education_cust_demo#29,marital_status_cust_demo#24,hh_income_cust_demo#23,length_of_residence_cust_demo#26,ethnicity_cust_demo#25,gndr_cd_cust_demo#22,age_dt_cust_demo#21,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,education_desc#19]
 HashOuterJoin [i1_education_cust_demo#29], [education#18], LeftOuter, N

Re: Spark newbie desires feedback on first program

2015-02-16 Thread Charles Feduke
My first problem was somewhat similar to yours. You won't find a whole lot
of JDBC to Spark examples since I think a lot of the adoption for Spark is
from teams already experienced with Hadoop and already have an established
big data solution (so their data is already extracted from whatever
sources, e.g., log files, Hive, other M/R jobs). JDBC support is
somewhat... lacking.

Our application uses a 12 node PostgreSQL distributed RDBMS that is sharded
at the application tier. I had to write my own JDBC RDD to support this
logical schema. However because you are coming from a single MySQL DB you
should be able to get away with using the JdbcRDD[1]... but I cannot find a
reference to it for the Python API so someone familiar with using Python
and Spark will have to chime in on that.

You need to consider _how_ the data gets from MySQL to the workers. It
might work to pull all of the data to a single node and then parallelize
that data across the cluster but its not going to be as efficient as range
querying from each worker in the cluster to the database. If you're working
with TBs of data then you will see very big benefits by distributing the
data across workers from the get go; if you don't it will take however long
it takes to copy all the data to a single worker and distribute as your
startup code for each execution. (By range querying what I mean is
basically what the JdbcRDD does - it forces you to include a conditional
statement like "id > ? AND id <= ?" in your SQL which it formats at each
worker so each worker only gets a piece of the pie). The JdbcRDD makes
assumptions about numeric keys for range querying.

The next thing to consider is if you're going against your production
database, will massive reads cause degradation for production users? I am
using read replicas to mitigate this for our production installation, as
copying TBs of data out of PostgreSQL would have some negative effect on
our users. Running your jobs during low traffic is obviously an option
here, as is restoring a read-only version from backup and explicitly
querying that instance (in which case parallelizing user IDs and querying
MySQL directly might get you near to the JdbcRDD behavior). And of course
if the MySQL instance is already your analytics solution then query on.

1.
https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/rdd/JdbcRDD.html

On Mon Feb 16 2015 at 4:42:30 PM Eric Bell  wrote:

> Thanks Charles. I just realized a few minutes ago that I neglected to
> show the step where I generated the key on the person ID. Thanks for the
> pointer on the HDFS URL.
>
> Next step is to process data from multiple RDDS. My data originates from
> 7 tables in a MySQL database. I used sqoop to create avro files from
> these tables, and in turn created RDDs using SparkSQL from the avro
> files. Since the groupByKey only operates on a single RDD, I'm not quite
> sure yet how I'm going to process 7 tables as a transformation to get
> all the data I need into my objects.
>
> I'm vascillating on whether I should be doing it this way, or if it
> would be a lot simpler to query MySQL to get all the Person IDs,
> parallelize them, and have my Person class make queries directly to the
> MySQL database. Since in theory I only have to do this once, I'm not
> sure there's much to be gained in moving the data from MySQL to Spark
> first.
>
> I have yet to find any non-trivial examples of ETL logic on the web ...
> it seems like it's mostly word count map-reduce replacements.
>
> On 02/16/2015 01:32 PM, Charles Feduke wrote:
> > I cannot comment about the correctness of Python code. I will assume
> > your caper_kv is keyed on something that uniquely identifies all the
> > rows that make up the person's record so your group by key makes
> > sense, as does the map. (I will also assume all of the rows that
> > comprise a single person's record will always fit in memory. If not
> > you will need another approach.)
> >
> > You should be able to get away with removing the "localhost:9000" from
> > your HDFS URL, i.e., "hdfs:///sma/processJSON/people" and let your
> > HDFS configuration for Spark supply the missing pieces.
> >
>
>


OOM error

2015-02-16 Thread Harshvardhan Chauhan
Hi All,


I need some help with Out Of Memory errors in my application. I am using
Spark 1.1.0 and my application is using Java API. I am running my app on
EC2  25 m3.xlarge (4 Cores 15GB Memory) instances. The app only fails
sometimes. Lots of mapToPair tasks a failing.  My app is configured to run
120 executors and executor memory is 2G.

These are various errors i see the in my logs.

15/02/16 10:53:48 INFO storage.MemoryStore: Block broadcast_1 of size
4680 dropped from memory (free 257277829)
15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception
was thrown by a user handler while handling an exception event ([id:
0x6e0138a3, /10.61.192.194:35196 => /10.164.164.228:49445] EXCEPTION:
java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at 
org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at 
org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
at 
org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception
was thrown by a user handler while handling an exception event ([id:
0x2d0c1db1, /10.169.226.254:55790 => /10.164.164.228:49445] EXCEPTION:
java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at 
org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at 
org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
at 
org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/16 10:53:50 WARN channel.DefaultChannelPipeline: An exception
was thrown by a user handler while handling an exception event ([id:
0xd4211985, /10.181.125.52:60959 => /10.164.164.228:49445] EXCEPTION:
java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at 
org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at 
org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
  

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Davies Liu
It also need the Cassandra jar: com.datastax.spark.connector.CassandraJavaUtil

Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?



On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
 wrote:
> Yes, am sure the system cant find the jar.. but how do I fix that... my
> submit command includes the jar:
>
> /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
> /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
> /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
>
> and the spark output seems to indicate it is handling it:
>
> 15/02/16 05:58:46 INFO SparkContext: Added JAR
> file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
> http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
> timestamp 1424066326632
>
>
> I don't really know what else I could try any suggestions highly
> appreciated.
>
> Thanks,
> Mohamed.
>
>
> On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu  wrote:
>>
>> It seems that the jar for cassandra is not loaded, you should have
>> them in the classpath.
>>
>> On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
>>  wrote:
>> > Hello all,
>> >
>> > Trying the example code from this package
>> > (https://github.com/Parsely/pyspark-cassandra) , I always get this
>> > error...
>> >
>> > Can you see what I am doing wrong? from googling arounf it seems to be
>> > that
>> > the jar is not found somehow...  The spark log shows the JAR was
>> > processed
>> > at least.
>> >
>> > Thank you so much.
>> >
>> > am using spark-1.2.1-bin-hadoop2.4.tgz
>> >
>> > test2.py is simply:
>> >
>> > from pyspark.context import SparkConf
>> > from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>> > conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
>> > conf.set("spark.cassandra.connection.host", "devzero")
>> > sc = CassandraSparkContext(conf=conf)
>> >
>> > [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash -c
>> > "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
>> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
>> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py"
>> > ...
>> > 15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
>> > 15/02/16 05:58:45 INFO Remoting: Starting remoting
>> > 15/02/16 05:58:45 INFO Remoting: Remoting started; listening on
>> > addresses
>> > :[akka.tcp://sparkDriver@devzero:38917]
>> > 15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver'
>> > on
>> > port 38917.
>> > 15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
>> > 15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
>> > 15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
>> >
>> > /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
>> > 15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity
>> > 265.4
>> > MB
>> > 15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
>> > library for your platform... using builtin-java classes where applicable
>> > 15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
>> >
>> > /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
>> > 15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
>> > 15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
>> > server' on port 56642.
>> > 15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on
>> > port
>> > 4040.
>> > 15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
>> > 15/02/16 05:58:46 INFO SparkContext: Added JAR
>> > file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
>> > http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
>> > timestamp 1424066326632
>> > 15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
>> >
>> > /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
>> > 15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at
>> > http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
>> > 15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
>> >
>> > /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
>> > 15/02/16 05:58:46 INFO SparkContext: Added file
>> > file:/spark/pyspark_cassandra.py at
>> > http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
>> > 1424066326642
>> > 15/02/16 05:58:46 INFO Executor: Starting executor ID  on host
>> > localhost
>> > 15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
>> > akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
>> > 15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on
>> > 32895
>> > 15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register
>> > BlockManager
>> > 15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block
>> > manager
>> > localhost:32895 with 265.4 MB

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Oh, I don't know. thanks a lot Davies, gonna figure that out now

On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu  wrote:

> It also need the Cassandra jar:
> com.datastax.spark.connector.CassandraJavaUtil
>
> Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?
>
>
>
> On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
>  wrote:
> > Yes, am sure the system cant find the jar.. but how do I fix that... my
> > submit command includes the jar:
> >
> > /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
> >
> > and the spark output seems to indicate it is handling it:
> >
> > 15/02/16 05:58:46 INFO SparkContext: Added JAR
> > file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
> > http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
> > timestamp 1424066326632
> >
> >
> > I don't really know what else I could try any suggestions highly
> > appreciated.
> >
> > Thanks,
> > Mohamed.
> >
> >
> > On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu 
> wrote:
> >>
> >> It seems that the jar for cassandra is not loaded, you should have
> >> them in the classpath.
> >>
> >> On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
> >>  wrote:
> >> > Hello all,
> >> >
> >> > Trying the example code from this package
> >> > (https://github.com/Parsely/pyspark-cassandra) , I always get this
> >> > error...
> >> >
> >> > Can you see what I am doing wrong? from googling arounf it seems to be
> >> > that
> >> > the jar is not found somehow...  The spark log shows the JAR was
> >> > processed
> >> > at least.
> >> >
> >> > Thank you so much.
> >> >
> >> > am using spark-1.2.1-bin-hadoop2.4.tgz
> >> >
> >> > test2.py is simply:
> >> >
> >> > from pyspark.context import SparkConf
> >> > from pyspark_cassandra import CassandraSparkContext, saveToCassandra
> >> > conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
> >> > conf.set("spark.cassandra.connection.host", "devzero")
> >> > sc = CassandraSparkContext(conf=conf)
> >> >
> >> > [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash
> -c
> >> > "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
> >> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
> >> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py"
> >> > ...
> >> > 15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
> >> > 15/02/16 05:58:45 INFO Remoting: Starting remoting
> >> > 15/02/16 05:58:45 INFO Remoting: Remoting started; listening on
> >> > addresses
> >> > :[akka.tcp://sparkDriver@devzero:38917]
> >> > 15/02/16 05:58:45 INFO Utils: Successfully started service
> 'sparkDriver'
> >> > on
> >> > port 38917.
> >> > 15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
> >> > 15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
> >> > 15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
> >> >
> >> >
> /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
> >> > 15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity
> >> > 265.4
> >> > MB
> >> > 15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
> >> > library for your platform... using builtin-java classes where
> applicable
> >> > 15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
> >> >
> >> >
> /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
> >> > 15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
> >> > 15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
> >> > server' on port 56642.
> >> > 15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI'
> on
> >> > port
> >> > 4040.
> >> > 15/02/16 05:58:46 INFO SparkUI: Started SparkUI at
> http://devzero:4040
> >> > 15/02/16 05:58:46 INFO SparkContext: Added JAR
> >> > file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
> >> > http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar
> with
> >> > timestamp 1424066326632
> >> > 15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
> >> >
> >> >
> /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
> >> > 15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py
> at
> >> > http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
> >> > 15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
> >> >
> >> >
> /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
> >> > 15/02/16 05:58:46 INFO SparkContext: Added file
> >> > file:/spark/pyspark_cassandra.py at
> >> > http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
> >> > 1424066326642
> >> > 15/02/16 05:58:46 INFO Executor: Starting executor ID  on host
> >> > localhost
> >> > 15/02/16 05:58:46 INFO AkkaUtils: Connecting to 

Re: Shuffle on joining two RDDs

2015-02-16 Thread Davies Liu
This will be fixed by https://github.com/apache/spark/pull/4629

On Fri, Feb 13, 2015 at 10:43 AM, Imran Rashid  wrote:
> yeah I thought the same thing at first too, I suggested something equivalent
> w/ preservesPartitioning = true, but that isn't enough.  the join is done by
> union-ing the two transformed rdds, which is very different from the way it
> works under the hood in scala to enable narrow dependencies.  It really
> needs a bigger change to pyspark.  I filed this issue:
> https://issues.apache.org/jira/browse/SPARK-5785
>
> (and the somewhat related issue about documentation:
> https://issues.apache.org/jira/browse/SPARK-5786)
>
> partitioning should still work in pyspark, you still need some notion of
> distributing work, and the pyspark functions have a partitionFunc to decide
> that.  But, I am not an authority on pyspark, so perhaps there are more
> holes I'm not aware of ...
>
> Imran
>
> On Fri, Feb 13, 2015 at 8:36 AM, Karlson  wrote:
>>
>> In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38,
>> wouldn't it help to change the lines
>>
>> vs = rdd.map(lambda (k, v): (k, (1, v)))
>> ws = other.map(lambda (k, v): (k, (2, v)))
>>
>> to
>>
>> vs = rdd.mapValues(lambda v: (1, v))
>> ws = other.mapValues(lambda v: (2, v))
>>
>> ?
>> As I understand, this would preserve the original partitioning.
>>
>>
>>
>> On 2015-02-13 12:43, Karlson wrote:
>>>
>>> Does that mean partitioning does not work in Python? Or does this only
>>> effect joining?
>>>
>>> On 2015-02-12 19:27, Davies Liu wrote:

 The feature works as expected in Scala/Java, but not implemented in
 Python.

 On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid 
 wrote:
>
> I wonder if the issue is that these lines just need to add
> preservesPartitioning = true
> ?
>
> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>
> I am getting the feeling this is an issue w/ pyspark
>
>
> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid 
> wrote:
>>
>>
>> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.
>> It
>> could be that pyspark doesn't properly support narrow dependencies, or
>> maybe
>> you need to be more explicit about the partitioner.  I am looking into
>> the
>> pyspark api but you might have some better guesses here than I
>> thought.
>>
>> My suggestion to do
>>
>> joinedRdd.getPartitions.foreach{println}
>>
>> was just to see if the partition was a NarrowCoGroupSplitDep or a
>> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
>> fields
>> are hidden deeper inside and are not user-visible.  But I think a
>> better way
>> (in scala, anyway) is to look at rdd.dependencies.  its a little
>> tricky,
>> though, you need to look deep into the lineage (example at the end).
>>
>> Sean -- yes it does require both RDDs have the same partitioner, but
>> that
>> should happen naturally if you just specify the same number of
>> partitions,
>> you'll get equal HashPartitioners.  There is a little difference in
>> the
>> scala & python api that I missed here.  For partitionBy in scala, you
>> actually need to specify the partitioner, but not in python.  However
>> I
>> thought it would work like groupByKey, which does just take an int.
>>
>>
>> Here's a code example in scala -- not sure what is available from
>> python.
>> Hopefully somebody knows a simpler way to confirm narrow
>> dependencies??
>>
>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x ->
>>> x}.groupByKey(64)
>>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x ->
>>> x}.groupByKey(64)
>>> scala> d.partitioner == d2.partitioner
>>> res2: Boolean = true
>>> val joined = d.join(d2)
>>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x ->
>>> x}.groupByKey(100)
>>> val badJoined = d.join(d3)
>>>
>>> d.setName("d")
>>> d2.setName("d2")
>>> d3.setName("d3")
>>> joined.setName("joined")
>>> badJoined.setName("badJoined")
>>>
>>>
>>> //unfortunatley, just looking at the immediate dependencies of joined
>>> &
>>> badJoined is misleading, b/c join actually creates
>>> // one more step after the shuffle
>>> scala> joined.dependencies
>>> res20: Seq[org.apache.spark.Dependency[_]] =
>>> List(org.apache.spark.OneToOneDependency@74751ac8)
>>> //even with the join that does require a shuffle, we still see a
>>> OneToOneDependency, but thats just a simple flatMap step
>>> scala> badJoined.dependencies
>>> res21: Seq[org.apache.spark.Dependency[_]] =
>>> List(org.apache.spark.OneToOneDependency@1cf356cc)
>>
>>
>>
>>
>>>
>>>  //so lets make a helper function to get all the dependencies
>>> recursively
>>>
>>> def flatte

Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Antony Mayi
thanks, that looks promissing but can't find any reference giving me more 
details - can you please point me to something? Also is it possible to force GC 
from pyspark (as I am using pyspark)?
thanks,Antony. 

 On Monday, 16 February 2015, 21:05, Tathagata Das 
 wrote:
   
 

 Correct, brute force clean up is not useful. Since Spark 1.0, Spark can do 
automatic cleanup of files based on which RDDs are used/garbage collected by 
JVM. That would be the best way, but depends on the JVM GC characteristics. If 
you force a GC periodically in the driver that might help you get rid of files 
in the workers that are not needed.
TD
On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi  
wrote:

spark.cleaner.ttl is not the right way - seems to be really designed for 
streaming. although it keeps the disk usage under control it also causes loss 
of rdds and broadcasts that are required later leading to crash.
is there any other way?thanks,Antony. 

 On Sunday, 15 February 2015, 21:42, Antony Mayi  
wrote:
   
 

 spark.cleaner.ttl ? 

 On Sunday, 15 February 2015, 18:23, Antony Mayi  
wrote:
   
 

 Hi,
I am running bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - ALS is using about 
3 billions of ratings and I am doing several trainImplicit() runs in loop 
within one spark session. I have four node cluster with 3TB disk space on each. 
before starting the job there is less then 8% of the disk space used. while the 
ALS is running I can see the disk usage rapidly growing mainly because of files 
being stored under 
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. 
after about 10 hours the disk usage hits 90% and yarn kills the particular 
containers.
am I missing doing some cleanup somewhere while looping over the several 
trainImplicit() calls? taking 4*3TB of disk space seems immense.
thanks for any help,Antony. 

 


 




 
   

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
So I tired building the connector from:
https://github.com/datastax/spark-cassandra-connector

which seems to include the java class referenced in the error message:

[root@devzero spark]# unzip -l
spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
|grep CassandraJavaUtil

14612  02-16-2015 23:25
com/datastax/spark/connector/japi/CassandraJavaUtil.class

[root@devzero spark]#


When I try running my spark test job, I still get the exact same error,
even though both my jars seems to have been processed by spark.


...
15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040
15/02/17 00:00:45 INFO SparkContext: Added JAR
file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
timestamp 1424131245595
15/02/17 00:00:45 INFO SparkContext: Added JAR
file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at
http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
with timestamp 1424131245623
15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to
/tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py
15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at
http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624
15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to
/tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py
15/02/17 00:00:45 INFO SparkContext: Added file
file:/spark/pyspark_cassandra.py at
http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp
1424131245633
15/02/17 00:00:45 INFO Executor: Starting executor ID  on host
localhost
15/

15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
shut down.
Traceback (most recent call last):
  File "/spark/test2.py", line 5, in 
sc = CassandraSparkContext(conf=conf)
  File "/spark/python/pyspark/context.py", line 105, in __init__
conf, jsc)
  File "/spark/pyspark_cassandra.py", line 17, in _do_init
self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
  File "/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line
726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.


am I building the wrong connector jar? or using the wrong jar?

Thanks a lot,
Mohamed.



On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi <
mohamed.lrh...@georgetown.edu> wrote:

> Oh, I don't know. thanks a lot Davies, gonna figure that out now
>
> On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu  wrote:
>
>> It also need the Cassandra jar:
>> com.datastax.spark.connector.CassandraJavaUtil
>>
>> Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?
>>
>>
>>
>> On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
>>  wrote:
>> > Yes, am sure the system cant find the jar.. but how do I fix that... my
>> > submit command includes the jar:
>> >
>> > /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
>> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
>> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
>> >
>> > and the spark output seems to indicate it is handling it:
>> >
>> > 15/02/16 05:58:46 INFO SparkContext: Added JAR
>> > file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
>> > http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
>> > timestamp 1424066326632
>> >
>> >
>> > I don't really know what else I could try any suggestions highly
>> > appreciated.
>> >
>> > Thanks,
>> > Mohamed.
>> >
>> >
>> > On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu 
>> wrote:
>> >>
>> >> It seems that the jar for cassandra is not loaded, you should have
>> >> them in the classpath.
>> >>
>> >> On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
>> >>  wrote:
>> >> > Hello all,
>> >> >
>> >> > Trying the example code from this package
>> >> > (https://github.com/Parsely/pyspark-cassandra) , I always get this
>> >> > error...
>> >> >
>> >> > Can you see what I am doing wrong? from googling arounf it seems to
>> be
>> >> > that
>> >> > the jar is not found somehow...  The spark log shows the JAR was
>> >> > processed
>> >> > at least.
>> >> >
>> >> > Thank you so much.
>> >> >
>> >> > am using spark-1.2.1-bin-hadoop2.4.tgz
>> >> >
>> >> > test2.py is simply:
>> >> >
>> >> > from pyspark.context import SparkConf
>> >> > from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>> >> > conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
>> >> > conf.set("spark.cassandra.connection.host", "devzero")
>> >> > sc = CassandraSparkContext(conf=conf)
>> >> >
>> >> > [root@devzero spark]# /usr/local/bin/docker-enter  spark-master
>> bash -c
>> >> > "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py
>> --jars
>> >> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
>> >> >

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Davies Liu
Can you try the example in pyspark-cassandra?

If not, you could create a issue there.

On Mon, Feb 16, 2015 at 4:07 PM, Mohamed Lrhazi
 wrote:
> So I tired building the connector from:
> https://github.com/datastax/spark-cassandra-connector
>
> which seems to include the java class referenced in the error message:
>
> [root@devzero spark]# unzip -l
> spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
> |grep CassandraJavaUtil
>
> 14612  02-16-2015 23:25
> com/datastax/spark/connector/japi/CassandraJavaUtil.class
>
> [root@devzero spark]#
>
>
> When I try running my spark test job, I still get the exact same error, even
> though both my jars seems to have been processed by spark.
>
>
> ...
> 15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040
> 15/02/17 00:00:45 INFO SparkContext: Added JAR
> file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
> http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
> timestamp 1424131245595
> 15/02/17 00:00:45 INFO SparkContext: Added JAR
> file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at
> http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
> with timestamp 1424131245623
> 15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to
> /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py
> 15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at
> http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624
> 15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to
> /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py
> 15/02/17 00:00:45 INFO SparkContext: Added file
> file:/spark/pyspark_cassandra.py at
> http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp
> 1424131245633
> 15/02/17 00:00:45 INFO Executor: Starting executor ID  on host
> localhost
> 15/
> 
> 15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
> shut down.
> Traceback (most recent call last):
>   File "/spark/test2.py", line 5, in 
> sc = CassandraSparkContext(conf=conf)
>   File "/spark/python/pyspark/context.py", line 105, in __init__
> conf, jsc)
>   File "/spark/pyspark_cassandra.py", line 17, in _do_init
> self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
>   File "/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line
> 726, in __getattr__
> py4j.protocol.Py4JError: Trying to call a package.
>
>
> am I building the wrong connector jar? or using the wrong jar?
>
> Thanks a lot,
> Mohamed.
>
>
>
> On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi
>  wrote:
>>
>> Oh, I don't know. thanks a lot Davies, gonna figure that out now
>>
>> On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu  wrote:
>>>
>>> It also need the Cassandra jar:
>>> com.datastax.spark.connector.CassandraJavaUtil
>>>
>>> Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?
>>>
>>>
>>>
>>> On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
>>>  wrote:
>>> > Yes, am sure the system cant find the jar.. but how do I fix that... my
>>> > submit command includes the jar:
>>> >
>>> > /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
>>> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
>>> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
>>> >
>>> > and the spark output seems to indicate it is handling it:
>>> >
>>> > 15/02/16 05:58:46 INFO SparkContext: Added JAR
>>> > file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
>>> > http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
>>> > timestamp 1424066326632
>>> >
>>> >
>>> > I don't really know what else I could try any suggestions highly
>>> > appreciated.
>>> >
>>> > Thanks,
>>> > Mohamed.
>>> >
>>> >
>>> > On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu 
>>> > wrote:
>>> >>
>>> >> It seems that the jar for cassandra is not loaded, you should have
>>> >> them in the classpath.
>>> >>
>>> >> On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
>>> >>  wrote:
>>> >> > Hello all,
>>> >> >
>>> >> > Trying the example code from this package
>>> >> > (https://github.com/Parsely/pyspark-cassandra) , I always get this
>>> >> > error...
>>> >> >
>>> >> > Can you see what I am doing wrong? from googling arounf it seems to
>>> >> > be
>>> >> > that
>>> >> > the jar is not found somehow...  The spark log shows the JAR was
>>> >> > processed
>>> >> > at least.
>>> >> >
>>> >> > Thank you so much.
>>> >> >
>>> >> > am using spark-1.2.1-bin-hadoop2.4.tgz
>>> >> >
>>> >> > test2.py is simply:
>>> >> >
>>> >> > from pyspark.context import SparkConf
>>> >> > from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>> >> > conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
>>> >> > conf.set("spark.cassandra.connection.host", "

Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Davies Liu
For the last question, you can trigger GC in JVM from Python by :

sc._jvm.System.gc()

On Mon, Feb 16, 2015 at 4:08 PM, Antony Mayi
 wrote:
> thanks, that looks promissing but can't find any reference giving me more
> details - can you please point me to something? Also is it possible to force
> GC from pyspark (as I am using pyspark)?
>
> thanks,
> Antony.
>
>
> On Monday, 16 February 2015, 21:05, Tathagata Das
>  wrote:
>
>
>
> Correct, brute force clean up is not useful. Since Spark 1.0, Spark can do
> automatic cleanup of files based on which RDDs are used/garbage collected by
> JVM. That would be the best way, but depends on the JVM GC characteristics.
> If you force a GC periodically in the driver that might help you get rid of
> files in the workers that are not needed.
>
> TD
>
> On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi 
> wrote:
>
> spark.cleaner.ttl is not the right way - seems to be really designed for
> streaming. although it keeps the disk usage under control it also causes
> loss of rdds and broadcasts that are required later leading to crash.
>
> is there any other way?
> thanks,
> Antony.
>
>
> On Sunday, 15 February 2015, 21:42, Antony Mayi 
> wrote:
>
>
>
> spark.cleaner.ttl ?
>
>
> On Sunday, 15 February 2015, 18:23, Antony Mayi 
> wrote:
>
>
>
> Hi,
>
> I am running bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - ALS is using
> about 3 billions of ratings and I am doing several trainImplicit() runs in
> loop within one spark session. I have four node cluster with 3TB disk space
> on each. before starting the job there is less then 8% of the disk space
> used. while the ALS is running I can see the disk usage rapidly growing
> mainly because of files being stored under
> yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA.
> after about 10 hours the disk usage hits 90% and yarn kills the particular
> containers.
>
> am I missing doing some cleanup somewhere while looping over the several
> trainImplicit() calls? taking 4*3TB of disk space seems immense.
>
> thanks for any help,
> Antony.
>
>
>
>
>
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Will do. Thanks a lot.


On Mon, Feb 16, 2015 at 7:20 PM, Davies Liu  wrote:

> Can you try the example in pyspark-cassandra?
>
> If not, you could create a issue there.
>
> On Mon, Feb 16, 2015 at 4:07 PM, Mohamed Lrhazi
>  wrote:
> > So I tired building the connector from:
> > https://github.com/datastax/spark-cassandra-connector
> >
> > which seems to include the java class referenced in the error message:
> >
> > [root@devzero spark]# unzip -l
> >
> spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
> > |grep CassandraJavaUtil
> >
> > 14612  02-16-2015 23:25
> > com/datastax/spark/connector/japi/CassandraJavaUtil.class
> >
> > [root@devzero spark]#
> >
> >
> > When I try running my spark test job, I still get the exact same error,
> even
> > though both my jars seems to have been processed by spark.
> >
> >
> > ...
> > 15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040
> > 15/02/17 00:00:45 INFO SparkContext: Added JAR
> > file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
> > http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
> > timestamp 1424131245595
> > 15/02/17 00:00:45 INFO SparkContext: Added JAR
> > file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at
> >
> http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
> > with timestamp 1424131245623
> > 15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to
> >
> /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py
> > 15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at
> > http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624
> > 15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to
> >
> /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py
> > 15/02/17 00:00:45 INFO SparkContext: Added file
> > file:/spark/pyspark_cassandra.py at
> > http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp
> > 1424131245633
> > 15/02/17 00:00:45 INFO Executor: Starting executor ID  on host
> > localhost
> > 15/
> > 
> > 15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator:
> Remoting
> > shut down.
> > Traceback (most recent call last):
> >   File "/spark/test2.py", line 5, in 
> > sc = CassandraSparkContext(conf=conf)
> >   File "/spark/python/pyspark/context.py", line 105, in __init__
> > conf, jsc)
> >   File "/spark/pyspark_cassandra.py", line 17, in _do_init
> > self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
> >   File "/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line
> > 726, in __getattr__
> > py4j.protocol.Py4JError: Trying to call a package.
> >
> >
> > am I building the wrong connector jar? or using the wrong jar?
> >
> > Thanks a lot,
> > Mohamed.
> >
> >
> >
> > On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi
> >  wrote:
> >>
> >> Oh, I don't know. thanks a lot Davies, gonna figure that out now
> >>
> >> On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu 
> wrote:
> >>>
> >>> It also need the Cassandra jar:
> >>> com.datastax.spark.connector.CassandraJavaUtil
> >>>
> >>> Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?
> >>>
> >>>
> >>>
> >>> On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
> >>>  wrote:
> >>> > Yes, am sure the system cant find the jar.. but how do I fix that...
> my
> >>> > submit command includes the jar:
> >>> >
> >>> > /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
> >>> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
> >>> > /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
> >>> >
> >>> > and the spark output seems to indicate it is handling it:
> >>> >
> >>> > 15/02/16 05:58:46 INFO SparkContext: Added JAR
> >>> > file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
> >>> > http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar
> with
> >>> > timestamp 1424066326632
> >>> >
> >>> >
> >>> > I don't really know what else I could try any suggestions highly
> >>> > appreciated.
> >>> >
> >>> > Thanks,
> >>> > Mohamed.
> >>> >
> >>> >
> >>> > On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu 
> >>> > wrote:
> >>> >>
> >>> >> It seems that the jar for cassandra is not loaded, you should have
> >>> >> them in the classpath.
> >>> >>
> >>> >> On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
> >>> >>  wrote:
> >>> >> > Hello all,
> >>> >> >
> >>> >> > Trying the example code from this package
> >>> >> > (https://github.com/Parsely/pyspark-cassandra) , I always get
> this
> >>> >> > error...
> >>> >> >
> >>> >> > Can you see what I am doing wrong? from googling arounf it seems
> to
> >>> >> > be
> >>> >> > that
> >>> >> > the jar is not found somehow...  The spark log shows the JAR was
> >>> >> > processed
> >>> >> > at least.
> >>> >> >
> >>> >> > Thank you so muc

Re: Use of nscala-time within spark-shell

2015-02-16 Thread Kevin (Sangwoo) Kim
What is your scala version used to build Spark?
It seems your nscala-time library scala version is 2.11,
and default Spark scala version is 2.10.


On Tue Feb 17 2015 at 1:51:47 AM Hammam CHAMSI  wrote:

> Hi All,
>
> Thanks in advance for your help. I have timestamp which I need to convert
> to datetime using scala. A folder contains the three needed jar files:
> "joda-convert-1.5.jar  joda-time-2.4.jar  nscala-time_2.11-1.8.0.jar"
> Using scala REPL and adding the jars: scala -classpath "*.jar"
> I can use nscala-time like following:
>
> scala> import com.github.nscala_time.time.Imports._
> import com.github.nscala_time.time.Imports._
>
> scala> import org.joda._
> import org.joda._
>
> scala> DateTime.now
> res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00
>
> But when i try to use spark-shell:
> ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar
> /usr/local/spark/bin/spark-shell --master local --driver-memory 2g
> --executor-memory 2g --executor-cores 1
>
> It successfully imports the jars:
>
> scala> import com.github.nscala_time.time.Imports._
> import com.github.nscala_time.time.Imports._
>
> scala> import org.joda._
> import org.joda._
>
> but fails using them
> scala> DateTime.now
> java.lang.NoSuchMethodError:
> scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
> at
> com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69)
>
> at
> com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20)
>
> at
> com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61)
>
> at com.github.nscala_time.time.Imports$.(Imports.scala:20)
> at com.github.nscala_time.time.Imports$.(Imports.scala)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:17)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:22)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:24)
> at $iwC$$iwC$$iwC$$iwC$$iwC.(:26)
> at $iwC$$iwC$$iwC$$iwC.(:28)
> at $iwC$$iwC$$iwC.(:30)
> at $iwC$$iwC.(:32)
> at $iwC.(:34)
> at (:36)
> at .(:40)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
> at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
> at
> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
> at
> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
> at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
> at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
>
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
> at
> org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
> at
> org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
> at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
>
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
>
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
>
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Your help is very aappreciated,
>
> Regards,
>
> Hammam
>


Question about spark streaming+Flume

2015-02-16 Thread bit1...@163.com
Hi,
I am trying Spark Streaming + Flume example:

1. Code
object SparkFlumeNGExample { 
   def main(args : Array[String]) { 
   val conf = new SparkConf().setAppName("SparkFlumeNGExample") 
   val ssc = new StreamingContext(conf, Seconds(10)) 

   val lines = FlumeUtils.createStream(ssc,"localhost",) 
// Print out the count of events received from this server in each batch 
   lines.count().map(cnt => "Received " + cnt + " flume events. at " + 
System.currentTimeMillis() ).print() 
   ssc.start() 
   ssc.awaitTermination(); 
} 
}
2. I submit the application with following sh:
./spark-submit --deploy-mode client --name SparkFlumeEventCount --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 
--class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar


When I write data to flume, I only notice the following console information 
that input is added.
storage.BlockManagerInfo: Added input-0-1424151807400 in memory on 
localhost:39338 (size: 1095.0 B, free: 267.2 MB) 
15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 
142415181 ms 
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415182 ms

15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415187 ms

But I didn't the output from the code: "Received X flumes events"

I am no idea where the problem is, any idea? Thanks







Identify the performance bottleneck from hardware prospective

2015-02-16 Thread Julaiti Alafate
Hi there,

I am trying to scale up the data size that my application is handling. This
application is running on a cluster with 16 slave nodes. Each slave node
has 60GB memory. It is running in standalone mode. The data is coming from
HDFS that also in same local network.

In order to have an understanding on how my program is running, I also had
a Ganglia installed on the cluster. From previous run, I know the stage
that taking longest time to run is counting word pairs (my RDD consists of
sentences from a corpus). My goal is to identify the bottleneck of my
application, then modify my program or hardware configurations according to
that.

Unfortunately, I didn't find too much information on Spark monitoring and
optimization topics. Reynold Xin gave a great talk on Spark Summit 2014 for
application tuning from tasks perspective. Basically, his focus is on tasks
that oddly slower than the average. However, it didn't solve my problem
because there is no such tasks that run way slow than others in my case.

So I tried to identify the bottleneck from hardware prospective. I want to
know what the limitation of the cluster is. I think if the executers are
running hard, either CPU, memory or network bandwidth (or maybe the
combinations) is hitting the roof. But Ganglia reports the CPU utilization
of cluster is no more than 50%, network utilization is high for several
seconds at the beginning, then drop close to 0. From Spark UI, I can see
the nodes with maximum memory usage is consuming around 6GB, while
"spark.executor.memory" is set to be 20GB.

I am very confused that the program is not running fast enough, while
hardware resources are not in shortage. Could you please give me some hints
about what decides the performance of a Spark application from hardware
perspective?

Thanks!

Julaiti


Re: Question about spark streaming+Flume

2015-02-16 Thread Arush Kharbanda
Hi

Can you try this

val lines = FlumeUtils.createStream(ssc,"localhost",)
// Print out the count of events received from this server in each
batch
   lines.count().map(cnt => "Received " + cnt + " flume events. at " +
System.currentTimeMillis() )

lines.forechRDD(_.foreach(println))

Thanks
Arush

On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com  wrote:

> Hi,
> I am trying Spark Streaming + Flume example:
>
> 1. Code
> object SparkFlumeNGExample {
>def main(args : Array[String]) {
>val conf = new SparkConf().setAppName("SparkFlumeNGExample")
>val ssc = new StreamingContext(conf, Seconds(10))
>
>val lines = FlumeUtils.createStream(ssc,"localhost",)
> // Print out the count of events received from this server in each
> batch
>lines.count().map(cnt => "Received " + cnt + " flume events. at " +
> System.currentTimeMillis() ).print()
>ssc.start()
>ssc.awaitTermination();
> }
> }
> 2. I submit the application with following sh:
> ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master
> spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2
> --class spark.examples.streaming.SparkFlumeNGWordCount
> spark-streaming-flume.jar
>
>
> When I write data to flume, I only notice the following console
> information that input is added.
> storage.BlockManagerInfo: Added input-0-1424151807400 in memory on
> localhost:39338 (size: 1095.0 B, free: 267.2 MB)
> 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time
> 142415181 ms
> 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time
> 142415182 ms
> 
> 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time
> 142415187 ms
>
> But I didn't the output from the code: "Received X flumes events"
>
> I am no idea where the problem is, any idea? Thanks
>
>
> --
>
>


-- 

[image: Sigmoid Analytics] 

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Re: Question about spark streaming+Flume

2015-02-16 Thread bit1...@163.com
Thanks Arush..
With your code, compiling error occurs:

Error:(19, 11) value forechRDD is not a member of 
org.apache.spark.streaming.dstream.ReceiverInputDStream[org.apache.spark.streaming.flume.SparkFlumeEvent]
 
lines.forechRDD(_.foreach(println)) 
^




From: Arush Kharbanda
Date: 2015-02-17 14:31
To: bit1...@163.com
CC: user
Subject: Re: Question about spark streaming+Flume
Hi

Can you try this

val lines = FlumeUtils.createStream(ssc,"localhost",) 
// Print out the count of events received from this server in each batch 
   lines.count().map(cnt => "Received " + cnt + " flume events. at " + 
System.currentTimeMillis() )

lines.forechRDD(_.foreach(println))

Thanks
Arush

On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com  wrote:
Hi,
I am trying Spark Streaming + Flume example:

1. Code
object SparkFlumeNGExample { 
   def main(args : Array[String]) { 
   val conf = new SparkConf().setAppName("SparkFlumeNGExample") 
   val ssc = new StreamingContext(conf, Seconds(10)) 

   val lines = FlumeUtils.createStream(ssc,"localhost",) 
// Print out the count of events received from this server in each batch 
   lines.count().map(cnt => "Received " + cnt + " flume events. at " + 
System.currentTimeMillis() ).print() 
   ssc.start() 
   ssc.awaitTermination(); 
} 
}
2. I submit the application with following sh:
./spark-submit --deploy-mode client --name SparkFlumeEventCount --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 
--class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar


When I write data to flume, I only notice the following console information 
that input is added.
storage.BlockManagerInfo: Added input-0-1424151807400 in memory on 
localhost:39338 (size: 1095.0 B, free: 267.2 MB) 
15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 
142415181 ms 
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415182 ms

15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415187 ms

But I didn't the output from the code: "Received X flumes events"

I am no idea where the problem is, any idea? Thanks








-- 
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Re: Question about spark streaming+Flume

2015-02-16 Thread bit1...@163.com
Ok, you are missing a letter in foreachRDD.. let me proceed..



bit1...@163.com
 
From: Arush Kharbanda
Date: 2015-02-17 14:31
To: bit1...@163.com
CC: user
Subject: Re: Question about spark streaming+Flume
Hi

Can you try this

val lines = FlumeUtils.createStream(ssc,"localhost",) 
// Print out the count of events received from this server in each batch 
   lines.count().map(cnt => "Received " + cnt + " flume events. at " + 
System.currentTimeMillis() )

lines.forechRDD(_.foreach(println))

Thanks
Arush

On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com  wrote:
Hi,
I am trying Spark Streaming + Flume example:

1. Code
object SparkFlumeNGExample { 
   def main(args : Array[String]) { 
   val conf = new SparkConf().setAppName("SparkFlumeNGExample") 
   val ssc = new StreamingContext(conf, Seconds(10)) 

   val lines = FlumeUtils.createStream(ssc,"localhost",) 
// Print out the count of events received from this server in each batch 
   lines.count().map(cnt => "Received " + cnt + " flume events. at " + 
System.currentTimeMillis() ).print() 
   ssc.start() 
   ssc.awaitTermination(); 
} 
}
2. I submit the application with following sh:
./spark-submit --deploy-mode client --name SparkFlumeEventCount --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 
--class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar


When I write data to flume, I only notice the following console information 
that input is added.
storage.BlockManagerInfo: Added input-0-1424151807400 in memory on 
localhost:39338 (size: 1095.0 B, free: 267.2 MB) 
15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 
142415181 ms 
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415182 ms

15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415187 ms

But I didn't the output from the code: "Received X flumes events"

I am no idea where the problem is, any idea? Thanks








-- 
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Re: Question about spark streaming+Flume

2015-02-16 Thread bit1...@163.com
Hi Arush, 
With your code, I still didn't see the output  "Received X flumes events"..



bit1...@163.com
 
From: bit1...@163.com
Date: 2015-02-17 14:08
To: Arush Kharbanda
CC: user
Subject: Re: Re: Question about spark streaming+Flume
Ok, you are missing a letter in foreachRDD.. let me proceed..



bit1...@163.com
 
From: Arush Kharbanda
Date: 2015-02-17 14:31
To: bit1...@163.com
CC: user
Subject: Re: Question about spark streaming+Flume
Hi

Can you try this

val lines = FlumeUtils.createStream(ssc,"localhost",) 
// Print out the count of events received from this server in each batch 
   lines.count().map(cnt => "Received " + cnt + " flume events. at " + 
System.currentTimeMillis() )

lines.forechRDD(_.foreach(println))

Thanks
Arush

On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com  wrote:
Hi,
I am trying Spark Streaming + Flume example:

1. Code
object SparkFlumeNGExample { 
   def main(args : Array[String]) { 
   val conf = new SparkConf().setAppName("SparkFlumeNGExample") 
   val ssc = new StreamingContext(conf, Seconds(10)) 

   val lines = FlumeUtils.createStream(ssc,"localhost",) 
// Print out the count of events received from this server in each batch 
   lines.count().map(cnt => "Received " + cnt + " flume events. at " + 
System.currentTimeMillis() ).print() 
   ssc.start() 
   ssc.awaitTermination(); 
} 
}
2. I submit the application with following sh:
./spark-submit --deploy-mode client --name SparkFlumeEventCount --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 
--class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar


When I write data to flume, I only notice the following console information 
that input is added.
storage.BlockManagerInfo: Added input-0-1424151807400 in memory on 
localhost:39338 (size: 1095.0 B, free: 267.2 MB) 
15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 
142415181 ms 
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415182 ms

15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415187 ms

But I didn't the output from the code: "Received X flumes events"

I am no idea where the problem is, any idea? Thanks








-- 
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


java.lang.NoClassDefFoundError: org/apache/spark/SparkConf

2015-02-16 Thread siqi chen
Hello,

I have a simple Kafka Spark Streaming example which I am still developing
in the standalone mode.

Here is what is puzzling me,

If I build the assembly jar, use bin/spark-submit to run it, it works fine.
But if I want to run the code from within Intellij IDE, then it will cry
for this error

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/SparkConf

...
Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkConf

Here is my build.sbt file


import _root_.sbt.Keys._
import _root_.sbtassembly.Plugin.AssemblyKeys._
import _root_.sbtassembly.Plugin.MergeStrategy
import _root_.sbtassembly.Plugin._
import AssemblyKeys._

assemblySettings

name := "test-kafka"

version := "1.0"

scalaVersion := "2.10.4"

jarName in assembly := "test-kafka-1.0.jar"

assemblyOption in assembly ~= { _.copy(includeScala = false) }

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.1" % "provided",
  ("org.apache.spark" %% "spark-streaming-kafka" % "1.2.1").
exclude("commons-beanutils", "commons-beanutils").
exclude("commons-collections", "commons-collections").
exclude("com.esotericsoftware.minlog", "minlog").
exclude("commons-logging", "commons-logging")
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
  case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
  case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
  case x if x.startsWith("plugin.properties") => MergeStrategy.last
  case x => old(x)
}
}

I also have this in my project/plugins.sbt

resolvers += Resolver.url("sbt-plugin-releases-scalasbt",
url("http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/";))

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")

*What is even more interesting is that if I pin the Spark jar to 1.1.1
instead of 1.2.1, then I can successfully run it within IntelliJ. *



​


Re: hive-thriftserver maven artifact

2015-02-16 Thread Arush Kharbanda
You can build your own spark with option -Phive-thriftserver.

You can publish the jars locally. I hope that would solve your problem.

On Mon, Feb 16, 2015 at 8:54 PM, Marco  wrote:

> Ok, so will it be only available for the next version (1.30)?
>
> 2015-02-16 15:24 GMT+01:00 Ted Yu :
>
>> I searched for 'spark-hive-thriftserver_2.10' on this page:
>> http://mvnrepository.com/artifact/org.apache.spark
>>
>> Looks like it is not published.
>>
>> On Mon, Feb 16, 2015 at 5:44 AM, Marco  wrote:
>>
>>> Hi,
>>>
>>> I am referring to https://issues.apache.org/jira/browse/SPARK-4925
>>> (Hive Thriftserver Maven Artifact). Can somebody point me (URL) to the
>>> artifact in a public repository ? I have not found it @Maven Central.
>>>
>>> Thanks,
>>> Marco
>>>
>>>
>>
>
>
> --
> Viele Grüße,
> Marco
>



-- 

[image: Sigmoid Analytics] 

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Identify the performance bottleneck from hardware prospective

2015-02-16 Thread jalafate
Hi there,

I am trying to scale up the data size that my application is handling. This
application is running on a cluster with 16 slave nodes. Each slave node has
60GB memory. It is running in standalone mode. The data is coming from HDFS
that also in same local network.

In order to have an understanding on how my program is running, I also had a
Ganglia installed on the cluster. From previous run, I know the stage that
taking longest time to run is counting word pairs (my RDD consists of
sentences from a corpus). My goal is to identify the bottleneck of my
application, then modify my program or hardware configurations according to
that.

Unfortunately, I didn't find too much information on Spark monitoring and
optimization topics. Reynold Xin gave a great talk on Spark Summit 2014 for
application tuning from tasks perspective. Basically, his focus is on tasks
that oddly slower than the average. However, it didn't solve my problem
because there is no such tasks that run way slow than others in my case.

So I tried to identify the bottleneck from hardware prospective. I want to
know what the limitation of the cluster is. I think if the executers are
running hard, either CPU, memory or network bandwidth (or maybe the
combinations) is hitting the roof. But Ganglia reports the CPU utilization
of cluster is no more than 50%, network utilization is high for several
seconds at the beginning, then drop close to 0. From Spark UI, I can see the
nodes with maximum memory usage is consuming around 6GB, while
"spark.executor.memory" is set to be 20GB. 

I am very confused that the program is not running fast enough, while
hardware resources are not in shortage. Could you please give me some hints
about what decides the performance of a Spark application from hardware
perspective?

Thanks!

Julaiti



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Identify-the-performance-bottleneck-from-hardware-prospective-tp21684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to retreive the value from sql.row by column name

2015-02-16 Thread Reynold Xin
BTW we merged this today: https://github.com/apache/spark/pull/4640

This should allow us in the future to address column by name in a Row.


On Mon, Feb 16, 2015 at 11:39 AM, Michael Armbrust 
wrote:

> I can unpack the code snippet a bit:
>
> caper.select('ran_id) is the same as saying "SELECT ran_id FROM table" in
> SQL.  Its always a good idea to explicitly request the columns you need
> right before using them.  That way you are tolerant of any changes to the
> schema that might happen upstream.
>
> The next part .map { case Row(ranId: String) => ... } is doing an
> extraction to pull out the values of the row into typed variables.  This is
> the same as doing .map(row => row(0).asInstanceOf[String]) or .map(row =>
> row.getString(0)), but I find this syntax easier to read since it lines
> up nicely with the select clause that comes right before it.  It's also
> less verbose especially when pulling out a bunch of columns.
>
> Regarding the differences between python and java/scala, part of this is
> just due to the nature of these language.  Since java/scala are statically
> typed, you will always have to explicitly say the type of the column you
> are extracting (the bonus here is they are much faster than python due to
> optimizations this strictness allows).  However, since its already a little
> more verbose, we decided not to have the more expensive ability to look up
> columns in a row by name, and instead go with a faster ordinal based API.
> We could revisit this, but its not currently something we are planning to
> change.
>
> Michael
>
> On Mon, Feb 16, 2015 at 11:04 AM, Eric Bell  wrote:
>
>>  I am just learning scala so I don't actually understand what your code
>> snippet is doing but thank you, I will learn more so I can figure it out.
>>
>> I am new to all of this and still trying to make the mental shift from
>> normal programming to distributed programming, but it seems to me that the
>> row object would know its own schema object that it came from and be able
>> to ask its schema to transform a name to a column number. Am I missing
>> something or is this just a matter of time constraints and this one just
>> hasn't gotten into the queue yet?
>>
>> Baring that, do the schema classes provide methods for doing this? I've
>> looked and didn't see anything.
>>
>> I've just discovered that the python implementation for SchemaRDD does in
>> fact allow for referencing by name and column. Why is this provided in the
>> python implementation but not scala or java implementations?
>>
>> Thanks,
>>
>> --eric
>>
>>
>>
>> On 02/16/2015 10:46 AM, Michael Armbrust wrote:
>>
>> For efficiency the row objects don't contain the schema so you can't get
>> the column by name directly.  I usually do a select followed by pattern
>> matching. Something like the following:
>>
>>  caper.select('ran_id).map { case Row(ranId: String) => }
>>
>> On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell  wrote:
>>
>>> Is it possible to reference a column from a SchemaRDD using the column's
>>> name instead of its number?
>>>
>>> For example, let's say I've created a SchemaRDD from an avro file:
>>>
>>> val sqlContext = new SQLContext(sc)
>>> import sqlContext._
>>> val caper=sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
>>> caper.registerTempTable("caper")
>>>
>>> scala> caper
>>> res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at
>>> SchemaRDD.scala:108
>>> == Query Plan ==
>>> == Physical Plan ==
>>> PhysicalRDD
>>> [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...
>>> scala>
>>>
>>> Now I want to access fields, and of course the normal thing to do is to
>>> use a field name, not a field number.
>>>
>>> scala> val kv = caper.map(r => (r.ran_id, r))
>>> :23: error: value ran_id is not a member of
>>> org.apache.spark.sql.Row
>>>val kv = caper.map(r => (r.ran_id, r))
>>>
>>> How do I do this?
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>


PySpark and Cassandra

2015-02-16 Thread Rumph, Frens Jan
Hi,

I'm trying to connect to Cassandra through PySpark using the
spark-cassandra-connector from datastax based on the work of Mike
Sukmanowsky.

I can use Spark and Cassandra through the datastax connector in Scala just
fine. Where things fail in PySpark is that an exception is raised in
org.apache.spark.api.python.PythonRDD.writeIteratorToStream(...) with the
message 'Unexpected element
type com.datastax.spark.connector.japi.CassandraRow'.

So just to be sure: is it only possible to communicate between a Python
Spark program and the rest of the Spark ecosystem through binary or UTF-8
strings? Is there no way to communicate a richer object with at least types
like a float, etc.?

Cheers,
Frens


Re: OOM error

2015-02-16 Thread Akhil Das
Increase your executor memory, Also you can play around with increasing the
number of partitions/parallelism etc.

Thanks
Best Regards

On Tue, Feb 17, 2015 at 3:39 AM, Harshvardhan Chauhan 
wrote:

> Hi All,
>
>
> I need some help with Out Of Memory errors in my application. I am using
> Spark 1.1.0 and my application is using Java API. I am running my app on
> EC2  25 m3.xlarge (4 Cores 15GB Memory) instances. The app only fails
> sometimes. Lots of mapToPair tasks a failing.  My app is configured to run
> 120 executors and executor memory is 2G.
>
> These are various errors i see the in my logs.
>
> 15/02/16 10:53:48 INFO storage.MemoryStore: Block broadcast_1 of size 4680 
> dropped from memory (free 257277829)
> 15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was 
> thrown by a user handler while handling an exception event ([id: 0x6e0138a3, 
> /10.61.192.194:35196 => /10.164.164.228:49445] EXCEPTION: 
> java.lang.OutOfMemoryError: Java heap space)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>   at 
> org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
>   at 
> org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
>   at 
> org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
>   at 
> org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
>   at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was 
> thrown by a user handler while handling an exception event ([id: 0x2d0c1db1, 
> /10.169.226.254:55790 => /10.164.164.228:49445] EXCEPTION: 
> java.lang.OutOfMemoryError: Java heap space)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>   at 
> org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
>   at 
> org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
>   at 
> org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
>   at 
> org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
>   at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 15/02/16 10:53:50 WARN channel.DefaultChannelPipeline: An exception was 
> thrown by a user handler while handling an exception event ([id: 0xd4211985, 
> /10.181.125.52:60959 => /10.164.164.228:49445] EXCEPTION: 
> java.lang.OutOfMemoryError: Java heap space)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>   at 
> org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
>   at 
> org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
>