RE: HBase and non-existent TableInputFormat

2014-09-16 Thread abraham.jacob
Hi,

I had a similar situation in which I needed to read data from HBase and work 
with the data inside of a Spark context. After much googling, I finally got 
mine to work. There are a bunch of steps you need to do to get this working -

The problem is that the Spark context does not know anything about HBase, so 
you have to provide all the information about the HBase classes to both the 
driver code and the executor code...


SparkConf sconf = new SparkConf().setAppName("App").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sconf);

// <=== you will need to add this to tell the executor about the classpath for
// HBase (the "$(hbase classpath)" string is a placeholder for the actual
// output of the `hbase classpath` command).
sconf.set("spark.executor.extraClassPath", "$(hbase classpath)");


Configuration conf = HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE, "Article");


JavaPairRDD<org.apache.hadoop.hbase.io.ImmutableBytesWritable,
        org.apache.hadoop.hbase.client.Result> hBaseRDD =
    sc.newAPIHadoopRDD(conf, TableInputFormat.class,
        org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
        org.apache.hadoop.hbase.client.Result.class);
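Once you have the RDD of (ImmutableBytesWritable, Result) pairs, you can map it 
into something easier to work with inside Spark. Here is a rough sketch, not 
from the original post - the column family "cf" and qualifier "count" are 
made-up names, so substitute your own schema:

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Turn each HBase Result into a (rowKey, cellValue) pair of Strings.
JavaPairRDD<String, String> rows = hBaseRDD.mapToPair(
        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String>() {
            @Override
            public Tuple2<String, String> call(Tuple2<ImmutableBytesWritable, Result> entry) {
                String rowKey = Bytes.toString(entry._1().get());
                byte[] cell = entry._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("count"));
                return new Tuple2<String, String>(rowKey, Bytes.toString(cell));
            }
        });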


Then, when you submit the spark job -


spark-submit --driver-class-path $(hbase classpath) \
  --jars /usr/lib/hbase/hbase-server.jar,/usr/lib/hbase/hbase-client.jar,/usr/lib/hbase/hbase-common.jar,/usr/lib/hbase/hbase-protocol.jar,/usr/lib/hbase/lib/protobuf-java-2.5.0.jar,/usr/lib/hbase/lib/htrace-core.jar \
  --class YourClassName --master local App.jar


Try this and see if it works for you.


From: Y. Dong [mailto:tq00...@gmail.com]
Sent: Tuesday, September 16, 2014 8:18 AM
To: user@spark.apache.org
Subject: HBase and non-existent TableInputFormat

Hello,

I'm currently using spark-core 1.1 and hbase 0.98.5, and I simply want to read 
from HBase. The Java code is attached. However, the problem is that TableInputFormat 
does not even exist in the hbase-client API; is there any other way I can read from
HBase? Thanks

SparkConf sconf = new SparkConf().setAppName("App").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sconf);


Configuration conf = HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE, "Article");


JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf, 
TableInputFormat.class,org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
org.apache.hadoop.hbase.client.Result.class);





RE: HBase and non-existent TableInputFormat

2014-09-16 Thread abraham.jacob
Yes that was very helpful… ☺

Here are a few more I found on my quest to get HBase working with Spark –

This one details the HBase dependencies and Spark classpaths

http://www.abcn.net/2014/07/lighting-spark-with-hbase-full-edition.html

This one has a code overview –

http://www.abcn.net/2014/07/spark-hbase-result-keyvalue-bytearray.html
http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase

All of them were very helpful…



From: Nicholas Chammas [mailto:nicholas.cham...@gmail.com]
Sent: Tuesday, September 16, 2014 10:30 AM
To: Jacob, Abraham (Financial&Risk)
Cc: tq00...@gmail.com; user
Subject: Re: HBase and non-existent TableInputFormat

Btw, there are some examples in the Spark GitHub repo that you may find 
helpful. Here's one related to HBase.



GroupBy Key and then sort values with the group

2014-09-17 Thread abraham.jacob
Hi Group,

I am quite fresh in the Spark world. There is a particular use case that I just 
cannot figure out how to accomplish in Spark. I am using Cloudera 
CDH5/YARN/Java 7.

I have a dataset that has the following characteristics -

A JavaPairRDD that represents the following -

Key => {int ID}
Value => {date effectiveFrom, float value}

Let's say that the data I have is the following -


Partition - 1
[K=> 1, V=> {09-17-2014, 2.8}]
[K=> 1, V=> {09-11-2014, 3.9}]
[K=> 3, V=> {09-18-2014, 5.0}]
[K=> 3, V=> {09-10-2014, 7.4}]


Partition - 2
[K=> 2, V=> {09-13-2014, 2.5}]
[K=> 4, V=> {09-07-2014, 6.2}]
[K=> 2, V=> {09-12-2014, 1.8}]
[K=> 4, V=> {09-22-2014, 2.9}]


Grouping by key gives me the following RDD

Partition - 1
[K=> 1, V=> Iterable({09-17-2014, 2.8}, {09-11-2014, 3.9})]
[K=> 3, V=> Iterable({09-18-2014, 5.0}, {09-10-2014, 7.4})]

Partition - 2
[K=> 2, Iterable({09-13-2014, 2.5}, {09-12-2014, 1.8})]
[K=> 4, Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]

Now I would like to sort the values within each group (by the effectiveFrom date), and the result should look like this -

Partition - 1
[K=> 1, V=> Iterable({09-11-2014, 3.9}, {09-17-2014, 2.8})]
[K=> 3, V=> Iterable({09-10-2014, 7.4}, {09-18-2014, 5.0})]

Partition - 2
[K=> 2, Iterable({09-12-2014, 1.8}, {09-13-2014, 2.5})]
[K=> 4, Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]


What is the best way to do this in spark? If so desired, I can even move the 
"effectiveFrom" (the field that I want to sort on) into the key field.

A code snippet or some pointers on how to solve this would be very helpful.

Regards,
Abraham


RE: GroupBy Key and then sort values with the group

2014-09-17 Thread abraham.jacob
Thanks Sean,

Makes total sense. I guess I was so caught up with RDDs and all the wonderful 
transformations they can do that I did not think about plain old Java 
Collections.sort(list, comparator).

Thanks,

__

Abraham


-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, September 17, 2014 9:37 AM
To: Jacob, Abraham (Financial&Risk)
Cc: user@spark.apache.org
Subject: Re: GroupBy Key and then sort values with the group

You just need to call mapValues() to change your Iterable of things into a 
sorted Iterable of things for each key-value pair. In that function you write, 
it's no different from any other Java program. I imagine you'll need to copy 
the input Iterable into an ArrayList (unfortunately), sort it with whatever 
Comparator you want, and return the result.
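
A minimal Java 7 sketch of that approach (not from the original thread; 
"EffectiveValue" and its getEffectiveFrom() accessor are hypothetical stand-ins 
for your value class, and "grouped" is the JavaPairRDD produced by groupByKey()):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

JavaPairRDD<Integer, Iterable<EffectiveValue>> sorted = grouped.mapValues(
        new Function<Iterable<EffectiveValue>, Iterable<EffectiveValue>>() {
            @Override
            public Iterable<EffectiveValue> call(Iterable<EffectiveValue> values) {
                // Copy the incoming Iterable into a list so it can be sorted in place.
                List<EffectiveValue> copy = new ArrayList<EffectiveValue>();
                for (EffectiveValue v : values) {
                    copy.add(v);
                }
                // Sort ascending by the effectiveFrom date.
                Collections.sort(copy, new Comparator<EffectiveValue>() {
                    @Override
                    public int compare(EffectiveValue a, EffectiveValue b) {
                        return a.getEffectiveFrom().compareTo(b.getEffectiveFrom());
                    }
                });
                return copy;
            }
        });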




RE: Stable spark streaming app

2014-09-17 Thread abraham.jacob
Nice write-up... very helpful!


-Original Message-
From: Tim Smith [mailto:secs...@gmail.com] 
Sent: Wednesday, September 17, 2014 1:11 PM
Cc: spark users
Subject: Re: Stable spark streaming app

I don't have anything in production yet but I now at least have a stable 
(running for more than 24 hours) streaming app. Earlier, the app would crash 
for all sorts of reasons. Caveats/setup:
- Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
- Yarn for RM
- Input and Output to Kafka
- CDH 5.1
- 11-node cluster with 32 cores and a 48G max container size per node (Yarn 
managed)
- 5 partition Kafka topic - both in and out
- Roughly, an average of 25k messages per second
- App written in Scala (warning: I am a Scala noob)

A few things I had to add/tweak to get the app to be stable:
- The executor JVMs did not have any GC options set by default. This might be 
more of a CDH issue. I noticed that the Yarn container and other Spark 
ancillary tasks had GC options set at launch, but the executors had none. So I 
played with different GC options and this worked best:
SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m -XX:+UseConcMarkSweepGC 
-XX:CMSInitiatingOccupancyFraction=70
-XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc -XX:+PrintGCDetails"

I tried G1GC but for some reason it just didn't work. I am not a Java 
programmer or expert so my conclusion is purely trial and error based.
The GC logs, with these flags, go to the "stdout" file in the Yarn container 
logs on each node/worker. You can set SPARK_JAVA_OPTS in spark-env.sh on the 
driver node and Yarn will respect these. On CDH/CM specifically, even though 
you don't run Spark as a service (since you are using Yarn for RM), you can 
go to "Spark Client Advanced Configuration Snippet (Safety Valve) for 
spark-conf/spark-env.sh" and set SPARK_JAVA_OPTS there.

- Set these two params - "spark.yarn.executor.memoryOverhead" and
"spark.yarn.driver.memoryOverhead". Earlier, the executors running the kafka 
receivers would get killed by Yarn for over-utilization of memory, taking my 
app down. Now, these are my memory settings (I will paste the entire app launch 
params later in the email):
--driver-memory 2G \
--executor-memory 16G \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \

With these settings, Yarn allocates each executor a container of 
"executor-memory" plus "spark.yarn.executor.memoryOverhead" (16G + 4G = 20G per 
executor, in this case), while the executor JVM heap itself stays at 16G.

Here is how I launch my app:

run=`date +"%m-%d-%YT%T"`; \
nohup spark-submit --class myAwesomeApp \
  --master yarn myawesomeapp.jar \
  --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
  --driver-memory 2G \
  --executor-memory 16G \
  --executor-cores 16 \
  --num-executors 10 \
  --spark.serializer org.apache.spark.serializer.KryoSerializer \
  --spark.rdd.compress true \
  --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
  --spark.akka.threads 64 \
  --spark.akka.frameSize 500 \
  --spark.task.maxFailures 64 \
  --spark.scheduler.mode FAIR \
  --spark.yarn.executor.memoryOverhead 4096 \
  --spark.yarn.driver.memoryOverhead 1024 \
  --spark.shuffle.consolidateFiles true \
  --spark.default.parallelism 528 \
  >logs/normRunLog-$run.log \
  2>logs/normRunLogError-$run.log & \
echo $! > logs/current-run.pid

Some code optimizations (or goof-ups that I fixed). I did not scientifically 
measure the impact of each, but I think they helped:
- Made all my classes and objects serializable and then use Kryo (as you see 
above)
- I map one receive task to each kafka partition
- Instead of doing a "union" on all the incoming streams and then doing 
repartition(), I now repartition() each incoming stream and process them 
separately. I believe this reduces shuffle.
- Reduced the number of repartitions. I was doing 128 after doing a "union" on 
all incoming dStreams. I now repartition each of the five streams separately 
(in a loop) to 24.
- For each RDD, I set the storage level to "MEMORY_AND_DISK_SER"
- Process data per partition instead of per RDD:
  dataout.foreachRDD( rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
- Specific to kafka: when I create a "new Producer", I make sure I "close" it, 
else I get a ton of "too many files open" errors :) (a minimal sketch of this 
pattern follows this list)
- Use immutable objects as far as possible. If I use mutable objects within a 
method/class, I turn them into immutable ones before passing them on to another 
class/method.
- For logging, create a LogService object that I then use for other 
object/class declarations. Once instantiated, I can make "logInfo" calls from 
within other Objects/Methods/Classes and the output goes to the "stderr" file 
in the Yarn container logs. Good for debugging stream processing logic.
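
Following up on the Kafka point above, here is a minimal Java sketch of the 
create-per-partition / always-close pattern (the broker address, topic name, 
and String payloads are placeholder assumptions; my actual app is in Scala):

import java.util.Iterator;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class PartitionWriter {
    // Called once per partition, e.g. from inside rdd.foreachPartition(...).
    public static void writePartition(Iterator<String> records) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");           // placeholder broker list
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        try {
            while (records.hasNext()) {
                producer.send(new KeyedMessage<String, String>("out-topic", records.next()));
            }
        } finally {
            // Forgetting this leaks sockets/file handles ("too many files open").
            producer.close();
        }
    }
}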

Currently, my processing delay is lower than my dStream time window so all is 
good. I get a ton of these errors in my driver logs:
14/09/16 21:17:40 ERROR LiveListenerBus: L

RE: Accumulator Immutability?

2014-09-22 Thread abraham.jacob
I am also not familiar with Scala, but I believe the concept is similar to a 
final reference in Java.

accum points to an Accumulator object. Because it is a val, you cannot change 
what it points to, but the object it points to can still change its own state - 
which is why accum.value can become 10.
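
A rough Java analogy (my own made-up example, not from the Spark docs):

import java.util.ArrayList;
import java.util.List;

final List<Integer> list = new ArrayList<Integer>();  // 'final' plays the role of Scala's 'val'
list.add(10);                        // fine: we mutate the object the reference points to
// list = new ArrayList<Integer>();  // compile error: the reference itself cannot be reassigned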



From: Vikram Kalabi [mailto:vikram.apache@gmail.com]
Sent: Monday, September 22, 2014 11:30 AM
To: user@spark.apache.org
Subject: Accumulator Immutability?

Consider this snippet from the Spark scaladoc,


scala> val accum = sc.accumulator(0)

accum: spark.Accumulator[Int] = 0



scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

...

10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s



scala> accum.value

res2: Int = 10

How is accum's value allowed to change? "accum" is defined as a val, which is 
immutable, yet after the next step accum's value has changed to 10.

I am a newbie to Spark and Scala, so please clarify this.

Thanks in advance.

Vikram


Cancelled Key exception

2014-09-22 Thread abraham.jacob
Hi Sparklers,

I was wondering if someone else has also encountered this... (Actually, I am not 
even sure if this is an issue)...

I have a Spark job that reads data from HBase and does a bunch of transformations -

sparkContext.newAPIHadoopRDD -> flatMapToPair -> groupByKey -> mapValues

After this I do a take(10) on the result to print it out in the log file.

I always get results which I am 100% sure are correct. However, every once 
in a while I see the following in the log file even though the results are correct -

14/09/22 20:16:22 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
14/09/22 20:16:22 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down 
remote daemon.
14/09/22 20:16:22 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon 
shut down; proceeding with flushing remote transports.
14/09/22 20:16:22 INFO Remoting: Remoting shut down
14/09/22 20:16:22 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut 
down.
14/09/22 20:16:24 INFO ConnectionManager: Key not valid ? 
sun.nio.ch.SelectionKeyImpl@a4503d8
14/09/22 20:16:24 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(tr-pan--04,55008)
14/09/22 20:16:24 INFO ConnectionManager: Removing ReceivingConnection to 
ConnectionManagerId(tr-pan--04,55008)
14/09/22 20:16:24 ERROR ConnectionManager: Corresponding 
SendingConnectionManagerId not found
14/09/22 20:16:24 INFO ConnectionManager: key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@a4503d8
java.nio.channels.CancelledKeyException
at 
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:363)
at 
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:116)

The spark dashboard also does not show any error of executors failing.

Would it be possible for someone to shed some light on what this actually 
means, and whether we should be concerned about it?

I am running a Cloudera CDH 5.1.2 cluster with, I believe, Spark v1.0.0.

The spark job is submitted to yarn.

-abe