Hello! You could try something like this:
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Accumulator, SparkContext, SparkException}

    import scala.util.Random

    def exists[T](rdd: RDD[T])(f: T => Boolean, n: Long): Boolean = {
      val context: SparkContext = rdd.sparkContext
      // Random job-group name so that only this job gets cancelled.
      val grp: String = Random.alphanumeric.take(10).mkString
      context.setJobGroup(grp, "exists")
      // Counts the qualifying elements seen so far across all tasks.
      val count: Accumulator[Long] = context.accumulator(0L)
      val iteratorToInt: Iterator[T] => Int = { iterator =>
        val i: Int = iterator.count(f)
        count += i
        i
      }
      @volatile var done = false
      // Watcher thread: cancels the job group once n elements are found.
      val t = new Thread {
        override def run(): Unit = {
          while (!done && count.value < n) Thread.sleep(10)
          if (!done) context.cancelJobGroup(grp)
        }
      }
      t.start()
      try {
        // Sum of the per-partition counts; if the job was cancelled first,
        // the accumulator tells us whether enough elements were seen.
        context.runJob(rdd, iteratorToInt).sum >= n
      } catch {
        case _: SparkException => count.value >= n
      } finally {
        done = true
      }
    }

It cancels the computation once enough elements satisfying the predicate
have been witnessed. It performs well when the RDD has enough partitions:
each task counts its whole partition before updating the accumulator, so on
a coarsely partitioned RDD the cancellation may arrive too late to save
much work. If this is a problem, you could change iteratorToInt to update
the accumulator per element:

    val iteratorToInt: Iterator[T] => Int = { iterator =>
      iterator.count { x =>
        if (f(x)) {
          count += 1 // report progress per element, not per partition
          true
        } else {
          false
        }
      }
    }

I am still interested in a safer way to perform partial computation in
Spark.

Cheers,
Jonathan

On 5 August 2015 at 18:54, Feynman Liang <fli...@databricks.com> wrote:

> qualifying_function() will be executed on each partition in parallel;
> stopping all parallel execution after the first instance satisfying
> qualifying_function() would mean that you would have to effectively make
> the computation sequential.
>
> On Wed, Aug 5, 2015 at 9:05 AM, Sandeep Giri <sand...@knowbigdata.com>
> wrote:
>
>> Okay. I think I got it now. Yes, take() does not need to be called more
>> than once. I got the impression that we wanted to bring elements to the
>> driver node and then run our qualifying_function on the driver node.
>>
>> Now, I am back to the question I started with: could there be an
>> approach where qualifying_function() does not get called after an
>> element has been found?
>>
>> Regards,
>> Sandeep Giri,
>> +1 347 781 4573 (US)
>> +91-953-899-8962 (IN)
>> www.KnowBigData.com
>> Phone: +1-253-397-1945 (Office)
>>
>> On Wed, Aug 5, 2015 at 9:21 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> take only brings n elements to the driver, which is probably still a
>>> win if n is small. I'm not sure what you mean by only taking a count
>>> argument -- what else would be an arg to take?
>>>
>>> On Wed, Aug 5, 2015 at 4:49 PM, Sandeep Giri <sand...@knowbigdata.com>
>>> wrote:
>>>
>>>> Yes, but in the take() approach we will be bringing the data to the
>>>> driver, and it is no longer distributed.
>>>>
>>>> Also, take() takes only a count as its argument, which means that
>>>> every time we would be transferring redundant elements.
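As a point of comparison with the take()-based approach discussed in the
quoted thread, a filter-then-take version fits in a few lines. This is only
a minimal sketch; the name existsByTake and its signature are illustrative
rather than anything proposed above:

    import org.apache.spark.rdd.RDD

    // True if at least n elements of rdd satisfy f. filter is evaluated
    // lazily and take() scans partitions incrementally (one partition
    // first, then more only if needed), so f is not necessarily applied
    // to every element of the RDD.
    def existsByTake[T](rdd: RDD[T])(f: T => Boolean, n: Int): Boolean =
      rdd.filter(f).take(n).length >= n

Since take(n) stops as soon as it has collected n elements, at most n
qualifying elements are brought back to the driver, which keeps the
driver-side cost small even for a large RDD.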