net.razorvine.pickle.PickleException in Pyspark

2016-04-24 Thread Caique Marques
Hello, everyone!

I'm trying to implement association rules in Python. I managed to implement
generating an association rule from a frequent itemset, and it works as
expected (an example can be seen here).


Now, my challenge is to implement it over a custom RDD. I have studied the
structure of Spark and how it exposes the machine learning algorithms to
Python. The implementations can be seen in the fork.

The example of a custom RDD for association rules can be seen here; at line 33
the output is:

MapPartitionsRDD[10] at mapPartitions at PythonMLLibAPI.scala:1533

That looks OK: testing the Scala example, the structure returned is also a
MapPartitionsRDD. But when I try to use *foreach* on this collection, I get:

net.razorvine.pickle.PickleException: expected zero arguments for
construction of ClassDict (for numpy.core.multiarray._reconstruct)
at
net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
at
org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$2.apply(PythonMLLibAPI.scala:1547)
at
org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$2.apply(PythonMLLibAPI.scala:1546)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:77)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:45)
at org.apache.spark.scheduler.Task.run(Task.scala:81)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

What is this? What does it mean? Any help or tips are welcome.

Thanks,
Caique.


Re: executor delay in Spark

2016-04-24 Thread Mike Hynes
Could you change numPartitions to {16, 32, 64} and run your program for
each to see how many partitions are allocated to each worker? Let's see if
you experience an all-or-nothing imbalance that way; if so, my guess is that
something else is odd in your program logic or Spark runtime environment,
but if not and your executors all receive at least *some* partitions, then
I still wouldn't rule out effects of scheduling delay. It's a simple test,
but it could give some insight.
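
A rough way to check that distribution (a sketch only; `myRdd` stands in for
whichever RDD is being repartitioned, and the numbers are illustrative):

// Count the records that land in each partition, then cross-check the
// executor column for this stage in the web UI to see whether all
// partitions ended up on one worker.
val partitionSizes = myRdd
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
partitionSizes.foreach { case (idx, n) => println(s"partition $idx -> $n records") }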

Mike

This could still be a scheduling delay. If only one executor has *all* the
partitions, could you email me the log file? (If it's 10+ MB, just the first
few thousand lines are fine.)
On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" 
wrote:

> Mike, All,
>
> It turns out that the second time we encountered the uneven-partition
> issue, it was not due to spark-submit. It was resolved by changing the
> placement of count().
>
> Case-1:
>
> val numPartitions = 8
> // read uAxioms from HDFS, use hash partitioner on it and persist it
> // read type1Axioms from HDFS, use hash partitioner on it and persist it
> currDeltaURule1 = type1Axioms.join(uAxioms)
>  .values
>  .distinct(numPartitions)
>  .partitionBy(hashPartitioner)
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>                                  .count()
>
> 
>
> currDeltaURule1 RDD results in all the data on one node (there are 2
> worker nodes). If we move count(), the uneven partition issue is resolved.
>
> Case-2:
>
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>
>
> 
>
>  -- this rdd depends on currDeltaURule1 and it gets executed.
> This resolved the uneven partitioning issue.
>
> I don't see why moving an action to a later part of the code would
> affect the partitioning. Are there other factors at play here that affect
> the partitioning?
>
> (Inconsistent) uneven partitioning leads to one machine getting over
> burdened (memory and number of tasks). We see a clear improvement in
> runtime when the partitioning is even (happens when count is moved).
>
> Any pointers on figuring out this issue are much appreciated.
>
> Regards,
> Raghava.
>
>
>
>
> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Glad to hear that the problem was solvable! I have not seen delays of
>> this type for later stages in jobs run by spark-submit, but I do not think
>> it impossible if your stage has no lineage dependence on other RDDs.
>>
>> I'm CC'ing the dev list to report that other users are observing load
>> imbalance caused by unusual initial task scheduling. I don't know of ways
>> to avoid this other than creating a dummy task to synchronize the executors,
>> but hopefully someone there can suggest other possibilities.
>>
>> Mike
>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" 
>> wrote:
>>
>>> Mike,
>>>
>>> It turns out the executor delay, as you mentioned, is the cause. After
>>> we introduced a dummy stage, partitioning was working fine. Does this delay
>>> happen during later stages as well? We noticed the same behavior
>>> (partitioning happens on spark-shell but not through spark-submit) at a
>>> later stage also.
>>>
>>> Apart from introducing a dummy stage or running it from spark-shell, is
>>> there any other option to fix this?
>>>
>>> Regards,
>>> Raghava.
>>>
>>>
>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>>
 When submitting a job with spark-submit, I've observed delays (up to
 1--2 seconds) for the executors to respond to the driver in order to
 receive tasks in the first stage. The delay does not persist once the
 executors have been synchronized.

 When the tasks are very short, as may be your case (relatively small
 data and a simple map task like you have described), the 8 tasks in
 your stage may be allocated to only 1 executor in 2 waves of 4, since
 the second executor won't have responded to the master before the
 first 4 tasks on the first executor have completed.

 To see if this is the cause in your particular case, you could try the
 following to confirm:
 1. Examine the starting times of the tasks alongside their executor
 2. Make a "dummy" stage execute before your real stages to synchronize the
    executors by creating and materializing any random RDD (a minimal sketch
    follows below)
 3. Make the tasks longer, i.e. with some silly computational work.
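
A minimal sketch of such a dummy stage (the sizes are arbitrary; any small
RDD that forces every executor to run at least one task will do):

// Warm-up stage: materialize a throwaway RDD with enough partitions to
// reach every executor, so all of them have registered before the real stages.
sc.parallelize(1 to 10000, 64) // 64 partitions is an arbitrary choice
  .map(i => i * 2)             // trivial work
  .count()                     // forces the stage to run; the result is discarded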

 Mike


 On 4/17/16, Raghava Mutharaju  wrote:
 > Yes, it's the same data.
 >
 > 1) The number of partitions is the same (8, which is an argument to the
 > HashPartitioner). In the first case, these partitions are spread across
 > both the worker nodes. In the second case, all the partitions are on
>

Re: executor delay in Spark

2016-04-24 Thread Jeff Zhang
Maybe this is due to the config spark.scheduler.minRegisteredResourcesRatio;
you can try setting it to 1 to see the behavior.


// Submit tasks only after (registered resources / total expected resources)

// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
  math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
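
For example, a sketch of setting it when building the context (the 1.0 value
and the companion waiting-time setting are suggestions to experiment with,
not required values); the same properties can also be passed to spark-submit
with --conf:

import org.apache.spark.{SparkConf, SparkContext}

// Ask the scheduler to wait until all expected executors have registered
// (or until the waiting time expires) before submitting the first tasks.
val conf = new SparkConf()
  .setAppName("partition-balance-test") // illustrative app name
  .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60s")
val sc = new SparkContext(conf)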


On Mon, Apr 25, 2016 at 7:17 AM, Mike Hynes <91m...@gmail.com> wrote:

> Could you change numPartitions to {16, 32, 64} and run your program for
> each to see how many partitions are allocated to each worker? Let's see if
> you experience an all-or-nothing imbalance that way; if so, my guess is that
> something else is odd in your program logic or Spark runtime environment,
> but if not and your executors all receive at least *some* partitions, then
> I still wouldn't rule out effects of scheduling delay. It's a simple test,
> but it could give some insight.
>
> Mike
>
> This could still be a scheduling delay. If only one executor has *all* the
> partitions, could you email me the log file? (If it's 10+ MB, just the first
> few thousand lines are fine.)
> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" 
> wrote:
>
>> Mike, All,
>>
>> It turns out that the second time we encountered the uneven-partition
>> issue, it was not due to spark-submit. It was resolved by changing the
>> placement of count().
>>
>> Case-1:
>>
>> val numPartitions = 8
>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>  .values
>>  .distinct(numPartitions)
>>  .partitionBy(hashPartitioner)
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>>                                  .count()
>>
>> 
>>
>> currDeltaURule1 RDD results in all the data on one node (there are 2
>> worker nodes). If we move count(), the uneven partition issue is resolved.
>>
>> Case-2:
>>
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>>
>>
>> 
>>
>>  -- this rdd depends on currDeltaURule1 and it gets
>> executed. This resolved the uneven partitioning issue.
>>
>> I don't see why moving an action to a later part of the code would
>> affect the partitioning. Are there other factors at play here that affect
>> the partitioning?
>>
>> (Inconsistent) uneven partitioning leads to one machine getting over
>> burdened (memory and number of tasks). We see a clear improvement in
>> runtime when the partitioning is even (happens when count is moved).
>>
>> Any pointers on figuring out this issue are much appreciated.
>>
>> Regards,
>> Raghava.
>>
>>
>>
>>
>> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> Glad to hear that the problem was solvable! I have not seen delays of
>>> this type for later stages in jobs run by spark-submit, but I do not think
>>> it impossible if your stage has no lineage dependence on other RDDs.
>>>
>>> I'm CC'ing the dev list to report that other users are observing load
>>> imbalance caused by unusual initial task scheduling. I don't know of ways
>>> to avoid this other than creating a dummy task to synchronize the
>>> executors, but hopefully someone there can suggest other possibilities.
>>>
>>> Mike
>>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" 
>>> wrote:
>>>
 Mike,

 It turns out the executor delay, as you mentioned, is the cause. After
 we introduced a dummy stage, partitioning was working fine. Does this delay
 happen during later stages as well? We noticed the same behavior
 (partitioning happens on spark-shell but not through spark-submit) at a
 later stage also.

 Apart from introducing a dummy stage or running it from spark-shell, is
 there any other option to fix this?

 Regards,
 Raghava.


 On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:

> When submitting a job with spark-submit, I've observed delays (up to
> 1--2 seconds) for the executors to respond to the driver in order to
> receive tasks in the first stage. The delay does not persist once the
> executors have been synchronized.
>
> When the tasks are very short, as may be your case (relatively small
> data and a simple map task like you have described), the 8 tasks in
> your stage may be allocated to only 1 executor in 2 waves of 4, since
> the second executor won't have responded to the master before the
> first 4 tasks on the first executor have completed.
>
> To see if this is the cause in your particular case, you could try the
> following to confirm:
> 1. Examine the starting times of the tasks alongside their executor
>  

Spark sql with large sql syntax job failed with outofmemory error and grows beyond 64k warn

2016-04-24 Thread FangFang Chen
Hi all,
With a large SQL command, the job failed with the following error. Please give
your suggestions on how to resolve it. Thanks.


SQL file size: 676 KB
Log:
16/04/25 10:55:00 WARN TaskSetManager: Lost task 84.0 in stage 0.0 (TID 6, 
BJHC-HADOOP-HERA-17493.jd.local): java.util.concurrent.ExecutionException: 
java.lang.Exception: failed to compile: 
org.codehaus.janino.JaninoRuntimeException: Code of method 
"(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V"
 of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection"
 grows beyond 64 KB


public Object generate(org.apache.spark.sql.catalyst.expressions.Expression[] 
exprs) {
  return new SpecificUnsafeProjection(exprs);
}


class SpecificUnsafeProjection extends 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection {




..


java.lang.OutOfMemoryError: Java heap space
  at com.google.protobuf.ByteString.copyFrom(ByteString.java:192)
  at com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:324)
  at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6657)
  at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6607)
  at 
akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6703)
  at 
akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6698)
  at 
com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
  at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
  at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
  at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
  at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
  at 
akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:6821)
  at 
akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:168)


  


Sent from NetEase Mail Master
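
The JaninoRuntimeException means that one method in the code generated for the
projection exceeds the JVM's 64 KB bytecode-per-method limit. A commonly tried
workaround (not a guaranteed fix) is to break the single huge statement into
several smaller queries and materialize the intermediate results as temporary
tables, so each generated projection stays small. A rough sketch, with
hypothetical table and statement names:

import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper: run a huge query as a chain of smaller SELECTs,
// registering each intermediate result so the next step can build on it.
// `stages` maps a temp-table name to the SQL text that produces it.
def runInStages(sqlContext: SQLContext, stages: Seq[(String, String)]): DataFrame = {
  var last: DataFrame = null
  for ((name, statement) <- stages) {
    last = sqlContext.sql(statement)
    last.registerTempTable(name) // Spark 1.x API; createOrReplaceTempView in 2.x
  }
  last
}

Raising driver or executor memory may help with the later OutOfMemoryError,
but the 64 KB limit itself is independent of heap size.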

Do transformation functions on RDD invoke a Job [sc.runJob]?

2016-04-24 Thread Praveen Devarao
Hi,

I have a streaming program with the block as below [ref: 
https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala
]

1  val lines = messages.map(_._2)
2  val hashTags = lines.flatMap(status => status.split(" ").filter(_.startsWith("#")))

3  val topCounts60 = hashTags.map((_, 1)).reduceByKey( _ + _ )
3a   .map { case (topic, count) => (count, topic) }
3b   .transform(_.sortByKey(false))

4a topCounts60.foreachRDD( rdd => {
4b   val topList = rdd.take( 10 )
   })

This batch is triggering 2 jobs: one at line 3b (sortByKey) and the other at
line 4b (rdd.take). I agree that there is a job triggered on line 4b, since
take() is an action on an RDD, whereas on line 3b sortByKey is just a
transformation, which per the docs is lazily evaluated. But I see that this
line uses a RangePartitioner, and RangePartitioner on initialization invokes a
method called sketch() that calls collect(), triggering a job.

My question: is it expected that sortByKey will invoke a job? If yes, why is
sortByKey listed as a transformation and not an action? Are there any other
functions like this that invoke a job even though they are transformations and
not actions?

I am on Spark 1.6

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



Re: Do transformation functions on RDD invoke a Job [sc.runJob]?

2016-04-24 Thread Reynold Xin
Usually no - but sortByKey does, because it needs the range boundaries to be
computed in order to build the RDD. It is a long-standing problem that's
unfortunately very difficult to solve without breaking the RDD API.

In DataFrame/Dataset we don't have this issue though.
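
A small way to observe this in spark-shell (the data here is illustrative):
the sortByKey call alone schedules a sampling job, before any action is
called on the result.

// The transformation line itself triggers a job: RangePartitioner samples
// the keys (sketch() -> collect()) to compute the range boundaries.
val pairs = sc.parallelize(1 to 1000000).map(i => (scala.util.Random.nextInt(), i))
val sorted = pairs.sortByKey(ascending = false) // a sampling job appears here
// No action has been called on `sorted` yet; the sort itself only runs when
// an action such as sorted.count() or sorted.take(10) is invoked.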


On Sun, Apr 24, 2016 at 10:54 PM, Praveen Devarao 
wrote:

> Hi,
>
> I have a streaming program with the block as below [ref:
> https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala
> ]
>
> 1  val lines = messages.map(_._2)
> 2  val hashTags = lines.flatMap(status => status.split(" ").filter(_.startsWith("#")))
>
> 3  val topCounts60 = hashTags.map((_, 1)).reduceByKey( _ + _ )
> 3a   .map { case (topic, count) => (count, topic) }
> 3b   .transform(_.sortByKey(false))
>
> 4a topCounts60.foreachRDD( rdd => {
> 4b   val topList = rdd.take( 10 )
>    })
>
> This batch is triggering 2 jobs: one at line 3b (sortByKey) and the other at
> line 4b (rdd.take). I agree that there is a job triggered on line 4b, since
> take() is an action on an RDD, whereas on line 3b sortByKey is just a
> transformation, which per the docs is lazily evaluated. But I see that this
> line uses a RangePartitioner, and RangePartitioner on initialization invokes
> a method called sketch() that calls collect(), triggering a job.
>
> My question: is it expected that sortByKey will invoke a job? If yes, why is
> sortByKey listed as a transformation and not an action? Are there any other
> functions like this that invoke a job even though they are transformations
> and not actions?
>
> I am on Spark 1.6
>
> Thanking You
>
> -
> Praveen Devarao
> Spark Technology Centre
> IBM India Software Labs
>
> -
> "Courage doesn't always roar. Sometimes courage is the quiet voice at the
> end of the day saying I will try again"
>


Re: Do transformation functions on RDD invoke a Job [sc.runJob]?

2016-04-24 Thread Praveen Devarao
Thanks Reynold for the explanation of why sortByKey invokes a job.

When you say "DataFrame/Dataset does not have this issue" is it right to 
assume you are referring to Spark 2.0 or Spark 1.6 DF already has built-in 
it?

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



From:   Reynold Xin 
To: Praveen Devarao/India/IBM@IBMIN
Cc: "dev@spark.apache.org" , user 

Date:   25/04/2016 11:26 am
Subject:Re: Do transformation functions on RDD invoke a Job 
[sc.runJob]?



Usually no - but sortByKey does, because it needs the range boundaries to be
computed in order to build the RDD. It is a long-standing problem that's
unfortunately very difficult to solve without breaking the RDD API.

In DataFrame/Dataset we don't have this issue though.


On Sun, Apr 24, 2016 at 10:54 PM, Praveen Devarao  
wrote:
Hi,

I have a streaming program with the block as below [ref: 
https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala
]

1  val lines = messages.map(_._2)
2  val hashTags = lines.flatMap(status => status.split(" ").filter(_.startsWith("#")))

3  val topCounts60 = hashTags.map((_, 1)).reduceByKey( _ + _ )
3a   .map { case (topic, count) => (count, topic) }
3b   .transform(_.sortByKey(false))

4a topCounts60.foreachRDD( rdd => {
4b   val topList = rdd.take( 10 )
   })

This batch is triggering 2 jobs: one at line 3b (sortByKey) and the other at
line 4b (rdd.take). I agree that there is a job triggered on line 4b, since
take() is an action on an RDD, whereas on line 3b sortByKey is just a
transformation, which per the docs is lazily evaluated. But I see that this
line uses a RangePartitioner, and RangePartitioner on initialization invokes a
method called sketch() that calls collect(), triggering a job.

My question: is it expected that sortByKey will invoke a job? If yes, why is
sortByKey listed as a transformation and not an action? Are there any other
functions like this that invoke a job even though they are transformations and
not actions?

I am on Spark 1.6

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"