Re: Spark 1.4 RDD to DF fails with toDF()

2015-09-08 Thread Gheorghe Postelnicu
Compiling from source with Scala 2.11 support fixed this issue. Thanks
again for the help!
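
For reference, the rebuild was roughly along these lines (a sketch based on the
Spark 1.4 build docs; the exact profiles depend on your Hadoop version, so
treat the flags as assumptions rather than the exact commands used):

  # switch the build to Scala 2.11, then build a distribution
  ./dev/change-version-to-2.11.sh
  ./make-distribution.sh --name spark-1.4.1-scala-2.11 --tgz \
    -Dscala-2.11 -Phadoop-2.6 -DskipTests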



On Tue, Sep 8, 2015 at 7:33 AM, Gheorghe Postelnicu <
gheorghe.posteln...@gmail.com> wrote:

> Good point. It is a pre-compiled Spark version. Based on the text on the
> downloads page, the answer to your question is no, so I will download the
> sources and recompile.
>
> Thanks!
>
> On Tue, Sep 8, 2015 at 5:17 AM, Koert Kuipers  wrote:
>
>> is /opt/spark-1.4.1-bin-hadoop2.6 a spark version compiled with scala
>> 2.11?
>>
>> On Mon, Sep 7, 2015 at 5:29 PM, Gheorghe Postelnicu <
>> gheorghe.posteln...@gmail.com> wrote:
>>
>>> sbt assembly; $SPARK_HOME/bin/spark-submit --class main.scala.TestMain
>>> --master "local[4]" target/scala-2.11/bof-assembly-0.1-SNAPSHOT.jar
>>>
>>> using Spark:
>>>
>>> /opt/spark-1.4.1-bin-hadoop2.6
>>>
>>> On Mon, Sep 7, 2015 at 10:20 PM, Jonathan Coveney 
>>> wrote:
>>>
 How are you building and running it?


 El lunes, 7 de septiembre de 2015, Gheorghe Postelnicu <
 gheorghe.posteln...@gmail.com> escribió:

> Interesting idea. Tried that, didn't work. Here is my new SBT file:
>
> name := """testMain"""
>
> scalaVersion := "2.11.6"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
>   "org.apache.spark" %% "spark-sql" % "1.4.1" % "provided",
>   "org.scala-lang" % "scala-reflect" % "2.11.6"
> )
>
>
> On Mon, Sep 7, 2015 at 9:55 PM, Jonathan Coveney 
> wrote:
>
>> Try adding the following to your build.sbt
>>
>> libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.11.6"
>>
>>
>> I believe that Spark shades the Scala library, and it looks like this is a
>> library that you need in an unshaded way.
>>
>>
>> 2015-09-07 16:48 GMT-04:00 Gheorghe Postelnicu <
>> gheorghe.posteln...@gmail.com>:
>>
>>> Hi,
>>>
>>> The following code fails when compiled from SBT:
>>>
>>> package main.scala
>>>
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.sql.SQLContext
>>>
>>> object TestMain {
>>>   def main(args: Array[String]): Unit = {
>>> implicit val sparkContext = new SparkContext()
>>> val sqlContext = new SQLContext(sparkContext)
>>> import sqlContext.implicits._
>>> sparkContext.parallelize(1 to 10).map(i => (i,
>>> i.toString)).toDF("intCol", "strCol")
>>>   }
>>> }
>>>
>>> with the following error:
>>>
>>> 15/09/07 21:39:21 INFO BlockManagerMaster: Registered BlockManager
>>> Exception in thread "main" java.lang.NoSuchMethodError:
>>> scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
>>> at main.scala.Bof$.main(Bof.scala:14)
>>> at main.scala.Bof.main(Bof.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> 15/09/07 21:39:22 INFO SparkContext: Invoking stop() from shutdown
>>> hook
>>>
>>> whereas the code above works in a spark shell.
>>>
>>> The code is compiled using Scala 2.11.6 against a precompiled Spark 1.4.1.
>>>
>>> Any suggestion on how to fix this would be much appreciated.
>>>
>>> Best,
>>> Gheorghe
>>>
>>>
>>
>
>>>
>>
>


Re: Partitions with zero records & variable task times

2015-09-08 Thread Akhil Das
Try using a custom partitioner for the keys so that they will get evenly
distributed across tasks.
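
For example, a minimal sketch of a custom partitioner (assuming string keys;
this hash-based version is essentially what HashPartitioner already does, so
for real skew you would map hot keys to partitions more deliberately):

class EvenPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val h = key.hashCode % numPartitions
    if (h < 0) h + numPartitions else h   // keep the result non-negative
  }
}

// hypothetical usage on a (key, record) pair RDD; merge() is a placeholder
// pairRdd.reduceByKey(new EvenPartitioner(32), (a, b) => merge(a, b))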

Thanks
Best Regards

On Fri, Sep 4, 2015 at 7:19 PM, mark  wrote:

> I am trying to tune a Spark job and have noticed some strange behavior -
> tasks in a stage vary in execution time, ranging from 2 seconds to 20
> seconds. I assume tasks should all run in roughly the same amount of time
> in a well tuned job.
>
> So I did some investigation - the fast tasks appear to have no records,
> whilst the slow tasks do. I need help understanding why this is happening.
>
> The code in the stage is pretty simple. All it does is:
>
> - filters records
> - maps records to a (key, record) tuple
> - reduces by key
>
> The data are Avro objects stored in Parquet files in 16MB blocks in HDFS.
>
> To establish how many records in each partition I added this snippet:
>
> val counts = rdd.mapPartitions(iter => {
>   val ctx = TaskContext.get
>   val stageId = ctx.stageId
>   val partId = ctx.partitionId
>   val attemptid = ctx.taskAttemptId()
>   Array(Array(stageId, partId, attemptid, iter.size)).iterator
> }, true).collect()
>
> Which produces the following (columns: stage id, partition id, attempt id, record count):
>
> 1  1  0  0
> 1  2  1  50489
> 1  3  2  0
> 1  4  3  0
> 1  5  4  0
> 1  6  5  53200
> 1  7  6  0
> 1  8  7  0
> 1  9  8  0
> 1  10   9  56946
> 1  11   10   0
> 1  12   11   0
> 1  13   12   0
> 1  14   13   59209
> 1  15   14   0
> 1  16   15   0
> 1  17   16   0
> 1  18   17   50202
> 1  19   18   0
> 1  20   19   0
> 1  21   20   0
> 1  22   21   54613
> 1  23   22   0
> 1  24   23   0
> 1  25   24   54157
> 1  26   25   0
> 1  27   26   0
> 1  28   27   0
> 1  29   28   53595
> 1  30   29   0
> 1  31   30   0
> 1  32   31   10750
>
>
> Looking at the logs, you can see the tasks that contain records have the
> longest run time:
>
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0
> (TID 26) in 2782 ms on DG1322 (6/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID
> 8) in 2815 ms on DG1322 (7/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0
> (TID 20) in 2815 ms on DG1322 (8/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0
> (TID 24) in 2840 ms on DG1321 (9/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0
> (TID 30) in 2839 ms on DG1321 (10/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0
> (TID 12) in 2878 ms on DG1321 (11/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0
> (TID 31) in 2870 ms on DG1321 (12/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0
> (TID 19) in 2892 ms on DG1321 (13/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID
> 1) in 2930 ms on DG1321 (14/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID
> 7) in 2934 ms on DG1321 (15/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 12.0 in stage 1.0
> (TID 13) in 2931 ms on DG1321 (16/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID
> 4) in 3246 ms on DG1323 (17/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 27.0 in stage 1.0
> (TID 28) in 3226 ms on DG1323 (18/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 15.0 in stage 1.0
> (TID 16) in 3249 ms on DG1323 (19/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 10.0 in stage 1.0
> (TID 11) in 3669 ms on DG1323 (20/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 16.0 in stage 1.0
> (TID 17) in 3666 ms on DG1323 (21/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 22.0 in stage 1.0
> (TID 23) in 3664 ms on DG1323 (22/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID
> 5) in 3692 ms on DG1323 (23/32)
> *15/09/03 16:26:39 INFO TaskSetManager: Finished task 31.0 in stage 1.0
> (TID 32) in 6668 ms on DG1322 (24/32)*
> *15/09/03 16:26:48 INFO TaskSetManager: Finished task 17.0

Re: Problems with Tungsten in Spark 1.5.0-rc2

2015-09-08 Thread Anders Arpteg
Ok, thanks Reynold. When I tested dynamic allocation with Spark 1.4, it
complained, saying that it was not Tungsten compliant. Let's hope it works
with 1.5 then!

On Tue, Sep 8, 2015 at 5:49 AM Reynold Xin  wrote:

>
> On Wed, Sep 2, 2015 at 12:03 AM, Anders Arpteg  wrote:
>
>>
>> BTW, is it possible (or will it be) to use Tungsten with dynamic
>> allocation and the external shuffle manager?
>>
>>
> Yes - I think this already works. There isn't anything specific here
> related to Tungsten.
>
>


Can not allocate executor when running spark on mesos

2015-09-08 Thread canan chen
Hi all,

I am trying to run Spark on Mesos, but it looks like I cannot allocate
resources from Mesos. I am not an expert on Mesos, but from the Mesos log, it
seems Spark always declines the offers from Mesos. Not sure what's wrong;
maybe some configuration change is needed. Here's the Mesos master log:

I0908 15:08:16.515960 301916160 master.cpp:1767] Received registration
request for framework 'Spark shell' at
scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
I0908 15:08:16.520545 301916160 master.cpp:1834] Registering framework
20150908-143320-16777343-5050-41965- (Spark shell) at
scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133 with
checkpointing disabled and capabilities [  ]
I0908 15:08:16.522307 300843008 hierarchical.hpp:386] Added framework
20150908-143320-16777343-5050-41965-
I0908 15:08:16.525845 301379584 master.cpp:4290] Sending 1 offers to
framework 20150908-143320-16777343-5050-41965- (Spark shell) at
scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
I0908 15:08:16.637677 302452736 master.cpp:2884] Processing DECLINE call
for offers: [ 20150908-143320-16777343-5050-41965-O0 ] for framework
20150908-143320-16777343-5050-41965- (Spark shell) at
scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
I0908 15:08:16.639197 299233280 hierarchical.hpp:761] Recovered cpus(*):8;
mem(*):15360; disk(*):470842; ports(*):[31000-32000] (total: cpus(*):8;
mem(*):15360; disk(*):470842; ports(*):[31000-32000], allocated: ) on slave
20150908-143320-16777343-5050-41965-S0 from framework
20150908-143320-16777343-5050-41965-
I0908 15:08:21.786932 300306432 master.cpp:4290] Sending 1 offers to
framework 20150908-143320-16777343-5050-41965- (Spark shell) at
scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
I0908 15:08:21.789979 298696704 master.cpp:2884] Processing DECLINE call
for offers: [ 20150908-143320-16777343-5050-41965-O1 ] for framework
20150908-143320-16777343-5050-41965- (Spark shell) at
scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133


Re: Spark SQL - UDF for scoring a model - take $"*"

2015-09-08 Thread Night Wolf
Not sure how that would work. Really I want to tack on an extra column onto
the DF with a UDF that can take a Row object.

On Tue, Sep 8, 2015 at 1:54 AM, Jörn Franke  wrote:

> Can you use a map or list with different properties as one parameter?
> Alternatively a string where parameters are Comma-separated...
>
> Le lun. 7 sept. 2015 à 8:35, Night Wolf  a écrit :
>
>> Is it possible to have a UDF which takes a variable number of arguments?
>>
>> e.g. df.select(myUdf($"*")) fails with
>>
>> org.apache.spark.sql.AnalysisException: unresolved operator 'Project
>> [scalaUDF(*) AS scalaUDF(*)#26];
>>
>> What I would like to do is pass in a generic data frame which can be then
>> passed to a UDF which does scoring of a model. The UDF needs to know the
>> schema to map column names in the model to columns in the DataFrame.
>>
>> The model has 100s of factors (very wide), so I can't just have a scoring
>> UDF that has 500 parameters (for obvious reasons).
>>
>> Cheers,
>> ~N
>>
>


Re: Spark SQL - UDF for scoring a model - take $"*"

2015-09-08 Thread Night Wolf
So basically I need something like

df.withColumn("score", new Column(new Expression {
 ...

def eval(input: Row = null): EvaluatedType = myModel.score(input)
...

}))

But I can't do this, so how can I make a UDF or something like it, that can
take in a Row and pass back a double value or some struct...

On Tue, Sep 8, 2015 at 5:33 PM, Night Wolf  wrote:

> Not sure how that would work. Really I want to tack on an extra column
> onto the DF with a UDF that can take a Row object.
>
> On Tue, Sep 8, 2015 at 1:54 AM, Jörn Franke  wrote:
>
>> Can you use a map or list with different properties as one parameter?
>> Alternatively a string where parameters are Comma-separated...
>>
>> Le lun. 7 sept. 2015 à 8:35, Night Wolf  a
>> écrit :
>>
>>> Is it possible to have a UDF which takes a variable number of arguments?
>>>
>>> e.g. df.select(myUdf($"*")) fails with
>>>
>>> org.apache.spark.sql.AnalysisException: unresolved operator 'Project
>>> [scalaUDF(*) AS scalaUDF(*)#26];
>>>
>>> What I would like to do is pass in a generic data frame which can be
>>> then passed to a UDF which does scoring of a model. The UDF needs to know
>>> the schema to map column names in the model to columns in the DataFrame.
>>>
>>> The model has 100s of factors (very wide), so I can't just have a
>>> scoring UDF that has 500 parameters (for obvious reasons).
>>>
>>> Cheers,
>>> ~N
>>>
>>
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Igor Berman
as a starting point, attach your stacktrace...
ps: look for duplicates in your classpath, maybe you include another jar
with same class

On 8 September 2015 at 06:38, Nicholas R. Peterson 
wrote:

> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through Yarn.
> Serialization is set to use Kryo.
>
> I have a large object which I send to the executors as a Broadcast. The
> object seems to serialize just fine. When it attempts to deserialize,
> though, Kryo throws a ClassNotFoundException... for a class that I include
> in the fat jar that I spark-submit.
>
> What could be causing this classpath issue with Kryo on the executors?
> Where should I even start looking to try to diagnose the problem? I
> appreciate any help you can provide.
>
> Thank you!
>
> -- Nick
>


Re: Spark SQL - UDF for scoring a model - take $"*"

2015-09-08 Thread Night Wolf
Sorry for the spam - I had some success;

case class ScoringDF(function: Row => Double) extends Expression {
  val dataType = DataTypes.DoubleType

  override type EvaluatedType = Double

  override def eval(input: Row): EvaluatedType = {
function(input)
  }

  override def nullable: Boolean = false

  override def children: Seq[Expression] = Nil
}
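
For what it's worth, a usage sketch of the above (untested; myModel.score(row): Double
is a hypothetical method standing in for the real scoring call):

// import org.apache.spark.sql.Column
val scored = df.withColumn("score", new Column(ScoringDF(row => myModel.score(row))))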

But this falls over if I want to return an Array[Double];

case class ScoringDF(function: Row => Array[Double]) extends Expression {
  val dataType = DataTypes.createArrayType(DataTypes.DoubleType)

  override type EvaluatedType = Array[Double]

  override def eval(input: Row): EvaluatedType = {
function(input)
  }

  override def nullable: Boolean = false

  override def children: Seq[Expression] = Nil
}


I get the following exception:

scala> dfs.show
java.lang.ClassCastException: [D cannot be cast to scala.collection.Seq
at
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$2.apply(CatalystTypeConverters.scala:282)
at
org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
at
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$4.apply(CatalystTypeConverters.scala:301)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$2.apply(SparkPlan.scala:150)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$2.apply(SparkPlan.scala:150)

Any ideas?

On Tue, Sep 8, 2015 at 5:47 PM, Night Wolf  wrote:

> So basically I need something like
>
> df.withColumn("score", new Column(new Expression {
>  ...
>
> def eval(input: Row = null): EvaluatedType = myModel.score(input)
> ...
>
> }))
>
> But I can't do this, so how can I make a UDF or something like it, that
> can take in a Row and pass back a double value or some struct...
>
> On Tue, Sep 8, 2015 at 5:33 PM, Night Wolf  wrote:
>
>> Not sure how that would work. Really I want to tack on an extra column
>> onto the DF with a UDF that can take a Row object.
>>
>> On Tue, Sep 8, 2015 at 1:54 AM, Jörn Franke  wrote:
>>
>>> Can you use a map or list with different properties as one parameter?
>>> Alternatively a string where parameters are Comma-separated...
>>>
>>> Le lun. 7 sept. 2015 à 8:35, Night Wolf  a
>>> écrit :
>>>
 Is it possible to have a UDF which takes a variable number of arguments?

 e.g. df.select(myUdf($"*")) fails with

 org.apache.spark.sql.AnalysisException: unresolved operator 'Project
 [scalaUDF(*) AS scalaUDF(*)#26];

 What I would like to do is pass in a generic data frame which can be
 then passed to a UDF which does scoring of a model. The UDF needs to know
 the schema to map column names in the model to columns in the DataFrame.

 The model has 100s of factors (very wide), so I can't just have a
 scoring UDF that has 500 parameters (for obvious reasons).

 Cheers,
 ~N

>>>
>>
>


Re: Spark SQL - UDF for scoring a model - take $"*"

2015-09-08 Thread Night Wolf
Haha ok, it's one of those days: Array isn't valid. RTFM and it says a
Catalyst array maps to a Scala Seq, which makes sense.

So it works! Two follow-up questions:

1 - Is this the best approach?
2 - What if I want my expression to return multiple columns? My binary
classification model gives me an array of doubles with 3 fields: the
prediction, the class A probability and the class B probability. How could I
make those into 3 columns from my expression? Clearly .withColumn only
expects 1 column back.
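
In case it helps anyone later, a hedged sketch of one way to turn the 3 values
into 3 columns (untested; it reuses the Seq-returning variant of ScoringDF,
i.e. Row => Seq[Double], and myModel.scoreAll(row): Seq[Double] is a
hypothetical placeholder for the real scoring call):

import org.apache.spark.sql.functions.col

// attach the whole Seq as one array column, then pull the elements out by index
val scored = df.withColumn("scores", new Column(ScoringDF(row => myModel.scoreAll(row))))
val result = scored
  .withColumn("prediction", col("scores").getItem(0))
  .withColumn("probA", col("scores").getItem(1))
  .withColumn("probB", col("scores").getItem(2))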

On Tue, Sep 8, 2015 at 6:21 PM, Night Wolf  wrote:

> Sorry for the spam - I had some success;
>
> case class ScoringDF(function: Row => Double) extends Expression {
>   val dataType = DataTypes.DoubleType
>
>   override type EvaluatedType = Double
>
>   override def eval(input: Row): EvaluatedType = {
> function(input)
>   }
>
>   override def nullable: Boolean = false
>
>   override def children: Seq[Expression] = Nil
> }
>
> But this falls over if I want to return an Array[Double];
>
> case class ScoringDF(function: Row => Array[Double]) extends Expression {
>   val dataType = DataTypes.createArrayType(DataTypes.DoubleType)
>
>   override type EvaluatedType = Array[Double]
>
>   override def eval(input: Row): EvaluatedType = {
> function(input)
>   }
>
>   override def nullable: Boolean = false
>
>   override def children: Seq[Expression] = Nil
> }
>
>
>  get the following exception;
>
> scala> dfs.show
> java.lang.ClassCastException: [D cannot be cast to scala.collection.Seq
> at
> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$2.apply(CatalystTypeConverters.scala:282)
> at
> org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
> at
> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$4.apply(CatalystTypeConverters.scala:301)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$2.apply(SparkPlan.scala:150)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$2.apply(SparkPlan.scala:150)
>
> Any ideas?
>
> On Tue, Sep 8, 2015 at 5:47 PM, Night Wolf  wrote:
>
>> So basically I need something like
>>
>> df.withColumn("score", new Column(new Expression {
>>  ...
>>
>> def eval(input: Row = null): EvaluatedType = myModel.score(input)
>> ...
>>
>> }))
>>
>> But I can't do this, so how can I make a UDF or something like it, that
>> can take in a Row and pass back a double value or some struct...
>>
>> On Tue, Sep 8, 2015 at 5:33 PM, Night Wolf 
>> wrote:
>>
>>> Not sure how that would work. Really I want to tack on an extra column
>>> onto the DF with a UDF that can take a Row object.
>>>
>>> On Tue, Sep 8, 2015 at 1:54 AM, Jörn Franke 
>>> wrote:
>>>
 Can you use a map or list with different properties as one parameter?
 Alternatively a string where parameters are Comma-separated...

 Le lun. 7 sept. 2015 à 8:35, Night Wolf  a
 écrit :

> Is it possible to have a UDF which takes a variable number of
> arguments?
>
> e.g. df.select(myUdf($"*")) fails with
>
> org.apache.spark.sql.AnalysisException: unresolved operator 'Project
> [scalaUDF(*) AS scalaUDF(*)#26];
>
> What I would like to do is pass in a generic data frame which can be
> then passed to a UDF which does scoring of a model. The UDF needs to know
> the schema to map column names in the model to columns in the DataFrame.
>
> The model has 100s of factors (very wide), so I can't just have a
> scoring UDF that has 500 parameters (for obvious reasons).
>
> Cheers,
> ~N
>

>>>
>>
>


about mr-style merge sort

2015-09-08 Thread 周千昊
Hi, community
 I have an application which I am trying to migrate from MR to Spark.
 It does some calculations on Hive data and outputs to HFiles which will
be bulk loaded into an HBase table; details as follows:

 Rdd input = getSourceInputFromHive()
 Rdd> mapSideResult =
input.glom().mapPartitions(/*some calculation*/)
 // PS: the result in each partition has already been sorted according
to the lexicographical order during the calculation
 mapSideResult.reduceByKey(/*some
aggregations*/).sortByKey(/**/).map(/*transform Tuple2 to
Tuple2*/).saveAsNewAPIHadoopFile(/*write
to hfile*/)

  Here is the problem: in MR, on the reducer side, the mapper output has
already been sorted, so the reduce phase is a merge sort which makes writing
to the HFile sequential and fast.
  However in Spark, the output of the reduceByKey phase has been shuffled,
so I have to sort the RDD in order to write the HFile, which makes it about
2x slower running on Spark than on MR.
  I am wondering if there is anything I can leverage that has the same
effect as in MR. I happened to see a JIRA ticket,
https://issues.apache.org/jira/browse/SPARK-2926. Is it related to what I am
looking for?
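
For what it's worth, pair RDDs do have repartitionAndSortWithinPartitions
(available since Spark 1.2), which shuffles and sorts inside each partition in
a single pass, much like the MR shuffle's merge sort. A rough sketch (the
key/value types and the aggregation are placeholders, since the real generics
aren't shown above; for HFiles you would normally plug in a partitioner
aligned with the HBase region split points instead of a plain HashPartitioner):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

def aggregateAndSort(pairs: RDD[(String, Array[Byte])]): RDD[(String, Array[Byte])] =
  pairs
    .reduceByKey((a, b) => a ++ b)   // placeholder aggregation
    .repartitionAndSortWithinPartitions(new HashPartitioner(pairs.partitions.length))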


Applying transformations on a JavaRDD using reflection

2015-09-08 Thread Nirmal Fernando
Hi All,

I'd like to apply a chain of Spark transformations (map/filter) on a given
JavaRDD. I'll have the set of Spark transformations as Function, and
even though I can determine the classes of T and A at the runtime, due to
the type erasure, I cannot call JavaRDD's transformations as they expect
generics. Any idea on how to resolve this?

-- 

Thanks & regards,
Nirmal

Team Lead - WSO2 Machine Learner
Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733
Blog: http://nirmalfdo.blogspot.com/


Re: buildSupportsSnappy exception when reading the snappy file in Spark

2015-09-08 Thread Akhil Das
Looks like you are having different versions of snappy library. Here's a
similar discussion if you haven't seen it already
http://stackoverflow.com/questions/22150417/hadoop-mapreduce-java-lang-unsatisfiedlinkerror-org-apache-hadoop-util-nativec

Thanks
Best Regards

On Mon, Sep 7, 2015 at 7:41 AM, dong.yajun  wrote:

> hi all,
>
> I hit a problem where I can't read files with snappy encoding from HDFS in
> Spark 1.4.1.
>
> I have configured the SPARK_LIBRARY_PATH property in conf/spark-env.sh to
> the native path of Hadoop and restarted the spark cluster
>
> SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/opt/app/install/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/lib/native
>
>
> the partial exception:
>
> Caused by: org.apache.hadoop.hbase.io.hfile.CorruptHFileException: Problem 
> reading HFile Trailer from file
> hdfs://nameservice1/hbase/data/default/IM_ItemBase/02296539242087aea77877dced9ba3d5/BaseInfo/9fe36f74334c4d30ba1bfc17bbd717f5
>
>  at
> org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:478)
>
>  at
> org.apache.hadoop.hbase.io.hfile.HFile.createReader(HFile.java:521)
>
>  at
> com.newegg.ec.bigdata.dump.CombineHFileRecordReader.(CombineHFileRecordReader.java:33)
>
>  ... 19 more
>
> Caused by: java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>
>  at
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>
>  at
> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>
>  at
> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:192)
>
>  at
> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:176)
>
>  at
> org.apache.hadoop.hbase.io.compress.Compression$Algorithm.getDecompressor(Compression.java:328)
>
>  at
> org.apache.hadoop.hbase.io.compress.Compression.decompress(Compression.java:423)
>
>  at
> org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext.prepareDecoding(HFileBlockDefaultDecodingContext.java:90)
>
> --
> *Ric Dong*
>
>


Re: Exception when restoring spark streaming with batch RDD from checkpoint.

2015-09-08 Thread Akhil Das
Try to add a filter to remove/replace the null elements within/before the
map operation.
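
For example, a minimal sketch of what that could look like (assuming the nulls
are in the joined RDD's elements and can simply be dropped):

// drop null keys/values before doing anything else with the pairs
val cleaned = contentFeature.filter { case (k, v) => k != null && v != null }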

Thanks
Best Regards

On Mon, Sep 7, 2015 at 3:34 PM, ZhengHanbin  wrote:

> Hi,
>
> I am using spark streaming to join every RDD of a DStream to a stand alone
> RDD to generate a new DStream as followed:
>
> def joinWithBatchEvent(contentFeature: RDD[(String, String)],
>                        batchEvent: DStream[((String, String), (Long, Double, Double))]) = {
>   batchEvent.map(event => {
>     (event._1._2, (event._1._1, event._2._1, event._2._2, event._2._3))
>   }).transform(eventRDD => {
>     eventRDD.leftOuterJoin(contentFeature).map(result =>
>       (result._2._1._1, (result._1, result._2._1._2, result._2._1._3,
>         result._2._1._4, result._2._2))
>     )
>   })
> }
>
> It works well when it start from a new StreamContext.
> But if the StreamContext is restored from checkpoint, there will be an
> exception as followed and the Graph can not be setup.
> Do you know how to solve this problem? Thanks very much!
>
> 5/09/07 14:07:18 INFO spark.SparkContext: Starting job: saveAsTextFiles at
> CFBModel.scala:49
> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 12
> (repartition at EventComponent.scala:64)
> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 17 (flatMap
> at CFBModel.scala:25)
> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 20 (map at
> ContentFeature.scala:100)
> 15/09/07 14:07:18 WARN scheduler.DAGScheduler: Creating new stage failed
> due to exception - job: 1
> java.lang.IllegalArgumentException: Flat hash tables cannot contain null
> elements.
> at
> scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390)
> at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
> at
> scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
> at
> scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
> at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
> at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
> at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
> at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
> at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336)
> at
> org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355)
> at
> org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298)
> at
> org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310)
> at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244)
> at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Job 1 failed:
> saveAsTextFiles at CFBModel.scala:49, took 0.016406 s
> 15/09/07 14:07:18 ERROR scheduler.JobScheduler: Error running job
> streaming job 144160590 ms.0
> java.lang.IllegalArgumentException: Flat hash tables cannot contain null
> elements.
> at
> scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390)
> at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
> at
> scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
> at
> scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
> at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
> at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
> at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
> at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
> at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336)
> at
> org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355)
> at
> org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301)
> at
> org.apache.spark.scheduler.DAGScheduler$$an

Re: Can not allocate executor when running spark on mesos

2015-09-08 Thread Akhil Das
In which mode are you submitting your application? (coarse-grained or
fine-grained(default)). Have you gone through this documentation already?
http://spark.apache.org/docs/latest/running-on-mesos.html#using-a-mesos-master-url
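
For example, to try coarse-grained mode (a sketch; the core cap is just a
placeholder for your cluster):

// via SparkConf, or equivalently --conf on spark-submit / spark-shell
val conf = new org.apache.spark.SparkConf()
  .set("spark.mesos.coarse", "true")  // coarse-grained Mesos mode
  .set("spark.cores.max", "4")        // cap the cores this framework will accept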

Thanks
Best Regards

On Tue, Sep 8, 2015 at 12:54 PM, canan chen  wrote:

> Hi all,
>
> I try to run spark on mesos, but it looks like I can not allocate
> resources from mesos. I am not expert of mesos, but from the mesos log, it
> seems spark always decline the offer from mesos. Not sure what's wrong,
> maybe need some configuration change. Here's the mesos master log
>
> I0908 15:08:16.515960 301916160 master.cpp:1767] Received registration
> request for framework 'Spark shell' at
> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
> I0908 15:08:16.520545 301916160 master.cpp:1834] Registering framework
> 20150908-143320-16777343-5050-41965- (Spark shell) at
> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133 with
> checkpointing disabled and capabilities [  ]
> I0908 15:08:16.522307 300843008 hierarchical.hpp:386] Added framework
> 20150908-143320-16777343-5050-41965-
> I0908 15:08:16.525845 301379584 master.cpp:4290] Sending 1 offers to
> framework 20150908-143320-16777343-5050-41965- (Spark shell) at
> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
> I0908 15:08:16.637677 302452736 master.cpp:2884] Processing DECLINE call
> for offers: [ 20150908-143320-16777343-5050-41965-O0 ] for framework
> 20150908-143320-16777343-5050-41965- (Spark shell) at
> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
> I0908 15:08:16.639197 299233280 hierarchical.hpp:761] Recovered cpus(*):8;
> mem(*):15360; disk(*):470842; ports(*):[31000-32000] (total: cpus(*):8;
> mem(*):15360; disk(*):470842; ports(*):[31000-32000], allocated: ) on slave
> 20150908-143320-16777343-5050-41965-S0 from framework
> 20150908-143320-16777343-5050-41965-
> I0908 15:08:21.786932 300306432 master.cpp:4290] Sending 1 offers to
> framework 20150908-143320-16777343-5050-41965- (Spark shell) at
> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
> I0908 15:08:21.789979 298696704 master.cpp:2884] Processing DECLINE call
> for offers: [ 20150908-143320-16777343-5050-41965-O1 ] for framework
> 20150908-143320-16777343-5050-41965- (Spark shell) at
> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>


Re: Parquet Array Support Broken?

2015-09-08 Thread Cheng Lian
Yeah, this is a typical Parquet interoperability issue due to 
unfortunate historical reasons. Hive (actually parquet-hive) gives the 
following schema for array:


message m0 {
  optional group f (LIST) {
    repeated group bag {
      optional int32 array_element;
    }
  }
}

while Spark SQL gives

message m1 {
  optional group f (LIST) {
    repeated group bag {
      optional int32 array;
    }
  }
}

So Spark 1.4 couldn't find the expected field "array" in the target 
Parquet file.  As Ruslan suggested, Spark 1.5 addresses this issue 
properly and is able to read Parquet files generated by most, if not 
all, Parquet data models out there.
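
As a rough illustration of the "write the Parquet file from Spark" workaround
mentioned further down the thread (someDF and the paths here are placeholders):

// data written by Spark's own Parquet writer uses the layout Spark expects,
// so reading it back in Spark 1.4 works fine
someDF.write.parquet("/tmp/output_parquet")
val back = sqlContext.read.parquet("/tmp/output_parquet")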


You may find more details about Parquet interoperability in this post if 
you are interested 
https://www.mail-archive.com/user@spark.apache.org/msg35663.html


Cheng

On 9/8/15 6:19 AM, Alex Kozlov wrote:

Thank you - it works if the file is created in Spark

On Mon, Sep 7, 2015 at 3:06 PM, Ruslan Dautkhanov <dautkha...@gmail.com> wrote:


Read the response from Cheng Lian <lian.cs@gmail.com> on Aug 27th - it looks
like the same problem.

Workarounds
1. write that parquet file in Spark;
2. upgrade to Spark 1.5.

--
Ruslan Dautkhanov

On Mon, Sep 7, 2015 at 3:52 PM, Alex Kozlov <ale...@gmail.com> wrote:

No, it was created in Hive by CTAS, but any help is
appreciated...

On Mon, Sep 7, 2015 at 2:51 PM, Ruslan Dautkhanov <dautkha...@gmail.com> wrote:

That parquet table wasn't created in Spark, is it?

There was a recent discussion on this list that complex
data types in Spark prior to 1.5 are often incompatible with
Hive, for example, if I remember correctly.

On Mon, Sep 7, 2015, 2:57 PM Alex Kozlov <ale...@gmail.com> wrote:

I am trying to read an (array typed) parquet file in
spark-shell (Spark 1.4.1 with Hadoop 2.6):

{code}
$ bin/spark-shell
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See
http://logging.apache.org/log4j/1.2/faq.html#noconfig
for more info.
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/09/07 13:45:22 INFO SecurityManager: Changing view
acls to: hivedata
15/09/07 13:45:22 INFO SecurityManager: Changing
modify acls to: hivedata
15/09/07 13:45:22 INFO SecurityManager:
SecurityManager: authentication disabled; ui acls
disabled; users with view permissions: Set(hivedata);
users with modify permissions: Set(hivedata)
15/09/07 13:45:23 INFO HttpServer: Starting HTTP Server
15/09/07 13:45:23 INFO Utils: Successfully started
service 'HTTP class server' on port 43731.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit
Server VM, Java 1.8.0)
Type in expressions to have them evaluated.
Type :help for more information.
15/09/07 13:45:26 INFO SparkContext: Running Spark
version 1.4.1
15/09/07 13:45:26 INFO SecurityManager: Changing view
acls to: hivedata
15/09/07 13:45:26 INFO SecurityManager: Changing
modify acls to: hivedata
15/09/07 13:45:26 INFO SecurityManager:
SecurityManager: authentication disabled; ui acls
disabled; users with view permissions: Set(hivedata);
users with modify permissions: Set(hivedata)
15/09/07 13:45:27 INFO Slf4jLogger: Slf4jLogger started
15/09/07 13:45:27 INFO Remoting: Starting remoting
15/09/07 13:45:27 INFO Remoting: Remoting started;
listening on addresses
:[akka.tcp://sparkDriver@10.10.30.52:46083
]
15/09/07 13:45:27 INFO Utils: Successfully started
service 'sparkDriver' on port 46083.
15/09/07 13:45:27 INFO SparkEnv: Registering
MapOutputTracker
15/09/07 13:45:27 INFO SparkEnv: Registering
BlockManagerMaster
15/09/07 13:45:27 INFO DiskBlockManager: Created local
directory at

/tmp/spark-f313315a-0769-4057-835d-196cfe140a26/blockm

Re: Split content into multiple Parquet files

2015-09-08 Thread Cheng Lian

In Spark 1.4 and 1.5, you can do something like this:

df.write.partitionBy("key").parquet("/datasink/output-parquets")
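
For example (a small sketch; the column names and values are made up):

case class Record(key: String, value: Long)
val df = sqlContext.createDataFrame(Seq(Record("a", 1L), Record("b", 2L)))
df.write.partitionBy("key").parquet("/datasink/output-parquets")
// produces /datasink/output-parquets/key=a/... and /datasink/output-parquets/key=b/...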

BTW, I'm curious how you did it without partitionBy, using saveAsHadoopFile?


Cheng

On 9/8/15 2:34 PM, Adrien Mogenet wrote:

Hi there,

We've spent several hours trying to split our input data into several parquet
files (or several folders, i.e.
/datasink/output-parquets//foobar.parquet), based on a
low-cardinality key. This works very well when using saveAsHadoopFile, but we
can't achieve a similar thing with Parquet files.


The only working solution so far is to persist the RDD and then loop 
over it N times to write N files. That does not look acceptable...


Do you guys have any suggestion to do such an operation?

--

*Adrien Mogenet*
Head of Backend/Infrastructure
adrien.moge...@contentsquare.com 
(+33)6.59.16.64.22
http://www.contentsquare.com 
50, avenue Montaigne - 75008 Paris




Re: Sending yarn application logs to web socket

2015-09-08 Thread Jeetendra Gangele
1. In order to change log4j.properties at the name node, you can change
/home/hadoop/log4j.properties.

2. In order to change log4j.properties for the container logs, you need to
change it in the YARN containers jar, since they hard-coded loading the
file directly from project resources.

2.1 ssh to the slave (on EMR you can also simply add this as a bootstrap
action, so you don't need to ssh to each of the nodes).

2.2 Override container-log4j.properties in the jar resources:

jar uf
/home/hadoop/share/hadoop/yarn/hadoop-yarn-server-nodemanager-2.2.0.jar
container-log4j.properties

On 8 September 2015 at 05:47, Yana Kadiyska  wrote:

> Hopefully someone will give you a more direct answer but whenever I'm
> having issues with log4j I always try -Dlog4j.debug=true. This will tell
> you which log4j settings are getting picked up from where. I've spent
> countless hours due to typos in the file, for example.
>
> On Mon, Sep 7, 2015 at 11:47 AM, Jeetendra Gangele 
> wrote:
>
>> I also tried placing my costomized log4j.properties file under
>> src/main/resources still no luck.
>>
>> won't above step modify the default YARN and spark  log4j.properties  ?
>>
>> anyhow its still taking log4j.properties from YARn.
>>
>>
>>
>> On 7 September 2015 at 19:25, Jeetendra Gangele 
>> wrote:
>>
>>> anybody here to help?
>>>
>>>
>>>
>>> On 7 September 2015 at 17:53, Jeetendra Gangele 
>>> wrote:
>>>
 Hi All, I have been trying to send my application-related logs to a socket
 so that we can feed Logstash and check the application logs.

 here is my log4j.property file

 main.logger=RFA,SA

 log4j.appender.SA=org.apache.log4j.net.SocketAppender
 log4j.appender.SA.Port=4560
 log4j.appender.SA.RemoteHost=hadoop07.housing.com
 log4j.appender.SA.ReconnectionDelay=1
 log4j.appender.SA.Application=NM-${user.dir}
 # Ignore messages below warning level from Jetty, because it's a bit
 verbose
 log4j.logger.org.spark-project.jetty=WARN
 log4j.logger.org.apache.hadoop=WARN


 I am launching my spark job using below common on YARN-cluster mode

 *spark-submit --name data-ingestion --master yarn-cluster --conf
 spark.custom.configuration.file=hdfs://10.1.6.186/configuration/binning-dev.conf
  --files
 /usr/hdp/current/spark-client/Runnable/conf/log4j.properties --conf
 "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
 --conf
 "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
 --class com.housing.spark.streaming.Binning
 /usr/hdp/current/spark-client/Runnable/dsl-data-ingestion-all.jar*


 *Can anybody please guide me why i am not getting the logs the socket?*


 *I followed many pages listing below without success*

 http://tech-stories.com/2015/02/12/setting-up-a-central-logging-infrastructure-for-hadoop-and-spark/#comment-208

 http://stackoverflow.com/questions/22918720/custom-log4j-appender-in-hadoop-2

 http://stackoverflow.com/questions/9081625/override-log4j-properties-in-hadoop


>>>
>>
>>
>>
>>
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Nicholas R. Peterson
Thanks, Igor; I've got it running again right now, and can attach the stack
trace when it finishes.

In the mean time, I've noticed something interesting: in the Spark UI, the
application jar that I submit is not being included on the classpath.  It
has been successfully uploaded to the nodes -- in the nodemanager directory
for the application, I see __app__.jar and __spark__.jar.  The directory
itself is on the classpath, and __spark__.jar and __hadoop_conf__ are as
well.  When I do everything the same but switch the master to local[*], the
jar I submit IS added to the classpath.

This seems like a likely culprit.  What could cause this, and how can I fix
it?

Best,
Nick

On Tue, Sep 8, 2015 at 1:14 AM Igor Berman  wrote:

> as a starting point, attach your stacktrace...
> ps: look for duplicates in your classpath, maybe you include another jar
> with same class
>
> On 8 September 2015 at 06:38, Nicholas R. Peterson 
> wrote:
>
>> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through Yarn.
>> Serialization is set to use Kryo.
>>
>> I have a large object which I send to the executors as a Broadcast. The
>> object seems to serialize just fine. When it attempts to deserialize,
>> though, Kryo throws a ClassNotFoundException... for a class that I include
>> in the fat jar that I spark-submit.
>>
>> What could be causing this classpath issue with Kryo on the executors?
>> Where should I even start looking to try to diagnose the problem? I
>> appreciate any help you can provide.
>>
>> Thank you!
>>
>> -- Nick
>>
>
>


Re: How to read files from S3 from Spark local when there is a http proxy

2015-09-08 Thread tariq
Hi svelusamy,

Were you able to make it work? I am facing the exact same problem. Getting
connection timed out when trying to access S3.

Thank you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-files-from-S3-from-Spark-local-when-there-is-a-http-proxy-tp21122p24604.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



1.5 Build Errors

2015-09-08 Thread Benjamin Zaitlen
Hi All,

I'm trying to build a distribution off of the latest in master and I keep
getting errors on MQTT and the build fails.   I'm running the build on a
m1.large which has 7.5 GB of RAM and no other major processes are running.

MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
> ./make-distribution.sh  --name continuum-custom-spark-1.5 --tgz -Pyarn
> -Phive -Phive-thriftserver -Phadoop-2.4 -Dhadoop.version=2.4.0



INFO] Spark Project GraphX ... SUCCESS [ 33.345
> s]
> [INFO] Spark Project Streaming  SUCCESS [01:08
> min]
> [INFO] Spark Project Catalyst . SUCCESS [01:39
> min]
> [INFO] Spark Project SQL .. SUCCESS [02:06
> min]
> [INFO] Spark Project ML Library ... SUCCESS [02:16
> min]
> [INFO] Spark Project Tools  SUCCESS [
>  4.087 s]
> [INFO] Spark Project Hive . SUCCESS [01:28
> min]
> [INFO] Spark Project REPL . SUCCESS [
> 16.291 s]
> [INFO] Spark Project YARN Shuffle Service . SUCCESS [
> 13.671 s]
> [INFO] Spark Project YARN . SUCCESS [
> 20.554 s]
> [INFO] Spark Project Hive Thrift Server ... SUCCESS [
> 14.332 s]
> [INFO] Spark Project Assembly . SUCCESS [03:33
> min]
> [INFO] Spark Project External Twitter . SUCCESS [
> 14.208 s]
> [INFO] Spark Project External Flume Sink .. SUCCESS [
> 11.535 s]
> [INFO] Spark Project External Flume ... SUCCESS [
> 19.010 s]
> [INFO] Spark Project External Flume Assembly .. SUCCESS [
>  5.210 s]
> [INFO] Spark Project External MQTT  FAILURE [01:10
> min]
> [INFO] Spark Project External MQTT Assembly ... SKIPPED
> [INFO] Spark Project External ZeroMQ .. SKIPPED
> [INFO] Spark Project External Kafka ... SKIPPED
> [INFO] Spark Project Examples . SKIPPED
> [INFO] Spark Project External Kafka Assembly .. SKIPPED
> [INFO]
> 
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 22:55 min
> [INFO] Finished at: 2015-09-07T22:42:57+00:00
> [INFO] Final Memory: 240M/455M
> [INFO]
> 
> [ERROR] GC overhead limit exceeded -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
> + return 1
> + exit 1


Any thoughts would be extremely helpful.

--Ben


Re: buildSupportsSnappy exception when reading the snappy file in Spark

2015-09-08 Thread dong.yajun
hi Akhil,

I just used the LD_LIBRARY_PATH variable in conf/spark-env.sh instead of
SPARK_LIBRARY_PATH, pointing it to the Hadoop native library path, and it works.
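
i.e. something along these lines in conf/spark-env.sh (using the CDH parcel
path from the original mail as an example):

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/app/install/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/lib/native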

thanks.

On Tue, Sep 8, 2015 at 6:14 PM, Akhil Das 
wrote:

> Looks like you are having different versions of snappy library. Here's a
> similar discussion if you haven't seen it already
> http://stackoverflow.com/questions/22150417/hadoop-mapreduce-java-lang-unsatisfiedlinkerror-org-apache-hadoop-util-nativec
>
> Thanks
> Best Regards
>
> On Mon, Sep 7, 2015 at 7:41 AM, dong.yajun  wrote:
>
>> hi all,
>>
>> I hit a problem where I can't read files with snappy encoding from HDFS in
>> Spark 1.4.1.
>>
>> I have configured the SPARK_LIBRARY_PATH property in conf/spark-env.sh to
>> the native path of Hadoop and restarted the spark cluster
>>
>> SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/opt/app/install/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/lib/native
>>
>>
>> the partial exception:
>>
>> Caused by: org.apache.hadoop.hbase.io.hfile.CorruptHFileException: Problem 
>> reading HFile Trailer from file
>> hdfs://nameservice1/hbase/data/default/IM_ItemBase/02296539242087aea77877dced9ba3d5/BaseInfo/9fe36f74334c4d30ba1bfc17bbd717f5
>>
>>  at
>> org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:478)
>>
>>  at
>> org.apache.hadoop.hbase.io.hfile.HFile.createReader(HFile.java:521)
>>
>>  at
>> com.newegg.ec.bigdata.dump.CombineHFileRecordReader.(CombineHFileRecordReader.java:33)
>>
>>  ... 19 more
>>
>> Caused by: java.lang.UnsatisfiedLinkError:
>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>>
>>  at
>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>>
>>  at
>> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>>
>>  at
>> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:192)
>>
>>  at
>> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:176)
>>
>>  at
>> org.apache.hadoop.hbase.io.compress.Compression$Algorithm.getDecompressor(Compression.java:328)
>>
>>  at
>> org.apache.hadoop.hbase.io.compress.Compression.decompress(Compression.java:423)
>>
>>  at
>> org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext.prepareDecoding(HFileBlockDefaultDecodingContext.java:90)
>>
>> --
>> *Ric Dong*
>>
>>
>


-- 
*Ric Dong*


Re: How to read files from S3 from Spark local when there is a http proxy

2015-09-08 Thread Akhil Das
In the shell, before running the job you can actually do an
export http_proxy="http://host:port" and see if it works.

Thanks
Best Regards

On Tue, Sep 8, 2015 at 6:21 PM, tariq  wrote:

> Hi svelusamy,
>
> Were you able to make it work? I am facing the exact same problem. Getting
> connection timed out when trying to access S3.
>
> Thank you.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-files-from-S3-from-Spark-local-when-there-is-a-http-proxy-tp21122p24604.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Nicholas R. Peterson
Here is the stack trace:  (Sorry for the duplicate, Igor -- I forgot
to include the list.)


15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in
stage 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException:
com.esotericsoftware.kryo.KryoException: Error constructing instance
of class: com.lumiata.patientanalysis.utils.CachedGraph
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44)
at 
com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
instance of class: com.lumiata.patientanalysis.utils.CachedGraph
at 
com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
at 
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
... 24 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at 
com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
at 
com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
... 32 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find
class: com.i2028.Document.Document
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
at 
com.lumiata.patientanalysis.utils.CachedGraph.loadCacheFromSerializedData(CachedGraph.java:221)
at 
com.lumiata.patientanalysis.utils.CachedGraph.(CachedGraph.java:182)
at 
com.lumiata.patientanalysis.utils.Ca

Spark intermittently fails to recover from a worker failure (in standalone mode)

2015-09-08 Thread Cheuk Lam
We have run into a problem where some Spark job is aborted after a worker is
killed in a 2-worker standalone cluster.  The problem is intermittent, but
we can consistently reproduce it.  The problem only appears to happen when
we kill a worker.  It doesn't happen when we kill an executor directly.

Has anyone run into a similar problem?  We would appreciate if anyone could
share some related experience and/or any suggestion to troubleshoot this
problem.  Thank you.

~

For those who are interested, we did look into the logs and this is our
analysis so far.  We think the failure is caused by the following two things
combined, but we don't know how the first thing could happen.
* The BlockManagerMasterEndpoint in the driver has some stale block info
corresponding to the dead executor after the worker has been killed.  The
driver does appear to handle the "RemoveExecutor" message and cleans up all
related block info.  But subsequently, and intermittently, it receives some
Akka messages to re-register the dead BlockManager and re-add some of its
blocks.  As a result, upon GetLocations requests from the remaining
executor, the driver responds with some stale block info, instructing the
remaining executor to fetch blocks from the dead executor.  Please see the
driver log excerption below that shows the sequence of events described
above.  In the log, there are two executors: 1.2.3.4 was the one which got
shut down, while 5.6.7.8 is the remaining executor.  The driver also ran on
5.6.7.8.
* When the remaining executor's BlockManager issues a doGetRemote() call to
fetch the block of data, it fails because the targeted BlockManager which
resided in the dead executor is gone.  This failure results in an exception
forwarded to the caller, bypassing the mechanism in the doGetRemote()
function to trigger a re-computation of the block.  I don't know whether
that is intentional or not.

Driver log excerption that shows the driver received messages to re-register
the dead BlockManager after handling the RemoveExecutor message:

11690 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message
(172.236378 ms)
AkkaMessage(RegisterExecutor(0,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/Executor#670388190]),1.2.3.4:36140,8,Map(stdout
->
http://1.2.3.4:8081/logPage/?appId=app-20150902203512-&executorId=0&logType=stdout,
stderr ->
http://1.2.3.4:8081/logPage/?appId=app-20150902203512-&executorId=0&logType=stderr)),true)
from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$f]

11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
message AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4,
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-21635])),true)
from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$g]

11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message:
AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4,
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-21635])),true)

11718 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] INFO
BlockManagerMasterEndpoint: Registering block manager 1.2.3.4:52615 with 6.2
GB RAM, BlockManagerId(0, 1.2.3.4, 52615)

11719 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message
(1.498313 ms) AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4,
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-21635])),true)
from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$g]

...

308892 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] ERROR
TaskSchedulerImpl: Lost executor 0 on 1.2.3.4: worker lost

...

308903 15/09/02 20:40:13 [dag-scheduler-event-loop] INFO DAGScheduler:
Executor lost: 0 (epoch 178)

308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
message AkkaMessage(RemoveExecutor(0),true) from
Actor[akka://sparkDriver/temp/$Jqb]

308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message:
AkkaMessage(RemoveExecutor(0),true)

308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO
BlockManagerMasterEndpoint: Trying to remove executor 0 from
BlockManagerMaster.

308906 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO
BlockManagerMasterEndpoint: Removing block manager BlockManagerId(0,
1.2.3

No auto decompress in Spark Java textFile function?

2015-09-08 Thread Chris Teoh
Hi Folks,

I tried using Spark v1.2 on bz2 files in Java, but the behaviour is
different from the same textFile API call in Python and Scala.

That being said, how do I read .tar.bz2 files in Spark's Java
API?

Thanks in advance
Chris


[streaming] DStream with window performance issue

2015-09-08 Thread Alexey Ponkin
Hi,

I have an application with 2 streams, which are joined together.
Stream1 is a simple DStream (relatively small batch chunks).
Stream2 is a windowed DStream (with a duration of, for example, 60 seconds).

Stream1 and Stream2 are Kafka direct streams.
The problem is that, according to the logs, the window operation is constantly
increasing (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
I also see a gap in the processing window (screenshot:
http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php); in the logs there are
no events in that period.
So what happens in that gap, and why is the window constantly increasing?

Thank you in advance

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: 1.5 Build Errors

2015-09-08 Thread Sean Owen
It shows you there that Maven is out of memory. Give it more heap. I use 3gb.

On Tue, Sep 8, 2015 at 1:53 PM, Benjamin Zaitlen  wrote:
> Hi All,
>
> I'm trying to build a distribution off of the latest in master and I keep
> getting errors on MQTT and the build fails.   I'm running the build on a
> m1.large which has 7.5 GB of RAM and no other major processes are running.
>
>> MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
>> ./make-distribution.sh  --name continuum-custom-spark-1.5 --tgz -Pyarn
>> -Phive -Phive-thriftserver -Phadoop-2.4 -Dhadoop.version=2.4.0
>
>
>
>> INFO] Spark Project GraphX ... SUCCESS [
>> 33.345 s]
>> [INFO] Spark Project Streaming  SUCCESS [01:08
>> min]
>> [INFO] Spark Project Catalyst . SUCCESS [01:39
>> min]
>> [INFO] Spark Project SQL .. SUCCESS [02:06
>> min]
>> [INFO] Spark Project ML Library ... SUCCESS [02:16
>> min]
>> [INFO] Spark Project Tools  SUCCESS [
>> 4.087 s]
>> [INFO] Spark Project Hive . SUCCESS [01:28
>> min]
>> [INFO] Spark Project REPL . SUCCESS [
>> 16.291 s]
>> [INFO] Spark Project YARN Shuffle Service . SUCCESS [
>> 13.671 s]
>> [INFO] Spark Project YARN . SUCCESS [
>> 20.554 s]
>> [INFO] Spark Project Hive Thrift Server ... SUCCESS [
>> 14.332 s]
>> [INFO] Spark Project Assembly . SUCCESS [03:33
>> min]
>> [INFO] Spark Project External Twitter . SUCCESS [
>> 14.208 s]
>> [INFO] Spark Project External Flume Sink .. SUCCESS [
>> 11.535 s]
>> [INFO] Spark Project External Flume ... SUCCESS [
>> 19.010 s]
>> [INFO] Spark Project External Flume Assembly .. SUCCESS [
>> 5.210 s]
>> [INFO] Spark Project External MQTT  FAILURE [01:10
>> min]
>> [INFO] Spark Project External MQTT Assembly ... SKIPPED
>> [INFO] Spark Project External ZeroMQ .. SKIPPED
>> [INFO] Spark Project External Kafka ... SKIPPED
>> [INFO] Spark Project Examples . SKIPPED
>> [INFO] Spark Project External Kafka Assembly .. SKIPPED
>> [INFO]
>> 
>> [INFO] BUILD FAILURE
>> [INFO]
>> 
>> [INFO] Total time: 22:55 min
>> [INFO] Finished at: 2015-09-07T22:42:57+00:00
>> [INFO] Final Memory: 240M/455M
>> [INFO]
>> 
>> [ERROR] GC overhead limit exceeded -> [Help 1]
>> [ERROR]
>> [ERROR] To see the full stack trace of the errors, re-run Maven with the
>> -e switch.
>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>> [ERROR]
>> [ERROR] For more information about the errors and possible solutions,
>> please read the following articles:
>> [ERROR] [Help 1]
>> http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
>> + return 1
>> + exit 1
>
>
> Any thoughts would be extremely helpful.
>
> --Ben

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Igor Berman
java.lang.ClassNotFoundException: com.i2028.Document.Document

1. so have you checked that jar that you create(fat jar) contains this class?

2. might be there is some stale cache issue...not sure though


On 8 September 2015 at 16:12, Nicholas R. Peterson 
wrote:

> Here is the stack trace:  (Sorry for the duplicate, Igor -- I forgot to 
> include the list.)
>
>
> 15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 
> 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException: 
> com.esotericsoftware.kryo.KryoException: Error constructing instance of 
> class: com.lumiata.patientanalysis.utils.CachedGraph
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>   at 
> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44)
>   at 
> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing 
> instance of class: com.lumiata.patientanalysis.utils.CachedGraph
>   at 
> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
>   at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>   ... 24 more
> Caused by: java.lang.reflect.InvocationTargetException
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>   at 
> com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
>   at 
> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
>   ... 32 more
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 
> com.i2028.Document.Document
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
>   at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>   at 
> com.esotericsoftware.kryo.serialize

Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Nick Peterson
Yes, the jar contains the class:

$ jar -tf lumiata-evaluation-assembly-1.0.jar | grep 2028/Document/Document
com/i2028/Document/Document$1.class
com/i2028/Document/Document.class

What else can I do?  Is there any way to get more information about the
classes available to the particular classloader kryo is using?

On Tue, Sep 8, 2015 at 6:34 AM Igor Berman  wrote:

> java.lang.ClassNotFoundException: com.i2028.Document.Document
>
> 1. so have you checked that jar that you create(fat jar) contains this class?
>
> 2. might be there is some stale cache issue...not sure though
>
>
> On 8 September 2015 at 16:12, Nicholas R. Peterson 
> wrote:
>
>> Here is the stack trace:  (Sorry for the duplicate, Igor -- I forgot to 
>> include the list.)
>>
>>
>> 15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 
>> 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException: 
>> com.esotericsoftware.kryo.KryoException: Error constructing instance of 
>> class: com.lumiata.patientanalysis.utils.CachedGraph
>>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>>  at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>  at 
>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44)
>>  at 
>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing 
>> instance of class: com.lumiata.patientanalysis.utils.CachedGraph
>>  at 
>> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
>>  at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>  at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>  at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>  at 
>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
>>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>>  ... 24 more
>> Caused by: java.lang.reflect.InvocationTargetException
>>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>  at 
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>  at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>  at 
>> com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
>>  at 
>> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
>>  ... 32 more
>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 
>> com.i2028.Document.Document
>>  at 
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(Defa

Getting Started with Spark

2015-09-08 Thread Bryan Jeffrey
Hello. We're getting started with Spark Streaming. We're working to build
some unit/acceptance testing around functions that consume DStreams. The
current method for creating DStreams is to populate the data by creating an
InputDStream:

val input = Array(TestDataFactory.CreateEvent(123 notFoundData))
val queue =
scala.collection.mutable.Queue(ssc.sparkContext.parallelize(input))
val events: InputDStream[MyEvent] = ssc.queueStream(queue)

The 'events' InputDStream can then be fed into functions. However, the
stream does not allow checkpointing. This means that we're unable to use
this to feed methods/classes that execute stateful actions like
'updateStateByKey'.

Does anyone have a simple, contained method to create DStreams that allow
for checkpointing? I looked at the Spark unit test framework, but that
seems to require access to a bunch of spark internals (requiring that
you're within the spark package, etc.)
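
For what it's worth, one pattern that sidesteps the queueStream limitation described
above is to drive the test from a temporary directory with textFileStream, since
file-based input streams do participate in checkpointing. A minimal sketch (names are
illustrative, and it assumes string events written as files by the test harness):

import java.nio.file.{Files, Paths}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("dstream-test")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint(Files.createTempDirectory("checkpoint").toString)

// File-based input streams can be checkpointed, unlike queueStream.
val inputDir = Files.createTempDirectory("test-input").toString
val events = ssc.textFileStream(inputDir)

// Stateful operations now work because checkpointing is enabled.
val counts = events.map(e => (e, 1)).updateStateByKey[Int] {
  (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
}
counts.print()

ssc.start()
// The test harness would then write files into inputDir to simulate batches, e.g.
// Files.write(Paths.get(inputDir, "batch-0.txt"), "event-1\nevent-2".getBytes)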


Java vs. Scala for Spark

2015-09-08 Thread Bryan Jeffrey
All,

We're looking at language choice in developing a simple streaming
processing application in spark.  We've got a small set of example code
built in Scala.  Articles like the following:
http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
would seem to indicate that Scala is great for use in distributed
programming (including Spark).  However, there is a large group of folks
that seem to feel that interoperability with other Java libraries is much
to be desired, and that the cost of learning (yet another) language is
quite high.

Has anyone looked at Scala for Spark dev in an enterprise environment?
What was the outcome?

Regards,

Bryan Jeffrey


Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Cody Koeninger
Have you tried deleting or moving the contents of the checkpoint directory
and restarting the job?

On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg 
wrote:

> Sorry, more relevant code below:
>
> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
> JavaStreamingContext jssc = params.isCheckpointed() ?
> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
> params);
> jssc.start();
> jssc.awaitTermination();
> jssc.close();
> ………..
>   private JavaStreamingContext createCheckpointedContext(SparkConf
> sparkConf, Parameters params) {
> JavaStreamingContextFactory factory = new
> JavaStreamingContextFactory() {
>   @Override
>   public JavaStreamingContext create() {
> return createContext(sparkConf, params);
>   }
> };
> return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
> factory);
>   }
>
>   private JavaStreamingContext createContext(SparkConf sparkConf,
> Parameters params) {
> // Create context with the specified batch interval, in milliseconds.
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.milliseconds(params.getBatchDurationMillis()));
> // Set the checkpoint directory, if we're checkpointing
> if (params.isCheckpointed()) {
>   jssc.checkpoint(params.getCheckpointDir());
> }
>
> Set<String> topicsSet = new HashSet<String>(Arrays.asList(params
> .getTopic()));
>
> // Set the Kafka parameters.
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params
> .getBrokerList());
> if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>   kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
> .getAutoOffsetReset());
> }
>
> // Create direct Kafka stream with the brokers and the topic.
> JavaPairInputDStream<String, String> messages =
> KafkaUtils.createDirectStream(
>   jssc,
>   String.class,
>   String.class,
>   StringDecoder.class,
>   StringDecoder.class,
>   kafkaParams,
>   topicsSet);
>
> // See if there's an override of the default checkpoint duration.
> if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>   messages.checkpoint(Durations.milliseconds(params
> .getCheckpointMillis()));
> }
>
> JavaDStream<String> messageBodies = messages.map(new
> Function<Tuple2<String, String>, String>() {
>   @Override
>   public String call(Tuple2<String, String> tuple2) {
> return tuple2._2();
>   }
> });
>
> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>   @Override
>   public Void call(JavaRDD<String> rdd) throws Exception {
> ProcessPartitionFunction func = new
> ProcessPartitionFunction(params);
> rdd.foreachPartition(func);
> return null;
>   }
> });
>
> return jssc;
> }
>
> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> I'd think that we wouldn't be "accidentally recovering from checkpoint"
>> hours or even days after consumers have been restarted, plus the content is
>> the fresh content that I'm feeding, not some content that had been fed
>> before the last restart.
>>
>> The code is basically as follows:
>>
>> SparkConf sparkConf = createSparkConf(...);
>> // We'd be 'checkpointed' because we specify a checkpoint directory
>> which makes isCheckpointed true
>> JavaStreamingContext jssc = params.isCheckpointed() ?
>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>> params);jssc.start();
>>
>> jssc.awaitTermination();
>>
>> jssc.close();
>>
>>
>>
>> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das 
>> wrote:
>>
>>> Are you sure you are not accidentally recovering from checkpoint? How
>>> are you using StreamingContext.getOrCreate() in your code?
>>>
>>> TD
>>>
>>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 Tathagata,

 In our logs I see the batch duration millis being set first to 10 then
 to 20 seconds. I don't see the 20 being reflected later during ingestion.

 In the Spark UI under Streaming I see the below output, notice the *10
 second* Batch interval.  Can you think of a reason why it's stuck at
 10?  It used to be 1 second by the way, then somehow over the course of a
 few restarts we managed to get it to be 10 seconds.  Now it won't get reset
 to 20 seconds.  Any ideas?

 Streaming

- *Started at: *Thu Sep 03 10:59:03 EDT 2015
- *Time since start: *1 day 8 hours 44 minutes
- *Network receivers: *0
- *Batch interval: *10 seconds
- *Processed batches: *11790
- *Waiting batches: *0
- *Received records: *0
- *Processed records: *0



 Statistics over last 100 processed batches
 Receiver Statistics: No receivers
 Batch Processing Statistics
 (columns: Metric | Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum)
>

Re: Is HDFS required for Spark streaming?

2015-09-08 Thread Cody Koeninger
Yes, local directories will be sufficient

On Sat, Sep 5, 2015 at 10:44 AM, N B  wrote:

> Hi TD,
>
> Thanks!
>
> So our application does turn on checkpoints but we do not recover upon
> application restart (we just blow the checkpoint directory away first and
> re-create the StreamingContext) as we don't have a real need for that type
> of recovery. However, because the application does reduceByKeyAndWindow
> operations, checkpointing has to be turned on. Do you think this scenario
> will also only work with HDFS, or will having local directories suffice?
>
> Thanks
> Nikunj
>
>
>
> On Fri, Sep 4, 2015 at 3:09 PM, Tathagata Das  wrote:
>
>> Shuffle spills will use local disk, HDFS not needed.
>> Spark and Spark Streaming checkpoint info WILL NEED HDFS for
>> fault-tolerance. So that stuff can be recovered even if the spark cluster
>> nodes go down.
>>
>> TD
>>
>> On Fri, Sep 4, 2015 at 2:45 PM, N B  wrote:
>>
>>> Hello,
>>>
>>> We have a Spark Streaming program that is currently running on a single
>>> node in "local[n]" master mode. We currently give it local directories for
>>> Spark's own state management etc. The input is streaming from network/flume
>>> and output is also to network/kafka etc, so the process as such does not
>>> need any distributed file system.
>>>
>>> Now, we do want to start distributing this processing across a few
>>> machines and make a real cluster out of it. However, I am not sure if HDFS
>>> is a hard requirement for that to happen. I am thinking about the Shuffle
>>> spills, DStream/RDD persistence and checkpoint info. Do any of these
>>> require the state to be shared via HDFS? Are there other alternatives that
>>> can be utilized if state sharing is accomplished via the file system only?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>
>
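
To make the distinction in this thread concrete, a small sketch (paths and names are
placeholders): the checkpoint location is just a path string, and fault-tolerant
recovery on a multi-node cluster needs it on a filesystem every node can reach, while
shuffle spill always goes to each node's local disks regardless of this setting.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo")
val ssc = new StreamingContext(conf, Seconds(10))

// Multi-node cluster where recovery matters: use a shared, HDFS-compatible path.
ssc.checkpoint("hdfs:///spark/checkpoints/myApp")

// Single node, or no recovery needed (as in this thread): a local path suffices.
// ssc.checkpoint("/tmp/spark-checkpoints/myApp")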


Re: 1.5 Build Errors

2015-09-08 Thread Benjamin Zaitlen
Ah, right.  Should've caught that.

The docs seem to recommend 2gb.  Should that be increased as well?

--Ben

On Tue, Sep 8, 2015 at 9:33 AM, Sean Owen  wrote:

> It shows you there that Maven is out of memory. Give it more heap. I use
> 3gb.
>
> On Tue, Sep 8, 2015 at 1:53 PM, Benjamin Zaitlen 
> wrote:
> > Hi All,
> >
> > I'm trying to build a distribution off of the latest in master and I keep
> > getting errors on MQTT and the build fails.   I'm running the build on a
> > m1.large which has 7.5 GB of RAM and no other major processes are
> running.
> >
> >> MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
> >> ./make-distribution.sh  --name continuum-custom-spark-1.5 --tgz -Pyarn
> >> -Phive -Phive-thriftserver -Phadoop-2.4 -Dhadoop.version=2.4.0
> >
> >
> >
> >> INFO] Spark Project GraphX ... SUCCESS [
> >> 33.345 s]
> >> [INFO] Spark Project Streaming  SUCCESS
> [01:08
> >> min]
> >> [INFO] Spark Project Catalyst . SUCCESS
> [01:39
> >> min]
> >> [INFO] Spark Project SQL .. SUCCESS
> [02:06
> >> min]
> >> [INFO] Spark Project ML Library ... SUCCESS
> [02:16
> >> min]
> >> [INFO] Spark Project Tools  SUCCESS [
> >> 4.087 s]
> >> [INFO] Spark Project Hive . SUCCESS
> [01:28
> >> min]
> >> [INFO] Spark Project REPL . SUCCESS [
> >> 16.291 s]
> >> [INFO] Spark Project YARN Shuffle Service . SUCCESS [
> >> 13.671 s]
> >> [INFO] Spark Project YARN . SUCCESS [
> >> 20.554 s]
> >> [INFO] Spark Project Hive Thrift Server ... SUCCESS [
> >> 14.332 s]
> >> [INFO] Spark Project Assembly . SUCCESS
> [03:33
> >> min]
> >> [INFO] Spark Project External Twitter . SUCCESS [
> >> 14.208 s]
> >> [INFO] Spark Project External Flume Sink .. SUCCESS [
> >> 11.535 s]
> >> [INFO] Spark Project External Flume ... SUCCESS [
> >> 19.010 s]
> >> [INFO] Spark Project External Flume Assembly .. SUCCESS [
> >> 5.210 s]
> >> [INFO] Spark Project External MQTT  FAILURE
> [01:10
> >> min]
> >> [INFO] Spark Project External MQTT Assembly ... SKIPPED
> >> [INFO] Spark Project External ZeroMQ .. SKIPPED
> >> [INFO] Spark Project External Kafka ... SKIPPED
> >> [INFO] Spark Project Examples . SKIPPED
> >> [INFO] Spark Project External Kafka Assembly .. SKIPPED
> >> [INFO]
> >> 
> >> [INFO] BUILD FAILURE
> >> [INFO]
> >> 
> >> [INFO] Total time: 22:55 min
> >> [INFO] Finished at: 2015-09-07T22:42:57+00:00
> >> [INFO] Final Memory: 240M/455M
> >> [INFO]
> >> 
> >> [ERROR] GC overhead limit exceeded -> [Help 1]
> >> [ERROR]
> >> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> >> -e switch.
> >> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> >> [ERROR]
> >> [ERROR] For more information about the errors and possible solutions,
> >> please read the following articles:
> >> [ERROR] [Help 1]
> >> http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
> >> + return 1
> >> + exit 1
> >
> >
> > Any thoughts would be extremely helpful.
> >
> > --Ben
>


Re: 1.5 Build Errors

2015-09-08 Thread Sean Owen
It might need more memory in certain situations / running certain
tests. If 3gb works for your relatively full build, yes you can open a
PR to change any occurrences of lower recommendations to 3gb.

On Tue, Sep 8, 2015 at 3:02 PM, Benjamin Zaitlen  wrote:
> Ah, right.  Should've caught that.
>
> The docs seem to recommend 2gb.  Should that be increased as well?
>
> --Ben
>
> On Tue, Sep 8, 2015 at 9:33 AM, Sean Owen  wrote:
>>
>> It shows you there that Maven is out of memory. Give it more heap. I use
>> 3gb.
>>
>> On Tue, Sep 8, 2015 at 1:53 PM, Benjamin Zaitlen 
>> wrote:
>> > Hi All,
>> >
>> > I'm trying to build a distribution off of the latest in master and I
>> > keep
>> > getting errors on MQTT and the build fails.   I'm running the build on a
>> > m1.large which has 7.5 GB of RAM and no other major processes are
>> > running.
>> >
>> >> MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
>> >> ./make-distribution.sh  --name continuum-custom-spark-1.5 --tgz -Pyarn
>> >> -Phive -Phive-thriftserver -Phadoop-2.4 -Dhadoop.version=2.4.0
>> >
>> >
>> >
>> >> INFO] Spark Project GraphX ... SUCCESS [
>> >> 33.345 s]
>> >> [INFO] Spark Project Streaming  SUCCESS
>> >> [01:08
>> >> min]
>> >> [INFO] Spark Project Catalyst . SUCCESS
>> >> [01:39
>> >> min]
>> >> [INFO] Spark Project SQL .. SUCCESS
>> >> [02:06
>> >> min]
>> >> [INFO] Spark Project ML Library ... SUCCESS
>> >> [02:16
>> >> min]
>> >> [INFO] Spark Project Tools  SUCCESS [
>> >> 4.087 s]
>> >> [INFO] Spark Project Hive . SUCCESS
>> >> [01:28
>> >> min]
>> >> [INFO] Spark Project REPL . SUCCESS [
>> >> 16.291 s]
>> >> [INFO] Spark Project YARN Shuffle Service . SUCCESS [
>> >> 13.671 s]
>> >> [INFO] Spark Project YARN . SUCCESS [
>> >> 20.554 s]
>> >> [INFO] Spark Project Hive Thrift Server ... SUCCESS [
>> >> 14.332 s]
>> >> [INFO] Spark Project Assembly . SUCCESS
>> >> [03:33
>> >> min]
>> >> [INFO] Spark Project External Twitter . SUCCESS [
>> >> 14.208 s]
>> >> [INFO] Spark Project External Flume Sink .. SUCCESS [
>> >> 11.535 s]
>> >> [INFO] Spark Project External Flume ... SUCCESS [
>> >> 19.010 s]
>> >> [INFO] Spark Project External Flume Assembly .. SUCCESS [
>> >> 5.210 s]
>> >> [INFO] Spark Project External MQTT  FAILURE
>> >> [01:10
>> >> min]
>> >> [INFO] Spark Project External MQTT Assembly ... SKIPPED
>> >> [INFO] Spark Project External ZeroMQ .. SKIPPED
>> >> [INFO] Spark Project External Kafka ... SKIPPED
>> >> [INFO] Spark Project Examples . SKIPPED
>> >> [INFO] Spark Project External Kafka Assembly .. SKIPPED
>> >> [INFO]
>> >>
>> >> 
>> >> [INFO] BUILD FAILURE
>> >> [INFO]
>> >>
>> >> 
>> >> [INFO] Total time: 22:55 min
>> >> [INFO] Finished at: 2015-09-07T22:42:57+00:00
>> >> [INFO] Final Memory: 240M/455M
>> >> [INFO]
>> >>
>> >> 
>> >> [ERROR] GC overhead limit exceeded -> [Help 1]
>> >> [ERROR]
>> >> [ERROR] To see the full stack trace of the errors, re-run Maven with
>> >> the
>> >> -e switch.
>> >> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>> >> [ERROR]
>> >> [ERROR] For more information about the errors and possible solutions,
>> >> please read the following articles:
>> >> [ERROR] [Help 1]
>> >> http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
>> >> + return 1
>> >> + exit 1
>> >
>> >
>> > Any thoughts would be extremely helpful.
>> >
>> > --Ben
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [streaming] DStream with window performance issue

2015-09-08 Thread Cody Koeninger
Can you provide more info (what version of spark, code example)?

On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin  wrote:

> Hi,
>
> I have an application with 2 streams, which are joined together.
> Stream1 is a simple DStream (relatively small batch chunks).
> Stream2 is a windowed DStream (with a duration of, for example, 60 seconds).
>
> Stream1 and Stream2 are Kafka direct streams.
> The problem is that, according to the logs, the window operation is constantly
> increasing (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
> I also see a gap in the processing window (screenshot:
> http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php); in the logs there are
> no events in that period.
> So what happens in that gap, and why is the window constantly increasing?
>
> Thank you in advance
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Java vs. Scala for Spark

2015-09-08 Thread Jonathan Coveney
It worked for Twitter!

Seriously though: scala is much much more pleasant. And scala has a great
story for using Java libs. And since spark is kind of framework-y (use its
scripts to submit, start up repl, etc) the projects tend to be lead
projects, so even in a big company that uses Java the cost of scala is low
and fairly isolated. If you need to write large amounts of supporting
libraries, you are free to use Java or scala as you see fit.

El martes, 8 de septiembre de 2015, Bryan Jeffrey 
escribió:

> All,
>
> We're looking at language choice in developing a simple streaming
> processing application in spark.  We've got a small set of example code
> built in Scala.  Articles like the following:
> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
> would seem to indicate that Scala is great for use in distributed
> programming (including Spark).  However, there is a large group of folks
> that seem to feel that interoperability with other Java libraries is much
> to be desired, and that the cost of learning (yet another) language is
> quite high.
>
> Has anyone looked at Scala for Spark dev in an enterprise environment?
> What was the outcome?
>
> Regards,
>
> Bryan Jeffrey
>


Re: Java vs. Scala for Spark

2015-09-08 Thread Ted Yu
Performance wise, Scala is by far the best choice when you use Spark.

The cost of learning Scala is not negligible but not insurmountable either.

My personal opinion.

On Tue, Sep 8, 2015 at 6:50 AM, Bryan Jeffrey 
wrote:

> All,
>
> We're looking at language choice in developing a simple streaming
> processing application in spark.  We've got a small set of example code
> built in Scala.  Articles like the following:
> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
> would seem to indicate that Scala is great for use in distributed
> programming (including Spark).  However, there is a large group of folks
> that seem to feel that interoperability with other Java libraries is much
> to be desired, and that the cost of learning (yet another) language is
> quite high.
>
> Has anyone looked at Scala for Spark dev in an enterprise environment?
> What was the outcome?
>
> Regards,
>
> Bryan Jeffrey
>


Re: hadoop2.6.0 + spark1.4.1 + python2.7.10

2015-09-08 Thread Sasha Kacanski
Hi Ashish,
Thanks for the update.
I tried all of it, but what I don't get is that I run a cluster with one
node, so presumably I should have PySpark binaries there, as I am developing
on the same host.
Could you tell me where you placed the parcels or whatever Cloudera is using?
My understanding of YARN and Spark is that these binaries get compressed
and packaged with Java to be pushed to the worker node.
Regards,
On Sep 7, 2015 9:00 PM, "Ashish Dutt"  wrote:

> Hello Sasha,
>
> I have no answer for debian. My cluster is on Linux and I'm using CDH 5.4
> Your question-  "Error from python worker:
>   /cube/PY/Python27/bin/python: No module named pyspark"
>
> On a single node (ie one server/machine/computer) I installed pyspark
> binaries and it worked. Connected it to pycharm and it worked too.
>
> Next I tried executing the pyspark command on another node (say the worker) in
> the cluster and I got this error message: "Error from python worker: PATH:
> No module named pyspark".
>
> My first guess was that the worker is not picking up the path of pyspark
> binaries installed on the server (I tried many things like hard-coding
> the pyspark path in the config.sh file on the worker- NO LUCK; tried
> dynamic path from the code in pycharm- NO LUCK... ; searched the web and
> asked the question in almost every online forum--NO LUCK..; banged my head
> several times with pyspark/hadoop books--NO LUCK... Finally, one fine day a
> 'watermelon' dropped while brooding on this problem and I installed pyspark
> binaries on all the worker machines.) Now when I tried executing just the
> command pyspark on the workers it worked. Tried some simple program
> snippets on each worker; they work too.
>
> I am not sure if this will help or not for your use-case.
>
>
>
> Sincerely,
> Ashish
>
> On Mon, Sep 7, 2015 at 11:04 PM, Sasha Kacanski 
> wrote:
>
>> Thanks Ashish,
>> nice blog, but it does not cover my issue. Actually I have PyCharm running
>> and loading pyspark and the rest of the libraries perfectly fine.
>> My issue is that I am not sure what is triggering
>>
>> Error from python worker:
>>   /cube/PY/Python27/bin/python: No module named pyspark
>> pyspark
>> PYTHONPATH was:
>>
>> /tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/filecache/18/spark-assembly-1.
>> 4.1-hadoop2.6.0.jar
>>
>> The question is: why is YARN not getting the Python package to run on the single
>> node?
>> Some people are saying to run with Java 6 due to zip library changes between
>> 6/7/8, some identified a bug with RH (I am on Debian), and some mention
>> documentation errors, but nothing is really clear.
>>
>> I have binaries for Spark and Hadoop, and I did just fine with the Spark SQL
>> module, Hive, Python, pandas and YARN.
>> Locally, as I said, the app is working fine (pandas to Spark DF to Parquet).
>> But as soon as I move to yarn-client mode, YARN is not getting the packages
>> required to run the app.
>>
>> If someone confirms that I need to build everything from source with a
>> specific version of the software I will do that, but at this point I am not
>> sure what to do to remedy this situation...
>>
>> --sasha
>>
>>
>> On Sun, Sep 6, 2015 at 8:27 PM, Ashish Dutt 
>> wrote:
>>
>>> Hi Aleksandar,
>>> Quite some time ago, I faced the same problem and I found a solution
>>> which I have posted here on my blog
>>> .
>>> See if that can help you and if it does not then you can check out these
>>> questions & solution on stackoverflow
>>>  website
>>>
>>>
>>> Sincerely,
>>> Ashish Dutt
>>>
>>>
>>> On Mon, Sep 7, 2015 at 7:17 AM, Sasha Kacanski 
>>> wrote:
>>>
 Hi,
 I am successfully running a Python app via PyCharm in local mode
 setMaster("local[*]")

 When I turn on SparkConf().setMaster("yarn-client")

 and run via

 spark-submit PysparkPandas.py


 I run into issue:
 Error from python worker:
   /cube/PY/Python27/bin/python: No module named pyspark
 PYTHONPATH was:

 /tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/filecache/18/spark-assembly-1.4.1-hadoop2.6.0.jar

 I am running java
 hadoop@pluto:~/pySpark$ /opt/java/jdk/bin/java -version
 java version "1.8.0_31"
 Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
 Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)

 Should I try the same thing with Java 6/7?

 Is this a packaging issue, or do I have something wrong with the configuration
 ...

 Regards,

 --
 Aleksandar Kacanski

>>>
>>>
>>
>>
>> --
>> Aleksandar Kacanski
>>
>
>


Re: Java vs. Scala for Spark

2015-09-08 Thread Sean Owen
Why would Scala vs Java performance be different Ted? Relatively
speaking there is almost no runtime difference; it's the same APIs or
calls via a thin wrapper. Scala/Java vs Python is a different story.

Java libraries can be used in Scala. Vice-versa too, though calling
Scala-generated classes can be clunky in Java. What's your concern
about interoperability Jeffrey?

I disagree that Java 7 vs Scala usability is sooo different, but it's
certainly much more natural to use Spark in Scala. Java 8 closes a lot
of the usability gap with Scala, but not all of it. Enough that it's
not crazy for a Java shop to stick to Java 8 + Spark and not be at a
big disadvantage.

The downsides of Scala IMHO are that it provides too much: lots of
nice features (closures! superb collections!), lots of rope to hang
yourself too (implicits sometimes!) and some WTF features (XML
literals!) Learning the good useful bits of Scala isn't hard. You can
always write Scala code as much like Java as you like, I find.

Scala tooling is different from Java tooling; that's an
underappreciated barrier. For example I think SBT is good for
development, bad for general project lifecycle management compared to
Maven, but in any event still less developed. SBT/scalac are huge
resource hogs, since so much of Scala is really implemented in the
compiler; prepare to update your laptop to develop in Scala on your
IDE of choice, and start to think about running long-running compile
servers like we did in the year 2000.

Still net-net I would choose Scala, FWIW.

On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu  wrote:
> Performance wise, Scala is by far the best choice when you use Spark.
>
> The cost of learning Scala is not negligible but not insurmountable either.
>
> My personal opinion.
>
> On Tue, Sep 8, 2015 at 6:50 AM, Bryan Jeffrey 
> wrote:
>>
>> All,
>>
>> We're looking at language choice in developing a simple streaming
>> processing application in spark.  We've got a small set of example code
>> built in Scala.  Articles like the following:
>> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
>> would seem to indicate that Scala is great for use in distributed
>> programming (including Spark).  However, there is a large group of folks
>> that seem to feel that interoperability with other Java libraries is much to
>> be desired, and that the cost of learning (yet another) language is quite
>> high.
>>
>> Has anyone looked at Scala for Spark dev in an enterprise environment?
>> What was the outcome?
>>
>> Regards,
>>
>> Bryan Jeffrey
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can not allocate executor when running spark on mesos

2015-09-08 Thread canan chen
Yes, I followed the guide in this doc, and run it in Mesos client mode.

On Tue, Sep 8, 2015 at 6:31 PM, Akhil Das 
wrote:

> In which mode are you submitting your application? (coarse-grained or
> fine-grained(default)). Have you gone through this documentation already?
> http://spark.apache.org/docs/latest/running-on-mesos.html#using-a-mesos-master-url
>
> Thanks
> Best Regards
>
> On Tue, Sep 8, 2015 at 12:54 PM, canan chen  wrote:
>
>> Hi all,
>>
>> I am trying to run Spark on Mesos, but it looks like I cannot allocate
>> resources from Mesos. I am not an expert on Mesos, but from the Mesos log it
>> seems Spark always declines the offers from Mesos. Not sure what's wrong;
>> maybe some configuration change is needed. Here's the Mesos master log:
>>
>> I0908 15:08:16.515960 301916160 master.cpp:1767] Received registration
>> request for framework 'Spark shell' at
>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>> I0908 15:08:16.520545 301916160 master.cpp:1834] Registering framework
>> 20150908-143320-16777343-5050-41965- (Spark shell) at
>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133 with
>> checkpointing disabled and capabilities [  ]
>> I0908 15:08:16.522307 300843008 hierarchical.hpp:386] Added framework
>> 20150908-143320-16777343-5050-41965-
>> I0908 15:08:16.525845 301379584 master.cpp:4290] Sending 1 offers to
>> framework 20150908-143320-16777343-5050-41965- (Spark shell) at
>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>> I0908 15:08:16.637677 302452736 master.cpp:2884] Processing DECLINE call
>> for offers: [ 20150908-143320-16777343-5050-41965-O0 ] for framework
>> 20150908-143320-16777343-5050-41965- (Spark shell) at
>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>> I0908 15:08:16.639197 299233280 hierarchical.hpp:761] Recovered
>> cpus(*):8; mem(*):15360; disk(*):470842; ports(*):[31000-32000] (total:
>> cpus(*):8; mem(*):15360; disk(*):470842; ports(*):[31000-32000], allocated:
>> ) on slave 20150908-143320-16777343-5050-41965-S0 from framework
>> 20150908-143320-16777343-5050-41965-
>> I0908 15:08:21.786932 300306432 master.cpp:4290] Sending 1 offers to
>> framework 20150908-143320-16777343-5050-41965- (Spark shell) at
>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>> I0908 15:08:21.789979 298696704 master.cpp:2884] Processing DECLINE call
>> for offers: [ 20150908-143320-16777343-5050-41965-O1 ] for framework
>> 20150908-143320-16777343-5050-41965- (Spark shell) at
>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>>
>
>


Re: Java vs. Scala for Spark

2015-09-08 Thread Bryan Jeffrey
Thank you for the quick responses.  It's useful to have some insight from
folks already extensively using Spark.

Regards,

Bryan Jeffrey

On Tue, Sep 8, 2015 at 10:28 AM, Sean Owen  wrote:

> Why would Scala vs Java performance be different Ted? Relatively
> speaking there is almost no runtime difference; it's the same APIs or
> calls via a thin wrapper. Scala/Java vs Python is a different story.
>
> Java libraries can be used in Scala. Vice-versa too, though calling
> Scala-generated classes can be clunky in Java. What's your concern
> about interoperability Jeffrey?
>
> I disagree that Java 7 vs Scala usability is sooo different, but it's
> certainly much more natural to use Spark in Scala. Java 8 closes a lot
> of the usability gap with Scala, but not all of it. Enough that it's
> not crazy for a Java shop to stick to Java 8 + Spark and not be at a
> big disadvantage.
>
> The downsides of Scala IMHO are that it provides too much: lots of
> nice features (closures! superb collections!), lots of rope to hang
> yourself too (implicits sometimes!) and some WTF features (XML
> literals!) Learning the good useful bits of Scala isn't hard. You can
> always write Scala code as much like Java as you like, I find.
>
> Scala tooling is different from Java tooling; that's an
> underappreciated barrier. For example I think SBT is good for
> development, bad for general project lifecycle management compared to
> Maven, but in any event still less developed. SBT/scalac are huge
> resource hogs, since so much of Scala is really implemented in the
> compiler; prepare to update your laptop to develop in Scala on your
> IDE of choice, and start to think about running long-running compile
> servers like we did in the year 2000.
>
> Still net-net I would choose Scala, FWIW.
>
> On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu  wrote:
> > Performance wise, Scala is by far the best choice when you use Spark.
> >
> > The cost of learning Scala is not negligible but not insurmountable
> either.
> >
> > My personal opinion.
> >
> > On Tue, Sep 8, 2015 at 6:50 AM, Bryan Jeffrey 
> > wrote:
> >>
> >> All,
> >>
> >> We're looking at language choice in developing a simple streaming
> >> processing application in spark.  We've got a small set of example code
> >> built in Scala.  Articles like the following:
> >>
> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
> >> would seem to indicate that Scala is great for use in distributed
> >> programming (including Spark).  However, there is a large group of folks
> >> that seem to feel that interoperability with other Java libraries is
> much to
> >> be desired, and that the cost of learning (yet another) language is
> quite
> >> high.
> >>
> >> Has anyone looked at Scala for Spark dev in an enterprise environment?
> >> What was the outcome?
> >>
> >> Regards,
> >>
> >> Bryan Jeffrey
> >
> >
>


Partitioning a RDD for training multiple classifiers

2015-09-08 Thread Maximo Gurmendez
Hi,
I have an RDD that needs to be split (say, by client) in order to train n
models (i.e. one for each client). Since most of the classifiers that come with
MLlib can only accept a single RDD as input (and cannot build multiple models in one
pass, as I understand it), the only way to train n separate models is to
create n RDDs (by filtering the original one).

Conceptually:

rdd1,rdd2,rdd3 = splitRdds(bigRdd)  

The function splitRdds would use the standard filter mechanism.  I would then
need to submit n training Spark jobs. When I do this, will it mean that it will
traverse the bigRdd n times? Is there a better way to persist the split RDDs
(i.e. save them in a cache)?
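
For reference, a minimal sketch of that filter-based split (with a hypothetical
Sample record type and clientId field, not the actual schema): caching the parent
once keeps the n filters from recomputing the full lineage, although each filter is
still a pass over the cached data.

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical record type standing in for the real per-client training data.
case class Sample(clientId: String, label: Double, features: Array[Double])

def splitByClient(bigRdd: RDD[Sample]): Map[String, RDD[Sample]] = {
  // Materialize the parent once; the per-client filters then read cached
  // blocks instead of recomputing the original lineage n times.
  val cached = bigRdd.persist(StorageLevel.MEMORY_AND_DISK)
  val clients = cached.map(_.clientId).distinct().collect()
  clients.map(id => id -> cached.filter(_.clientId == id)).toMap
}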

I could cache the bigRdd, but I’m not sure that would be very efficient either, since
it will require the same number of passes anyway (I think, but I’m relatively
new to Spark). Also, I’m planning on reusing the individual splits (rdd1, rdd2,
etc.), so it would be convenient to have them individually cached.

Another problem is that the splits could be very skewed (i.e. one split
could represent a large percentage of the original bigRdd). So saving the
split RDDs to disk (at least, naively) could be a challenge.

Is there any better way of doing this?

Thanks!
   Máximo



Re: Java vs. Scala for Spark

2015-09-08 Thread Ted Yu
Sean:
w.r.t. performance, I meant Scala/Java vs Python.

Cheers

On Tue, Sep 8, 2015 at 7:28 AM, Sean Owen  wrote:

> Why would Scala vs Java performance be different Ted? Relatively
> speaking there is almost no runtime difference; it's the same APIs or
> calls via a thin wrapper. Scala/Java vs Python is a different story.
>
> Java libraries can be used in Scala. Vice-versa too, though calling
> Scala-generated classes can be clunky in Java. What's your concern
> about interoperability Jeffrey?
>
> I disagree that Java 7 vs Scala usability is sooo different, but it's
> certainly much more natural to use Spark in Scala. Java 8 closes a lot
> of the usability gap with Scala, but not all of it. Enough that it's
> not crazy for a Java shop to stick to Java 8 + Spark and not be at a
> big disadvantage.
>
> The downsides of Scala IMHO are that it provides too much: lots of
> nice features (closures! superb collections!), lots of rope to hang
> yourself too (implicits sometimes!) and some WTF features (XML
> literals!) Learning the good useful bits of Scala isn't hard. You can
> always write Scala code as much like Java as you like, I find.
>
> Scala tooling is different from Java tooling; that's an
> underappreciated barrier. For example I think SBT is good for
> development, bad for general project lifecycle management compared to
> Maven, but in any event still less developed. SBT/scalac are huge
> resource hogs, since so much of Scala is really implemented in the
> compiler; prepare to update your laptop to develop in Scala on your
> IDE of choice, and start to think about running long-running compile
> servers like we did in the year 2000.
>
> Still net-net I would choose Scala, FWIW.
>
> On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu  wrote:
> > Performance wise, Scala is by far the best choice when you use Spark.
> >
> > The cost of learning Scala is not negligible but not insurmountable
> either.
> >
> > My personal opinion.
> >
> > On Tue, Sep 8, 2015 at 6:50 AM, Bryan Jeffrey 
> > wrote:
> >>
> >> All,
> >>
> >> We're looking at language choice in developing a simple streaming
> >> processing application in spark.  We've got a small set of example code
> >> built in Scala.  Articles like the following:
> >>
> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
> >> would seem to indicate that Scala is great for use in distributed
> >> programming (including Spark).  However, there is a large group of folks
> >> that seem to feel that interoperability with other Java libraries is
> much to
> >> be desired, and that the cost of learning (yet another) language is
> quite
> >> high.
> >>
> >> Has anyone looked at Scala for Spark dev in an enterprise environment?
> >> What was the outcome?
> >>
> >> Regards,
> >>
> >> Bryan Jeffrey
> >
> >
>


Re: 1.5 Build Errors

2015-09-08 Thread Benjamin Zaitlen
I'm still getting errors with 3g.  I've increased to 4g and I'll report back.

To be clear:

export MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=1024M
-XX:ReservedCodeCacheSize=1024m"

[ERROR] GC overhead limit exceeded -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
> + return 1
> + exit 1


On Tue, Sep 8, 2015 at 10:03 AM, Sean Owen  wrote:

> It might need more memory in certain situations / running certain
> tests. If 3gb works for your relatively full build, yes you can open a
> PR to change any occurrences of lower recommendations to 3gb.
>
> On Tue, Sep 8, 2015 at 3:02 PM, Benjamin Zaitlen 
> wrote:
> > Ah, right.  Should've caught that.
> >
> > The docs seem to recommend 2gb.  Should that be increased as well?
> >
> > --Ben
> >
> > On Tue, Sep 8, 2015 at 9:33 AM, Sean Owen  wrote:
> >>
> >> It shows you there that Maven is out of memory. Give it more heap. I use
> >> 3gb.
> >>
> >> On Tue, Sep 8, 2015 at 1:53 PM, Benjamin Zaitlen 
> >> wrote:
> >> > Hi All,
> >> >
> >> > I'm trying to build a distribution off of the latest in master and I
> >> > keep
> >> > getting errors on MQTT and the build fails.   I'm running the build
> on a
> >> > m1.large which has 7.5 GB of RAM and no other major processes are
> >> > running.
> >> >
> >> >> MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
> -XX:ReservedCodeCacheSize=512m"
> >> >> ./make-distribution.sh  --name continuum-custom-spark-1.5 --tgz
> -Pyarn
> >> >> -Phive -Phive-thriftserver -Phadoop-2.4 -Dhadoop.version=2.4.0
> >> >
> >> >
> >> >
> >> >> INFO] Spark Project GraphX ... SUCCESS [
> >> >> 33.345 s]
> >> >> [INFO] Spark Project Streaming  SUCCESS
> >> >> [01:08
> >> >> min]
> >> >> [INFO] Spark Project Catalyst . SUCCESS
> >> >> [01:39
> >> >> min]
> >> >> [INFO] Spark Project SQL .. SUCCESS
> >> >> [02:06
> >> >> min]
> >> >> [INFO] Spark Project ML Library ... SUCCESS
> >> >> [02:16
> >> >> min]
> >> >> [INFO] Spark Project Tools  SUCCESS [
> >> >> 4.087 s]
> >> >> [INFO] Spark Project Hive . SUCCESS
> >> >> [01:28
> >> >> min]
> >> >> [INFO] Spark Project REPL . SUCCESS [
> >> >> 16.291 s]
> >> >> [INFO] Spark Project YARN Shuffle Service . SUCCESS [
> >> >> 13.671 s]
> >> >> [INFO] Spark Project YARN . SUCCESS [
> >> >> 20.554 s]
> >> >> [INFO] Spark Project Hive Thrift Server ... SUCCESS [
> >> >> 14.332 s]
> >> >> [INFO] Spark Project Assembly . SUCCESS
> >> >> [03:33
> >> >> min]
> >> >> [INFO] Spark Project External Twitter . SUCCESS [
> >> >> 14.208 s]
> >> >> [INFO] Spark Project External Flume Sink .. SUCCESS [
> >> >> 11.535 s]
> >> >> [INFO] Spark Project External Flume ... SUCCESS [
> >> >> 19.010 s]
> >> >> [INFO] Spark Project External Flume Assembly .. SUCCESS [
> >> >> 5.210 s]
> >> >> [INFO] Spark Project External MQTT  FAILURE
> >> >> [01:10
> >> >> min]
> >> >> [INFO] Spark Project External MQTT Assembly ... SKIPPED
> >> >> [INFO] Spark Project External ZeroMQ .. SKIPPED
> >> >> [INFO] Spark Project External Kafka ... SKIPPED
> >> >> [INFO] Spark Project Examples . SKIPPED
> >> >> [INFO] Spark Project External Kafka Assembly .. SKIPPED
> >> >> [INFO]
> >> >>
> >> >>
> 
> >> >> [INFO] BUILD FAILURE
> >> >> [INFO]
> >> >>
> >> >>
> 
> >> >> [INFO] Total time: 22:55 min
> >> >> [INFO] Finished at: 2015-09-07T22:42:57+00:00
> >> >> [INFO] Final Memory: 240M/455M
> >> >> [INFO]
> >> >>
> >> >>
> 
> >> >> [ERROR] GC overhead limit exceeded -> [Help 1]
> >> >> [ERROR]
> >> >> [ERROR] To see the full stack trace of the errors, re-run Maven with
> >> >> the
> >> >> -e switch.
> >> >> [ERROR] Re-run Maven using the -X switch to enable full debug
> logging.
> >> >> [ERROR]
> >> >> [ERROR] For more information about the errors and possible solutions,
> >> >> please read the following articles:
> >> >> [ERROR] [Help 1]
> >> >> http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
> >> >> + return 1
> >> >> + exit 1
> >> >
> >> >
> >> > Any thoughts would be extremely helpful.
> >> >
> >> > --Ben
> >

Re: 1.5 Build Errors

2015-09-08 Thread Ted Yu
Do you run Zinc while compiling?

Cheers

On Tue, Sep 8, 2015 at 7:56 AM, Benjamin Zaitlen  wrote:

> I'm still getting errors with 3g.  I've increase to 4g and I'll report back
>
> To be clear:
>
> export MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=1024M
> -XX:ReservedCodeCacheSize=1024m"
>
> [ERROR] GC overhead limit exceeded -> [Help 1]
>> [ERROR]
>> [ERROR] To see the full stack trace of the errors, re-run Maven with the
>> -e switch.
>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>> [ERROR]
>> [ERROR] For more information about the errors and possible solutions,
>> please read the following articles:
>> [ERROR] [Help 1]
>> http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
>> + return 1
>> + exit 1
>
>
> On Tue, Sep 8, 2015 at 10:03 AM, Sean Owen  wrote:
>
>> It might need more memory in certain situations / running certain
>> tests. If 3gb works for your relatively full build, yes you can open a
>> PR to change any occurrences of lower recommendations to 3gb.
>>
>> On Tue, Sep 8, 2015 at 3:02 PM, Benjamin Zaitlen 
>> wrote:
>> > Ah, right.  Should've caught that.
>> >
>> > The docs seem to recommend 2gb.  Should that be increased as well?
>> >
>> > --Ben
>> >
>> > On Tue, Sep 8, 2015 at 9:33 AM, Sean Owen  wrote:
>> >>
>> >> It shows you there that Maven is out of memory. Give it more heap. I
>> use
>> >> 3gb.
>> >>
>> >> On Tue, Sep 8, 2015 at 1:53 PM, Benjamin Zaitlen 
>> >> wrote:
>> >> > Hi All,
>> >> >
>> >> > I'm trying to build a distribution off of the latest in master and I
>> >> > keep
>> >> > getting errors on MQTT and the build fails.   I'm running the build
>> on a
>> >> > m1.large which has 7.5 GB of RAM and no other major processes are
>> >> > running.
>> >> >
>> >> >> MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
>> -XX:ReservedCodeCacheSize=512m"
>> >> >> ./make-distribution.sh  --name continuum-custom-spark-1.5 --tgz
>> -Pyarn
>> >> >> -Phive -Phive-thriftserver -Phadoop-2.4 -Dhadoop.version=2.4.0
>> >> >
>> >> >
>> >> >
>> >> >> INFO] Spark Project GraphX ... SUCCESS [
>> >> >> 33.345 s]
>> >> >> [INFO] Spark Project Streaming  SUCCESS
>> >> >> [01:08
>> >> >> min]
>> >> >> [INFO] Spark Project Catalyst . SUCCESS
>> >> >> [01:39
>> >> >> min]
>> >> >> [INFO] Spark Project SQL .. SUCCESS
>> >> >> [02:06
>> >> >> min]
>> >> >> [INFO] Spark Project ML Library ... SUCCESS
>> >> >> [02:16
>> >> >> min]
>> >> >> [INFO] Spark Project Tools  SUCCESS
>> [
>> >> >> 4.087 s]
>> >> >> [INFO] Spark Project Hive . SUCCESS
>> >> >> [01:28
>> >> >> min]
>> >> >> [INFO] Spark Project REPL . SUCCESS
>> [
>> >> >> 16.291 s]
>> >> >> [INFO] Spark Project YARN Shuffle Service . SUCCESS
>> [
>> >> >> 13.671 s]
>> >> >> [INFO] Spark Project YARN . SUCCESS
>> [
>> >> >> 20.554 s]
>> >> >> [INFO] Spark Project Hive Thrift Server ... SUCCESS
>> [
>> >> >> 14.332 s]
>> >> >> [INFO] Spark Project Assembly . SUCCESS
>> >> >> [03:33
>> >> >> min]
>> >> >> [INFO] Spark Project External Twitter . SUCCESS
>> [
>> >> >> 14.208 s]
>> >> >> [INFO] Spark Project External Flume Sink .. SUCCESS
>> [
>> >> >> 11.535 s]
>> >> >> [INFO] Spark Project External Flume ... SUCCESS
>> [
>> >> >> 19.010 s]
>> >> >> [INFO] Spark Project External Flume Assembly .. SUCCESS
>> [
>> >> >> 5.210 s]
>> >> >> [INFO] Spark Project External MQTT  FAILURE
>> >> >> [01:10
>> >> >> min]
>> >> >> [INFO] Spark Project External MQTT Assembly ... SKIPPED
>> >> >> [INFO] Spark Project External ZeroMQ .. SKIPPED
>> >> >> [INFO] Spark Project External Kafka ... SKIPPED
>> >> >> [INFO] Spark Project Examples . SKIPPED
>> >> >> [INFO] Spark Project External Kafka Assembly .. SKIPPED
>> >> >> [INFO]
>> >> >>
>> >> >>
>> 
>> >> >> [INFO] BUILD FAILURE
>> >> >> [INFO]
>> >> >>
>> >> >>
>> 
>> >> >> [INFO] Total time: 22:55 min
>> >> >> [INFO] Finished at: 2015-09-07T22:42:57+00:00
>> >> >> [INFO] Final Memory: 240M/455M
>> >> >> [INFO]
>> >> >>
>> >> >>
>> 
>> >> >> [ERROR] GC overhead limit exceeded -> [Help 1]
>> >> >> [ERROR]
>> >> >> [ERROR] To see the full stack trace of the errors, re-run Maven with
>> >> >> the
>> >> >> -e switch.
>> >> >> [ERROR] Re-run Maven using the -X switch to enable full debug
>> logging.
>> >> >> [ERROR]
>> >> >> [ERROR] For more information about the errors and possible
>> soluti

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
I've stopped the jobs, the workers, and the master. Deleted the contents of
the checkpointing dir. Then restarted master, workers, and consumers.

I'm seeing the job in question still firing every 10 seconds.  I'm seeing
the 10 seconds in the Spark Jobs GUI page as well as in our logs.  It seems
quite strange: the jobs used to fire every 1 second, we switched to 10, and
now, trying to switch to 20, the batch duration millis is not changing.

Does anything stand out in the code perhaps?

On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger  wrote:

> Have you tried deleting or moving the contents of the checkpoint directory
> and restarting the job?
>
> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Sorry, more relevant code below:
>>
>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>> JavaStreamingContext jssc = params.isCheckpointed() ?
>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>> params);
>> jssc.start();
>> jssc.awaitTermination();
>> jssc.close();
>> ………..
>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>> sparkConf, Parameters params) {
>> JavaStreamingContextFactory factory = new
>> JavaStreamingContextFactory() {
>>   @Override
>>   public JavaStreamingContext create() {
>> return createContext(sparkConf, params);
>>   }
>> };
>> return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>> factory);
>>   }
>>
>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>> Parameters params) {
>> // Create context with the specified batch interval, in milliseconds.
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> Durations.milliseconds(params.getBatchDurationMillis()));
>> // Set the checkpoint directory, if we're checkpointing
>> if (params.isCheckpointed()) {
>>   jssc.checkpoint(params.getCheckpointDir());
>> }
>>
>> Set topicsSet = new HashSet(Arrays.asList(params
>> .getTopic()));
>>
>> // Set the Kafka parameters.
>> Map kafkaParams = new HashMap();
>> kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params
>> .getBrokerList());
>> if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>   kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
>> .getAutoOffsetReset());
>> }
>>
>> // Create direct Kafka stream with the brokers and the topic.
>> JavaPairInputDStream messages =
>> KafkaUtils.createDirectStream(
>>   jssc,
>>   String.class,
>>   String.class,
>>   StringDecoder.class,
>>   StringDecoder.class,
>>   kafkaParams,
>>   topicsSet);
>>
>> // See if there's an override of the default checkpoint duration.
>> if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>>   messages.checkpoint(Durations.milliseconds(params
>> .getCheckpointMillis()));
>> }
>>
>> JavaDStream messageBodies = messages.map(new
>> Function, String>() {
>>   @Override
>>   public String call(Tuple2 tuple2) {
>> return tuple2._2();
>>   }
>> });
>>
>> messageBodies.foreachRDD(new Function, Void>() {
>>   @Override
>>   public Void call(JavaRDD rdd) throws Exception {
>> ProcessPartitionFunction func = new
>> ProcessPartitionFunction(params);
>> rdd.foreachPartition(func);
>> return null;
>>   }
>> });
>>
>> return jssc;
>> }
>>
>> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> I'd think that we wouldn't be "accidentally recovering from checkpoint"
>>> hours or even days after consumers have been restarted, plus the content is
>>> the fresh content that I'm feeding, not some content that had been fed
>>> before the last restart.
>>>
>>> The code is basically as follows:
>>>
>>> SparkConf sparkConf = createSparkConf(...);
>>> // We'd be 'checkpointed' because we specify a checkpoint directory
>>> which makes isCheckpointed true
>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>>> params);jssc.start();
>>>
>>> jssc.awaitTermination();
>>>
>>> jssc.close();
>>>
>>>
>>>
>>> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das 
>>> wrote:
>>>
 Are you sure you are not accidentally recovering from checkpoint? How
 are you using StreamingContext.getOrCreate() in your code?

 TD

 On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> Tathagata,
>
> In our logs I see the batch duration millis being set first to 10 then
> to 20 seconds. I don't see the 20 being reflected later during ingestion.
>
> In the Spark UI under Streaming I see the below output, notice the *10
> second* Batch interval.  Can you think of a reason why it's stuck at
> 10?  It used to be 1 second by the way, then

Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Igor Berman
Hmm... out of ideas.
Can you check in the Spark UI Environment tab that this jar does not somehow
appear 2 times or more? Or, more generally, whether any 2 jars could contain
this class by any chance?

Regarding your question about the classloader: there probably is a way; I
remember Stack Overflow has some examples of how to print all classes, but
how to print the classes visible to Kryo's classloader specifically, I don't know.
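
The closest thing I can suggest is a small helper that walks a classloader
chain and prints the jar/dir URLs it can see (a sketch only; it only works
for URLClassLoaders, so treat it as a starting point):

import java.net.URLClassLoader

// walk a classloader chain and print the jar/dir URLs each URLClassLoader can see
def dumpClasspath(cl: ClassLoader): Unit = cl match {
  case u: URLClassLoader =>
    println(u.getClass.getName)
    u.getURLs.foreach(url => println("  " + url))
    dumpClasspath(u.getParent)
  case null => ()  // reached the bootstrap loader
  case other =>
    println(other.getClass.getName + " (not a URLClassLoader)")
    dumpClasspath(other.getParent)
}

// for example, from inside a task:
// rdd.foreachPartition(_ => dumpClasspath(Thread.currentThread().getContextClassLoader))

Seeing which jars each loader actually has is usually enough to spot whether
the fat jar is missing on the executors.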

On 8 September 2015 at 16:43, Nick Peterson  wrote:

> Yes, the jar contains the class:
>
> $ jar -tf lumiata-evaluation-assembly-1.0.jar | grep 2028/Document/Document
> com/i2028/Document/Document$1.class
> com/i2028/Document/Document.class
>
> What else can I do?  Is there any way to get more information about the
> classes available to the particular classloader kryo is using?
>
> On Tue, Sep 8, 2015 at 6:34 AM Igor Berman  wrote:
>
>> java.lang.ClassNotFoundException: com.i2028.Document.Document
>>
>> 1. so have you checked that jar that you create(fat jar) contains this class?
>>
>> 2. might be there is some stale cache issue...not sure though
>>
>>
>> On 8 September 2015 at 16:12, Nicholas R. Peterson 
>> wrote:
>>
>>> Here is the stack trace:  (Sorry for the duplicate, Igor -- I forgot to 
>>> include the list.)
>>>
>>>
>>> 15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 
>>> 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException: 
>>> com.esotericsoftware.kryo.KryoException: Error constructing instance of 
>>> class: com.lumiata.patientanalysis.utils.CachedGraph
>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>> at 
>>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44)
>>> at 
>>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing 
>>> instance of class: com.lumiata.patientanalysis.utils.CachedGraph
>>> at 
>>> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>> at 
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>> at 
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>> at 
>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>>> ... 24 more
>>> Caused by: java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>> at 
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>> at 
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstanc

Re: Java vs. Scala for Spark

2015-09-08 Thread Igor Berman
We are using Java 7. It's much more verbose than the Java 8 or Scala examples.
In addition, there are sometimes libraries that have no Java API, so you need
to write the wrappers yourself (e.g. GraphX).
On the other hand, Scala is not a trivial language like Java, so it depends
on your team.

On 8 September 2015 at 17:44, Bryan Jeffrey  wrote:

> Thank you for the quick responses.  It's useful to have some insight from
> folks already extensively using Spark.
>
> Regards,
>
> Bryan Jeffrey
>
> On Tue, Sep 8, 2015 at 10:28 AM, Sean Owen  wrote:
>
>> Why would Scala vs Java performance be different Ted? Relatively
>> speaking there is almost no runtime difference; it's the same APIs or
>> calls via a thin wrapper. Scala/Java vs Python is a different story.
>>
>> Java libraries can be used in Scala. Vice-versa too, though calling
>> Scala-generated classes can be clunky in Java. What's your concern
>> about interoperability Jeffrey?
>>
>> I disagree that Java 7 vs Scala usability is sooo different, but it's
>> certainly much more natural to use Spark in Scala. Java 8 closes a lot
>> of the usability gap with Scala, but not all of it. Enough that it's
>> not crazy for a Java shop to stick to Java 8 + Spark and not be at a
>> big disadvantage.
>>
>> The downsides of Scala IMHO are that it provides too much: lots of
>> nice features (closures! superb collections!), lots of rope to hang
>> yourself too (implicits sometimes!) and some WTF features (XML
>> literals!) Learning the good useful bits of Scala isn't hard. You can
>> always write Scala code as much like Java as you like, I find.
>>
>> Scala tooling is different from Java tooling; that's an
>> underappreciated barrier. For example I think SBT is good for
>> development, bad for general project lifecycle management compared to
>> Maven, but in any event still less developed. SBT/scalac are huge
>> resource hogs, since so much of Scala is really implemented in the
>> compiler; prepare to update your laptop to develop in Scala on your
>> IDE of choice, and start to think about running long-running compile
>> servers like we did in the year 2000.
>>
>> Still net-net I would choose Scala, FWIW.
>>
>> On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu  wrote:
>> > Performance wise, Scala is by far the best choice when you use Spark.
>> >
>> > The cost of learning Scala is not negligible but not insurmountable
>> either.
>> >
>> > My personal opinion.
>> >
>> > On Tue, Sep 8, 2015 at 6:50 AM, Bryan Jeffrey 
>> > wrote:
>> >>
>> >> All,
>> >>
>> >> We're looking at language choice in developing a simple streaming
>> >> processing application in spark.  We've got a small set of example code
>> >> built in Scala.  Articles like the following:
>> >>
>> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
>> >> would seem to indicate that Scala is great for use in distributed
>> >> programming (including Spark).  However, there is a large group of
>> folks
>> >> that seem to feel that interoperability with other Java libraries is
>> much to
>> >> be desired, and that the cost of learning (yet another) language is
>> quite
>> >> high.
>> >>
>> >> Has anyone looked at Scala for Spark dev in an enterprise environment?
>> >> What was the outcome?
>> >>
>> >> Regards,
>> >>
>> >> Bryan Jeffrey
>> >
>> >
>>
>
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Nick Peterson
Yeah... none of the jars listed on the classpath contain this class.  The
only jar that does is the fat jar that I'm submitting with spark-submit,
which as mentioned isn't showing up on the classpath anywhere.

-- Nick

On Tue, Sep 8, 2015 at 8:26 AM Igor Berman  wrote:

> hmm...out of ideas.
> can you check in spark ui environment tab that this jar is not somehow
> appears 2 times or more...? or more generally - any 2 jars that can contain
> this class by any chance
>
> regarding your question about classloader - no idea, probably there is, I
> remember stackoverflow has some examples on how to print all classes, but
> how to print all classes of kryo classloader - no idea.
>
> On 8 September 2015 at 16:43, Nick Peterson  wrote:
>
>> Yes, the jar contains the class:
>>
>> $ jar -tf lumiata-evaluation-assembly-1.0.jar | grep
>> 2028/Document/Document
>> com/i2028/Document/Document$1.class
>> com/i2028/Document/Document.class
>>
>> What else can I do?  Is there any way to get more information about the
>> classes available to the particular classloader kryo is using?
>>
>> On Tue, Sep 8, 2015 at 6:34 AM Igor Berman  wrote:
>>
>>> java.lang.ClassNotFoundException: com.i2028.Document.Document
>>>
>>> 1. so have you checked that jar that you create(fat jar) contains this 
>>> class?
>>>
>>> 2. might be there is some stale cache issue...not sure though
>>>
>>>
>>> On 8 September 2015 at 16:12, Nicholas R. Peterson >> > wrote:
>>>
 Here is the stack trace:  (Sorry for the duplicate, Igor -- I forgot to 
 include the list.)


 15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 
 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException: 
 com.esotericsoftware.kryo.KryoException: Error constructing instance of 
 class: com.lumiata.patientanalysis.utils.CachedGraph
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
at 
 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at 
 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
 com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44)
at 
 com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43)
at 
 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
 Caused by: com.esotericsoftware.kryo.KryoException: Error constructing 
 instance of class: com.lumiata.patientanalysis.utils.CachedGraph
at 
 com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
at 
 com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
at 
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
at 
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
at 
 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scal

Re: Partitioning a RDD for training multiple classifiers

2015-09-08 Thread Ben Tucker
Hi Maximo —

This is a relatively naive answer, but I would consider structuring the RDD
into a DataFrame, then saving the 'splits' using something like
df.write.partitionBy('client').parquet(hdfs_path). You could then
read a DataFrame from each resulting parquet directory and do your
per-client work from these. You mention re-using the splits, so this
solution might be worth the file-writing time.
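
Roughly, as a sketch against the Spark 1.4+ DataFrame API (sc is your
SparkContext; the schema, column names and paths below are made up):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// assuming bigRdd is an RDD of tuples/case classes shaped like (client, features, label)
val df = bigRdd.toDF("client", "features", "label")

// one pass over the data, one sub-directory per client value
df.write.partitionBy("client").parquet("hdfs:///data/splits")

// later, pick up a single client's split without re-filtering the big RDD
val clientA = sqlContext.read.parquet("hdfs:///data/splits/client=A")

Reading hdfs:///data/splits/client=A back gives you just that client's rows
without another pass over the big RDD.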

Does anyone know of a method that gets a collection of DataFrames — one for
each partition, in the partitionBy('client') sense — from a 'big'
DataFrame? Basically, the equivalent of writing by partition and creating a
DataFrame for each result, but skipping the HDFS step.


On Tue, Sep 8, 2015 at 10:47 AM, Maximo Gurmendez 
wrote:

> Hi,
> I have an RDD that needs to be split (say, by client) in order to train
> n models (i.e. one for each client). Since most of the classifiers that
> come with MLlib can only accept an RDD as input (and cannot build multiple
> models in one pass - as I understand it), the only way to train n separate
> models is to create n RDDs (by filtering the original one).
>
> Conceptually:
>
> rdd1,rdd2,rdd3 = splitRdds(bigRdd)
>
> the function splitRdds would use the standard filter mechanism.  I would
> then need to submit n training Spark jobs. When I do this, will it mean
> that it will traverse the bigRdd n times? Is there a better way to persist
> the split RDDs (i.e. save the split RDDs in a cache)?
>
> I could cache the bigRdd, but I'm not sure that would be very efficient either,
> since it will require the same number of passes anyway (I think - but I’m
> relatively new to Spark). Also I’m planning on reusing the individual
> splits (rdd1, rdd2, etc.), so it would be convenient to have them individually
> cached.
>
> Another problem is that the splits could be very skewed (i.e. one
> split could represent a large percentage of the original bigRdd). So
> saving the split RDDs to disk (at least, naively) could be a challenge.
>
> Is there any better way of doing this?
>
> Thanks!
>Máximo
>
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Igor Berman
Another idea: you can add this fat jar explicitly to the classpath of the
executors. It's not a real solution, but it might work.
I mean: place it somewhere locally on each executor and add it to the
classpath with spark.executor.extraClassPath.
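
Something like this (untested sketch; the local path is made up, and the
assembly has to already sit at that exact path on every executor machine):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kryo-classpath-workaround")
  // the assembly must already exist at this path on every executor node
  .set("spark.executor.extraClassPath", "/opt/jars/lumiata-evaluation-assembly-1.0.jar")

The same value can also be passed on the command line via
--conf spark.executor.extraClassPath=... if you'd rather not touch the code.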

On 8 September 2015 at 18:30, Nick Peterson  wrote:

> Yeah... none of the jars listed on the classpath contain this class.  The
> only jar that does is the fat jar that I'm submitting with spark-submit,
> which as mentioned isn't showing up on the classpath anywhere.
>
> -- Nick
>
> On Tue, Sep 8, 2015 at 8:26 AM Igor Berman  wrote:
>
>> hmm...out of ideas.
>> can you check in spark ui environment tab that this jar is not somehow
>> appears 2 times or more...? or more generally - any 2 jars that can contain
>> this class by any chance
>>
>> regarding your question about classloader - no idea, probably there is, I
>> remember stackoverflow has some examples on how to print all classes, but
>> how to print all classes of kryo classloader - no idea.
>>
>> On 8 September 2015 at 16:43, Nick Peterson  wrote:
>>
>>> Yes, the jar contains the class:
>>>
>>> $ jar -tf lumiata-evaluation-assembly-1.0.jar | grep
>>> 2028/Document/Document
>>> com/i2028/Document/Document$1.class
>>> com/i2028/Document/Document.class
>>>
>>> What else can I do?  Is there any way to get more information about the
>>> classes available to the particular classloader kryo is using?
>>>
>>> On Tue, Sep 8, 2015 at 6:34 AM Igor Berman 
>>> wrote:
>>>
 java.lang.ClassNotFoundException: com.i2028.Document.Document

 1. so have you checked that jar that you create(fat jar) contains this 
 class?

 2. might be there is some stale cache issue...not sure though


 On 8 September 2015 at 16:12, Nicholas R. Peterson <
 nrpeter...@gmail.com> wrote:

> Here is the stack trace:  (Sorry for the duplicate, Igor -- I forgot to 
> include the list.)
>
>
> 15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 
> 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException: 
> com.esotericsoftware.kryo.KryoException: Error constructing instance of 
> class: com.lumiata.patientanalysis.utils.CachedGraph
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>   at 
> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44)
>   at 
> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing 
> instance of class: com.lumiata.patientanalysis.utils.CachedGraph
>   at 
> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
>   at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>   at 
> org.apac

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Cody Koeninger
Well, I'm not sure why you're checkpointing messages.

I'd also put in some logging to see what values are actually being read out
of your params object for the various settings.
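
For reference, a rough Scala sketch of the getOrCreate contract (directory
name made up); if anything is found under the checkpoint path, the batch
duration comes from the checkpoint, not from the code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

val checkpointDir = "/tmp/consumer-checkpoint"

def createSsc(): StreamingContext = {
  val conf = new SparkConf().setAppName("duration-check")
  // this interval is only used when no checkpoint exists yet
  val ssc = new StreamingContext(conf, Milliseconds(20000))
  ssc.checkpoint(checkpointDir)
  ssc
}

// if checkpoint data is found under checkpointDir, the factory above is NOT called;
// the context -- batch duration included -- is rebuilt from the checkpoint
val ssc = StreamingContext.getOrCreate(checkpointDir, createSsc _)

That's why deleting the checkpoint directory (and making sure every consumer
points at the directory you actually cleared) is the first thing to verify.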


On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg  wrote:

> I've stopped the jobs, the workers, and the master. Deleted the contents
> of the checkpointing dir. Then restarted master, workers, and consumers.
>
> I'm seeing the job in question still firing every 10 seconds.  I'm seeing
> the 10 seconds in the Spark Jobs GUI page as well as our logs.  Seems quite
> strange given that the jobs used to fire every 1 second, we've switched to
> 10, now trying to switch to 20 and batch duration millis is not changing.
>
> Does anything stand out in the code perhaps?
>
> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger  wrote:
>
>> Have you tried deleting or moving the contents of the checkpoint
>> directory and restarting the job?
>>
>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Sorry, more relevant code below:
>>>
>>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>>> params);
>>> jssc.start();
>>> jssc.awaitTermination();
>>> jssc.close();
>>> ………..
>>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>>> sparkConf, Parameters params) {
>>> JavaStreamingContextFactory factory = new
>>> JavaStreamingContextFactory() {
>>>   @Override
>>>   public JavaStreamingContext create() {
>>> return createContext(sparkConf, params);
>>>   }
>>> };
>>> return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>>> factory);
>>>   }
>>>
>>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>>> Parameters params) {
>>> // Create context with the specified batch interval, in
>>> milliseconds.
>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>> Durations.milliseconds(params.getBatchDurationMillis()));
>>> // Set the checkpoint directory, if we're checkpointing
>>> if (params.isCheckpointed()) {
>>>   jssc.checkpoint(params.getCheckpointDir());
>>> }
>>>
>>> Set topicsSet = new HashSet(Arrays.asList(params
>>> .getTopic()));
>>>
>>> // Set the Kafka parameters.
>>> Map kafkaParams = new HashMap();
>>> kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params
>>> .getBrokerList());
>>> if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>>   kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
>>> .getAutoOffsetReset());
>>> }
>>>
>>> // Create direct Kafka stream with the brokers and the topic.
>>> JavaPairInputDStream messages =
>>> KafkaUtils.createDirectStream(
>>>   jssc,
>>>   String.class,
>>>   String.class,
>>>   StringDecoder.class,
>>>   StringDecoder.class,
>>>   kafkaParams,
>>>   topicsSet);
>>>
>>> // See if there's an override of the default checkpoint duration.
>>> if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>>>   messages.checkpoint(Durations.milliseconds(params
>>> .getCheckpointMillis()));
>>> }
>>>
>>> JavaDStream messageBodies = messages.map(new
>>> Function, String>() {
>>>   @Override
>>>   public String call(Tuple2 tuple2) {
>>> return tuple2._2();
>>>   }
>>> });
>>>
>>> messageBodies.foreachRDD(new Function, Void>() {
>>>   @Override
>>>   public Void call(JavaRDD rdd) throws Exception {
>>> ProcessPartitionFunction func = new
>>> ProcessPartitionFunction(params);
>>> rdd.foreachPartition(func);
>>> return null;
>>>   }
>>> });
>>>
>>> return jssc;
>>> }
>>>
>>> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 I'd think that we wouldn't be "accidentally recovering from checkpoint"
 hours or even days after consumers have been restarted, plus the content is
 the fresh content that I'm feeding, not some content that had been fed
 before the last restart.

 The code is basically as follows:

 SparkConf sparkConf = createSparkConf(...);
 // We'd be 'checkpointed' because we specify a checkpoint directory
 which makes isCheckpointed true
 JavaStreamingContext jssc = params.isCheckpointed() ?
 createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
 params);jssc.start();

 jssc.awaitTermination();

 jssc.close();



 On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das 
 wrote:

> Are you sure you are not accidentally recovering from checkpoint? How
> are you using StreamingContext.getOrCreate() in your code?
>
> TD
>
> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>

Re: [streaming] DStream with window performance issue

2015-09-08 Thread Alexey Ponkin
Ok.
Spark 1.4.1 on yarn

Here is my application.
I have 4 different Kafka topics (different object streams):

type Edge = (String,String)

val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter( nonEmpty 
).map( toEdge )
val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter( nonEmpty 
).map( toEdge )
val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter( nonEmpty 
).map( toEdge )

val u = a union b union c

val source = u.window(Seconds(600), Seconds(10))

val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter( nonEmpty 
).map( toEdge )

val joinResult = source.rightOuterJoin( z )
joinResult.foreachRDD { rdd=>
  rdd.foreachPartition { partition =>
  // save to result topic in kafka
   }
 }

The 'window' operation in the code above is constantly growing,
no matter how many events appear in the corresponding Kafka topics.

but if I change one line from   

val source = u.window(Seconds(600), Seconds(10))

to 

val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))

val source = u.transform(_.partitionBy(partitioner.value) 
).window(Seconds(600), Seconds(10))

Everything works perfect.

Perhaps the problem was in WindowedDStream.

I forced it to use PartitionerAwareUnionRDD (by applying partitionBy with the
same partitioner) instead of UnionRDD.

Nonetheless, I did not see any hints about such behaviour in the docs.
Is it a bug or absolutely normal behaviour?





08.09.2015, 17:03, "Cody Koeninger" :
>  Can you provide more info (what version of spark, code example)?
>
>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin  wrote:
>>  Hi,
>>
>>  I have an application with 2 streams, which are joined together.
>>  Stream1 - is simple DStream(relativly small size batch chunks)
>>  Stream2 - is a windowed DStream(with duration for example 60 seconds)
>>
>>  Stream1 and Stream2 are Kafka direct stream.
>>  The problem is that, according to the logs, the window operation is constantly
>>  increasing (screen: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
>>  I also see a gap in the processing window (screen:
>>  http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
>>  in the logs there are no events in that period.
>>  So what happens in that gap, and why is the window constantly increasing?
>>
>>  Thank you in advance
>>
>>  -
>>  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>  For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Compress JSON dataframes

2015-09-08 Thread Saif.A.Ellafi
Hi,

I am trying to figure out a way to compress df.write.json() but have not been 
successful, even changing spark.io.compression.
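
The only workaround I can think of so far (untested sketch, assuming df is
the DataFrame) is to skip df.write.json() and go through toJSON plus
saveAsTextFile with an explicit Hadoop codec:

import org.apache.hadoop.io.compress.GzipCodec

// df.toJSON yields an RDD[String] of JSON lines; saveAsTextFile accepts a compression codec
df.toJSON.saveAsTextFile("/data/out/json-gz", classOf[GzipCodec])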

Any thoughts?
Saif



Re: Java vs. Scala for Spark

2015-09-08 Thread Dean Wampler
It's true that Java 8 lambdas help. If you've read Learning Spark, where
they use Java 7, Python, and Scala for the examples, it really shows how
awful Java without lambdas is for Spark development.

Still, there are several "power tools" in Scala that I would sorely miss if I
were using Java 8:

1. The REPL (interpreter): I do most of my work in the REPL, then move the
code to compiled code when I'm ready to turn it into a batch job. Even
better, use Spark Notebook (also on GitHub)!
2. Tuples: It's just too convenient to use tuples for schemas, return
values from functions, etc., etc., etc.,
3. Pattern matching: This has no analog in Java, so it's hard to appreciate
it until you understand it, but see this example for a taste of how concise
it makes code!
4. Type inference: its utility really shows with Spark. It means a lot less
code to write, yet you still get hints about what you just wrote! (A short
spark-shell sketch combining these follows this list.)
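
Here is that sketch: points 2-4 together in a few lines of spark-shell (sc is
the shell's SparkContext; the file name is made up):

val counts = sc.textFile("events.log")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))    // a tuple as an ad-hoc schema -- nothing to declare
  .reduceByKey(_ + _)        // every type here is inferred

counts.map { case (word, n) => s"$word: $n" }   // pattern matching pulls the tuple apart
  .take(10)
  .foreach(println)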

My $0.02.

dean


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Tue, Sep 8, 2015 at 10:28 AM, Igor Berman  wrote:

> we are using java7..its much more verbose that java8 or scala examples
> in addition there sometimes libraries that has no java  api, so you need
> to write them by yourself(e.g. graphx)
> on the other hand, scala is not trivial language like java, so it depends
> on your team
>
> On 8 September 2015 at 17:44, Bryan Jeffrey 
> wrote:
>
>> Thank you for the quick responses.  It's useful to have some insight from
>> folks already extensively using Spark.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>> On Tue, Sep 8, 2015 at 10:28 AM, Sean Owen  wrote:
>>
>>> Why would Scala vs Java performance be different Ted? Relatively
>>> speaking there is almost no runtime difference; it's the same APIs or
>>> calls via a thin wrapper. Scala/Java vs Python is a different story.
>>>
>>> Java libraries can be used in Scala. Vice-versa too, though calling
>>> Scala-generated classes can be clunky in Java. What's your concern
>>> about interoperability Jeffrey?
>>>
>>> I disagree that Java 7 vs Scala usability is sooo different, but it's
>>> certainly much more natural to use Spark in Scala. Java 8 closes a lot
>>> of the usability gap with Scala, but not all of it. Enough that it's
>>> not crazy for a Java shop to stick to Java 8 + Spark and not be at a
>>> big disadvantage.
>>>
>>> The downsides of Scala IMHO are that it provides too much: lots of
>>> nice features (closures! superb collections!), lots of rope to hang
>>> yourself too (implicits sometimes!) and some WTF features (XML
>>> literals!) Learning the good useful bits of Scala isn't hard. You can
>>> always write Scala code as much like Java as you like, I find.
>>>
>>> Scala tooling is different from Java tooling; that's an
>>> underappreciated barrier. For example I think SBT is good for
>>> development, bad for general project lifecycle management compared to
>>> Maven, but in any event still less developed. SBT/scalac are huge
>>> resource hogs, since so much of Scala is really implemented in the
>>> compiler; prepare to update your laptop to develop in Scala on your
>>> IDE of choice, and start to think about running long-running compile
>>> servers like we did in the year 2000.
>>>
>>> Still net-net I would choose Scala, FWIW.
>>>
>>> On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu  wrote:
>>> > Performance wise, Scala is by far the best choice when you use Spark.
>>> >
>>> > The cost of learning Scala is not negligible but not insurmountable
>>> either.
>>> >
>>> > My personal opinion.
>>> >
>>> > On Tue, Sep 8, 2015 at 6:50 AM, Bryan Jeffrey >> >
>>> > wrote:
>>> >>
>>> >> All,
>>> >>
>>> >> We're looking at language choice in developing a simple streaming
>>> >> processing application in spark.  We've got a small set of example
>>> code
>>> >> built in Scala.  Articles like the following:
>>> >>
>>> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
>>> >> would seem to indicate that Scala is great for use in distributed
>>> >> programming (including Spark).  However, there is a large group of
>>> folks
>>> >> that seem to feel that interoperability with other Java libraries is
>>> much to
>>> >> be desired, and that the cost of learning (yet another) language is
>>> quite
>>> >> high.
>>> >>
>>> >> Has anyone looked at Scala for Spark dev in an enterprise environment?
>>> >> What was the outcome?
>>> >>
>>> >> Regards,
>>> >>
>>> >> Bryan Jeffrey
>>> >
>>> >
>>>
>>
>>
>


Re: [streaming] DStream with window performance issue

2015-09-08 Thread Cody Koeninger
I'm not 100% sure what's going on there, but why are you doing a union in
the first place?

If you want multiple topics in a stream, just pass them all in the set of
topics to one call to createDirectStream

On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin  wrote:

> Ok.
> Spark 1.4.1 on yarn
>
> Here is my application
> I have 4 different Kafka topics(different object streams)
>
> type Edge = (String,String)
>
> val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter( nonEmpty
> ).map( toEdge )
> val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter( nonEmpty
> ).map( toEdge )
> val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter( nonEmpty
> ).map( toEdge )
>
> val u = a union b union c
>
> val source = u.window(Seconds(600), Seconds(10))
>
> val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter( nonEmpty
> ).map( toEdge )
>
> val joinResult = source.rightOuterJoin( z )
> joinResult.foreachRDD { rdd=>
>   rdd.foreachPartition { partition =>
>   // save to result topic in kafka
>}
>  }
>
> The 'window' function in the code above is constantly growing,
> no matter how many events appeared in corresponding kafka topics
>
> but if I change one line from
>
> val source = u.window(Seconds(600), Seconds(10))
>
> to
>
> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>
> val source = u.transform(_.partitionBy(partitioner.value)
> ).window(Seconds(600), Seconds(10))
>
> Everything works perfect.
>
> Perhaps the problem was in WindowedDStream
>
> I forced to use PartitionerAwareUnionRDD( partitionBy the same partitioner
> ) instead of UnionRDD.
>
> Nonetheless I did not see any hints about such a bahaviour in doc.
> Is it a bug or absolutely normal behaviour?
>
>
>
>
>
> 08.09.2015, 17:03, "Cody Koeninger" :
> >  Can you provide more info (what version of spark, code example)?
> >
> >  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin 
> wrote:
> >>  Hi,
> >>
> >>  I have an application with 2 streams, which are joined together.
> >>  Stream1 - is simple DStream(relativly small size batch chunks)
> >>  Stream2 - is a windowed DStream(with duration for example 60 seconds)
> >>
> >>  Stream1 and Stream2 are Kafka direct stream.
> >>  The problem is that, according to the logs, the window operation is constantly
> >>  increasing (screen: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
> >>  I also see a gap in the processing window (screen:
> >>  http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
> >>  in the logs there are no events in that period.
> >>  So what happens in that gap, and why is the window constantly increasing?
> >>
> >>  Thank you in advance
> >>
> >>  -
> >>  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >>  For additional commands, e-mail: user-h...@spark.apache.org
>


Spark with proxy

2015-09-08 Thread Mohammad Tariq
Hi friends,

Is it possible to interact with Amazon S3 using Spark via a proxy? This is
what I have been doing :

SparkConf conf = new
SparkConf().setAppName("MyApp").setMaster("local");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
Configuration hadoopConf = sparkContext.hadoopConfiguration();
hadoopConf.set("fs.s3n.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", "***");
hadoopConf.set("fs.s3n.awsSecretAccessKey", "***");
hadoopConf.set("httpclient.proxy-autodetect", "false");
hadoopConf.set("httpclient.proxy-host", "***");
hadoopConf.set("httpclient.proxy-port", "");
SQLContext sqlContext = new SQLContext(sparkContext);

But whenever I try to run it, it says :

java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:625)
at
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:524)
at
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:403)
at
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177)
at
org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:144)
at
org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:131)
at
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:611)
at
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:446)
at
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326)
at
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
at
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
at
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
at
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
at
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
at
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
at
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:172)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at org.apache.hadoop.fs.s3native.$Proxy21.retrieveMetadata(Unknown Source)
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
at org.apache.hadoop.fs.Globber.glob(Globber.java:248)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1623)
at com.databricks.spark.avro.AvroRelation.newReader(AvroRelation.scala:105)
at com.databricks.spark.avro.AvroRelation.(AvroRelation.scala:60)
at
com.databricks.spark.avro.DefaultSource.createRelation(DefaultSource.scala:41)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:219)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:673)

The same proxy is working fine with AWS S3 Java API, and the JetS3t API. I
could not find any document that explains about setting proxies in a Spark
program. Could someone please point me to the right direction?

Many thanks.

Tariq, Mohammad
about.me/mti



Re: [streaming] DStream with window performance issue

2015-09-08 Thread Понькин Алексей
The thing is that these topics contain completely different Avro objects
(Array[Byte]) that I need to deserialize into different Java/Scala objects,
filter, and then map to a tuple (String, String). So I have 3 streams, each
with a different Avro object in it. I need to convert them (using some
business rules) to pairs and union them.

-- 
Yandex.Mail - reliable mail
http://mail.yandex.ru/neo2/collect/?exp=1&t=1


08.09.2015, 19:11, "Cody Koeninger" :
> I'm not 100% sure what's going on there, but why are you doing a union in the 
> first place?
>
> If you want multiple topics in a stream, just pass them all in the set of 
> topics to one call to createDirectStream
>
> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin  wrote:
>> Ok.
>> Spark 1.4.1 on yarn
>>
>> Here is my application
>> I have 4 different Kafka topics(different object streams)
>>
>> type Edge = (String,String)
>>
>> val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter( nonEmpty 
>> ).map( toEdge )
>> val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter( nonEmpty 
>> ).map( toEdge )
>> val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter( nonEmpty 
>> ).map( toEdge )
>>
>> val u = a union b union c
>>
>> val source = u.window(Seconds(600), Seconds(10))
>>
>> val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter( nonEmpty 
>> ).map( toEdge )
>>
>> val joinResult = source.rightOuterJoin( z )
>> joinResult.foreachRDD { rdd=>
>>   rdd.foreachPartition { partition =>
>>       // save to result topic in kafka
>>    }
>>  }
>>
>> The 'window' function in the code above is constantly growing,
>> no matter how many events appeared in corresponding kafka topics
>>
>> but if I change one line from
>>
>> val source = u.window(Seconds(600), Seconds(10))
>>
>> to
>>
>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>>
>> val source = u.transform(_.partitionBy(partitioner.value) 
>> ).window(Seconds(600), Seconds(10))
>>
>> Everything works perfect.
>>
>> Perhaps the problem was in WindowedDStream
>>
>> I forced to use PartitionerAwareUnionRDD( partitionBy the same partitioner ) 
>> instead of UnionRDD.
>>
>> Nonetheless I did not see any hints about such a bahaviour in doc.
>> Is it a bug or absolutely normal behaviour?
>>
>> 08.09.2015, 17:03, "Cody Koeninger" :
>>
>>>  Can you provide more info (what version of spark, code example)?
>>>
>>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin  wrote:
  Hi,

  I have an application with 2 streams, which are joined together.
  Stream1 - is simple DStream(relativly small size batch chunks)
  Stream2 - is a windowed DStream(with duration for example 60 seconds)

  Stream1 and Stream2 are Kafka direct stream.
  The problem is that, according to the logs, the window operation is constantly
  increasing (screen: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
  I also see a gap in the processing window (screen:
  http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
  in the logs there are no events in that period.
  So what happens in that gap, and why is the window constantly increasing?

  Thank you in advance

  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [streaming] DStream with window performance issue

2015-09-08 Thread Cody Koeninger
That doesn't really matter.  With the direct stream you'll get all objects
for a given topicpartition in the same spark partition.  You know what
topic it's from via hasOffsetRanges.  Then you can deserialize
appropriately based on topic.
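
Roughly like this (a sketch only; the decoders, kafkaParams, and toEdge are
stand-ins for your own, and toEdge here is assumed to return an Option so
empty records drop out):

import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map("metadata.broker.list" -> "broker:9092")

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
  DefaultDecoder, DefaultDecoder](ssc, kafkaParams, Set("A", "B", "C"))

val edges = stream.transform { rdd =>
  // partition i of the direct stream's RDD corresponds to offsetRanges(i)
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    val topic = ranges(i).topic
    iter.flatMap { case (_, bytes) => toEdge(topic, bytes) }  // deserialize based on topic
  }
}

Then you have a single stream to window, and no union at all.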

On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей 
wrote:

> The thing is, that these topics contain absolutely different AVRO
> objects(Array[Byte]) that I need to deserialize to different Java(Scala)
> objects, filter and then map to tuple (String, String). So i have 3 streams
> with different avro object in there. I need to cast them(using some
> business rules) to pairs and unite.
>
> --
> Yandex.Mail - reliable mail
> http://mail.yandex.ru/neo2/collect/?exp=1&t=1
>
>
> 08.09.2015, 19:11, "Cody Koeninger" :
> > I'm not 100% sure what's going on there, but why are you doing a union
> in the first place?
> >
> > If you want multiple topics in a stream, just pass them all in the set
> of topics to one call to createDirectStream
> >
> > On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin 
> wrote:
> >> Ok.
> >> Spark 1.4.1 on yarn
> >>
> >> Here is my application
> >> I have 4 different Kafka topics(different object streams)
> >>
> >> type Edge = (String,String)
> >>
> >> val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter(
> nonEmpty ).map( toEdge )
> >> val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter(
> nonEmpty ).map( toEdge )
> >> val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter(
> nonEmpty ).map( toEdge )
> >>
> >> val u = a union b union c
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter(
> nonEmpty ).map( toEdge )
> >>
> >> val joinResult = source.rightOuterJoin( z )
> >> joinResult.foreachRDD { rdd=>
> >>   rdd.foreachPartition { partition =>
> >>   // save to result topic in kafka
> >>}
> >>  }
> >>
> >> The 'window' function in the code above is constantly growing,
> >> no matter how many events appeared in corresponding kafka topics
> >>
> >> but if I change one line from
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> to
> >>
> >> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
> >>
> >> val source = u.transform(_.partitionBy(partitioner.value)
> ).window(Seconds(600), Seconds(10))
> >>
> >> Everything works perfect.
> >>
> >> Perhaps the problem was in WindowedDStream
> >>
> >> I forced to use PartitionerAwareUnionRDD( partitionBy the same
> partitioner ) instead of UnionRDD.
> >>
> >> Nonetheless I did not see any hints about such a bahaviour in doc.
> >> Is it a bug or absolutely normal behaviour?
> >>
> >> 08.09.2015, 17:03, "Cody Koeninger" :
> >>
> >>>  Can you provide more info (what version of spark, code example)?
> >>>
> >>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin 
> wrote:
>   Hi,
> 
>   I have an application with 2 streams, which are joined together.
>   Stream1 - is simple DStream(relativly small size batch chunks)
>   Stream2 - is a windowed DStream(with duration for example 60 seconds)
> 
>   Stream1 and Stream2 are Kafka direct stream.
>   The problem is that, according to the logs, the window operation is constantly
>   increasing (screen: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
>   I also see a gap in the processing window (screen:
>   http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
>   in the logs there are no events in that period.
>   So what happens in that gap, and why is the window constantly increasing?
> 
>   Thank you in advance
> 
>   -
>   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>   For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Java vs. Scala for Spark

2015-09-08 Thread Jerry Lam
Hi Bryan,

I would choose a language based on the requirements. Scala may not make sense
if you have a lot of dependencies that are Java-based components, and
interoperability between Java and Scala is not always obvious.

I agree with the above comments that Java is much more verbose than Scala
in many cases, if not all. However, I personally don't find verbosity to be
a key factor in choosing a language. For the sake of argument, will you be
discouraged if you need to write 3 lines of Java for 1 line of Scala? I
really don't care about the number of lines as long as I can finish the task
in a reasonable amount of time.

I believe (please correct me if I'm wrong) that all Spark functionality you
can find in Scala is also available in Java; that includes MLlib, Spark SQL,
Streaming, etc. So you won't miss any features of Spark by using Java.

It seems the questions should be:
- what language are the developers comfortable with?
- what components in the system will constrain the choice of language?

Best Regards,

Jerry

On Tue, Sep 8, 2015 at 11:59 AM, Dean Wampler  wrote:

> It's true that Java 8 lambdas help. If you've read Learning Spark, where
> they use Java 7, Python, and Scala for the examples, it really shows how
> awful Java without lambdas is for Spark development.
>
> Still, there are several "power tools" in Scala I would sorely miss using
> Java 8:
>
> 1. The REPL (interpreter): I do most of my work in the REPL, then move the
> code to compiled code when I'm ready to turn it into a batch job. Even
> better, use Spark Notebook (also on GitHub)!
> 2. Tuples: It's just too convenient to use tuples for schemas, return
> values from functions, etc., etc., etc.,
> 3. Pattern matching: This has no analog in Java, so it's hard to
> appreciate it until you understand it, but see this example for a taste of
> how concise it makes code!
> 4. Type inference: Spark really shows its utility. It means a lot less
> code to write, but you get the hints of what you just wrote!
>
> My $0.02.
>
> dean
>
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition (O'Reilly)
> Typesafe
> @deanwampler
> http://polyglotprogramming.com
>
> On Tue, Sep 8, 2015 at 10:28 AM, Igor Berman 
> wrote:
>
>> we are using java7..its much more verbose that java8 or scala examples
>> in addition there sometimes libraries that has no java  api, so you need
>> to write them by yourself(e.g. graphx)
>> on the other hand, scala is not trivial language like java, so it depends
>> on your team
>>
>> On 8 September 2015 at 17:44, Bryan Jeffrey 
>> wrote:
>>
>>> Thank you for the quick responses.  It's useful to have some insight
>>> from folks already extensively using Spark.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>> On Tue, Sep 8, 2015 at 10:28 AM, Sean Owen  wrote:
>>>
 Why would Scala vs Java performance be different Ted? Relatively
 speaking there is almost no runtime difference; it's the same APIs or
 calls via a thin wrapper. Scala/Java vs Python is a different story.

 Java libraries can be used in Scala. Vice-versa too, though calling
 Scala-generated classes can be clunky in Java. What's your concern
 about interoperability Jeffrey?

 I disagree that Java 7 vs Scala usability is sooo different, but it's
 certainly much more natural to use Spark in Scala. Java 8 closes a lot
 of the usability gap with Scala, but not all of it. Enough that it's
 not crazy for a Java shop to stick to Java 8 + Spark and not be at a
 big disadvantage.

 The downsides of Scala IMHO are that it provides too much: lots of
 nice features (closures! superb collections!), lots of rope to hang
 yourself too (implicits sometimes!) and some WTF features (XML
 literals!) Learning the good useful bits of Scala isn't hard. You can
 always write Scala code as much like Java as you like, I find.

 Scala tooling is different from Java tooling; that's an
 underappreciated barrier. For example I think SBT is good for
 development, bad for general project lifecycle management compared to
 Maven, but in any event still less developed. SBT/scalac are huge
 resource hogs, since so much of Scala is really implemented in the
 compiler; prepare to update your laptop to develop in Scala on your
 IDE of choice, and start to think about running long-running compile
 servers like we did in the year 2000.

 Still net-net I would choose Scala, FWIW.

 On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu  wrote:
 > Performance wise, Scala is by far the best choice when you use Spark.
 >
 > The cost of learning Scala is not negligible but not insurmountable.

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
Just verified the logic for passing the batch duration millis in; it looks OK.
I see the value of 20 seconds being reflected in the logs, but not in the
Spark UI.

Also, just commented out this piece and the consumer is still stuck at
using 10 seconds for batch duration millis.

//if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
//
messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
//}

The reason this is in the code is so that we can control the checkpointing
millis.  Calling checkpoint() on the DStream seems to be the only way to
override the default value, which is max(batch duration millis, 10 seconds).
Is there a better way of doing this?

Thanks.
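
For reference, the batch interval and the per-DStream checkpoint interval
are two separate settings; a minimal Scala sketch of the distinction (the
directory, host, port and durations below are illustrative, not taken from
this job):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setAppName("checkpoint-interval-sketch")
  val ssc  = new StreamingContext(conf, Seconds(20))   // batch interval
  ssc.checkpoint("hdfs:///tmp/checkpoints")            // checkpoint directory

  val lines = ssc.socketTextStream("localhost", 9999)
  // Override the DStream checkpoint interval, which otherwise defaults to
  // max(batch interval, 10 seconds); this does not change the batch interval.
  lines.checkpoint(Seconds(60))

  lines.count().print()
  ssc.start()
  ssc.awaitTermination()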


On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger  wrote:

> Well, I'm not sure why you're checkpointing messages.
>
> I'd also put in some logging to see what values are actually being read
> out of your params object for the various settings.
>
>
> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> I've stopped the jobs, the workers, and the master. Deleted the contents
>> of the checkpointing dir. Then restarted master, workers, and consumers.
>>
>> I'm seeing the job in question still firing every 10 seconds.  I'm seeing
>> the 10 seconds in the Spark Jobs GUI page as well as our logs.  Seems quite
>> strange given that the jobs used to fire every 1 second, we've switched to
>> 10, now trying to switch to 20 and batch duration millis is not changing.
>>
>> Does anything stand out in the code perhaps?
>>
>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger 
>> wrote:
>>
>>> Have you tried deleting or moving the contents of the checkpoint
>>> directory and restarting the job?
>>>
>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 Sorry, more relevant code below:

 SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
 JavaStreamingContext jssc = params.isCheckpointed() ?
 createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
 params);
 jssc.start();
 jssc.awaitTermination();
 jssc.close();
 ………..
   private JavaStreamingContext createCheckpointedContext(SparkConf
 sparkConf, Parameters params) {
 JavaStreamingContextFactory factory = new
 JavaStreamingContextFactory() {
   @Override
   public JavaStreamingContext create() {
 return createContext(sparkConf, params);
   }
 };
 return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
 factory);
   }

   private JavaStreamingContext createContext(SparkConf sparkConf,
 Parameters params) {
 // Create context with the specified batch interval, in
 milliseconds.
 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
 Durations.milliseconds(params.getBatchDurationMillis()));
 // Set the checkpoint directory, if we're checkpointing
 if (params.isCheckpointed()) {
   jssc.checkpoint(params.getCheckpointDir());
 }

 Set<String> topicsSet = new HashSet<String>(Arrays.asList(params
 .getTopic()));

 // Set the Kafka parameters.
 Map<String, String> kafkaParams = new HashMap<String, String>();
 kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST,
 params.getBrokerList());
 if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
   kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
 .getAutoOffsetReset());
 }

 // Create direct Kafka stream with the brokers and the topic.
 JavaPairInputDStream<String, String> messages =
 KafkaUtils.createDirectStream(
   jssc,
   String.class,
   String.class,
   StringDecoder.class,
   StringDecoder.class,
   kafkaParams,
   topicsSet);

 // See if there's an override of the default checkpoint duration.
 if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
   messages.checkpoint(Durations.milliseconds(params
 .getCheckpointMillis()));
 }

 JavaDStream<String> messageBodies = messages.map(new
 Function<Tuple2<String, String>, String>() {
   @Override
   public String call(Tuple2<String, String> tuple2) {
 return tuple2._2();
   }
 });

 messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> rdd) throws Exception {
 ProcessPartitionFunction func = new
 ProcessPartitionFunction(params);
 rdd.foreachPartition(func);
 return null;
   }
 });

 return jssc;
 }

 On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> I'd think that we wouldn't be "accidentally recovering from
> checkpoint" hours or even days after consumers have been restarted, plus
> th

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
I just disabled checkpointing in our consumers and I can see that the batch
duration millis set to 20 seconds is now being honored.

Why would that be the case?

And how can we "untie" batch duration millis from checkpointing?

Thanks.
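
For reference, this is how checkpoint-based recovery interacts with the
batch duration; a hedged Scala sketch, with the directory, host, port and
durations being illustrative only:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("batch-duration-sketch")
    val ssc  = new StreamingContext(conf, Seconds(20))  // used only when NO checkpoint exists
    ssc.checkpoint("hdfs:///tmp/checkpoints")
    ssc.socketTextStream("localhost", 9999).count().print()
    ssc
  }

  // If the directory already holds a checkpoint, the context -- including its
  // original batch duration -- is rebuilt from it and createContext() is never
  // called, so a new batch duration only takes effect once no old checkpoint
  // is picked up.
  val ssc = StreamingContext.getOrCreate("hdfs:///tmp/checkpoints", createContext _)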

On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger  wrote:

> Well, I'm not sure why you're checkpointing messages.
>
> I'd also put in some logging to see what values are actually being read
> out of your params object for the various settings.
>
>
> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> I've stopped the jobs, the workers, and the master. Deleted the contents
>> of the checkpointing dir. Then restarted master, workers, and consumers.
>>
>> I'm seeing the job in question still firing every 10 seconds.  I'm seeing
>> the 10 seconds in the Spark Jobs GUI page as well as our logs.  Seems quite
>> strange given that the jobs used to fire every 1 second, we've switched to
>> 10, now trying to switch to 20 and batch duration millis is not changing.
>>
>> Does anything stand out in the code perhaps?
>>
>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger 
>> wrote:
>>
>>> Have you tried deleting or moving the contents of the checkpoint
>>> directory and restarting the job?
>>>
>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 Sorry, more relevant code below:

 SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
 JavaStreamingContext jssc = params.isCheckpointed() ?
 createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
 params);
 jssc.start();
 jssc.awaitTermination();
 jssc.close();
 ………..
   private JavaStreamingContext createCheckpointedContext(SparkConf
 sparkConf, Parameters params) {
 JavaStreamingContextFactory factory = new
 JavaStreamingContextFactory() {
   @Override
   public JavaStreamingContext create() {
 return createContext(sparkConf, params);
   }
 };
 return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
 factory);
   }

   private JavaStreamingContext createContext(SparkConf sparkConf,
 Parameters params) {
 // Create context with the specified batch interval, in
 milliseconds.
 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
 Durations.milliseconds(params.getBatchDurationMillis()));
 // Set the checkpoint directory, if we're checkpointing
 if (params.isCheckpointed()) {
   jssc.checkpoint(params.getCheckpointDir());
 }

 Set<String> topicsSet = new HashSet<String>(Arrays.asList(params
 .getTopic()));

 // Set the Kafka parameters.
 Map<String, String> kafkaParams = new HashMap<String, String>();
 kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST,
 params.getBrokerList());
 if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
   kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
 .getAutoOffsetReset());
 }

 // Create direct Kafka stream with the brokers and the topic.
 JavaPairInputDStream<String, String> messages =
 KafkaUtils.createDirectStream(
   jssc,
   String.class,
   String.class,
   StringDecoder.class,
   StringDecoder.class,
   kafkaParams,
   topicsSet);

 // See if there's an override of the default checkpoint duration.
 if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
   messages.checkpoint(Durations.milliseconds(params
 .getCheckpointMillis()));
 }

 JavaDStream<String> messageBodies = messages.map(new
 Function<Tuple2<String, String>, String>() {
   @Override
   public String call(Tuple2<String, String> tuple2) {
 return tuple2._2();
   }
 });

 messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> rdd) throws Exception {
 ProcessPartitionFunction func = new
 ProcessPartitionFunction(params);
 rdd.foreachPartition(func);
 return null;
   }
 });

 return jssc;
 }

 On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> I'd think that we wouldn't be "accidentally recovering from
> checkpoint" hours or even days after consumers have been restarted, plus
> the content is the fresh content that I'm feeding, not some content that
> had been fed before the last restart.
>
> The code is basically as follows:
>
> SparkConf sparkConf = createSparkConf(...);
> // We'd be 'checkpointed' because we specify a checkpoint
> directory which makes isCheckpointed true
> JavaStreamingContext jssc = params.isCheckpointed() ?
> createCheckpointedContext(sparkConf, params) : createContext(spar

Different Kafka createDirectStream implementations

2015-09-08 Thread Dan Dutrow
The two methods of createDirectStream appear to have different
implementations: the second checks the offset.reset flags and does some
error handling, while the first does not. Besides the use of a
messageHandler, are they intended to be used in different situations?

def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag, R: ClassTag](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  fromOffsets: Map[TopicAndPartition, Long],
  messageHandler: MessageAndMetadata[K, V] => R
):

def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Set[String]
)
-- 
Dan📱
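
For readers comparing the two overloads, a minimal usage sketch (the broker,
topic, offsets and message handler below are made up):

  import kafka.common.TopicAndPartition
  import kafka.message.MessageAndMetadata
  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.kafka.KafkaUtils

  def sketch(ssc: StreamingContext): Unit = {
    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092",
      "auto.offset.reset"    -> "smallest")

    // Simple form: no explicit offsets, so auto.offset.reset decides where to start.
    val simple = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))

    // Explicit-offsets form: fromOffsets plus a messageHandler; auto.offset.reset is not consulted.
    val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L)
    val custom = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => mmd.message())
  }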


Re: 1.5 Build Errors

2015-09-08 Thread Benjamin Zaitlen
I'm running zinc while compiling.  It seems that MAVEN_OPTS doesn't really
change much?  Or perhaps I'm misunderstanding something -- grepping for
java I see

root 24355  102  8.8 4687376 1350724 pts/4 Sl   16:51  11:08
> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -server -Xmx2g
> -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
> -Dzinc.home=/root/spark/build/zinc-0.3.5.3 -classpath
> /root/spark/build/zinc-0.3.5.3/lib/compiler-interface-sources.jar:/root/spark/build/zinc-0.3.5.3/lib/incremental-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/nailgun-server.jar:/root/spark/build/zinc-0.3.5.3/lib/sbt-interface.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-library.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-reflect.jar:/root/spark/build/zinc-0.3.5.3/lib/zinc.jar
> com.typesafe.zinc.Nailgun 3030 0
> root 25151 22.0  3.2 2269092 495276 pts/4  Sl+  16:53   1:56
> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Xms256m -Xmx512m -classpath
> /opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/boot/plexus-classworlds-2.5.2.jar
> -Dclassworlds.conf=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/bin/m2.conf
> -Dmaven.home=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3
> -Dmaven.multiModuleProjectDirectory=/root/spark
> org.codehaus.plexus.classworlds.launcher.Launcher -DzincPort=3030 clean
> package -DskipTests -Pyarn -Phive -Phive-thriftserver -Phadoop-2.4
> -Dhadoop.version=2.4.0


So the heap size is still 2g even with MAVEN_OPTS set to 4g.  I noticed
that within build/mvn, _COMPILE_JVM_OPTS is set to 2g, and this is what
ZINC_OPTS is set to.

--Ben


On Tue, Sep 8, 2015 at 11:06 AM, Ted Yu  wrote:

> Do you run Zinc while compiling ?
>
> Cheers
>
> On Tue, Sep 8, 2015 at 7:56 AM, Benjamin Zaitlen 
> wrote:
>
>> I'm still getting errors with 3g.  I've increase to 4g and I'll report
>> back
>>
>> To be clear:
>>
>> export MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=1024M
>> -XX:ReservedCodeCacheSize=1024m"
>>
>> [ERROR] GC overhead limit exceeded -> [Help 1]
>>> [ERROR]
>>> [ERROR] To see the full stack trace of the errors, re-run Maven with the
>>> -e switch.
>>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>>> [ERROR]
>>> [ERROR] For more information about the errors and possible solutions,
>>> please read the following articles:
>>> [ERROR] [Help 1]
>>> http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
>>> + return 1
>>> + exit 1
>>
>>
>> On Tue, Sep 8, 2015 at 10:03 AM, Sean Owen  wrote:
>>
>>> It might need more memory in certain situations / running certain
>>> tests. If 3gb works for your relatively full build, yes you can open a
>>> PR to change any occurrences of lower recommendations to 3gb.
>>>
>>> On Tue, Sep 8, 2015 at 3:02 PM, Benjamin Zaitlen 
>>> wrote:
>>> > Ah, right.  Should've caught that.
>>> >
>>> > The docs seem to recommend 2gb.  Should that be increased as well?
>>> >
>>> > --Ben
>>> >
>>> > On Tue, Sep 8, 2015 at 9:33 AM, Sean Owen  wrote:
>>> >>
>>> >> It shows you there that Maven is out of memory. Give it more heap. I
>>> use
>>> >> 3gb.
>>> >>
>>> >> On Tue, Sep 8, 2015 at 1:53 PM, Benjamin Zaitlen 
>>> >> wrote:
>>> >> > Hi All,
>>> >> >
>>> >> > I'm trying to build a distribution off of the latest in master and I
>>> >> > keep
>>> >> > getting errors on MQTT and the build fails.   I'm running the build
>>> on a
>>> >> > m1.large which has 7.5 GB of RAM and no other major processes are
>>> >> > running.
>>> >> >
>>> >> >> MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
>>> -XX:ReservedCodeCacheSize=512m"
>>> >> >> ./make-distribution.sh  --name continuum-custom-spark-1.5 --tgz
>>> -Pyarn
>>> >> >> -Phive -Phive-thriftserver -Phadoop-2.4 -Dhadoop.version=2.4.0
>>> >> >
>>> >> >
>>> >> >
>>> >> >> INFO] Spark Project GraphX ... SUCCESS
>>> [
>>> >> >> 33.345 s]
>>> >> >> [INFO] Spark Project Streaming  SUCCESS
>>> >> >> [01:08
>>> >> >> min]
>>> >> >> [INFO] Spark Project Catalyst . SUCCESS
>>> >> >> [01:39
>>> >> >> min]
>>> >> >> [INFO] Spark Project SQL .. SUCCESS
>>> >> >> [02:06
>>> >> >> min]
>>> >> >> [INFO] Spark Project ML Library ... SUCCESS
>>> >> >> [02:16
>>> >> >> min]
>>> >> >> [INFO] Spark Project Tools 
>>> SUCCESS [
>>> >> >> 4.087 s]
>>> >> >> [INFO] Spark Project Hive . SUCCESS
>>> >> >> [01:28
>>> >> >> min]
>>> >> >> [INFO] Spark Project REPL .
>>> SUCCESS [
>>> >> >> 16.291 s]
>>> >> >> [INFO] Spark Project YARN Shuffle Service .
>>> SUCCESS [
>>> >> >> 13.671 s]
>>> >> >> [INFO] Spark Project YARN .
>>> SUCCESS [
>>> >> >> 20.554 s]
>>> >> >> [INFO] Spark Project Hive Thrift Server ...
>>> SUCCESS [
>>> >> >> 14.332 s]
>>> >> >> [INFO] Spark Project Assembly ..

Best way to import data from Oracle to Spark?

2015-09-08 Thread Cui Lin
What's the best way to import data from Oracle to Spark? Thanks!


-- 
Best regards!

Lin,Cui


Can Spark Provide Multiple Context Support?

2015-09-08 Thread Rachana Srivastava
Question: How does Spark support multiple contexts?

Background:  I have a stream of data coming to Spark from Kafka.  For each
record in the stream I want to download some files from HDFS and process the
file data.  I have written code to process the files from HDFS, and I have
code written to process the stream data from Kafka using the Spark Streaming
API.  I have not been able to link the two.

Can you please let me know if it is feasible to create a JavaRDD from a file
inside a Spark Streaming job's processing step?

Thanks,

Rachana


foreachRDD causing executor lost failure

2015-09-08 Thread Priya Ch
Hello All,

 I am using foreachRDD in my code as:

  dstream.foreachRDD { rdd =>
    rdd.foreach { record =>
      // look up against the Cassandra table
      // save updated rows to the Cassandra table
    }
  }

 This foreachRDD is causing executor lost failures. What is the behavior of
this foreachRDD?

Thanks,
Padma Ch
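
For reference, a hedged sketch of where each part of that pattern executes;
the Cassandra calls are placeholders, and this describes general DStream
semantics rather than a diagnosis of the failure:

  import org.apache.spark.streaming.dstream.DStream

  def process(dstream: DStream[String]): Unit = {
    dstream.foreachRDD { rdd =>
      // This outer closure runs on the DRIVER, once per batch interval.
      rdd.foreach { record =>
        // This inner closure is serialized and shipped to the EXECUTORS and runs
        // once per record; the per-record lookups/writes (and any state captured
        // from the driver) execute here, so failures in it surface as lost
        // executors or tasks.
        // lookUpInCassandraTable(record)      // placeholder for the lookup
        // saveUpdatedRowsToCassandra(record)  // placeholder for the write
      }
    }
  }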


Re: 1.5 Build Errors

2015-09-08 Thread Sean Owen
MAVEN_OPTS shouldn't affect zinc as it's an unrelated application. You
can run "zinc -J-Xmx4g..." in general, but in the provided script,
ZINC_OPTS seems to be the equivalent, yes. It kind of looks like your
mvn process isn't getting any special memory args there. Is MAVEN_OPTS
really exported?

FWIW I use my own local mvn and zinc and it works fine.

On Tue, Sep 8, 2015 at 6:05 PM, Benjamin Zaitlen  wrote:
> I'm running zinv while compiling.  It seems that MAVEN_OPTS doesn't really
> change much?  Or perhaps I'm misunderstanding something -- grepping for java
> i see
>
>> root 24355  102  8.8 4687376 1350724 pts/4 Sl   16:51  11:08
>> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -server -Xmx2g
>> -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
>> -Dzinc.home=/root/spark/build/zinc-0.3.5.3 -classpath
>> /root/spark/build/zinc-0.3.5.3/lib/compiler-interface-sources.jar:/root/spark/build/zinc-0.3.5.3/lib/incremental-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/nailgun-server.jar:/root/spark/build/zinc-0.3.5.3/lib/sbt-interface.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-library.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-reflect.jar:/root/spark/build/zinc-0.3.5.3/lib/zinc.jar
>> com.typesafe.zinc.Nailgun 3030 0
>> root 25151 22.0  3.2 2269092 495276 pts/4  Sl+  16:53   1:56
>> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Xms256m -Xmx512m -classpath
>> /opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/boot/plexus-classworlds-2.5.2.jar
>> -Dclassworlds.conf=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/bin/m2.conf
>> -Dmaven.home=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3
>> -Dmaven.multiModuleProjectDirectory=/root/spark
>> org.codehaus.plexus.classworlds.launcher.Launcher -DzincPort=3030 clean
>> package -DskipTests -Pyarn -Phive -Phive-thriftserver -Phadoop-2.4
>> -Dhadoop.version=2.4.0
>
>
> So the heap size is still 2g even with MAVEN_OPTS set with 4g.  I noticed
> that within build/mvn _COMPILE_JVM_OPTS is set to 2g and this is what
> ZINC_OPTS is set to.
>
> --Ben
>
>
> On Tue, Sep 8, 2015 at 11:06 AM, Ted Yu  wrote:
>>
>> Do you run Zinc while compiling ?
>>
>> Cheers
>>
>> On Tue, Sep 8, 2015 at 7:56 AM, Benjamin Zaitlen 
>> wrote:
>>>
>>> I'm still getting errors with 3g.  I've increase to 4g and I'll report
>>> back
>>>
>>> To be clear:
>>>
>>> export MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=1024M
>>> -XX:ReservedCodeCacheSize=1024m"
>>>
 [ERROR] GC overhead limit exceeded -> [Help 1]
 [ERROR]
 [ERROR] To see the full stack trace of the errors, re-run Maven with the
 -e switch.
 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
 [ERROR]
 [ERROR] For more information about the errors and possible solutions,
 please read the following articles:
 [ERROR] [Help 1]
 http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
 + return 1
 + exit 1
>>>
>>>
>>> On Tue, Sep 8, 2015 at 10:03 AM, Sean Owen  wrote:

 It might need more memory in certain situations / running certain
 tests. If 3gb works for your relatively full build, yes you can open a
 PR to change any occurrences of lower recommendations to 3gb.

 On Tue, Sep 8, 2015 at 3:02 PM, Benjamin Zaitlen 
 wrote:
 > Ah, right.  Should've caught that.
 >
 > The docs seem to recommend 2gb.  Should that be increased as well?
 >
 > --Ben
 >
 > On Tue, Sep 8, 2015 at 9:33 AM, Sean Owen  wrote:
 >>
 >> It shows you there that Maven is out of memory. Give it more heap. I
 >> use
 >> 3gb.
 >>
 >> On Tue, Sep 8, 2015 at 1:53 PM, Benjamin Zaitlen 
 >> wrote:
 >> > Hi All,
 >> >
 >> > I'm trying to build a distribution off of the latest in master and
 >> > I
 >> > keep
 >> > getting errors on MQTT and the build fails.   I'm running the build
 >> > on a
 >> > m1.large which has 7.5 GB of RAM and no other major processes are
 >> > running.
 >> >
 >> >> MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
 >> >> -XX:ReservedCodeCacheSize=512m"
 >> >> ./make-distribution.sh  --name continuum-custom-spark-1.5 --tgz
 >> >> -Pyarn
 >> >> -Phive -Phive-thriftserver -Phadoop-2.4 -Dhadoop.version=2.4.0
 >> >
 >> >
 >> >
 >> >> INFO] Spark Project GraphX ... SUCCESS
 >> >> [
 >> >> 33.345 s]
 >> >> [INFO] Spark Project Streaming 
 >> >> SUCCESS
 >> >> [01:08
 >> >> min]
 >> >> [INFO] Spark Project Catalyst .
 >> >> SUCCESS
 >> >> [01:39
 >> >> min]
 >> >> [INFO] Spark Project SQL ..
 >> >> SUCCESS
 >> >> [02:06
 >> >> min]
 >> >> [INFO] Spark Project ML Library ...
 >> >> SUCCESS
 >> >> [02:16
 >> >> min]
 >> >> [INFO] Spark Project Tools ...

Creating Dataframe from XML parsed by scalaxb

2015-09-08 Thread Christopher Matta
I’m attempting to ingest some XML data created by Microsoft BizTalk using
Spark. In order to validate and simplify the parsing of the XML I’d like to
use scalaxb; however, when trying to convert the RDD of
scalaxb objects to a DataFrame, I get the following error:

java.lang.UnsupportedOperationException: Schema for type
scalaxb.DataRecord[scala.Any] is not supported

I’ve written up a more detailed description over on Stack Overflow:
http://stackoverflow.com/questions/32361226/creating-dataframe-from-xml-parsed-by-scalaxb

Is there a way to get the toDF() function to support custom Any types? If
not, is there a better way to go about ingesting SOAP-like XML with Spark
and Scala? I’m new to scalaxb and Scala in general.

Thanks in advance.

Chris mattacma...@mapr.com
215-701-3146
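
One common workaround, sketched below with invented field names and not
specific to scalaxb, is to first map the parsed objects into flat case
classes whose fields are types Spark SQL supports, and only then call toDF():

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SQLContext}

  // "OrderRow" and its fields are assumptions for illustration only.
  case class OrderRow(id: String, amount: Double)

  def toDataFrame(sqlContext: SQLContext, parsed: RDD[(String, Double)]): DataFrame = {
    import sqlContext.implicits._
    parsed.map { case (id, amount) => OrderRow(id, amount) }.toDF()
  }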
​


Re: Split content into multiple Parquet files

2015-09-08 Thread Adrien Mogenet
My bad, I realized my question was unclear.

I did a partitionBy when using saveAsHadoopFile. My question was about
doing the same thing for Parquet files. We were using Spark 1.3.x, and now
that we've updated to 1.4.1 I had totally forgotten that this makes it possible :-)

Thanks for the answer, then!

On 8 September 2015 at 12:58, Cheng Lian  wrote:

> In Spark 1.4 and 1.5, you can do something like this:
>
> df.write.partitionBy("key").parquet("/datasink/output-parquets")
>
> BTW, I'm curious about how did you do it without partitionBy using
> saveAsHadoopFile?
>
> Cheng
>
>
> On 9/8/15 2:34 PM, Adrien Mogenet wrote:
>
> Hi there,
>
> We've spent several hours to split our input data into several parquet
> files (or several folders, i.e.
> /datasink/output-parquets//foobar.parquet), based on a
> low-cardinality key. This works very well when using
> saveAsHadoopFile, but we can't achieve a similar thing with Parquet files.
>
> The only working solution so far is to persist the RDD and then loop over
> it N times to write N files. That does not look acceptable...
>
> Do you guys have any suggestion to do such an operation?
>
> --
>
> *Adrien Mogenet*
> Head of Backend/Infrastructure
> adrien.moge...@contentsquare.com
> (+33)6.59.16.64.22
> http://www.contentsquare.com
> 50, avenue Montaigne - 75008 Paris
>
>
>


-- 

*Adrien Mogenet*
Head of Backend/Infrastructure
adrien.moge...@contentsquare.com
(+33)6.59.16.64.22
http://www.contentsquare.com
50, avenue Montaigne - 75008 Paris


Re: Memory-efficient successive calls to repartition()

2015-09-08 Thread Aurélien Bellet

Hi,

This is what I tried:

for i in range(1000):
    print i
    data2 = data.repartition(50).cache()
    if (i+1) % 10 == 0:
        data2.checkpoint()
    data2.first()  # materialize rdd
    data.unpersist()  # unpersist previous version
    sc._jvm.System.gc()
    data = data2

But unfortunately I do not get any significant improvement from the call 
to sc._jvm.System.gc()...


I checked the WebUI and I have a single RDD in memory, so unpersist() 
works as expected but still no solution to trigger the cleaning of 
shuffle files...


Aurélien

Le 9/2/15 4:11 PM, alexis GILLAIN a écrit :

Just made some tests on my laptop.

Deletion of the files is not immediate but a System.gc() call makes the
job on shuffle files of a checkpointed RDD.
It should solve your problem (`sc._jvm.System.gc()` in Python as pointed
in the databricks link in my previous message).


2015-09-02 20:55 GMT+08:00 Aurélien Bellet
mailto:aurelien.bel...@telecom-paristech.fr>>:

Thanks a lot for the useful link and comments Alexis!

First of all, the problem occurs without doing anything else in the
code (except of course loading my data from HDFS at the beginning) -
so it definitely comes from the shuffling. You're right, in the
current version, checkpoint files are not removed and take up some
space in HDFS (this is easy to fix). But this is negligible compared
to the non hdfs files which keeps growing as iterations go. So I
agree with you that this must come from the shuffling operations: it
seems that the shuffle files are not removed along the execution
(they are only removed if I stop/kill the application), despite the
use of checkpoint.

The class you mentioned is very interesting but I did not find a way
to use it from pyspark. I will try to implement my own version,
looking at the source code. But besides the queueing and removing of
checkpoint files, I do not really see anything special there that
could solve my issue.

I will continue to investigate this. Just found out I can use a
command line browser to look at the webui (I cannot access the
server in graphical display mode), this should help me understand
what's going on. I will also try the workarounds mentioned in the
link. Keep you posted.

Again, thanks a lot!

Best,

Aurelien


Le 02/09/2015 14:15, alexis GILLAIN a écrit :

Aurélien,

  From what you're saying, I can think of a couple of things
considering
I don't know what you are doing in the rest of the code :

- There is lot of non hdfs writes, it comes from the rest of
your code
and/or repartittion(). Repartition involve a shuffling and
creation of
files on disk. I would have said that the problem come from that
but I
just checked and checkpoint() is supposed to delete shuffle files :

https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
(looks exactly as your problem so you could maybe try the others
workarounds)
Still, you may do a lot of shuffle in the rest of the code (you
should
see the amount of shuffle files written in the webui) and consider
increasing the disk space available...if you can do that.

- On the hdfs side, the class I pointed to has an update
function which
"automatically handles persisting and (optionally) checkpointing, as
well as unpersisting and removing checkpoint files". Not sure your
method for checkpointing remove previous checkpoint file.

In the end, does the disk space error come from hdfs growing or
local
disk growing ?

You should check the webui to identify which tasks spill data on
disk
and verify if the shuffle files are properly deleted when you
checkpoint
your rdd.


Regards,


2015-09-01 22:48 GMT+08:00 Aurélien Bellet
mailto:aurelien.bel...@telecom-paristech.fr>
>>:


 Dear Alexis,

 Thanks again for your reply. After reading about
checkpointing I
 have modified my sample code as follows:

 for i in range(1000):
  print i
  data2=data.repartition(50).cache()
  if (i+1) % 10 == 0:
  data2.checkpoint()
  data2.first() # materialize rdd
  data.unpersist() # unpersist previous version
  data=data2

 The data is checkpointed every 10 iterations to a directory
that I
 specified. While this seems to improve things a little bit,
there is
 still a lot of writing on disk (appcache directory, shown
as "non
 HDFS files" in Cloudera Man

Re: Can Spark Provide Multiple Context Support?

2015-09-08 Thread Silvio Fiorito
Is the data from HDFS static or is it unique for each event in the stream? If 
it’s static, you can just create the SparkContext, load the files from HDFS, 
then start a StreamingContext with the existing SparkContext and go from there.
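
A minimal Scala sketch of that "static HDFS data" approach (the paths, host,
port and batch interval are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val sc  = new SparkContext(new SparkConf().setAppName("stream-plus-hdfs"))
  val ref = sc.textFile("hdfs:///data/reference").collect().toSet  // small, static lookup data
  val refBc = sc.broadcast(ref)

  val ssc = new StreamingContext(sc, Seconds(10))   // reuse the same SparkContext
  val events = ssc.socketTextStream("localhost", 9999)
  events.filter(e => refBc.value.contains(e)).print()

  ssc.start()
  ssc.awaitTermination()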

From: Rachana Srivastava
Date: Tuesday, September 8, 2015 at 1:12 PM
To: "user@spark.apache.org"
Subject: Can Spark Provide Multiple Context Support?

Question: How does Spark support multiple context?

Background:  I have a stream of data coming to Spark from Kafka.   For each 
data in the stream I want to download some files from HDFS and process the file 
data.  I have written code to process the file from HDFS and I have code 
written to process stream data from Kafka using SparkStreaming API.  I have not 
been able to link both.

Can you please let me know if it is feasible to create JavaRDD from file inside 
SparkStreamingRDD job processing step?

Thanks,

Rachana


Re: Different Kafka createDirectStream implementations

2015-09-08 Thread Cody Koeninger
If you're providing starting offsets explicitly, then auto offset reset
isn't relevant.

On Tue, Sep 8, 2015 at 11:44 AM, Dan Dutrow  wrote:

> The two methods of createDirectStream appear to have different
> implementations, the second checks the offset.reset flags and does some
> error handling while the first does not. Besides the use of a
> messageHandler, are they intended to be used in different situations?
>
> def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]:
> ClassTag,
> VD <: Decoder[V]: ClassTag,* R: ClassTag] *
> ( ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets:
> Map[TopicAndPartition, Long], * messageHandler: MessageAndMetadata[K, V]
> => R *
> ):
>
> def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]:
> ClassTag, VD <: Decoder[V]: ClassTag]
> ( ssc: StreamingContext, kafkaParams: Map[String, String], topics:
> Set[String] )
> --
> Dan📱


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Nick Peterson
Yes, putting the jar on each node and adding it manually to the executor
classpath does it.  So, it seems that's where the issue lies.

I'll do some experimenting and see if I can narrow down the problem; but,
for now, at least I can run my job!

Thanks for your help.
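
For reference, the workaround described above can be set on the SparkConf or
passed to spark-submit; a minimal sketch, where the jar path is an assumption
and must exist at that location on every executor node:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("kryo-classpath-workaround")
    // equivalent to: --conf spark.executor.extraClassPath=/opt/jars/app-assembly.jar
    .set("spark.executor.extraClassPath", "/opt/jars/app-assembly.jar")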

On Tue, Sep 8, 2015 at 8:40 AM Igor Berman  wrote:

> another idea - you can add this fat jar explicitly to the classpath of
> executors... it's not a solution, but it might work...
> I mean place it somewhere locally on executors and add it to cp with
> spark.executor.extraClassPath
>
> On 8 September 2015 at 18:30, Nick Peterson  wrote:
>
>> Yeah... none of the jars listed on the classpath contain this class.  The
>> only jar that does is the fat jar that I'm submitting with spark-submit,
>> which as mentioned isn't showing up on the classpath anywhere.
>>
>> -- Nick
>>
>> On Tue, Sep 8, 2015 at 8:26 AM Igor Berman  wrote:
>>
>>> hmm...out of ideas.
>>> can you check in spark ui environment tab that this jar is not somehow
>>> appears 2 times or more...? or more generally - any 2 jars that can contain
>>> this class by any chance
>>>
>>> regarding your question about classloader - no idea, probably there is,
>>> I remember stackoverflow has some examples on how to print all classes, but
>>> how to print all classes of kryo classloader - no idea.
>>>
>>> On 8 September 2015 at 16:43, Nick Peterson 
>>> wrote:
>>>
 Yes, the jar contains the class:

 $ jar -tf lumiata-evaluation-assembly-1.0.jar | grep
 2028/Document/Document
 com/i2028/Document/Document$1.class
 com/i2028/Document/Document.class

 What else can I do?  Is there any way to get more information about the
 classes available to the particular classloader kryo is using?

 On Tue, Sep 8, 2015 at 6:34 AM Igor Berman 
 wrote:

> java.lang.ClassNotFoundException: com.i2028.Document.Document
>
> 1. so have you checked that jar that you create(fat jar) contains this 
> class?
>
> 2. might be there is some stale cache issue...not sure though
>
>
> On 8 September 2015 at 16:12, Nicholas R. Peterson <
> nrpeter...@gmail.com> wrote:
>
>> Here is the stack trace:  (Sorry for the duplicate, Igor -- I forgot to 
>> include the list.)
>>
>>
>> 15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in 
>> stage 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException: 
>> com.esotericsoftware.kryo.KryoException: Error constructing instance of 
>> class: com.lumiata.patientanalysis.utils.CachedGraph
>>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>>  at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>  at 
>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44)
>>  at 
>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing 
>> instance of class: com.lumiata.patientanalysis.utils.CachedGraph
>>  at 
>> com.twitter.chill.Instantiators$$anon$1.newInst

Re: 1.5 Build Errors

2015-09-08 Thread Benjamin Zaitlen
Yes, just reran with the following

(spark_build)root@ip-10-45-130-206:~/spark# export MAVEN_OPTS="-Xmx4096mb
> -XX:MaxPermSize=1024M -XX:ReservedCodeCacheSize=1024m"
> (spark_build)root@ip-10-45-130-206:~/spark# build/mvn -Pyarn -Phadoop-2.4
> -Dhadoop.version=2.4.0 -DskipTests clean package


and grepping for java


root   641  9.9  0.3 4411732 49040 pts/4   Sl+  17:35   0:01
> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -server -Xmx2g
> -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
> -Dzinc.home=/root/spark/build/zinc-0.3.5.3 -classpath
> /root/spark/build/zinc-0.3.5.3/lib/compiler-interface-sources.jar:/root/spark/build/zinc-0.3.5.3/lib/incremental-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/nailgun-server.jar:/root/spark/build/zinc-0.3.5.3/lib/sbt-interface.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-library.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-reflect.jar:/root/spark/build/zinc-0.3.5.3/lib/zinc.jar
> com.typesafe.zinc.Nailgun 3030 0
> root   687  226  2.0 1803664 312876 pts/4  Sl+  17:36   0:22
> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Xms256m -Xmx512m -classpath
> /opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/boot/plexus-classworlds-2.5.2.jar
> -Dclassworlds.conf=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/bin/m2.conf
> -Dmaven.home=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3
> -Dmaven.multiModuleProjectDirectory=/root/spark
> org.codehaus.plexus.classworlds.launcher.Launcher -DzincPort=3030 -Pyarn
> -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package


On Tue, Sep 8, 2015 at 1:14 PM, Sean Owen  wrote:

> MAVEN_OPTS shouldn't affect zinc as it's an unrelated application. You
> can run "zinc -J-Xmx4g..." in general, but in the provided script,
> ZINC_OPTS seems to be the equivalent, yes. It kind of looks like your
> mvn process isn't getting any special memory args there. Is MAVEN_OPTS
> really exported?
>
> FWIW I use my own local mvn and zinc and it works fine.
>
> On Tue, Sep 8, 2015 at 6:05 PM, Benjamin Zaitlen 
> wrote:
> > I'm running zinv while compiling.  It seems that MAVEN_OPTS doesn't
> really
> > change much?  Or perhaps I'm misunderstanding something -- grepping for
> java
> > i see
> >
> >> root 24355  102  8.8 4687376 1350724 pts/4 Sl   16:51  11:08
> >> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -server -Xmx2g
> >> -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
> >> -Dzinc.home=/root/spark/build/zinc-0.3.5.3 -classpath
> >>
> /root/spark/build/zinc-0.3.5.3/lib/compiler-interface-sources.jar:/root/spark/build/zinc-0.3.5.3/lib/incremental-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/nailgun-server.jar:/root/spark/build/zinc-0.3.5.3/lib/sbt-interface.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-library.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-reflect.jar:/root/spark/build/zinc-0.3.5.3/lib/zinc.jar
> >> com.typesafe.zinc.Nailgun 3030 0
> >> root 25151 22.0  3.2 2269092 495276 pts/4  Sl+  16:53   1:56
> >> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Xms256m -Xmx512m -classpath
> >>
> /opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/boot/plexus-classworlds-2.5.2.jar
> >>
> -Dclassworlds.conf=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/bin/m2.conf
> >> -Dmaven.home=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3
> >> -Dmaven.multiModuleProjectDirectory=/root/spark
> >> org.codehaus.plexus.classworlds.launcher.Launcher -DzincPort=3030 clean
> >> package -DskipTests -Pyarn -Phive -Phive-thriftserver -Phadoop-2.4
> >> -Dhadoop.version=2.4.0
> >
> >
> > So the heap size is still 2g even with MAVEN_OPTS set with 4g.  I noticed
> > that within build/mvn _COMPILE_JVM_OPTS is set to 2g and this is what
> > ZINC_OPTS is set to.
> >
> > --Ben
> >
> >
> > On Tue, Sep 8, 2015 at 11:06 AM, Ted Yu  wrote:
> >>
> >> Do you run Zinc while compiling ?
> >>
> >> Cheers
> >>
> >> On Tue, Sep 8, 2015 at 7:56 AM, Benjamin Zaitlen 
> >> wrote:
> >>>
> >>> I'm still getting errors with 3g.  I've increase to 4g and I'll report
> >>> back
> >>>
> >>> To be clear:
> >>>
> >>> export MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=1024M
> >>> -XX:ReservedCodeCacheSize=1024m"
> >>>
>  [ERROR] GC overhead limit exceeded -> [Help 1]
>  [ERROR]
>  [ERROR] To see the full stack trace of the errors, re-run Maven with
> the
>  -e switch.
>  [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>  [ERROR]
>  [ERROR] For more information about the errors and possible solutions,
>  please read the following articles:
>  [ERROR] [Help 1]
>  http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
>  + return 1
>  + exit 1
> >>>
> >>>
> >>> On Tue, Sep 8, 2015 at 10:03 AM, Sean Owen  wrote:
> 
>  It might need more memory in certain situations / running certain
>  tests. If 3gb works for your relatively full build, yes you can open a

Re: Best way to import data from Oracle to Spark?

2015-09-08 Thread Devin Jones
I'd look into the JdbcRDD class in the Java and Scala APIs.

On Tue, Sep 8, 2015 at 1:11 PM, Cui Lin  wrote:

> What's the best way to import data from Oracle to Spark? Thanks!
>
>
> --
> Best regards!
>
> Lin,Cui
>


Re: Different Kafka createDirectStream implementations

2015-09-08 Thread Dan Dutrow
Yes, but one implementation checks those flags and the other one doesn't. I
would think they should be consistent.

On Tue, Sep 8, 2015 at 1:32 PM Cody Koeninger  wrote:

> If you're providing starting offsets explicitly, then auto offset reset
> isn't relevant.
>
> On Tue, Sep 8, 2015 at 11:44 AM, Dan Dutrow  wrote:
>
>> The two methods of createDirectStream appear to have different
>> implementations, the second checks the offset.reset flags and does some
>> error handling while the first does not. Besides the use of a
>> messageHandler, are they intended to be used in different situations?
>>
>> def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]:
>> ClassTag,
>> VD <: Decoder[V]: ClassTag,* R: ClassTag] *
>> ( ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets:
>> Map[TopicAndPartition, Long], * messageHandler: MessageAndMetadata[K, V]
>> => R *
>> ):
>>
>> def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]:
>> ClassTag, VD <: Decoder[V]: ClassTag]
>> ( ssc: StreamingContext, kafkaParams: Map[String, String], topics:
>> Set[String] )
>> --
>> Dan📱
>
>
> --
Dan📱


Re: Best way to import data from Oracle to Spark?

2015-09-08 Thread Muhammad Atif
Looking into the Spark SQL data sources API would be a good start:
https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html

On Tue, Sep 8, 2015 at 1:56 PM, Devin Jones 
wrote:

> I'd look into the JdbcRDD class in the Java and Scala apis
>
> On Tue, Sep 8, 2015 at 1:11 PM, Cui Lin  wrote:
>
>> What's the best way to import data from Oracle to Spark? Thanks!
>>
>>
>> --
>> Best regards!
>>
>> Lin,Cui
>>
>
>
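
A minimal sketch of reading an Oracle table through the JDBC data source
mentioned above; every connection detail below is a placeholder, and the
Oracle JDBC driver jar is assumed to be on the driver and executor classpaths:

  import org.apache.spark.sql.{DataFrame, SQLContext}

  def loadFromOracle(sqlContext: SQLContext): DataFrame =
    sqlContext.read.format("jdbc").options(Map(
      "url"      -> "jdbc:oracle:thin:@//dbhost:1521/ORCL",
      "driver"   -> "oracle.jdbc.OracleDriver",
      "dbtable"  -> "MYSCHEMA.MY_TABLE",
      "user"     -> "spark_user",
      "password" -> "secret"
    )).load()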


Re: Different Kafka createDirectStream implementations

2015-09-08 Thread Cody Koeninger
What exactly do you think should be done with auto offset reset if someone
has explicitly provided offsets?

auto offset reset is only useful for determining whether to start the
stream at the beginning or the end of the log... if someone's provided
explicit offsets, it's pretty clear where to start the stream.

On Tue, Sep 8, 2015 at 1:19 PM, Dan Dutrow  wrote:

> Yes, but one implementation checks those flags and the other one doesn't.
> I would think they should be consistent.
>
> On Tue, Sep 8, 2015 at 1:32 PM Cody Koeninger  wrote:
>
>> If you're providing starting offsets explicitly, then auto offset reset
>> isn't relevant.
>>
>> On Tue, Sep 8, 2015 at 11:44 AM, Dan Dutrow  wrote:
>>
>>> The two methods of createDirectStream appear to have different
>>> implementations, the second checks the offset.reset flags and does some
>>> error handling while the first does not. Besides the use of a
>>> messageHandler, are they intended to be used in different situations?
>>>
>>> def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]:
>>> ClassTag,
>>> VD <: Decoder[V]: ClassTag,* R: ClassTag] *
>>> ( ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets:
>>> Map[TopicAndPartition, Long], * messageHandler: MessageAndMetadata[K,
>>> V] => R *
>>> ):
>>>
>>> def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]:
>>> ClassTag, VD <: Decoder[V]: ClassTag]
>>> ( ssc: StreamingContext, kafkaParams: Map[String, String], topics:
>>> Set[String] )
>>> --
>>> Dan📱
>>
>>
>> --
> Dan📱


NPE while reading ORC file using Spark 1.4 API

2015-09-08 Thread unk1102
Hi, I read many ORC files in Spark and process them; those files are basically
Hive partitions. Most of the time processing goes well, but for a few files I
get the following exception and I don't know why. These files work fine in
Hive using Hive queries. Please guide. Thanks in advance.

DataFrame df = hiveContext.read().format("orc").load("/path/in/hdfs");

java.lang.NullPointerException
at
org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:402)
at
org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:206)
at
org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$8.apply(OrcRelation.scala:238)
at
org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$8.apply(OrcRelation.scala:238)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:238)
at
org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:290)
at
org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:288)
at
org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NPE-while-reading-ORC-file-using-Spark-1-4-API-tp24609.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: NPE while reading ORC file using Spark 1.4 API

2015-09-08 Thread Umesh Kacha
Hi Zhan, thanks for the reply. Yes, the schema should be the same; I am
reading Hive table partitions in ORC format into Spark, so I believe it
should be the same. I am new to Hive, so I don't know if the schema can be
different across partitions of a Hive partitioned table.

On Wed, Sep 9, 2015 at 12:16 AM, Zhan Zhang  wrote:

> Does your directory includes some orc files that is not having same schema
> of the table? Please refer to
> https://issues.apache.org/jira/browse/SPARK-10304
>
> Thanks.
>
> Zhan Zhang
>
> On Sep 8, 2015, at 11:39 AM, unk1102  wrote:
>
> Hi I read many ORC files in Spark and process it those files are basically
> Hive partitions. Most of the times processing goes well but for few files I
> get the following exception dont know why? These files are working fine in
> Hive using Hive queries. Please guide. Thanks in advance.
>
> DataFrame df = hiveContext.read().format("orc").load("/path/in/hdfs");
>
> java.lang.NullPointerException
>at
>
> org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:402)
>at
>
> org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:206)
>at
>
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$8.apply(OrcRelation.scala:238)
>at
>
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$8.apply(OrcRelation.scala:238)
>at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at scala.collection.immutable.List.foreach(List.scala:318)
>at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>at
> org.apache.spark.sql.hive.orc.OrcTableScan.org
> $apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:238)
>at
>
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:290)
>at
>
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:288)
>at
>
> org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>at org.apache.spark.scheduler.Task.run(Task.scala:70)
>at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>at java.lang.Thread.run(Thread.java:744)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NPE-while-reading-ORC-file-using-Spark-1-4-API-tp24609.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
>
>


Re: Is HDFS required for Spark streaming?

2015-09-08 Thread Tathagata Das
You can use local directories in that case, but it is not recommended and
not a well-tested code path (so I have no idea what can happen).
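
For context, a minimal sketch of the pattern discussed below: windowed
reductions with an inverse function require a checkpoint directory, and HDFS
is the fault-tolerant place to put it (host, port, paths and durations are
placeholders):

  import org.apache.spark.streaming.{Seconds, StreamingContext}

  def wire(ssc: StreamingContext): Unit = {
    // HDFS is the recommended location; a local directory is also accepted,
    // but as noted above it is not a well-tested path.
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")

    val counts = ssc.socketTextStream("localhost", 9999)
      .map(word => (word, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(10))

    counts.print()
  }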

On Tue, Sep 8, 2015 at 6:59 AM, Cody Koeninger  wrote:

> Yes, local directories will be sufficient
>
> On Sat, Sep 5, 2015 at 10:44 AM, N B  wrote:
>
>> Hi TD,
>>
>> Thanks!
>>
>> So our application does turn on checkpoints but we do not recover upon
>> application restart (we just blow the checkpoint directory away first and
>> re-create the StreamingContext) as we don't have a real need for that type
>> of recovery. However, because the application does reduceeByKeyAndWindow
>> operations, checkpointing has to be turned on. Do you think this scenario
>> will also only work with HDFS or having local directories suffice?
>>
>> Thanks
>> Nikunj
>>
>>
>>
>> On Fri, Sep 4, 2015 at 3:09 PM, Tathagata Das 
>> wrote:
>>
>>> Shuffle spills will use local disk, HDFS not needed.
>>> Spark and Spark Streaming checkpoint info WILL NEED HDFS for
>>> fault-tolerance. So that stuff can be recovered even if the spark cluster
>>> nodes go down.
>>>
>>> TD
>>>
>>> On Fri, Sep 4, 2015 at 2:45 PM, N B  wrote:
>>>
 Hello,

 We have a Spark Streaming program that is currently running on a single
 node in "local[n]" master mode. We currently give it local directories for
 Spark's own state management etc. The input is streaming from network/flume
 and output is also to network/kafka etc, so the process as such does not
 need any distributed file system.

 Now, we do want to start distributing this procesing across a few
 machines and make a real cluster out of it. However, I am not sure if HDFS
 is a hard requirement for that to happen. I am thinking about the Shuffle
 spills, DStream/RDD persistence and checkpoint info. Do any of these
 require the state to be shared via HDFS? Are there other alternatives that
 can be utilized if state sharing is accomplished via the file system only.

 Thanks
 Nikunj


>>>
>>
>


Re: NPE while reading ORC file using Spark 1.4 API

2015-09-08 Thread Davies Liu
I think this is fixed in 1.5 (release soon), by
https://github.com/apache/spark/pull/8407

On Tue, Sep 8, 2015 at 11:39 AM, unk1102  wrote:
> Hi I read many ORC files in Spark and process it those files are basically
> Hive partitions. Most of the times processing goes well but for few files I
> get the following exception dont know why? These files are working fine in
> Hive using Hive queries. Please guide. Thanks in advance.
>
> DataFrame df = hiveContext.read().format("orc").load("/path/in/hdfs");
>
> java.lang.NullPointerException
> at
> org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:402)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:206)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$8.apply(OrcRelation.scala:238)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$8.apply(OrcRelation.scala:238)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:238)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:290)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:288)
> at
> org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/NPE-while-reading-ORC-file-using-Spark-1-4-API-tp24609.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark ANN

2015-09-08 Thread Feynman Liang
Just wondering, why do we need tensors? Is the implementation of convnets
using im2col (see here )
insufficient?

On Tue, Sep 8, 2015 at 11:55 AM, Ulanov, Alexander  wrote:

> Ruslan, thanks for including me in the discussion!
>
>
>
> Dropout and other features such as Autoencoder were implemented, but not
> merged yet in order to have room for improving the internal Layer API. For
> example, there is an ongoing work with convolutional layer that
> consumes/outputs 2D arrays. We’ll probably need to change the Layer’s
> input/output type to tensors. This will influence dropout which will need
> some refactoring to handle tensors too. Also, all new components should
> have ML pipeline public interface. There is an umbrella issue for deep
> learning in Spark https://issues.apache.org/jira/browse/SPARK-5575 which
> includes various features of Autoencoder, in particular
> https://issues.apache.org/jira/browse/SPARK-10408. You are very welcome
> to join and contribute since there is a lot of work to be done.
>
>
>
> Best regards, Alexander
>
> *From:* Ruslan Dautkhanov [mailto:dautkha...@gmail.com]
> *Sent:* Monday, September 07, 2015 10:09 PM
> *To:* Feynman Liang
> *Cc:* Nick Pentreath; user; na...@yandex.ru
> *Subject:* Re: Spark ANN
>
>
>
> Found a dropout commit from avulanov:
>
>
> https://github.com/avulanov/spark/commit/3f25e26d10ef8617e46e35953fe0ad1a178be69d
>
>
>
> It probably hasn't made its way to MLLib (yet?).
>
>
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
> On Mon, Sep 7, 2015 at 8:34 PM, Feynman Liang 
> wrote:
>
> Unfortunately, not yet... Deep learning support (autoencoders, RBMs) is on
> the roadmap for 1.6 
> though, and there is a spark package
>  for
> dropout regularized logistic regression.
>
>
>
>
>
> On Mon, Sep 7, 2015 at 3:15 PM, Ruslan Dautkhanov 
> wrote:
>
> Thanks!
>
>
>
> It does not look like Spark ANN supports dropout/dropconnect or any other
> techniques that help avoid overfitting yet?
>
> http://www.cs.toronto.edu/~rsalakhu/papers/srivastava14a.pdf
>
> https://cs.nyu.edu/~wanli/dropc/dropc.pdf
>
>
>
> ps. There is a small copy-paste typo in
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala#L43
>
> should read B&C :)
>
>
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
> On Mon, Sep 7, 2015 at 12:47 PM, Feynman Liang 
> wrote:
>
> Backprop is used to compute the gradient here
> ,
> which is then optimized by SGD or LBFGS here
> 
>
>
>
> On Mon, Sep 7, 2015 at 11:24 AM, Nick Pentreath 
> wrote:
>
> Haven't checked the actual code but that doc says "MLPC employs
> backpropagation for learning the model. .."?
>
>
>
>
>
>
> —
> Sent from Mailbox 
>
>
>
> On Mon, Sep 7, 2015 at 8:18 PM, Ruslan Dautkhanov 
> wrote:
>
> http://people.apache.org/~pwendell/spark-releases/latest/ml-ann.html
>
>
>
> The implementation seems to be missing backpropagation?
>
> Was there a good reason to omit BP?
>
> What are the drawbacks of a pure feedforward-only ANN?
>
>
>
> Thanks!
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
>
>
>
>
>
>
>
>
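
For anyone who wants to try the ANN code being discussed, a minimal sketch of
MultilayerPerceptronClassifier with the Spark 1.5 ML API; the data path and layer
sizes are placeholders, and an existing sc and sqlContext are assumed. As noted
earlier in the thread, training computes gradients with backpropagation and
optimizes them with LBFGS.

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

import sqlContext.implicits._
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_multiclass_classification_data.txt").toDF()

// 4 input features, two hidden layers, 3 output classes
val layers = Array[Int](4, 5, 4, 3)
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)
val model = trainer.fit(data)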


read compressed hdfs files using SparkContext.textFile?

2015-09-08 Thread shenyan zhen
Hi,

For HDFS files written with the code below:

rdd.saveAsTextFile(getHdfsPath(...), classOf
[org.apache.hadoop.io.compress.GzipCodec])


I can see the hdfs files been generated:


0  /lz/streaming/am/144173460/_SUCCESS

1.6 M  /lz/streaming/am/144173460/part-0.gz

1.6 M  /lz/streaming/am/144173460/part-1.gz

1.6 M  /lz/streaming/am/144173460/part-2.gz

...


How do I read it using SparkContext?


My naive attempt:

val t1 = sc.textFile("/lz/streaming/am/144173460")

t1.take(1).head

did not work:


org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
file:/lz/streaming/am/144173460

at
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)

at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)

at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)

at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)


Thanks,

Shenyan


Re: Python Spark Streaming example with textFileStream does not work. Why?

2015-09-08 Thread Tathagata Das
Can you give absolute paths just to be sure?

On Mon, Sep 7, 2015 at 12:59 AM, Kamil Khadiev  wrote:

> I think the problem also depends on the file system.
> I use a Mac, and my program found the file, but only when I created a new one,
> not when I renamed or moved one.
>
> And in logs
> 15/09/07 10:44:52 INFO FileInputDStream: New files at time 1441611892000
> ms:
> I found my file
>
> But I don't see any processing of the file in the logs
>
> 2015-09-07 8:44 GMT+03:00 Kamil Khadiev :
>
>> Thank you.
>>
>> But it still does not work.
>>
>> Also I made another mistake: I wrote the name of the file, not the directory.
>>
>> I fixed it:
>>   conf = (SparkConf()
>>  .setMaster("local")
>>  .setAppName("My app")
>>  .set("spark.executor.memory", "1g"))
>> sc = SparkContext(conf = conf)
>> ssc = StreamingContext(sc, 1)
>> lines = ssc.textFileStream('../inputs/streaminginputs')
>> counts = lines.flatMap(lambda line: line.split(" "))\
>>   .map(lambda x: (x, 1))\
>>   .reduceByKey(lambda a, b: a+b)
>> counts.pprint()
>> ssc.start()
>> ssc.awaitTermination()
>>
>> I add a file to the '../inputs/streaminginputs' directory, then rename it,
>> and also try copying in a new one.
>> But it does not help.  I have the same situation in the console.
>> I also get logs like this every second (but not the expected logs
>> about a new file):
>>
>> ---
>> Time: 2015-09-07 08:39:29
>> ---
>>
>> 15/09/07 08:39:30 INFO FileInputDStream: Finding new files took 0 ms
>> 15/09/07 08:39:30 INFO FileInputDStream: New files at time 144160437
>> ms:
>>
>> 15/09/07 08:39:30 INFO JobScheduler: Added jobs for time 144160437 ms
>> 15/09/07 08:39:30 INFO JobScheduler: Starting job streaming job
>> 144160437 ms.0 from job set of time 144160437 ms
>> 15/09/07 08:39:30 INFO SparkContext: Starting job: runJob at
>> PythonRDD.scala:362
>> 15/09/07 08:39:30 INFO DAGScheduler: Registering RDD 163 (call at
>> /Library/Python/2.7/site-packages/py4j/java_gateway.py:1206)
>> 15/09/07 08:39:30 INFO DAGScheduler: Got job 20 (runJob at
>> PythonRDD.scala:362) with 1 output partitions (allowLocal=true)
>> 15/09/07 08:39:30 INFO DAGScheduler: Final stage: Stage 41(runJob at
>> PythonRDD.scala:362)
>> 15/09/07 08:39:30 INFO DAGScheduler: Parents of final stage: List(Stage
>> 40)
>> 15/09/07 08:39:30 INFO DAGScheduler: Missing parents: List()
>> 15/09/07 08:39:30 INFO DAGScheduler: Submitting Stage 41 (PythonRDD[167]
>> at RDD at PythonRDD.scala:43), which has no missing parents
>> 15/09/07 08:39:30 INFO MemoryStore: ensureFreeSpace(5952) called with
>> curMem=31088, maxMem=278019440
>> 15/09/07 08:39:30 INFO MemoryStore: Block broadcast_20 stored as values
>> in memory (estimated size 5.8 KB, free 265.1 MB)
>> 15/09/07 08:39:30 INFO MemoryStore: ensureFreeSpace(4413) called with
>> curMem=37040, maxMem=278019440
>> 15/09/07 08:39:30 INFO MemoryStore: Block broadcast_20_piece0 stored as
>> bytes in memory (estimated size 4.3 KB, free 265.1 MB)
>> 15/09/07 08:39:30 INFO BlockManagerInfo: Added broadcast_20_piece0 in
>> memory on localhost:57739 (size: 4.3 KB, free: 265.1 MB)
>> 15/09/07 08:39:30 INFO BlockManagerMaster: Updated info of block
>> broadcast_20_piece0
>> 15/09/07 08:39:30 INFO SparkContext: Created broadcast 20 from broadcast
>> at DAGScheduler.scala:839
>> 15/09/07 08:39:30 INFO DAGScheduler: Submitting 1 missing tasks from
>> Stage 41 (PythonRDD[167] at RDD at PythonRDD.scala:43)
>> 15/09/07 08:39:30 INFO TaskSchedulerImpl: Adding task set 41.0 with 1
>> tasks
>> 15/09/07 08:39:30 INFO TaskSetManager: Starting task 0.0 in stage 41.0
>> (TID 20, localhost, PROCESS_LOCAL, 1056 bytes)
>> 15/09/07 08:39:30 INFO Executor: Running task 0.0 in stage 41.0 (TID 20)
>> 15/09/07 08:39:30 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty
>> blocks out of 0 blocks
>> 15/09/07 08:39:30 INFO ShuffleBlockFetcherIterator: Started 0 remote
>> fetches in 0 ms
>> 15/09/07 08:39:30 INFO PythonRDD: Times: total = 1, boot = -1005, init =
>> 1006, finish = 0
>> 15/09/07 08:39:30 INFO PythonRDD: Times: total = 2, boot = -1004, init =
>> 1006, finish = 0
>> 15/09/07 08:39:30 INFO Executor: Finished task 0.0 in stage 41.0 (TID
>> 20). 932 bytes result sent to driver
>> 15/09/07 08:39:30 INFO TaskSetManager: Finished task 0.0 in stage 41.0
>> (TID 20) in 7 ms on localhost (1/1)
>> 15/09/07 08:39:30 INFO TaskSchedulerImpl: Removed TaskSet 41.0, whose
>> tasks have all completed, from pool
>> 15/09/07 08:39:30 INFO DAGScheduler: Stage 41 (runJob at
>> PythonRDD.scala:362) finished in 0.008 s
>> 15/09/07 08:39:30 INFO DAGScheduler: Job 20 finished: runJob at
>> PythonRDD.scala:362, took 0.015576 s
>> 15/09/07 08:39:30 INFO JobScheduler: Finished job streaming job
>> 144160437 ms.0 from job set of time 144160437 ms
>>
>> 2015-09-04 20:14 GMT+03:00 Davies Liu :
>>
>>> Spark Streaming only process the NEW files after it started, so you
>>> should point it to a direc

Re: Different Kafka createDirectStream implementations

2015-09-08 Thread Dan Dutrow
I see... the first method takes the offsets as its third parameter while
the second method just takes topic names, and that's the primary reason why
the implementations are different.

In that case, what I am noticing is that setting the messageHandler is
unavailable in the second method. This isn't a killer for me, but maybe
someone else would want to set that.

On Tue, Sep 8, 2015 at 2:32 PM Cody Koeninger  wrote:

> What exactly do you think should be done with auto offset reset if someone
> has explicitly provided offsets?
>
> auto offset reset is only useful for determining whether to start the
> stream at the beginning or the end of the log... if someone's provided
> explicit offsets, it's pretty clear where to start the stream.
>
> On Tue, Sep 8, 2015 at 1:19 PM, Dan Dutrow  wrote:
>
>> Yes, but one implementation checks those flags and the other one doesn't.
>> I would think they should be consistent.
>>
>> On Tue, Sep 8, 2015 at 1:32 PM Cody Koeninger  wrote:
>>
>>> If you're providing starting offsets explicitly, then auto offset reset
>>> isn't relevant.
>>>
>>> On Tue, Sep 8, 2015 at 11:44 AM, Dan Dutrow 
>>> wrote:
>>>
 The two methods of createDirectStream appear to have different
 implementations: the second checks the offset.reset flags and does some
 error handling while the first does not. Besides the use of a
 messageHandler, are they intended to be used in different situations?

 def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag,
   VD <: Decoder[V]: ClassTag, R: ClassTag](
     ssc: StreamingContext, kafkaParams: Map[String, String],
     fromOffsets: Map[TopicAndPartition, Long],
     messageHandler: MessageAndMetadata[K, V] => R):

 def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag,
   VD <: Decoder[V]: ClassTag](
     ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String])
 --
 Dan📱
>>>
>>>
>>> --
>> Dan📱
>
>
> --
Dan📱
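
For reference, a minimal sketch of calling the first overload with explicit offsets
and a messageHandler; the broker address, topic, starting offsets, and an existing
StreamingContext (ssc) are assumptions, not values from this thread.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
// start partition 0 of "myTopic" at offset 0 (placeholders)
val fromOffsets = Map(TopicAndPartition("myTopic", 0) -> 0L)

// R is (String, String): the messageHandler decides what each record becomes
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()))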


performance when checking if data frame is empty or not

2015-09-08 Thread Axel Dahl
I have a join that fails when one of the data frames is empty.

To avoid this I am hoping to check if the dataframe is empty or not before
the join.

The question is what's the most performant way to do that?

should I do df.count() or df.first() or something else?

Thanks in advance,

-Axel
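
A sketch of one common approach (not a benchmark): instead of df.count(), which
scans the whole DataFrame, check whether a single row exists; take(1) and
RDD.isEmpty generally stop as soon as a row is found.

// stops as soon as a row is found, rather than counting everything
val isEmpty = df.take(1).isEmpty
// equivalent check via the underlying RDD
val alsoEmpty = df.rdd.isEmpty()

if (!isEmpty) {
  // safe to run the join here
}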


Re: [streaming] DStream with window performance issue

2015-09-08 Thread Cody Koeninger
Yeah, that's the general idea.

When you say hard code topic name, do you mean  Set(topicA, topicB, topicB)
?  You should be able to use a variable for that - read it from a config
file, whatever.

If you're talking about the match statement, yeah you'd need to hardcode
your cases.


On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей  wrote:

> Ok. I got it!
> But it seems that I need to hard code topic name.
>
> something like that?
>
> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
> DefaultDecoder, DefaultDecoder](
>   ssc, kafkaParams, Set(topicA, topicB, topicB))
>   .transform{ rdd =>
> val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> rdd.mapPartitionsWithIndex(
>   (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
> offsetRange(idx).topic match {
>   case "topicA" => ...
>   case "topicB" => ...
>   case _ => 
> }
>  )
> }
>
>
>
>
> 08.09.2015, 19:21, "Cody Koeninger" :
>
> That doesn't really matter.  With the direct stream you'll get all objects
> for a given topicpartition in the same spark partition.  You know what
> topic it's from via hasOffsetRanges.  Then you can deserialize
> appropriately based on topic.
>
> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей 
> wrote:
>
> The thing is that these topics contain absolutely different AVRO
> objects (Array[Byte]) that I need to deserialize to different Java (Scala)
> objects, filter, and then map to a tuple (String, String). So I have 3 streams
> with different Avro objects in them. I need to cast them (using some
> business rules) to pairs and unite them.
>
> --
> Яндекс.Почта — надёжная почта
> http://mail.yandex.ru/neo2/collect/?exp=1&t=1
>
>
> 08.09.2015, 19:11, "Cody Koeninger" :
> > I'm not 100% sure what's going on there, but why are you doing a union
> in the first place?
> >
> > If you want multiple topics in a stream, just pass them all in the set
> of topics to one call to createDirectStream
> >
> > On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin 
> wrote:
> >> Ok.
> >> Spark 1.4.1 on yarn
> >>
> >> Here is my application
> >> I have 4 different Kafka topics(different object streams)
> >>
> >> type Edge = (String,String)
> >>
> >> val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter(
> nonEmpty ).map( toEdge )
> >> val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter(
> nonEmpty ).map( toEdge )
> >> val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter(
> nonEmpty ).map( toEdge )
> >>
> >> val u = a union b union c
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter(
> nonEmpty ).map( toEdge )
> >>
> >> val joinResult = source.rightOuterJoin( z )
> >> joinResult.foreachRDD { rdd=>
> >>   rdd.foreachPartition { partition =>
> >>   // save to result topic in kafka
> >>}
> >>  }
> >>
> >> The 'window' function in the code above is constantly growing,
> >> no matter how many events appeared in corresponding kafka topics
> >>
> >> but if I change one line from
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> to
> >>
> >> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
> >>
> >> val source = u.transform(_.partitionBy(partitioner.value)
> ).window(Seconds(600), Seconds(10))
> >>
> >> Everything works perfect.
> >>
> >> Perhaps the problem was in WindowedDStream
> >>
> >> I forced the use of PartitionerAwareUnionRDD (partitionBy with the same
> >> partitioner) instead of UnionRDD.
> >>
> >> Nonetheless I did not see any hints about such a behaviour in the docs.
> >> Is it a bug or absolutely normal behaviour?
> >>
> >> 08.09.2015, 17:03, "Cody Koeninger" :
> >>
> >>>  Can you provide more info (what version of spark, code example)?
> >>>
> >>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin 
> wrote:
>   Hi,
> 
>   I have an application with 2 streams, which are joined together.
>   Stream1 - is simple DStream(relativly small size batch chunks)
>   Stream2 - is a windowed DStream(with duration for example 60 seconds)
> 
>   Stream1 and Stream2 are Kafka direct stream.
  The problem is that, according to the logs, the window operation is constantly
increasing (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
  And also I see a gap in the processing window (screenshot:
http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php); in the logs
there are no events in that period.
  So what happens in that gap, and why is the window constantly increasing?
> 
>   Thank you in advance
> 
>   -
>   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>   For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [streaming] DStream with window performance issue

2015-09-08 Thread Понькин Алексей
Thank you very much for great answer!

-- 
Яндекс.Почта — надёжная почта
http://mail.yandex.ru/neo2/collect/?exp=1&t=1


08.09.2015, 23:53, "Cody Koeninger" :
> Yeah, that's the general idea.
>
> When you say hard code topic name, do you mean  Set(topicA, topicB, topicB) ? 
>  You should be able to use a variable for that - read it from a config file, 
> whatever.
>
> If you're talking about the match statement, yeah you'd need to hardcode your 
> cases.
>
> On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей  wrote:
>> Ok. I got it!
>> But it seems that I need to hard code topic name.
>>
>> something like that?
>>
>> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], 
>> DefaultDecoder, DefaultDecoder](
>>   ssc, kafkaParams, Set(topicA, topicB, topicB))
>>   .transform{ rdd =>
>>     val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>     rdd.mapPartitionsWithIndex(
>>       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
>>         offsetRange(idx).topic match {
>>           case "topicA" => ...
>>           case "topicB" => ...
>>           case _ => 
>>         }
>>      )
>>     }
>>
>> 08.09.2015, 19:21, "Cody Koeninger" :
>>> That doesn't really matter.  With the direct stream you'll get all objects 
>>> for a given topicpartition in the same spark partition.  You know what 
>>> topic it's from via hasOffsetRanges.  Then you can deserialize 
>>> appropriately based on topic.
>>>
>>> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей  
>>> wrote:
 The thing is, that these topics contain absolutely different AVRO 
 objects(Array[Byte]) that I need to deserialize to different Java(Scala) 
 objects, filter and then map to tuple (String, String). So i have 3 
 streams with different avro object in there. I need to cast them(using 
 some business rules) to pairs and unite.

 --
 Яндекс.Почта — надёжная почта
 http://mail.yandex.ru/neo2/collect/?exp=1&t=1

 08.09.2015, 19:11, "Cody Koeninger" :

> I'm not 100% sure what's going on there, but why are you doing a union in 
> the first place?
>
> If you want multiple topics in a stream, just pass them all in the set of 
> topics to one call to createDirectStream
>
> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin  
> wrote:
>> Ok.
>> Spark 1.4.1 on yarn
>>
>> Here is my application
>> I have 4 different Kafka topics(different object streams)
>>
>> type Edge = (String,String)
>>
>> val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter( 
>> nonEmpty ).map( toEdge )
>> val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter( 
>> nonEmpty ).map( toEdge )
>> val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter( 
>> nonEmpty ).map( toEdge )
>>
>> val u = a union b union c
>>
>> val source = u.window(Seconds(600), Seconds(10))
>>
>> val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter( 
>> nonEmpty ).map( toEdge )
>>
>> val joinResult = source.rightOuterJoin( z )
>> joinResult.foreachRDD { rdd=>
>>   rdd.foreachPartition { partition =>
>>       // save to result topic in kafka
>>    }
>>  }
>>
>> The 'window' function in the code above is constantly growing,
>> no matter how many events appeared in corresponding kafka topics
>>
>> but if I change one line from
>>
>> val source = u.window(Seconds(600), Seconds(10))
>>
>> to
>>
>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>>
>> val source = u.transform(_.partitionBy(partitioner.value) 
>> ).window(Seconds(600), Seconds(10))
>>
>> Everything works perfect.
>>
>> Perhaps the problem was in WindowedDStream
>>
>> I forced to use PartitionerAwareUnionRDD( partitionBy the same 
>> partitioner ) instead of UnionRDD.
>>
>> Nonetheless I did not see any hints about such a bahaviour in doc.
>> Is it a bug or absolutely normal behaviour?
>>
>> 08.09.2015, 17:03, "Cody Koeninger" :
>>
>>>  Can you provide more info (what version of spark, code example)?
>>>
>>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin  
>>> wrote:
  Hi,

  I have an application with 2 streams, which are joined together.
  Stream1 - is simple DStream(relativly small size batch chunks)
  Stream2 - is a windowed DStream(with duration for example 60 seconds)

  Stream1 and Stream2 are Kafka direct stream.
  The problem is that according to logs window operation is constantly 
 increasing (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
  And also I see a gap in processing window (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
  in logs there are no events in that period.

Re: Memory-efficient successive calls to repartition()

2015-09-08 Thread Aurélien Bellet
What is strange is that if I remove the if condition (i.e., checkpoint 
at each iteration), then it basically works: non HDFS disk usage remains 
very small and stable throughout the execution.


If instead I checkpoint only every now and then (cf code in my previous 
email), then the disk usage grows regularly throughout the execution 
until no free space is available, despite the call to the GC.


Aurelien

Le 9/8/15 6:22 PM, Aurélien Bellet a écrit :

Hi,

This is what I tried:

for i in range(1000):
    print i
    data2 = data.repartition(50).cache()
    if (i+1) % 10 == 0:
        data2.checkpoint()
    data2.first()     # materialize rdd
    data.unpersist()  # unpersist previous version
    sc._jvm.System.gc()
    data = data2

But unfortunately I do not get any significant improvement from the call
to sc._jvm.System.gc()...

I checked the WebUI and I have a single RDD in memory, so unpersist()
works as expected but still no solution to trigger the cleaning of
shuffle files...

Aurélien

Le 9/2/15 4:11 PM, alexis GILLAIN a écrit :

Just made some tests on my laptop.

Deletion of the files is not immediate, but a System.gc() call does the
job on the shuffle files of a checkpointed RDD.
It should solve your problem (`sc._jvm.System.gc()` in Python, as pointed
out in the databricks link in my previous message).


2015-09-02 20:55 GMT+08:00 Aurélien Bellet
mailto:aurelien.bel...@telecom-paristech.fr>>:

Thanks a lot for the useful link and comments Alexis!

First of all, the problem occurs without doing anything else in the
code (except of course loading my data from HDFS at the beginning) -
so it definitely comes from the shuffling. You're right, in the
current version, checkpoint files are not removed and take up some
space in HDFS (this is easy to fix). But this is negligible compared
to the non-HDFS files, which keep growing as iterations go.
agree with you that this must come from the shuffling operations: it
seems that the shuffle files are not removed along the execution
(they are only removed if I stop/kill the application), despite the
use of checkpoint.

The class you mentioned is very interesting but I did not find a way
to use it from pyspark. I will try to implement my own version,
looking at the source code. But besides the queueing and removing of
checkpoint files, I do not really see anything special there that
could solve my issue.

I will continue to investigate this. Just found out I can use a
command line browser to look at the webui (I cannot access the
server in graphical display mode), this should help me understand
what's going on. I will also try the workarounds mentioned in the
link. Keep you posted.

Again, thanks a lot!

Best,

Aurelien


Le 02/09/2015 14:15, alexis GILLAIN a écrit :

Aurélien,

  From what you're saying, I can think of a couple of things
considering
I don't know what you are doing in the rest of the code :

- There are a lot of non-HDFS writes; they come from the rest of your code
and/or repartition(). Repartition involves a shuffle and the creation of
files on disk. I would have said that the problem comes from that, but I
just checked and checkpoint() is supposed to delete shuffle files:

https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html

(looks exactly like your problem, so you could maybe try the other
workarounds)
Still, you may do a lot of shuffling in the rest of the code (you should
see the amount of shuffle files written in the webui) and consider
increasing the disk space available... if you can do that.

- On the HDFS side, the class I pointed to has an update function which
"automatically handles persisting and (optionally) checkpointing, as
well as unpersisting and removing checkpoint files". Not sure your
method for checkpointing removes the previous checkpoint file.

In the end, does the disk space error come from HDFS growing or local
disk growing?

You should check the webui to identify which tasks spill data on
disk
and verify if the shuffle files are properly deleted when you
checkpoint
your rdd.


Regards,


2015-09-01 22:48 GMT+08:00 Aurélien Bellet
mailto:aurelien.bel...@telecom-paristech.fr>
>>:


 Dear Alexis,

 Thanks again for your reply. After reading about
checkpointing I
 have modified my sample code as follows:

 for i in range(1000):
  print i
  data2=data.repartition(50).cache()
  if (i+1) % 10 == 0:
  data2.checkpoi

Re: Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?

2015-09-08 Thread Richard Marscher
Hi,

what is the reasoning behind the use of `coalesce(1,false)`? This is saying
to aggregate all data into a single partition, which must fit in memory on
one node in the Spark cluster. If the cluster has more than one node it
must shuffle to move the data. It doesn't seem like the following map or
union necessitate coalesce, but the use case is not clear to me.

On Fri, Sep 4, 2015 at 12:29 PM, unk1102  wrote:

> Hi, I have a Spark job which does some processing on ORC data and stores the
> ORC data back using the DataFrameWriter save() API introduced in Spark 1.4.0.
> I have the following piece of code which is using heavy shuffle memory. How do
> I optimize the code below? Is there anything wrong with it? It works as
> expected but causes slowness because of GC pauses and shuffles lots of data,
> so it hits memory issues. Please guide, I am new to Spark. Thanks in
> advance.
>
> JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1,
> false).map(new Function<Row, Row>() {
>@Override
>public Row call(Row row) throws Exception {
> List<Object> rowAsList;
> Row row1 = null;
> if (row != null) {
>   rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>   row1 = RowFactory.create(rowAsList.toArray());
> }
> return row1;
>}
> }).union(modifiedRDD);
> DataFrame updatedDataFrame =
> hiveContext.createDataFrame(updatedDsqlRDD,renamedSourceFrame.schema());
>
> updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity",
> "date").save("baseTable");
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-huge-data-shuffling-in-Spark-when-using-union-coalesce-1-false-on-DataFrame-tp24581.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com  | Our Blog
 | Twitter  |
Facebook  | LinkedIn
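
To make the point concrete, a sketch in Scala of the same pipeline without the
single-partition coalesce; transformRow stands in for the row-rewriting logic in
the Java snippet above, and modifiedRDD is assumed to be an RDD[Row].

import org.apache.spark.sql.SaveMode

// transformRow is a hypothetical stand-in for the per-row rewriting logic
val updatedRDD = orderedFrame.rdd.map(row => transformRow(row)).union(modifiedRDD)

val updatedDF = hiveContext.createDataFrame(updatedRDD, renamedSourceFrame.schema)
updatedDF.write.mode(SaveMode.Append).format("orc")
  .partitionBy("entity", "date").save("baseTable")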



Re: ClassCastException in driver program

2015-09-08 Thread Jeff Jones
Thanks for the response.
Turns out that this post addressed the issue. 
http://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser
We have some UDFs defined and the jar containing the class for these UDFs 
wasn’t in the dependent jars list.  Unfortunately the actual error got masked 
by the one I sent below.

Jeff

From: Shixiong Zhu
Date: Sunday, September 6, 2015 at 9:02 AM
To: Jeff Jones
Cc: "user@spark.apache.org"
Subject: Re: ClassCastException in driver program

Looks like there are some circular references in SQL making the immutable List 
serialization fail in 2.11.

In 2.11, Scala immutable List uses writeReplace()/readResolve() which don't 
play nicely with circular references. Here is an example to reproduce this 
issue in 2.11.6:

  class Foo extends Serializable {
var l: Seq[Any] = null
  }

  import java.io._

  val o = new ByteArrayOutputStream()
  val o1 = new ObjectOutputStream(o)
  val m = new Foo
  val n = List(1, m)
  m.l = n
  o1.writeObject(n)
  o1.close()
  val i = new ByteArrayInputStream(o.toByteArray)
  val i1 = new ObjectInputStream(i)
  i1.readObject()

Could you provide the "explain" output? It would be helpful to find the 
circular references.




Best Regards,

Shixiong Zhu

2015-09-05 0:26 GMT+08:00 Jeff Jones 
mailto:jjo...@adaptivebiotech.com>>:
We are using Scala 2.11 for a driver program that is running Spark SQL queries 
in a standalone cluster. I’ve rebuilt Spark for Scala 2.11 using the 
instructions at http://spark.apache.org/docs/latest/building-spark.html.  I’ve 
had to work through a few dependency conflict but all-in-all it seems to work 
for some simple Spark examples. I integrated the Spark SQL code into my 
application and I’m able to run using a local client, but when I switch over to 
the standalone cluster I get the following error.  Any help tracking this down 
would be appreciated.

This exception occurs during a DataFrame.collect() call. I’ve tried to use 
–Dsun.io.serialization.extendedDebugInfo=true to get more information but it 
didn’t provide anything more.


[error] o.a.s.s.TaskSetManager - Task 0 in stage 1.0 failed 4 times; aborting 
job

[error] c.a.i.c.Analyzer - Job aborted due to stage failure: Task 0 in stage 
1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 
10.248.0.242): java.lang.ClassCastException: cannot assign instance of 
scala.collection.immutable.List$SerializationProxy to field 
org.apache.spark.sql.execution.Project.projectList of type scala.collection.Seq 
in instance of org.apache.spark.sql.execution.Project

at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source)

at java.io.ObjectStreamClass.setObjFieldValues(Unknown Source)

at java.io.ObjectInputStream.defaultReadFields(Unknown Source)

at java.io.ObjectInputStream.readSerialData(Unknown Source)

at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)

at java.io.ObjectInputStream.readObject0(Unknown Source)

at java.io.ObjectInputStream.defaultReadFields(Unknown Source)

at java.io.ObjectInputStream.readSerialData(Unknown Source)

at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)

at java.io.ObjectInputStream.readObject0(Unknown Source)

at java.io.ObjectInputStream.defaultReadFields(Unknown Source)

at java.io.ObjectInputStream.readSerialData(Unknown Source)

at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)

at java.io.ObjectInputStream.readObject0(Unknown Source)

at java.io.ObjectInputStream.defaultReadFields(Unknown Source)

at java.io.ObjectInputStream.readSerialData(Unknown Source)

at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)

at java.io.ObjectInputStream.readObject0(Unknown Source)

at java.io.ObjectInputStream.defaultReadFields(Unknown Source)

at java.io.ObjectInputStream.readSerialData(Unknown Source)

at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)

at java.io.ObjectInputStream.readObject0(Unknown Source)

at java.io.ObjectInputStream.readObject(Unknown Source)

at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:477)

at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)

at java.lang.reflect.Method.invoke(Unknown Source)

at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)

at java.io.ObjectInputStream.readSerialData(Unknown Source)

at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)

at java.io.ObjectInputStream.readObject0(Unknown Source)

at java.io.ObjectInputStream.defaultReadFields(Unknown Source)

at java.io.ObjectInputStream.readSerialData(Unknown Source)

at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)

at java.io.ObjectInputStream.readObject0(Unknown Source)

at java.io.ObjectInputStream.defaultReadFields(Unknown Source)

at java.io.ObjectInputStream.readSerialData(Unknown Source)

at java.io.ObjectI

Re: read compressed hdfs files using SparkContext.textFile?

2015-09-08 Thread shenyan zhen
Realized I was using spark-shell, so the path was resolved as a local file.
When submitting a Spark job, the same code worked fine.

On Tue, Sep 8, 2015 at 3:13 PM, shenyan zhen  wrote:

> Hi,
>
> For hdfs files written with below code:
>
> rdd.saveAsTextFile(getHdfsPath(...), classOf
> [org.apache.hadoop.io.compress.GzipCodec])
>
>
> I can see the hdfs files been generated:
>
>
> 0  /lz/streaming/am/144173460/_SUCCESS
>
> 1.6 M  /lz/streaming/am/144173460/part-0.gz
>
> 1.6 M  /lz/streaming/am/144173460/part-1.gz
>
> 1.6 M  /lz/streaming/am/144173460/part-2.gz
>
> ...
>
>
> How do I read it using SparkContext?
>
>
> My naive attempt:
>
> val t1 = sc.textFile("/lz/streaming/am/144173460")
>
> t1.take(1).head
>
> did not work:
>
>
> org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
> file:/lz/streaming/am/144173460
>
> at
> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
>
> at
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
>
> at
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
>
> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
>
> Thanks,
>
> Shenyan
>
>
>
>
>
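
For reference, a minimal sketch with a fully-qualified HDFS URI (namenode host and
port are placeholders), which removes any dependence on how the shell resolves
scheme-less paths; textFile decompresses the .gz parts transparently.

// fully-qualified URI, so spark-shell and spark-submit resolve it the same way
val t1 = sc.textFile("hdfs://namenode:8020/lz/streaming/am/144173460")
t1.take(1).head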


Re: New to Spark - Paritioning Question

2015-09-08 Thread Richard Marscher
That seems like it could work, although I don't think `partitionByKey` is a
thing, at least for RDD. You might be able to merge step #2 and step #3
into one step by using the `reduceByKey` function signature that takes in a
Partitioner implementation.

def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]

Merge the values for each key using an associative reduce function. This
will also perform the merging locally on each mapper before sending results
to a reducer, similarly to a "combiner" in MapReduce.

The tricky part might be getting the partitioner to know about the number
of partitions, which I think it needs to know upfront in `abstract def
numPartitions: Int`. The `HashPartitioner` for example takes in the number
as a constructor argument, maybe you could use that with an upper bound
size if you don't mind empty partitions. Otherwise you might have to mess
around to extract the exact number of keys if it's not readily available.

Aside: what is the requirement to have each partition only contain the data
related to one key?

On Fri, Sep 4, 2015 at 11:06 AM, mmike87  wrote:

> Hello, I am new to Apache Spark and this is my company's first Spark
> project.
> Essentially, we are calculating models dealing with Mining data using
> Spark.
>
> I am holding all the source data in a persisted RDD that we will refresh
> periodically. When a "scenario" is passed to the Spark job (we're using Job
> Server) the persisted RDD is filtered to the relevant mines. For example,
> we
> may want all mines in Chile and the 1990-2015 data for each.
>
> Many of the calculations are cumulative, that is when we apply user-input
> "adjustment factors" to a value, we also need the "flexed" value we
> calculated for that mine previously.
>
> To ensure that this works, the idea is to:
>
> 1) Filter the superset to relevant mines (done)
> 2) Group the subset by the unique identifier for the mine. So, a group may
> be all the rows for mine "A" for 1990-2015
> 3) I then want to ensure that the RDD is partitioned by the Mine Identifier
> (and Integer).
>
> It's step 3 that is confusing me. I suspect it's very easy ... do I simply
> use PartitionByKey?
>
> We're using Java if that makes any difference.
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/New-to-Spark-Paritioning-Question-tp24580.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com  | Our Blog
 | Twitter  |
Facebook  | LinkedIn
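
A sketch of the suggestion above (the key extractor, value type, combine function,
and partition count are placeholders): passing the Partitioner straight into
reduceByKey so the merge and the per-mine partitioning happen in one step.

import org.apache.spark.HashPartitioner

// pair each filtered row with its mine identifier (mineId and toValue are placeholders)
val byMine = filteredMines.map(row => (mineId(row), toValue(row)))

// the partition count is an upper bound; empty partitions are harmless
val partitioner = new HashPartitioner(64)
val reduced = byMine.reduceByKey(partitioner, (a, b) => combine(a, b))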



Re: Spark on Yarn vs Standalone

2015-09-08 Thread Sandy Ryza
Those settings seem reasonable to me.

Are you observing performance that's worse than you would expect?

-Sandy

On Mon, Sep 7, 2015 at 11:22 AM, Alexander Pivovarov 
wrote:

> Hi Sandy
>
> Thank you for your reply
> Currently we use r3.2xlarge boxes (vCPU: 8, Mem: 61 GiB)
> with emr setting for Spark "maximizeResourceAllocation": "true"
>
> It is automatically converted to Spark settings
> spark.executor.memory47924M
> spark.yarn.executor.memoryOverhead 5324
>
> we also set spark.default.parallelism = slave_count * 16
>
> Does it look good for you? (we run single heavy job on cluster)
>
> Alex
>
> On Mon, Sep 7, 2015 at 11:03 AM, Sandy Ryza 
> wrote:
>
>> Hi Alex,
>>
>> If they're both configured correctly, there's no reason that Spark
>> Standalone should provide performance or memory improvement over Spark on
>> YARN.
>>
>> -Sandy
>>
>> On Fri, Sep 4, 2015 at 1:24 PM, Alexander Pivovarov > > wrote:
>>
>>> Hi Everyone
>>>
>>> We are trying the latest aws emr-4.0.0 and Spark and my question is
>>> about YARN vs Standalone mode.
>>> Our usecase is
>>> - start 100-150 nodes cluster every week,
>>> - run one heavy spark job (5-6 hours)
>>> - save data to s3
>>> - stop cluster
>>>
>>> Officially aws emr-4.0.0 comes with Spark on Yarn
>>> It's probably possible to hack emr by creating bootstrap script which
>>> stops yarn and starts master and slaves on each computer  (to start Spark
>>> in standalone mode)
>>>
>>> My questions are
>>> - Does Spark standalone provide significant performance / memory
>>> improvement in comparison to YARN mode?
>>> - Is it worth hacking the official emr Spark on Yarn and switching Spark to
>>> Standalone mode?
>>>
>>>
>>> I already created comparison table and want you to check if my
>>> understanding is correct
>>>
>>> Lets say r3.2xlarge computer has 52GB ram available for Spark Executor
>>> JVMs
>>>
>>> standalone to yarn comparison
>>>
>>>                                                         STDLN | YARN
>>> can executor allocate up to 52GB ram                  -  yes  |  yes
>>> will executor be unresponsive after using all 52GB
>>> ram because of GC                                      -  yes  |  yes
>>> additional JVMs on slave except of spark executor      -  workr | node mngr
>>> are additional JVMs lightweight                        -  yes  |  yes
>>>
>>>
>>> Thank you
>>>
>>> Alex
>>>
>>
>>
>
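
For reference, a sketch of the settings quoted above expressed as SparkConf
entries; the values are the ones reported in this thread and slaveCount is a
placeholder.

import org.apache.spark.SparkConf

val slaveCount = 100  // placeholder: number of worker nodes in the cluster
val conf = new SparkConf()
  .setAppName("WeeklyHeavyJob")
  .set("spark.executor.memory", "47924m")
  .set("spark.yarn.executor.memoryOverhead", "5324")
  .set("spark.default.parallelism", (slaveCount * 16).toString)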


Re: [streaming] DStream with window performance issue

2015-09-08 Thread Понькин Алексей
Oh my, I implemented one directStream instead of a union of three, but it is still
growing exponentially with the window method.

-- 
Яндекс.Почта — надёжная почта
http://mail.yandex.ru/neo2/collect/?exp=1&t=1


08.09.2015, 23:53, "Cody Koeninger" :
> Yeah, that's the general idea.
>
> When you say hard code topic name, do you mean  Set(topicA, topicB, topicB) ? 
>  You should be able to use a variable for that - read it from a config file, 
> whatever.
>
> If you're talking about the match statement, yeah you'd need to hardcode your 
> cases.
>
> On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей  wrote:
>> Ok. I got it!
>> But it seems that I need to hard code topic name.
>>
>> something like that?
>>
>> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], 
>> DefaultDecoder, DefaultDecoder](
>>   ssc, kafkaParams, Set(topicA, topicB, topicB))
>>   .transform{ rdd =>
>>     val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>     rdd.mapPartitionsWithIndex(
>>       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
>>         offsetRange(idx).topic match {
>>           case "topicA" => ...
>>           case "topicB" => ...
>>           case _ => 
>>         }
>>      )
>>     }
>>
>> 08.09.2015, 19:21, "Cody Koeninger" :
>>> That doesn't really matter.  With the direct stream you'll get all objects 
>>> for a given topicpartition in the same spark partition.  You know what 
>>> topic it's from via hasOffsetRanges.  Then you can deserialize 
>>> appropriately based on topic.
>>>
>>> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей  
>>> wrote:
 The thing is, that these topics contain absolutely different AVRO 
 objects(Array[Byte]) that I need to deserialize to different Java(Scala) 
 objects, filter and then map to tuple (String, String). So i have 3 
 streams with different avro object in there. I need to cast them(using 
 some business rules) to pairs and unite.

 --
 Яндекс.Почта — надёжная почта
 http://mail.yandex.ru/neo2/collect/?exp=1&t=1

 08.09.2015, 19:11, "Cody Koeninger" :

> I'm not 100% sure what's going on there, but why are you doing a union in 
> the first place?
>
> If you want multiple topics in a stream, just pass them all in the set of 
> topics to one call to createDirectStream
>
> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin  
> wrote:
>> Ok.
>> Spark 1.4.1 on yarn
>>
>> Here is my application
>> I have 4 different Kafka topics(different object streams)
>>
>> type Edge = (String,String)
>>
>> val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter( 
>> nonEmpty ).map( toEdge )
>> val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter( 
>> nonEmpty ).map( toEdge )
>> val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter( 
>> nonEmpty ).map( toEdge )
>>
>> val u = a union b union c
>>
>> val source = u.window(Seconds(600), Seconds(10))
>>
>> val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter( 
>> nonEmpty ).map( toEdge )
>>
>> val joinResult = source.rightOuterJoin( z )
>> joinResult.foreachRDD { rdd=>
>>   rdd.foreachPartition { partition =>
>>       // save to result topic in kafka
>>    }
>>  }
>>
>> The 'window' function in the code above is constantly growing,
>> no matter how many events appeared in corresponding kafka topics
>>
>> but if I change one line from
>>
>> val source = u.window(Seconds(600), Seconds(10))
>>
>> to
>>
>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>>
>> val source = u.transform(_.partitionBy(partitioner.value) 
>> ).window(Seconds(600), Seconds(10))
>>
>> Everything works perfect.
>>
>> Perhaps the problem was in WindowedDStream
>>
>> I forced to use PartitionerAwareUnionRDD( partitionBy the same 
>> partitioner ) instead of UnionRDD.
>>
>> Nonetheless I did not see any hints about such a bahaviour in doc.
>> Is it a bug or absolutely normal behaviour?
>>
>> 08.09.2015, 17:03, "Cody Koeninger" :
>>
>>>  Can you provide more info (what version of spark, code example)?
>>>
>>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin  
>>> wrote:
  Hi,

  I have an application with 2 streams, which are joined together.
  Stream1 - is simple DStream(relativly small size batch chunks)
  Stream2 - is a windowed DStream(with duration for example 60 seconds)

  Stream1 and Stream2 are Kafka direct stream.
  The problem is that according to logs window operation is constantly 
 increasing (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
  And also I see a gap in processing window (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-081

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Tathagata Das
Why are you checkpointing the direct Kafka stream? It serves no purpose.

TD

On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg 
wrote:

> I just disabled checkpointing in our consumers and I can see that the
> batch duration millis set to 20 seconds is now being honored.
>
> Why would that be the case?
>
> And how can we "untie" batch duration millis from checkpointing?
>
> Thanks.
>
> On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger 
> wrote:
>
>> Well, I'm not sure why you're checkpointing messages.
>>
>> I'd also put in some logging to see what values are actually being read
>> out of your params object for the various settings.
>>
>>
>> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> I've stopped the jobs, the workers, and the master. Deleted the contents
>>> of the checkpointing dir. Then restarted master, workers, and consumers.
>>>
>>> I'm seeing the job in question still firing every 10 seconds.  I'm
>>> seeing the 10 seconds in the Spark Jobs GUI page as well as our logs.
>>> Seems quite strange given that the jobs used to fire every 1 second, we've
>>> switched to 10, now trying to switch to 20 and batch duration millis is not
>>> changing.
>>>
>>> Does anything stand out in the code perhaps?
>>>
>>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger 
>>> wrote:
>>>
 Have you tried deleting or moving the contents of the checkpoint
 directory and restarting the job?

 On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> Sorry, more relevant code below:
>
> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
> JavaStreamingContext jssc = params.isCheckpointed() ?
> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
> params);
> jssc.start();
> jssc.awaitTermination();
> jssc.close();
> ………..
>   private JavaStreamingContext createCheckpointedContext(SparkConf
> sparkConf, Parameters params) {
> JavaStreamingContextFactory factory = new
> JavaStreamingContextFactory() {
>   @Override
>   public JavaStreamingContext create() {
> return createContext(sparkConf, params);
>   }
> };
> return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
> factory);
>   }
>
>   private JavaStreamingContext createContext(SparkConf sparkConf,
> Parameters params) {
> // Create context with the specified batch interval, in
> milliseconds.
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.milliseconds(params.getBatchDurationMillis()));
> // Set the checkpoint directory, if we're checkpointing
> if (params.isCheckpointed()) {
>   jssc.checkpoint(params.getCheckpointDir());
> }
>
> Set<String> topicsSet = new HashSet<String>(Arrays.asList(params
> .getTopic()));
>
> // Set the Kafka parameters.
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST,
> params.getBrokerList());
> if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>   kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET,
> params.getAutoOffsetReset());
> }
>
> // Create direct Kafka stream with the brokers and the topic.
> JavaPairInputDStream<String, String> messages =
> KafkaUtils.createDirectStream(
>   jssc,
>   String.class,
>   String.class,
>   StringDecoder.class,
>   StringDecoder.class,
>   kafkaParams,
>   topicsSet);
>
> // See if there's an override of the default checkpoint duration.
> if (params.isCheckpointed() && params.getCheckpointMillis() > 0L)
> {
>   messages.checkpoint(Durations.milliseconds(params
> .getCheckpointMillis()));
> }
>
> JavaDStream<String> messageBodies = messages.map(new
> Function<Tuple2<String, String>, String>() {
>   @Override
>   public String call(Tuple2<String, String> tuple2) {
> return tuple2._2();
>   }
> });
>
> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>   @Override
>   public Void call(JavaRDD<String> rdd) throws Exception {
> ProcessPartitionFunction func = new
> ProcessPartitionFunction(params);
> rdd.foreachPartition(func);
> return null;
>   }
> });
>
> return jssc;
> }
>
> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> I'd think that we wouldn't be "accidentally recovering from
>> checkpoint" hours or even days after consumers have been restarted, plus
>> the content is the fresh content that I'm feeding, not some content that
>> had been fed before the last restart.
>>
>> The code is basically as follows:
>>
>> SparkCo
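
A Scala sketch of the getOrCreate pattern used in the Java code above (the
checkpoint path and batch duration are placeholders). When the checkpoint
directory already contains a checkpoint, the StreamingContext, including its
original batch duration, is restored from it and the factory is not called, which
would explain why a new batchDurationMillis only takes effect once the checkpoint
directory is cleared or checkpointing is disabled.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

val checkpointDir = "hdfs://namenode:8020/user/spark/checkpoints/consumer"  // placeholder

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("KafkaConsumer")
  // only used when no checkpoint exists yet
  val ssc = new StreamingContext(conf, Milliseconds(20000))
  ssc.checkpoint(checkpointDir)
  // ... create the direct Kafka stream and output operations here ...
  ssc
}

// restores the context (and its original batch duration) from the checkpoint if one exists
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)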

Re: New to Spark - Paritioning Question

2015-09-08 Thread Mike Wright
Thanks for the response!

Well, in retrospect each partition doesn't need to be restricted to a
single key. But, I cannot have values associated with a key span partitions
since they all need to be processed together for a key to facilitate
cumulative calcs. So provided an individual key has all its values in a
single partition, I'm OK.

Additionally, the values will be written to the database, and from what I
have read doing this at the partition level is the best compromise between
1) writing the calculated values for each key (lots of connects/disconnects)
and 2) collecting them all at the end and writing them all at once.

I am using a groupBy against the filtered RDD to get the grouping I want,
but apparently this may not be the most efficient way, and it seems that
everything is always in a single partition under this scenario.


___

*Mike Wright*
Principal Architect, Software Engineering

SNL Financial LC
434-951-7816 *p*
434-244-4466 *f*
540-470-0119 *m*

mwri...@snl.com

On Tue, Sep 8, 2015 at 5:38 PM, Richard Marscher 
wrote:

> That seems like it could work, although I don't think `partitionByKey` is
> a thing, at least for RDD. You might be able to merge step #2 and step #3
> into one step by using the `reduceByKey` function signature that takes in a
> Partitioner implementation.
>
> def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]
>
> Merge the values for each key using an associative reduce function. This
> will also perform the merging locally on each mapper before sending results
> to a reducer, similarly to a "combiner" in MapReduce.
>
> The tricky part might be getting the partitioner to know about the number
> of partitions, which I think it needs to know upfront in `abstract def
> numPartitions: Int`. The `HashPartitioner` for example takes in the
> number as a constructor argument, maybe you could use that with an upper
> bound size if you don't mind empty partitions. Otherwise you might have to
> mess around to extract the exact number of keys if it's not readily
> available.
>
> Aside: what is the requirement to have each partition only contain the
> data related to one key?
>
> On Fri, Sep 4, 2015 at 11:06 AM, mmike87  wrote:
>
>> Hello, I am new to Apache Spark and this is my company's first Spark
>> project.
>> Essentially, we are calculating models dealing with Mining data using
>> Spark.
>>
>> I am holding all the source data in a persisted RDD that we will refresh
>> periodically. When a "scenario" is passed to the Spark job (we're using
>> Job
>> Server) the persisted RDD is filtered to the relevant mines. For example,
>> we
>> may want all mines in Chile and the 1990-2015 data for each.
>>
>> Many of the calculations are cumulative, that is when we apply user-input
>> "adjustment factors" to a value, we also need the "flexed" value we
>> calculated for that mine previously.
>>
>> To ensure that this works, the idea is to:
>>
>> 1) Filter the superset to relevant mines (done)
>> 2) Group the subset by the unique identifier for the mine. So, a group may
>> be all the rows for mine "A" for 1990-2015
>> 3) I then want to ensure that the RDD is partitioned by the Mine
>> Identifier
>> (and Integer).
>>
>> It's step 3 that is confusing me. I suspect it's very easy ... do I simply
>> use PartitionByKey?
>>
>> We're using Java if that makes any difference.
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/New-to-Spark-Paritioning-Question-tp24580.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com  | Our Blog
>  | Twitter  |
> Facebook  | LinkedIn
> 
>
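
A sketch of one way to do this (the row type, helper functions, and partition
count are placeholders): group by the mine identifier with an explicit partitioner
so that all rows for a mine land in one partition, then write per partition over a
single connection.

import org.apache.spark.HashPartitioner

// keyed by mine identifier; filteredRows and its row type are placeholders
val byMine = filteredRows.map(row => (row.mineId, row))

val grouped = byMine.groupByKey(new HashPartitioner(48))  // 48 is a placeholder

grouped.foreachPartition { mines =>
  val conn = createDbConnection()  // hypothetical helper
  mines.foreach { case (mineId, rows) =>
    // all years for this mine are local, so cumulative calcs stay in one place
    val results = computeCumulative(mineId, rows)  // hypothetical
    writeResults(conn, results)                    // hypothetical
  }
  conn.close()
}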

