Hello!
You could try something like this:
import scala.util.Random
import org.apache.spark.{Accumulator, SparkContext, SparkException}
import org.apache.spark.rdd.RDD

def exists[T](rdd: RDD[T])(f: T => Boolean, n: Long): Boolean = {
  val context: SparkContext = rdd.sparkContext
  // Random job group name so we can cancel just this job.
  val grp: String = Random.alphanumeric.take(10).mkString
  context.setJobGroup(grp, "exists")
  val count: Accumulator[Long] = context.accumulator(0L)
  val iteratorToInt: Iterator[T] => Int = { iterator =>
    val i: Int = iterator.count(f)
    count += i
    i
  }
  // Watchdog thread: cancels the job group as soon as the
  // accumulator shows that n matching elements have been seen.
  val t = new Thread {
    override def run(): Unit = {
      while (count.value < n) {}
      context.cancelJobGroup(grp)
    }
  }
  t.start()
  try {
    // Job ran to completion: sum the per-partition counts.
    context.runJob(rdd, iteratorToInt).sum >= n
  } catch {
    // Job was cancelled by the watchdog: the accumulator
    // already tells us whether the threshold was reached.
    case e: SparkException => count.value >= n
  } finally {
    t.stop() // NB: Thread.stop is deprecated.
  }
}
It stops the computation once enough elements satisfying the predicate
have been witnessed.
It is performant if the RDD is well partitioned. If that is a problem,
you could change iteratorToInt to update the accumulator one element at
a time:
val iteratorToInt: Iterator[T] => Int = { iterator =>
  iterator.count { x =>
    if (f(x)) {
      count += 1
      true
    } else false
  }
}
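Note that iterator.count still walks the whole partition even after the
threshold is reached. The early-stop effect can be sketched outside Spark
with a plain iterator and a shared AtomicLong (illustrative names, not
Spark API): stop consuming as soon as the global count reaches n.

```scala
import java.util.concurrent.atomic.AtomicLong

// Count matching elements, but stop consuming the iterator once `n`
// matches have been seen globally (the counter may be shared by
// several concurrently scanning tasks).
def countUntil[T](it: Iterator[T], counter: AtomicLong, n: Long)(f: T => Boolean): Int = {
  var local = 0
  while (it.hasNext && counter.get() < n) {
    if (f(it.next())) {
      local += 1
      counter.incrementAndGet()
    }
  }
  local
}
```

With a loop like this, each partition stops scanning once the threshold
is reached, instead of counting to the end.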
I am interested in a safer way to perform partial computation in spark.
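For what it is worth, one pattern that avoids cancelling jobs altogether
is the one take() uses internally: submit a job over a few partitions at
a time and stop submitting once enough matches are found. A rough sketch
of the driver loop, with the Spark job abstracted behind a runBatch
callback (in real code runBatch would call sc.runJob over just those
partition ids; the batch size of 4 is arbitrary):

```scala
// Scan partitions in fixed-size batches, stopping as soon as `n`
// matches have been found. `runBatch` runs one Spark job over the
// given partition ids and returns per-partition match counts.
def existsIncremental(totalPartitions: Int, n: Long)(runBatch: Range => Seq[Long]): Boolean = {
  var found = 0L
  var scanned = 0
  while (found < n && scanned < totalPartitions) {
    val batch = scanned until math.min(scanned + 4, totalPartitions)
    found += runBatch(batch).sum
    scanned = batch.end
  }
  found >= n
}
```

This trades the busy-waiting watchdog for one extra job per batch, which
is usually cheap when matches are common and the first batch suffices.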
Cheers,
Jonathan
On 5 August 2015 at 18:54, Feynman Liang <[email protected]> wrote:
> qualifying_function() will be executed on each partition in parallel;
> stopping all parallel execution after the first instance satisfying
> qualifying_function() would mean that you would have to effectively make
> the computation sequential.
>
> On Wed, Aug 5, 2015 at 9:05 AM, Sandeep Giri <[email protected]>
> wrote:
>
>> Okay. I think I got it now. Yes, take() does not need to be called more
>> than once. I had the impression that we wanted to bring elements to the
>> driver node and then run our qualifying_function on the driver node.
>>
>> Now, I am back to my question which I started with: Could there be an
>> approach where the qualifying_function() does not get called after an
>> element has been found?
>>
>>
>> Regards,
>> Sandeep Giri,
>> +1 347 781 4573 (US)
>> +91-953-899-8962 (IN)
>>
>> www.KnowBigData.com <http://KnowBigData.com>
>> Phone: +1-253-397-1945 (Office)
>>
>>
>>
>> On Wed, Aug 5, 2015 at 9:21 PM, Sean Owen <[email protected]> wrote:
>>
>>> take only brings n elements to the driver, which is probably still a win
>>> if n is small. I'm not sure what you mean by only taking a count argument
>>> -- what else would be an arg to take?
>>>
>>> On Wed, Aug 5, 2015 at 4:49 PM, Sandeep Giri <[email protected]>
>>> wrote:
>>>
>>>> Yes, but in the take() approach we will be bringing the data to the
>>>> driver, and it is no longer distributed.
>>>>
>>>> Also, take() only accepts a count as its argument, which means that
>>>> every time we would be transferring redundant elements.
>>>>
>>>>
>>
>