Re: Serialization issue when using HBase with Spark

2014-12-14 Thread Yanbo
In Option 1, the HTable class is not serializable.
You also need to check your self-defined function getUserActions and make sure
it is a member of a class that implements the Serializable interface.
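
A minimal sketch of that advice, assuming getUserActions is a helper you control (its body and the time-range values below are placeholders): keep every HBase object inside mapPartitions, and materialize the results before closing the table, since the iterator returned from mapPartitions is consumed lazily.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable

// Hypothetical holder for the helper; a serializable object (or a function defined
// inside the closure) keeps the task closure serializable.
object UserActions extends Serializable {
  def getUserActions(table: HTable, userId: String,
                     timeStart: Long, timeStop: Long): Seq[String] = {
    // ... build a Scan over [timeStart, timeStop) and collect the matching cells ...
    Seq.empty // placeholder
  }
}

val (timeStart, timeStop) = (0L, Long.MaxValue) // assumed range, defined on the driver
val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(identity)
  .mapPartitions { iterator =>
    // Created on the executor, so no Configuration or HTable crosses the wire.
    val conf  = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    val result = iterator.map { userId =>
      (userId, UserActions.getUserActions(table, userId, timeStart, timeStop))
    }.toList // materialize before closing the table
    table.close()
    result.iterator
  }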

Sent from my iPad

> On Dec 12, 2014, at 4:35 PM, yangliuyu wrote:
> 
> The scenario is using an HTable instance to scan multiple rowkey ranges in Spark
> tasks, which looks like below:
> Option 1:
> val users = input
>  .map { case (deviceId, uid) =>
> uid}.distinct().sortBy(x=>x).mapPartitions(iterator=>{
>  val conf = HBaseConfiguration.create()
>  val table = new HTable(conf, "actions")
>  val result = iterator.map{ userId=>
>(userId, getUserActions(table, userId, timeStart, timeStop))
>  }
>  table.close()
>  result
>})
> 
> But got the exception:
> org.apache.spark.SparkException: Task not serializable
>at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)...
> ...
> Caused by: java.io.NotSerializableException:
> org.apache.hadoop.conf.Configuration
>at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 
> The reason for not using sc.newAPIHadoopRDD is that it only supports one scan at a
> time.
> val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>  classOf[org.apache.hadoop.hbase.client.Result]) 
> 
> And if using MultiTableInputFormat, it is not possible for the driver to put all rowkeys
> into the HBaseConfiguration.
> Option 2:
> sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
>  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>  classOf[org.apache.hadoop.hbase.client.Result])
> 
> One option may be to divide all rowkey ranges into several parts and then use Option 2, but I
> prefer Option 1. So is there any solution for Option 1?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JSON Input files

2014-12-14 Thread Madabhattula Rajesh Kumar
Hi Helena and All,

I have the below example JSON file format. My use case is to read the "NAME"
variable.

When I execute it, I get the following exception:

*"Exception in thread "main"
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
attributes: 'NAME, tree:Project ['NAME] Subquery device"*

Please let me know how to read values from JSON using Spark SQL.

CODE BLOCK:

val device = sqlContext.jsonFile("hdfs://localhost:9000/user/rajesh/json/test.json")
device.registerAsTable("device")
device.printSchema
val results = sqlContext.sql("SELECT NAME FROM device").collect.foreach(println)

JSON format:

{ "Device 1" :
 {"NAME" : "Device 1",
  "GROUP" : "1",
  "SITE" : "qqq",
  "DIRECTION" : "East",
 }
 "Device 2" :
 {"NAME" : "Device 2",
  "GROUP" : "2",
  "SITE" : "sss",
  "DIRECTION" : "North",
 }
}



On Sat, Dec 13, 2014 at 10:13 PM, Helena Edelson <
helena.edel...@datastax.com> wrote:
>
> One solution can be found here:
> https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#json-datasets
>
> - Helena
> @helenaedelson
>
> On Dec 13, 2014, at 11:18 AM, Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
> Hi Team,
>
> I have a large JSON file in Hadoop. Could you please let me know
>
> 1. How to read the JSON file
> 2. How to parse the JSON file
>
> Please share any example program based on Scala
>
> Regards,
> Rajesh
>
>
>


Re: Spark-SQL JDBC driver

2014-12-14 Thread Michael Armbrust
I'll add that there is an experimental method that allows you to start the
JDBC server with an existing HiveContext (which might have registered
temporary tables).

https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala#L42
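
For reference, a rough sketch of what using that entry point looks like (the query and temporary table name are assumptions; the method is experimental, per the link above):

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = new HiveContext(sc)
// Temporary tables registered on this context become visible to JDBC clients.
hiveContext.sql("SELECT * FROM some_source").registerTempTable("my_temp_table")
HiveThriftServer2.startWithContext(hiveContext)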


On Thu, Dec 11, 2014 at 6:52 AM, Denny Lee  wrote:
>
> Yes, that is correct. A quick reference on this is the post
> https://www.linkedin.com/pulse/20141007143323-732459-an-absolutely-unofficial-way-to-connect-tableau-to-sparksql-spark-1-1?_mSplash=1
> with the pertinent section being:
>
> It is important to note that when you create Spark tables (for example,
> via the .registerTempTable) these are operating within the Spark
> environment which resides in a separate process than the Hive Metastore.
> This means that currently tables that are created within the Spark context
> are not available through the Thrift server. To achieve this, within the
> Spark context save your temporary table into Hive - then the Spark Thrift
> Server will be able to see the table.
>
> HTH!
>
>
> On Thu, Dec 11, 2014 at 04:09 Anas Mosaad  wrote:
>
>> Actually I came to the conclusion that RDDs have to be persisted in Hive in
>> order to be accessible through Thrift.
>> Hope I didn't end up with an incorrect conclusion.
>> Please, someone correct me if I am wrong.
>> On Dec 11, 2014 8:53 AM, "Judy Nash" 
>> wrote:
>>
>>>  Looks like you are wondering why you cannot see the RDD table you have
>>> created via thrift?
>>>
>>>
>>>
>>> Based on my own experience with spark 1.1, RDD created directly via
>>> Spark SQL (i.e. Spark Shell or Spark-SQL.sh) is not visible on thrift,
>>> since thrift has its own session containing its own RDD.
>>>
>>> Spark SQL experts on the forum can confirm on this though.
>>>
>>>
>>>
>>> *From:* Cheng Lian [mailto:lian.cs@gmail.com]
>>> *Sent:* Tuesday, December 9, 2014 6:42 AM
>>> *To:* Anas Mosaad
>>> *Cc:* Judy Nash; user@spark.apache.org
>>> *Subject:* Re: Spark-SQL JDBC driver
>>>
>>>
>>>
>>> According to the stacktrace, you were still using SQLContext rather than
>>> HiveContext. To interact with Hive, HiveContext *must* be used.
>>>
>>> Please refer to this page
>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
>>>
>>>  On 12/9/14 6:26 PM, Anas Mosaad wrote:
>>>
>>>  Back to the first question: does this mandate that Hive is up and
>>> running?
>>>
>>>
>>>
>>> When I try it, I get the following exception. The documentation says
>>> that this method works only on a SchemaRDD. I thought that
>>> countries.saveAsTable did not work for that reason, so I created a tmp
>>> that contains the results from the registered temp table, which I could
>>> validate is a SchemaRDD, as shown below.
>>>
>>>
>>>
>>>
>>> @Judy, I really appreciate your kind support and I want to
>>> understand, and of course I don't want to waste your time. If you can direct
>>> me to the documentation describing these details, that would be great.
>>>
>>>
>>>
>>> scala> val tmp = sqlContext.sql("select * from countries")
>>>
>>> tmp: org.apache.spark.sql.SchemaRDD =
>>>
>>> SchemaRDD[12] at RDD at SchemaRDD.scala:108
>>>
>>> == Query Plan ==
>>>
>>> == Physical Plan ==
>>>
>>> PhysicalRDD
>>> [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29],
>>> MapPartitionsRDD[9] at mapPartitions at ExistingRDD.scala:36
>>>
>>>
>>>
>>> scala> tmp.saveAsTable("Countries")
>>>
>>> org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
>>> Unresolved plan found, tree:
>>>
>>> 'CreateTableAsSelect None, Countries, false, None
>>>
>>>  Project
>>> [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29]
>>>
>>>   Subquery countries
>>>
>>>LogicalRDD
>>> [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29],
>>> MapPartitionsRDD[9] at mapPartitions at ExistingRDD.scala:36
>>>
>>>
>>>
>>> at
>>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:83)
>>>
>>> at
>>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)
>>>
>>> at
>>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
>>>
>>> at
>>> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
>>>
>>> at
>>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
>>>
>>> at
>>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
>>>
>>> at
>>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$

Re: SchemaRDD partition on specific column values?

2014-12-14 Thread Michael Armbrust
I'm happy to discuss what it would take to make sure we can propagate this
information correctly.  Please open a JIRA (and mention me in it).

Regarding including it in 1.2.1, it depends on how invasive the change ends
up being, but it is certainly possible.

On Thu, Dec 11, 2014 at 3:55 AM, nitin  wrote:
>
> Can we take this as a performance improvement task in Spark-1.2.1? I can
> help
> contribute for this.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350p20623.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: JSON Input files

2014-12-14 Thread Yanbo
Pay attention to your JSON file; try to change it to the following format.
Each record must be represented as a single JSON string on its own line:

{"NAME" : "Device 1", "GROUP" : "1", "SITE" : "qqq", "DIRECTION" : "East"}
{"NAME" : "Device 2", "GROUP" : "2", "SITE" : "sss", "DIRECTION" : "North"}
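
With the file in that one-record-per-line layout, the original query works as written; a minimal sketch (path and table name taken from the earlier message):

val device = sqlContext.jsonFile("hdfs://localhost:9000/user/rajesh/json/test.json")
device.registerAsTable("device")
device.printSchema()
sqlContext.sql("SELECT NAME FROM device").collect().foreach(println)
// each returned Row prints like [Device 1]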

> On Dec 14, 2014, at 5:01 PM, Madabhattula Rajesh Kumar wrote:
> 
> { "Device 1" : 
>  {"NAME" : "Device 1",
>   "GROUP" : "1",
>   "SITE" : "qqq",
>   "DIRECTION" : "East",
>  }
>  "Device 2" : 
>  {"NAME" : "Device 2",
>   "GROUP" : "2",
>   "SITE" : "sss",
>   "DIRECTION" : "North",
>  }
> }

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Read data from SparkStreaming from Java socket.

2014-12-14 Thread Guillermo Ortiz
Why doesn't it work?? I guess that it's the same with "\n".


2014-12-13 12:56 GMT+01:00 Guillermo Ortiz :
> I got it, thanks. A silly question: why, if I do
> out.write("hello " + System.currentTimeMillis() + "\n"); it doesn't
> detect anything, but if I do
> out.println("hello " + System.currentTimeMillis());  it works??
>
> I'm doing with spark
> val errorLines = lines.filter(_.contains("hello"))
>
>
> 2014-12-13 8:12 GMT+01:00 Tathagata Das :
>> Yes, socketTextStream starts a TCP client that tries to connect to a
>> TCP server (localhost: in your case). If there is a server running
>> on that port that can send data to connected TCP connections, then you
>> will receive data in the stream.
>>
>> Did you check out the quick example in the streaming programming guide?
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>> That has instructions to start a netcat server on port  and send
>> data to spark streaming through that.
>>
>> TD
>>
>> On Fri, Dec 12, 2014 at 9:54 PM, Akhil Das  
>> wrote:
>>> socketTextStream is Socket client which will read from a TCP ServerSocket.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Fri, Dec 12, 2014 at 7:21 PM, Guillermo Ortiz 
>>> wrote:

 I don't understand what Spark Streaming's socketTextStream is waiting for...
 is it like a server, so you just have to send data from a client?? Or
 what's it expecting?

 2014-12-12 14:19 GMT+01:00 Akhil Das :
 > I have created a Serversocket program which you can find over here
 > https://gist.github.com/akhld/4286df9ab0677a555087 It simply listens to
 > the
 > given port and when the client connects, it will send the contents of
 > the
 > given file. I'm attaching the executable jar also, you can run the jar
 > as:
 >
 > java -jar SocketBenchmark.jar student 12345 io
 >
 > Here student is the file which will be sent to the client whoever
 > connects
 > on 12345, i have it tested and is working with SparkStreaming
 > (socketTextStream).
 >
 >
 > Thanks
 > Best Regards
 >
 > On Fri, Dec 12, 2014 at 6:25 PM, Guillermo Ortiz 
 > wrote:
 >>
 >> Hi,
 >>
 >> I'm a newbie with Spark, I'm just trying to use Spark Streaming and
 >> filter some data sent with a Java socket, but it's not working... it
 >> works when I use ncat.
 >>
 >> Why is it not working??
 >>
 >> My sparkcode is just this:
 >> val sparkConf = new
 >> SparkConf().setMaster("local[2]").setAppName("Test")
 >> val ssc = new StreamingContext(sparkConf, Seconds(5))
 >> val lines = ssc.socketTextStream("localhost", )
 >> val errorLines = lines.filter(_.contains("hello"))
 >> errorLines.print()
 >>
 >> I created a client socket which sends data to that port, but it could
 >> connect to any address. I guess that Spark doesn't work like a
 >> ServerSocket... what's the way to send data from a socket with Java to
 >> be able to read it from socketTextStream??
 >>
 >> -
 >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 >> For additional commands, e-mail: user-h...@spark.apache.org
 >>
 >

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Read data from SparkStreaming from Java socket.

2014-12-14 Thread Gerard Maas
Are you using a buffered PrintWriter? That's probably a different flushing
behaviour. Try doing out.flush() after out.write(...) and you will have
the same result.

This is Spark unrelated btw.

-kr, Gerard.
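
To make the flushing point concrete, here is a minimal sketch of a line-oriented server that socketTextStream can consume (the port and message are assumptions); a PrintWriter constructed with autoFlush = true flushes on every println, which is why the println variant worked:

import java.io.PrintWriter
import java.net.ServerSocket

object LineServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)                     // assumed port
    val socket = server.accept()                            // wait for the Spark receiver to connect
    val out = new PrintWriter(socket.getOutputStream, true) // autoFlush = true
    while (true) {
      out.println("hello " + System.currentTimeMillis())    // newline-terminated and flushed
      Thread.sleep(1000)
    }
  }
}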


Re: MLLIB model export: PMML vs MLLIB serialization

2014-12-14 Thread selvinsource
Hi Sourabh,

have a look at https://issues.apache.org/jira/browse/SPARK-1406, I am
looking into exporting models in PMML using JPMML.

Regards,
Vincenzo



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-model-export-PMML-vs-MLLIB-serialization-tp20324p20674.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Read data from SparkStreaming from Java socket.

2014-12-14 Thread Guillermo Ortiz
Thanks.

2014-12-14 12:20 GMT+01:00 Gerard Maas :
> Are you using a bufferedPrintWriter?   that's probably a different flushing
> behaviour.  Try doing out.flush() after out.write(...) and you will have the
> same result.
>
> This is Spark unrelated btw.
>
> -kr, Gerard.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Having problem with Spark streaming with Kinesis

2014-12-14 Thread Aniket Bhatnagar
The reason is the following code:

val numStreams = numShards
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
    InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}

In the above code, numStreams is set to numShards. This enforces the need
to have #shards + 1 workers. If you set numStreams to Math.min(numShards,
numAvailableWorkers - 1), you can have fewer workers than
shards. Makes sense?
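
A minimal sketch of that change (numAvailableWorkers is an assumed value you supply; the createStream call itself is unchanged from the snippet above):

val numStreams = math.min(numShards, numAvailableWorkers - 1)
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
    InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}
// Union the per-receiver streams into a single DStream for downstream processing.
val unifiedStream = ssc.union(kinesisStreams)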

On Sun Dec 14 2014 at 10:06:36 A.K.M. Ashrafuzzaman <
ashrafuzzaman...@gmail.com> wrote:

> Thanks Aniket,
> The trick is to have #workers >= #shards + 1. But I don't know why that is.
> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>
> Here in the figure [Spark Streaming Kinesis architecture], it seems like
> one node should be able to take on more than one shard.
>
>
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred 
>
> (M) 880-175-5592433
> Twitter  | Blog
>  | Facebook
> 
>
> Check out The Academy , your #1 source
> for free content marketing resources
>
> On Nov 26, 2014, at 6:23 PM, A.K.M. Ashrafuzzaman <
> ashrafuzzaman...@gmail.com> wrote:
>
> Hi guys,
> When we are using Kinesis with 1 shard it works fine. But when we use
> more than 1, it falls into an infinite loop and no data is processed by
> Spark Streaming. In the Kinesis DynamoDB table, I can see that it keeps
> increasing the leaseCounter. But it does not start processing.
>
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
>
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred 
>
> (M) 880-175-5592433
> Twitter  | Blog
>  | Facebook
> 
>
> Check out The Academy , your #1 source
> for free content marketing resources
>
>
>


Re: JSON Input files

2014-12-14 Thread Madabhattula Rajesh Kumar
Thank you Yanbo

Regards,
Rajesh

On Sun, Dec 14, 2014 at 3:15 PM, Yanbo  wrote:
>
> Pay attention to your JSON file, try to change it like following.
> Each record represent as a JSON string.
>
>  {"NAME" : "Device 1",
>   "GROUP" : "1",
>   "SITE" : "qqq",
>   "DIRECTION" : "East",
>  }
>  {"NAME" : "Device 2",
>   "GROUP" : "2",
>   "SITE" : "sss",
>   "DIRECTION" : "North",
>  }
>
> > On Dec 14, 2014, at 5:01 PM, Madabhattula Rajesh Kumar wrote:
> >
> > { "Device 1" :
> >  {"NAME" : "Device 1",
> >   "GROUP" : "1",
> >   "SITE" : "qqq",
> >   "DIRECTION" : "East",
> >  }
> >  "Device 2" :
> >  {"NAME" : "Device 2",
> >   "GROUP" : "2",
> >   "SITE" : "sss",
> >   "DIRECTION" : "North",
> >  }
> > }
>


pyspark is crashing in this case. why?

2014-12-14 Thread genesis fatum
Hi,

My environment is: standalone spark 1.1.1 on windows 8.1 pro.

The following case works fine:
>>> a = [1,2,3,4,5,6,7,8,9]
>>> b = []
>>> for x in range(100000):
...  b.append(a)
...
>>> rdd1 = sc.parallelize(b)
>>> rdd1.first()
>>>[1, 2, 3, 4, 5, 6, 7, 8, 9]

The following case does not work. The only difference is the size of the
array. Note the loop range: 100K vs. 1M.
>>> a = [1,2,3,4,5,6,7,8,9]
>>> b = []
>>> for x in range(1000000):
...  b.append(a)
...
>>> rdd1 = sc.parallelize(b)
>>> rdd1.first()
>>>
14/12/14 07:52:19 ERROR PythonRDD: Python worker exited unexpectedly
(crashed)
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(Unknown Source)
at java.net.SocketOutputStream.write(Unknown Source)
at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
at java.io.BufferedOutputStream.write(Unknown Source)
at java.io.DataOutputStream.write(Unknown Source)
at java.io.FilterOutputStream.write(Unknown Source)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$
1.apply(PythonRDD.scala:341)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$
1.apply(PythonRDD.scala:339)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRD
D.scala:339)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.app
ly$mcV$sp(PythonRDD.scala:209)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.app
ly(PythonRDD.scala:184)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.app
ly(PythonRDD.scala:184)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364)
at
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scal
a:183)

What I have tried:
1. Replaced JRE 32bit with JRE64 
2. Multiple configurations when I start pyspark: --driver-memory,
--executor-memory
3. Tried to set the SparkConf with different settings
4. Tried also with spark 1.1.0

Being new to Spark, I am sure that it is something simple that I am missing
and would appreciate any thoughts.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-is-crashing-in-this-case-why-tp20675.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



MLlib vs Madlib

2014-12-14 Thread Venkat, Ankam
Can somebody throw light on MLlib vs Madlib?

Which is better for machine learning, and are there any specific use case
scenarios in which MLlib or MADlib will shine?

Regards,
Venkat Ankam
This communication is the property of CenturyLink and may contain confidential 
or privileged information. Unauthorized use of this communication is strictly 
prohibited and may be unlawful. If you have received this communication in 
error, please immediately notify the sender by reply e-mail and destroy all 
copies of the communication and any attachments.


Limit the # of columns in Spark Scala

2014-12-14 Thread Denny Lee
I have a large number of files within HDFS that I would like to do a group by
statement on, a la

val table = sc.textFile("hdfs://")
val tabs = table.map(_.split("\t"))

I'm trying to do something similar to
tabs.map(c => (c._(167), c._(110), c._(200))

where I create a new RDD that only has those three columns,
but that isn't quite right because I'm not really manipulating sequences.

BTW, I cannot use SparkSQL / case right now because my table has 200
columns (and I'm on Scala 2.10.3)

Thanks!
Denny


Re: Limit the # of columns in Spark Scala

2014-12-14 Thread Gerard Maas
Hi,

I don't get what the problem is. That map to selected columns looks like
the way to go given the context. What's not working?

Kr, Gerard
On Dec 14, 2014 5:17 PM, "Denny Lee"  wrote:

> I have a large of files within HDFS that I would like to do a group by
> statement ala
>
> val table = sc.textFile("hdfs://")
> val tabs = table.map(_.split("\t"))
>
> I'm trying to do something similar to
> tabs.map(c => (c._(167), c._(110), c._(200))
>
> where I create a new RDD that only has
> but that isn't quite right because I'm not really manipulating sequences.
>
> BTW, I cannot use SparkSQL / case right now because my table has 200
> columns (and I'm on Scala 2.10.3)
>
> Thanks!
> Denny
>
>


Re: Limit the # of columns in Spark Scala

2014-12-14 Thread Denny Lee
Getting a bunch of syntax errors. Let me get back with the full statement
and error later today. Thanks for verifying my thinking wasn't out in left
field.
On Sun, Dec 14, 2014 at 08:56 Gerard Maas  wrote:

> Hi,
>
> I don't get what the problem is. That map to selected columns looks like
> the way to go given the context. What's not working?
>
> Kr, Gerard
> On Dec 14, 2014 5:17 PM, "Denny Lee"  wrote:
>
>> I have a large of files within HDFS that I would like to do a group by
>> statement ala
>>
>> val table = sc.textFile("hdfs://")
>> val tabs = table.map(_.split("\t"))
>>
>> I'm trying to do something similar to
>> tabs.map(c => (c._(167), c._(110), c._(200))
>>
>> where I create a new RDD that only has
>> but that isn't quite right because I'm not really manipulating sequences.
>>
>> BTW, I cannot use SparkSQL / case right now because my table has 200
>> columns (and I'm on Scala 2.10.3)
>>
>> Thanks!
>> Denny
>>
>>


Re: MLlib vs Madlib

2014-12-14 Thread Brian Dolan
MADLib (http://madlib.net/) was designed to bring large-scale ML techniques to 
a relational database, primarily postgresql.  MLlib assumes the data exists in 
some Spark-compatible data format.

I would suggest you pick the library that matches your data platform first.

DISCLAIMER: I am the original author of MADLib, though EMC/Pivotal assumed 
ownership rather quickly.


~~
May All Your Sequences Converge



On Dec 14, 2014, at 6:26 AM, "Venkat, Ankam"  
wrote:

> Can somebody throw light on MLlib vs Madlib? 
>  
> Which is better for machine learning? and are there any specific use case 
> scenarios MLlib or Madlib will shine in?
>  
> Regards,
> Venkat Ankam
> This communication is the property of CenturyLink and may contain 
> confidential or privileged information. Unauthorized use of this 
> communication is strictly prohibited and may be unlawful. If you have 
> received this communication in error, please immediately notify the sender by 
> reply e-mail and destroy all copies of the communication and any attachments.



DStream demultiplexer based on a key

2014-12-14 Thread Jean-Pascal Billaud
Hey,

I am doing an experiment with Spark Streaming consisting of moving data
from Kafka to S3 locations while partitioning by date. I have already
looked into Linked Camus and Pinterest Secor and while both are workable
solutions, it just feels that Spark Streaming should be able to be on par
with those without having to manage yet another application in our stack
since we already have a Spark Streaming cluster in production.

So what I am trying to do is very simple really. Each message in Kafka is
thrift serialized, and the corresponding thrift objects have a timestamp
field. What I'd like to do is something like this:

JavaPairDStream<String, Log> stream = KafkaUtils.createRawStream(...)
stream = stream.map(new PairFunction<Tuple2<String, Log>, String, Log>() {
  public Tuple2<String, Log> call(Tuple2<String, Log> tuple) {
    return new Tuple2<>(tuple._2().getDate(), tuple._2());
  }
})

At this point, I'd like to do some partitioning on the resulting DStream to
have multiple DStreams, each with a single common date string... So for
instance in one DStream I would have all the entries from 12/01 and on
another the entries from 12/02. Once I have this list of DStreams, for each
of them I would call saveAsObjectFiles() basically. I unfortunately did not
find a way to demultiplex a DStream based on a key. Obviously the reduce
family of operations does some of that, but the result is still a single
DStream.

An alternative approach would be to call forEachRDD() on the DStream and
demultiplex the entries into multiple new RDDs based on the timestamp to
bucketize the entries with the same day date in the same RDD and finally
call saveAsObjectFiles(). I am not sure if I can use parallelize() to
create those RDDs?

Another thing that I am gonna be experimenting with is to use a much longer
batching interval. I am talking minutes, because I don't want to have a
bunch of tiny files. I might simply use a bigger Duration or use one of the
window operations. Not sure if anybody has tried running Spark Streaming in
that way.

Any thoughts on that would be much appreciated,

Thanks!


Re: pyspark is crashing in this case. why?

2014-12-14 Thread Sameer Farooqui
How much executor-memory are you setting for the JVM? What about the Driver
JVM memory?

Also check the Windows Event Log for Out of memory errors for one of the 2
above JVMs.
On Dec 14, 2014 6:04 AM, "genesis fatum"  wrote:

> Hi,
>
> My environment is: standalone spark 1.1.1 on windows 8.1 pro.
>
> The following case works fine:
> >>> a = [1,2,3,4,5,6,7,8,9]
> >>> b = []
> >>> for x in range(100000):
> ...  b.append(a)
> ...
> >>> rdd1 = sc.parallelize(b)
> >>> rdd1.first()
> >>>[1, 2, 3, 4, 5, 6, 7, 8, 9]
>
> The following case does not work. The only difference is the size of the
> array. Note the loop range: 100K vs. 1M.
> >>> a = [1,2,3,4,5,6,7,8,9]
> >>> b = []
> >>> for x in range(1000000):
> ...  b.append(a)
> ...
> >>> rdd1 = sc.parallelize(b)
> >>> rdd1.first()
> >>>
> 14/12/14 07:52:19 ERROR PythonRDD: Python worker exited unexpectedly
> (crashed)
> java.net.SocketException: Connection reset by peer: socket write error
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at java.net.SocketOutputStream.socketWrite(Unknown Source)
> at java.net.SocketOutputStream.write(Unknown Source)
> at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
> at java.io.BufferedOutputStream.write(Unknown Source)
> at java.io.DataOutputStream.write(Unknown Source)
> at java.io.FilterOutputStream.write(Unknown Source)
> at
> org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$
> 1.apply(PythonRDD.scala:341)
> at
> org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$
> 1.apply(PythonRDD.scala:339)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRD
> D.scala:339)
> at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.app
> ly$mcV$sp(PythonRDD.scala:209)
> at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.app
> ly(PythonRDD.scala:184)
> at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.app
> ly(PythonRDD.scala:184)
> at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364)
> at
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scal
> a:183)
>
> What I have tried:
> 1. Replaced JRE 32bit with JRE64
> 2. Multiple configurations when I start pyspark: --driver-memory,
> --executor-memory
> 3. Tried to set the SparkConf with different settings
> 4. Tried also with spark 1.1.0
>
> Being new to Spark, I am sure that it is something simple that I am missing
> and would appreciate any thoughts.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-is-crashing-in-this-case-why-tp20675.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DStream demultiplexer based on a key

2014-12-14 Thread Gerard Maas
Hi Jean-Pascal,

At Virdata we do a similar thing to 'bucketize' our data to different
keyspaces in Cassandra.

The basic construction would be to filter the DStream (or the underlying
RDD) for each key and then apply the usual storage operations on that new
data set.
Given that, in your case, you need the data within the stream to apply the
filter, you will need first to collect those keys in order to create the
buckets.

Something like this:

val kafkaStream = ???
kafkaStream.foreachRDD { rdd =>
  rdd.cache() // very important!
  // key(...) is a function to get the desired key from each record
  val keys = rdd.map(elem => key(elem)).distinct.collect
  keys.foreach { k =>
    rdd.filter(elem => key(elem) == k).saveAsObjectFile(...)
  }
  rdd.unpersist()
}

-kr, Gerard.




On Sun, Dec 14, 2014 at 7:50 PM, Jean-Pascal Billaud 
wrote:
>
> Hey,
>
> I am doing an experiment with Spark Streaming consisting of moving data
> from Kafka to S3 locations while partitioning by date. I have already
> looked into Linked Camus and Pinterest Secor and while both are workable
> solutions, it just feels that Spark Streaming should be able to be on par
> with those without having to manage yet another application in our stack
> since we already have a Spark Streaming cluster in production.
>
> So what I am trying to do is very simple really. Each message in Kafka is
> thrift serialized, and the corresponding thrift objects have a timestamp
> field. What I'd like is to do is something like that:
>
> JavaPairDStream stream = KafkaUtils.createRawStream(...)
> stream = stream.map(new PairFunction, String, Log> {
>   public Tuple2 call(Tuple2 tuple) {
> return new Tuple2<>(tuple._2().getDate(), tuple._2());
>   }
> }
>
> At this point, I'd like to do some partitioning on the resulting DStream
> to have multiple DStream each with a single common string Date... So for
> instance in one DStream I would have all the entries from 12/01 and on
> another the entries from 12/02. Once I have this list of DStream, for each
> of them I would call saveAsObjectFiles() basically. I unfortunately did not
> find a way to demultiplex DStream based on a key. Obviously the reduce
> operation families does some of that but the result is still a single
> DStream.
>
> An alternative approach would be to call forEachRDD() on the DStream and
> demultiplex the entries into multiple new RDDs based on the timestamp to
> bucketize the entries with the same day date in the same RDD and finally
> call saveAsObjectFiles(). I am not sure if I can use parallelize() to
> create those RDDs?
>
> Another thing that I am gonna be experimenting with is to use much longer
> batching interval. I am talking in minutes because I don't want to have
> bunch of tiny files. I might simply use a bigger Duration or use one of the
> window operation. Not sure if anybody tries running Spark Streaming in that
> way.
>
> Any thoughts on that would be much appreciated,
>
> Thanks!
>


Re: DStream demultiplexer based on a key

2014-12-14 Thread Jean-Pascal Billaud
Ah! That sounds very much like what I need. A very basic question (most
likely), why is "rdd.cache()" critical? Isn't it already true that in Spark
Streaming DStream are cached in memory anyway?

Also any experience with minutes long batch interval?

Thanks for the quick answer!

On Sun, Dec 14, 2014 at 11:17 AM, Gerard Maas  wrote:
>
> Hi Jean-Pascal,
>
> At Virdata we do a similar thing to 'bucketize' our data to different
> keyspaces in Cassandra.
>
> The basic construction would be to filter the DStream (or the underlying
> RDD) for each key and then apply the usual storage operations on that new
> data set.
> Given that, in your case, you need the data within the stream to apply the
> filter, you will need first to collect those keys in order to create the
> buckets.
>
> Something like this:
>
> val kafkaStream =  ???
> kafkaStream.foreachRDD{rdd  =>
> rdd.cache() // very important!
> val keys = rdd.map(elem => key(elem)).distinct.collect  // where
> key(...) is a function to get the desired key from each record
> keys.foreach{ key =>
> rdd.filter(elem=> key(elem) == key).saveAsObjectFile(...)
> }
> rdd.unpersist()
> }
>
> -kr, Gerard.
>
>
>
>
> On Sun, Dec 14, 2014 at 7:50 PM, Jean-Pascal Billaud 
> wrote:
>>
>> Hey,
>>
>> I am doing an experiment with Spark Streaming consisting of moving data
>> from Kafka to S3 locations while partitioning by date. I have already
>> looked into Linked Camus and Pinterest Secor and while both are workable
>> solutions, it just feels that Spark Streaming should be able to be on par
>> with those without having to manage yet another application in our stack
>> since we already have a Spark Streaming cluster in production.
>>
>> So what I am trying to do is very simple really. Each message in Kafka is
>> thrift serialized, and the corresponding thrift objects have a timestamp
>> field. What I'd like is to do is something like that:
>>
>> JavaPairDStream stream = KafkaUtils.createRawStream(...)
>> stream = stream.map(new PairFunction, String, Log> {
>>   public Tuple2 call(Tuple2 tuple) {
>> return new Tuple2<>(tuple._2().getDate(), tuple._2());
>>   }
>> }
>>
>> At this point, I'd like to do some partitioning on the resulting DStream
>> to have multiple DStream each with a single common string Date... So for
>> instance in one DStream I would have all the entries from 12/01 and on
>> another the entries from 12/02. Once I have this list of DStream, for each
>> of them I would call saveAsObjectFiles() basically. I unfortunately did not
>> find a way to demultiplex DStream based on a key. Obviously the reduce
>> operation families does some of that but the result is still a single
>> DStream.
>>
>> An alternative approach would be to call forEachRDD() on the DStream and
>> demultiplex the entries into multiple new RDDs based on the timestamp to
>> bucketize the entries with the same day date in the same RDD and finally
>> call saveAsObjectFiles(). I am not sure if I can use parallelize() to
>> create those RDDs?
>>
>> Another thing that I am gonna be experimenting with is to use much longer
>> batching interval. I am talking in minutes because I don't want to have
>> bunch of tiny files. I might simply use a bigger Duration or use one of the
>> window operation. Not sure if anybody tries running Spark Streaming in that
>> way.
>>
>> Any thoughts on that would be much appreciated,
>>
>> Thanks!
>>
>


HTTP 500 Error for SparkUI in YARN Cluster mode

2014-12-14 Thread Benyi Wang
I got this error when I clicked "Track URL: ApplicationMaster" while running a
Spark job in YARN cluster mode. I found this JIRA
https://issues.apache.org/jira/browse/YARN-800, but I could not get this
problem fixed. I'm running CDH 5.1.0 with both HDFS and RM HA enabled. Does
anybody have a similar issue? How do you fix this?

HTTP ERROR 500

Problem accessing /proxy/application_1418016558670_0193/. Reason:

Connection refused

Caused by:

java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at 
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at 
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)


Re: DStream demultiplexer based on a key

2014-12-14 Thread Gerard Maas
I haven't done anything other than performance tuning on Spark Streaming for
the past few weeks. rdd.cache() makes a huge difference. It's a must in this
case, where you want to iterate over the same RDD several times.

Intuitively, I also thought that all data was in memory already so that
wouldn't make a difference and I was very surprised to see stage times
dropping from seconds to ms when cache() was present.

Our intervals are 10-12 seconds long. I've not tried batches of minutes
yet.
Probably the best way would be to use window functions for that.  Although
something in the 1-5 minute range should be doable as well.

-kr, Gerard.




On Sun, Dec 14, 2014 at 8:25 PM, Jean-Pascal Billaud 
wrote:
>
> Ah! That sounds very much like what I need. A very basic question (most
> likely), why is "rdd.cache()" critical? Isn't it already true that in Spark
> Streaming DStream are cached in memory anyway?
>
> Also any experience with minutes long batch interval?
>
> Thanks for the quick answer!
>
> On Sun, Dec 14, 2014 at 11:17 AM, Gerard Maas 
> wrote:
>>
>> Hi Jean-Pascal,
>>
>> At Virdata we do a similar thing to 'bucketize' our data to different
>> keyspaces in Cassandra.
>>
>> The basic construction would be to filter the DStream (or the underlying
>> RDD) for each key and then apply the usual storage operations on that new
>> data set.
>> Given that, in your case, you need the data within the stream to apply
>> the filter, you will need first to collect those keys in order to create
>> the buckets.
>>
>> Something like this:
>>
>> val kafkaStream =  ???
>> kafkaStream.foreachRDD{rdd  =>
>> rdd.cache() // very important!
>> val keys = rdd.map(elem => key(elem)).distinct.collect  // where
>> key(...) is a function to get the desired key from each record
>> keys.foreach{ key =>
>> rdd.filter(elem=> key(elem) == key).saveAsObjectFile(...)
>> }
>> rdd.unpersist()
>> }
>>
>> -kr, Gerard.
>>
>>
>>
>>
>> On Sun, Dec 14, 2014 at 7:50 PM, Jean-Pascal Billaud 
>> wrote:
>>>
>>> Hey,
>>>
>>> I am doing an experiment with Spark Streaming consisting of moving data
>>> from Kafka to S3 locations while partitioning by date. I have already
>>> looked into Linked Camus and Pinterest Secor and while both are workable
>>> solutions, it just feels that Spark Streaming should be able to be on par
>>> with those without having to manage yet another application in our stack
>>> since we already have a Spark Streaming cluster in production.
>>>
>>> So what I am trying to do is very simple really. Each message in Kafka
>>> is thrift serialized, and the corresponding thrift objects have a timestamp
>>> field. What I'd like is to do is something like that:
>>>
>>> JavaPairDStream stream = KafkaUtils.createRawStream(...)
>>> stream = stream.map(new PairFunction, String, Log> {
>>>   public Tuple2 call(Tuple2 tuple) {
>>> return new Tuple2<>(tuple._2().getDate(), tuple._2());
>>>   }
>>> }
>>>
>>> At this point, I'd like to do some partitioning on the resulting DStream
>>> to have multiple DStream each with a single common string Date... So for
>>> instance in one DStream I would have all the entries from 12/01 and on
>>> another the entries from 12/02. Once I have this list of DStream, for each
>>> of them I would call saveAsObjectFiles() basically. I unfortunately did not
>>> find a way to demultiplex DStream based on a key. Obviously the reduce
>>> operation families does some of that but the result is still a single
>>> DStream.
>>>
>>> An alternative approach would be to call forEachRDD() on the DStream and
>>> demultiplex the entries into multiple new RDDs based on the timestamp to
>>> bucketize the entries with the same day date in the same RDD and finally
>>> call saveAsObjectFiles(). I am not sure if I can use parallelize() to
>>> create those RDDs?
>>>
>>> Another thing that I am gonna be experimenting with is to use much
>>> longer batching interval. I am talking in minutes because I don't want to
>>> have bunch of tiny files. I might simply use a bigger Duration or use one
>>> of the window operation. Not sure if anybody tries running Spark Streaming
>>> in that way.
>>>
>>> Any thoughts on that would be much appreciated,
>>>
>>> Thanks!
>>>
>>


Re: Limit the # of columns in Spark Scala

2014-12-14 Thread Michael Armbrust
>
> BTW, I cannot use SparkSQL / case right now because my table has 200
> columns (and I'm on Scala 2.10.3)
>

You can still apply the schema programmatically:
http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
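
For reference, a minimal sketch of that approach for the tab-separated case above (column indices, names and types are assumptions):

import org.apache.spark.sql._

val table = sc.textFile("hdfs://...")
val rowRDD = table.map(_.split("\t")).map(c => Row(c(167), c(110), c(200)))

// Define only the columns you care about; the 22-field case class limit does not apply.
val schema = StructType(Seq(
  StructField("col167", StringType, nullable = true),
  StructField("col110", StringType, nullable = true),
  StructField("col200", StringType, nullable = true)))

val projected = sqlContext.applySchema(rowRDD, schema)
projected.registerAsTable("projected")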


Re: Trouble with cache() and parquet

2014-12-14 Thread Michael Armbrust
For many operations, Spark SQL will just pass the data through without
looking at it.  Caching, in contrast, has to process the data so that we
can build up compressed column buffers.  So the schema is mismatched in
both cases, but only the caching case shows it.

Based on the exception, it looks more like there is a type mismatch (the
metastore is reporting an Integer, but the parquet data is actually
producing a String).
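
A minimal sketch of the check suggested earlier in the thread (the table name and path are assumptions): load the schema via the metastore-backed table and via parquetFile directly, then diff the fields.

val fromMetastore = sqlContext.sql("SELECT * FROM mytable LIMIT 0").schema
val fromParquet   = sqlContext.parquetFile("hdfs:///path/to/mytable").schema

// Report any column whose type differs between the two sources.
fromMetastore.fields.zip(fromParquet.fields).foreach { case (m, p) =>
  if (m.dataType != p.dataType)
    println(s"Mismatch on ${m.name}: metastore=${m.dataType}, parquet=${p.dataType}")
}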

On Thu, Dec 11, 2014 at 6:38 AM, Yana Kadiyska 
wrote:
>
> I see -- they are the same in design but  the difference comes from
> partitioned Hive tables: when the RDD is generated by querying an external
> Hive metastore, the partition is appended as part of the row, and shows up
> as part of the schema. Can you shed some light on why this is a problem:
>
> last2HourRdd.first <-- works ok
> last2HourRdd.cache()
>
> last2HourRdd.first <-- does not work
>
>
> The first call shows K+1 columns (and so does printSchema, where K
> columns are from the backing parquet files and the K+1st is the partition
> inlined). My impression is that the second call to .first would just force
> the cache() call and dump out that RDD to disk (with all of its K+1
> columns and store the schema info, again with K+1 columns), and then just
> return a single entry. I am not sure why the fact that the Hive metastore
> exposes an extra column over the raw parquet file is a problem, since it
> does so both in the schema and in the
> data: last2HourRdd.schema.fields.length reports K+1, and so does
> last2HourRdd.first.length.
>
> I also tried
> calling sqlContext.applySchema(last2HourRdd, parquetFile.schema) before
> caching but it does not fix the issue. The only workaround I've come up
> with so far is to replace "select *" with a select of an explicit column list.
> But I'd love to understand a little better why the cache call trips up this scenario.
>
>
>
> On Wed, Dec 10, 2014 at 3:50 PM, Michael Armbrust 
> wrote:
>
>> Have you checked to make sure the schema in the metastore matches the
>> schema in the parquet file?  One way to test would be to just use
>> sqlContext.parquetFile(...) which infers the schema from the file instead
>> of using the metastore.
>>
>> On Wed, Dec 10, 2014 at 12:46 PM, Yana Kadiyska 
>> wrote:
>>
>>>
>>> Hi folks, wondering if anyone has thoughts. Trying to create something
>>> akin to a materialized view (sqlContext is a HiveContext connected to
>>> external metastore):
>>>
>>>
>>> val last2HourRdd = sqlContext.sql(s"select * from mytable")
>>> //last2HourRdd.first prints out a  org.apache.spark.sql.Row = [...] with
>>> valid data
>>>
>>>  last2HourRdd.cache()
>>> //last2HourRdd.first now fails in an executor with the following:
>>>
>>> In the driver:
>>>
>>> 14/12/10 20:24:01 INFO TaskSetManager: Starting task 0.1 in stage 25.0 (TID 
>>> 35, iphere, NODE_LOCAL, 2170 bytes)
>>> 14/12/10 20:24:01 INFO TaskSetManager: Lost task 0.1 in stage 25.0 (TID 35) 
>>> on executor iphere: java.lang.ClassCastException (null) [duplicate 1]
>>>
>>>
>>>
>>> And in executor:
>>>
>>> 14/12/10 19:56:57 ERROR Executor: Exception in task 0.1 in stage 20.0 (TID 
>>> 27)
>>> java.lang.ClassCastException: java.lang.String cannot be cast to 
>>> java.lang.Integer
>>> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
>>> at 
>>> org.apache.spark.sql.catalyst.expressions.MutableInt.update(SpecificMutableRow.scala:73)
>>> at 
>>> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.update(SpecificMutableRow.scala:231)
>>> at 
>>> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(SpecificMutableRow.scala:236)
>>> at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:328)
>>> at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:310)
>>> at 
>>> org.apache.spark.sql.columnar.compression.RunLengthEncoding$Decoder.next(compressionSchemes.scala:168)
>>> at 
>>> org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.extractSingle(CompressibleColumnAccessor.scala:37)
>>> at 
>>> org.apache.spark.sql.columnar.NativeColumnAccessor.extractSingle(ColumnAccessor.scala:64)
>>> at 
>>> org.apache.spark.sql.columnar.BasicColumnAccessor.extractTo(ColumnAccessor.scala:54)
>>> at 
>>> org.apache.spark.sql.columnar.NativeColumnAccessor.org$apache$spark$sql$columnar$NullableColumnAccessor$$super$extractTo(ColumnAccessor.scala:64)
>>> at 
>>> org.apache.spark.sql.columnar.NullableColumnAccessor$class.extractTo(NullableColumnAccessor.scala:52)
>>> at 
>>> org.apache.spark.sql.columnar.NativeColumnAccessor.extractTo(ColumnAccessor.scala:64)
>>> at 
>>> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
>>> at 
>>> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:275)
>>> at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>>> at scala.collec

Re: Limit the # of columns in Spark Scala

2014-12-14 Thread Denny Lee
Yes - that works great! Sorry for implying I couldn't. Was just more
flummoxed that I couldn't make the Scala call work on its own. Will
continue to debug ;-)
On Sun, Dec 14, 2014 at 11:39 Michael Armbrust 
wrote:

> BTW, I cannot use SparkSQL / case right now because my table has 200
>> columns (and I'm on Scala 2.10.3)
>>
>
> You can still apply the schema programmatically:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
>


spark kafka batch integration

2014-12-14 Thread Koert Kuipers
hello all,
we at tresata wrote a library to provide for batch integration between
spark and kafka (distributed write of rdd to kafa, distributed read of rdd
from kafka). our main use cases are (in lambda architecture jargon):
* period appends to the immutable master dataset on hdfs from kafka using
spark
* make non-streaming data available in kafka with periodic data drops from
hdfs using spark. this is to facilitate merging the speed and batch layer
in spark-streaming
* distributed writes from spark-streaming

see here:
https://github.com/tresata/spark-kafka

best,
koert


Re: Limit the # of columns in Spark Scala

2014-12-14 Thread Yana Kadiyska
Denny, I am not sure what exception you're observing but I've had luck with
2 things:

val table = sc.textFile("hdfs://")

You can try calling table.first here and you'll see the first line of the
file.
You can also do val debug = table.first.split("\t"), which would give you an
array, and you can indeed verify that the array contains what you want in
positions 167, 110 and 200. In the case of large files with a random bad
line I find wrapping the call within the map in try/catch very valuable --
you can dump out the whole line in the catch statement.

Lastly, I would guess that you're getting a compile error and not a runtime
error -- I believe c is an array of values, so I think you want
tabs.map(c => (c(167), c(110), c(200))) instead of tabs.map(c => (c._(167),
c._(110), c._(200))).
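
Putting those pieces together, a minimal sketch of the corrected pipeline (the path and column indices are the ones assumed in the thread):

val table = sc.textFile("hdfs://...")
val tabs  = table.map(_.split("\t"))

// Array elements are accessed with c(i); guard against short lines if some rows
// have fewer than 201 fields.
val projected = tabs.map(c => (c(167), c(110), c(200)))
projected.take(5).foreach(println)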



On Sun, Dec 14, 2014 at 3:12 PM, Denny Lee  wrote:
>
> Yes - that works great! Sorry for implying I couldn't. Was just more
> flummoxed that I couldn't make the Scala call work on its own. Will
> continue to debug ;-)
>
> On Sun, Dec 14, 2014 at 11:39 Michael Armbrust 
> wrote:
>
>> BTW, I cannot use SparkSQL / case right now because my table has 200
>>> columns (and I'm on Scala 2.10.3)
>>>
>>
>> You can still apply the schema programmatically:
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
>>
>


RE: MLlib vs Madlib

2014-12-14 Thread Venkat, Ankam
Thanks for the info Brian.

I am trying to compare the performance difference between "Pivotal HAWQ/Greenplum
with MADlib" vs "HDFS with MLlib".

Do you think Spark MLlib will perform better because of its in-memory, caching and
iterative processing capabilities?

I need to perform large-scale text analytics, and I can store the data on HDFS or on
Pivotal Greenplum/HAWQ.

Regards,
Venkat Ankam

From: Brian Dolan [mailto:buddha_...@yahoo.com]
Sent: Sunday, December 14, 2014 10:02 AM
To: Venkat, Ankam
Cc: 'user@spark.apache.org'
Subject: Re: MLlib vs Madlib

MADLib (http://madlib.net/) was designed to bring large-scale ML techniques to 
a relational database, primarily postgresql.  MLlib assumes the data exists in 
some Spark-compatible data format.

I would suggest you pick the library that matches your data platform first.

DISCLAIMER: I am the original author of MADLib, though EMC/Pivotal assumed 
ownership rather quickly.


~~
May All Your Sequences Converge



On Dec 14, 2014, at 6:26 AM, "Venkat, Ankam" 
mailto:ankam.ven...@centurylink.com>> wrote:


Can somebody throw light on MLlib vs Madlib?

Which is better for machine learning? and are there any specific use case 
scenarios MLlib or Madlib will shine in?

Regards,
Venkat Ankam
This communication is the property of CenturyLink and may contain confidential 
or privileged information. Unauthorized use of this communication is strictly 
prohibited and may be unlawful. If you have received this communication in 
error, please immediately notify the sender by reply e-mail and destroy all 
copies of the communication and any attachments.

This communication is the property of CenturyLink and may contain confidential 
or privileged information. Unauthorized use of this communication is strictly 
prohibited and may be unlawful. If you have received this communication in 
error, please immediately notify the sender by reply e-mail and destroy all 
copies of the communication and any attachments.


Re: MLlib vs Madlib

2014-12-14 Thread Brian Dolan
I don't have any solid performance numbers, no. Let's start with some questions:

* Do you have to do any feature extraction before you start the routine? E.g. 
NLP, NER or tokenization? Have you already vectorized?
* Which routine(s) do you wish to use?  Things like k-means do very well in a 
relational setting, neural networks not as much.
* Where does the data live now?  How often will you have to re-load the data 
and re-run the pipeline?
* The ML portion is probably the most expensive portion of the pipeline, so it 
may justify moving it in/out of HDFS or Greenplum for just the ML.

For processing speed, my guess is Greenplum will be fastest, then Spark + HDFS, 
then Greenplum + HAWQ.

I've done quite a bit of large-scale text analysis, and the process is typically:

1. Source the data. Either in Solr or HDFS or a drive somewhere
2. Annotation / Feature Extraction (just get the bits you need from the data)
3. Create vectors from the data.  Tf/Idf is the most popular format.
4. Run the routine
5. Shout "Damn" when you realize you did it wrong.
6. Do 1-5 again. And again.
7. Create a report of some sort.
8. Visualize.

When asking about performance, most people focus on (4).  When focused on 
production, you need to consider the total cost of the pipeline.  So my basic 
recommendation is to do the whole thing on a small scale first.  If you end up 
with very "relational" questions, put everything in Greenplum.  If it all comes 
down to a query on a single table, use Spark RDD and maybe Spark SQL.
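
For the Spark side of steps 3 and 4 above, a minimal sketch using MLlib's feature and clustering packages (the path, tokenization, k and iteration count are assumptions):

import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.clustering.KMeans

// Pre-tokenized documents: one document per line, whitespace-separated terms.
val docs = sc.textFile("hdfs:///path/to/docs").map(_.split("\\s+").toSeq)

val tf = new HashingTF().transform(docs)    // step 3: term-frequency vectors
tf.cache()
val tfidf = new IDF().fit(tf).transform(tf) // step 3: re-weight by inverse document frequency

val model = KMeans.train(tfidf, 10, 20)     // step 4: k = 10 clusters, 20 iterations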

Just as an example, I've seen standard Postgres run extremely fast on Weighted 
Dictionaries.  This demands just two tables, the weighted dictionary and a 
table with your documents.   Though it's possible (and I've been foolish enough 
to do it), you don't want to spend the time embedding Stanford NLP into 
Postgres, the performance is awful.

Let me know how it goes!
b
https://twitter.com/buddha_314

~~
May All Your Sequences Converge



On Dec 14, 2014, at 4:07 PM, "Venkat, Ankam"  
wrote:

> Thanks for the info Brian.
>  
> I am trying to compare performance difference between “Pivotal HAWQ/Greenplum 
> with MADlib” vs “HDFS with MLlib”. 
>  
> Do you think Spark MLlib will perform better because of in-memory, caching 
> and iterative processing capabilities?   
>  
> I need to perform large scale text analytics and I can data store on HDFS or 
> on Pivotal Greenplum/Hawq. 
>  
> Regards,
> Venkat Ankam
>  
> From: Brian Dolan [mailto:buddha_...@yahoo.com] 
> Sent: Sunday, December 14, 2014 10:02 AM
> To: Venkat, Ankam
> Cc: 'user@spark.apache.org'
> Subject: Re: MLlib vs Madlib
>  
> MADLib (http://madlib.net/) was designed to bring large-scale ML techniques 
> to a relational database, primarily postgresql.  MLlib assumes the data 
> exists in some Spark-compatible data format.
>  
> I would suggest you pick the library that matches your data platform first.
>  
> DISCLAIMER: I am the original author of MADLib, though EMC/Pivotal assumed 
> ownership rather quickly.
>  
>  
> ~~
> May All Your Sequences Converge
>  
>  
>  
> On Dec 14, 2014, at 6:26 AM, "Venkat, Ankam"  
> wrote:
> 
> 
> Can somebody throw light on MLlib vs Madlib? 
>  
> Which is better for machine learning? and are there any specific use case 
> scenarios MLlib or Madlib will shine in?
>  
> Regards,
> Venkat Ankam
> This communication is the property of CenturyLink and may contain 
> confidential or privileged information. Unauthorized use of this 
> communication is strictly prohibited and may be unlawful. If you have 
> received this communication in error, please immediately notify the sender by 
> reply e-mail and destroy all copies of the communication and any attachments.
>  
> This communication is the property of CenturyLink and may contain 
> confidential or privileged information. Unauthorized use of this 
> communication is strictly prohibited and may be unlawful. If you have 
> received this communication in error, please immediately notify the sender by 
> reply e-mail and destroy all copies of the communication and any attachments.



Re: Running spark-submit from a remote machine using a YARN application

2014-12-14 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 12, 2014 at 7:01 AM, ryaminal  wrote:
>
> Now our solution is to make a very simple YARN application which executes
> as its command "spark-submit --master yarn-cluster
> s3n://application/jar.jar
> ...". This seemed so simple and elegant, but it has some weird issues. We
> get "NoClassDefFoundErrors". When we ssh to the box and run the same
> spark-submit command it works, but doing this through YARN leads to the
> NoClassDefFoundErrors mentioned.
>

I do something similar, I start Spark using spark-submit from a non-Spark
server application. Make sure that HADOOP_CONF_DIR is set correctly when
running spark-submit from your program so that the YARN configuration can
be found correctly.

Also, keep in mind that some parameters to spark-submit behave differently
with a yarn-cluster master than with a local[*] master. For example, system
properties set using `--conf` will be available in your Spark application
only in local[*] mode; for YARN you need to wrap them with `--conf
"spark.executor.extraJavaOptions=..."`.

Tobias


Re: Adding a column to a SchemaRDD

2014-12-14 Thread Tobias Pfeiffer
Nathan,

On Fri, Dec 12, 2014 at 3:11 PM, Nathan Kronenfeld <
nkronenf...@oculusinfo.com> wrote:
>
> I can see how to do it if I can express the added values in SQL - just run
> "SELECT *, valueCalculation AS newColumnName FROM table"
>
> I've been searching all over for how to do this if my added value is a
> scala function, with no luck.
>
> Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a
> new column, D, calculated using Utility.process(b, c), and I want (of
> course) to pass in the value B and C from each row, ending up with a new
> SchemaRDD with columns A, B, C, and D.
> 
>

I guess you would have to do the following:
- schemardd.map(row => { extend the row here })
  which will give you a plain RDD[Row] without a schema
- take the schema from the schemardd and extend it manually by the name and
type of the newly added column,
- create a new SchemaRDD from your mapped RDD and the manually extended
schema.
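
Roughly like this - an untested sketch, assuming the 1.1/1.2 programmatic-schema
API and that Utility.process returns a Double (adjust the field accesses to your
actual column types):

import org.apache.spark.sql._

def addColumnD(sqlContext: SQLContext, input: SchemaRDD): SchemaRDD = {
  // extend the existing schema by the new column D
  val newSchema =
    StructType(input.schema.fields :+ StructField("D", DoubleType, nullable = true))
  // extend each row: keep A, B, C and append D = Utility.process(B, C)
  val newRows =
    input.map(row => Row(row(0), row(1), row(2), Utility.process(row(1), row(2))))
  // re-attach the schema to the plain RDD[Row]
  sqlContext.applySchema(newRows, newSchema)
}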

Does that make sense?

Tobias


Run Spark job on Playframework + Spark Master/Worker in one Mac

2014-12-14 Thread Tomoya Igarashi
Hi all,

I am trying to run a Spark job on Playframework + Spark Master/Worker on one
Mac.
When the job ran, I encountered a java.lang.ClassNotFoundException.
Could you tell me how to solve it?

Here is my code in Github.
https://github.com/TomoyaIgarashi/spark_cluster_sample

* Environment:
Mac 10.9.5
Java 1.7.0_71
Playframework 2.2.3
Spark 1.1.1

* Setup history:
> cd ~
> git clone g...@github.com:apache/spark.git
> cd spark
> git checkout -b v1.1.1 v1.1.1
> sbt/sbt assembly
> vi ~/.bashrc
export SPARK_HOME=/Users/tomoya/spark
> . ~/.bashrc
> hostname
Tomoya-Igarashis-MacBook-Air.local
> vi $SPARK_HOME/conf/slaves
Tomoya-Igarashis-MacBook-Air.local
> play new spark_cluster_sample
default name
type -> scala

* Run history:
> $SPARK_HOME/sbin/start-all.sh
> jps
> which play
/Users/tomoya/play/play
> git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
> cd spark_cluster_sample
> play run

* Error trace:
Here is error trace in Gist.
https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511

Regards


Re: what is the best way to implement mini batches?

2014-12-14 Thread Earthson
I think it could be done like this:

1. use mapPartitions to randomly drop some partitions
2. randomly drop some elements (within the selected partitions)
3. calculate the gradient step for the selected elements

I don't think fixed-size batches are needed, but they could be built like this
(a rough sketch follows below):

1. zipWithIndex
2. create a ShuffledRDD based on the index (e.g. using index/10 as the key)
3. use mapPartitions to calculate each batch
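
An untested sketch of the fixed-batch idea (groupByKey stands in for the
ShuffledRDD here, and `step` is a placeholder for whatever gradient computation
runs on one mini batch):

import scala.reflect.ClassTag
import org.apache.spark.SparkContext._   // pair-RDD functions (needed pre-1.3)
import org.apache.spark.rdd.RDD

def miniBatchSteps[T: ClassTag](data: RDD[T], batchSize: Int)
                               (step: Iterable[T] => Double): RDD[(Long, Double)] =
  data.zipWithIndex()
    .map { case (x, i) => (i / batchSize, x) }          // index / batchSize as the batch key
    .groupByKey()                                       // shuffle: each key holds one mini batch
    .map { case (batchId, xs) => (batchId, step(xs)) }  // one gradient step per batch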

I also have a question: 

Can mini batches run in parallel?
I think running all batches in parallel is just like full-batch GD in some cases. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20677.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming Python APIs?

2014-12-14 Thread Xiaoyong Zhu
Hi spark experts

Are there any Python APIs for Spark Streaming? I didn't find any Python APIs in 
the Spark Streaming programming guide.
http://spark.apache.org/docs/latest/streaming-programming-guide.html

Xiaoyong



RE: Spark Streaming Python APIs?

2014-12-14 Thread Shao, Saisai
AFAIK, this will be a new feature in version 1.2; you can check out the master 
branch or the 1.2 branch to give it a try.

Thanks
Jerry

From: Xiaoyong Zhu [mailto:xiaoy...@microsoft.com]
Sent: Monday, December 15, 2014 10:53 AM
To: user@spark.apache.org
Subject: Spark Streaming Python APIs?

Hi spark experts

Are there any Python APIs for Spark Streaming? I didn't find any Python APIs in 
the Spark Streaming programming guide.
http://spark.apache.org/docs/latest/streaming-programming-guide.html

Xiaoyong



RE: Spark Streaming Python APIs?

2014-12-14 Thread Xiaoyong Zhu
Cool thanks!

Xiaoyong

From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Monday, December 15, 2014 10:57 AM
To: Xiaoyong Zhu
Cc: user@spark.apache.org
Subject: RE: Spark Streaming Python APIs?

AFAIK, this will be a new feature in version 1.2; you can check out the master 
branch or the 1.2 branch to give it a try.

Thanks
Jerry

From: Xiaoyong Zhu [mailto:xiaoy...@microsoft.com]
Sent: Monday, December 15, 2014 10:53 AM
To: user@spark.apache.org
Subject: Spark Streaming Python APIs?

Hi spark experts

Are there any Python APIs for Spark Streaming? I didn't find any Python APIs in 
the Spark Streaming programming guide.
http://spark.apache.org/docs/latest/streaming-programming-guide.html

Xiaoyong



RE: Spark Streaming Python APIs?

2014-12-14 Thread Xiaoyong Zhu
Btw, I have seen the Python-related docs in the 1.2 docs here:
http://people.apache.org/~pwendell/spark-1.2.0-rc2-docs/streaming-programming-guide.html

Xiaoyong

From: Xiaoyong Zhu [mailto:xiaoy...@microsoft.com]
Sent: Monday, December 15, 2014 10:58 AM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: RE: Spark Streaming Python APIs?

Cool thanks!

Xiaoyong

From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Monday, December 15, 2014 10:57 AM
To: Xiaoyong Zhu
Cc: user@spark.apache.org
Subject: RE: Spark Streaming Python APIs?

AFAIK, this will be a new feature in version 1.2; you can check out the master 
branch or the 1.2 branch to give it a try.

Thanks
Jerry

From: Xiaoyong Zhu [mailto:xiaoy...@microsoft.com]
Sent: Monday, December 15, 2014 10:53 AM
To: user@spark.apache.org
Subject: Spark Streaming Python APIs?

Hi spark experts

Are there any Python APIs for Spark Streaming? I didn't find any Python APIs in 
the Spark Streaming programming guide.
http://spark.apache.org/docs/latest/streaming-programming-guide.html

Xiaoyong



Re: ALS failure with size > Integer.MAX_VALUE

2014-12-14 Thread Bharath Ravi Kumar
Hi Xiangrui,

The block size limit was encountered even with a reduced number of item
blocks, as you had expected. I'm wondering if I could try the new
implementation as a standalone library against a 1.1 deployment. Does it
have dependencies on any core APIs in the current master?
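
For reference, a rough, untested sketch of setting a smaller block count against
the 1.1 API (the values are illustrative and `ratings` is assumed to be an
RDD[Rating]):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// (ratings, rank, iterations, lambda, blocks); rank = 10 as in the original run,
// blocks = 30 or smaller per the suggestion quoted below
val model = ALS.train(ratings, 10, 10, 0.01, 30)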

Thanks,
Bharath

On Wed, Dec 3, 2014 at 10:10 PM, Bharath Ravi Kumar 
wrote:
>
> Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And
> yes, I've been following the JIRA for the new ALS implementation. I'll try
> it out when it's ready for testing.
>
> On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng  wrote:
>
>> Hi Bharath,
>>
>> You can try setting a small number of item blocks in this case. 1200 is
>> definitely too large for ALS. Please try 30 or even smaller. I'm not
>> sure whether this could solve the problem because you have 100 items
>> connected with 10^8 users. There is a JIRA for this issue:
>>
>> https://issues.apache.org/jira/browse/SPARK-3735
>>
>> which I will try to implement in 1.3. I'll ping you when it is ready.
>>
>> Best,
>> Xiangrui
>>
>> On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar 
>> wrote:
>> > Yes, the issue appears to be due to the 2GB block size limitation. I am
>> > hence looking for (user, product) block sizing suggestions to work
>> around
>> > the block size limitation.
>> >
>> > On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen  wrote:
>> >>
>> >> (It won't be that, since you see that the error occur when reading a
>> >> block from disk. I think this is an instance of the 2GB block size
>> >> limitation.)
>> >>
>> >> On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya
>> >>  wrote:
>> >> > Hi Bharath – I’m unsure if this is your problem but the
>> >> > MatrixFactorizationModel in MLLIB which is the underlying component
>> for
>> >> > ALS
>> >> > expects your User/Product fields to be integers. Specifically, the
>> input
>> >> > to
>> >> > ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am
>> >> > wondering if
>> >> > perhaps one of your identifiers exceeds MAX_INT, could you write a
>> quick
>> >> > check for that?
>> >> >
>> >> > I have been running a very similar use case to yours (with more
>> >> > constrained
>> >> > hardware resources) and I haven’t seen this exact problem but I’m
>> sure
>> >> > we’ve
>> >> > seen similar issues. Please let me know if you have other questions.
>> >> >
>> >> > From: Bharath Ravi Kumar 
>> >> > Date: Thursday, November 27, 2014 at 1:30 PM
>> >> > To: "user@spark.apache.org" 
>> >> > Subject: ALS failure with size > Integer.MAX_VALUE
>> >> >
>> >> > We're training a recommender with ALS in mllib 1.1 against a dataset
>> of
>> >> > 150M
>> >> > users and 4.5K items, with the total number of training records being
>> >> > 1.2
>> >> > Billion (~30GB data). The input data is spread across 1200
>> partitions on
>> >> > HDFS. For the training, rank=10, and we've configured {number of user
>> >> > data
>> >> > blocks = number of item data blocks}. The number of user/item blocks
>> was
>> >> > varied between 50 and 1200. Irrespective of the block size (e.g. at
>> 1200
>> >> > blocks each), there are at least a couple of tasks that end up shuffle
>> >> > reading > 9.7G each in the aggregate stage (ALS.scala:337) and
>> failing
>> >> > with
>> >> > the following exception:
>> >> >
>> >> > java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>> >> > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
>> >> > at
>> >> > org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
>> >> > at
>> >> > org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
>> >> > at
>> >> >
>> >> >
>> org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
>> >> > at
>> >> >
>> >> >
>> org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
>> >> >
>> >
>> >
>>
>
>


Re: Limit the # of columns in Spark Scala

2014-12-14 Thread Denny Lee
Oh, just figured it out:

tabs.map(c => Array(c(167), c(110), c(200)))

Thanks for all of the advice, eh?!
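
For the record, an untested sketch of the defensive parsing suggested below,
wrapping the split in try/catch so a bad line gets dumped instead of failing the
job (the HDFS path is a placeholder; the column indices are the ones from this
thread):

val table = sc.textFile("hdfs://...")
val cols = table.flatMap { line =>
  try {
    val c = line.split("\t")
    Some(Array(c(167), c(110), c(200)))        // keep just the three columns of interest
  } catch {
    case e: Exception =>
      System.err.println("Bad line: " + line)  // dump the whole line for inspection
      None
  }
}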





On Sun Dec 14 2014 at 1:14:00 PM Yana Kadiyska 
wrote:

> Denny, I am not sure what exception you're observing but I've had luck
> with 2 things:
>
> val table = sc.textFile("hdfs://")
>
> You can try calling table.first here and you'll see the first line of the
> file.
> You can also do val debug = table.first.split("\t") which would give you
> an array and you can indeed verify that the array contains what you want in
>  positions 167, 110 and 200. In the case of large files with a random bad
> line I find wrapping the call within the map in try/catch very valuable --
> you can dump out the whole line in the catch statement
>
> Lastly I would guess that you're getting a compile error and not a runtime
> error -- I believe c is an array of values so I think you want
> tabs.map(c => (c(167), c(110), c(200))) instead of tabs.map(c => (c._(167),
> c._(110), c._(200))
>
>
>
> On Sun, Dec 14, 2014 at 3:12 PM, Denny Lee  wrote:
>>
>> Yes - that works great! Sorry for implying I couldn't. Was just more
>> flummoxed that I couldn't make the Scala call work on its own. Will
>> continue to debug ;-)
>>
>> On Sun, Dec 14, 2014 at 11:39 Michael Armbrust 
>> wrote:
>>
>>> BTW, I cannot use SparkSQL / case right now because my table has 200
 columns (and I'm on Scala 2.10.3)

>>>
>>> You can still apply the schema programmatically:
>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
>>>
>>


Re: KafkaUtils explicit acks

2014-12-14 Thread Mukesh Jha
Thanks TD & Francois for the explanation & documentation. I'm curious whether
we have any performance benchmarks with & without the WAL for
spark-streaming-kafka.

Also, in spark-streaming-kafka (as Kafka provides a way to acknowledge
offsets), on top of the WAL could we modify KafkaUtils to acknowledge the
offsets only when the RDDs are fully processed and are being evicted from
Spark memory? That way we can be 100 percent sure that all the records are
getting processed in the system.
I was wondering whether it would be good to have the Kafka offset information
of each batch as part of the RDD's metadata and commit the offsets once the
RDD's lineage is complete.
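
As a side note, if I read the staged 1.2 docs below correctly, the write ahead
log is enabled with a single configuration flag plus a checkpoint directory; an
untested sketch (app name and paths are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-wal-example")
  // per the staged 1.2 streaming guide; the WAL also requires checkpointing
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")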

On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das 
wrote:
>
> I am updating the docs right now. Here is a staged copy that you can
> have a sneak peek of. This will be part of Spark 1.2.
>
>
> http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html
>
> The updated fault-tolerance section tries to simplify the explanation
> of when and what data can be lost, and how to prevent that using the
> new experimental feature of write ahead logs.
> Any feedback will be much appreciated.
>
> TD
>
> On Wed, Dec 10, 2014 at 2:42 AM,   wrote:
> > [sorry for the botched half-message]
> >
> > Hi Mukesh,
> >
> > There's been some great work on Spark Streaming reliability lately.
> > https://www.youtube.com/watch?v=jcJq3ZalXD8
> > Look at the links from:
> > https://issues.apache.org/jira/browse/SPARK-3129
> >
> > I'm not aware of any doc yet (did I miss something ?) but you can look at
> > the ReliableKafkaReceiver's test suite:
> >
> >
> external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
> >
> > --
> > FG
> >
> >
> > On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha 
> > wrote:
> >>
> >> Hello Guys,
> >>
> >> Any insights on this??
> >> If I'm not clear enough, my question is: how can I use the Kafka consumer
> >> and not lose any data in case of failures with spark-streaming?
> >>
> >> On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha 
> >> wrote:
> >>>
> >>> Hello Experts,
> >>>
> >>> I'm working on a spark app which reads data from kafka & persists it in
> >>> hbase.
> >>>
> >>> Spark documentation states the below [1]: in case of worker failure
> >>> we can lose some data. If so, how can I make my kafka stream more
> reliable?
> >>> I have seen there is a simple consumer [2] but I'm not sure if it has
> >>> been used/tested extensively.
> >>>
> >>> I was wondering if there is a way to explicitly acknowledge the kafka
> >>> offsets once they are replicated in memory of other worker nodes (if
> it's
> >>> not already done) to tackle this issue.
> >>>
> >>> Any help is appreciated in advance.
> >>>
> >>>
> >>> Using any input source that receives data through a network - For
> >>> network-based data sources like Kafka and Flume, the received input
> data is
> >>> replicated in memory between nodes of the cluster (default replication
> >>> factor is 2). So if a worker node fails, then the system can recompute
> the
> >>> lost from the the left over copy of the input data. However, if the
> worker
> >>> node where a network receiver was running fails, then a tiny bit of
> data may
> >>> be lost, that is, the data received by the system but not yet
> replicated to
> >>> other node(s). The receiver will be started on a different node and it
> will
> >>> continue to receive data.
> >>> https://github.com/dibbhatt/kafka-spark-consumer
> >>>
> >>> Txz,
> >>>
> >>> Mukesh Jha
> >>
> >>
> >>
> >>
> >> --
> >>
> >>
> >> Thanks & Regards,
> >>
> >> Mukesh Jha
> >
> >
>


-- 


Thanks & Regards,

*Mukesh Jha *


Q about Spark MLlib- Decision tree - scala.MatchError: 2.0 (of class java.lang.Double)

2014-12-14 Thread jake Lim
I am working on some Spark MLlib testing (decision tree) and I used the IRIS
data from the CRAN R package.
The original IRIS data is not in a good format for Spark MLlib, so I changed the
data format (the data format and the features' location).

When I ran the sample Spark MLlib code for DT, I got the error below.
How can I solve this error?
==
14/12/15 14:27:30 ERROR TaskSetManager: Task 21.0:0 failed 4 times; aborting
job
14/12/15 14:27:30 INFO TaskSchedulerImpl: Cancelling stage 21
14/12/15 14:27:30 INFO DAGScheduler: Failed to run aggregate at
DecisionTree.scala:657
14/12/15 14:27:30 INFO TaskSchedulerImpl: Stage 21 was cancelled
14/12/15 14:27:30 WARN TaskSetManager: Loss was due to
org.apache.spark.TaskKilledException
org.apache.spark.TaskKilledException
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/12/15 14:27:30 INFO TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks
have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task
21.0:0 failed 4 times, most recent failure: Exception failure in TID 34 on
host krbda1anode01.kr.test.com: scala.MatchError: 2.0 (of class
java.lang.Double)
   
org.apache.spark.mllib.tree.DecisionTree$.classificationBinSeqOp$1(DecisionTree.scala:568)
   
org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$binSeqOp$1(DecisionTree.scala:623)
   
org.apache.spark.mllib.tree.DecisionTree$$anonfun$4.apply(DecisionTree.scala:657)
   
org.apache.spark.mllib.tree.DecisionTree$$anonfun$4.apply(DecisionTree.scala:657)
   
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
   
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
   
scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
   
org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
   
org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
org.apache.spark.scheduler.Task.run(Task.scala:51)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.fork

RE: KafkaUtils explicit acks

2014-12-14 Thread Shao, Saisai
Hi,

It is not trivial work to acknowledge the offsets when an RDD is fully 
processed. From my understanding, only modifying KafkaUtils is not enough to 
meet your requirement; you would need to add metadata management for each 
block/RDD and track it on both the executor and driver side, and many other 
things would also need to be taken care of :).

Thanks
Jerry

From: mukh@gmail.com [mailto:mukh@gmail.com] On Behalf Of Mukesh Jha
Sent: Monday, December 15, 2014 1:31 PM
To: Tathagata Das
Cc: francois.garil...@typesafe.com; user@spark.apache.org
Subject: Re: KafkaUtils explicit acks

Thanks TD & Francois for the explanation & documentation. I'm curious whether we 
have any performance benchmarks with & without the WAL for spark-streaming-kafka.

Also, in spark-streaming-kafka (as Kafka provides a way to acknowledge offsets), on 
top of the WAL could we modify KafkaUtils to acknowledge the offsets only when the 
RDDs are fully processed and are being evicted from Spark memory? That way we can be 
100 percent sure that all the records are getting processed in the system.
I was wondering whether it would be good to have the Kafka offset information of each 
batch as part of the RDD's metadata and commit the offsets once the RDD's lineage is 
complete.

On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das 
<tathagata.das1...@gmail.com> wrote:
I am updating the docs right now. Here is a staged copy that you can
have a sneak peek of. This will be part of Spark 1.2.

http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html

The updated fault-tolerance section tries to simplify the explanation
of when and what data can be lost, and how to prevent that using the
new experimental feature of write ahead logs.
Any feedback will be much appreciated.

TD

On Wed, Dec 10, 2014 at 2:42 AM, 
<francois.garil...@typesafe.com> wrote:
> [sorry for the botched half-message]
>
> Hi Mukesh,
>
> There's been some great work on Spark Streaming reliability lately.
> https://www.youtube.com/watch?v=jcJq3ZalXD8
> Look at the links from:
> https://issues.apache.org/jira/browse/SPARK-3129
>
> I'm not aware of any doc yet (did I miss something ?) but you can look at
> the ReliableKafkaReceiver's test suite:
>
> external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
>
> -
> FG
>
>
> On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha 
> <me.mukesh@gmail.com>
> wrote:
>>
>> Hello Guys,
>>
>> Any insights on this??
>> If I'm not clear enough, my question is: how can I use the Kafka consumer and
>> not lose any data in case of failures with spark-streaming?
>>
>> On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha 
>> <me.mukesh@gmail.com>
>> wrote:
>>>
>>> Hello Experts,
>>>
>>> I'm working on a spark app which reads data from kafka & persists it in
>>> hbase.
>>>
>>> Spark documentation states the below [1]: in case of worker failure
>>> we can lose some data. If so, how can I make my kafka stream more reliable?
>>> I have seen there is a simple consumer [2] but I'm not sure if it has
>>> been used/tested extensively.
>>>
>>> I was wondering if there is a way to explicitly acknowledge the kafka
>>> offsets once they are replicated in memory of other worker nodes (if it's
>>> not already done) to tackle this issue.
>>>
>>> Any help is appreciated in advance.
>>>
>>>
>>> Using any input source that receives data through a network - For
>>> network-based data sources like Kafka and Flume, the received input data is
>>> replicated in memory between nodes of the cluster (default replication
>>> factor is 2). So if a worker node fails, then the system can recompute the
>>> lost data from the leftover copy of the input data. However, if the worker
>>> node where a network receiver was running fails, then a tiny bit of data may
>>> be lost, that is, the data received by the system but not yet replicated to
>>> other node(s). The receiver will be started on a different node and it will
>>> continue to receive data.
>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>
>>> Txz,
>>>
>>> Mukesh Jha
>>
>>
>>
>>
>> --
>>
>>
>> Thanks & Regards,
>>
>> Mukesh Jha
>
>


--


Thanks & Regards,

Mukesh Jha