Building on what Davies Liu said, how about something like:

def indexing(splitIndex, iterator, offset_lists):
    count = 0
    offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
    indexed = []
    for i, e in enumerate(iterator):
        index = count + offset + i
        for j, ele in enumerate(e):
            indexed.append((index, j, ele))
    yield indexed
def another_funct(offset_lists):
    # get that damn offset_lists
    # rdd.mapPartitionsWithSplit(indexing)
    rdd.mapPartitionsWithSplit(lambda index, it: indexing(index, it, offset_lists))

P.S. In the indexing function, the count variable probably isn't doing anything useful? A fuller runnable sketch is pasted below my signature.

On Sun, Aug 17, 2014 at 11:21 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:

> Hi,
>   Thanks for the response.
> In the second case, f2?
> foo will have to be declared globally, right?
>
> My function is something like:
>
> def indexing(splitIndex, iterator):
>     count = 0
>     offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
>     indexed = []
>     for i, e in enumerate(iterator):
>         index = count + offset + i
>         for j, ele in enumerate(e):
>             indexed.append((index, j, ele))
>     yield indexed
>
> def another_funct(offset_lists):
>     # get that damn offset_lists
>     rdd.mapPartitionsWithSplit(indexing)
>
> But then the issue is how indexing gets offset_lists?
> Any suggestions?
>
>
> On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu <dav...@databricks.com> wrote:
>
>> The callback function f only accepts 2 arguments; if you want to pass
>> other objects to it, you need a closure, such as:
>>
>> foo = xxx
>>
>> def f(index, iterator, foo):
>>     yield (index, foo)
>>
>> rdd.mapPartitionsWithIndex(lambda index, it: f(index, it, foo))
>>
>> You can also make f itself a closure:
>>
>> def f2(index, iterator):
>>     yield (index, foo)
>>
>> rdd.mapPartitionsWithIndex(f2)
>>
>> On Sun, Aug 17, 2014 at 10:25 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>> > Hi,
>> >   In this example:
>> > http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#mapPartitionsWithSplit
>> > Let's say f takes three arguments:
>> >
>> > def f(splitIndex, iterator, foo): yield splitIndex
>> >
>> > Now, how do I send this foo parameter to this method?
>> >
>> > rdd.mapPartitionsWithSplit(f)
>> >
>> > Thanks

--
Mohit

"When you want success as badly as you want the air, then you will get it.
There is no other secret of success." -Socrates
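
For completeness, here is a self-contained sketch of the closure approach discussed above. It is only an illustration, not code from the thread: the SparkContext setup, the sample matrix, the two-partition split, and the way offset_lists is computed are all made-up assumptions. It uses mapPartitionsWithIndex (as in Davies' example) and yields flat (row, col, value) triples instead of one list per partition.

from pyspark import SparkContext

sc = SparkContext(appName="indexing-example")   # hypothetical setup

def indexing(splitIndex, iterator, offset_lists):
    # Global row offset of this partition = number of rows in all earlier partitions.
    offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
    for i, row in enumerate(iterator):
        for j, ele in enumerate(row):
            # (global row index, column index, value)
            yield (offset + i, j, ele)

data = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]   # made-up rows
rdd = sc.parallelize(data, 2)

# Row count per partition, collected up front so each partition can compute
# its starting offset.
offset_lists = rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

indexed = rdd.mapPartitionsWithIndex(
    lambda index, it: indexing(index, it, offset_lists))

print(indexed.collect())
sc.stop()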