Looks good.
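
To check the early-stopping behaviour without a cluster, here is a minimal
sketch of the same batch-wise scan in plain Scala. It assumes ordinary
collections standing in for RDD partitions, so it only illustrates the
control flow and does not use Spark. The names ExistsSketch, existsLocal and
calls are hypothetical and exist only for this illustration.

```scala
// Plain-Scala sketch (no Spark): each inner Seq stands in for one RDD partition.
// ExistsSketch, existsLocal and calls are hypothetical names for illustration.
object ExistsSketch {
  def existsLocal[T](parts: Seq[Seq[T]])(qualif: T => Boolean, num: Int): Boolean = {
    if (num == 0) {
      true
    } else {
      var count = 0
      var partsScanned = 0
      val totalParts = parts.length
      while (count < num && partsScanned < totalParts) {
        // Same batch-growth rule as in the quoted code.
        var numPartsToTry = 1
        if (partsScanned > 0) {
          if (count == 0) {
            numPartsToTry = partsScanned * 4
          } else {
            numPartsToTry =
              math.max((1.5 * num * partsScanned / count).toInt - partsScanned, 1)
            numPartsToTry = math.min(numPartsToTry, partsScanned * 4)
          }
        }
        val left = num - count
        val batch =
          parts.slice(partsScanned, math.min(partsScanned + numPartsToTry, totalParts))
        // The iterator is lazy, so qualif is not called again in a partition
        // once `left` matches have been found there.
        count += batch.map(p => p.iterator.filter(qualif).take(left).size).sum
        partsScanned += batch.length
      }
      count >= num
    }
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    // 8 stand-in partitions of 100 elements each.
    val parts: Seq[Seq[Int]] = Seq.fill(8)(1 to 100)
    val found = existsLocal(parts)((x: Int) => { calls += 1; x % 2 == 0 }, 10)
    println(s"found=$found, qualif calls=$calls")
  }
}
```

In this sketch the predicate is evaluated far fewer times than there are
elements, because the first stand-in partition already yields enough matches.
On a real cluster the partitions within one batch still run in parallel, so
the saving applies per partition and per batch, not globally.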

Regards,
Sandeep Giri,
+1 347 781 4573 (US)
+91-953-899-8962 (IN)

www.KnowBigData.com. <http://KnowBigData.com.>
Phone: +1-253-397-1945 (Office)



On Thu, Aug 6, 2015 at 3:02 PM, Jonathan Winandy <jonathan.wina...@gmail.com> wrote:

> Hello !
>
> I think I found a nice, performant solution based on the source code of
> take:
>
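> import org.apache.spark.rdd.RDD
>
> // True if at least `num` elements of `rdd` satisfy `qualif`. Partitions are
> // scanned in growing batches, as in RDD.take, so scanning can stop early.
> def exists[T](rdd: RDD[T])(qualif: T => Boolean, num: Int): Boolean = {
>   if (num == 0) {
>     true
>   } else {
>     var count: Int = 0
>     val totalParts: Int = rdd.partitions.length
>     var partsScanned: Int = 0
>     while (count < num && partsScanned < totalParts) {
>       // Start with one partition, then grow the batch based on what was found.
>       var numPartsToTry: Int = 1
>       if (partsScanned > 0) {
>         if (count == 0) {
>           // No match yet: try four times as many partitions as scanned so far.
>           numPartsToTry = partsScanned * 4
>         } else {
>           // Extrapolate from the hit rate so far, overestimating by 50%.
>           numPartsToTry = Math.max(
>             (1.5 * num * partsScanned / count).toInt - partsScanned, 1)
>           numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
>         }
>       }
>
>       val left: Int = num - count
>       val p: Range =
>         partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
>       // Each task counts at most `left` matches in its partition.
>       val res: Array[Int] = rdd.sparkContext.runJob(
>         rdd, (it: Iterator[T]) => it.filter(qualif).take(left).size,
>         p, allowLocal = true)
>
>       count = count + res.sum
>       partsScanned += numPartsToTry
>     }
>
>     count >= num
>   }
> }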
>
> //val all:RDD[Any]
> println(exists(all)(_ => {println(".") ; true}, 10))
>
> It's very fast for small values of num, and I think it parallelises nicely
> for large values.
>
> Please tell me what you think.
>
>
> Have a nice day,
>
> Jonathan
>
>
>
>
>
> On 5 August 2015 at 19:18, Jonathan Winandy <jonathan.wina...@gmail.com>
> wrote:
>
>> Hello !
>>
>> You could try something like this:
>>
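>> import scala.util.Random
>> import org.apache.spark.{Accumulator, SparkContext, SparkException}
>> import org.apache.spark.rdd.RDD
>>
>> def exists[T](rdd:RDD[T])(f:T=>Boolean, n:Long):Boolean = {
>>
>>   val context: SparkContext = rdd.sparkContext
>>   val grp: String = Random.alphanumeric.take(10).mkString
>>   context.setJobGroup(grp, "exist")
>>   val count: Accumulator[Long] = context.accumulator(0L)
>>
>>   // Counts the matches in one partition and adds them to the accumulator.
>>   val iteratorToInt: (Iterator[T]) => Int = {
>>     iterator =>
>>       val i: Int = iterator.count(f)
>>       count += i
>>       i
>>   }
>>
>>   // Watcher thread: cancels the job group once n matches have been reported.
>>   val t = new Thread {
>>     override def run(): Unit = {
>>       while (count.value < n) {}
>>       context.cancelJobGroup(grp)
>>     }
>>   }
>>   t.start()
>>   try {
>>     // runJob returns one count per partition, so sum them before comparing.
>>     context.runJob(rdd, iteratorToInt).sum >= n
>>   } catch  {
>>     // Reached when the watcher cancels the job; use the accumulated count.
>>     case e: SparkException => count.value >= n
>>   } finally {
>>     t.stop()
>>   }
>>
>> }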
>>
>>
>>
>> It stops the computation once enough elements satisfying the condition
>> have been seen.
>>
>> It performs well if the RDD is well partitioned. If that is a problem,
>> you could change iteratorToInt to:
>>
>> val iteratorToInt: (Iterator[T]) => Int = {
>>   iterator =>
>>     val i: Int = iterator.count(x => {
>>       if(f(x)) {
>>         count += 1
>>         true
>>       } else false
>>     })
>>     i
>>
>> }
>>
>>
>> I am interested in a safer way to perform partial computation in Spark.
>>
>> Cheers,
>> Jonathan
>>
>> On 5 August 2015 at 18:54, Feynman Liang <fli...@databricks.com> wrote:
>>
>>> qualifying_function() will be executed on each partition in parallel;
>>> stopping all parallel execution after the first instance satisfying
>>> qualifying_function() would mean that you would have to effectively make
>>> the computation sequential.
>>>
>>> On Wed, Aug 5, 2015 at 9:05 AM, Sandeep Giri <sand...@knowbigdata.com>
>>> wrote:
>>>
>>>> Okay. I think I got it now. Yes, take() does not need to be called more
>>>> than once. I had the impression that we wanted to bring elements to the
>>>> driver node and then run our qualifying_function on the driver node.
>>>>
>>>> Now, I am back to the question I started with: Could there be an
>>>> approach where qualifying_function() does not get called after an
>>>> element has been found?
>>>>
>>>>
>>>> Regards,
>>>> Sandeep Giri,
>>>>
>>>> On Wed, Aug 5, 2015 at 9:21 PM, Sean Owen <so...@cloudera.com> wrote:
>>>>
>>>>> take only brings n elements to the driver, which is probably still a
>>>>> win if n is small. I'm not sure what you mean by only taking a count
>>>>> argument -- what else would be an arg to take?
>>>>>
>>>>> On Wed, Aug 5, 2015 at 4:49 PM, Sandeep Giri <sand...@knowbigdata.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, but with the take() approach we bring the data to the driver,
>>>>>> so it is no longer distributed.
>>>>>>
>>>>>> Also, take() accepts only a count as its argument, which means that
>>>>>> every time we would be transferring redundant elements.
>>>>>>
>>>>>>
>>>>
>>>
>>
>