Get spark metrics in code

2016-09-09 Thread Han JU
Hi, I'd like to know if it's possible to get Spark's metrics from code. For example val sc = new SparkContext(conf) val result = myJob(sc, ...) result.save(...) val gauge = MetricSystem.getGauge("org.apache.spark") println(gauge.getValue) // or send it to internal aggregat
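
A hedged sketch of one way to collect metrics from inside the driver, using a SparkListener rather than the hypothetical `MetricSystem.getGauge` above (names like `MetricsFromCode` and `totalRunTimeMs` are illustrative, and the sample job stands in for `myJob(sc, ...)`):
```
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

object MetricsFromCode extends App {
  val sc = new SparkContext(new SparkConf().setAppName("metrics-sketch").setMaster("local[2]"))

  // Accumulate executor run time across all finished tasks.
  val totalRunTimeMs = new AtomicLong(0L)
  sc.addSparkListener(new SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      val m = taskEnd.taskMetrics
      if (m != null) totalRunTimeMs.addAndGet(m.executorRunTime)
    }
  })

  sc.parallelize(1 to 1000, 4).map(_ * 2).count()   // stand-in for myJob(sc, ...)

  // Read the aggregated value in the driver, or push it to an internal aggregator.
  println(s"total executor run time: ${totalRunTimeMs.get} ms")
  sc.stop()
}
```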

Re: Spark SQL - Encoders - case class

2016-06-06 Thread Han JU
Hi, I think encoders for case classes are already provided in Spark. You'll just need to import them. val sql = new SQLContext(sc) import sql.implicits._ And then cast to a Dataset. 2016-06-06 14:13 GMT+02:00 Dave Maughan : > Hi, > > I've figured out how to select data from a remo
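
A minimal sketch of that suggestion with the Spark 1.6-style API (`Person` is a hypothetical case class; the encoders come in with `sql.implicits._`):
```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical case class; must be defined at top level so an encoder can be derived.
case class Person(name: String, age: Int)

object CaseClassDataset extends App {
  val sc = new SparkContext(new SparkConf().setAppName("ds-sketch").setMaster("local[2]"))
  val sql = new SQLContext(sc)
  import sql.implicits._

  // Build a Dataset directly from a local collection...
  val ds = Seq(Person("alice", 30), Person("bob", 25)).toDS()

  // ...or cast an existing DataFrame to a typed Dataset.
  val df = sql.createDataFrame(Seq(Person("carol", 41)))
  val typed = df.as[Person]

  typed.show()
  sc.stop()
}
```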

Re: Dataset API and avro type

2016-05-23 Thread Han JU
ion it > would be great if you could open a JIRA. > On May 22, 2016 12:21 PM, "Han JU" wrote: > >> Hi Michael, >> >> The error is like this under 2.0.0-preview. In 1.6.1 the error is very >> similar if not exactly the same. >> The file

Dataset API and avro type

2016-05-20 Thread Han JU
Hello, I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0. However it does not seem to work with Avro data types: object Datasets extends App { val conf = new SparkConf() conf.setAppName("Dataset") conf.setMaster("local[2]") conf.setIfMissing("spark.serializer", classOf[Kr
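
For context, a runnable sketch of that kind of setup plus one possible workaround: since no encoder is derived for non-case-class types, a Kryo-based encoder can be declared explicitly. `EventRecord` is a hypothetical stand-in for an Avro-generated class, so whether this fits the real use case depends on the schema:
```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{Encoder, Encoders, SQLContext}

// Stand-in for an Avro-generated record class (not a case class, so no implicit encoder exists).
class EventRecord(val id: Long, val payload: String)

object AvroDatasetSketch extends App {
  val conf = new SparkConf()
  conf.setAppName("Dataset")
  conf.setMaster("local[2]")
  conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)

  val sc = new SparkContext(conf)
  val sql = new SQLContext(sc)

  // Kryo-based encoder: the record is stored as a single binary column,
  // sidestepping the missing schema derivation for non-case-class types.
  implicit val recordEncoder: Encoder[EventRecord] = Encoders.kryo[EventRecord]

  val ds = sql.createDataset(Seq(new EventRecord(1L, "a"), new EventRecord(2L, "b")))
  println(ds.count())
  sc.stop()
}
```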

zero-length input partitions from parquet

2016-05-02 Thread Han JU
Hi, I just found out that we can have lots of empty input partitions when reading from parquet files. Sample code as follows: val hconf = sc.hadoopConfiguration val job = new Job(hconf) FileInputFormat.setInputPaths(job, new Path("path_to_data")) ParquetInputFormat.setReadSupportClass
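
A small sketch of how one might measure and work around the empty partitions; a plain `textFile` stands in for the Parquet input and `path_to_data` is the placeholder from the message:
```
import org.apache.spark.{SparkConf, SparkContext}

object EmptyPartitionCheck extends App {
  val sc = new SparkContext(new SparkConf().setAppName("empty-partitions").setMaster("local[2]"))

  // In the original case this RDD comes from ParquetInputFormat instead.
  val data = sc.textFile("path_to_data")

  // Count how many input partitions produce no records at all.
  val sizes = data.mapPartitions(it => Iterator(it.size)).collect()
  val empty = sizes.count(_ == 0)
  println(s"$empty of ${sizes.length} partitions are empty")

  // One mitigation is to collapse the input into fewer partitions downstream.
  val compacted = data.coalesce(math.max(1, sizes.length - empty))
  println(s"after coalesce: ${compacted.partitions.length} partitions")

  sc.stop()
}
```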

Re: How to distribute non-serializable object in transform task or broadcast ?

2015-08-07 Thread Han JU
If the object is something like a utility object (say a DB connection handler), I often use: @transient lazy val someObj = MyFactory.getObj(...) So basically `@transient` tells the closure cleaner not to serialize this, and the `lazy val` allows it to be initialized on each executor upon its firs
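
A minimal sketch of this pattern in context (`DbHandler`, `MyFactory` and `LookupJob` are hypothetical stand-ins for the non-serializable resource, its factory, and the job that captures it):
```
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical non-serializable resource and factory.
class DbHandler { def lookup(key: String): String = s"value-for-$key" }
object MyFactory { def getObj(url: String): DbHandler = new DbHandler }

// The job instance is serialized into the closure, but the handler itself is not:
// `@transient` skips it during serialization and `lazy val` re-creates it on each
// executor the first time it is used there.
class LookupJob(url: String) extends Serializable {
  @transient lazy val someObj: DbHandler = MyFactory.getObj(url)

  def run(sc: SparkContext): Array[String] =
    sc.parallelize(Seq("a", "b", "c")).map(k => someObj.lookup(k)).collect()
}

object TransientLazyValDemo extends App {
  val sc = new SparkContext(new SparkConf().setAppName("transient-lazy").setMaster("local[2]"))
  new LookupJob("jdbc://example").run(sc).foreach(println)
  sc.stop()
}
```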

Submit application to spark on mesos cluster

2014-12-09 Thread Han JU
Hi, I have a little problem submitting our application to a Mesos cluster. Basically the Mesos cluster is configured and I'm able to have spark-shell working correctly. Then I tried to launch our application jar (an uber, sbt-assembly jar with all deps): bin/spark-submit --master mesos://10.19

Re: mounting SSD devices of EC2 r3.8xlarge instances

2014-06-04 Thread Han JU
For SSDs in r3, maybe it's better to mount with the `discard` option since they support TRIM. What I did for r3.large: echo '/dev/xvdb /mnt ext4 defaults,noatime,nodiratime,discard 0 0' >> /etc/fstab mkfs.ext4 /dev/xvdb mount /dev/xvdb 2014-06-03 19:15 GMT+02:00 Matei Zaharia : > Those insta

Re: Shuffle file consolidation

2014-05-23 Thread Han JU
Hi Nathan, There's some explanation in the Spark configuration section: ``` If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true"
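
For reference, a small sketch of turning that setting on programmatically. This is era-appropriate only: `spark.shuffle.consolidateFiles` applied to the hash-based shuffle in the Spark versions of this thread (2014) and no longer exists once sort-based shuffle took over:
```
import org.apache.spark.{SparkConf, SparkContext}

object ConsolidatedShuffle extends App {
  // Consolidate intermediate shuffle files created by the hash-based shuffle.
  val conf = new SparkConf()
    .setAppName("consolidate-shuffle")
    .setMaster("local[2]")
    .set("spark.shuffle.consolidateFiles", "true")

  val sc = new SparkContext(conf)

  // A shuffle with many reduce tasks is where consolidation helps most.
  val counts = sc.parallelize(1 to 100000).map(i => (i % 1000, 1)).reduceByKey(_ + _, 200)
  println(counts.count())
  sc.stop()
}
```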

Re: advice on maintaining a production spark cluster?

2014-05-21 Thread Han JU
I've also seen worker loss, and that's why I asked a question about worker re-spawn. My typical case is a job that got an OOM exception. Then on the master UI some worker's state becomes DEAD. In the master's log, there's an error like: ``` 14/05/21 15:38:02 ERROR remote.EndpointWriter: Associatio

Re: Worker re-spawn and dynamic node joining

2014-05-20 Thread Han JU
d that will get your worker removed. >>> >>> >>> FYI, we have a deployment tool (a web-based UI) that we use for internal >>> purposes, it is build on top of the spark-ec2 script (with some changes) >>> and it has a module for adding/removing worker nodes o

Re: Text file and shuffle

2014-05-18 Thread Han JU
I think the shuffle is unavoidable given that the input partitions (probably Hadoop input splits in your case) are not arranged the way a cogroup job needs. But maybe you can try: 1) co-partition your data for cogroup: val par = new HashPartitioner(128) val big = sc.textFile(..).map(...).part
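
A hedged completion of suggestion (1), assuming tab-separated text with the key in the first field (the paths and the key extraction are placeholders):
```
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object CoPartitionedCogroup extends App {
  val sc = new SparkContext(new SparkConf().setAppName("cogroup-sketch").setMaster("local[2]"))

  val par = new HashPartitioner(128)

  // partitionBy + persist fixes the layout of the big side so the cogroup
  // can reuse it instead of shuffling both sides again.
  val big = sc.textFile("big_input")
    .map(line => (line.split("\t")(0), line))
    .partitionBy(par)
    .persist()

  val small = sc.textFile("small_input")
    .map(line => (line.split("\t")(0), line))
    .partitionBy(par)

  // Both sides share the same partitioner, so cogroup adds no extra shuffle.
  val grouped = big.cogroup(small)
  println(grouped.partitions.length)
  sc.stop()
}
```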

Worker re-spawn and dynamic node joining

2014-05-14 Thread Han JU
Hi all, Just 2 questions: 1. Is there a way to automatically re-spawn Spark workers? We've had situations where an executor OOM causes the worker process to become DEAD and it does not come back automatically. 2. How to dynamically add (or remove) worker machines to (from) the cluster? We'd like to le

Re: No space left on device error when pulling data from s3

2014-05-12 Thread Han JU
al.dir=/mnt/spark" > > Thanks > Best Regards > > > On Tue, May 6, 2014 at 9:35 PM, Han JU wrote: > >> Hi, >> >> I've a `no space left on device` exception when pulling some 22GB data >> from s3 block storage to the ephemeral HDFS. The cluster is on

Re: How to read a multipart s3 file?

2014-05-07 Thread Han JU
Just some complements to other answers: If you output to, say, `s3://bucket/myfile`, then you can use that path as the input of other jobs (sc.textFile("s3://bucket/myfile")). By default all `part-xxx` files under it will be used. There's also `sc.wholeTextFiles` that you can play with. If your file is s
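
A short sketch of both approaches mentioned; `s3://bucket/myfile` is the placeholder path from the message, and credentials plus the s3/s3n/s3a scheme depend on the setup:
```
import org.apache.spark.{SparkConf, SparkContext}

object ReadMultipartOutput extends App {
  val sc = new SparkContext(new SparkConf().setAppName("multipart-read").setMaster("local[2]"))

  // Pointing textFile at the output "directory" picks up every part-xxxxx file in it.
  val lines = sc.textFile("s3://bucket/myfile")
  println(lines.count())

  // wholeTextFiles returns (path, fullContent) pairs, one per file,
  // which is handy when each part must be handled as a unit.
  val files = sc.wholeTextFiles("s3://bucket/myfile")
  files.keys.collect().foreach(println)

  sc.stop()
}
```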

Re: No space left on device error when pulling data from s3

2014-05-06 Thread Han JU
still write temp files to /tmp/hadoop-root ? 2014-05-06 18:05 GMT+02:00 Han JU : > Hi, > > I've a `no space left on device` exception when pulling some 22GB data > from s3 block storage to the ephemeral HDFS. The cluster is on EC2 using > spark-ec2 script with 4 m1.large.

No space left on device error when pulling data from s3

2014-05-06 Thread Han JU
Hi, I'm getting a `no space left on device` exception when pulling some 22GB of data from s3 block storage to the ephemeral HDFS. The cluster is on EC2, launched with the spark-ec2 script with 4 m1.large instances. The code is basically: val in = sc.textFile("s3://...") in.saveAsTextFile("hdfs://...") Spark creates 750 inpu
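
A sketch of the direction the replies in this thread point to: moving spark.local.dir off the small root volume and onto the large ephemeral mount. The `s3://...` and `hdfs://...` paths are the placeholders from the message, and on a running cluster this setting usually goes in spark-env.sh or the config file rather than in code:
```
import org.apache.spark.{SparkConf, SparkContext}

object LocalDirOnEphemeral extends App {
  // Shuffle and spill files go to spark.local.dir (default /tmp), which on m1.large
  // sits on the small root volume; pointing it at the ephemeral mount avoids
  // filling the root disk during the s3 -> HDFS copy.
  val conf = new SparkConf()
    .setAppName("s3-to-hdfs")
    .set("spark.local.dir", "/mnt/spark")

  val sc = new SparkContext(conf)
  val in = sc.textFile("s3://...")
  in.saveAsTextFile("hdfs://...")
  sc.stop()
}
```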

Fwd: Spark RDD cache memory usage

2014-04-29 Thread Han JU
Hi, By default a fraction of the executor memory (60%) is reserved for RDD caching, so if there's no explicit caching in the code (e.g. rdd.cache() etc.), or if we persist RDDs with StorageLevel.DISK_ONLY, is this part of the memory wasted? Does Spark allocate the RDD cache memory dynamically? Or does s

Spark RDD cache memory usage

2014-04-29 Thread Han JU
Hi, By default a fraction of the executor memory (60%) is reserved for RDD caching, so if there's no explicit caching in the code (e.g. rdd.cache() etc.), or if we persist RDDs with StorageLevel.DISK_ONLY, is this part of the memory wasted? Does Spark allocate the RDD cache memory dynamically? Or does s
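
A hedged sketch of the knobs involved, era-appropriate for this thread: `spark.storage.memoryFraction` belongs to the old static memory manager and was later superseded by unified memory management in Spark 1.6:
```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheFractionSketch extends App {
  // spark.storage.memoryFraction (default 0.6) is a cap on the cache region rather
  // than a pre-allocated block; lowering it leaves more heap for other uses when
  // nothing is cached in memory.
  val conf = new SparkConf()
    .setAppName("cache-fraction")
    .setMaster("local[2]")
    .set("spark.storage.memoryFraction", "0.1")

  val sc = new SparkContext(conf)

  // DISK_ONLY persistence does not use the in-memory cache region at all.
  val rdd = sc.parallelize(1 to 1000000).persist(StorageLevel.DISK_ONLY)
  println(rdd.count())
  sc.stop()
}
```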

Questions about productionizing spark

2014-04-25 Thread Han JU
Hi all, We are actively testing/benchmarking Spark for our production use. Here are some questions about problems we've encountered so far: 1. By default 66% of the executor memory is used for RDD caching, so if there's no explicit caching in the code (e.g. rdd.cache(), rdd.persist(StorageLevel.M