Re: --jars works in "yarn-client" but not "yarn-cluster" mode, why?

2015-05-20 Thread Fengyun RAO
-core-3.1.0-incubating.jar" in your spark-submit command > line. (You can also add those configs to your spark-defaults.conf to avoid > having to type them all the time; and don't forget to include any other > jars that might be needed.) > > > On Mon, May 18, 2

Re: How to share a NonSerializable variable among tasks in the same worker node?

2015-01-20 Thread Fengyun RAO
Currently we are migrating from 1.1 to 1.2 and found our program 3x slower, maybe due to the singleton hack? Could you explain in detail why or how "the singleton hack works very different in spark 1.2.0"? Thanks! 2015-01-18 20:56 GMT+08:00 octavian.ganea: > The singleton hack works very different...

spark 1.2 three times slower than spark 1.1, why?

2015-01-20 Thread Fengyun RAO
Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. Note: our program on spark 1.1 has successfully processed a whole year's data and is quite stable. The main script is as below: sc.textFile(inputPath).flatMap(line => LogParser.parseLine(li...
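
A minimal, self-contained sketch of the pipeline shape described here, with a stub standing in for the real LogParser (only the textFile/flatMap shape is from the thread); it also prints the partition count, since comparing that number between the 1.1 and 1.2 runs is exactly what the replies later in this thread suggest:

    import org.apache.spark.{SparkConf, SparkContext}

    // Stub standing in for the thread's real LogParser; returns zero or more records per line.
    object LogParser {
      def parseLine(line: String): Seq[String] =
        if (line.isEmpty) Seq.empty else Seq(line)
    }

    object MainScriptSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("log-pipeline-sketch"))
        val inputPath = args(0)
        val records = sc.textFile(inputPath).flatMap(line => LogParser.parseLine(line))

        // Worth comparing between the 1.1 and 1.2 runs, as suggested later in the thread.
        println(s"partitions: ${records.partitions.length}")
        println(s"records: ${records.count()}")
        sc.stop()
      }
    }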

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-20 Thread Fengyun RAO
..., 2015 at 10:39 PM, Fengyun RAO wrote: > Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. Note: our program on spark 1.1 has successfully processed a whole year's data and is quite stable. ...

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
...singleton pattern doesn't work or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, "Fengyun RAO"...

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
...non-serializable native code / objects. FWIW RDD#flatMap() does not appear to have changed 1.1 -> 1.2 (though master has a slight refactor). Agree it's worth checking the number of partitions in your 1.1 vs 1.2 test. On Tue, Jan 20, 2015 at 11:13 PM, Fe...

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, "Fengyun RAO" wrote: > the LogParser instance is not ser...

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
...of my previous post. Please check your shuffle I/O differences between the two in the spark web UI, because it can possibly be related to my case. Thanks, Kevin --- Original Message --- Sender: Fengyun RAO ...

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
...you made more executors per machine. But from your additional info it does not sound like this is the case. I think you need more debugging to pinpoint what is slower. On Jan 21, 2015 9:30 AM, "Fengyun RAO" wrote: > Thanks, Sean. I don'...

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
Btw: Shuffle Write (11 GB) means 11 GB per executor; for each task, it's ~40 MB. 2015-01-21 17:53 GMT+08:00 Fengyun RAO: > I don't know how to debug a distributed application; any tools or suggestions? But from the spark web UI, the GC time (~0.1 s), Shuffle Write (11 GB)...

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-24 Thread Fengyun RAO
...caused by the cache in LogParser, so I mocked a class to avoid the cache; unfortunately it's still slower. 2015-01-22 4:33 GMT+08:00 Davies Liu: On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO wrote: > the LogParser instance is not serializable, and thus cannot be a broadcast...

--jars works in "yarn-client" but not "yarn-cluster" mode, why?

2015-05-13 Thread Fengyun RAO
Hadoop version: CDH 5.4. We need to connect to HBase, thus need extra "/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar" dependency. It works in yarn-client mode: "spark-submit --class xxx.xxx.MyApp --master yarn-client --num-executors 10 --executor-memory 10g --jars /opt/

Re: --jars works in "yarn-client" but not "yarn-cluster" mode, why?

2015-05-13 Thread Fengyun RAO
...ting.jar I wonder why htrace exists in "spark.yarn.secondary.jars" but is still not found in the URLClassLoader. I tried both "local" and "file" mode for the jar, still the same error. 2015-05-14 11:37 GMT+08:00 Fengyun RAO: > Hadoop version: CDH 5.4. We...

Re: --jars works in "yarn-client" but not "yarn-cluster" mode, why?

2015-05-14 Thread Fengyun RAO
...what is available on your classpath. It looks like you are running into a similar situation as described in SPARK-5377. Wilfred On 14/05/2015 13:47, Fengyun RAO wrote: > I looked into the "Environment" in both modes. yarn-client:...

Re: --jars works in "yarn-client" but not "yarn-cluster" mode, why?

2015-05-17 Thread Fengyun RAO
...Wilfred Spiegelenburg <mailto:wspiegelenb...@cloudera.com>: > In the cluster mode the driver runs in the cluster and not locally in the spark-submit JVM. This changes what is available on your classpath. It looks like you are running into a similar si...

Re: --jars works in "yarn-client" but not "yarn-cluster" mode, why?

2015-05-18 Thread Fengyun RAO
Thanks, Marcelo! Below is the full log, SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/cloudera/p

Is it possible to read file head in each partition?

2014-07-29 Thread Fengyun RAO
Hi, all. We are migrating from mapreduce to spark and encountered a problem. Our input files are IIS logs with a file head. It's easy to get the file head if we process only one file, e.g. val lines = sc.textFile("hdfs://*/u_ex14073011.log"); val head = lines.take(4). Then we can write our map meth...
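
A minimal sketch of one way this can be done per partition, under two assumptions that would need checking against the real input: each log file maps to exactly one partition (as the reply below asks), and the IIS header lines start with "#", with a "#Fields:" directive naming the space-delimited columns. The parsing itself is a crude space split, for illustration only:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.{SparkConf, SparkContext}

    object IisHeadPerPartition {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("iis-head-sketch"))
        val lines = sc.textFile("hdfs:///logs/u_ex*.log")   // hypothetical input path

        val parsed = lines.mapPartitions { iter =>
          // Collect the "#..." directive lines at the top of this partition (assumes one file per partition).
          val buffered = iter.buffered
          val headLines = ArrayBuffer[String]()
          while (buffered.hasNext && buffered.head.startsWith("#")) headLines += buffered.next()

          // Pull the column names out of the "#Fields:" directive, if present.
          val fields = headLines.find(_.startsWith("#Fields:"))
            .map(_.stripPrefix("#Fields:").trim.split(' ').toSeq)
            .getOrElse(Seq.empty)

          // Remaining lines are data rows; pair each value with its field name.
          buffered.map(row => fields.zip(row.split(' ')).toMap)
        }

        println(parsed.count())
        sc.stop()
      }
    }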

Re: Is it possible to read file head in each partition?

2014-07-29 Thread Fengyun RAO
...n gets mapped to exactly one partition? Then you could probably get what you want using RDD.mapPartitions(). Nick On Wed, Jul 30, 2014 at 12:02 AM, Fengyun RAO wrote: > Hi, all. We...

Re: Is it possible to read file head in each partition?

2014-07-30 Thread Fengyun RAO
...e to filter them out by prefix string matching or regex? On Wed, Jul 30, 2014 at 1:39 PM, Fengyun RAO wrote: > It will certainly cause bad performance, since it reads the whole content of a large file into one value, instead of splitting it into partitions...

Is there a way to write spark RDD to Avro files

2014-07-30 Thread Fengyun RAO
We used mapreduce for ETL and stored results in Avro files, which are loaded into hive/impala for query. Now we are trying to migrate to spark, but didn't find a way to write the resulting RDD to Avro files. I wonder if there is a way to do it, or if not, why spark doesn't support Avro as well as ma...

Re: Is there a way to write spark RDD to Avro files

2014-07-31 Thread Fengyun RAO
...defined here: http://spark.apache.org/docs/1.0.0/api/scala/#org.apache.spark.rdd.PairRDDFunctions Lots of RDD types include that functionality already. On Wed, Jul 30, 2014 at 2:14 AM, Fengyun RAO wrote: > We used mapreduce for ETL and stored results in Avro file...

How to share a NonSerializable variable among tasks in the same worker node?

2014-07-31 Thread Fengyun RAO
As shown here: 2 - Why Is My Spark Job so Slow and Only Using a Single Thread? object JSONParser { def parse(raw: String): String = ... } object MyFirstSparkJob { def main...
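
A minimal sketch of the singleton-object pattern the linked post and this thread discuss, with a hypothetical non-serializable parser standing in for the real one; because a Scala object is initialized lazily inside each executor JVM, the parser is never serialized from the driver and is shared by all tasks running in that JVM:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical non-serializable parser, e.g. wrapping native code or a heavy cache.
    class HeavyParser {
      def parse(raw: String): String = raw.trim.toLowerCase
    }

    // One instance per JVM, created on first use inside the executor,
    // so it is never shipped from the driver and is shared by all tasks in that executor.
    object ParserHolder {
      lazy val parser = new HeavyParser
    }

    object MyFirstSparkJobSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("singleton-parser-sketch"))
        val parsed = sc.textFile(args(0)).map(line => ParserHolder.parser.parse(line))
        println(parsed.count())
        sc.stop()
      }
    }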

Re: Is there a way to write spark RDD to Avro files

2014-08-02 Thread Fengyun RAO
Below works for me: val job = Job.getInstance val schema = Schema.create(Schema.Type.STRING) AvroJob.setOutputKeySchema(job, schema) records.map(item => (new AvroKey[String](item.getGridsumId), NullWritable.get())) .saveAsNewAPIHadoopFile(args(1),
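
For context, a more complete sketch of the saveAsNewAPIHadoopFile approach shown above, with a hypothetical output path and record values filled in; only the Job/schema/AvroKey shape is from the original message, and the parameter classes follow the avro-mapred AvroKeyOutputFormat API, so they should be checked against the Avro version in use:

    import org.apache.avro.Schema
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkContext._
    import org.apache.spark.{SparkConf, SparkContext}

    object AvroWriteSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("avro-write-sketch"))

        val job = Job.getInstance()
        val schema = Schema.create(Schema.Type.STRING)
        AvroJob.setOutputKeySchema(job, schema)            // schema for the AvroKey

        val records = sc.parallelize(Seq("id-1", "id-2"))  // hypothetical record ids
        records
          .map(id => (new AvroKey[String](id), NullWritable.get()))
          .saveAsNewAPIHadoopFile(
            "/tmp/avro-out",                               // hypothetical output path
            classOf[AvroKey[String]],
            classOf[NullWritable],
            classOf[AvroKeyOutputFormat[String]],
            job.getConfiguration)
        sc.stop()
      }
    }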

Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-03 Thread Fengyun RAO
Could anybody help? I wonder if I asked a stupid question or didn't make the question clear. 2014-07-31 21:47 GMT+08:00 Fengyun RAO: > As shown here: 2 - Why Is My Spark Job so Slow and Only Using a Single Thread? <http://engineering.sharethrough.com/blog...

Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-04 Thread Fengyun RAO
...are running as separate VMs, so it might need to be able to serialize and deserialize broadcast variables to the different executors. Thanks, Ron On Aug 3, 2014, at 6:38 PM, Fengyun RAO wrote: > Could anybody help? I wonder if I asked a stupid que...

Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-04 Thread Fengyun RAO
...issue. If you want, the equivalent of "setup()" is really "writing some code at the start of a call to mapPartitions()". On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO wrote: > Thanks, Ron. The problem is that the "parser"...
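
A minimal sketch of the mapPartitions-as-setup() idea quoted above, reusing the same hypothetical heavy parser as in the earlier sketch; the expensive initialization runs once per partition on the executor, before the partition's records are streamed through:

    import org.apache.spark.{SparkConf, SparkContext}

    object MapPartitionsSetupSketch {
      // Hypothetical expensive, non-serializable resource.
      class HeavyParser { def parse(raw: String): String = raw.trim.toLowerCase }

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("mapPartitions-setup-sketch"))
        val parsed = sc.textFile(args(0)).mapPartitions { lines =>
          // "setup()" equivalent: runs once per partition, on the executor.
          val parser = new HeavyParser
          lines.map(parser.parse)        // applied lazily to each record in the partition
        }
        println(parsed.count())
        sc.stop()
      }
    }

Note that this creates one parser per partition (i.e. per task), whereas the singleton-object sketch earlier shares a single instance per executor JVM; which is preferable depends on whether the resource is thread-safe and how expensive it is to build.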

Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-04 Thread Fengyun RAO
...not appear to contain a parser object. On Mon, Aug 4, 2014 at 10:01 AM, Fengyun RAO wrote: > Thanks, Sean! It works, but as the link in 2 - Why Is My Spark Job so Slow and Only Using a Single Thread? says, "parser instance is now a singleton...

Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-09 Thread Fengyun RAO
Although nobody has answered the two questions, in my practice it seems both answers are yes. 2014-08-04 19:50 GMT+08:00 Fengyun RAO: > object LogParserWrapper { private val logParser = { val settings = new ... val builders = new ... new LogParser(bui...

Fwd: how to split RDD by key and save to different path

2014-08-12 Thread Fengyun RAO
...same key in the same partition } } 2014-08-12 21:34 GMT+08:00 Fengyun RAO: > 1. Be careful: HDFS is better for large files, not bunches of small files. 2. If that's really what you want, roll your own, e.g. def writeAvro(iterator: Iterator[(String, String)]) = { ...
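
A minimal sketch of one "roll your own" approach, shown here with Hadoop's MultipleTextOutputFormat rather than Avro so it stays short (an Avro variant would swap in an Avro output format); the key picks the output sub-directory, and partitionBy keeps identical keys together as the quoted reply suggests. The class name and output path are illustrative:

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    import org.apache.spark.SparkContext._
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    // Route each record into a sub-directory named after its key.
    class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        s"$key/$name"                        // e.g. <outputDir>/keyA/part-00000
      override def generateActualKey(key: Any, value: Any): Any =
        NullWritable.get()                   // write only the value into the file
    }

    object SplitByKeySketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("split-by-key-sketch"))
        val records = sc.parallelize(Seq(("a", "line 1"), ("b", "line 2"), ("a", "line 3")))
        records
          .partitionBy(new HashPartitioner(4))   // same key lands in the same partition
          .saveAsHadoopFile("/tmp/split-by-key", classOf[String], classOf[String],
            classOf[KeyBasedOutput])
        sc.stop()
      }
    }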

sqlContext.parquetFile(path) fails if path is a file but succeeds if a directory

2014-08-18 Thread Fengyun RAO
I'm using CDH 5.1 with spark 1.0. When I try to run Spark SQL following the Programming Guide val parquetFile = sqlContext.parquetFile(path) If the "path" is a file, it throws an exception: Exception in thread "main" java.lang.IllegalArgumentException: Expected hdfs://*/file.parquet for be a d

[Spark SQL] How to select first row in each GROUP BY group?

2014-08-20 Thread Fengyun RAO
I have a table with 4 columns: a, b, c, time. What I need is something like: SELECT a, b, GroupFirst(c) FROM t GROUP BY a, b. GroupFirst means "the first" item of column c in each group, and by "the first" I mean the one with the minimal "time" in that group. In Oracle/SQL Server, we could write: WITH summary AS (...
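
Window functions like ROW_NUMBER were not available in Spark SQL of this vintage, so one workaround, in the spirit of the later reply's "for each group just take the first record", is to do it on the RDD with reduceByKey, keeping the row with the smallest time per (a, b) group. The Record type and sample data below are illustrative:

    import org.apache.spark.SparkContext._
    import org.apache.spark.{SparkConf, SparkContext}

    object FirstRowPerGroupSketch {
      case class Record(a: String, b: String, c: String, time: Long)

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("first-row-per-group-sketch"))
        val rows = sc.parallelize(Seq(
          Record("a1", "b1", "x", 3L),
          Record("a1", "b1", "y", 1L),   // earliest in group (a1, b1)
          Record("a2", "b1", "z", 2L)))

        val firstPerGroup = rows
          .map(r => ((r.a, r.b), r))
          .reduceByKey((x, y) => if (x.time <= y.time) x else y)  // keep the row with minimal time
          .values

        firstPerGroup.collect().foreach(println)
        sc.stop()
      }
    }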

Re: [Spark SQL] How to select first row in each GROUP BY group?

2014-08-21 Thread Fengyun RAO
Could anybody help? I googled and read a lot, but didn't find anything helpful. Or, to make the question simpler: how to set a row number for each group, i.e. SELECT a, ROW_NUMBER() OVER (PARTITION BY a) AS num FROM table? 2014-08-20 15:52 GMT+08:00 Fengyun RAO: I have a table with 4...

Re: [Spark SQL] How to select first row in each GROUP BY group?

2014-08-21 Thread Fengyun RAO
...then for each group just take the first record. From: Fengyun RAO Date: Thursday, August 21, 2014 at 8:26 AM To: "user@spark.apache.org" Subject: Re: [Spark SQL] How to select first row in each GROUP BY group? > Could anybody help? I googled a...

How to add HBase dependencies and conf with spark-submit?

2014-10-15 Thread Fengyun RAO
We use Spark 1.1 and HBase 0.98.1-cdh5.1.0, and need to read and write an HBase table in a Spark program. I notice there are spark.driver.extraClassPath and spark.executor.extraClassPath properties to manage the extra classpath, or even the deprecated SPARK_CLASSPATH. The problem is what classpath or jar...

How to close resources shared in executor?

2014-10-15 Thread Fengyun RAO
In order to share an HBase connection pool, we create an object: object Util { val HBaseConf = HBaseConfiguration.create; val Connection = HConnectionManager.createConnection(HBaseConf) } which would be shared among tasks on the same executor, e.g. val result = rdd.map(line => { val table...
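
A minimal sketch of the pattern being described, combined with the per-record usage the rest of the thread converges on: get a table from the shared connection, close the table, and never close the shared connection inside map. The row key, column usage and lazy vals are added for illustration; the "user" table name is the one from the original message:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Get, HConnection, HConnectionManager}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.rdd.RDD

    // One HConnection per executor JVM; tables are cheap to open, the connection is not.
    object HBaseUtil {
      lazy val conf = HBaseConfiguration.create()
      lazy val connection: HConnection = HConnectionManager.createConnection(conf)
    }

    object HBaseLookupSketch {
      def lookup(rdd: RDD[String]): RDD[Boolean] = rdd.map { rowKey =>
        val table = HBaseUtil.connection.getTable("user")      // table from the shared connection
        try {
          !table.get(new Get(Bytes.toBytes(rowKey))).isEmpty   // does the row exist?
        } finally {
          table.close()   // release the table; do NOT close the shared connection here
        }
      }
    }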

Re: How to add HBase dependencies and conf with spark-submit?

2014-10-15 Thread Fengyun RAO
+user@hbase 2014-10-15 20:48 GMT+08:00 Fengyun RAO: > We use Spark 1.1 and HBase 0.98.1-cdh5.1.0, and need to read and write an HBase table in a Spark program. I notice there are spark.driver.extraClassPath and spark.executor.extraClassPath properties to manage extra Clas...

Re: How to close resources shared in executor?

2014-10-15 Thread Fengyun RAO
...val result = rdd.map(line => { val table = Util.Connection.getTable("user") ... Util.Connection.close() } On Wed, Oct 15, 2014 at 6:09 AM, Fengyun RAO wrote: > In order to share an HBase connection pool, we create an object...

Re: How to close resources shared in executor?

2014-10-15 Thread Fengyun RAO
...documentation that states HConnectionManager would release the underlying connection automatically? If that's true, maybe the Javadoc which recommends a shutdown hook needs an update. 2014-10-16 14:20 GMT+08:00 Fengyun RAO: > Thanks, Ted. Util.Connection.close() should be called only once, so...

Re: How to add HBase dependencies and conf with spark-submit?

2014-10-16 Thread Fengyun RAO
...ra/parcels/CDH/lib/hbase/hbase-common.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar,/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar \ --- Original Message --- From: "Fengyun RAO" To: user@spark.apache.org, u...@hbase.apache.org Sent: Wedne...

Re: How to close resources shared in executor?

2014-10-16 Thread Fengyun RAO
...sets cleanupConnectionOnClose to true. W.r.t. the javadoc, the paragraph on the shutdown hook is in HConnectionManager.java of 0.94; you don't need to use a shutdown hook for 0.94+. Cheers On Wed, Oct 15, 2014 at 11:41 PM, Fengyun RAO wrote: > I ma...

Re: How to close resources shared in executor?

2014-10-16 Thread Fengyun RAO
...nectionManager.java): * HTableInterface table = connection.getTable("table1"); * try { * // Use the table as needed, for a single operation and a single thread * } finally { * table.close(); * connection.close(); * } Cheers On T...

What does KryoException: java.lang.NegativeArraySizeException mean?

2014-10-20 Thread Fengyun RAO
The exception drives me crazy, because it occurs randomly. I don't know which line of my code causes this exception, and I don't even understand what "KryoException: java.lang.NegativeArraySizeException" means or implies. 14/10/20 15:59:01 WARN scheduler.TaskSetManager: Lost task 32.2 in stag...

Re: What does KryoException: java.lang.NegativeArraySizeException mean?

2014-10-20 Thread Fengyun RAO
Thank you, Guillaume. My dataset is not that large, it's only ~2 GB in total. 2014-10-20 16:58 GMT+08:00 Guillaume Pitel: > Hi, it happened to me with blocks which take more than 1 or 2 GB once serialized. I think the problem was that during serialization, a byte array is created, and arrays...

Re: What does KryoException: java.lang.NegativeArraySizeException mean?

2014-10-21 Thread Fengyun RAO
Thanks, Guillaume. Below is when the exception happens; nothing has spilled to disk yet, and there isn't a join, only a partitionBy and a groupBy. Actually, if numPartitions is small it succeeds, while if it's large it fails. Partitioning was simply done by override def getPartition(key: A...
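
A minimal sketch of a custom Partitioner with the getPartition override mentioned above; the hash-based scheme is hypothetical (the original code is truncated), but it shows the usual care needed to keep the returned index non-negative:

    import org.apache.spark.Partitioner

    // Illustrative partitioner: hashes the key into numParts buckets.
    class KeyHashPartitioner(numParts: Int) extends Partitioner {
      require(numParts > 0, "numPartitions must be positive")

      override def numPartitions: Int = numParts

      override def getPartition(key: Any): Int = {
        val mod = key.hashCode % numParts
        if (mod < 0) mod + numParts else mod   // hashCode can be negative; the index must not be
      }

      override def equals(other: Any): Boolean = other match {
        case p: KeyHashPartitioner => p.numPartitions == numPartitions
        case _ => false
      }
      override def hashCode: Int = numParts
    }

It would be used as rdd.partitionBy(new KeyHashPartitioner(n)) before the grouping step described in the thread.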

what classes are needed to register in KryoRegistrator, e.g. Row?

2014-10-26 Thread Fengyun RAO
In Tuning Spark, it says: Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library. I looked into the AllScalaR...

Re: what classes are needed to register in KryoRegistrator, e.g. Row?

2014-10-28 Thread Fengyun RAO
Although nobody answered, as I tested, Row, MutableValue and their subclasses are not registered by default, which I think they should be, since they would certainly show up in Spark SQL. 2014-10-26 23:43 GMT+08:00 Fengyun RAO: > In Tuning Spark <https://spark.apache.org/docs/latest/tunin...
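
A minimal sketch of how such classes could be registered explicitly with a custom KryoRegistrator; the GenericRow/GenericMutableRow class paths below are an assumption based on the Spark 1.1-era catalyst package layout and should be checked against the Spark version in use, as should the fully qualified registrator name passed to spark.kryo.registrator:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator
    // Assumed locations for this Spark version; verify against your build.
    import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, GenericRow}

    class SqlRowRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[GenericRow])
        kryo.register(classOf[GenericMutableRow])
      }
    }

    // Wiring it up when building the SparkConf:
    object KryoConfSketch {
      def conf: SparkConf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "SqlRowRegistrator")
        .set("spark.kryo.registrationRequired", "true")   // fail fast on unregistered classes
    }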