Re: Serialization issue when using HBase with Spark

2014-12-15 Thread Shixiong Zhu
Just point out a bug in your codes. You should not use `mapPartitions` like
that. For details, I recommend Section "setup() and cleanup()" in Sean
Owen's post:
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

Best Regards,
Shixiong Zhu

2014-12-14 16:35 GMT+08:00 Yanbo :
>
> In #1, class HTable can not be serializable.
> You also need to check you self defined function getUserActions and make
> sure it is a member function of one class who implement serializable
> interface.
>
> 发自我的 iPad
>
> > 在 2014年12月12日,下午4:35,yangliuyu  写道:
> >
> > The scenario is using HTable instance to scan multiple rowkey range in
> Spark
> > tasks look likes below:
> > Option 1:
> > val users = input
> >  .map { case (deviceId, uid) =>
> > uid}.distinct().sortBy(x=>x).mapPartitions(iterator=>{
> >  val conf = HBaseConfiguration.create()
> >  val table = new HTable(conf, "actions")
> >  val result = iterator.map{ userId=>
> >(userId, getUserActions(table, userId, timeStart, timeStop))
> >  }
> >  table.close()
> >  result
> >})
> >
> > But got the exception:
> > org.apache.spark.SparkException: Task not serializable
> >at
> >
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
> >at
> > org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
> >at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
> >at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
> >at
> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)...
> > ...
> > Caused by: java.io.NotSerializableException:
> > org.apache.hadoop.conf.Configuration
> >at
> > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> >at
> >
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >at
> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >
> > The reason not using sc.newAPIHadoopRDD is it only support one scan each
> > time.
> > val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
> >  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
> >  classOf[org.apache.hadoop.hbase.client.Result])
> >
> > And if using MultiTableInputFormat, driver is not possible put all
> rowkeys
> > into HBaseConfiguration
> > Option 2:
> > sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
> >  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
> >  classOf[org.apache.hadoop.hbase.client.Result])
> >
> > It may divide all rowkey ranges into several parts then use option 2,
> but I
> > prefer option 1. So is there any solution for option 1?
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark inserting into parquet files with different schema

2014-12-15 Thread AdamPD
Hi all,

I understand that parquet allows for schema versioning automatically in the
format; however, I'm not sure whether Spark supports this.

I'm saving a SchemaRDD to a parquet file, registering it as a table, then
doing an insertInto with a SchemaRDD with an extra column.

The second SchemaRDD does in fact get inserted, but the extra column isn't
present when I try to query it with Spark SQL.

Is there anything I can do to get this working how I'm hoping?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-inserting-into-parquet-files-with-different-schema-tp20681.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RDD vs Broadcast

2014-12-15 Thread elitejyo
We are developing Spark framework wherein we are moving historical data into
RDD sets.

Basically, RDD is immutable, read only dataset on which we do operations.
Based on that we have moved historical data into RDD and we do computations
like filtering/mapping, etc on such RDDs.

Now there is a use case where a subset of the data in the RDD gets updated
and we have to recompute the values.

So far I have been able to think of below approaches -

Approach1 - broadcast the change: 
1. I have already filtered the historical RDD on scope
2. Whenever there is an update on the values, I apply a map phase on /RDD at
step1/ by doing a lookup on the broadcast, thereby creating a new RDD
3. now I do all the computations again on this new /RDD at step2/

Approach2:
1. Maintain historical data RDDs 
2. Maintain /Delta/ RDDs on the historical data. Since initially there are
no updates it will be an empty RDD
3. Whenever there is an update on the values, create a new /Delta/ RDD and
discard the old value
4. Recompute the values by doing a join between historical RDDs and /Delta/
RDDs

Approach 3:
I had thought of /Delta/ RDD to be a streaming RDD as well where I keep
updating the same RDD and do re-computation. But as far as I understand it
can take streams from Flume or Kafka. Whereas in my case the values are
generated in the application itself based on user interaction.
Hence I cannot see any integration points of streaming RDD in my context.

Any suggestion on which approach is better or any other approach suitable
for this scenario.

TIA!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-vs-Broadcast-tp20682.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Adding a column to a SchemaRDD

2014-12-15 Thread Yanbo Liang
Hi Nathan,

#1

Spark SQL & DSL can satisfy your requirement. You can refer the following
code snippet:

jdata.select(Star(Node), 'seven.getField("mod"), 'eleven.getField("mod"))

You need to import org.apache.spark.sql.catalyst.analysis.Star in advance.

#2

After you make the transform above, you do not need to make SchemaRDD
manually.
Because that jdata.select() return a SchemaRDD and you can operate on it
directly.

For example, the following code snippet will return a new SchemaRDD with
longer Row:

val t1 = jdata.select(Star(Node), 'seven.getField("mod") +
'eleven.getField("mod")  as 'mod_sum)

You can use t1.printSchema() to print the schema of this SchemaRDD and
check whether it satisfy your requirements.



2014-12-13 0:00 GMT+08:00 Nathan Kronenfeld :
>
> (1) I understand about immutability, that's why I said I wanted a new
> SchemaRDD.
> (2) I specfically asked for a non-SQL solution that takes a SchemaRDD, and
> results in a new SchemaRDD with one new function.
> (3) The DSL stuff is a big clue, but I can't find adequate documentation
> for it
>
> What I'm looking for is something like:
>
> import org.apache.spark.sql._
>
>
> val sqlc = new SQLContext(sc)
> import sqlc._
>
>
> val data = sc.parallelize(0 to 99).map(n =>
> ("{\"seven\": {\"mod\": %d, \"times\": %d}, "+
>   "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n
> % 11, n * 11))
> val jdata = sqlc.jsonRDD(data)
> jdata.registerTempTable("jdata")
>
>
> val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum
> FROM jdata")
>
>
> This sqlVersion works fine, but if I try to do the same thing with a
> programatic function, I'm missing a bunch of pieces:
>
>- I assume I'd need to start with something like:
>jdata.select('*, 'seven.mod, 'eleven.mod)
>and then get and process the last two elements.  The problems are:
>   - I can't select '* - there seems no way to get the complete row
>   - I can't select 'seven.mod or 'eleven.mod - the symbol evaluation
>   seems only one deep.
>- Assuming I could do that, I don't see a way to make the result into
>a SchemaRDD.  I assume I would have to do something like:
>   1. take my row and value, and create a new, slightly longer row
>   2. take my old schema, and create a new schema with one more field
>   at the end, named and typed appropriately
>   3. combine the two into a SchemaRDD
>   I think I see how to do 3, but 1 and 2 elude me.
>
> Is there more complete documentation somewhere for the DSL portion? Anyone
> have a clue about any of the above?
>
>
>
> On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang  wrote:
>
>> RDD is immutable so you can not modify it.
>> If you want to modify some value or schema in RDD,  using map to generate
>> a new RDD.
>> The following code for your reference:
>>
>> def add(a:Int,b:Int):Int = {
>>   a + b
>> }
>>
>> val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
>> val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2))}
>> d2.foreach(println)
>>
>>
>> Otherwise, if your self-defining function is straightforward and you can
>> represent it by SQL, using Spark SQL or DSL is also a good choice.
>>
>> case class Person(id: Int, score: Int, value: Int)
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>
>> import sqlContext._
>>
>> val d1 = sc.parallelize(1 to 10).map { i => Person(i,i+1,i+2)}
>> val d2 = d1.select('id, 'score, 'id + 'score)
>> d2.foreach(println)
>>
>>
>> 2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld 
>> :
>>
>>> Hi, there.
>>>
>>> I'm trying to understand how to augment data in a SchemaRDD.
>>>
>>> I can see how to do it if can express the added values in SQL - just run
>>> "SELECT *,valueCalculation AS newColumnName FROM table"
>>>
>>> I've been searching all over for how to do this if my added value is a
>>> scala function, with no luck.
>>>
>>> Let's say I have a SchemaRDD with columns A, B, and C, and I want to add
>>> a new column, D, calculated using Utility.process(b, c), and I want (of
>>> course) to pass in the value B and C from each row, ending up with a new
>>> SchemaRDD with columns A, B, C, and D.
>>>
>>> Is this possible? If so, how?
>>>
>>> Thanks,
>>>-Nathan
>>>
>>> --
>>> Nathan Kronenfeld
>>> Senior Visualization Developer
>>> Oculus Info Inc
>>> 2 Berkeley Street, Suite 600,
>>> Toronto, Ontario M5A 4J5
>>> Phone:  +1-416-203-3003 x 238
>>> Email:  nkronenf...@oculusinfo.com
>>>
>>
>>
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com
>


Re: Does filter on an RDD scan every data item ?

2014-12-15 Thread nsareen
Thanks! shall try it out.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-tp20170p20683.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Run Spark job on Playframework + Spark Master/Worker in one Mac

2014-12-15 Thread Tomoya Igarashi
Hi all,

I am trying to run Spark job on Playframework + Spark Master/Worker in one
Mac.
When job ran, I encountered java.lang.ClassNotFoundException.
Would you teach me how to solve it?

Here is my code in Github.
https://github.com/TomoyaIgarashi/spark_cluster_sample

* Envrionments:
Mac 10.9.5
Java 1.7.0_71
Playframework 2.2.3
Spark 1.1.1

* Setup history:
> cd ~
> git clone g...@github.com:apache/spark.git
> cd spark
> git checkout -b v1.1.1 v1.1.1
> sbt/sbt assembly
> vi ~/.bashrc
export SPARK_HOME=/Users/tomoya/spark
> . ~/.bashrc
> hostname
Tomoya-Igarashis-MacBook-Air.local
> vi $SPARK_HOME/conf/slaves
Tomoya-Igarashis-MacBook-Air.local
> play new spark_cluster_sample
default name
type -> scala

* Run history:
> $SPARK_HOME/sbin/start-all.sh
> jps
> which play
/Users/tomoya/play/play
> git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
> cd spark_cluster_sample
> play run

* Error trace:
Here is error trace in Gist.
https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511

Regards


Why my SQL UDF cannot be registered?

2014-12-15 Thread Xuelin Cao

Hi,
     I tried to create a function that to convert an Unix time stamp to the 
hour number in a day.
      It works if the code is like this:sqlContext.registerFunction("toHour", 
(x:Long)=>{new java.util.Date(x*1000).getHours})

      But, if I do it like this, it doesn't work:
      def toHour (x:Long) = {new java.util.Date(x*1000).getHours}      
sqlContext.registerFunction("toHour", toHour)
      The system reports an error::23: error: missing arguments for 
method toHour;follow this method with `_' if you want to treat it as a 
partially applied function              sqlContext.registerFunction("toHour", 
toHour)             Anyone can help on dealing with this error?


Re: Run Spark job on Playframework + Spark Master/Worker in one Mac

2014-12-15 Thread Aniket Bhatnagar
Try the workaround (addClassPathJars(sparkContext,
this.getClass.getClassLoader) discussed in
http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAJOb8buD1B6tUtOfG8_Ok7F95C3=r-zqgffoqsqbjdxd427...@mail.gmail.com%3E

Thanks,
Aniket

On Mon Dec 15 2014 at 07:43:24 Tomoya Igarashi <
tomoya.igarashi.0...@gmail.com> wrote:

> Hi all,
>
> I am trying to run Spark job on Playframework + Spark Master/Worker in one
> Mac.
> When job ran, I encountered java.lang.ClassNotFoundException.
> Would you teach me how to solve it?
>
> Here is my code in Github.
> https://github.com/TomoyaIgarashi/spark_cluster_sample
>
> * Envrionments:
> Mac 10.9.5
> Java 1.7.0_71
> Playframework 2.2.3
> Spark 1.1.1
>
> * Setup history:
> > cd ~
> > git clone g...@github.com:apache/spark.git
> > cd spark
> > git checkout -b v1.1.1 v1.1.1
> > sbt/sbt assembly
> > vi ~/.bashrc
> export SPARK_HOME=/Users/tomoya/spark
> > . ~/.bashrc
> > hostname
> Tomoya-Igarashis-MacBook-Air.local
> > vi $SPARK_HOME/conf/slaves
> Tomoya-Igarashis-MacBook-Air.local
> > play new spark_cluster_sample
> default name
> type -> scala
>
> * Run history:
> > $SPARK_HOME/sbin/start-all.sh
> > jps
> > which play
> /Users/tomoya/play/play
> > git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
> > cd spark_cluster_sample
> > play run
>
> * Error trace:
> Here is error trace in Gist.
> https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511
>
> Regards
>


Re: Serialization issue when using HBase with Spark

2014-12-15 Thread Aniket Bhatnagar
"The reason not using sc.newAPIHadoopRDD is it only support one scan each
time."

I am not sure is that's true. You can use multiple scans as following:

val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings : _*)

where convertScanToString is implemented as:

/**
 * Serializes a HBase scan into string.
 * @param scan Scan to serialize.
 * @return Base64 encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}

Thanks,
Aniket

On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu  wrote:

> Just point out a bug in your codes. You should not use `mapPartitions`
> like that. For details, I recommend Section "setup() and cleanup()" in Sean
> Owen's post:
> http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
>
> Best Regards,
> Shixiong Zhu
>
> 2014-12-14 16:35 GMT+08:00 Yanbo :
>>
>> In #1, class HTable can not be serializable.
>> You also need to check you self defined function getUserActions and make
>> sure it is a member function of one class who implement serializable
>> interface.
>>
>> 发自我的 iPad
>>
>> > 在 2014年12月12日,下午4:35,yangliuyu  写道:
>> >
>> > The scenario is using HTable instance to scan multiple rowkey range in
>> Spark
>> > tasks look likes below:
>> > Option 1:
>> > val users = input
>> >  .map { case (deviceId, uid) =>
>> > uid}.distinct().sortBy(x=>x).mapPartitions(iterator=>{
>> >  val conf = HBaseConfiguration.create()
>> >  val table = new HTable(conf, "actions")
>> >  val result = iterator.map{ userId=>
>> >(userId, getUserActions(table, userId, timeStart, timeStop))
>> >  }
>> >  table.close()
>> >  result
>> >})
>> >
>> > But got the exception:
>> > org.apache.spark.SparkException: Task not serializable
>> >at
>> >
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>> >at
>> > org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>> >at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>> >at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>> >at
>> >
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)...
>> > ...
>> > Caused by: java.io.NotSerializableException:
>> > org.apache.hadoop.conf.Configuration
>> >at
>> > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>> >at
>> >
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >at
>> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >
>> > The reason not using sc.newAPIHadoopRDD is it only support one scan each
>> > time.
>> > val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>> >  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>> >  classOf[org.apache.hadoop.hbase.client.Result])
>> >
>> > And if using MultiTableInputFormat, driver is not possible put all
>> rowkeys
>> > into HBaseConfiguration
>> > Option 2:
>> > sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
>> >  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>> >  classOf[org.apache.hadoop.hbase.client.Result])
>> >
>> > It may divide all rowkey ranges into several parts then use option 2,
>> but I
>> > prefer option 1. So is there any solution for option 1?
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: Spark with HBase

2014-12-15 Thread Aniket Bhatnagar
In case you are still looking for help, there has been multiple discussions
in this mailing list that you can try searching for. Or you can simply use
https://github.com/unicredit/hbase-rdd :-)

Thanks,
Aniket

On Wed Dec 03 2014 at 16:11:47 Ted Yu  wrote:

> Which hbase release are you running ?
> If it is 0.98, take a look at:
>
> https://issues.apache.org/jira/browse/SPARK-1297
>
> Thanks
>
> On Dec 2, 2014, at 10:21 PM, Jai  wrote:
>
> I am trying to use Apache Spark with a psuedo distributed Hadoop Hbase
> Cluster and I am looking for some links regarding the same. Can someone
> please guide me through the steps to accomplish this. Thanks a lot for
> Helping
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-HBase-tp20226.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


HiveQL support in Cassandra-Spark connector

2014-12-15 Thread shahab
Hi,

I just wonder if Cassandra-Spark connector supports executing HiveQL on
Cassandra tables?

best,
/Shahab


Re: JSON Input files

2014-12-15 Thread Madabhattula Rajesh Kumar
Hi Helena and All,

I have found one example "multi-line json file" into an RDD using "
https://github.com/alexholmes/json-mapreduce";.

val data = sc.newAPIHadoopFile(
filepath,
classOf[MultiLineJsonInputFormat],
classOf[LongWritable],
classOf[Text],
conf ).map(p => (p._1.get, p._2.toString))
 data.count

It is expecting Conf object. What Conf value I need to specify and how
to specify.
MultiLineJsonInputFormat class is expecting "member" value. How to
pass "member value. Otherwise I'm getting below exception

















*java.io.IOException: Missing configuration value for
multilinejsoninputformat.member at
com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
 at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:115)
at
org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:103)   at
org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)at
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)  at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)  at
org.apache.spark.scheduler.Task.run(Task.scala:54)  at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)   at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at
java.lang.Thread.run(Thread.java:745)*

Please let me know who to resolve this issue

Regards,
Rajesh


On Sun, Dec 14, 2014 at 7:21 PM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:
>
> Thank you Yanbo
>
> Regards,
> Rajesh
>
> On Sun, Dec 14, 2014 at 3:15 PM, Yanbo  wrote:
>>
>> Pay attention to your JSON file, try to change it like following.
>> Each record represent as a JSON string.
>>
>>  {"NAME" : "Device 1",
>>   "GROUP" : "1",
>>   "SITE" : "qqq",
>>   "DIRECTION" : "East",
>>  }
>>  {"NAME" : "Device 2",
>>   "GROUP" : "2",
>>   "SITE" : "sss",
>>   "DIRECTION" : "North",
>>  }
>>
>> > 在 2014年12月14日,下午5:01,Madabhattula Rajesh Kumar 
>> 写道:
>> >
>> > { "Device 1" :
>> >  {"NAME" : "Device 1",
>> >   "GROUP" : "1",
>> >   "SITE" : "qqq",
>> >   "DIRECTION" : "East",
>> >  }
>> >  "Device 2" :
>> >  {"NAME" : "Device 2",
>> >   "GROUP" : "2",
>> >   "SITE" : "sss",
>> >   "DIRECTION" : "North",
>> >  }
>> > }
>>
>


Re: ...FileNotFoundException: Path is not a file: - error on accessing HDFS with sc.wholeTextFiles

2014-12-15 Thread Karen Murphy


Thanks Akhil,

In line with your suggestion I have used the following 2 commands to 
flatten the directory structure:


find . -type f -iname '*' -exec  mv '{}' . \;
find . -type d -exec rm -rf '{}' \;

Kind Regards
Karen



On 12/12/14 13:25, Akhil Das wrote:
I'm not quiet sure whether spark will go inside subdirectories and 
pick up files from it. You could do something like following to bring 
all files to one directory.


find . -iname '*' -exec mv '{}' . \;


Thanks
Best Regards

On Fri, Dec 12, 2014 at 6:34 PM, Karen Murphy > wrote:



When I try to load a text file from a HDFS path using

sc.wholeTextFiles("hdfs://localhost:54310/graphx/anywebsite.com/anywebsite.com/
")

I'm get the following error:
java.io.FileNotFoundException: Path is not a file:
/graphx/anywebsite.com/anywebsite.com/css

(full stack trace at bottom of message).

If I switch my Scala code to reading the input file from the local
disk, wholeTextFiles doesn't pickup directories (such as css in
this case) and there is no exception raised.

The trace information in the 'local file' version shows that only
plain text files are collected with sc.wholeTextFiles:

14/12/12 11:51:29 INFO WholeTextFileRDD: Input split:

Paths:/tmp/anywebsite.com/anywebsite.com/index-2.html:0+6192,/tmp/anywebsite.com/anywebsite.com/gallery.html:0+3258,/tmp/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/tmp/anywebsite.com/anywebsite.com/jquery.html:0+326,/tmp/anywebsite.com/anywebsite.com/index.html:0+6174,/tmp/anywebsite.com/anywebsite.com/contact.html:0+3050,/tmp/anywebsite.com/anywebsite.com/archive.html:0+3247



Yet the trace information in the 'HDFS file' version shows
directories too are collected with sc.wholeTextFiles:

14/12/12 11:49:07 INFO WholeTextFileRDD: Input split:

Paths:/graphx/anywebsite.com/anywebsite.com/archive.html:0+3247,/graphx/anywebsite.com/anywebsite.com/contact.html:0+3050,/graphx/anywebsite.com/anywebsite.com/css:0+0,/graphx/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/graphx/anywebsite.com/anywebsite.com/gallery.html:0+3258,/graphx/anywebsite.com/anywebsite.com/highslide:0+0,/graphx/anywebsite.com/anywebsite.com/highslideIndex:0+0,/graphx/anywebsite.com/anywebsite.com/images:0+0,/graphx/anywebsite.com/anywebsite.com/index-2.html:0+6192,/graphx/anywebsite.com/anywebsite.com/index.html:0+6174,/graphx/anywebsite.com/anywebsite.com/jquery.html:0+326,/graphx/anywebsite.com/anywebsite.com/js:0+0


14/12/12 11:49:07 ERROR Executor: Exception in task 1.0 in stage
0.0 (TID 1)
java.io.FileNotFoundException: Path is not a file:
/graphx/anywebsite.com/anywebsite.com/css

at
org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68)
at
org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)

Should the HDFS version behave the same as the local version of
wholeTextFiles as far as the treatment of directories/non plain
text files are concerned ?

Any help, advice or workaround suggestions would be much appreciated,

Thanks
Karen

VERSION INFO
Ubuntu 14.04
Spark 1.1.1
Hadoop 2.5.2
Scala 2.10.4

FULL STACK TRACE
14/12/12 12:02:31 INFO WholeTextFileRDD: Input split:

Paths:/graphx/anywebsite.com/anywebsite.com/archive.html:0+3247,/graphx/anywebsite.com/anywebsite.com/contact.html:0+3050,/graphx/anywebsite.com/anywebsite.com/css:0+0,/graphx/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/graphx/anywebsite.com/anywebsite.com/gallery.html:0+3258,/graphx/anywebsite.com/anywebsite.com/highslide:0+0,/graphx/anywebsite.com/anywebsite.com/highslideIndex:0+0,/graphx/anywebsite.com/anywebsite.com/images:0+0,/graphx/anywebsi

Re: SchemaRDD partition on specific column values?

2014-12-15 Thread Nitin Goyal
Hi Michael,

I have opened following JIRA for the same :-

https://issues.apache.org/jira/browse/SPARK-4849

I am having a look at the code to see what can be done and then we can have
a discussion over the approach.

Let me know if you have any comments/suggestions.

Thanks
-Nitin

On Sun, Dec 14, 2014 at 2:53 PM, Michael Armbrust 
wrote:
>
> I'm happy to discuss what it would take to make sure we can propagate this
> information correctly.  Please open a JIRA (and mention me in it).
>
> Regarding including it in 1.2.1, it depends on how invasive the change
> ends up being, but it is certainly possible.
>
> On Thu, Dec 11, 2014 at 3:55 AM, nitin  wrote:
>>
>> Can we take this as a performance improvement task in Spark-1.2.1? I can
>> help
>> contribute for this.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350p20623.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>

-- 
Regards
Nitin Goyal


Migrating Parquet inputs

2014-12-15 Thread Marius Soutier
Hi,

is there an easy way to “migrate” parquet files or indicate optional values in 
sql statements? I added a couple of new fields that I also use in a 
schemaRDD.sql() which obviously fails for input files that don’t have the new 
fields.

Thanks
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Why my SQL UDF cannot be registered?

2014-12-15 Thread Cheng, Hao
As the error log shows, you may need to register it as:

sqlContext.rgisterFunction(“toHour”, toHour _)

The “_” means you are passing the function as parameter, not invoking it.

Cheng Hao

From: Xuelin Cao [mailto:xuelin...@yahoo.com.INVALID]
Sent: Monday, December 15, 2014 5:28 PM
To: User
Subject: Why my SQL UDF cannot be registered?


Hi,

 I tried to create a function that to convert an Unix time stamp to the 
hour number in a day.

  It works if the code is like this:
sqlContext.registerFunction("toHour", (x:Long)=>{new 
java.util.Date(x*1000).getHours})

  But, if I do it like this, it doesn't work:

  def toHour (x:Long) = {new java.util.Date(x*1000).getHours}
  sqlContext.registerFunction("toHour", toHour)

  The system reports an error:
:23: error: missing arguments for method toHour;
follow this method with `_' if you want to treat it as a partially applied 
function
  sqlContext.registerFunction("toHour", toHour)

  Anyone can help on dealing with this error?



Re: JSON Input files

2014-12-15 Thread Peter Vandenabeele
On Sat, Dec 13, 2014 at 5:43 PM, Helena Edelson  wrote:

> One solution can be found here:
> https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#json-datasets
>
>
As far as I understand, the people.json file is not really a proper json
file, but a file documented as:

  "... JSON files where each line of the files is a JSON object.".

This means that is a file with multiple lines, but each line needs to have
a fully self-contained JSON object
(initially confusing, this will not parse a standard multi-line JSON file).
We are working to clarify this in
https://github.com/apache/spark/pull/3517

HTH,

Peter




> - Helena
> @helenaedelson
>
> On Dec 13, 2014, at 11:18 AM, Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
> Hi Team,
>
> I have a large JSON file in Hadoop. Could you please let me know
>
> 1. How to read the JSON file
> 2. How to parse the JSON file
>
> Please share any example program based on Scala
>
> Regards,
> Rajesh
>
>
>


-- 
Peter Vandenabeele
http://www.allthingsdata.io
http://www.linkedin.com/in/petervandenabeele
https://twitter.com/peter_v
gsm: +32-478-27.40.69
e-mail: pe...@vandenabeele.com
skype: peter_v_be


Re: java.lang.IllegalStateException: unread block data

2014-12-15 Thread Akhil
When you say restored, does it mean the internal IP/public IP remain
unchanged to you changed them accordingly? (I'm assuming you are using a
cloud service like AWS, GCE or Azure).

What is the serializer that you are using? Try to set the following before
creating the sparkContext, might help with Serialization and all

System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator",
"com.sigmoidanalytics.MyRegistrator")


Morbious wrote
> Hi,
> 
> Recently I installed Cloudera Hadoop 5.1.1 with spark.
> I shut down slave servers and than restored them back.
> After this operation I was trying to run any task but each task with file
> bigger than few megabytes ended with errors:
> 
> 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Lost TID 61 (task 1.0:61)
> 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Loss was due to
> java.lang.IllegalStateException
> java.lang.IllegalStateException: unread block data
>   at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
>   at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at
> org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:140)
>   at
> java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>   at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>   at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Lost TID 62 (task 1.0:62)
> 14/12/12 20:25:02 INFO scheduler.TaskSetManager: Loss was due to
> java.lang.IllegalStateException: unread block data [duplicate 1]
> 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Lost TID 63 (task 1.0:63)
> 14/12/12 20:25:02 INFO scheduler.TaskSetManager: Loss was due to
> java.lang.IllegalStateException: unread block data [duplicate 2]
> 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Lost TID 64 (task 1.0:64)
> 14/12/12 20:25:02 INFO scheduler.TaskSetManager: Loss was due to
> java.lang.IllegalStateException: unread block data [duplicate 3]
> 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Lost TID 60 (task 1.0:60)
> 
> I checked security limits but everything seems to be OK.
> Before restart I was able to use word count on 100GB file, now it can be
> done only on few mb file.
> 
> Best regards,
> 
> Morbious





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-tp20668p20684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: stage failure: java.lang.IllegalStateException: unread block data

2014-12-15 Thread Akhil
Try to run the spark-shell in standalone mode
(MASTER=spark://yourmasterurl:7077 $SPARK_HOME/bin/spark-shell), and do a
small count ( val d = sc.parallelize(1 to 1000).count()), If that is
failing, then something is wrong with your cluster setup as its saying
Connection refused: node001/10.180.49.228:


freedafeng wrote
> The worker side has error message as this,
> 
> 14/10/30 18:29:00 INFO Worker: Asked to launch executor
> app-20141030182900-0006/0 for testspark_v1
> 14/10/30 18:29:01 INFO ExecutorRunner: Launch command: "java" "-cp"
> "::/root/spark-1.1.0/conf:/root/spark-1.1.0/assembly/target/scala-2.10/spark-assembly-1.1.0-hadoop2.3.0.jar"
> "-XX:MaxPermSize=128m" "-Dspark.driver.port=52552" "-Xms512M" "-Xmx512M"
> "org.apache.spark.executor.CoarseGrainedExecutorBackend"
> "akka.tcp://sparkDriver@master:52552/user/CoarseGrainedScheduler" "0"
> "node001" "4" "akka.tcp://sparkWorker@node001:60184/user/Worker"
> "app-20141030182900-0006"
> 14/10/30 18:29:03 INFO Worker: Asked to kill executor
> app-20141030182900-0006/0
> 14/10/30 18:29:03 INFO ExecutorRunner: Runner thread for executor
> app-20141030182900-0006/0 interrupted
> 14/10/30 18:29:03 INFO ExecutorRunner: Killing process!
> 14/10/30 18:29:03 ERROR FileAppender: Error writing stream to file
> /root/spark-1.1.0/work/app-20141030182900-0006/0/stderr
> java.io.IOException: Stream Closed
>   at java.io.FileInputStream.readBytes(Native Method)
>   at java.io.FileInputStream.read(FileInputStream.java:214)
>   at
> org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
>   at
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
>   at
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>   at
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>   at
> org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
> 14/10/30 18:29:04 INFO Worker: Executor app-20141030182900-0006/0 finished
> with state KILLED exitStatus 143
> 14/10/30 18:29:04 INFO LocalActorRef: Message
> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
> Actor[akka://sparkWorker/deadLetters] to
> Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.180.49.228%3A52120-22#1336571562]
> was not delivered. [6] dead letters encountered. This logging can be
> turned off or adjusted with configuration settings 'akka.log-dead-letters'
> and 'akka.log-dead-letters-during-shutdown'.
> 14/10/30 18:29:04 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkWorker@node001:60184] ->
> [akka.tcp://sparkExecutor@node001:37697]: Error [Association failed with
> [akka.tcp://sparkExecutor@node001:37697]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkExecutor@node001:37697]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: node001/10.180.49.228:37697
> ]
> 14/10/30 18:29:04 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkWorker@node001:60184] ->
> [akka.tcp://sparkExecutor@node001:37697]: Error [Association failed with
> [akka.tcp://sparkExecutor@node001:37697]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkExecutor@node001:37697]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: node001/10.180.49.228:37697
> ]
> 14/10/30 18:29:04 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkWorker@node001:60184] ->
> [akka.tcp://sparkExecutor@node001:37697]: Error [Association failed with
> [akka.tcp://sparkExecutor@node001:37697]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkExecutor@node001:37697]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: node001/10.180.49.228:37697
> ]
> 
> Thanks!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/stage-failure-java-lang-IllegalStateException-unread-block-data-tp17751p20685.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



is there a way to interact with Spark clusters remotely?

2014-12-15 Thread Xiaoyong Zhu
Hi experts

I am wondering if there is a way to interactive with Spark remotely? i.e. no 
access to clusters required but submit Python/Scala scripts to cluster and get 
result based on (REST) APIs.
That will facilitate the development process a lot..

Xiaoyong


IBM open-sources Spark Kernel

2014-12-15 Thread lbustelo
This was posted on the Dev list, but it is very relevant to the user list as
well…

--
We are happy to announce a developer preview of the Spark Kernel which 
enables remote applications to dynamically interact with Spark. You can 
think of the Spark Kernel as a remote Spark Shell that uses the IPython 
notebook interface to provide a common entrypoint for any application. The 
Spark Kernel obviates the need to submit jars using spark-submit, and can 
replace the existing Spark Shell. 

You can try out the Spark Kernel today by installing it from our github 
repo at https://github.com/ibm-et/spark-kernel. To help you get a demo 
environment up and running quickly, the repository also includes a 
Dockerfile and a Vagrantfile to build a Spark Kernel container and connect 
to it from an IPython notebook. 

We have included a number of documents with the project to help explain it 
and provide how-to information: 

* A high-level overview of the Spark Kernel and its client library ( 
https://issues.apache.org/jira/secure/attachment/12683624/Kernel%20Architecture.pdf
). 

* README (https://github.com/ibm-et/spark-kernel/blob/master/README.md) - 
building and testing the kernel, and deployment options including building 
the Docker container and packaging the kernel. 

* IPython instructions ( 
https://github.com/ibm-et/spark-kernel/blob/master/docs/IPYTHON.md) - 
setting up the development version of IPython and connecting a Spark 
Kernel. 

* Client library tutorial ( 
https://github.com/ibm-et/spark-kernel/blob/master/docs/CLIENT.md) - 
building and using the client library to connect to a Spark Kernel. 

* Magics documentation ( 
https://github.com/ibm-et/spark-kernel/blob/master/docs/MAGICS.md) - the 
magics in the kernel and how to write your own. 

We think the Spark Kernel will be useful for developing applications for 
Spark, and we are making it available with the intention of improving these 
capabilities within the context of the Spark community ( 
https://issues.apache.org/jira/browse/SPARK-4605). We will continue to 
develop the codebase and welcome your comments and suggestions. 


Signed, 

Chip Senkbeil 
IBM Emerging Technology Software Engineer



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/IBM-open-sources-Spark-Kernel-tp20686.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JSON Input files

2014-12-15 Thread Madabhattula Rajesh Kumar
Hi Peter,

Thank you for the clarification.

Now we need to store each JSON object into one line. Is there any
limitation of length of JSON object? So, JSON object will not go to the
next line.

What will happen if JSON object is a big/huge one?  Will it store in a
single line in HDFS?

What will happen, if JSON object contains BLOB/CLOB value? Is this entire
JSON object stores in single line of HDFS?

What will happen, if JSON object exceeding the HDFS block size. For
example, single JSON object split into two different worker nodes. In this
case, How Spark will read this JSON object?

Could you please clarify above questions

Regards,
Rajesh


On Mon, Dec 15, 2014 at 6:52 PM, Peter Vandenabeele 
wrote:
>
>
>
> On Sat, Dec 13, 2014 at 5:43 PM, Helena Edelson <
> helena.edel...@datastax.com> wrote:
>
>> One solution can be found here:
>> https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#json-datasets
>>
>>
> As far as I understand, the people.json file is not really a proper json
> file, but a file documented as:
>
>   "... JSON files where each line of the files is a JSON object.".
>
> This means that is a file with multiple lines, but each line needs to have
> a fully self-contained JSON object
> (initially confusing, this will not parse a standard multi-line JSON
> file). We are working to clarify this in
> https://github.com/apache/spark/pull/3517
>
> HTH,
>
> Peter
>
>
>
>
>> - Helena
>> @helenaedelson
>>
>> On Dec 13, 2014, at 11:18 AM, Madabhattula Rajesh Kumar <
>> mrajaf...@gmail.com> wrote:
>>
>> Hi Team,
>>
>> I have a large JSON file in Hadoop. Could you please let me know
>>
>> 1. How to read the JSON file
>> 2. How to parse the JSON file
>>
>> Please share any example program based on Scala
>>
>> Regards,
>> Rajesh
>>
>>
>>
>
>
> --
> Peter Vandenabeele
> http://www.allthingsdata.io
> http://www.linkedin.com/in/petervandenabeele
> https://twitter.com/peter_v
> gsm: +32-478-27.40.69
> e-mail: pe...@vandenabeele.com
> skype: peter_v_be
>


Re: Adding a column to a SchemaRDD

2014-12-15 Thread Nathan Kronenfeld
Perfect, that's exactly what I was looking for.

Thank you!

On Mon, Dec 15, 2014 at 3:32 AM, Yanbo Liang  wrote:
>
> Hi Nathan,
>
> #1
>
> Spark SQL & DSL can satisfy your requirement. You can refer the following
> code snippet:
>
> jdata.select(Star(Node), 'seven.getField("mod"), 'eleven.getField("mod"))
>
> You need to import org.apache.spark.sql.catalyst.analysis.Star in advance.
>
> #2
>
> After you make the transform above, you do not need to make SchemaRDD
> manually.
> Because that jdata.select() return a SchemaRDD and you can operate on it
> directly.
>
> For example, the following code snippet will return a new SchemaRDD with
> longer Row:
>
> val t1 = jdata.select(Star(Node), 'seven.getField("mod") +
> 'eleven.getField("mod")  as 'mod_sum)
>
> You can use t1.printSchema() to print the schema of this SchemaRDD and
> check whether it satisfy your requirements.
>
>
>
> 2014-12-13 0:00 GMT+08:00 Nathan Kronenfeld :
>>
>> (1) I understand about immutability, that's why I said I wanted a new
>> SchemaRDD.
>> (2) I specfically asked for a non-SQL solution that takes a SchemaRDD,
>> and results in a new SchemaRDD with one new function.
>> (3) The DSL stuff is a big clue, but I can't find adequate documentation
>> for it
>>
>> What I'm looking for is something like:
>>
>> import org.apache.spark.sql._
>>
>>
>> val sqlc = new SQLContext(sc)
>> import sqlc._
>>
>>
>> val data = sc.parallelize(0 to 99).map(n =>
>> ("{\"seven\": {\"mod\": %d, \"times\": %d}, "+
>>   "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n
>> % 11, n * 11))
>> val jdata = sqlc.jsonRDD(data)
>> jdata.registerTempTable("jdata")
>>
>>
>> val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum
>> FROM jdata")
>>
>>
>> This sqlVersion works fine, but if I try to do the same thing with a
>> programatic function, I'm missing a bunch of pieces:
>>
>>- I assume I'd need to start with something like:
>>jdata.select('*, 'seven.mod, 'eleven.mod)
>>and then get and process the last two elements.  The problems are:
>>   - I can't select '* - there seems no way to get the complete row
>>   - I can't select 'seven.mod or 'eleven.mod - the symbol evaluation
>>   seems only one deep.
>>- Assuming I could do that, I don't see a way to make the result into
>>a SchemaRDD.  I assume I would have to do something like:
>>   1. take my row and value, and create a new, slightly longer row
>>   2. take my old schema, and create a new schema with one more field
>>   at the end, named and typed appropriately
>>   3. combine the two into a SchemaRDD
>>   I think I see how to do 3, but 1 and 2 elude me.
>>
>> Is there more complete documentation somewhere for the DSL portion?
>> Anyone have a clue about any of the above?
>>
>>
>>
>> On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang 
>> wrote:
>>
>>> RDD is immutable so you can not modify it.
>>> If you want to modify some value or schema in RDD,  using map to
>>> generate a new RDD.
>>> The following code for your reference:
>>>
>>> def add(a:Int,b:Int):Int = {
>>>   a + b
>>> }
>>>
>>> val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
>>> val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2))}
>>> d2.foreach(println)
>>>
>>>
>>> Otherwise, if your self-defining function is straightforward and you can
>>> represent it by SQL, using Spark SQL or DSL is also a good choice.
>>>
>>> case class Person(id: Int, score: Int, value: Int)
>>>
>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>
>>> import sqlContext._
>>>
>>> val d1 = sc.parallelize(1 to 10).map { i => Person(i,i+1,i+2)}
>>> val d2 = d1.select('id, 'score, 'id + 'score)
>>> d2.foreach(println)
>>>
>>>
>>> 2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld >> >:
>>>
 Hi, there.

 I'm trying to understand how to augment data in a SchemaRDD.

 I can see how to do it if can express the added values in SQL - just
 run "SELECT *,valueCalculation AS newColumnName FROM table"

 I've been searching all over for how to do this if my added value is a
 scala function, with no luck.

 Let's say I have a SchemaRDD with columns A, B, and C, and I want to
 add a new column, D, calculated using Utility.process(b, c), and I want (of
 course) to pass in the value B and C from each row, ending up with a new
 SchemaRDD with columns A, B, C, and D.

 Is this possible? If so, how?

 Thanks,
-Nathan

 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com

>>>
>>>
>>
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>

-- 
Nathan Kronenfeld
Se

Re: Pagerank implementation

2014-12-15 Thread kmurph

Hiya,

I too am looking for a PageRank solution in GraphX where the probabilities
sum to 1.
I tried a few modifications, including division by the total number of
vertices in the first part of the equation, as well as trying to return full
rank instead of delta (though not correctly as evident from exception at
runtime).

Tom did you manage to make a version which sums to 1 ?  Could you possibly
divulge the changes if so ?

Also, I'm interested to know if the algorithm handles the case where there
are no outgoing links from a node ?  Does it avoid unfairness with sinks ? 
I'm new to Scala (and spark).  Had a look at the code and don't see that it
is, but could be missing something,

Thanks

Karen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pagerank-implementation-tp19013p20687.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



integrating long-running Spark jobs with Thriftserver

2014-12-15 Thread Tim Schweichler
Hi everybody,

I apologize if the answer to my question is obvious but I haven't been able to 
find a straightforward solution anywhere on the internet.

I have a number of Spark jobs written using the python API that do things like 
read in data from Amazon S3 to a main table in the Hive metastore, perform 
intensive calculations on that data to build derived/aggregated tables, etc. I 
also have Tableau set up to read those tables via the Spark Thriftserver.

My question is how best to integrate those two sides of Spark. I want to have 
the Thriftserver constantly running so that Tableau can update its extracts on 
a scheduled basis and users can manually query those tables as needed, but I 
also need to run those python jobs on a scheduled basis as well. What's the 
best way to do that? The options I'm considering are as follows:


  1.  Simply call the python jobs via spark-submit, scheduled by cron. My 
concern here is concurrency issues if Tableau or a user tries to read from a 
table at the same time that a job is rebuilding/updating that table. To my 
understanding the Thriftserver is designed to handle concurrency, but Spark in 
general is not if two different Spark contexts are attempting to access the 
same data (as would be the case with this approach.) Am I correct in that 
thinking or is there actually no problem with this method?
  2.  Call the python jobs through the Spark Thriftserver so that the same 
Spark context is used. My question here is how to do that. I know one can call 
a python script as part of a HiveQL query using TRANSFORM, but that seems to be 
designed more for performing quick calculations on existing data as part of a 
query rather than building tables in the first place or calling long-running 
jobs that don't return anything (again, am I correct in this thinking or would 
this actually be a viable solution?) Is there a different way to call 
long-running Spark jobs via the Thriftserver?

Are either of these good approaches or is there a better way that I'm missing?

Thanks!


Serialize mllib's MatrixFactorizationModel

2014-12-15 Thread Albert Manyà
Hi all.

I'm willing to serialize and later load a model trained using mllib's
ALS.

I've tried usign Java serialization with something like:

val model = ALS.trainImplicit(training, rank, numIter, lambda, 1)
val fos = new FileOutputStream("model.bin")
val oos = new ObjectOutputStream(fos)
oos.writeObject(bestModel.get)

But when I try to deserialize it using:

val fos = new FileInputStream("model.bin")
val oos = new ObjectInputStream(fos)
val model = oos.readObject().asInstanceOf[MatrixFactorizationModel]

 I get the error:

Exception in thread "main" java.io.IOException: PARSING_ERROR(2)

I've also tried to serialize MatrixFactorizationModel's both RDDs
(products and users) and later create the MatrixFactorizationModel by
hand passing the RDDs by constructor but I get an error cause its
private:

Error:(58, 17) constructor MatrixFactorizationModel in class
MatrixFactorizationModel cannot be accessed in object RecommendALS
val model = new MatrixFactorizationModel (8, userFeatures,
productFeatures)

Any ideas?

Thanks!

-- 
  Albert Manyà
  alber...@eml.cc

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NullPointerException When Reading Avro Sequence Files

2014-12-15 Thread Simone Franzini
To me this looks like an internal error to the REPL. I am not sure what is
causing that.
Personally I never use the REPL, can you try typing up your program and
running it from an IDE or spark-submit and see if you still get the same
error?

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Mon, Dec 15, 2014 at 4:54 PM, Cristovao Jose Domingues Cordeiro <
cristovao.corde...@cern.ch> wrote:
>
>  Sure, thanks:
> warning: there were 1 deprecation warning(s); re-run with -deprecation for
> details
> java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
> at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
> at org.apache.hadoop.mapreduce.Job.toString(Job.java:462)
> at
> scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
> at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
> at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
> at .(:10)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:846)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1119)
> at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:672)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:703)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:667)
> at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:819)
> at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:864)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:776)
> at
> org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:619)
> at
> org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:627)
> at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:632)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:959)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:907)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:907)
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:907)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1002)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
>
> Could something you omitted in your snippet be chaining this exception?
>
>  Cumprimentos / Best regards,
> Cristóvão José Domingues Cordeiro
> IT Department - 28/R-018
> CERN
>--
> *From:* Simone Franzini [captainfr...@gmail.com]
> *Sent:* 15 December 2014 16:52
>
> *To:* Cristovao Jose Domingues Cordeiro
> *Subject:* Re: NullPointerException When Reading Avro Sequence Files
>
>   Ok, I have no idea what that is. That appears to be an internal Spark
> exception. Maybe if you can post the entire stack trace it would give some
> more details to understand what is going on.
>
>  Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>
> On Mon, Dec 15, 2014 at 4:50 PM, Cristovao Jose Domingues Cordeiro <
> cristovao.corde...@cern.ch> wrote:
>>
>>  Hi,
>>
>> thanks for that.
>> But yeah the 2nd line is an exception. jobread is not created.
>>
>>  Cumprimentos / Best regards,
>> Cristóvão José Domingues Cordeiro
>> IT Department - 28/R-018
>> CERN
>>--
>> *From:* Simone Franzini [captainfr...@gmail.com]
>> *Sent:* 15 December 2014 16:39
>>
>> *To:* Cristovao Jose Domingues Cordeiro
>> *Subject:* Re: NullPointerException When Reading Avro Sequence Files
>>
>>I did not mention the imports needed in my code. I think these are
>> all of them:
>>
>>  import org.apache.hadoop.mapreduce.Job
>> import org.apache.hadoop.io.NullWritable
>> import org.apache.hadoop.mapreduce.lib.input.F

Re: spark kafka batch integration

2014-12-15 Thread Cody Koeninger
For an alternative take on a similar idea, see

https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka

An advantage of the approach I'm taking is that the lower and upper offsets
of the RDD are known in advance, so it's deterministic.

I haven't had a need to write to kafka from spark yet, so that's an obvious
advantage of your library.

I think the existing kafka dstream is inadequate for a number of use cases,
and would really like to see some combination of these approaches make it
into the spark codebase.

On Sun, Dec 14, 2014 at 2:41 PM, Koert Kuipers  wrote:
>
> hello all,
> we at tresata wrote a library to provide for batch integration between
> spark and kafka (distributed write of rdd to kafa, distributed read of rdd
> from kafka). our main use cases are (in lambda architecture jargon):
> * period appends to the immutable master dataset on hdfs from kafka using
> spark
> * make non-streaming data available in kafka with periodic data drops from
> hdfs using spark. this is to facilitate merging the speed and batch layer
> in spark-streaming
> * distributed writes from spark-streaming
>
> see here:
> https://github.com/tresata/spark-kafka
>
> best,
> koert
>


Intermittent test failures

2014-12-15 Thread Marius Soutier
Hi,

I’m seeing strange, random errors when running unit tests for my Spark jobs. In 
this particular case I’m using Spark SQL to read and write Parquet files, and 
one error that I keep running into is this one:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in 
stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 
223, localhost): java.io.IOException: PARSING_ERROR(2)
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)

I can only prevent this from happening by using isolated Specs tests thats 
always create a new SparkContext that is not shared between tests (but there 
can also be only a single SparkContext per test), and also by using standard 
SQLContext instead of HiveContext. It does not seem to have anything to do with 
the actual files that I also create during the test run with 
SQLContext.saveAsParquetFile.


Cheers
- Marius


PS The full trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in 
stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 
223, localhost): java.io.IOException: PARSING_ERROR(2)
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)

org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)

org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)

org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)

org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232)

org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)

org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:160)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
 ~[spark-core_2.10-1.1.1.jar:1.1.1]
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
 ~[spark-core_2.10-1.1.1.jar:1.1.1]
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
 ~[spark-core_2.10-1.1.1.jar:1.1.1]
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
~[scala-library.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
~[scala-library.jar:na]
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
~[spark-core_2.10-1.1.1.jar:1.1.1]
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark kafka batch integration

2014-12-15 Thread Koert Kuipers
thanks! i will take a look at your code. didn't realize there was already
something out there.

good point about upper offsets, i will add that feature to our version as
well if you dont mind.

i was thinking about making it deterministic for task failure transparently
(even if no upper offsets are provided) by doing a call to get the latest
offsets for all partitions, and filter the rdd based on that to make sure
nothing beyond those offsets ends up in the rdd. havent had time to test if
that works and is robust.

On Mon, Dec 15, 2014 at 11:39 AM, Cody Koeninger  wrote:
>
> For an alternative take on a similar idea, see
>
>
> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>
> An advantage of the approach I'm taking is that the lower and upper
> offsets of the RDD are known in advance, so it's deterministic.
>
> I haven't had a need to write to kafka from spark yet, so that's an
> obvious advantage of your library.
>
> I think the existing kafka dstream is inadequate for a number of use
> cases, and would really like to see some combination of these approaches
> make it into the spark codebase.
>
> On Sun, Dec 14, 2014 at 2:41 PM, Koert Kuipers  wrote:
>
>> hello all,
>> we at tresata wrote a library to provide for batch integration between
>> spark and kafka (distributed write of rdd to kafa, distributed read of rdd
>> from kafka). our main use cases are (in lambda architecture jargon):
>> * period appends to the immutable master dataset on hdfs from kafka using
>> spark
>> * make non-streaming data available in kafka with periodic data drops from
>> hdfs using spark. this is to facilitate merging the speed and batch layer
>> in spark-streaming
>> * distributed writes from spark-streaming
>>
>> see here:
>> https://github.com/tresata/spark-kafka
>>
>> best,
>> koert
>>
>


Re: Serialize mllib's MatrixFactorizationModel

2014-12-15 Thread Sean Owen
This class is not going to be serializable, as it contains huge RDDs.
Even if the right constructor existed the RDDs inside would not
serialize.

On Mon, Dec 15, 2014 at 4:33 PM, Albert Manyà  wrote:
> Hi all.
>
> I'm willing to serialize and later load a model trained using mllib's
> ALS.
>
> I've tried usign Java serialization with something like:
>
> val model = ALS.trainImplicit(training, rank, numIter, lambda, 1)
> val fos = new FileOutputStream("model.bin")
> val oos = new ObjectOutputStream(fos)
> oos.writeObject(bestModel.get)
>
> But when I try to deserialize it using:
>
> val fos = new FileInputStream("model.bin")
> val oos = new ObjectInputStream(fos)
> val model = oos.readObject().asInstanceOf[MatrixFactorizationModel]
>
>  I get the error:
>
> Exception in thread "main" java.io.IOException: PARSING_ERROR(2)
>
> I've also tried to serialize MatrixFactorizationModel's both RDDs
> (products and users) and later create the MatrixFactorizationModel by
> hand passing the RDDs by constructor but I get an error cause its
> private:
>
> Error:(58, 17) constructor MatrixFactorizationModel in class
> MatrixFactorizationModel cannot be accessed in object RecommendALS
> val model = new MatrixFactorizationModel (8, userFeatures,
> productFeatures)
>
> Any ideas?
>
> Thanks!
>
> --
>   Albert Manyà
>   alber...@eml.cc
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: is there a way to interact with Spark clusters remotely?

2014-12-15 Thread Akhil Das
Hi Xiaoyong,

You could refer this post if you are looking on how to run spark jobs
remotely
http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-job-on-Unix-cluster-from-dev-environment-Windows-td16989.html

You will of course require network access to the cluster.

Thanks
Best Regards

On Mon, Dec 15, 2014 at 7:47 PM, Xiaoyong Zhu 
wrote:
>
>  Hi experts
>
>
>
> I am wondering if there is a way to interactive with Spark remotely? i.e.
> no access to clusters required but submit Python/Scala scripts to cluster
> and get result based on (REST) APIs.
>
> That will facilitate the development process a lot..
>
>
>
> Xiaoyong
>


Re: Serialize mllib's MatrixFactorizationModel

2014-12-15 Thread Albert Manyà
In that case, what is the strategy to train a model in some background
batch process and make recommendations for some other service in real
time? Run both processes in the same spark cluster?

Thanks.

-- 
  Albert Manyà
  alber...@eml.cc

On Mon, Dec 15, 2014, at 05:58 PM, Sean Owen wrote:
> This class is not going to be serializable, as it contains huge RDDs.
> Even if the right constructor existed the RDDs inside would not
> serialize.
> 
> On Mon, Dec 15, 2014 at 4:33 PM, Albert Manyà  wrote:
> > Hi all.
> >
> > I'm willing to serialize and later load a model trained using mllib's
> > ALS.
> >
> > I've tried usign Java serialization with something like:
> >
> > val model = ALS.trainImplicit(training, rank, numIter, lambda, 1)
> > val fos = new FileOutputStream("model.bin")
> > val oos = new ObjectOutputStream(fos)
> > oos.writeObject(bestModel.get)
> >
> > But when I try to deserialize it using:
> >
> > val fos = new FileInputStream("model.bin")
> > val oos = new ObjectInputStream(fos)
> > val model = oos.readObject().asInstanceOf[MatrixFactorizationModel]
> >
> >  I get the error:
> >
> > Exception in thread "main" java.io.IOException: PARSING_ERROR(2)
> >
> > I've also tried to serialize MatrixFactorizationModel's both RDDs
> > (products and users) and later create the MatrixFactorizationModel by
> > hand passing the RDDs by constructor but I get an error cause its
> > private:
> >
> > Error:(58, 17) constructor MatrixFactorizationModel in class
> > MatrixFactorizationModel cannot be accessed in object RecommendALS
> > val model = new MatrixFactorizationModel (8, userFeatures,
> > productFeatures)
> >
> > Any ideas?
> >
> > Thanks!
> >
> > --
> >   Albert Manyà
> >   alber...@eml.cc
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: is there a way to interact with Spark clusters remotely?

2014-12-15 Thread François Le Lay
Have you seen the recent announcement around Spark Kernel using IPython/0MQ
protocol ?
https://github.com/ibm-et/spark-kernel



On Mon, Dec 15, 2014 at 12:06 PM, Akhil Das 
wrote:
>
> Hi Xiaoyong,
>
> You could refer this post if you are looking on how to run spark jobs
> remotely
> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-job-on-Unix-cluster-from-dev-environment-Windows-td16989.html
>
> You will of course require network access to the cluster.
>
> Thanks
> Best Regards
>
> On Mon, Dec 15, 2014 at 7:47 PM, Xiaoyong Zhu 
> wrote:
>>
>>  Hi experts
>>
>>
>>
>> I am wondering if there is a way to interactive with Spark remotely? i.e.
>> no access to clusters required but submit Python/Scala scripts to cluster
>> and get result based on (REST) APIs.
>>
>> That will facilitate the development process a lot..
>>
>>
>>
>> Xiaoyong
>>
>

-- 
François /fly Le Lay - @lelayf
Data Engineering Chapter Lead
IO Tribe NYC
Phone : +1 (646)-656-0075


Re: MLLIB model export: PMML vs MLLIB serialization

2014-12-15 Thread sourabh
Thanks Vincenzo.
Are you trying out all the models implemented in mllib? Actually I don't
see decision tree there. Sorry if I missed it. When are you planning to
merge this to spark branch?

Thanks
Sourabh

On Sun, Dec 14, 2014 at 5:54 PM, selvinsource [via Apache Spark User List] <
ml-node+s1001560n20674...@n3.nabble.com> wrote:
>
> Hi Sourabh,
>
> have a look at https://issues.apache.org/jira/browse/SPARK-1406, I am
> looking into exporting models in PMML using JPMML.
>
> Regards,
> Vincenzo
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-model-export-PMML-vs-MLLIB-serialization-tp20324p20674.html
>  To unsubscribe from MLLIB model export: PMML vs MLLIB serialization, click
> here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-model-export-PMML-vs-MLLIB-serialization-tp20324p20688.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Serialize mllib's MatrixFactorizationModel

2014-12-15 Thread sourabh chaki
Hi Albert,
There is some discussion going on here:
http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-model-export-PMML-vs-MLLIB-serialization-tc20324.html#a20674
I am also looking for this solution.But looks like until mllib pmml export
is ready, there is no full proof solution to export the mllib trained model
to a different system.

Thanks
Sourabh

On Mon, Dec 15, 2014 at 10:39 PM, Albert Manyà  wrote:
>
> In that case, what is the strategy to train a model in some background
> batch process and make recommendations for some other service in real
> time? Run both processes in the same spark cluster?
>
> Thanks.
>
> --
>   Albert Manyà
>   alber...@eml.cc
>
> On Mon, Dec 15, 2014, at 05:58 PM, Sean Owen wrote:
> > This class is not going to be serializable, as it contains huge RDDs.
> > Even if the right constructor existed the RDDs inside would not
> > serialize.
> >
> > On Mon, Dec 15, 2014 at 4:33 PM, Albert Manyà  wrote:
> > > Hi all.
> > >
> > > I'm willing to serialize and later load a model trained using mllib's
> > > ALS.
> > >
> > > I've tried usign Java serialization with something like:
> > >
> > > val model = ALS.trainImplicit(training, rank, numIter, lambda, 1)
> > > val fos = new FileOutputStream("model.bin")
> > > val oos = new ObjectOutputStream(fos)
> > > oos.writeObject(bestModel.get)
> > >
> > > But when I try to deserialize it using:
> > >
> > > val fos = new FileInputStream("model.bin")
> > > val oos = new ObjectInputStream(fos)
> > > val model = oos.readObject().asInstanceOf[MatrixFactorizationModel]
> > >
> > >  I get the error:
> > >
> > > Exception in thread "main" java.io.IOException: PARSING_ERROR(2)
> > >
> > > I've also tried to serialize MatrixFactorizationModel's both RDDs
> > > (products and users) and later create the MatrixFactorizationModel by
> > > hand passing the RDDs by constructor but I get an error cause its
> > > private:
> > >
> > > Error:(58, 17) constructor MatrixFactorizationModel in class
> > > MatrixFactorizationModel cannot be accessed in object RecommendALS
> > > val model = new MatrixFactorizationModel (8, userFeatures,
> > > productFeatures)
> > >
> > > Any ideas?
> > >
> > > Thanks!
> > >
> > > --
> > >   Albert Manyà
> > >   alber...@eml.cc
> > >
> > > -
> > > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > > For additional commands, e-mail: user-h...@spark.apache.org
> > >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Serialize mllib's MatrixFactorizationModel

2014-12-15 Thread Sean Owen
The thing about MatrixFactorizationModel, compared to other models, is
that it is huge. It's not just a few coefficients, but whole RDDs of
coefficients. I think you could save these RDDs of user/product
factors to persistent storage, load them, then recreate the
MatrixFactorizationModel that way. It's a bit manual, but works.

This is probably why there is no standard PMML representation for this
type of model. It is different from classic regression/classification
models, and too big for XML. So efforts to export/import PMML are not
relevant IMHO.

On Mon, Dec 15, 2014 at 5:09 PM, Albert Manyà  wrote:
> In that case, what is the strategy to train a model in some background
> batch process and make recommendations for some other service in real
> time? Run both processes in the same spark cluster?
>
> Thanks.
>
> --
>   Albert Manyà
>   alber...@eml.cc
>
> On Mon, Dec 15, 2014, at 05:58 PM, Sean Owen wrote:
>> This class is not going to be serializable, as it contains huge RDDs.
>> Even if the right constructor existed the RDDs inside would not
>> serialize.
>>
>> On Mon, Dec 15, 2014 at 4:33 PM, Albert Manyà  wrote:
>> > Hi all.
>> >
>> > I'm willing to serialize and later load a model trained using mllib's
>> > ALS.
>> >
>> > I've tried usign Java serialization with something like:
>> >
>> > val model = ALS.trainImplicit(training, rank, numIter, lambda, 1)
>> > val fos = new FileOutputStream("model.bin")
>> > val oos = new ObjectOutputStream(fos)
>> > oos.writeObject(bestModel.get)
>> >
>> > But when I try to deserialize it using:
>> >
>> > val fos = new FileInputStream("model.bin")
>> > val oos = new ObjectInputStream(fos)
>> > val model = oos.readObject().asInstanceOf[MatrixFactorizationModel]
>> >
>> >  I get the error:
>> >
>> > Exception in thread "main" java.io.IOException: PARSING_ERROR(2)
>> >
>> > I've also tried to serialize MatrixFactorizationModel's both RDDs
>> > (products and users) and later create the MatrixFactorizationModel by
>> > hand passing the RDDs by constructor but I get an error cause its
>> > private:
>> >
>> > Error:(58, 17) constructor MatrixFactorizationModel in class
>> > MatrixFactorizationModel cannot be accessed in object RecommendALS
>> > val model = new MatrixFactorizationModel (8, userFeatures,
>> > productFeatures)
>> >
>> > Any ideas?
>> >
>> > Thanks!
>> >
>> > --
>> >   Albert Manyà
>> >   alber...@eml.cc
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Custom UDTF with Lateral View throws ClassNotFound exception in Spark SQL CLI

2014-12-15 Thread shenghua
Hello, 
I met a problem when using Spark sql CLI. A custom UDTF with lateral view
throws ClassNotFound exception. I did a couple of experiments in same
environment (spark version 1.1.1): 
select + same custom UDTF (Passed)
select + lateral view + custom UDTF (ClassNotFoundException)
select + lateral view + built-in UDTF (Passed)

I have done some googling there days and found one related issue ticket of
Spark 
https://issues.apache.org/jira/browse/SPARK-4811
which is about "Custom UDTFs not working in Spark SQL".

It should be helpful to put actual code here to reproduce the problem.
However,  corporate regulations might prohibit this. So sorry about this.
Directly using explode's source code in a jar will help anyway.

Here is a portion of stack print when exception, just in case:
java.lang.ClassNotFoundException: XXX
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at
org.apache.spark.sql.hive.HiveFunctionFactory$class.createFunction(hiveUdfs.scala:81)
at
org.apache.spark.sql.hive.HiveGenericUdtf.createFunction(hiveUdfs.scala:247)
at
org.apache.spark.sql.hive.HiveGenericUdtf.function$lzycompute(hiveUdfs.scala:254)
at 
org.apache.spark.sql.hive.HiveGenericUdtf.function(hiveUdfs.scala:254)
at
org.apache.spark.sql.hive.HiveGenericUdtf.outputInspectors$lzycompute(hiveUdfs.scala:261)
at
org.apache.spark.sql.hive.HiveGenericUdtf.outputInspectors(hiveUdfs.scala:260)
at
org.apache.spark.sql.hive.HiveGenericUdtf.outputDataTypes$lzycompute(hiveUdfs.scala:265)
at
org.apache.spark.sql.hive.HiveGenericUdtf.outputDataTypes(hiveUdfs.scala:265)
at 
org.apache.spark.sql.hive.HiveGenericUdtf.makeOutput(hiveUdfs.scala:269)
at
org.apache.spark.sql.catalyst.expressions.Generator.output(generators.scala:60)
at
org.apache.spark.sql.catalyst.plans.logical.Generate$$anonfun$1.apply(basicOperators.scala:50)
at
org.apache.spark.sql.catalyst.plans.logical.Generate$$anonfun$1.apply(basicOperators.scala:50)
at scala.Option.map(Option.scala:145)
at
org.apache.spark.sql.catalyst.plans.logical.Generate.generatorOutput(basicOperators.scala:50)
at
org.apache.spark.sql.catalyst.plans.logical.Generate.output(basicOperators.scala:60)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveChildren$1.apply(LogicalPlan.scala:79)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveChildren$1.apply(LogicalPlan.scala:79)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
the rest is omitted.

Thank you.

Shenghua




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-UDTF-with-Lateral-View-throws-ClassNotFound-exception-in-Spark-SQL-CLI-tp20689.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: custom spark app name in yarn-cluster mode

2014-12-15 Thread Tomer Benyamini
Thanks Sandy, passing --name works fine :)

Tomer

On Fri, Dec 12, 2014 at 9:35 AM, Sandy Ryza  wrote:
>
> Hi Tomer,
>
> In yarn-cluster mode, the application has already been submitted to YARN
> by the time the SparkContext is created, so it's too late to set the app
> name there.  I believe giving it with the --name property to spark-submit
> should work.
>
> -Sandy
>
> On Thu, Dec 11, 2014 at 10:28 AM, Tomer Benyamini 
> wrote:
>>
>>
>>
>> On Thu, Dec 11, 2014 at 8:27 PM, Tomer Benyamini 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to set a custom spark app name when running a java spark app
>>> in yarn-cluster mode.
>>>
>>>  SparkConf sparkConf = new SparkConf();
>>>
>>>  sparkConf.setMaster(System.getProperty("spark.master"));
>>>
>>>  sparkConf.setAppName("myCustomName");
>>>
>>>  sparkConf.set("spark.logConf", "true");
>>>
>>>  JavaSparkContext sc = new JavaSparkContext(sparkConf);
>>>
>>>
>>> Apparently this only works when running in yarn-client mode; in
>>> yarn-cluster mode the app name is the class name, when viewing the app in
>>> the cluster manager UI. Any idea?
>>>
>>>
>>> Thanks,
>>>
>>> Tomer
>>>
>>>
>>>
>>


Accessing rows of a row in Spark

2014-12-15 Thread Jerry Lam
Hi spark users,

Do you know how to access rows of row?

I have a SchemaRDD called user and register it as a table with the
following schema:

root
 |-- user_id: string (nullable = true)
 |-- item: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- item_id: string (nullable = true)
 |||-- name: string (nullable = true)


val items=sqlContext.sql("select items from user where user_id = 1").first

The type of items is org.apache.spark.sql.Row. I want to iterate through
the items and count how many items that user_id = 1 has.

I could not find a method in which I can do that. The farthest I can get to
is to convert items.toSeq. The type information I got back is:

scala> items.toSeq
res57: Seq[Any] = [WrappedArray([1,orange],[2,apple])]

Any suggestion?

Best Regards,

Jerry


Re: JSON Input files

2014-12-15 Thread Michael Armbrust
Underneath the covers, jsonFile uses TextInputFormat, which will split
files correctly based on new lines.  Thus, there is no fixed maximum size
for a json object (other than the fact that it must fit into memory on the
executors).

On Mon, Dec 15, 2014 at 7:22 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:
>
> Hi Peter,
>
> Thank you for the clarification.
>
> Now we need to store each JSON object into one line. Is there any
> limitation of length of JSON object? So, JSON object will not go to the
> next line.
>
> What will happen if JSON object is a big/huge one?  Will it store in a
> single line in HDFS?
>
> What will happen, if JSON object contains BLOB/CLOB value? Is this entire
> JSON object stores in single line of HDFS?
>
> What will happen, if JSON object exceeding the HDFS block size. For
> example, single JSON object split into two different worker nodes. In this
> case, How Spark will read this JSON object?
>
> Could you please clarify above questions
>
> Regards,
> Rajesh
>
>
> On Mon, Dec 15, 2014 at 6:52 PM, Peter Vandenabeele <
> pe...@vandenabeele.com> wrote:
>>
>>
>>
>> On Sat, Dec 13, 2014 at 5:43 PM, Helena Edelson <
>> helena.edel...@datastax.com> wrote:
>>
>>> One solution can be found here:
>>> https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#json-datasets
>>>
>>>
>> As far as I understand, the people.json file is not really a proper json
>> file, but a file documented as:
>>
>>   "... JSON files where each line of the files is a JSON object.".
>>
>> This means that is a file with multiple lines, but each line needs to
>> have a fully self-contained JSON object
>> (initially confusing, this will not parse a standard multi-line JSON
>> file). We are working to clarify this in
>> https://github.com/apache/spark/pull/3517
>>
>> HTH,
>>
>> Peter
>>
>>
>>
>>
>>> - Helena
>>> @helenaedelson
>>>
>>> On Dec 13, 2014, at 11:18 AM, Madabhattula Rajesh Kumar <
>>> mrajaf...@gmail.com> wrote:
>>>
>>> Hi Team,
>>>
>>> I have a large JSON file in Hadoop. Could you please let me know
>>>
>>> 1. How to read the JSON file
>>> 2. How to parse the JSON file
>>>
>>> Please share any example program based on Scala
>>>
>>> Regards,
>>> Rajesh
>>>
>>>
>>>
>>
>>
>> --
>> Peter Vandenabeele
>> http://www.allthingsdata.io
>> http://www.linkedin.com/in/petervandenabeele
>> https://twitter.com/peter_v
>> gsm: +32-478-27.40.69
>> e-mail: pe...@vandenabeele.com
>> skype: peter_v_be
>>
>


Re: Custom UDTF with Lateral View throws ClassNotFound exception in Spark SQL CLI

2014-12-15 Thread Michael Armbrust
Can you add this information to the JIRA?

On Mon, Dec 15, 2014 at 10:54 AM, shenghua  wrote:
>
> Hello,
> I met a problem when using Spark sql CLI. A custom UDTF with lateral view
> throws ClassNotFound exception. I did a couple of experiments in same
> environment (spark version 1.1.1):
> select + same custom UDTF (Passed)
> select + lateral view + custom UDTF (ClassNotFoundException)
> select + lateral view + built-in UDTF (Passed)
>
> I have done some googling there days and found one related issue ticket of
> Spark
> https://issues.apache.org/jira/browse/SPARK-4811
> which is about "Custom UDTFs not working in Spark SQL".
>
> It should be helpful to put actual code here to reproduce the problem.
> However,  corporate regulations might prohibit this. So sorry about this.
> Directly using explode's source code in a jar will help anyway.
>
> Here is a portion of stack print when exception, just in case:
> java.lang.ClassNotFoundException: XXX
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at
>
> org.apache.spark.sql.hive.HiveFunctionFactory$class.createFunction(hiveUdfs.scala:81)
> at
>
> org.apache.spark.sql.hive.HiveGenericUdtf.createFunction(hiveUdfs.scala:247)
> at
>
> org.apache.spark.sql.hive.HiveGenericUdtf.function$lzycompute(hiveUdfs.scala:254)
> at
> org.apache.spark.sql.hive.HiveGenericUdtf.function(hiveUdfs.scala:254)
> at
>
> org.apache.spark.sql.hive.HiveGenericUdtf.outputInspectors$lzycompute(hiveUdfs.scala:261)
> at
>
> org.apache.spark.sql.hive.HiveGenericUdtf.outputInspectors(hiveUdfs.scala:260)
> at
>
> org.apache.spark.sql.hive.HiveGenericUdtf.outputDataTypes$lzycompute(hiveUdfs.scala:265)
> at
>
> org.apache.spark.sql.hive.HiveGenericUdtf.outputDataTypes(hiveUdfs.scala:265)
> at
> org.apache.spark.sql.hive.HiveGenericUdtf.makeOutput(hiveUdfs.scala:269)
> at
>
> org.apache.spark.sql.catalyst.expressions.Generator.output(generators.scala:60)
> at
>
> org.apache.spark.sql.catalyst.plans.logical.Generate$$anonfun$1.apply(basicOperators.scala:50)
> at
>
> org.apache.spark.sql.catalyst.plans.logical.Generate$$anonfun$1.apply(basicOperators.scala:50)
> at scala.Option.map(Option.scala:145)
> at
>
> org.apache.spark.sql.catalyst.plans.logical.Generate.generatorOutput(basicOperators.scala:50)
> at
>
> org.apache.spark.sql.catalyst.plans.logical.Generate.output(basicOperators.scala:60)
> at
>
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveChildren$1.apply(LogicalPlan.scala:79)
> at
>
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveChildren$1.apply(LogicalPlan.scala:79)
> at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> the rest is omitted.
>
> Thank you.
>
> Shenghua
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Custom-UDTF-with-Lateral-View-throws-ClassNotFound-exception-in-Spark-SQL-CLI-tp20689.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Accessing rows of a row in Spark

2014-12-15 Thread Mark Hamstra
scala> val items = Row(1 -> "orange", 2 -> "apple")

items: org.apache.spark.sql.catalyst.expressions.Row =
[(1,orange),(2,apple)]


If you literally want an iterator, then this:


scala> items.toIterator.count { case (user_id, name) => user_id == 1 }

res0: Int = 1


...else:


scala> items.count { case (user_id, name) => user_id == 1 }

res1: Int = 1

On Mon, Dec 15, 2014 at 11:04 AM, Jerry Lam  wrote:
>
> Hi spark users,
>
> Do you know how to access rows of row?
>
> I have a SchemaRDD called user and register it as a table with the
> following schema:
>
> root
>  |-- user_id: string (nullable = true)
>  |-- item: array (nullable = true)
>  ||-- element: struct (containsNull = false)
>  |||-- item_id: string (nullable = true)
>  |||-- name: string (nullable = true)
>
>
> val items=sqlContext.sql("select items from user where user_id = 1").first
>
> The type of items is org.apache.spark.sql.Row. I want to iterate through
> the items and count how many items that user_id = 1 has.
>
> I could not find a method in which I can do that. The farthest I can get
> to is to convert items.toSeq. The type information I got back is:
>
> scala> items.toSeq
> res57: Seq[Any] = [WrappedArray([1,orange],[2,apple])]
>
> Any suggestion?
>
> Best Regards,
>
> Jerry
>


Re: Intermittent test failures

2014-12-15 Thread Michael Armbrust
Is it possible that you are starting more than one SparkContext in a single
JVM with out stopping previous ones?  I'd try testing with Spark 1.2, which
will throw an exception in this case.

On Mon, Dec 15, 2014 at 8:48 AM, Marius Soutier  wrote:
>
> Hi,
>
> I’m seeing strange, random errors when running unit tests for my Spark
> jobs. In this particular case I’m using Spark SQL to read and write Parquet
> files, and one error that I keep running into is this one:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 19
> in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage
> 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
> org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
> org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
>
> I can only prevent this from happening by using isolated Specs tests thats
> always create a new SparkContext that is not shared between tests (but
> there can also be only a single SparkContext per test), and also by using
> standard SQLContext instead of HiveContext. It does not seem to have
> anything to do with the actual files that I also create during the test run
> with SQLContext.saveAsParquetFile.
>
>
> Cheers
> - Marius
>
>
> PS The full trace:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 19
> in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage
> 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
> org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
> org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
>
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
>
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
>
> org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)
>
> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
>
> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232)
>
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>
> org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
> sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
>
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:160)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
> ~[spark-core_2.10-1.1.1.jar:1.1.1]
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
> ~[spark-core_2.10-1.1.1.jar:1.1.1]
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
> ~[spark-core_2.10-1.1.1.jar:1.1.1]
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> ~[scala-library.jar:na]
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> ~[scala-library.jar:na]
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
> ~[spark-core_2.10-1.1.1.jar:1.1.1]
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark metrics for ganglia

2014-12-15 Thread danilopds
Thanks tsingfu,

I used this configuration based in your post: (with ganglia unicast mode)
# Enable GangliaSink for all instances 
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink 
*.sink.ganglia.host=10.0.0.7
*.sink.ganglia.port=8649 
*.sink.ganglia.period=15
*.sink.ganglia.unit=seconds 
*.sink.ganglia.ttl=1 
*.sink.ganglia.mode=unicast

Then,
I have the following error now.
ERROR metrics.MetricsSystem: Sink class
org.apache.spark.metrics.sink.GangliaSink  cannot be instantialized
java.lang.ClassNotFoundException: org.apache.spark.metrics.sink.GangliaSink





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-metrics-for-ganglia-tp14335p20690.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: printing mllib.linalg.vector

2014-12-15 Thread Xiangrui Meng
you can use the default toString method to get the string
representation. if you want to customized, check the indices/values
fields. -Xiangrui

On Fri, Dec 5, 2014 at 7:32 AM, debbie  wrote:
> Basic question:
>
> What is the best way to loop through one of these and print their
> components? Convert them to an array?
>
> Thanks
>
> Deb

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accessing rows of a row in Spark

2014-12-15 Thread Jerry Lam
Hi Mark,

Thank you for helping out.

The items I got back from Spark SQL has the type information as follows:

scala> items
res16: org.apache.spark.sql.Row = [WrappedArray([1,orange],[2,apple])]

I tried to iterate the items as you suggested but no luck.

Best Regards,

Jerry


On Mon, Dec 15, 2014 at 2:18 PM, Mark Hamstra 
wrote:
>
> scala> val items = Row(1 -> "orange", 2 -> "apple")
>
> items: org.apache.spark.sql.catalyst.expressions.Row =
> [(1,orange),(2,apple)]
>
>
> If you literally want an iterator, then this:
>
>
> scala> items.toIterator.count { case (user_id, name) => user_id == 1 }
>
> res0: Int = 1
>
>
> ...else:
>
>
> scala> items.count { case (user_id, name) => user_id == 1 }
>
> res1: Int = 1
>
> On Mon, Dec 15, 2014 at 11:04 AM, Jerry Lam  wrote:
>>
>> Hi spark users,
>>
>> Do you know how to access rows of row?
>>
>> I have a SchemaRDD called user and register it as a table with the
>> following schema:
>>
>> root
>>  |-- user_id: string (nullable = true)
>>  |-- item: array (nullable = true)
>>  ||-- element: struct (containsNull = false)
>>  |||-- item_id: string (nullable = true)
>>  |||-- name: string (nullable = true)
>>
>>
>> val items=sqlContext.sql("select items from user where user_id = 1").first
>>
>> The type of items is org.apache.spark.sql.Row. I want to iterate through
>> the items and count how many items that user_id = 1 has.
>>
>> I could not find a method in which I can do that. The farthest I can get
>> to is to convert items.toSeq. The type information I got back is:
>>
>> scala> items.toSeq
>> res57: Seq[Any] = [WrappedArray([1,orange],[2,apple])]
>>
>> Any suggestion?
>>
>> Best Regards,
>>
>> Jerry
>>
>


Re: MLlib(Logistic Regression) + Spark Streaming.

2014-12-15 Thread Xiangrui Meng
If you want to train offline and predict online, you can use the
current LR implementation to train a model and then apply
model.predict on the dstream. -Xiangrui

On Sun, Dec 7, 2014 at 6:30 PM, Nasir Khan  wrote:
> I am new to spark.
> Lets say i want to develop a machine learning model. which trained on normal
> method in MLlib. I want to use that model with classifier Logistic
> regression and predict the streaming data coming from a file or socket.
>
>
> Streaming data -> Logistic Regression -> binary label prediction.
>
> Is it possible? since there is no streaming logistic regression algo like
> streaming linear regression.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Logistic-Regression-Spark-Streaming-tp20564.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLLIb: Linear regression: Loss was due to java.lang.ArrayIndexOutOfBoundsException

2014-12-15 Thread Xiangrui Meng
Is it possible that after filtering the feature dimension changed?
This may happen if you use LIBSVM format but didn't specify the number
of features. -Xiangrui

On Tue, Dec 9, 2014 at 4:54 AM, Sameer Tilak  wrote:
> Hi All,
>
>
> I was able to run LinearRegressionwithSGD for a largeer dataset (> 2GB
> sparse). I have now filtered the data and I am running regression on a
> subset of it  (~ 200 MB). I see this error, which is strange since it was
> running fine with the superset data. Is this a formatting issue (which I
> doubt) or is this some other issue in data preparation? I confirmed that
> there is no empty line in my dataset. Any help with this will be highly
> appreciated.
>
>
> 14/12/08 20:32:03 WARN TaskSetManager: Lost TID 5 (task 3.0:1)
>
> 14/12/08 20:32:03 WARN TaskSetManager: Loss was due to
> java.lang.ArrayIndexOutOfBoundsException
>
> java.lang.ArrayIndexOutOfBoundsException: 150323
>
> at
> breeze.linalg.operators.DenseVector_SparseVector_Ops$$anon$129.apply(SparseVectorOps.scala:231)
>
> at
> breeze.linalg.operators.DenseVector_SparseVector_Ops$$anon$129.apply(SparseVectorOps.scala:216)
>
> at breeze.linalg.operators.BinaryRegistry$class.apply(BinaryOp.scala:60)
>
> at breeze.linalg.VectorOps$$anon$178.apply(Vector.scala:391)
>
> at breeze.linalg.NumericOps$class.dot(NumericOps.scala:83)
>
> at breeze.linalg.DenseVector.dot(DenseVector.scala:47)
>
> at
> org.apache.spark.mllib.optimization.LeastSquaresGradient.compute(Gradient.scala:125)
>
> at
> org.apache.spark.mllib.optimization.GradientDescent$$anonfun$runMiniBatchSGD$1$$anonfun$1.apply(GradientDescent.scala:180)
>
> at
> org.apache.spark.mllib.optimization.GradientDescent$$anonfun$runMiniBatchSGD$1$$anonfun$1.apply(GradientDescent.scala:179)
>
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
>
> at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
>
> at
> scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
>
> at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
>
> at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
>
> at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
>
> at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
>
> at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:51)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why KMeans with mllib is so slow ?

2014-12-15 Thread Xiangrui Meng
Please check the number of partitions after sc.textFile. Use
sc.textFile('...', 8) to have at least 8 partitions. -Xiangrui

On Tue, Dec 9, 2014 at 4:58 AM, DB Tsai  wrote:
> You just need to use the latest master code without any configuration
> to get performance improvement from my PR.
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Mon, Dec 8, 2014 at 7:53 AM, Jaonary Rabarisoa  wrote:
>> After some investigation, I learned that I can't compare kmeans in mllib
>> with another kmeans implementation directly. The kmeans|| initialization
>> step takes more time than the algorithm implemented in julia for example.
>> There is also the ability to run multiple runs of kmeans algorithm in mllib
>> even by default the number of runs is 1.
>>
>> DB Tsai can you please tell me the configuration you took for the
>> improvement you mention in your pull request. I'd like to run the same
>> benchmark on mnist8m on my computer.
>>
>>
>> Cheers;
>>
>>
>>
>> On Fri, Dec 5, 2014 at 10:34 PM, DB Tsai  wrote:
>>>
>>> Also, are you using the latest master in this experiment? A PR merged
>>> into the master couple days ago will spend up the k-means three times.
>>> See
>>>
>>>
>>> https://github.com/apache/spark/commit/7fc49ed91168999d24ae7b4cc46fbb4ec87febc1
>>>
>>> Sincerely,
>>>
>>> DB Tsai
>>> ---
>>> My Blog: https://www.dbtsai.com
>>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>>
>>>
>>> On Fri, Dec 5, 2014 at 9:36 AM, Jaonary Rabarisoa 
>>> wrote:
>>> > The code is really simple :
>>> >
>>> > object TestKMeans {
>>> >
>>> >   def main(args: Array[String]) {
>>> >
>>> > val conf = new SparkConf()
>>> >   .setAppName("Test KMeans")
>>> >   .setMaster("local[8]")
>>> >   .set("spark.executor.memory", "8g")
>>> >
>>> > val sc = new SparkContext(conf)
>>> >
>>> > val numClusters = 500;
>>> > val numIterations = 2;
>>> >
>>> >
>>> > val data = sc.textFile("sample.csv").map(x =>
>>> > Vectors.dense(x.split(',').map(_.toDouble)))
>>> > data.cache()
>>> >
>>> >
>>> > val clusters = KMeans.train(data, numClusters, numIterations)
>>> >
>>> > println(clusters.clusterCenters.size)
>>> >
>>> > val wssse = clusters.computeCost(data)
>>> > println(s"error : $wssse")
>>> >
>>> >   }
>>> > }
>>> >
>>> >
>>> > For the testing purpose, I was generating a sample random data with
>>> > julia
>>> > and store it in a csv file delimited by comma. The dimensions is 248000
>>> > x
>>> > 384.
>>> >
>>> > In the target application, I will have more than 248k data to cluster.
>>> >
>>> >
>>> > On Fri, Dec 5, 2014 at 6:03 PM, Davies Liu 
>>> > wrote:
>>> >>
>>> >> Could you post you script to reproduce the results (also how to
>>> >> generate the dataset)? That will help us to investigate it.
>>> >>
>>> >> On Fri, Dec 5, 2014 at 8:40 AM, Jaonary Rabarisoa 
>>> >> wrote:
>>> >> > Hmm, here I use spark on local mode on my laptop with 8 cores. The
>>> >> > data
>>> >> > is
>>> >> > on my local filesystem. Event thought, there an overhead due to the
>>> >> > distributed computation, I found the difference between the runtime
>>> >> > of
>>> >> > the
>>> >> > two implementations really, really huge. Is there a benchmark on how
>>> >> > well
>>> >> > the algorithm implemented in mllib performs ?
>>> >> >
>>> >> > On Fri, Dec 5, 2014 at 4:56 PM, Sean Owen  wrote:
>>> >> >>
>>> >> >> Spark has much more overhead, since it's set up to distribute the
>>> >> >> computation. Julia isn't distributed, and so has no such overhead in
>>> >> >> a
>>> >> >> completely in-core implementation. You generally use Spark when you
>>> >> >> have a problem large enough to warrant distributing, or, your data
>>> >> >> already lives in a distributed store like HDFS.
>>> >> >>
>>> >> >> But it's also possible you're not configuring the implementations
>>> >> >> the
>>> >> >> same way, yes. There's not enough info here really to say.
>>> >> >>
>>> >> >> On Fri, Dec 5, 2014 at 9:50 AM, Jaonary Rabarisoa
>>> >> >> 
>>> >> >> wrote:
>>> >> >> > Hi all,
>>> >> >> >
>>> >> >> > I'm trying to a run clustering with kmeans algorithm. The size of
>>> >> >> > my
>>> >> >> > data
>>> >> >> > set is about 240k vectors of dimension 384.
>>> >> >> >
>>> >> >> > Solving the problem with the kmeans available in julia (kmean++)
>>> >> >> >
>>> >> >> > http://clusteringjl.readthedocs.org/en/latest/kmeans.html
>>> >> >> >
>>> >> >> > take about 8 minutes on a single core.
>>> >> >> >
>>> >> >> > Solving the same problem with spark kmean|| take more than 1.5
>>> >> >> > hours
>>> >> >> > with 8
>>> >> >> > cores
>>> >> >> >
>>> >> >> > Either they don't implement the same algorithm either I don't
>>> >> >> > understand
>>> >> >> > how
>>> >> >> > the kmeans in spark works. Is my data not big enough to take full
>>> >> >> > advantage
>>> >> >> > of spark ? At leas

Re: Stack overflow Error while executing spark SQL

2014-12-15 Thread Xiangrui Meng
Could you post the full stacktrace? It seems to be some recursive call
in parsing. -Xiangrui

On Tue, Dec 9, 2014 at 7:44 PM,   wrote:
> Hi
>
>
>
> I am getting Stack overflow Error
>
> Exception in main java.lang.stackoverflowerror
>
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>
>at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>
>at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>
>at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>
>at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>
>at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>
>
> while executing the following code
>
> sqlContext.sql("SELECT text FROM tweetTable LIMIT
> 10").collect().foreach(println)
>
>
>
> The complete code is from github
>
> https://github.com/databricks/reference-apps/blob/master/twitter_classifier/scala/src/main/scala/com/databricks/apps/twitter_classifier/ExamineAndTrain.scala
>
>
>
> import com.google.gson.{GsonBuilder, JsonParser}
>
> import org.apache.spark.mllib.clustering.KMeans
>
> import org.apache.spark.sql.SQLContext
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> import org.apache.spark.mllib.clustering.KMeans
>
> /**
>
> * Examine the collected tweets and trains a model based on them.
>
> */
>
> object ExamineAndTrain {
>
> val jsonParser = new JsonParser()
>
> val gson = new GsonBuilder().setPrettyPrinting().create()
>
> def main(args: Array[String]) {
>
> // Process program arguments and set properties
>
> /*if (args.length < 3) {
>
> System.err.println("Usage: " + this.getClass.getSimpleName +
>
> "")
>
> System.exit(1)
>
> }
>
> *
>
> */
>
>val outputModelDir="C:\\MLModel"
>
>  val tweetInput="C:\\MLInput"
>
>val numClusters=10
>
>val numIterations=20
>
>
>
> //val Array(tweetInput, outputModelDir, Utils.IntParam(numClusters),
> Utils.IntParam(numIterations)) = args
>
>
>
> val conf = new
> SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[4]")
>
> val sc = new SparkContext(conf)
>
> val sqlContext = new SQLContext(sc)
>
> // Pretty print some of the tweets.
>
> val tweets = sc.textFile(tweetInput)
>
> println("Sample JSON Tweets---")
>
> for (tweet <- tweets.take(5)) {
>
> println(gson.toJson(jsonParser.parse(tweet)))
>
> }
>
> val tweetTable = sqlContext.jsonFile(tweetInput).cache()
>
> tweetTable.registerTempTable("tweetTable")
>
> println("--Tweet table Schema---")
>
> tweetTable.printSchema()
>
> println("Sample Tweet Text-")
>
>
>
> sqlContext.sql("SELECT text FROM tweetTable LIMIT
> 10").collect().foreach(println)
>
>
>
>
>
>
>
> println("--Sample Lang, Name, text---")
>
> sqlContext.sql("SELECT user.lang, user.name, text FROM tweetTable LIMIT
> 1000").collect().foreach(println)
>
> println("--Total count by languages Lang, count(*)---")
>
> sqlContext.sql("SELECT user.lang, COUNT(*) as cnt FROM tweetTable GROUP BY
> user.lang ORDER BY cnt DESC LIMIT 25").collect.foreach(println)
>
> println("--- Training the model and persist it")
>
> val texts = sqlContext.sql("SELECT text from
> tweetTable").map(_.head.toString)
>
> // Cache the vectors RDD since it will be used for all the KMeans
> iterations.
>
> val vectors = texts.map(Utils.featurize).cache()
>
> vectors.count() // Calls an action on the RDD to populate the vectors cache.
>
> val model = KMeans.train(vectors, numClusters, numIterations)
>
> sc.makeRDD(model.clusterCenters,
> numClusters).saveAsObjectFile(outputModelDir)
>
> val some_tweets = texts.take(100)
>
> println("Example tweets from the clusters")
>
> for (i <- 0 until numClusters) {
>
> println(s"\nCLUSTER $i:")
>
> some_tweets.foreach { t =>
>
> if (model.predict(Utils.featurize(t)) == i) {
>
> println(t)
>
> }
>
> }
>
> }
>
> }
>
> }
>
>
>
> Thanks & Regards
>
> Jishnu Menath Prathap
>
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accessing rows of a row in Spark

2014-12-15 Thread Mark Hamstra
Looks like you've got one more layer of containment than you intend -- i.e.
you've got Row[WrappedArray[Row[(Int, String)]] where you want
Row[Row[(Int, String)]].  That's easy to do if somewhere along the line you
did something like `val row = Row(collection)` instead of `val row =
Row.fromSeq(collection)`.

On Mon, Dec 15, 2014 at 11:47 AM, Jerry Lam  wrote:
>
> Hi Mark,
>
> Thank you for helping out.
>
> The items I got back from Spark SQL has the type information as follows:
>
> scala> items
> res16: org.apache.spark.sql.Row = [WrappedArray([1,orange],[2,apple])]
>
> I tried to iterate the items as you suggested but no luck.
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Dec 15, 2014 at 2:18 PM, Mark Hamstra 
> wrote:
>>
>> scala> val items = Row(1 -> "orange", 2 -> "apple")
>>
>> items: org.apache.spark.sql.catalyst.expressions.Row =
>> [(1,orange),(2,apple)]
>>
>>
>> If you literally want an iterator, then this:
>>
>>
>> scala> items.toIterator.count { case (user_id, name) => user_id == 1 }
>>
>> res0: Int = 1
>>
>>
>> ...else:
>>
>>
>> scala> items.count { case (user_id, name) => user_id == 1 }
>>
>> res1: Int = 1
>>
>> On Mon, Dec 15, 2014 at 11:04 AM, Jerry Lam  wrote:
>>>
>>> Hi spark users,
>>>
>>> Do you know how to access rows of row?
>>>
>>> I have a SchemaRDD called user and register it as a table with the
>>> following schema:
>>>
>>> root
>>>  |-- user_id: string (nullable = true)
>>>  |-- item: array (nullable = true)
>>>  ||-- element: struct (containsNull = false)
>>>  |||-- item_id: string (nullable = true)
>>>  |||-- name: string (nullable = true)
>>>
>>>
>>> val items=sqlContext.sql("select items from user where user_id =
>>> 1").first
>>>
>>> The type of items is org.apache.spark.sql.Row. I want to iterate through
>>> the items and count how many items that user_id = 1 has.
>>>
>>> I could not find a method in which I can do that. The farthest I can get
>>> to is to convert items.toSeq. The type information I got back is:
>>>
>>> scala> items.toSeq
>>> res57: Seq[Any] = [WrappedArray([1,orange],[2,apple])]
>>>
>>> Any suggestion?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>


Re: what is the best way to implement mini batches?

2014-12-15 Thread Imran Rashid
I'm a little confused by some of the responses.  It seems like there are
two different issues being discussed here:

1.  How to turn a sequential algorithm into something that works on spark.
Eg deal with the fact that data is split into partitions which are
processed in parallel (though within a partition, data is processed
sequentially).  I'm guessing folks are particularly interested in online
machine learning algos, which often have a point update and a mini batch
update.

2.  How to convert a one-point-at-a-time view of the data and convert it
into a mini batches view of the data.

(2) is pretty straightforward, eg with iterator.grouped (batchSize), or
manually put data into your own buffer etc.  This works for creating mini
batches *within* one partition in the context of spark.

But problem (1) is completely separate, and there is no general solution.
It really depends the specifics of what you're trying to do.

Some of the suggestions on this thread seem like they are basically just
falling back to sequential data processing ... but reay inefficient
sequential processing.  Eg.  It doesn't make sense to do a full scan of
your data with spark, and ignore all the records but the few that are in
the next mini batch.

It's completely reasonable to just sequentially process all the data if
that works for you.  But then it doesn't make sense to use spark, you're
not gaining anything from it.

Hope this helps, apologies if I just misunderstood the other suggested
solutions.
On Dec 14, 2014 8:35 PM, "Earthson"  wrote:

> I think it could be done like:
>
> 1. using mapPartition to randomly drop some partition
> 2. drop some elements randomly(for selected partition)
> 3. calculate gradient step for selected elements
>
> I don't think fixed step is needed, but fixed step could be done:
>
> 1. zipWithIndex
> 2. create ShuffleRDD based on the index(eg. using index/10 as key)
> 3. using mapPartition to calculate each bach
>
> I also have a question:
>
> Can mini batches run in parallel?
> I think parallel all batches just like a full batch GD in some case.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20677.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Building Desktop application for ALS-MlLib/ Training ALS

2014-12-15 Thread Xiangrui Meng
On Sun, Dec 14, 2014 at 3:06 AM, Saurabh Agrawal
 wrote:
>
>
> Hi,
>
>
>
> I am a new bee in spark and scala world
>
>
>
> I have been trying to implement Collaborative filtering using MlLib supplied
> out of the box with Spark and Scala
>
>
>
> I have 2 problems
>
>
>
> 1.   The best model was trained with rank = 20 and lambda = 5.0, and
> numIter = 10, and its RMSE on the test set is 25.718710831912485. The best
> model improves the baseline by 18.29%. Is there a scientific way in which
> RMSE could be brought down? What is a descent acceptable value for RMSE?
>

The grid search approach used in the AMPCamp tutorial is pretty
standard. Whether an RMSE is good or not really depends on your
dataset.

> 2.   I picked up the Collaborative filtering algorithm from
> http://ampcamp.berkeley.edu/5/exercises/movie-recommendation-with-mllib.html
> and executed the given code with my dataset. Now, I want to build a desktop
> application around it.
>
> a.   What is the best language to do this Java/ Scala? Any possibility
> to do this using C#?
>

We support Java/Scala/Python. Start with the one your are most
familiar with. C# is not supported.

> b.  Can somebody please share any relevant documents/ source or any
> helper links to help me get started on this?
>

For ALS, you can check the API documentation.

>
>
> Your help is greatly appreciated
>
>
>
> Thanks!!
>
>
>
> Regards,
>
> Saurabh Agrawal
>
>
> 
> This e-mail, including accompanying communications and attachments, is
> strictly confidential and only for the intended recipient. Any retention,
> use or disclosure not expressly authorised by Markit is prohibited. This
> email is subject to all waivers and other terms at the following link:
> http://www.markit.com/en/about/legal/email-disclaimer.page
>
> Please visit http://www.markit.com/en/about/contact/contact-us.page? for
> contact information on our offices worldwide.
>
> MarkitSERV Limited has its registered office located at Level 4, Ropemaker
> Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and regulated
> by the Financial Conduct Authority with registration number 207294

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ALS failure with size > Integer.MAX_VALUE

2014-12-15 Thread Xiangrui Meng
Unfortunately, it will depends on the Sorter API in 1.2. -Xiangrui

On Mon, Dec 15, 2014 at 11:48 AM, Bharath Ravi Kumar
 wrote:
> Hi Xiangrui,
>
> The block size limit was encountered even with reduced number of item blocks
> as you had expected. I'm wondering if I could try the new implementation as
> a standalone library against a 1.1 deployment. Does it have dependencies on
> any core API's in the current master?
>
> Thanks,
> Bharath
>
> On Wed, Dec 3, 2014 at 10:10 PM, Bharath Ravi Kumar 
> wrote:
>>
>> Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And
>> yes, I've been following the JIRA for the new ALS implementation. I'll try
>> it out when it's ready for testing. .
>>
>> On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng  wrote:
>>>
>>> Hi Bharath,
>>>
>>> You can try setting a small item blocks in this case. 1200 is
>>> definitely too large for ALS. Please try 30 or even smaller. I'm not
>>> sure whether this could solve the problem because you have 100 items
>>> connected with 10^8 users. There is a JIRA for this issue:
>>>
>>> https://issues.apache.org/jira/browse/SPARK-3735
>>>
>>> which I will try to implement in 1.3. I'll ping you when it is ready.
>>>
>>> Best,
>>> Xiangrui
>>>
>>> On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar 
>>> wrote:
>>> > Yes, the issue appears to be due to the 2GB block size limitation. I am
>>> > hence looking for (user, product) block sizing suggestions to work
>>> > around
>>> > the block size limitation.
>>> >
>>> > On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen  wrote:
>>> >>
>>> >> (It won't be that, since you see that the error occur when reading a
>>> >> block from disk. I think this is an instance of the 2GB block size
>>> >> limitation.)
>>> >>
>>> >> On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya
>>> >>  wrote:
>>> >> > Hi Bharath – I’m unsure if this is your problem but the
>>> >> > MatrixFactorizationModel in MLLIB which is the underlying component
>>> >> > for
>>> >> > ALS
>>> >> > expects your User/Product fields to be integers. Specifically, the
>>> >> > input
>>> >> > to
>>> >> > ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am
>>> >> > wondering if
>>> >> > perhaps one of your identifiers exceeds MAX_INT, could you write a
>>> >> > quick
>>> >> > check for that?
>>> >> >
>>> >> > I have been running a very similar use case to yours (with more
>>> >> > constrained
>>> >> > hardware resources) and I haven’t seen this exact problem but I’m
>>> >> > sure
>>> >> > we’ve
>>> >> > seen similar issues. Please let me know if you have other questions.
>>> >> >
>>> >> > From: Bharath Ravi Kumar 
>>> >> > Date: Thursday, November 27, 2014 at 1:30 PM
>>> >> > To: "user@spark.apache.org" 
>>> >> > Subject: ALS failure with size > Integer.MAX_VALUE
>>> >> >
>>> >> > We're training a recommender with ALS in mllib 1.1 against a dataset
>>> >> > of
>>> >> > 150M
>>> >> > users and 4.5K items, with the total number of training records
>>> >> > being
>>> >> > 1.2
>>> >> > Billion (~30GB data). The input data is spread across 1200
>>> >> > partitions on
>>> >> > HDFS. For the training, rank=10, and we've configured {number of
>>> >> > user
>>> >> > data
>>> >> > blocks = number of item data blocks}. The number of user/item blocks
>>> >> > was
>>> >> > varied  between 50 to 1200. Irrespective of the block size (e.g. at
>>> >> > 1200
>>> >> > blocks each), there are atleast a couple of tasks that end up
>>> >> > shuffle
>>> >> > reading > 9.7G each in the aggregate stage (ALS.scala:337) and
>>> >> > failing
>>> >> > with
>>> >> > the following exception:
>>> >> >
>>> >> > java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>> >> > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
>>> >> > at
>>> >> > org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
>>> >> > at
>>> >> > org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
>>> >> > at
>>> >> >
>>> >> >
>>> >> > org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
>>> >> > at
>>> >> >
>>> >> >
>>> >> > org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
>>> >> >
>>> >
>>> >
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Building Desktop application for ALS-MlLib/ Training ALS

2014-12-15 Thread Abhi Basu
In case you must write c# code, you can call python code from c# or use
IronPython. :)

On Mon, Dec 15, 2014 at 12:04 PM, Xiangrui Meng  wrote:
>
> On Sun, Dec 14, 2014 at 3:06 AM, Saurabh Agrawal
>  wrote:
> >
> >
> > Hi,
> >
> >
> >
> > I am a new bee in spark and scala world
> >
> >
> >
> > I have been trying to implement Collaborative filtering using MlLib
> supplied
> > out of the box with Spark and Scala
> >
> >
> >
> > I have 2 problems
> >
> >
> >
> > 1.   The best model was trained with rank = 20 and lambda = 5.0, and
> > numIter = 10, and its RMSE on the test set is 25.718710831912485. The
> best
> > model improves the baseline by 18.29%. Is there a scientific way in which
> > RMSE could be brought down? What is a descent acceptable value for RMSE?
> >
>
> The grid search approach used in the AMPCamp tutorial is pretty
> standard. Whether an RMSE is good or not really depends on your
> dataset.
>
> > 2.   I picked up the Collaborative filtering algorithm from
> >
> http://ampcamp.berkeley.edu/5/exercises/movie-recommendation-with-mllib.html
> > and executed the given code with my dataset. Now, I want to build a
> desktop
> > application around it.
> >
> > a.   What is the best language to do this Java/ Scala? Any
> possibility
> > to do this using C#?
> >
>
> We support Java/Scala/Python. Start with the one your are most
> familiar with. C# is not supported.
>
> > b.  Can somebody please share any relevant documents/ source or any
> > helper links to help me get started on this?
> >
>
> For ALS, you can check the API documentation.
>
> >
> >
> > Your help is greatly appreciated
> >
> >
> >
> > Thanks!!
> >
> >
> >
> > Regards,
> >
> > Saurabh Agrawal
> >
> >
> > 
> > This e-mail, including accompanying communications and attachments, is
> > strictly confidential and only for the intended recipient. Any retention,
> > use or disclosure not expressly authorised by Markit is prohibited. This
> > email is subject to all waivers and other terms at the following link:
> > http://www.markit.com/en/about/legal/email-disclaimer.page
> >
> > Please visit http://www.markit.com/en/about/contact/contact-us.page? for
> > contact information on our offices worldwide.
> >
> > MarkitSERV Limited has its registered office located at Level 4,
> Ropemaker
> > Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and
> regulated
> > by the Financial Conduct Authority with registration number 207294
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

-- 
Abhi Basu


Re: java.lang.IllegalStateException: unread block data

2014-12-15 Thread Morbious
"Restored" ment reboot slave node with unchanged IP.
"Funny" thing is that for small files spark works fine.
I checked hadoop with hdfs also and I'm able to run wordcount on it without
any problems (i.e. file about 50GB size).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-tp20668p20692.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ERROR YarnClientClusterScheduler: Lost executor Akka client disassociated

2014-12-15 Thread DB Tsai
Hi Muhammad,

Maybe next time you can use http://pastebin.com/ to format and paste
the cleaner scala code snippet so other can help you easier. Also,
please only paste the significant portion of stack-trace which causes
the issue instead of giant logs.

First of all, In your log, it seems that you run out of memory, and I
guess the problem is you are trying to cache the whole
`clickstreamRDD`. Since you are not necessary using it so many time,
you may not need to cache it for better performance. Or at least, you
storage persistence should be `disk and memory` to avoid out of
memory.

Secondly, `groupByKey` is very expensive here. It's probably not the
root cause why the job is not finished, but `groupByKey` will shuffle
all the data to the reducer. In your case, you can do filter first
which will be executed in parallel in mapper side, and then do
`groupByKey`. You can specify higher num of task when you do
`groupByKey`. I'll recommend you to find a way to write your logic
using `reduceByKey` or `combineByKey` to yield much better performance
since those two operations can reduce or combine the data in mapper
side which will lead to much less shuffle traffic.

Finally, you may want to break down which part of your code causes the
issue to make debugging easier.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Thu, Dec 11, 2014 at 4:48 AM, Muhammad Ahsan
 wrote:
> --
> Code
> --
> scala> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkContext._
>
> scala> import org.apache.spark.rdd.RDD
> import org.apache.spark.rdd.RDD
>
> scala> import org.apache.spark.sql.SchemaRDD
> import org.apache.spark.sql.SchemaRDD
>
> scala> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.HiveContext
>
> scala> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.{SparkConf, SparkContext}
>
> scala> val hiveContext: HiveContext = new HiveContext(sc)
> hiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@2de76244
>
> scala> val numDays = 2
> numDays: Int = 2
>
> scala> case class Click(
> /* about 20 fields of type STRING */
> )
> defined class Click
>
> scala> val inputRDD = new Array[SchemaRDD](numDays)
> inputRDD: Array[org.apache.spark.sql.SchemaRDD] = Array(null, null)
>
> scala> for (i <- 1 to numDays) {
>  | if (i < 10) {
>  | inputRDD(i - 1) =
> hiveContext.parquetFile("hdfs://" + i)
>  | } else {
>  | inputRDD(i - 1) =
> hiveContext.parquetFile("hdfs://" + i)
>  | }
>  |
>  | }
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
>
> scala> var unionRDD = inputRDD(1)
> unionRDD: org.apache.spark.sql.SchemaRDD =
> SchemaRDD[1] at RDD at SchemaRDD.scala:104
>
> scala> for (i <- 1 to inputRDD.length - 1) {
>  | unionRDD = unionRDD.unionAll(inputRDD(i))
>  | }
>
> scala> val inputRDD = unionRDD
> inputRDD: org.apache.spark.sql.SchemaRDD =
> SchemaRDD[2] at RDD at SchemaRDD.scala:104
> scala>
>
> scala> inputRDD.registerTempTable("urlInfo")
>
> scala> val clickstreamRDD = hiveContext.sql("select * from urlInfo "
> +
>  | "where guid regexp '^[0-9a-f-]{36}$' " +
>  | "AND ((callerid  > 3 AND callerid <1) OR callerid >
> 10 " +
>  | "OR (callerid=3 AND browsertype = 'IE')) " +
>  | "AND countrycode regexp '^[A-Z]{2}$'")
> clickstreamRDD: org.apache.spark.sql.SchemaRDD =
> SchemaRDD[3] at RDD at SchemaRDD.scala:104
> scala>
>
> scala> clickstreamRDD.registerTempTable("clickstream")
>
> scala> clickstreamRDD.cache()
> res4: clickstreamRDD.type =
> SchemaRDD[3] at RDD at SchemaRDD.scala:104
>
> scala> val guidClickRDD = clickstreamRDD.map(row =>
> (row(7).asInstanceOf[String], {
>  | val value = Click(row(0).asInstanceOf[String],
>  | row(1).asInstanceOf[String],
> row(2).asInstanceOf[String],
>  | row(3).asInstanceOf[String],
> row(4).asInstanceOf[String],
>  | row(5).asInstanceOf[String],
> row(6).asInstanceOf[String],
>  | row(7).asInstanceOf[String],
> row(8).asInstanceOf[String],
>  | row(9).asInstanceOf[String],
> row(10).asInstanceOf[String],
>  | row(11).asInstanceOf[String],
> row(12).asInstanceOf[String],
>  | row(13).asInstanceOf[String],
> row(14).asInstanceOf[String],
>  | row(15).asInstanceOf[Stri

Re: Including data nucleus tools

2014-12-15 Thread DB Tsai
Just out of my curiosity. Do you manually apply this patch and see if
this can actually resolve the issue? It seems that it was merged at
some point, but reverted due to that it causes some stability issue.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Sat, Dec 13, 2014 at 7:11 AM,   wrote:
> So to answer my own question. It is a bug and there is unmerged PR for that
> already.
>
> https://issues.apache.org/jira/browse/SPARK-2624
> https://github.com/apache/spark/pull/3238
>
> Jakub
>
> -- Původní zpráva --
> Od: spark.dubovsky.ja...@seznam.cz
> Komu: spark.dubovsky.ja...@seznam.cz
> Datum: 12. 12. 2014 15:26:35
>
>
> Předmět: Re: Including data nucleus tools
>
>
> Hi,
>
>   I had time to try it again. I submited my app by the same command with
> these additional options:
>
>   --jars
> lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-core-3.2.10.jar,lib/datanucleus-rdbms-3.2.9.jar
>
>   Now an app successfully creates hive context. So my question remains: Is
> "classpath entries" from sparkUI the same classpath as mentioned in submit
> script message?
>
> "Spark assembly has been built with Hive, including Datanucleus jars on
> classpath"
>
>   If so then why the script fails to really include datanucleus jars on
> classpath? I found no bug about this on jira. Or is there a way how
> particular yarn/os settings on our cluster overrides this?
>
>   Thanks in advance
>
>   Jakub
>
> -- Původní zpráva --
> Od: spark.dubovsky.ja...@seznam.cz
> Komu: Michael Armbrust 
> Datum: 7. 12. 2014 3:02:33
> Předmět: Re: Including data nucleus tools
>
>
> Next try. I copied whole dist directory created by make-distribution script
> to cluster not just assembly jar. Then I used
>
> ./bin/spark-submit --num-executors 200 --master yarn-cluster --class
> org.apache.spark.mllib.CreateGuidDomainDictionary ../spark/root-0.1.jar
> ${args}
>
>  ...to run app again. Startup scripts printed this message:
>
> "Spark assembly has been built with Hive, including Datanucleus jars on
> classpath"
>
>   ...so I thought I am finally there. But job started and failed on the same
> ClassNotFound exception as before. Is "classpath" from script message just
> classpath of driver? Or is it the same classpath which is affected by --jars
> option? I was trying to find out from scripts but I was not able to find
> where --jars option is processed.
>
>   thanks
>
> -- Původní zpráva --
> Od: Michael Armbrust 
> Komu: spark.dubovsky.ja...@seznam.cz
> Datum: 6. 12. 2014 20:39:13
> Předmět: Re: Including data nucleus tools
>
>
> On Sat, Dec 6, 2014 at 5:53 AM,  wrote:
>
> Bonus question: Should the class
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory be part of assembly?
> Because it is not in jar now.
>
>
> No these jars cannot be put into the assembly because they have extra
> metadata files that live in the same location (so if you put them all in an
> assembly they overrwrite each other).  This metadata is used in discovery.
> Instead they must be manually put on the classpath in their original form
> (usually using --jars).

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Intermittent test failures

2014-12-15 Thread Marius Soutier
Possible, yes, although I’m trying everything I can to prevent it, i.e. fork in 
Test := true and isolated. Can you confirm that reusing a single SparkContext 
for multiple tests poses a problem as well?

Other than that, just switching from SQLContext to HiveContext also provoked 
the error.


On 15.12.2014, at 20:22, Michael Armbrust  wrote:

> Is it possible that you are starting more than one SparkContext in a single 
> JVM with out stopping previous ones?  I'd try testing with Spark 1.2, which 
> will throw an exception in this case.
> 
> On Mon, Dec 15, 2014 at 8:48 AM, Marius Soutier  wrote:
> Hi,
> 
> I’m seeing strange, random errors when running unit tests for my Spark jobs. 
> In this particular case I’m using Spark SQL to read and write Parquet files, 
> and one error that I keep running into is this one:
> 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in 
> stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 
> (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
> org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
> org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
> 
> I can only prevent this from happening by using isolated Specs tests thats 
> always create a new SparkContext that is not shared between tests (but there 
> can also be only a single SparkContext per test), and also by using standard 
> SQLContext instead of HiveContext. It does not seem to have anything to do 
> with the actual files that I also create during the test run with 
> SQLContext.saveAsParquetFile.
> 
> 
> Cheers
> - Marius
> 
> 
> PS The full trace:
> 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in 
> stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 
> (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
> org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
> org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
> 
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
> 
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
> org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)
> 
> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
> 
> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232)
> 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
> 
> org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
> sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:160)
> 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>  ~[spark-core_2.10-1.1.1.jar:1.1.1]
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>  ~[spark-core_2.10-1.1.1.jar:1.1.1]
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
>  ~[spark-core_2.10-1.1.1.jar:1.1.1]
> at 
> scala.collection.mutable.ResizableArray$class.foreach(Resizabl

Re: MLLIB model export: PMML vs MLLIB serialization

2014-12-15 Thread selvinsource
I am going to try to export decision tree next, so far I focused on linear
models and k-means.

Regards,
Vincenzo





sourabh wrote
> Thanks Vincenzo.
> Are you trying out all the models implemented in mllib? Actually I don't
> see decision tree there. Sorry if I missed it. When are you planning to
> merge this to spark branch?
> 
> Thanks
> Sourabh





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-model-export-PMML-vs-MLLIB-serialization-tp20324p20693.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark is crashing in this case. why?

2014-12-15 Thread Sameer Farooqui
Adding group back.


FYI Geneis - this was on a m3.xlarge with all default settings in Spark. I
used Spark version 1.3.0.

The 2nd case did work for me:

>>> a = [1,2,3,4,5,6,7,8,9]
>>> b = []
>>> for x in range(100):
...   b.append(a)
...
>>> rdd1 = sc.parallelize(b)
>>> rdd1.first()
14/12/15 16:33:01 WARN TaskSetManager: Stage 1 contains a task of very
large size (9766 KB). The maximum recommended task size is 100 KB.
[1, 2, 3, 4, 5, 6, 7, 8, 9]


On Mon, Dec 15, 2014 at 1:33 PM, Sameer Farooqui 
wrote:
>
> Hi Genesis,
>
>
> The 2nd case did work for me:
>
> >>> a = [1,2,3,4,5,6,7,8,9]
> >>> b = []
> >>> for x in range(100):
> ...   b.append(a)
> ...
> >>> rdd1 = sc.parallelize(b)
> >>> rdd1.first()
> 14/12/15 16:33:01 WARN TaskSetManager: Stage 1 contains a task of very
> large size (9766 KB). The maximum recommended task size is 100 KB.
> [1, 2, 3, 4, 5, 6, 7, 8, 9]
>
>
>
>
> On Sun, Dec 14, 2014 at 2:13 PM, Genesis Fatum 
> wrote:
>>
>> Hi Sameer,
>>
>> I have tried multiple configurations. For example, executor and driver
>> memory at 2G. Also played with the JRE memory size parameters (-Xms) and
>> get the same error.
>>
>> Does it work for you? I think it is a setup issue on my side, although I
>> have tried a couple laptops.
>>
>> Thanks
>>
>> On Sun, Dec 14, 2014 at 1:11 PM, Sameer Farooqui 
>> wrote:
>>>
>>> How much executor-memory are you setting for the JVM? What about the
>>> Driver JVM memory?
>>>
>>> Also check the Windows Event Log for Out of memory errors for one of the
>>> 2 above JVMs.
>>> On Dec 14, 2014 6:04 AM, "genesis fatum" 
>>> wrote:
>>>
 Hi,

 My environment is: standalone spark 1.1.1 on windows 8.1 pro.

 The following case works fine:
 >>> a = [1,2,3,4,5,6,7,8,9]
 >>> b = []
 >>> for x in range(10):
 ...  b.append(a)
 ...
 >>> rdd1 = sc.parallelize(b)
 >>> rdd1.first()
 >>>[1, 2, 3, 4, 5, 6, 7, 8, 9]

 The following case does not work. The only difference is the size of the
 array. Note the loop range: 100K vs. 1M.
 >>> a = [1,2,3,4,5,6,7,8,9]
 >>> b = []
 >>> for x in range(100):
 ...  b.append(a)
 ...
 >>> rdd1 = sc.parallelize(b)
 >>> rdd1.first()
 >>>
 14/12/14 07:52:19 ERROR PythonRDD: Python worker exited unexpectedly
 (crashed)
 java.net.SocketException: Connection reset by peer: socket write error
 at java.net.SocketOutputStream.socketWrite0(Native Method)
 at java.net.SocketOutputStream.socketWrite(Unknown Source)
 at java.net.SocketOutputStream.write(Unknown Source)
 at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
 at java.io.BufferedOutputStream.write(Unknown Source)
 at java.io.DataOutputStream.write(Unknown Source)
 at java.io.FilterOutputStream.write(Unknown Source)
 at
 org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$
 1.apply(PythonRDD.scala:341)
 at
 org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$
 1.apply(PythonRDD.scala:339)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRD
 D.scala:339)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.app
 ly$mcV$sp(PythonRDD.scala:209)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.app
 ly(PythonRDD.scala:184)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.app
 ly(PythonRDD.scala:184)
 at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scal
 a:183)

 What I have tried:
 1. Replaced JRE 32bit with JRE64
 2. Multiple configurations when I start pyspark: --driver-memory,
 --executor-memory
 3. Tried to set the SparkConf with different settings
 4. Tried also with spark 1.1.0

 Being new to Spark, I am sure that it is something simple that I am
 missing
 and would appreciate any thoughts.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-is-crashing-in-this-case-why-tp20675.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Stop streaming context gracefully when SIGTERM is passed

2014-12-15 Thread Budde, Adam
Hi all,

We are using Spark Streaming ETL a large volume of time series datasets. In our 
current design, each dataset we ETL will have a corresponding Spark Streaming 
context + process running on our cluster. Each of these processes will be 
passed configuration options specifying the data source to process as well as 
various tuning parameters such as the number of Receiver objects to use, batch 
interval size, number of partitions, etc.

Since the volume of data we're ingesting for each dataset will fluctuate over 
time, we'd like to be able to regularly send a SIGTERM to the Spark Streaming 
process handling the ETL, have that process gracefully complete processing any 
in-flight data, and restart the process with updated configuration options. The 
most obvious solution seems to be to call the stop(stopSparkContext: Boolean, 
stopGracefully: Boolean) method provided by StreamingContext in a shutdown 
hook, but this approach doesn't seem to be working for me. Here's a rough idea 
of what my code looks like:

> val ssc = new StreamingContext(conf, Seconds(15))
>
> ...
>
> // Add shutdown hook to exit gracefully upon termination.
> Runtime.getRuntime().addShutdownHook(new Thread() extends Logging {
>   override def run() = {
> logInfo("Exiting gracefully...")
> ssc.stop(true, true)
>   }
> })
>
> ...
>
> ssc.start()
> ssc.awaitTermination()

Whenever I try to kill the process, I don't see the "Exiting gracefully…" log 
message I've added. I tried grokking through the Spark source code to see if 
some other shutdown hook might be squashing the hook I've added by causing the 
process to exit before this hook is invoked, but I haven't found anything that 
would cause concern yet. Does anybody have any advice or insight on this? I'm a 
bit of a novice when it comes to the JVM and I'm afraid that I'm reaching the 
limits of my diagnostic abilities here.

Thanks,
Adam


Re: Intermittent test failures

2014-12-15 Thread Michael Armbrust
Using a single SparkContext should not cause this problem.  In the SQL
tests we use TestSQLContext and TestHive which are global singletons for
all of our unit testing.

On Mon, Dec 15, 2014 at 1:27 PM, Marius Soutier  wrote:
>
> Possible, yes, although I’m trying everything I can to prevent it, i.e.
> fork in Test := true and isolated. Can you confirm that reusing a single
> SparkContext for multiple tests poses a problem as well?
>
> Other than that, just switching from SQLContext to HiveContext also
> provoked the error.
>
>
> On 15.12.2014, at 20:22, Michael Armbrust  wrote:
>
> Is it possible that you are starting more than one SparkContext in a
> single JVM with out stopping previous ones?  I'd try testing with Spark
> 1.2, which will throw an exception in this case.
>
> On Mon, Dec 15, 2014 at 8:48 AM, Marius Soutier  wrote:
>>
>> Hi,
>>
>> I’m seeing strange, random errors when running unit tests for my Spark
>> jobs. In this particular case I’m using Spark SQL to read and write Parquet
>> files, and one error that I keep running into is this one:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in
>> stage 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
>> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
>> org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
>> org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
>>
>> I can only prevent this from happening by using isolated Specs tests
>> thats always create a new SparkContext that is not shared between tests
>> (but there can also be only a single SparkContext per test), and also by
>> using standard SQLContext instead of HiveContext. It does not seem to have
>> anything to do with the actual files that I also create during the test run
>> with SQLContext.saveAsParquetFile.
>>
>>
>> Cheers
>> - Marius
>>
>>
>> PS The full trace:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in
>> stage 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
>> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
>> org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
>> org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
>>
>> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
>>
>> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
>>
>> org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)
>>
>> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
>>
>> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232)
>>
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>>
>> org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
>> sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.lang.reflect.Method.invoke(Method.java:606)
>>
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:160)
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> java.lang.Thread.run(Thread.java:745)
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>> ~[spark-core_2.10-1.1.1.jar:1.1.1]
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>> ~[spark-core_2.10-1.1.1.jar:1.1.1]
>> at
>> org.apache

Re: Intermittent test failures

2014-12-15 Thread Marius Soutier
Ok, maybe these test versions will help me then. I’ll check it out.

On 15.12.2014, at 22:33, Michael Armbrust  wrote:

> Using a single SparkContext should not cause this problem.  In the SQL tests 
> we use TestSQLContext and TestHive which are global singletons for all of our 
> unit testing.
> 
> On Mon, Dec 15, 2014 at 1:27 PM, Marius Soutier  wrote:
> Possible, yes, although I’m trying everything I can to prevent it, i.e. fork 
> in Test := true and isolated. Can you confirm that reusing a single 
> SparkContext for multiple tests poses a problem as well?
> 
> Other than that, just switching from SQLContext to HiveContext also provoked 
> the error.
> 
> 
> On 15.12.2014, at 20:22, Michael Armbrust  wrote:
> 
>> Is it possible that you are starting more than one SparkContext in a single 
>> JVM with out stopping previous ones?  I'd try testing with Spark 1.2, which 
>> will throw an exception in this case.
>> 
>> On Mon, Dec 15, 2014 at 8:48 AM, Marius Soutier  wrote:
>> Hi,
>> 
>> I’m seeing strange, random errors when running unit tests for my Spark jobs. 
>> In this particular case I’m using Spark SQL to read and write Parquet files, 
>> and one error that I keep running into is this one:
>> 
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 
>> in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 
>> 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
>> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
>> org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
>> org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
>> 
>> I can only prevent this from happening by using isolated Specs tests thats 
>> always create a new SparkContext that is not shared between tests (but there 
>> can also be only a single SparkContext per test), and also by using standard 
>> SQLContext instead of HiveContext. It does not seem to have anything to do 
>> with the actual files that I also create during the test run with 
>> SQLContext.saveAsParquetFile.
>> 
>> 
>> Cheers
>> - Marius
>> 
>> 
>> PS The full trace:
>> 
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 
>> in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 
>> 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
>> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
>> org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
>> org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
>> 
>> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
>> 
>> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
>> org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)
>> 
>> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
>> 
>> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232)
>> 
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>> 
>> org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
>> sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>> 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.lang.reflect.Method.invoke(Method.java:606)
>> 
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>> 
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:160)
>> 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> java.lang.Thread.run(Thread.java:745)
>> Driver stacktrace:
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$fai

NumberFormatException

2014-12-15 Thread yu
Hello, everyone

I know 'NumberFormatException' is due to the reason that String can not be
parsed properly, but I really can not find any mistakes for my code. I hope
someone may kindly help me.
My hdfs file is as follows:
8,22
3,11
40,10
49,47
48,29
24,28
50,30
33,56
4,20
30,38
...

So each line contains an integer + "," + an integer + "\n"
My code is as follows:
object StreamMonitor {
  def main(args: Array[String]): Unit = {
val myFunc = (str: String) => {
  val strArray = str.trim().split(",") 
  (strArray(0).toInt, strArray(1).toInt)
}
val conf = new SparkConf().setAppName("StreamMonitor");
val ssc = new StreamingContext(conf, Seconds(30));
val datastream = ssc.textFileStream("/user/yu/streaminput");
val newstream = datastream.map(myFunc)  
newstream.saveAsTextFiles("output/", "");   
ssc.start()
ssc.awaitTermination()
  }

}

The exception info is:
14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0
(TID 0, h3): java.lang.NumberFormatException: For input string: "8"
   
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
java.lang.Integer.parseInt(Integer.java:492)
java.lang.Integer.parseInt(Integer.java:527)
   
scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:9)
StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:7)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
   
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
   
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

So based on the above info, "8" is the first number in the file and I think
it should be parsed to integer without any problems.
I know it may be a very stupid question and the answer may be very easy. But
I really can not find the reason. I am thankful to anyone who helps!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why KMeans with mllib is so slow ?

2014-12-15 Thread Jaonary Rabarisoa
I've tried some additional experiments with kmeans and I finally got it
worked as I expected. In fact, the number of partition is critical. I had a
data set of 24x784 with 12 partitions. In this case the kmeans
algorithm took a very long time (about hours to converge). When I change
the partition into 32, the same kmeans ( runs = 10, k = 10, iterations =
300, init = kmeans|| ) converges in 4 min with 8 cores 
As a comparison, the same problem solve with python scikit-learn takes 21
min on a single core.  So spark wins :)

As conclusion, setting the number of partition correctly is essential. Is
there a rule of thumb for that ?

On Mon, Dec 15, 2014 at 8:55 PM, Xiangrui Meng  wrote:
>
> Please check the number of partitions after sc.textFile. Use
> sc.textFile('...', 8) to have at least 8 partitions. -Xiangrui
>
> On Tue, Dec 9, 2014 at 4:58 AM, DB Tsai  wrote:
> > You just need to use the latest master code without any configuration
> > to get performance improvement from my PR.
> >
> > Sincerely,
> >
> > DB Tsai
> > ---
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Mon, Dec 8, 2014 at 7:53 AM, Jaonary Rabarisoa 
> wrote:
> >> After some investigation, I learned that I can't compare kmeans in mllib
> >> with another kmeans implementation directly. The kmeans|| initialization
> >> step takes more time than the algorithm implemented in julia for
> example.
> >> There is also the ability to run multiple runs of kmeans algorithm in
> mllib
> >> even by default the number of runs is 1.
> >>
> >> DB Tsai can you please tell me the configuration you took for the
> >> improvement you mention in your pull request. I'd like to run the same
> >> benchmark on mnist8m on my computer.
> >>
> >>
> >> Cheers;
> >>
> >>
> >>
> >> On Fri, Dec 5, 2014 at 10:34 PM, DB Tsai  wrote:
> >>>
> >>> Also, are you using the latest master in this experiment? A PR merged
> >>> into the master couple days ago will spend up the k-means three times.
> >>> See
> >>>
> >>>
> >>>
> https://github.com/apache/spark/commit/7fc49ed91168999d24ae7b4cc46fbb4ec87febc1
> >>>
> >>> Sincerely,
> >>>
> >>> DB Tsai
> >>> ---
> >>> My Blog: https://www.dbtsai.com
> >>> LinkedIn: https://www.linkedin.com/in/dbtsai
> >>>
> >>>
> >>> On Fri, Dec 5, 2014 at 9:36 AM, Jaonary Rabarisoa 
> >>> wrote:
> >>> > The code is really simple :
> >>> >
> >>> > object TestKMeans {
> >>> >
> >>> >   def main(args: Array[String]) {
> >>> >
> >>> > val conf = new SparkConf()
> >>> >   .setAppName("Test KMeans")
> >>> >   .setMaster("local[8]")
> >>> >   .set("spark.executor.memory", "8g")
> >>> >
> >>> > val sc = new SparkContext(conf)
> >>> >
> >>> > val numClusters = 500;
> >>> > val numIterations = 2;
> >>> >
> >>> >
> >>> > val data = sc.textFile("sample.csv").map(x =>
> >>> > Vectors.dense(x.split(',').map(_.toDouble)))
> >>> > data.cache()
> >>> >
> >>> >
> >>> > val clusters = KMeans.train(data, numClusters, numIterations)
> >>> >
> >>> > println(clusters.clusterCenters.size)
> >>> >
> >>> > val wssse = clusters.computeCost(data)
> >>> > println(s"error : $wssse")
> >>> >
> >>> >   }
> >>> > }
> >>> >
> >>> >
> >>> > For the testing purpose, I was generating a sample random data with
> >>> > julia
> >>> > and store it in a csv file delimited by comma. The dimensions is
> 248000
> >>> > x
> >>> > 384.
> >>> >
> >>> > In the target application, I will have more than 248k data to
> cluster.
> >>> >
> >>> >
> >>> > On Fri, Dec 5, 2014 at 6:03 PM, Davies Liu 
> >>> > wrote:
> >>> >>
> >>> >> Could you post you script to reproduce the results (also how to
> >>> >> generate the dataset)? That will help us to investigate it.
> >>> >>
> >>> >> On Fri, Dec 5, 2014 at 8:40 AM, Jaonary Rabarisoa <
> jaon...@gmail.com>
> >>> >> wrote:
> >>> >> > Hmm, here I use spark on local mode on my laptop with 8 cores. The
> >>> >> > data
> >>> >> > is
> >>> >> > on my local filesystem. Event thought, there an overhead due to
> the
> >>> >> > distributed computation, I found the difference between the
> runtime
> >>> >> > of
> >>> >> > the
> >>> >> > two implementations really, really huge. Is there a benchmark on
> how
> >>> >> > well
> >>> >> > the algorithm implemented in mllib performs ?
> >>> >> >
> >>> >> > On Fri, Dec 5, 2014 at 4:56 PM, Sean Owen 
> wrote:
> >>> >> >>
> >>> >> >> Spark has much more overhead, since it's set up to distribute the
> >>> >> >> computation. Julia isn't distributed, and so has no such
> overhead in
> >>> >> >> a
> >>> >> >> completely in-core implementation. You generally use Spark when
> you
> >>> >> >> have a problem large enough to warrant distributing, or, your
> data
> >>> >> >> already lives in a distributed store like HDFS.
> >>> >> >>
> >>> >> >> But it's also possible you're not configuring the implementations
> >>> >> >> the
> >>> >>

MLLib: Saving and loading a model

2014-12-15 Thread Sameer Tilak
Hi All,Resending this:
I am using LinearRegressionWithSGD and then I save the model weights and 
intercept. File that contains weights have this format:
1.204550.13560.000456..
Intercept is 0 since I am using train not setting the intercept so it can be 
ignored for the moment. I would now like to initialize a new model object and 
using these saved weights from the above file. We are using CDH 5.1
Something along these lines:
val weights = sc.textFile("linear-weights");val model = new 
LinearRegressionWithSGD(weights);
then use is as:
val valuesAndPreds = testData.map { point =>  val prediction = 
model.predict(point.features)  (point.label, prediction)}

Any pointers to how do I do that?
  

Re: Stop streaming context gracefully when SIGTERM is passed

2014-12-15 Thread Soumitra Kumar
Hi Adam,

I have following scala actor based code to do graceful shutdown:

class TimerActor (val timeout : Long, val who : Actor) extends Actor {
def act {
reactWithin (timeout) {
case TIMEOUT => who ! SHUTDOWN
}
}
}

class SSCReactor (val ssc : StreamingContext) extends Actor with Logging {
def act {
react {
case SHUTDOWN =>
logger.info (s"Shutting down gracefully ...")
ssc.stop (true, true)
}
}
}

I see following message:

14/10/22 01:40:49 INFO SSCReactor: Shutting down gracefully ...
14/10/22 01:40:49 INFO JobGenerator: Stopping JobGenerator gracefully
14/10/22 01:40:49 INFO JobGenerator: Waiting for all received blocks to be
consumed for job generation
14/10/22 01:40:49 INFO JobGenerator: Waited for all received blocks to be
consumed for job generation

-Soumitra.


On Mon, Dec 15, 2014 at 1:32 PM, Budde, Adam  wrote:
>
>  Hi all,
>
>  We are using Spark Streaming ETL a large volume of time series datasets.
> In our current design, each dataset we ETL will have a corresponding Spark
> Streaming context + process running on our cluster. Each of these processes
> will be passed configuration options specifying the data source to process
> as well as various tuning parameters such as the number of Receiver objects
> to use, batch interval size, number of partitions, etc.
>
>  Since the volume of data we're ingesting for each dataset will fluctuate
> over time, we'd like to be able to regularly send a SIGTERM to the Spark
> Streaming process handling the ETL, have that process gracefully complete
> processing any in-flight data, and restart the process with updated
> configuration options. The most obvious solution seems to be to call the
> stop(stopSparkContext: Boolean, stopGracefully: Boolean) method provided
> by StreamingContext in a shutdown hook, but this approach doesn't seem to
> be working for me. Here's a rough idea of what my code looks like:
>
>  > val ssc = new StreamingContext(conf, Seconds(15))
> >
> > ...
> >
> > // Add shutdown hook to exit gracefully upon termination.
> > Runtime.getRuntime().addShutdownHook(new Thread() extends Logging {
> >   override def run() = {
> > logInfo("Exiting gracefully...")
> > ssc.stop(true, true)
> >   }
> > })
> >
> > ...
> >
> > ssc.start()
> > ssc.awaitTermination()
>
>  Whenever I try to kill the process, I don't see the "Exiting
> gracefully…" log message I've added. I tried grokking through the Spark
> source code to see if some other shutdown hook might be squashing the hook
> I've added by causing the process to exit before this hook is invoked, but
> I haven't found anything that would cause concern yet. Does anybody have
> any advice or insight on this? I'm a bit of a novice when it comes to the
> JVM and I'm afraid that I'm reaching the limits of my diagnostic abilities
> here.
>
>  Thanks,
> Adam
>


Re: NumberFormatException

2014-12-15 Thread Sean Owen
That certainly looks surprising. Are you sure there are no unprintable
characters in the file?

On Mon, Dec 15, 2014 at 9:49 PM, yu  wrote:
> The exception info is:
> 14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0
> (TID 0, h3): java.lang.NumberFormatException: For input string: "8"
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can spark job have sideeffects (write files to FileSystem)

2014-12-15 Thread Paweł Szulc
Yes, this is what I also found in Spark documentation, that foreach can
have side effects. Nevertheless I have this weird error, that sometimes
files are just empty.

"using" is simply a wrapper that takes our code, makes try-catch-finally
and flush & close all resources.

I honestly have no clue what can possibly be wrong.

No errors in logs.

On Thu, Dec 11, 2014 at 2:29 PM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:
>
> Yes, this is perfectly "legal". This is what RDD.foreach() is for! You may
> be encountering an IO exception while writing, and maybe using() suppresses
> it. (?) I'd try writing the files with java.nio.file.Files.write() -- I'd
> expect there is less that can go wrong with that simple call.
>
> On Thu, Dec 11, 2014 at 12:50 PM, Paweł Szulc 
> wrote:
>
>> Imagine simple Spark job, that will store each line of the RDD to a
>> separate file
>>
>>
>> val lines = sc.parallelize(1 to 100).map(n => s"this is line $n")
>> lines.foreach(line => writeToFile(line))
>>
>> def writeToFile(line: String) = {
>> def filePath = "file://..."
>> val file = new File(new URI(path).getPath)
>> // using function simply closes the output stream
>> using(new FileOutputStream(file)) { output =>
>>   output.write(value)
>> }
>> }
>>
>>
>> Now, example above works 99,9% of a time. Files are generated for each
>> line, each file contains that particular line.
>>
>> However, when dealing with large number of data, we encounter situations
>> where some of the files are empty! Files are generated, but there is no
>> content inside of them (0 bytes).
>>
>> Now the question is: can Spark job have side effects. Is it even legal to
>> write such code?
>> If no, then what other choice do we have when we want to save data from
>> our RDD?
>> If yes, then do you guys see what could be the reason of this job acting
>> in this strange manner 0.1% of the time?
>>
>>
>> disclaimer: we are fully aware of .saveAsTextFile method in the API,
>> however the example above is a simplification of our code - normally we
>> produce PDF files.
>>
>>
>> Best regards,
>> Paweł Szulc
>>
>>
>>
>>
>>
>>
>>
>


Re: Can spark job have sideeffects (write files to FileSystem)

2014-12-15 Thread Davies Liu
Thinking about that any task could be launched concurrently in
different nodes, so in order to make sure the generated files are
valid, you need some atomic operation (such as rename) to do it. For
example, you could generate a random name for output file, writing the
data into it, rename it to the target name finally. This is what
happened in saveAsTextFile().

On Mon, Dec 15, 2014 at 4:37 PM, Paweł Szulc  wrote:
> Yes, this is what I also found in Spark documentation, that foreach can have
> side effects. Nevertheless I have this weird error, that sometimes files are
> just empty.
>
> "using" is simply a wrapper that takes our code, makes try-catch-finally and
> flush & close all resources.
>
> I honestly have no clue what can possibly be wrong.
>
> No errors in logs.
>
> On Thu, Dec 11, 2014 at 2:29 PM, Daniel Darabos
>  wrote:
>>
>> Yes, this is perfectly "legal". This is what RDD.foreach() is for! You may
>> be encountering an IO exception while writing, and maybe using() suppresses
>> it. (?) I'd try writing the files with java.nio.file.Files.write() -- I'd
>> expect there is less that can go wrong with that simple call.
>>
>> On Thu, Dec 11, 2014 at 12:50 PM, Paweł Szulc 
>> wrote:
>>>
>>> Imagine simple Spark job, that will store each line of the RDD to a
>>> separate file
>>>
>>>
>>> val lines = sc.parallelize(1 to 100).map(n => s"this is line $n")
>>> lines.foreach(line => writeToFile(line))
>>>
>>> def writeToFile(line: String) = {
>>> def filePath = "file://..."
>>> val file = new File(new URI(path).getPath)
>>> // using function simply closes the output stream
>>> using(new FileOutputStream(file)) { output =>
>>>   output.write(value)
>>> }
>>> }
>>>
>>>
>>> Now, example above works 99,9% of a time. Files are generated for each
>>> line, each file contains that particular line.
>>>
>>> However, when dealing with large number of data, we encounter situations
>>> where some of the files are empty! Files are generated, but there is no
>>> content inside of them (0 bytes).
>>>
>>> Now the question is: can Spark job have side effects. Is it even legal to
>>> write such code?
>>> If no, then what other choice do we have when we want to save data from
>>> our RDD?
>>> If yes, then do you guys see what could be the reason of this job acting
>>> in this strange manner 0.1% of the time?
>>>
>>>
>>> disclaimer: we are fully aware of .saveAsTextFile method in the API,
>>> however the example above is a simplification of our code - normally we
>>> produce PDF files.
>>>
>>>
>>> Best regards,
>>> Paweł Szulc
>>>
>>>
>>>
>>>
>>>
>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Executor memory

2014-12-15 Thread Pala M Muthaia
Hi,

Running Spark 1.0.1 on Yarn 2.5

When i specify --executor-memory 4g, the spark UI shows each executor as
having only 2.3 GB, and similarly for 8g, only 4.6 GB.

I am guessing that the executor memory corresponds to the container memory,
and that the task JVM gets only a percentage of the container total memory.
Is there a yarn or spark parameter to tune this so that my task JVM
actually gets 6GB out of the 8GB for example?


Thanks.


Re: what is the best way to implement mini batches?

2014-12-15 Thread Earthson Lu
Hi Imran, you are right. Sequentially process does not make sense to use spark.

I think Sequentially process works if batch for each iteration is large 
enough(this batch could be processed in parallel).

My point is that we shall not run mini-batches in parallel, but it still 
possible to use large batch for parallel inside each batch(It seems to be the 
way that SGD implemented in MLLib does?).


-- 
Earthson Lu

On December 16, 2014 at 04:02:22, Imran Rashid (im...@therashids.com) wrote:

I'm a little confused by some of the responses.  It seems like there are two 
different issues being discussed here:

1.  How to turn a sequential algorithm into something that works on spark.  Eg 
deal with the fact that data is split into partitions which are processed in 
parallel (though within a partition, data is processed sequentially).  I'm 
guessing folks are particularly interested in online machine learning algos, 
which often have a point update and a mini batch update.

2.  How to convert a one-point-at-a-time view of the data and convert it into a 
mini batches view of the data.

(2) is pretty straightforward, eg with iterator.grouped (batchSize), or 
manually put data into your own buffer etc.  This works for creating mini 
batches *within* one partition in the context of spark.

But problem (1) is completely separate, and there is no general solution.  It 
really depends the specifics of what you're trying to do.

Some of the suggestions on this thread seem like they are basically just 
falling back to sequential data processing ... but reay inefficient 
sequential processing.  Eg.  It doesn't make sense to do a full scan of your 
data with spark, and ignore all the records but the few that are in the next 
mini batch.

It's completely reasonable to just sequentially process all the data if that 
works for you.  But then it doesn't make sense to use spark, you're not gaining 
anything from it.

Hope this helps, apologies if I just misunderstood the other suggested 
solutions.

On Dec 14, 2014 8:35 PM, "Earthson"  wrote:
I think it could be done like:

1. using mapPartition to randomly drop some partition
2. drop some elements randomly(for selected partition)
3. calculate gradient step for selected elements

I don't think fixed step is needed, but fixed step could be done:

1. zipWithIndex
2. create ShuffleRDD based on the index(eg. using index/10 as key)
3. using mapPartition to calculate each bach

I also have a question:

Can mini batches run in parallel?
I think parallel all batches just like a full batch GD in some case.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20677.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NotSerializableException in Spark Streaming

2014-12-15 Thread Nicholas Chammas
This still seems to be broken. In 1.1.1, it errors immediately on this line
(from the above repro script):

liveTweets.map(t => noop(t)).print()

The stack trace is:

org.apache.spark.SparkException: Task not serializable
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
at $iwC$$iwC$$iwC$$iwC.(:27)
at $iwC$$iwC$$iwC.(:32)
at $iwC$$iwC.(:34)
at $iwC.(:36)
at (:38)
at .(:42)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException:
org.apache.spark.streaming.StreamingContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(Obje

Re: NumberFormatException

2014-12-15 Thread Harihar Nahak
Hi Yu,

Try this :
val data = csv.map( line => line.split(",").map(elem => elem.trim)) //lines
in rows

   data.map( rec => (rec(0).toInt, rec(1).toInt))

to convert into integer.

On 16 December 2014 at 10:49, yu [via Apache Spark User List] <
ml-node+s1001560n20694...@n3.nabble.com> wrote:
>
> Hello, everyone
>
> I know 'NumberFormatException' is due to the reason that String can not be
> parsed properly, but I really can not find any mistakes for my code. I hope
> someone may kindly help me.
> My hdfs file is as follows:
> 8,22
> 3,11
> 40,10
> 49,47
> 48,29
> 24,28
> 50,30
> 33,56
> 4,20
> 30,38
> ...
>
> So each line contains an integer + "," + an integer + "\n"
> My code is as follows:
> object StreamMonitor {
>   def main(args: Array[String]): Unit = {
> val myFunc = (str: String) => {
>   val strArray = str.trim().split(",")
>   (strArray(0).toInt, strArray(1).toInt)
> }
> val conf = new SparkConf().setAppName("StreamMonitor");
> val ssc = new StreamingContext(conf, Seconds(30));
> val datastream = ssc.textFileStream("/user/yu/streaminput");
> val newstream = datastream.map(myFunc)
> newstream.saveAsTextFiles("output/", "");
> ssc.start()
> ssc.awaitTermination()
>   }
>
> }
>
> The exception info is:
> 14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 0.0 (TID 0, h3): java.lang.NumberFormatException: For input string: "8"
>
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>
> java.lang.Integer.parseInt(Integer.java:492)
> java.lang.Integer.parseInt(Integer.java:527)
>
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:9)
> StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:7)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
>
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> java.lang.Thread.run(Thread.java:745)
>
> So based on the above info, "8" is the first number in the file and I
> think it should be parsed to integer without any problems.
> I know it may be a very stupid question and the answer may be very easy.
> But I really can not find the reason. I am thankful to anyone who helps!
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694.html
>  To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1...@n3.nabble.com
> To unsubscribe from Apache Spark User List, click here
> 
> .
> NAML
> 
>


-- 
Regards,
Harihar Nahak
BigData Developer
Wynyard
Email:hna...@wynyardgroup.com | Extn: 8019




-
--Harihar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694p20696.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: JSON Input files

2014-12-15 Thread Madabhattula Rajesh Kumar
Thank you Peter for the clarification.

Regards,
Rajesh

On Tue, Dec 16, 2014 at 12:42 AM, Michael Armbrust 
wrote:
>
> Underneath the covers, jsonFile uses TextInputFormat, which will split
> files correctly based on new lines.  Thus, there is no fixed maximum size
> for a json object (other than the fact that it must fit into memory on the
> executors).
>
> On Mon, Dec 15, 2014 at 7:22 AM, Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>>
>> Hi Peter,
>>
>> Thank you for the clarification.
>>
>> Now we need to store each JSON object into one line. Is there any
>> limitation of length of JSON object? So, JSON object will not go to the
>> next line.
>>
>> What will happen if JSON object is a big/huge one?  Will it store in a
>> single line in HDFS?
>>
>> What will happen, if JSON object contains BLOB/CLOB value? Is this entire
>> JSON object stores in single line of HDFS?
>>
>> What will happen, if JSON object exceeding the HDFS block size. For
>> example, single JSON object split into two different worker nodes. In this
>> case, How Spark will read this JSON object?
>>
>> Could you please clarify above questions
>>
>> Regards,
>> Rajesh
>>
>>
>> On Mon, Dec 15, 2014 at 6:52 PM, Peter Vandenabeele <
>> pe...@vandenabeele.com> wrote:
>>>
>>>
>>>
>>> On Sat, Dec 13, 2014 at 5:43 PM, Helena Edelson <
>>> helena.edel...@datastax.com> wrote:
>>>
 One solution can be found here:
 https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#json-datasets


>>> As far as I understand, the people.json file is not really a proper json
>>> file, but a file documented as:
>>>
>>>   "... JSON files where each line of the files is a JSON object.".
>>>
>>> This means that is a file with multiple lines, but each line needs to
>>> have a fully self-contained JSON object
>>> (initially confusing, this will not parse a standard multi-line JSON
>>> file). We are working to clarify this in
>>> https://github.com/apache/spark/pull/3517
>>>
>>> HTH,
>>>
>>> Peter
>>>
>>>
>>>
>>>
 - Helena
 @helenaedelson

 On Dec 13, 2014, at 11:18 AM, Madabhattula Rajesh Kumar <
 mrajaf...@gmail.com> wrote:

 Hi Team,

 I have a large JSON file in Hadoop. Could you please let me know

 1. How to read the JSON file
 2. How to parse the JSON file

 Please share any example program based on Scala

 Regards,
 Rajesh



>>>
>>>
>>> --
>>> Peter Vandenabeele
>>> http://www.allthingsdata.io
>>> http://www.linkedin.com/in/petervandenabeele
>>> https://twitter.com/peter_v
>>> gsm: +32-478-27.40.69
>>> e-mail: pe...@vandenabeele.com
>>> skype: peter_v_be
>>>
>>


Re: Executor memory

2014-12-15 Thread sandy . ryza
Hi Pala,

Spark executors only reserve spark.storage.memoryFraction (default 0.6) of 
their spark.executor.memory for caching RDDs. The spark UI displays this 
fraction.

spark.executor.memory controls the executor heap size.  
spark.yarn.executor.memoryOverhead controls the extra that's tacked on for the 
container memory.

-Sandy

> On Dec 15, 2014, at 7:53 PM, Pala M Muthaia  
> wrote:
> 
> Hi,
> 
> Running Spark 1.0.1 on Yarn 2.5
> 
> When i specify --executor-memory 4g, the spark UI shows each executor as 
> having only 2.3 GB, and similarly for 8g, only 4.6 GB. 
> 
> I am guessing that the executor memory corresponds to the container memory, 
> and that the task JVM gets only a percentage of the container total memory. 
> Is there a yarn or spark parameter to tune this so that my task JVM actually 
> gets 6GB out of the 8GB for example?
> 
> 
> Thanks.
> 
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fetch Failed caused job failed.

2014-12-15 Thread Mars Max
While I was running spark MR job, there was FetchFailed(BlockManagerId(47,
xx.com, 40975, 0), shuffleId=2, mapId=5, reduceId=286), then there
were many retries, and the job failed finally. 

And the log showed the following error, does anybody meet this error ? or is
it a known issue in Spark ? Thanks.

4/12/16 10:43:43 ERROR PythonRDD: Python worker exited unexpectedly
(crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "/home/spark/spark-1.1/python/pyspark/worker.py", line 75, in main
command = pickleSer._read_with_length(infile)
  File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 146, in
_read_with_length
length = read_int(stream)
  File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 464, in
read_int
raise EOFError
EOFError

at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
at
org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:154)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.spark.shuffle.FetchFailedException: Fetch failed:
BlockManagerId(47, nmg01-taihang-d11609.nmg01.baidu.com, 40975, 0) 2 5 286
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:68)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:335)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
at
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
14/12/16 10:43:43 ERROR PythonRDD: This may have been caused by a prior
exception:
org.apache.spark.shuffle.FetchFailedException: Fetch failed:
BlockManagerId(47, nmg01-taihang-d11609.nmg01.baidu.com, 40975, 0) 2 5 286
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:68)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark

Re: Executor memory

2014-12-15 Thread Sean Owen
I believe this corresponds to the 0.6 of the whole heap that is
allocated for caching partitions. See spark.storage.memoryFraction on
http://spark.apache.org/docs/latest/configuration.html 0.6 of 4GB is
about 2.3GB.

The note there is important, that you probably don't want to exceed
the JVM old generation size with this parameter.

On Tue, Dec 16, 2014 at 12:53 AM, Pala M Muthaia
 wrote:
> Hi,
>
> Running Spark 1.0.1 on Yarn 2.5
>
> When i specify --executor-memory 4g, the spark UI shows each executor as
> having only 2.3 GB, and similarly for 8g, only 4.6 GB.
>
> I am guessing that the executor memory corresponds to the container memory,
> and that the task JVM gets only a percentage of the container total memory.
> Is there a yarn or spark parameter to tune this so that my task JVM actually
> gets 6GB out of the 8GB for example?
>
>
> Thanks.
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ALS failure with size > Integer.MAX_VALUE

2014-12-15 Thread Bharath Ravi Kumar
Ok. We'll try using it in a test cluster running 1.2.
On 16-Dec-2014 1:36 am, "Xiangrui Meng"  wrote:

Unfortunately, it will depends on the Sorter API in 1.2. -Xiangrui

On Mon, Dec 15, 2014 at 11:48 AM, Bharath Ravi Kumar
 wrote:
> Hi Xiangrui,
>
> The block size limit was encountered even with reduced number of item
blocks
> as you had expected. I'm wondering if I could try the new implementation
as
> a standalone library against a 1.1 deployment. Does it have dependencies
on
> any core API's in the current master?
>
> Thanks,
> Bharath
>
> On Wed, Dec 3, 2014 at 10:10 PM, Bharath Ravi Kumar 
> wrote:
>>
>> Thanks Xiangrui. I'll try out setting a smaller number of item blocks.
And
>> yes, I've been following the JIRA for the new ALS implementation. I'll
try
>> it out when it's ready for testing. .
>>
>> On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng  wrote:
>>>
>>> Hi Bharath,
>>>
>>> You can try setting a small item blocks in this case. 1200 is
>>> definitely too large for ALS. Please try 30 or even smaller. I'm not
>>> sure whether this could solve the problem because you have 100 items
>>> connected with 10^8 users. There is a JIRA for this issue:
>>>
>>> https://issues.apache.org/jira/browse/SPARK-3735
>>>
>>> which I will try to implement in 1.3. I'll ping you when it is ready.
>>>
>>> Best,
>>> Xiangrui
>>>
>>> On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar 
>>> wrote:
>>> > Yes, the issue appears to be due to the 2GB block size limitation. I
am
>>> > hence looking for (user, product) block sizing suggestions to work
>>> > around
>>> > the block size limitation.
>>> >
>>> > On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen  wrote:
>>> >>
>>> >> (It won't be that, since you see that the error occur when reading a
>>> >> block from disk. I think this is an instance of the 2GB block size
>>> >> limitation.)
>>> >>
>>> >> On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya
>>> >>  wrote:
>>> >> > Hi Bharath – I’m unsure if this is your problem but the
>>> >> > MatrixFactorizationModel in MLLIB which is the underlying component
>>> >> > for
>>> >> > ALS
>>> >> > expects your User/Product fields to be integers. Specifically, the
>>> >> > input
>>> >> > to
>>> >> > ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am
>>> >> > wondering if
>>> >> > perhaps one of your identifiers exceeds MAX_INT, could you write a
>>> >> > quick
>>> >> > check for that?
>>> >> >
>>> >> > I have been running a very similar use case to yours (with more
>>> >> > constrained
>>> >> > hardware resources) and I haven’t seen this exact problem but I’m
>>> >> > sure
>>> >> > we’ve
>>> >> > seen similar issues. Please let me know if you have other
questions.
>>> >> >
>>> >> > From: Bharath Ravi Kumar 
>>> >> > Date: Thursday, November 27, 2014 at 1:30 PM
>>> >> > To: "user@spark.apache.org" 
>>> >> > Subject: ALS failure with size > Integer.MAX_VALUE
>>> >> >
>>> >> > We're training a recommender with ALS in mllib 1.1 against a
dataset
>>> >> > of
>>> >> > 150M
>>> >> > users and 4.5K items, with the total number of training records
>>> >> > being
>>> >> > 1.2
>>> >> > Billion (~30GB data). The input data is spread across 1200
>>> >> > partitions on
>>> >> > HDFS. For the training, rank=10, and we've configured {number of
>>> >> > user
>>> >> > data
>>> >> > blocks = number of item data blocks}. The number of user/item
blocks
>>> >> > was
>>> >> > varied  between 50 to 1200. Irrespective of the block size (e.g. at
>>> >> > 1200
>>> >> > blocks each), there are atleast a couple of tasks that end up
>>> >> > shuffle
>>> >> > reading > 9.7G each in the aggregate stage (ALS.scala:337) and
>>> >> > failing
>>> >> > with
>>> >> > the following exception:
>>> >> >
>>> >> > java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>> >> > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
>>> >> > at
>>> >> > org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
>>> >> > at
>>> >> > org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
>>> >> > at
>>> >> >
>>> >> >
>>> >> >
org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
>>> >> > at
>>> >> >
>>> >> >
>>> >> >
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
>>> >> >
>>> >
>>> >
>>
>>
>


Fwd: Run Spark job on Playframework + Spark Master/Worker in one Mac

2014-12-15 Thread Tomoya Igarashi
Hi Aniket,
Thanks for your reply.

I followed your advice to modified my code.
Here is latest one.
https://github.com/TomoyaIgarashi/spark_cluster_sample/commit/ce7613c42d3adbe6ae44e264c11f3829460f3c35

As a result, It works correctly! Thank you very much.

But, "AssociationError" Message appears line 397 in Playframework logs as
follows.
https://gist.github.com/TomoyaIgarashi/9688bdd5663af95ddd4d

Is there any problem?


2014-12-15 18:48 GMT+09:00 Aniket Bhatnagar :
>
> Try the workaround (addClassPathJars(sparkContext,
> this.getClass.getClassLoader) discussed in
> http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAJOb8buD1B6tUtOfG8_Ok7F95C3=r-zqgffoqsqbjdxd427...@mail.gmail.com%3E
>
> Thanks,
> Aniket
>
>
> On Mon Dec 15 2014 at 07:43:24 Tomoya Igarashi <
> tomoya.igarashi.0...@gmail.com> wrote:
>
>> Hi all,
>>
>> I am trying to run Spark job on Playframework + Spark Master/Worker in
>> one Mac.
>> When job ran, I encountered java.lang.ClassNotFoundException.
>> Would you teach me how to solve it?
>>
>> Here is my code in Github.
>> https://github.com/TomoyaIgarashi/spark_cluster_sample
>>
>> * Envrionments:
>> Mac 10.9.5
>> Java 1.7.0_71
>> Playframework 2.2.3
>> Spark 1.1.1
>>
>> * Setup history:
>> > cd ~
>> > git clone g...@github.com:apache/spark.git
>> > cd spark
>> > git checkout -b v1.1.1 v1.1.1
>> > sbt/sbt assembly
>> > vi ~/.bashrc
>> export SPARK_HOME=/Users/tomoya/spark
>> > . ~/.bashrc
>> > hostname
>> Tomoya-Igarashis-MacBook-Air.local
>> > vi $SPARK_HOME/conf/slaves
>> Tomoya-Igarashis-MacBook-Air.local
>> > play new spark_cluster_sample
>> default name
>> type -> scala
>>
>> * Run history:
>> > $SPARK_HOME/sbin/start-all.sh
>> > jps
>> > which play
>> /Users/tomoya/play/play
>> > git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
>> > cd spark_cluster_sample
>> > play run
>>
>> * Error trace:
>> Here is error trace in Gist.
>> https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511
>>
>> Regards
>>
>


Can I set max execution time for any task in a job?

2014-12-15 Thread Mohamed Lrhazi
Is that possible, if not, how would one do it from PySpark ?

This probably does not make sense in most cases, but am writing a script
where my job involves downloading and pushing data into cassandra..
sometimes a task hangs forever, and I dont really mind killing it.. The job
is not actually computing some result that requires all tasks to succeed.

Thanks,
Mohamed.


RE: is there a way to interact with Spark clusters remotely?

2014-12-15 Thread Xiaoyong Zhu
Thanks all for your information! What Pietro mentioned seems to be the 
appropriate solution.. I also find a 
slides
 talking about it.
Several quick questions:

1.   Is it already available in Spark main branch? (seems not but I am not 
sure if it is in plan)

2.   It seems that the current job sever can only submit Java jars (or 
Scala I guess?) - is there any plan to support Python in the future?
Thanks and any information would be appreciated!

Xiaoyong

From: Pietro Gentile [mailto:pietro.gentil...@gmail.com]
Sent: Monday, December 15, 2014 10:33 PM
To: Xiaoyong Zhu
Subject: R: is there a way to interact with Spark clusters remotely?

Hi,

try this https://github.com/spark-jobserver/spark-jobserver .

Best Regards,

Pietro Gentile


Da: Xiaoyong Zhu [mailto:xiaoy...@microsoft.com]
Inviato: lunedì 15 dicembre 2014 15:17
A: user@spark.apache.org
Oggetto: is there a way to interact with Spark clusters remotely?

Hi experts

I am wondering if there is a way to interactive with Spark remotely? i.e. no 
access to clusters required but submit Python/Scala scripts to cluster and get 
result based on (REST) APIs.
That will facilitate the development process a lot..

Xiaoyong


[http://static.avast.com/emails/avast-mail-stamp.png]


Questa e-mail è priva di virus e malware perché è attiva la protezione avast! 
Antivirus .




Re: Run Spark job on Playframework + Spark Master/Worker in one Mac

2014-12-15 Thread Aniket Bhatnagar
Seems you are using standalone mode. Can you check spark worker logs or
application logs in spark work directory to find any errors?

On Tue, Dec 16, 2014, 9:09 AM Tomoya Igarashi <
tomoya.igarashi.0...@gmail.com> wrote:

> Hi Aniket,
> Thanks for your reply.
>
> I followed your advice to modified my code.
> Here is latest one.
>
> https://github.com/TomoyaIgarashi/spark_cluster_sample/commit/ce7613c42d3adbe6ae44e264c11f3829460f3c35
>
> As a result, It works correctly! Thank you very much.
>
> But, "AssociationError" Message appears line 397 in Playframework logs as
> follows.
> https://gist.github.com/TomoyaIgarashi/9688bdd5663af95ddd4d
>
> Is there any problem?
>
>
> 2014-12-15 18:48 GMT+09:00 Aniket Bhatnagar :
>>
>> Try the workaround (addClassPathJars(sparkContext,
>> this.getClass.getClassLoader) discussed in
>> http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAJOb8buD1B6tUtOfG8_Ok7F95C3=r-zqgffoqsqbjdxd427...@mail.gmail.com%3E
>>
>> Thanks,
>> Aniket
>>
>>
>> On Mon Dec 15 2014 at 07:43:24 Tomoya Igarashi <
>> tomoya.igarashi.0...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I am trying to run Spark job on Playframework + Spark Master/Worker in
>>> one Mac.
>>> When job ran, I encountered java.lang.ClassNotFoundException.
>>> Would you teach me how to solve it?
>>>
>>> Here is my code in Github.
>>> https://github.com/TomoyaIgarashi/spark_cluster_sample
>>>
>>> * Envrionments:
>>> Mac 10.9.5
>>> Java 1.7.0_71
>>> Playframework 2.2.3
>>> Spark 1.1.1
>>>
>>> * Setup history:
>>> > cd ~
>>> > git clone g...@github.com:apache/spark.git
>>> > cd spark
>>> > git checkout -b v1.1.1 v1.1.1
>>> > sbt/sbt assembly
>>> > vi ~/.bashrc
>>> export SPARK_HOME=/Users/tomoya/spark
>>> > . ~/.bashrc
>>> > hostname
>>> Tomoya-Igarashis-MacBook-Air.local
>>> > vi $SPARK_HOME/conf/slaves
>>> Tomoya-Igarashis-MacBook-Air.local
>>> > play new spark_cluster_sample
>>> default name
>>> type -> scala
>>>
>>> * Run history:
>>> > $SPARK_HOME/sbin/start-all.sh
>>> > jps
>>> > which play
>>> /Users/tomoya/play/play
>>> > git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
>>> > cd spark_cluster_sample
>>> > play run
>>>
>>> * Error trace:
>>> Here is error trace in Gist.
>>> https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511
>>>
>>> Regards
>>>
>>


Re: Can I set max execution time for any task in a job?

2014-12-15 Thread Akhil Das
There is a spark listener interface

which
can be used to trigger events like jobStarted, TaskGotResults etc but i
don't think you can set execution time anywhere. If a task is hung, its
mostly because of the GC pause (depends on your job), if you can paste the
code, then probably we can tell you where the bottleneck is.

Thanks
Best Regards

On Tue, Dec 16, 2014 at 9:59 AM, Mohamed Lrhazi <
mohamed.lrh...@georgetown.edu> wrote:
>
> Is that possible, if not, how would one do it from PySpark ?
>
> This probably does not make sense in most cases, but am writing a script
> where my job involves downloading and pushing data into cassandra..
> sometimes a task hangs forever, and I dont really mind killing it.. The job
> is not actually computing some result that requires all tasks to succeed.
>
> Thanks,
> Mohamed.
>


Re: Fetch Failed caused job failed.

2014-12-15 Thread Akhil Das
You could try setting the following while creating the sparkContext

  .set("spark.rdd.compress","true")
.set("spark.storage.memoryFraction","1")
.set("spark.core.connection.ack.wait.timeout","600")
.set("spark.akka.frameSize","50")



Thanks
Best Regards

On Tue, Dec 16, 2014 at 8:30 AM, Mars Max  wrote:
>
> While I was running spark MR job, there was FetchFailed(BlockManagerId(47,
> xx.com, 40975, 0), shuffleId=2, mapId=5, reduceId=286), then there
> were many retries, and the job failed finally.
>
> And the log showed the following error, does anybody meet this error ? or
> is
> it a known issue in Spark ? Thanks.
>
> 4/12/16 10:43:43 ERROR PythonRDD: Python worker exited unexpectedly
> (crashed)
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File "/home/spark/spark-1.1/python/pyspark/worker.py", line 75, in main
> command = pickleSer._read_with_length(infile)
>   File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 146, in
> _read_with_length
> length = read_int(stream)
>   File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 464, in
> read_int
> raise EOFError
> EOFError
>
> at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
> at
> org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:154)
> at
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at
> org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: org.apache.spark.shuffle.FetchFailedException: Fetch failed:
> BlockManagerId(47, nmg01-taihang-d11609.nmg01.baidu.com, 40975, 0) 2 5 286
> at
>
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:68)
> at
>
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78)
> at
>
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
>
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> at
>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
>
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:335)
> at
>
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
> at
>
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
> at
>
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
> at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
> at
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
> 14/12/16 10:43:43 ERROR PythonRDD: This may have been caused by a prior
> exception:
> org.apache.spark.shuffle.FetchFailedException: Fetch failed:
> BlockManagerId(47, nmg01-taihang-d11609.nmg01.baidu.com, 40975, 0) 2 5 286
> at
>
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:68)
> at
>
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78)
> at
>
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
>
> org.apache.spark.util.CompletionIterator.hasNe

Re: NumberFormatException

2014-12-15 Thread Akhil Das
There could be some other character like a space or ^M etc. You could try
the following and see the actual row.

val newstream = datastream.map(row => {
try{

val strArray = str.trim().split(",")
(strArray(0).toInt, strArray(1).toInt)
//Instead try this
//*(strArray(0).trim().toInt, strArray(1).trim().toInt)*

}catch{ case e: Exception => println("W000t!! Exception!! => " + e + "\n
The line was :" + row); (0, 0) }
})


Thanks
Best Regards

On Tue, Dec 16, 2014 at 3:19 AM, yu  wrote:
>
> Hello, everyone
>
> I know 'NumberFormatException' is due to the reason that String can not be
> parsed properly, but I really can not find any mistakes for my code. I hope
> someone may kindly help me.
> My hdfs file is as follows:
> 8,22
> 3,11
> 40,10
> 49,47
> 48,29
> 24,28
> 50,30
> 33,56
> 4,20
> 30,38
> ...
>
> So each line contains an integer + "," + an integer + "\n"
> My code is as follows:
> object StreamMonitor {
>   def main(args: Array[String]): Unit = {
> val myFunc = (str: String) => {
>   val strArray = str.trim().split(",")
>   (strArray(0).toInt, strArray(1).toInt)
> }
> val conf = new SparkConf().setAppName("StreamMonitor");
> val ssc = new StreamingContext(conf, Seconds(30));
> val datastream = ssc.textFileStream("/user/yu/streaminput");
> val newstream = datastream.map(myFunc)
> newstream.saveAsTextFiles("output/", "");
> ssc.start()
> ssc.awaitTermination()
>   }
>
> }
>
> The exception info is:
> 14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0
> (TID 0, h3): java.lang.NumberFormatException: For input string: "8"
>
>
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> java.lang.Integer.parseInt(Integer.java:492)
> java.lang.Integer.parseInt(Integer.java:527)
>
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:9)
> StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:7)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
>
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
>
> So based on the above info, "8" is the first number in the file and I think
> it should be parsed to integer without any problems.
> I know it may be a very stupid question and the answer may be very easy.
> But
> I really can not find the reason. I am thankful to anyone who helps!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming Python APIs?

2014-12-15 Thread smallmonkey...@hotmail.com
Hi zhu:
maybe there is not the python api for spark-stream
baishuo


smallmonkey...@hotmail.com
 
From: Xiaoyong Zhu
Date: 2014-12-15 10:52
To: user@spark.apache.org
Subject: Spark Streaming Python APIs?
Hi spark experts
 
Are there any Python APIs for Spark Streaming? I didn’t find the Python APIs in 
Spark Streaming programming guide..
http://spark.apache.org/docs/latest/streaming-programming-guide.html
 
Xiaoyong
 


Re: Run Spark job on Playframework + Spark Master/Worker in one Mac

2014-12-15 Thread Tomoya Igarashi
Thanks for response.

Yes, I am using standalone mode.

I couldn't find any errors. But, "WARN" messages appear in Spark master
logs.
Here is Spark master logs.
https://gist.github.com/TomoyaIgarashi/72145c11d3769c7d1ddb

FYI
Here is Spark worker logs.
https://gist.github.com/TomoyaIgarashi/0db77e93cacb4a93aa1f
Here is Playframework logs.
https://gist.github.com/TomoyaIgarashi/9688bdd5663af95ddd4d

If you have any comments, please let us know.

Regards


2014-12-16 15:34 GMT+09:00 Aniket Bhatnagar :
>
> Seems you are using standalone mode. Can you check spark worker logs or
> application logs in spark work directory to find any errors?
>
>
> On Tue, Dec 16, 2014, 9:09 AM Tomoya Igarashi <
> tomoya.igarashi.0...@gmail.com> wrote:
>
>> Hi Aniket,
>> Thanks for your reply.
>>
>> I followed your advice to modified my code.
>> Here is latest one.
>>
>> https://github.com/TomoyaIgarashi/spark_cluster_sample/commit/ce7613c42d3adbe6ae44e264c11f3829460f3c35
>>
>> As a result, It works correctly! Thank you very much.
>>
>> But, "AssociationError" Message appears line 397 in Playframework logs as
>> follows.
>> https://gist.github.com/TomoyaIgarashi/9688bdd5663af95ddd4d
>>
>> Is there any problem?
>>
>>
>> 2014-12-15 18:48 GMT+09:00 Aniket Bhatnagar :
>>>
>>> Try the workaround (addClassPathJars(sparkContext,
>>> this.getClass.getClassLoader) discussed in
>>> http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAJOb8buD1B6tUtOfG8_Ok7F95C3=r-zqgffoqsqbjdxd427...@mail.gmail.com%3E
>>>
>>> Thanks,
>>> Aniket
>>>
>>>
>>> On Mon Dec 15 2014 at 07:43:24 Tomoya Igarashi <
>>> tomoya.igarashi.0...@gmail.com> wrote:
>>>
 Hi all,

 I am trying to run Spark job on Playframework + Spark Master/Worker in
 one Mac.
 When job ran, I encountered java.lang.ClassNotFoundException.
 Would you teach me how to solve it?

 Here is my code in Github.
 https://github.com/TomoyaIgarashi/spark_cluster_sample

 * Envrionments:
 Mac 10.9.5
 Java 1.7.0_71
 Playframework 2.2.3
 Spark 1.1.1

 * Setup history:
 > cd ~
 > git clone g...@github.com:apache/spark.git
 > cd spark
 > git checkout -b v1.1.1 v1.1.1
 > sbt/sbt assembly
 > vi ~/.bashrc
 export SPARK_HOME=/Users/tomoya/spark
 > . ~/.bashrc
 > hostname
 Tomoya-Igarashis-MacBook-Air.local
 > vi $SPARK_HOME/conf/slaves
 Tomoya-Igarashis-MacBook-Air.local
 > play new spark_cluster_sample
 default name
 type -> scala

 * Run history:
 > $SPARK_HOME/sbin/start-all.sh
 > jps
 > which play
 /Users/tomoya/play/play
 > git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
 > cd spark_cluster_sample
 > play run

 * Error trace:
 Here is error trace in Gist.
 https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511

 Regards

>>>


Accessing Apache Spark from Java

2014-12-15 Thread Jai
Hi

I have installed a standalone Spark set up in standalone mode in a Linux
server and I am trying to access that spark setup from Java in windows. When
I try connecting to Spark I see the following exception

14/12/16 12:52:52 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
14/12/16 12:52:56 INFO AppClient$ClientActor: Connecting to master
spark://01hw294954.INDIA:7077...
14/12/16 12:53:07 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
14/12/16 12:53:16 INFO AppClient$ClientActor: Connecting to master
spark://01hw294954.INDIA:7077...
14/12/16 12:53:22 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
14/12/16 12:53:36 ERROR SparkDeploySchedulerBackend: Application has been
killed. Reason: All masters are unresponsive! Giving up.
14/12/16 12:53:36 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool 
14/12/16 12:53:36 INFO TaskSchedulerImpl: Cancelling stage 0
14/12/16 12:53:36 INFO DAGScheduler: Failed to run collect at
MySqlConnector.java:579
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: All masters are unresponsive! Giving up.
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run

I have attached the Spark Master UI 

 Spark Master at spark://01hw294954.INDIA:7077
URL: spark://01hw294954.INDIA:7077
Workers: 1
Cores: 2 Total, 0 Used
Memory: 835.0 MB Total, 0.0 B Used
Applications: 0 Running, 0 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE
Workers

Id  Address State   Cores   Memory
worker-20141216123503-01hw294954.INDIA-3896201hw294954.INDIA:38962  ALIVE   
2
(0 Used) 835.0 MB (0.0 B Used)
Running Applications

ID  NameCores   Memory per Node Submitted Time  UserState   Duration
Completed Applications

ID  NameCores   Memory per Node Submitted Time  UserState   Duration


My Spark Slave is 

 Spark Worker at 01hw294954.INDIA:38962
ID: worker-20141216123503-01hw294954.INDIA-38962
Master URL: spark://01hw294954.INDIA:7077
Cores: 2 (0 Used)
Memory: 835.0 MB (0.0 B Used)
Back to Master

Running Executors (0)

ExecutorID  Cores   State   Memory  Job Details Logs


My Java Master Code looks like this 

SparkConf sparkConf = new SparkConf().setAppName("JdbcRddTest");
sparkConf.setMaster("spark://01hw294954.INDIA:7077");
When I tried using the same code with the local spark set up as the master 
it ran. 

Any help for solving this issue is very much appreciated.

Thanks and Regards
Jai




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Accessing-Apache-Spark-from-Java-tp20700.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org