spark-ec2 [Errno 110] Connection time out

2014-08-30 Thread David Matheson
I'm following the latest documentation on configuring a cluster on ec2
(http://spark.apache.org/docs/latest/ec2-scripts.html).  Running 
> ./spark-ec2 -k Blah -i .ssh/Blah.pem -s 2 launch spark-ec2-test
gets a generic timeout error that's coming from
  File "./spark_ec2.py", line 717, in real_main
conn = ec2.connect_to_region(opts.region)

Any suggestions on how to debug the cause of the timeout? 

Note: I replaced the name of my keypair with Blah.

Thanks,
David







spark on yarn with hive

2014-08-30 Thread centerqi hu
I want to run Hive on Spark and YARN clusters; the Hive metastore is stored in MySQL.

I compiled Spark with:
sh make-distribution.sh --hadoop 2.4.1 --with-yarn --skip-java-test --tgz
--with-hive

My HQL code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.LocalHiveContext

object HqlTest {
  case class Record(key: Int, value: String)
  def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HiveFromSpark")
val sc = new SparkContext(sparkConf)

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
hiveContext.hql("FROM tmp_adclick_udc select
uid").collect().foreach(println)
  }
}

I submitted the job with:
/usr/local/webserver/sparkhive/bin/spark-submit --class HqlTest --master
yarn --deploy-mode cluster --queue sls_queue_1 --num-executors 5
--driver-memory 6g --executor-memory 20g --executor-cores 5
target/scala-2.10/simple-project_2.10-1.0.jar
/user/www/udc/output/2014-08-10/*
/user/www/udc/input/platformuvpv2/2014-08-10

I put hive-site.xml into spark/conf/.

But I get the following error:

14/08/30 16:30:49 INFO Datastore: The class
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
"embedded-only" so does not have its own datastore table.
14/08/30 16:30:49 INFO Datastore: The class
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
"embedded-only" so does not have its own datastore table.
14/08/30 16:30:54 ERROR Hive:
NoSuchObjectException(message:default.tmp_adclick_udc table not found)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1373)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103)
at com.sun.proxy.$Proxy26.get_table(Unknown Source)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:854)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

thanks
-- 
cente...@gmail.com|齐忠


Re: What is the better data structure in an RDD

2014-08-30 Thread Sean Owen
Since you can translate between the two representations, either one works.

It sounds like Measures within one bucket are always used together,
even if they are not necessarily related. Their ordering matters.
Therefore it seems like an RDD[(String,Seq[Measure])] or something is
most appropriate. You can sort or aggregate with mapValues, and can
combine RDDs with join.
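
A minimal sketch of that layout (the Measure fields, bucket names, and the
particular aggregations are illustrative assumptions, not taken from the
original question):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // brings in the pair-RDD functions (mapValues, join)

object BucketedMeasures {
  case class Measure(timestamp: Long, value: Double)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BucketedMeasures"))

    val buckets = sc.parallelize(Seq(
      "bucketA" -> Seq(Measure(2L, 1.5), Measure(1L, 2.0)),
      "bucketB" -> Seq(Measure(5L, 0.5))))

    // Sort the Measures in each bucket separately.
    val sorted = buckets.mapValues(_.sortBy(_.timestamp))

    // Aggregate the Measures in each bucket separately.
    val totals = buckets.mapValues(_.map(_.value).sum)

    // Combine two RDDs into one based on their bucket keys.
    val other = sc.parallelize(Seq("bucketA" -> Seq(Measure(9L, 3.0))))
    val combined = buckets.join(other).mapValues { case (l, r) => l ++ r }

    sorted.collect().foreach(println)
    totals.collect().foreach(println)
    combined.collect().foreach(println)
    sc.stop()
  }
}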

On Sat, Aug 30, 2014 at 12:00 AM, cjwang  wrote:
> I need some advice regarding how data should be stored in an RDD.  I have millions
> of records, called "Measures".  They are bucketed with keys of String type.
> I wonder if I need to store them as RDD[(String, Measure)] or RDD[(String,
> Iterable[Measure])], and why?
>
> Data in each bucket are not related most of the time.  The operations that I
> often need to do are:
>
> - Sort the Measures in each bucket separately
> - Aggregate the Measures in each bucket separately
> - Combine Measures in two RDDs into one based on their bucket keys
>




Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Sean Owen
I'm no expert, but as I understand it, yes, you create multiple streams to
consume multiple partitions in parallel. If they're all in the same Kafka
consumer group, you'll get exactly one copy of each message, so if you have
10 consumers and 3 Kafka partitions, I believe only 3 of them will be
getting messages.

The parallelism of Spark's processing of the RDDs of those messages is a
different matter. There could be, say, 4 partitions in your RDDs doing the
work. This is the kind of thing you can influence with repartition(). That
is, I believe you can get more tasks processing the messages even if you
are only able to consume messages from the queue with 3-way parallelism,
since the queue has 3 partitions.
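
A minimal sketch of both knobs, loosely following Tim's snippet below (the
ZooKeeper address, consumer group, topic name, and the receiver/partition
counts are illustrative assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaParallelismSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaParallelismSketch"), Seconds(10))

    // Consume-side parallelism: one receiver per Kafka partition (3 here), all in
    // the same consumer group, so each message is delivered exactly once overall.
    val streams = (1 to 3).map { _ =>
      KafkaUtils.createStream(ssc, "zkhost1:2181", "testApp", Map("rawunstruct" -> 1))
    }
    val merged = ssc.union(streams)

    // Processing-side parallelism: spread each received batch over more tasks
    // than there are receivers.
    val messageLengths = merged.repartition(12).map { case (_, message) => message.length }

    messageLengths.print()
    ssc.start()
    ssc.awaitTermination()
  }
}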

On Aug 30, 2014 12:56 AM, "Tim Smith"  wrote:
>
> Ok, so I did this:
> val kInStreams = (1 to 10).map{_ => 
> KafkaUtils.createStream(ssc,"zkhost1:2181/zk_kafka","testApp",Map("rawunstruct"
>  -> 1)) }
> val kInMsg = ssc.union(kInStreams)
> val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
>
> This has improved parallelism. Earlier I would only get a "Stream 0". Now I 
> have "Streams [0-9]". Of course, since the kafka topic has only three 
> partitions, only three of those streams are active but I am seeing more 
> blocks being pulled across the three streams in total than what one was doing 
> earlier. Also, four nodes are actively processing tasks (vs only two earlier) 
> now which actually has me confused. If "Streams" are active only on 3 nodes 
> then how/why did a 4th node get work? If a 4th got work why aren't more nodes 
> getting work?
>




Re: Spark Hive max key length is 767 bytes

2014-08-30 Thread arthur.hk.c...@gmail.com
Hi Michael,

Thank you so much!!

I have tried changing the following key lengths from 256 to 255 and from 767 to
766, but it still didn't work:
alter table COLUMNS_V2 modify column COMMENT VARCHAR(255);
alter table INDEX_PARAMS modify column PARAM_KEY VARCHAR(255);
alter table SD_PARAMS modify column PARAM_KEY VARCHAR(255);
alter table SERDE_PARAMS modify column PARAM_KEY VARCHAR(255);
alter table TABLE_PARAMS modify column PARAM_KEY VARCHAR(255);
alter table TBLS modify column OWNER VARCHAR(766);
alter table PART_COL_STATS modify column PARTITION_NAME VARCHAR(766);
alter table PARTITION_KEYS modify column PKEY_TYPE VARCHAR(766);
alter table PARTITIONS modify column PART_NAME VARCHAR(766);

I use Hadoop 2.4.1, HBase 0.98.5 and Hive 0.13, and am trying Spark 1.0.2 and
Shark 0.9.2 with JDK 1.6_45.

Some questions:
- Which Hive version is Shark 0.9.2 based on? Is HBase 0.98.x OK? Is Hive 0.13.1
  OK? And which Java? (I use JDK 1.6 at the moment; it doesn't seem to work.)
- Which Hive version is Spark 1.0.2 based on? Is HBase 0.98.x OK?

Regards
Arthur 


On 30 Aug, 2014, at 1:40 am, Michael Armbrust  wrote:

> Spark SQL is based on Hive 12.  They must have changed the maximum key size 
> between 12 and 13.
> 
> 
> On Fri, Aug 29, 2014 at 4:38 AM, arthur.hk.c...@gmail.com 
>  wrote:
> Hi,
> 
> 
> Tried the same thing in HIVE directly without issue:
> 
> HIVE:
> hive> create table test_datatype2 (testbigint bigint );
> OK
> Time taken: 0.708 seconds
> 
> hive> drop table test_datatype2;
> OK
> Time taken: 23.272 seconds
> 
> 
> 
> Then tried again in SPARK:
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> 14/08/29 19:33:52 INFO Configuration.deprecation: 
> mapred.reduce.tasks.speculative.execution is deprecated. Instead, use 
> mapreduce.reduce.speculative
> hiveContext: org.apache.spark.sql.hive.HiveContext = 
> org.apache.spark.sql.hive.HiveContext@395c7b94
> 
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> res0: org.apache.spark.sql.SchemaRDD = 
> SchemaRDD[0] at RDD at SchemaRDD.scala:104
> == Query Plan ==
> 
> 
> scala> hiveContext.hql("drop table test_datatype3")
> 
> 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while 
> adding/validating class(es) : Specified key was too long; max key length is 
> 767 bytes
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
> too long; max key length is 767 bytes
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> 
> 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of 
> org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in 
> no possible candidates
> Error(s) were found while auto-creating/validating the datastore for classes. 
> The errors are printed in the log, and are attached to this exception.
> org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found 
> while auto-creating/validating the datastore for classes. The errors are 
> printed in the log, and are attached to this exception.
>   at 
> org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)
> 
> 
> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: 
> Specified key was too long; max key length is 767 bytes
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> 
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
> "embedded-only" so does not have its own datastore table.
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" 
> so does not have its own datastore table.
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
> "embedded-only" so does not have its own datastore table.
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" 
> so does not have its own datastore table.
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
> "embedded-only" so does not have its own datastore table.
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" 
> so does not have its own datastore table.
> 14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown while 
> adding/validating class(es) : Specified key was too long; max key length is 
> 767 bytes
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
> too long; max key length is 7

Re: org.apache.spark.examples.xxx

2014-08-30 Thread Akhil Das
It bundles all of the sources under
https://github.com/apache/spark/tree/master/examples together, and it also
uses the pom file to get the dependency list, if I'm not wrong.

Thanks
Best Regards


On Fri, Aug 29, 2014 at 12:39 AM, filipus  wrote:

> hey guys
>
> I'm still trying to get used to compiling and running the example code.
>
> why does the run_example code submit the class with an
> org.apache.spark.examples in front of the class itself?
>
> Probably a stupid question, but I would be glad if someone could explain.
>
> By the way, how was the "spark...example...jar" file built?
>
>
>


Re: org.apache.spark.examples.xxx

2014-08-30 Thread Ted Yu
bq. how was the "spark...example...jar" file built?

You can use the following command to build against hadoop 2.4:

mvn -Phadoop-2.4,yarn -Dhadoop.version=2.4.1 -DskipTests clean package

examples jar can be found under examples/target

Cheers


On Sat, Aug 30, 2014 at 6:54 AM, Akhil Das 
wrote:

> It bundles all these src's
> https://github.com/apache/spark/tree/master/examples together and also it
> uses the pom file to get the dependencies list if I'm not wrong.
>
> Thanks
> Best Regards
>
>
> On Fri, Aug 29, 2014 at 12:39 AM, filipus  wrote:
>
>> hey guys
>>
>> i still try to get used to compile and run the example code
>>
>> why does the run_example code submit the class with an
>> org.apache.spark.examples in front of the class itself?
>>
>> probably a stupid question but i would be glad some one of you explains
>>
>> by the way.. how was the "spark...example...jar" file build?
>>
>>
>>
>


Re: org.apache.spark.examples.xxx

2014-08-30 Thread filipus
i try to get use to "sbt" in order to build stnd allone application by myself

the example "SimpleApp" i managed to run

than i tried to copy some example scala program like "LinearRegression" in a
local directory

.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/LinearRegression.scala

My build.sbt (even though I don't really know what I'm doing) looks like:

name := "Linear Regression"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.2"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.2"

libraryDependencies += "com.github.scopt" %% "scopt" % "3.2.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/";

By the way, first I tried scalaVersion := "2.11.2", which is my installed
version, but this failed.

...

sbt package builds a jar file under target/, but the command

spark-submit --class "LinearRegression" --master local[2]
target/scala-2.10/linear-regression_2.10-1.0.jar
~/git/spark/data/mllib/sample_linear_regression_data.txt

didn't work. It tells me:

Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Exception in thread "main" java.lang.NoClassDefFoundError:
scopt/OptionParser
at java.lang.Class.getDeclaredMethods0(Native Method)

AHHH: I commented out /*package org.apache.spark.examples.mllib*/ in
LinearRegression.scala because otherwise it doesn't find the main class:
Exception in thread "main" java.lang.ClassNotFoundException:
LinearRegression

When I do the same with the pre-built examples jar package, everything
works fine:

spark-submit --class  org.apache.spark.examples.mllib.LinearRegression
--master local[2] lib/spark-examples-1.0.2-hadoop2.2.0.jar
~/git/spark/data/mllib/sample_linear_regression_data.txt

works !!! 







Mapping Hadoop Reduce to Spark

2014-08-30 Thread Steve Lewis
When programming in Hadoop it is possible to guarantee
1) All keys sent to a specific partition will be handled by the same
machine (thread)
2) All keys received by a specific machine (thread) will be received in
sorted order
3) These conditions will hold even if the values associated with a specific
key are too large to fit in memory.

In my Hadoop code I use all of these conditions - specifically with my
larger data sets the size of data I wish to group exceeds the available
memory.

I think I understand the operation of groupBy, but my understanding is that
it requires that the results for a single key, and perhaps all keys, fit
on a single machine.

Is there a way to do this like Hadoop and not require that an entire group
fit in memory?


Re: org.apache.spark.examples.xxx

2014-08-30 Thread filipus
Compilation works but execution doesn't, at least with spark-submit, as I
described above.

When I make a local copy of the training set, I can execute sbt "run file",
which works:

sbt "run sample_linear_regression_data.txt"

when I do

sbt "run ~/git/spark/data/mllib/sample_linear_regression_data.txt"

the program fails because it doesn't find any training set:

[error] (run-main-0) org.apache.hadoop.mapred.InvalidInputException: Input
path does not exist:
file:*/home/filip/spark-ex-regression/*~/git/spark/data/mllib/sample_linear_regression_data.txt
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
file:/home/filip/spark-ex-regression/~/git/spark/data/mllib/sample_linear_regression_data.txt

PS: Does anybody know where the program "LinearRegression.scala" specifies
the path, or does it have to do with sbt?






Re: org.apache.spark.examples.xxx

2014-08-30 Thread Ted Yu
Did you run sbt under /home/filip/spark-ex-regression ?

'~/git/spark/data/mllib/sample_linear_regression_data.txt' was interpreted
as a path rooted under /home/filip/spark-ex-regression; the '~' is only
expanded by the shell, not by the JVM.

Cheers


On Sat, Aug 30, 2014 at 9:28 AM, filipus  wrote:

> compilation works but execution not at least with spark-submit as I
> described
> above
>
> when I make a local copy of the training set I can execute sbt "run file"
> which works
>
> sbt "run sample_linear_regression_data.txt"
>
> when I do
>
> sbt "run ~/git/spark/data/mllib/sample_linear_regression_data.txt"
>
> the program fails because it doesnt find any traning set at
>
> [error] (run-main-0) org.apache.hadoop.mapred.InvalidInputException: Input
> path does not exist:
>
> file:*/home/filip/spark-ex-regression/*~/git/spark/data/mllib/sample_linear_regression_data.txt
> org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
>
> file:/home/filip/spark-ex-regression/~/git/spark/data/mllib/sample_linear_regression_data.txt
>
> ps: does anybody knows where in the program "LinearRegression.scala" it
> specifies the PATH or has it to do with "sbt"???
>
>
>


Re: org.apache.spark.examples.xxx

2014-08-30 Thread filipus
OK, I see :-)

Using ".." instead of "~" works fine.

Do you know the reason why

sbt "run [options]"

works after sbt package, but

spark-submit --class "ClassName" --master local[2]
target/scala/JarPackage.jar [options]

doesn't? It cannot resolve everything somehow.






Re: Mapping Hadoop Reduce to Spark

2014-08-30 Thread Matei Zaharia
In 1.1, you'll be able to get all of these properties using sortByKey, and then 
mapPartitions on top to iterate through the key-value pairs. Unfortunately 
sortByKey does not let you control the Partitioner, but it's fairly easy to 
write your own version that does if this is important.

In previous versions, the values for each key had to fit in memory (though we 
could have data on disk across keys), and this is still true for groupByKey, 
cogroup and join. Those restrictions will hopefully go away in a later release. 
But sortByKey + mapPartitions lets you just iterate through the key-value pairs 
without worrying about this.
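
A minimal sketch of that sortByKey + mapPartitions pattern (the key type and
the per-key sum are illustrative assumptions; the point is that a sorted
partition lets you aggregate one key at a time instead of holding a whole
group in memory):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // brings in sortByKey on pair RDDs

object SortedGroupAggregate {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SortedGroupAggregate"))

    // Toy input; in practice this would be the large keyed data set.
    val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("a", 3), ("c", 5)))

    val perKeyTotals = pairs
      .sortByKey()                        // equal keys end up adjacent within a partition
      .mapPartitions { iter =>
        // The partition is sorted, so values for one key arrive consecutively
        // and can be folded together without materializing the whole group.
        new Iterator[(String, Int)] {
          private val buffered = iter.buffered
          def hasNext: Boolean = buffered.hasNext
          def next(): (String, Int) = {
            val (key, firstValue) = buffered.next()
            var total = firstValue
            while (buffered.hasNext && buffered.head._1 == key) {
              total += buffered.next()._2
            }
            (key, total)
          }
        }
      }

    perKeyTotals.collect().foreach(println)
    sc.stop()
  }
}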

Matei

On August 30, 2014 at 9:04:37 AM, Steve Lewis (lordjoe2...@gmail.com) wrote:

When programming in Hadoop it is possible to guarantee
1) All keys sent to a specific partition will be handled by the same machine 
(thread)
2) All keys received by a specific machine (thread) will be received in sorted 
order
3) These conditions will hold even if the values associated with a specific key 
are too large to fit in memory.

In my Hadoop code I use all of these conditions - specifically with my larger 
data sets the size of data I wish to group exceeds the available memory.

I think I understand the operation of groupby but my understanding is that this 
requires that the results for a single key, and perhaps all keys fit on a 
single machine.

Is there a way to do this like Hadoop and not require that an entire group fit in 
memory?



Re: Spark Hive max key length is 767 bytes

2014-08-30 Thread Denny Lee
Oh, you may actually be running into an issue with your MySQL setup. Try running

alter database metastore_db character set latin1

so that Hive (and the Spark HiveContext) can execute properly against the
metastore.


On August 29, 2014 at 04:39:01, arthur.hk.c...@gmail.com 
(arthur.hk.c...@gmail.com) wrote:

Hi,


Tried the same thing in HIVE directly without issue:

HIVE:
hive> create table test_datatype2 (testbigint bigint );
OK
Time taken: 0.708 seconds

hive> drop table test_datatype2;
OK
Time taken: 23.272 seconds



Then tried again in SPARK:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
14/08/29 19:33:52 INFO Configuration.deprecation: 
mapred.reduce.tasks.speculative.execution is deprecated. Instead, use 
mapreduce.reduce.speculative
hiveContext: org.apache.spark.sql.hive.HiveContext = 
org.apache.spark.sql.hive.HiveContext@395c7b94

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
res0: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[0] at RDD at SchemaRDD.scala:104
== Query Plan ==


scala> hiveContext.hql("drop table test_datatype3")

14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while 
adding/validating class(es) : Specified key was too long; max key length is 767 
bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of 
org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no 
possible candidates
Error(s) were found while auto-creating/validating the datastore for classes. 
The errors are printed in the log, and are attached to this exception.
org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while 
auto-creating/validating the datastore for classes. The errors are printed in 
the log, and are attached to this exception.
at 
org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)


Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified 
key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
"embedded-only" so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so 
does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
"embedded-only" so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so 
does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
"embedded-only" so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so 
does not have its own datastore table.
14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown while 
adding/validating class(es) : Specified key was too long; max key length is 767 
bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)


Can anyone please help?

Regards
Arthur


On 29 Aug, 2014, at 12:47 pm, arthur.hk.c...@gmail.com 
 wrote:

(Please ignore if duplicated) 


Hi,

I use Spark 1.0.2 with Hive 0.13.1

I have already set the hive mysql database to latine1; 

mysql:
alter database hive character set latin1;

Spark:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hiveContext.hql("create table test_datatype1 (testbigint bigint )")
scala> hiveContext.hql("drop table test_datatype1")


14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
"embedded-only" so does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so 
does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
"embedded-only" so does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.

Re: Spark Hive max key length is 767 bytes

2014-08-30 Thread arthur.hk.c...@gmail.com
Hi,

Already done but still get the same error:

(I use HIVE 0.13.1 Spark 1.0.2, Hadoop 2.4.1)

Steps:
Step 1) mysql:
> 
>> alter database hive character set latin1;
Step 2) HIVE:
>> hive> create table test_datatype2 (testbigint bigint );
>> OK
>> Time taken: 0.708 seconds
>> 
>> hive> drop table test_datatype2;
>> OK
>> Time taken: 23.272 seconds
Step 3) scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> 14/08/29 19:33:52 INFO Configuration.deprecation: 
>> mapred.reduce.tasks.speculative.execution is deprecated. Instead, use 
>> mapreduce.reduce.speculative
>> hiveContext: org.apache.spark.sql.hive.HiveContext = 
>> org.apache.spark.sql.hive.HiveContext@395c7b94
>> scala> hiveContext.hql(“create table test_datatype3 (testbigint bigint)”)
>> res0: org.apache.spark.sql.SchemaRDD = 
>> SchemaRDD[0] at RDD at SchemaRDD.scala:104
>> == Query Plan ==
>> 
>> scala> hiveContext.hql("drop table test_datatype3")
>> 
>> 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while 
>> adding/validating class(es) : Specified key was too long; max key length is 
>> 767 bytes
>> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
>> too long; max key length is 767 bytes
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at 
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
>> 
>> 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of 
>> org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in 
>> no possible candidates
>> Error(s) were found while auto-creating/validating the datastore for 
>> classes. The errors are printed in the log, and are attached to this 
>> exception.
>> org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found 
>> while auto-creating/validating the datastore for classes. The errors are 
>> printed in the log, and are attached to this exception.
>> at 
>> org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)
>> 
>> 
>> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: 
>> Specified key was too long; max key length is 767 bytes
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)



Should I use HIVE 0.12.0 instead of HIVE 0.13.1?

Regards
Arthur

On 31 Aug, 2014, at 6:01 am, Denny Lee  wrote:

> Oh, you may be running into an issue with your MySQL setup actually, try 
> running
> 
> alter database metastore_db character set latin1
> 
> so that way Hive (and the Spark HiveContext) can execute properly against the 
> metastore.
> 
> 
> On August 29, 2014 at 04:39:01, arthur.hk.c...@gmail.com 
> (arthur.hk.c...@gmail.com) wrote:
> 
>> Hi,
>> 
>> 
>> Tried the same thing in HIVE directly without issue:
>> 
>> HIVE:
>> hive> create table test_datatype2 (testbigint bigint );
>> OK
>> Time taken: 0.708 seconds
>> 
>> hive> drop table test_datatype2;
>> OK
>> Time taken: 23.272 seconds
>> 
>> 
>> 
>> Then tried again in SPARK:
>> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> 14/08/29 19:33:52 INFO Configuration.deprecation: 
>> mapred.reduce.tasks.speculative.execution is deprecated. Instead, use 
>> mapreduce.reduce.speculative
>> hiveContext: org.apache.spark.sql.hive.HiveContext = 
>> org.apache.spark.sql.hive.HiveContext@395c7b94
>> 
>> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> res0: org.apache.spark.sql.SchemaRDD = 
>> SchemaRDD[0] at RDD at SchemaRDD.scala:104
>> == Query Plan ==
>> 
>> 
>> scala> hiveContext.hql("drop table test_datatype3")
>> 
>> 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while 
>> adding/validating class(es) : Specified key was too long; max key length is 
>> 767 bytes
>> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
>> too long; max key length is 767 bytes
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at 
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
>> 
>> 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of 
>> org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in 
>> no possible candidates
>> Error(s) were found while auto-creating/validating the datastore for 
>> classes. The errors are printed in the log, and are attached to this 
>> exception.
>> org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found 
>> while auto-creating/validating the datastore for classes. The errors are 
>> printed in the log, and are attached to this exception.
>> at 
>> org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)
>> 
>> 
>> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: 
>> Specified key was too long; max key length is 767 bytes
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 

Spark Master/Slave and HA

2014-08-30 Thread arthur.hk.c...@gmail.com
Hi,

I have a few questions about the Spark Master and Slave setup:

Here I have 5 Hadoop nodes (n1, n2, n3, n4, and n5); at the moment I run Spark
on these nodes as follows:
n1: Hadoop Active NameNode, Hadoop Slave, Spark Active Master
n2: Hadoop Standby NameNode, Hadoop Slave, Spark Slave
n3: Hadoop Slave, Spark Slave
n4: Hadoop Slave, Spark Slave
n5: Hadoop Slave, Spark Slave

Questions:
Q1: If I set n1 as both Spark Master and Spark Slave, I cannot start the Spark
cluster. Does this mean that, unlike Hadoop, I cannot use the same machine as
both MASTER and SLAVE in Spark?
n1: Hadoop Active NameNode, Hadoop Slave, Spark Active Master, Spark Slave (failed to start Spark)
n2: Hadoop Standby NameNode, Hadoop Slave, Spark Slave
n3: Hadoop Slave, Spark Slave
n4: Hadoop Slave, Spark Slave
n5: Hadoop Slave, Spark Slave

Q2: I am planning Spark HA. What if I use n2 as "Spark Standby Master and Spark
Slave"? Is Spark allowed to run the Standby Master and a Slave on the same machine?
n1: Hadoop Active NameNode, Hadoop Slave, Spark Active Master
n2: Hadoop Standby NameNode, Hadoop Slave, Spark Standby Master, Spark Slave
n3: Hadoop Slave, Spark Slave
n4: Hadoop Slave, Spark Slave
n5: Hadoop Slave, Spark Slave

Q3: Does the Spark Master node do actual computation work like a worker, or is
it just a pure monitoring node?

Regards
Arthur

Spark and Shark Node: RAM Allocation

2014-08-30 Thread arthur.hk.c...@gmail.com
Hi,

Is there any formula to calculate proper RAM allocation values for Spark and
Shark based on physical RAM and Hadoop/HBase RAM usage?
E.g., if a node has 32GB of physical RAM:


spark-defaults.conf
spark.executor.memory   ?g

spark-env.sh
export SPARK_WORKER_MEMORY=?
export HADOOP_HEAPSIZE=?


shark-env.sh
export SPARK_MEM=?g
export SHARK_MASTER_MEM=?g



Regards
Arthur




Fwd: What does "appMasterRpcPort: -1" indicate ?

2014-08-30 Thread Tao Xiao
I'm using CDH 5.1.0, which bundles Spark 1.0.0 with it.

Following the how-to "Run a Simple Apache Spark App in CDH 5", I tried to
submit my job in local mode, Spark Standalone mode, and YARN mode. I
successfully submitted my job in local mode and Standalone mode; however, I
noticed the following messages printed to the console when I submitted my
job in YARN mode:

 14/08/29 22:27:29 INFO Client: Submitting application to ASM
14/08/29 22:27:29 INFO YarnClientImpl: Submitted application
application_1406949333981_0015
14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED

[... the same report, with appMasterRpcPort: -1 and yarnAppState: ACCEPTED,
repeats roughly once per second until 22:27:39 ...]

 14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: 0
 appStartTime: 1409365649836
 yarnAppState: RUNNING

The job finished successfully and produced correct results.
But I'm not sure what those messages mean. Does "appMasterRpcPort: -1" indicate
an error or an exception?

Thanks


Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Roger Hoover
I have this same question.  Isn't there somewhere that the Kafka range
metadata can be saved?  From my naive perspective, it seems like it should
be very similar to HDFS lineage.  The original HDFS blocks are kept
somewhere (in the driver?) so that if an RDD partition is lost, it can be
recomputed.  In this case, all we need is the Kafka topic, partition, and
offset range.

Can someone enlighten us on why two copies of the RDD are needed (or some
other mechanism like a WAL) for fault tolerance when using Kafka but not
when reading from say HDFS?


On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges  wrote:

> 'this 2-node replication is mainly for failover in case the receiver dies
> while data is in flight.  there's still chance for data loss as there's no
> write ahead log on the hot path, but this is being addressed.'
>
> Can you comment a little on how this will be addressed, will there be a
> durable WAL?  Is there a JIRA for tracking this effort?
>
> I am curious without WAL if you can avoid this data loss with explicit
> management of Kafka offsets e.g. don't commit offset unless data is
> replicated to multiple nodes or maybe not until processed.  The incoming
> data will always be durably stored to disk in Kafka so can be replayed in
> failure scenarios to avoid data loss if the offsets are managed properly.
>
>
>
>
> On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly  wrote:
>
>> @bharat-
>>
>> overall, i've noticed a lot of confusion about how Spark Streaming scales
>> - as well as how it handles failover and checkpointing, but we can discuss
>> that separately.
>>
>> there's actually 2 dimensions to scaling here:  receiving and processing.
>>
>> *Receiving*
>> receiving can be scaled out by submitting new DStreams/Receivers to the
>> cluster as i've done in the Kinesis example.  in fact, i purposely chose to
>> submit multiple receivers in my Kinesis example because i feel it should be
>> the norm and not the exception - particularly for partitioned and
>> checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
>> only way to scale.
>>
>> a side note here is that each receiver running in the cluster will
>> immediately replicates to 1 other node for fault-tolerance of that specific
>> receiver.  this is where the confusion lies.  this 2-node replication is
>> mainly for failover in case the receiver dies while data is in flight.
>>  there's still chance for data loss as there's no write ahead log on the
>> hot path, but this is being addressed.
>>
>> this is mentioned in the docs here:
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>
>> *Processing*
>> once data is received, tasks are scheduled across the Spark cluster just
>> like any other non-streaming task where you can specify the number of
>> partitions for reduces, etc.  this is the part of scaling that is sometimes
>> overlooked - probably because it "works just like regular Spark", but it is
>> worth highlighting.
>>
>> Here's a blurb in the docs:
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing
>>
>> the other thing that's confusing with Spark Streaming is that in Scala,
>> you need to explicitly
>>
>> import
>> org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
>>
>> in order to pick up the implicits that allow DStream.reduceByKey and such
>> (versus DStream.transform(rddBatch => rddBatch.reduceByKey())
>>
>> in other words, DStreams appear to be relatively featureless until you
>> discover this implicit.  otherwise, you need to operate on the underlying
>> RDD's explicitly which is not ideal.
>>
>> the Kinesis example referenced earlier in the thread uses the DStream
>> implicits.
>>
>>
>> side note to all of this - i've recently convinced my publisher for my
>> upcoming book, Spark In Action, to let me jump ahead and write the Spark
>> Streaming chapter ahead of other more well-understood libraries.  early
>> release is in a month or so.  sign up  @ http://sparkinaction.com if you
>> wanna get notified.
>>
>> shameless plug that i wouldn't otherwise do, but i really think it will
>> help clear a lot of confusion in this area as i hear these questions asked
>> a lot in my talks and such.  and i think a clear, crisp story on scaling
>> and fault-tolerance will help Spark Streaming's adoption.
>>
>> hope that helps!
>>
>> -chris
>>
>>
>>
>>
>> On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> I agree. This issue should be fixed in Spark rather rely on replay of
>>> Kafka messages.
>>>
>>> Dib
>>> On Aug 28, 2014 6:45 AM, "RodrigoB"  wrote:
>>>
 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that
 the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major

Powered By Spark

2014-08-30 Thread Yi Tian
Hi, 

Could you please add Asiainfo to the Powered By Spark page?

Thanks

Asiainfo
www.asiainfo.com
Core, SQL, Streaming, MLlib, GraphX
 
We leverage Spark and the Hadoop ecosystem to build cost-effective data center
solutions for our customers in the telecom industry as well as other industrial
sectors. Meanwhile, we also build innovative big data applications to help our
customers with real-time marketing, cross-product selling, and customer behavior
analysis, as well as other areas, using Spark technology.


Yi Tian

Re: saveAsSequenceFile for DStream

2014-08-30 Thread Chris Fregly
couple things to add here:

1) you can import the
org.apache.spark.streaming.dstream.PairDStreamFunctions implicit which adds
a whole ton of functionality to DStream itself.  this lets you work at the
DStream level versus digging into the underlying RDDs.

2) you can use ssc.fileStream(directory) to create an input stream made up
of files in a given directory.  new files will be added to the stream as
they appear in that directory.  note:  files must be immutable.
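
A minimal sketch tying the two points together: each batch of word counts is
written under a path carrying the batch timestamp, and a second input stream
tails a directory for newly appearing (immutable) files. The socket host/port,
paths, and batch interval are illustrative assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._                       // sequence-file conversions for pair RDDs
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._         // PairDStreamFunctions (reduceByKey on DStreams)

object SequenceFileStreamingSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("SequenceFileStreamingSketch"), Seconds(10))

    val wordCounts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Write each batch as a sequence file under a timestamped path so readers
    // can tell batches apart.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.saveAsSequenceFile("/tmp/wordcounts/batch-" + time.milliseconds)
    }

    // A separate input stream that picks up new files as they land in a directory.
    val replayed = ssc.textFileStream("/tmp/wordcounts-replay")
    replayed.print()

    ssc.start()
    ssc.awaitTermination()
  }
}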


On Tue, Jul 22, 2014 at 8:39 AM, Barnaby Falls  wrote:

> Thanks Sean! I got that working last night similar to how you solved it.
> Any ideas about how to monitor that same folder in another script by
> creating a stream? I can use sc.sequenceFile() to read in the RDD, but how
> do I get the name of the file that got added since there is no
> sequenceFileStream() method? Thanks again for your help.
>
> > On Jul 22, 2014, at 1:57, "Sean Owen"  wrote:
> >
> > What about simply:
> >
> > dstream.foreachRDD(_.saveAsSequenceFile(...))
> >
> > ?
> >
> >> On Tue, Jul 22, 2014 at 2:06 AM, Barnaby  wrote:
> >> First of all, I do not know Scala, but learning.
> >>
> >> I'm doing a proof of concept by streaming content from a socket,
> counting
> >> the words and write it to a Tachyon disk. A different script will read
> the
> >> file stream and print out the results.
> >>
> >> val lines = ssc.socketTextStream(args(0), args(1).toInt,
> >> StorageLevel.MEMORY_AND_DISK_SER)
> >> val words = lines.flatMap(_.split(" "))
> >> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
> >> wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
> >> ssc.start()
> >> ssc.awaitTermination()
> >>
> >> I already did a proof of concept to write and read sequence files but
> there
> >> doesn't seem to be a saveAsSequenceFiles() method in DStream. What is
> the
> >> best way to write out an RDD to a stream so that the timestamps are in
> the
> >> filenames and so there is minimal overhead in reading the data back in
> as
> >> "objects", see below.
> >>
> >> My simple successful proof was the following:
> >> val rdd =  sc.parallelize(Array(("a",2), ("b",3), ("c",1)))
> >> rdd.saveAsSequenceFile("tachyon://.../123.sf2")
> >> val rdd2 = sc.sequenceFile[String,Int]("tachyon://.../123.sf2")
> >>
> >> How can I do something similar with streaming?
> >>
> >>
> >>
> >>


How can a "deserialized Java object" be stored on disk?

2014-08-30 Thread Tao Xiao
Reading about RDD persistence, I learned that the storage level
"MEMORY_AND_DISK" means "Store RDD as deserialized Java objects in the JVM.
If the RDD does not fit in memory, store the partitions that don't fit on
disk, and read them from there when they're needed."

But how can a "deserialized Java object" be stored on disk? As far as I
know, a Java object should be stored as an array of bytes on disk, which
means that Java object should be firtly converted into an array of bytes (a
serialized object).

Thanks .


Re: data locality

2014-08-30 Thread Chris Fregly
you can view the Locality Level of each task within a stage by using the
Spark Web UI under the Stages tab.

levels are as follows (in order of decreasing desirability):
1) PROCESS_LOCAL <- data was found directly in the executor JVM
2) NODE_LOCAL <- data was found on the same node as the executor JVM
3) RACK_LOCAL <- data was found in the same rack
4) ANY <- outside the rack

also, the Aggregated Metrics by Executor section of the Stage detail view
shows how much data is being shuffled across the network (Shuffle
Read/Write).  0 is where you wanna be with that metric.

-chris


On Fri, Jul 25, 2014 at 4:13 AM, Tsai Li Ming  wrote:

> Hi,
>
> In the standalone mode, how can we check data locality is working as
> expected when tasks are assigned?
>
> Thanks!
>
>
> On 23 Jul, 2014, at 12:49 am, Sandy Ryza  wrote:
>
> On standalone there is still special handling for assigning tasks within
> executors.  There just isn't special handling for where to place executors,
> because standalone generally places an executor on every node.
>
>
> On Mon, Jul 21, 2014 at 7:42 PM, Haopu Wang  wrote:
>
>>   Sandy,
>>
>>
>>
>> I just tried the standalone cluster and didn't have chance to try Yarn
>> yet.
>>
>> So if I understand correctly, there is no special handling of task
>> assignment according to the HDFS block's location when Spark is running as
>> a standalone cluster.
>>
>> Please correct me if I'm wrong. Thank you for your patience!
>>
>>
>>  --
>>
>> From: Sandy Ryza [mailto:sandy.r...@cloudera.com]
>> Sent: July 22, 2014 9:47
>>
>> To: user@spark.apache.org
>> Subject: Re: data locality
>>
>>
>>
>> This currently only works for YARN.  The standalone default is to place
>> an executor on every node for every job.
>>
>>
>>
>> The total number of executors is specified by the user.
>>
>>
>>
>> -Sandy
>>
>>
>>
>> On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang  wrote:
>>
>> Sandy,
>>
>>
>>
>> Do you mean the “preferred location” is working for standalone cluster
>> also? Because I check the code of SparkContext and see comments as below:
>>
>>
>>
>>   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
>>   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
>>   // contains a map from hostname to a list of input format splits on the host.
>>   private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()
>>
>>
>>
>> BTW, even with the preferred hosts, how does Spark decide how many total
>> executors to use for this application?
>>
>>
>>
>> Thanks again!
>>
>>
>>  --
>>
>> From: Sandy Ryza [mailto:sandy.r...@cloudera.com]
>> Sent: Friday, July 18, 2014 3:44 PM
>> To: user@spark.apache.org
>> Subject: Re: data locality
>>
>>
>>
>> Hi Haopu,
>>
>>
>>
>> Spark will ask HDFS for file block locations and try to assign tasks
>> based on these.
>>
>>
>>
>> There is a snag.  Spark schedules its tasks inside of "executor"
>> processes that stick around for the lifetime of a Spark application.  Spark
>> requests executors before it runs any jobs, i.e. before it has any
>> information about where the input data for the jobs is located.  If the
>> executors occupy significantly fewer nodes than exist in the cluster, it
>> can be difficult for Spark to achieve data locality.  The workaround for
>> this is an API that allows passing in a set of preferred locations when
>> instantiating a Spark context.  This API is currently broken in Spark 1.0,
>> and will likely changed to be something a little simpler in a future
>> release.
>>
>>
>>
>> val locData = InputFormatInfo.computePreferredLocations(
>>   Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))
>>
>> val sc = new SparkContext(conf, locData)
>>
>>
>>
>> -Sandy
>>
>>
>>
>>
>>
>> On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang  wrote:
>>
>> I have a standalone spark cluster and a HDFS cluster which share some of
>> nodes.
>>
>>
>>
>> When reading HDFS file, how does spark assign tasks to nodes? Will it ask
>> HDFS the location for each file block in order to get a right worker node?
>>
>>
>>
>> How about a spark cluster on Yarn?
>>
>>
>>
>> Thank you very much!
>>
>>
>>
>>
>>
>>
>>
>
>
>


Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Tim Smith
I'd be interested to understand this mechanism as well. But this is the
error recovery part of the equation. Consuming from Kafka has two aspects -
parallelism and error recovery and I am not sure how either works. For
error recovery, I would like to understand how:
- A failed receiver gets re-spawned. In 1.0.0, despite setting the failed
tasks threshold to 64, my job aborts after 4 receiver task failures.
- Data loss recovery due to a failed receiver task/executor.


> For parallelism, I would expect a single createStream() to intelligently
map a receiver thread somewhere, one for each kafka partition, but in
different JVMs. Also, repartition() does not seem to work as advertised. A
repartition(512) should get nodes other than the receiver nodes to get some
RDDs to process. No?


On Sat, Aug 30, 2014 at 7:14 PM, Roger Hoover 
wrote:

> I have this same question.  Isn't there somewhere that the Kafka range
> metadata can be saved?  From my naive perspective, it seems like it should
> be very similar to HDFS lineage.  The original HDFS blocks are kept
> somewhere (in the driver?) so that if an RDD partition is lost, it can be
> recomputed.  In this case, all we need is the Kafka topic, partition, and
> offset range.
>
> Can someone enlighten us on why two copies of the RDD are needed (or some
> other mechanism like a WAL) for fault tolerance when using Kafka but not
> when reading from say HDFS?
>

>
> On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges 
> wrote:
>
>> 'this 2-node replication is mainly for failover in case the receiver
>> dies while data is in flight.  there's still chance for data loss as
>> there's no write ahead log on the hot path, but this is being addressed.'
>>
>> Can you comment a little on how this will be addressed, will there be a
>> durable WAL?  Is there a JIRA for tracking this effort?
>>
>> I am curious without WAL if you can avoid this data loss with explicit
>> management of Kafka offsets e.g. don't commit offset unless data is
>> replicated to multiple nodes or maybe not until processed.  The incoming
>> data will always be durably stored to disk in Kafka so can be replayed in
>> failure scenarios to avoid data loss if the offsets are managed properly.
>>
>>
>>
>>
>> On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly  wrote:
>>
>>> @bharat-
>>>
>>> overall, i've noticed a lot of confusion about how Spark Streaming
>>> scales - as well as how it handles failover and checkpointing, but we can
>>> discuss that separately.
>>>
>>> there's actually 2 dimensions to scaling here:  receiving and processing.
>>>
>>> *Receiving*
>>> receiving can be scaled out by submitting new DStreams/Receivers to the
>>> cluster as i've done in the Kinesis example.  in fact, i purposely chose to
>>> submit multiple receivers in my Kinesis example because i feel it should be
>>> the norm and not the exception - particularly for partitioned and
>>> checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
>>> only way to scale.
>>>
>>> a side note here is that each receiver running in the cluster will
>>> immediately replicates to 1 other node for fault-tolerance of that specific
>>> receiver.  this is where the confusion lies.  this 2-node replication is
>>> mainly for failover in case the receiver dies while data is in flight.
>>>  there's still chance for data loss as there's no write ahead log on the
>>> hot path, but this is being addressed.
>>>
>>> this is mentioned in the docs here:
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>>
>>> *Processing*
>>> once data is received, tasks are scheduled across the Spark cluster just
>>> like any other non-streaming task where you can specify the number of
>>> partitions for reduces, etc.  this is the part of scaling that is sometimes
>>> overlooked - probably because it "works just like regular Spark", but it is
>>> worth highlighting.
>>>
>>> Here's a blurb in the docs:
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing
>>>
>>> the other thing that's confusing with Spark Streaming is that in Scala,
>>> you need to explicitly
>>>
>>> import
>>> org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
>>>
>>> in order to pick up the implicits that allow DStream.reduceByKey and
>>> such (versus DStream.transform(rddBatch => rddBatch.reduceByKey())
>>>
>>> in other words, DStreams appear to be relatively featureless until you
>>> discover this implicit.  otherwise, you need to operate on the underlying
>>> RDD's explicitly which is not ideal.
>>>
>>> the Kinesis example referenced earlier in the thread uses the DStream
>>> implicits.
>>>
>>>
>>> side note to all of this - i've recently convinced my publisher for my
>>> upcoming book, Spark In Action, to let me jump ahead and write the Spark
>>> Streaming chapter ahead of other more well-understood libraries.  early
>>> release is in a month or so.  sign up  @ ht

Re: Reading from Amazon S3 behaves inconsistently: returns different number of lines...

2014-08-30 Thread Chris Fregly
interesting and possibly-related blog post from netflix earlier this year:
http://techblog.netflix.com/2014/01/s3mper-consistency-in-cloud.html


On Fri, Aug 1, 2014 at 8:09 AM, nit  wrote:

> @sean - I am using latest code from master branch, up to commit#
> a7d145e98c55fa66a541293930f25d9cdc25f3b4 .
>
> In my case I have multiple directories with 1024 files (the sizes of the
> files may differ). For some directories I always get consistent
> results... and for others I can reproduce the inconsistent behavior.
>
> I am not very familiar with the S3 protocol or the s3 driver in Spark. I am
> wondering: how does the s3 driver verify that all files (and their content)
> under a directory were read correctly?
>
>
>
>