Building on what Davies Liu said,
How about something like:
def indexing(splitIndex, iterator, offset_lists):
  count = 0
  offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
  indexed = []
  for i, e in enumerate(iterator):
    index = count + offset + i
    for j, ele in enumerate(e):
      indexed.append((index, j, ele))
  yield indexed

def another_funct(offset_lists):
    # offset_lists is available here, so capture it in a lambda:
    # rdd.mapPartitionsWithSplit(indexing)
    rdd.mapPartitionsWithSplit(lambda index, it: indexing(index, it, offset_lists))

P.S. In the indexing function, the count variable probably isn't doing anything:
it is initialized to 0 and never incremented, so it could be dropped.
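The closure approach above can be sketched locally by simulating what
mapPartitionsWithIndex does, partition by partition, on plain lists (the sample
partition data, the flat-triple yield, and the local driver loop here are my
own illustration, not from the thread):

```python
def indexing(split_index, iterator, offsets):
    # Row offset for this partition = total rows in all earlier partitions.
    offset = sum(offsets[:split_index])
    for i, row in enumerate(iterator):
        for j, ele in enumerate(row):
            # Emit (global_row_index, column_index, element) triples.
            yield (offset + i, j, ele)

# Two partitions of row-lists; offsets holds the row count per partition.
partitions = [[[10, 11], [12, 13]], [[20, 21]]]
offsets = [len(p) for p in partitions]   # [2, 1]

# Local stand-in for:
#   rdd.mapPartitionsWithIndex(lambda i, it: indexing(i, it, offsets))
result = [t for idx, part in enumerate(partitions)
          for t in indexing(idx, iter(part), offsets)]
```

The lambda is what lets a two-argument callback carry the extra offsets
object: Spark still calls it with (index, iterator), and the closure supplies
the rest.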



On Sun, Aug 17, 2014 at 11:21 AM, Chengi Liu <chengi.liu...@gmail.com>
wrote:

> Hi,
>   Thanks for the response..
> In the second case, f2?
> foo will have to be declared globally, right?
>
> My function is something like:
> def indexing(splitIndex, iterator):
>   count = 0
>   offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
>   indexed = []
>   for i, e in enumerate(iterator):
>     index = count + offset + i
>     for j, ele in enumerate(e):
>       indexed.append((index, j, ele))
>   yield indexed
>
> def another_funct(offset_lists):
>     #get that damn offset_lists
>     rdd.mapPartitionsWithSplit(indexing)
> But then the issue is: how does indexing get offset_lists?
> Any suggestions?
>
>
> On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu <dav...@databricks.com>
> wrote:
>
>> The callback function f only accepts 2 arguments; if you want to pass
>> other objects to it, you need a closure, such as:
>>
>> foo=xxx
>> def f(index, iterator, foo):
>>      yield (index, foo)
>> rdd.mapPartitionsWithIndex(lambda index, it: f(index, it, foo))
>>
>> You can also make f itself a closure (capturing foo from the enclosing scope):
>>
>> def f2(index, iterator):
>>     yield (index, foo)
>> rdd.mapPartitionsWithIndex(f2)
>>
>> On Sun, Aug 17, 2014 at 10:25 AM, Chengi Liu <chengi.liu...@gmail.com>
>> wrote:
>> > Hi,
>> >   In this example:
>> >
>> http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#mapPartitionsWithSplit
>> > Let say, f takes three arguments:
>> > def f(splitIndex, iterator, foo): yield splitIndex
>> > Now, how do i send this foo parameter to this method?
>> > rdd.mapPartitionsWithSplit(f)
>> > Thanks
>> >
>>
>
>


-- 
Mohit

"When you want success as badly as you want the air, then you will get it.
There is no other secret of success."
-Socrates
