So I finally figured it out on my own. I had to basically write my custom 
ordering function as a separate scala project and then call that in clojure.

I had my scala file written in this manner:


import org.apache.spark.Partitionerimport org.apache.spark.rdd.RDD
case class RFMCKey(cId: String, R: Double, F: Long, M: Double, C: Double)class 
RFMCPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, "Number of partitions ($partitions) cannot be 
negative.")
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RFMCKey]
    k.cId.hashCode() % numPartitions
  }}object RFMCKey {
  implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = {
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
  }}
class rfmcSort {
  def sortWithRFMC(a: RDD[(String, (((Double, Long), Double), Double))], parts: 
Int): RDD[(RFMCKey, String)] = {
    val x = a.map(v => v match {
                case (custId, (((rVal, fVal), mVal),cVal)) => (RFMCKey(custId, 
rVal, fVal, mVal, cVal), rVal+","+fVal+","+mVal+","+cVal)
            }).repartitionAndSortWithinPartitions(new RFMCPartitioner(parts))
    x
  }}

I compiled it as ascala project and used it in my clojure code this way:


(:import [org.formcept.wisdom rfmcSort]
         [org.apache.spark.rdd.RDD])

sorted-rfmc-records (.toJavaRDD (.sortWithRFMC (rfmcSort.) (.rdd rfmc-records) 
num_partitions))

Please notice the way I am calling the sortWithRFMC function from the 
rfmcSort object that I created. Also one very important thing to note here 
is when you pass your JavaPairRDD to your scala function, you have to 
convert it into a normal spark RDD first by calling the .rdd method on it. 
And then you have to convert the spark RDD back to JavaPairRDD to work with 
it in clojure.


And sorry that I got your name wrong *Blake :)

On Tuesday, July 12, 2016 at 12:16:42 AM UTC+5:30, Punit Naik wrote:
>
> Hi Black
>
> Thanks for the reply but  figured it out on my own. Posting the answer 
> after this.
>
> On Monday, July 11, 2016 at 11:42:10 PM UTC+5:30, Blake Miller wrote:
>>
>> Hi Punit
>>
>> The behavior you are referring to is a feature of the Scala compiler, 
>> which is why it does not happen automatically when you try to use it from 
>> Clojure.
>>
>> Please see the note here:
>>
>>
>> https://github.com/t6/from-scala/blob/4e1752aaa2ef835dd67a8404273bee067510a431/test/t6/from_scala/guide.clj#L161-L166
>>
>> You may find that library a useful resource, either as a dependency or 
>> simply as reference material.
>>
>> What you want to do is find the full method signature, including the 
>> implicits, and invoke _that_ from clojure, passing values for all implicit 
>> parameters (in this case, your custom ordering function.
>>
>> HTH
>>
>> On Saturday, July 9, 2016 at 6:13:17 AM UTC, Punit Naik wrote:
>>>
>>> Hi Ashish
>>>
>>> The "package" is indeed the full package name.
>>> On 09-Jul-2016 11:02 AM, "Ashish Negi" <thisismy...@gmail.com> wrote:
>>>
>>>> Should not be `package` in `:import` be the actual package name of  `
>>>> RFMCPartitioner` ?
>>>>
>>>> see examples at https://clojuredocs.org/clojure.core/import
>>>>
>>>> like :
>>>>
>>>> (ns foo.bar
>>>>   (:import (java.util Date
>>>>                       Calendar)
>>>>            (java.util.logging Logger
>>>>                               Level)))
>>>>
>>>>
>>>>
>>>> (ns xyz
>>>>   (:import
>>>>     [**  RFMCPartitioner]
>>>>     [** RFMCKey]
>>>>     )
>>>>   )
>>>>
>>>>
>>>> where ** is package full name.
>>>>
>>>>
>>>>
>>>> On Friday, 8 July 2016 21:31:27 UTC+5:30, Punit Naik wrote:
>>>>>
>>>>>
>>>>>          
>>>>>
>>>>> I have a scala program in which I have implemented a secondary sort 
>>>>> which works perfectly. The way I have written that program is:
>>>>>
>>>>> object rfmc {
>>>>>   // Custom Key and partitioner
>>>>>
>>>>>   case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: 
>>>>> Double)
>>>>>   class RFMCPartitioner(partitions: Int) extends Partitioner {
>>>>>     require(partitions >= 0, "Number of partitions ($partitions) cannot 
>>>>> be negative.")
>>>>>     override def numPartitions: Int = partitions
>>>>>     override def getPartition(key: Any): Int = {
>>>>>       val k = key.asInstanceOf[RFMCKey]
>>>>>       k.cId.hashCode() % numPartitions
>>>>>     }
>>>>>   }
>>>>>   object RFMCKey {
>>>>>     implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = {
>>>>>       Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
>>>>>     }
>>>>>   }
>>>>>   // The body of the code
>>>>>   //
>>>>>   //
>>>>>   val x = rdd.map(RFMCKey(cust,r,f,m,c), r+","+f+","+m+","+c)
>>>>>   val y = x.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))}
>>>>>
>>>>> I wanted to implement the same thing using clojure's DSL for spark 
>>>>> called flambo. Since I can't write partitioner using clojure, I re-used 
>>>>> the 
>>>>> code defind above, compiled it and used it as a dependency in my Clojure 
>>>>> code.
>>>>>
>>>>> Now I am importing the partitioner and the key in my clojure code the 
>>>>> following way:
>>>>>
>>>>> (ns xyz
>>>>>   (:import
>>>>>     [package RFMCPartitioner]
>>>>>     [package RFMCKey]
>>>>>     )
>>>>>   )
>>>>>
>>>>> But when I try to create RFMCKey by doing (RFMCKey. cust_id r f m c), 
>>>>> it throws the following error:
>>>>>
>>>>> java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast 
>>>>> to java.lang.Comparable
>>>>>     at 
>>>>> org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
>>>>>     at 
>>>>> scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
>>>>>     at 
>>>>> org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:170)
>>>>>     at 
>>>>> org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:164)
>>>>>     at 
>>>>> org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:252)
>>>>>     at org.apache.spark.util.collection.TimSort.sort(TimSort.java:110)
>>>>>     at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>>>>>     at 
>>>>> org.apache.spark.util.collection.SizeTrackingPairBuffer.destructiveSortedIterator(SizeTrackingPairBuffer.scala:83)
>>>>>     at 
>>>>> org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:687)
>>>>>     at 
>>>>> org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:705)
>>>>>     at 
>>>>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
>>>>>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>     at 
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>     at 
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>     at 
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>     at 
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> My guess is that its not able to find the ordering that I have defined 
>>>>> after the partitioner. But if it works in Scala, why doesn't it work in 
>>>>> Clojure?
>>>>>
>>>> -- 
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Clojure" group.
>>>> To post to this group, send email to clo...@googlegroups.com
>>>> Note that posts from new members are moderated - please be patient with 
>>>> your first post.
>>>> To unsubscribe from this group, send email to
>>>> clojure+u...@googlegroups.com
>>>> For more options, visit this group at
>>>> http://groups.google.com/group/clojure?hl=en
>>>> --- 
>>>> You received this message because you are subscribed to a topic in the 
>>>> Google Groups "Clojure" group.
>>>> To unsubscribe from this topic, visit 
>>>> https://groups.google.com/d/topic/clojure/ZoLWl_vbcdU/unsubscribe.
>>>> To unsubscribe from this group and all its topics, send an email to 
>>>> clojure+u...@googlegroups.com.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to