Hello!

You could try something like this:

import scala.util.Random

import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkContext, SparkException}

// Returns true once at least n elements of the RDD satisfy f,
// cancelling the underlying job as soon as enough matches are seen.
def exists[T](rdd: RDD[T])(f: T => Boolean, n: Long): Boolean = {

  val context: SparkContext = rdd.sparkContext
  // Random job group name, so that only this job gets cancelled.
  val grp: String = Random.alphanumeric.take(10).mkString
  context.setJobGroup(grp, "exists")
  val count: Accumulator[Long] = context.accumulator(0L)

  // Count the matching elements of a partition and publish the result
  // through the accumulator (updated once per partition).
  val iteratorToInt: Iterator[T] => Int = { iterator =>
    val i: Int = iterator.count(f)
    count += i
    i
  }

  // Watcher thread: cancel the job group once the accumulator reports
  // that enough matches have been witnessed.
  val t = new Thread {
    override def run(): Unit = {
      while (count.value < n) {
        Thread.sleep(10) // avoid busy-spinning on the driver
      }
      context.cancelJobGroup(grp)
    }
  }
  t.start()
  try {
    // The job ran to completion: sum the per-partition counts.
    context.runJob(rdd, iteratorToInt).map(_.toLong).sum >= n
  } catch {
    // Cancellation surfaces as a SparkException; fall back on what the
    // accumulator witnessed before the job was cancelled.
    case e: SparkException => count.value >= n
  } finally {
    t.stop() // deprecated, but ends the watcher if the job finished first
  }
}



It stops the computation once enough elements satisfying the predicate
have been witnessed.
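
For example, a minimal usage sketch (assuming an existing SparkContext
named sc; the RDD and the threshold are illustrative):

// Hypothetical usage: does the RDD contain at least 100 even numbers?
val numbers: RDD[Int] = sc.parallelize(1 to 1000000, 100)
val enoughEvens: Boolean = exists(numbers)(x => x % 2 == 0, 100L)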

It is performant if the RDD is well partitioned, since the accumulator is
only updated at partition boundaries. If that is a problem, you could
change iteratorToInt to update the accumulator per element:

// Per-element variant: the accumulator is bumped on every match, so
// the watcher thread can observe progress (and cancel the job) in the
// middle of a long partition.
val iteratorToInt: Iterator[T] => Int = { iterator =>
  iterator.count { x =>
    if (f(x)) {
      count += 1
      true
    } else false
  }
}
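
Updating the accumulator per element rather than once per partition lets
the watcher thread see progress inside a long partition, so the job can
be cancelled mid-partition, at the cost of one accumulator update per
match.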


I would be interested in a safer way to perform partial computation in
Spark.
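
For reference, the take-based approach discussed below also stops early,
since take() runs incremental jobs over more and more partitions until it
has gathered enough results, and filtering first means only the matching
elements reach the driver. A sketch (existsByTake is a hypothetical name):

// Hypothetical helper: at most n matching elements are shipped
// back to the driver.
def existsByTake[T](rdd: RDD[T])(f: T => Boolean, n: Int): Boolean =
  rdd.filter(f).take(n).length >= n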

Cheers,
Jonathan

On 5 August 2015 at 18:54, Feynman Liang <fli...@databricks.com> wrote:

> qualifying_function() will be executed on each partition in parallel;
> stopping all parallel execution after the first instance satisfying
> qualifying_function() would effectively mean making the computation
> sequential.
>
> On Wed, Aug 5, 2015 at 9:05 AM, Sandeep Giri <sand...@knowbigdata.com>
> wrote:
>
>> Okay. I think I got it now. Yes, take() does not need to be called more
>> than once. I got the impression that we wanted to bring the elements to
>> the driver node and then run our qualifying_function there.
>>
>> Now I am back to the question I started with: could there be an
>> approach where qualifying_function() does not get called after an
>> element has been found?
>>
>>
>> Regards,
>> Sandeep Giri,
>> +1 347 781 4573 (US)
>> +91-953-899-8962 (IN)
>>
>> www.KnowBigData.com
>> Phone: +1-253-397-1945 (Office)
>>
>>
>> On Wed, Aug 5, 2015 at 9:21 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> take only brings n elements to the driver, which is probably still a win
>>> if n is small. I'm not sure what you mean by only taking a count argument
>>> -- what else would be an arg to take?
>>>
>>> On Wed, Aug 5, 2015 at 4:49 PM, Sandeep Giri <sand...@knowbigdata.com>
>>> wrote:
>>>
>>>> Yes, but in the take() approach we will be bringing the data to the
>>>> driver, where it is no longer distributed.
>>>>
>>>> Also, take() takes only a count as its argument, which means that
>>>> every time we would be transferring redundant elements.
>>>>
>>>>
>>
>