I'm running into all kinds of problems with Spark 1.5.1 -- does anyone have
a version that's working smoothly for them?

On Fri, Nov 20, 2015 at 10:50 AM, Dean Wampler <deanwamp...@gmail.com>
wrote:

> I didn't expect that to fail. I would call it a bug for sure, since it's
> practically useless if this method doesn't work.
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Fri, Nov 20, 2015 at 12:45 PM, Walrus theCat <walrusthe...@gmail.com>
> wrote:
>
>> Dean,
>>
>> What's the point of Scala without magic? :-)
>>
>> Thanks for your help.  It's still giving me unreliable results.  There
>> just has to be a way to do this in Spark.  It's a pretty fundamental thing.
>>
>> scala> targets.takeOrdered(1) // imported as implicit here
>> res23: Array[(String, Int)] = Array()
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res24: Array[(String, Int)] = Array((\bmurders?\b,717))
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res25: Array[(String, Int)] = Array((\bmurders?\b,717))
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res26: Array[(String, Int)] = Array((\bguns?\b,1253))
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res27: Array[(String, Int)] = Array((\bmurders?\b,717))
>>
>>
>>
>> On Wed, Nov 18, 2015 at 6:20 PM, Dean Wampler <deanwamp...@gmail.com>
>> wrote:
>>
>>> You don't have to use sortBy (although that would be better...). You
>>> have to define an Ordering object and pass it as the second argument list
>>> to takeOrdered()(), or declare it "implicitly". This is more fancy Scala
>>> than Spark should require here. Here's an example I've used:
>>>
>>>   // schema with (String,Int). Order by the Int descending
>>>   object CountOrdering extends Ordering[(String,Int)] {
>>>     def compare(a:(String,Int), b:(String,Int)) =
>>>       -(a._2 compare b._2)  // - so that it sorts descending
>>>   }
>>>
>>>   myRDD.takeOrdered(100)(CountOrdering)
>>>
>>>
>>> Or, if you add the keyword "implicit" before "object CountOrdering
>>> {...}", then you can omit the second argument list. That's more magic than
>>> is justified. ;)
>>>
>>> HTH,
>>>
>>> dean
>>>
>>>
>>> Dean Wampler, Ph.D.
>>> Author: Programming Scala, 2nd Edition
>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>> Typesafe <http://typesafe.com>
>>> @deanwampler <http://twitter.com/deanwampler>
>>> http://polyglotprogramming.com
>>>
>>> On Wed, Nov 18, 2015 at 6:37 PM, Walrus theCat <walrusthe...@gmail.com>
>>> wrote:
>>>
>>>> Dean,
>>>>
>>>> Thanks a lot.  Very helpful.  How would I use takeOrdered to order by
>>>> the second member of the tuple, as I am attempting to do with
>>>> rdd.sortBy(_._2).first?
>>>>
>>>> On Wed, Nov 18, 2015 at 4:24 PM, Dean Wampler <deanwamp...@gmail.com>
>>>> wrote:
>>>>
>>>>> Someone please correct me if I'm wrong, but I think the answer is
>>>>> actually "it's not implemented that way" in the sort methods, and it 
>>>>> should
>>>>> either be documented more explicitly or fixed.
>>>>>
>>>>> Reading the Spark source code, it looks like each partition is sorted
>>>>> internally, and each partition holds a contiguous range of keys in the 
>>>>> RDD.
>>>>> So, if you know which order the partitions should be in, you can produce a
>>>>> total order and hence allow take(n) to do what you expect.
>>>>>
>>>>> The take(n) appears to walk the list of partitions in order, but it's
>>>>> that list that's not deterministic. I can't find any evidence that the RDD
>>>>> output by sortBy has this list of partitions in the correct order. So, 
>>>>> each
>>>>> time you ran your job, the "targets" RDD had sorted partitions, but the
>>>>> list of partitions itself was not properly ordered globally. When you got
>>>>> an exception, probably the first partition happened to be empty.
>>>>>
>>>>> Now, you could argue that take(n) is a "debug" method and the
>>>>> performance implications of getting the RDD.partitions list in total order
>>>>> is not justified. There is a takeOrdered(n) method that is both much more
>>>>> efficient than sort().take(n), and it does the correct thing. Still, at 
>>>>> the
>>>>> very least, the documentation for take(n) should tell you what to expect.
>>>>>
>>>>> Hope I'm right and this helps!
>>>>>
>>>>> dean
>>>>>
>>>>> Dean Wampler, Ph.D.
>>>>> Author: Programming Scala, 2nd Edition
>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>> Typesafe <http://typesafe.com>
>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>> http://polyglotprogramming.com
>>>>>
>>>>> On Wed, Nov 18, 2015 at 5:53 PM, Walrus theCat <walrusthe...@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> Dean,
>>>>>>
>>>>>> Thanks for the insight.  Shouldn't take(n) or first return the same
>>>>>> result, provided that the RDD is sorted?  If I specify that the RDD is
>>>>>> ordered, I need to have guarantees as I reason about it that the first 
>>>>>> item
>>>>>> is in fact the first, and the last is the last.
>>>>>>
>>>>>> On Wed, Nov 18, 2015 at 3:16 PM, Dean Wampler <deanwamp...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Methods like first() and take(n) can't guarantee to return the same
>>>>>>> result in a distributed context, because Spark uses an algorithm to grab
>>>>>>> data from one or more partitions that involves running a distributed job
>>>>>>> over the cluster, with tasks on the nodes where the chosen partitions 
>>>>>>> are
>>>>>>> located.  You can look at the logic in the Spark code base, RDD.scala
>>>>>>> (first method calls the take method) and SparkContext.scala (runJob 
>>>>>>> method,
>>>>>>> which take calls).
>>>>>>>
>>>>>>> However, the exceptions definitely look like bugs to me. There must
>>>>>>> be some empty partitions.
>>>>>>>
>>>>>>> dean
>>>>>>>
>>>>>>> Dean Wampler, Ph.D.
>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>> Typesafe <http://typesafe.com>
>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>> http://polyglotprogramming.com
>>>>>>>
>>>>>>> On Wed, Nov 18, 2015 at 4:52 PM, Walrus theCat <
>>>>>>> walrusthe...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm launching a Spark cluster with the spark-ec2 script and playing
>>>>>>>> around in spark-shell. I'm running the same line of code over and over
>>>>>>>> again, and getting different results, and sometimes exceptions.  
>>>>>>>> Towards
>>>>>>>> the end, after I cache the first RDD, it gives me the correct result
>>>>>>>> multiple times in a row before throwing an exception.  How can I get
>>>>>>>> correct behavior out of these operations on these RDDs?
>>>>>>>>
>>>>>>>> scala> val targets =
>>>>>>>> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>> MapPartitionsRDD[116] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res26: (String, Int) = (\bguns?\b,1253)
>>>>>>>>
>>>>>>>> scala> val targets = data map {_.REGEX} groupBy{identity} map {
>>>>>>>> Function.tupled(_->_.size)} sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>> MapPartitionsRDD[125] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res27: (String, Int) = (nika,7)
>>>>>>>>
>>>>>>>>
>>>>>>>> scala> val targets =
>>>>>>>> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>> MapPartitionsRDD[134] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res28: (String, Int) = (\bcalientes?\b,6)
>>>>>>>>
>>>>>>>> scala> targets.sortBy(_._2,false).first
>>>>>>>> java.lang.UnsupportedOperationException: empty collection
>>>>>>>>
>>>>>>>> scala> val targets =
>>>>>>>> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>> MapPartitionsRDD[283] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res46: (String, Int) = (\bhurting\ yous?\b,8)
>>>>>>>>
>>>>>>>> scala> val targets =
>>>>>>>> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>> MapPartitionsRDD[292] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> java.lang.UnsupportedOperationException: empty collection
>>>>>>>>
>>>>>>>> scala> val targets =
>>>>>>>> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>> MapPartitionsRDD[301] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res48: (String, Int) = (\bguns?\b,1253)
>>>>>>>>
>>>>>>>> scala> val targets =
>>>>>>>> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>> MapPartitionsRDD[310] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res49: (String, Int) = (\bguns?\b,1253)
>>>>>>>>
>>>>>>>> scala> val targets =
>>>>>>>> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>> MapPartitionsRDD[319] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res50: (String, Int) = (\bguns?\b,1253)
>>>>>>>>
>>>>>>>> scala> val targets =
>>>>>>>> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>> MapPartitionsRDD[328] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> java.lang.UnsupportedOperationException: empty collection
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to