Looks good.

Regards,
Sandeep Giri
+1 347 781 4573 (US)
+91-953-899-8962 (IN)
www.KnowBigData.com <http://KnowBigData.com>
Phone: +1-253-397-1945 (Office)
LinkedIn <https://linkedin.com/company/knowbigdata> | Web <http://knowbigdata.com> | Facebook <https://facebook.com/knowbigdata> | Twitter <https://twitter.com/IKnowBigData>

On Thu, Aug 6, 2015 at 3:02 PM, Jonathan Winandy <jonathan.wina...@gmail.com> wrote:

> Hello!
>
> I think I found a performant and nice solution based on take's source code:
>
> def exists[T](rdd: RDD[T])(qualif: T => Boolean, num: Int): Boolean = {
>   if (num == 0) {
>     true
>   } else {
>     var count: Int = 0
>     val totalParts: Int = rdd.partitions.length
>     var partsScanned: Int = 0
>     while (count < num && partsScanned < totalParts) {
>       // Grow the number of partitions to scan on each pass, as take() does.
>       var numPartsToTry: Int = 1
>       if (partsScanned > 0) {
>         if (count == 0) {
>           numPartsToTry = partsScanned * 4
>         } else {
>           numPartsToTry = Math.max((1.5 * num * partsScanned / count).toInt - partsScanned, 1)
>           numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
>         }
>       }
>
>       val left: Int = num - count
>       val p: Range = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
>       // Count up to `left` qualifying elements on each selected partition.
>       val res: Array[Int] = rdd.sparkContext.runJob(rdd,
>         (it: Iterator[T]) => it.filter(qualif).take(left).size, p, allowLocal = true)
>
>       count = count + res.sum
>       partsScanned += numPartsToTry
>     }
>
>     count >= num
>   }
> }
>
> // val all: RDD[Any]
> println(exists(all)(_ => { println("."); true }, 10))
>
> It's super fast for small values of n, and I think it parallelises nicely
> for large values.
>
> Please tell me what you think.
>
> Have a nice day,
>
> Jonathan
>
>
> On 5 August 2015 at 19:18, Jonathan Winandy <jonathan.wina...@gmail.com> wrote:
>
>> Hello!
>>
>> You could try something like this:
>>
>> def exists[T](rdd: RDD[T])(f: T => Boolean, n: Long): Boolean = {
>>   val context: SparkContext = rdd.sparkContext
>>   // Run the job under its own group so it can be cancelled early.
>>   val grp: String = Random.alphanumeric.take(10).mkString
>>   context.setJobGroup(grp, "exist")
>>   val count: Accumulator[Long] = context.accumulator(0L)
>>
>>   val iteratorToInt: (Iterator[T]) => Int = {
>>     iterator =>
>>       val i: Int = iterator.count(f)
>>       count += i
>>       i
>>   }
>>
>>   // Watcher thread: cancel the job group once enough matches are seen.
>>   val t = new Thread {
>>     override def run {
>>       while (count.value < n) {}
>>       context.cancelJobGroup(grp)
>>     }
>>   }
>>   t.start()
>>   try {
>>     context.runJob(rdd, iteratorToInt).sum > n
>>   } catch {
>>     case e: SparkException =>
>>       count.value > n
>>   } finally {
>>     t.stop()
>>   }
>> }
>>
>> It stops the computation if enough elements satisfying the condition are
>> witnessed.
>>
>> It performs well if the RDD is well partitioned. If that is a problem, you
>> could change iteratorToInt to:
>>
>> val iteratorToInt: (Iterator[T]) => Int = {
>>   iterator =>
>>     // Increment the accumulator per element so the watcher thread reacts sooner.
>>     val i: Int = iterator.count(x => {
>>       if (f(x)) {
>>         count += 1
>>         true
>>       } else false
>>     })
>>     i
>> }
>>
>> I am interested in a safer way to perform partial computation in Spark.
>>
>> Cheers,
>> Jonathan
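As a quick way to exercise either exists definition above, the following sketch runs against a small local RDD. It assumes one of the exists definitions from the messages above is in scope; the ExistsDemo name, the local[4] master, the sample data, and the threshold of 5 are illustrative assumptions, not from the thread.

import org.apache.spark.{SparkConf, SparkContext}

object ExistsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("exists-demo").setMaster("local[4]"))
    // 1,000,000 integers spread over 100 partitions.
    val nums = sc.parallelize(1 to 1000000, 100)
    // Expect true: multiples of 1000 are common, so only a small number of
    // partitions should need scanning before 5 matches are found.
    println(exists(nums)(_ % 1000 == 0, 5))
    sc.stop()
  }
}

Reusing the println-in-the-predicate trick from the example above is an easy way to observe how few elements are actually evaluated before the call returns.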
>>
>> On 5 August 2015 at 18:54, Feynman Liang <fli...@databricks.com> wrote:
>>
>>> qualifying_function() will be executed on each partition in parallel;
>>> stopping all parallel execution after the first instance satisfying
>>> qualifying_function() would mean that you would have to effectively make
>>> the computation sequential.
>>>
>>> On Wed, Aug 5, 2015 at 9:05 AM, Sandeep Giri <sand...@knowbigdata.com> wrote:
>>>
>>>> Okay. I think I got it now. Yes, take() does not need to be called more
>>>> than once. I got the impression that we wanted to bring elements to the
>>>> driver node and then run our qualifying_function on the driver node.
>>>>
>>>> Now, I am back to my question which I started with: could there be an
>>>> approach where the qualifying_function() does not get called after an
>>>> element has been found?
>>>>
>>>> Regards,
>>>> Sandeep Giri
>>>>
>>>> On Wed, Aug 5, 2015 at 9:21 PM, Sean Owen <so...@cloudera.com> wrote:
>>>>
>>>>> take only brings n elements to the driver, which is probably still a
>>>>> win if n is small. I'm not sure what you mean by only taking a count
>>>>> argument -- what else would be an arg to take?
>>>>>
>>>>> On Wed, Aug 5, 2015 at 4:49 PM, Sandeep Giri <sand...@knowbigdata.com> wrote:
>>>>>
>>>>>> Yes, but in the take() approach we will be bringing the data to the
>>>>>> driver, and it is no longer distributed.
>>>>>>
>>>>>> Also, take() takes only a count as its argument, which means that every
>>>>>> time we would be transferring redundant elements.
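For comparison, the plain take()-based idea discussed in the last few messages fits in a couple of lines. This is only a sketch under stated assumptions: the name existsAtLeast is invented here, and it relies on filter being lazy and on take(n) scanning partitions incrementally until it has collected n elements.

import org.apache.spark.rdd.RDD

// True once at least n elements satisfy p. Only the (at most n) matching
// elements are brought back to the driver, and take() stops launching jobs
// on further partitions once it has collected enough.
def existsAtLeast[T](rdd: RDD[T])(p: T => Boolean, n: Int): Boolean =
  rdd.filter(p).take(n).length >= n

The predicate still runs on every element of each partition that take() decides to scan, which is the limitation Sandeep's question is about.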