We use Hadoop Streaming and needed to assign unique ids to rows/records. I
wrote some code in Python to do this and thought I'd share it. It works in
both the map and reduce phases. It was inspired by how MySQL handles
auto-increment in a master-master setup (with auto_increment_increment and
auto_increment_offset). This could easily be adapted to any other language.
I've only tested it with Hadoop 0.19.0.
Michael
#!/usr/local/bin/python
''' example demonstrating partitioned auto-increment for hadoop streaming'''
import os, sys
class AutoIncr:
    """Iterator yielding auto-increment ids unique across parallel Hadoop
    Streaming tasks.

    Each task starts at its own partition number (plus ``start``) and strides
    by the total number of tasks in its phase, so the id sequences produced by
    concurrent tasks interleave without collisions -- the same scheme MySQL
    master-master replication uses via auto_increment_increment and
    auto_increment_offset.
    """

    def __init__(self, start=0):
        """Initialize the counter.

        start -- offset added to the first generated id (default 0).
        """
        # NOTE: fixed for Python 3 -- the original used os.environ.has_key()
        # and long()/0L, all of which were removed in Python 3.
        if "mapred_task_is_map" in os.environ:
            # Inside Hadoop: the partition number is the second-to-last
            # "_"-separated field of the task id
            # (e.g. attempt_200901010000_0001_m_000003_0 -> 3).
            self.__task = int(os.environ["mapred_task_id"].split("_")[-2]) + start
            if os.environ["mapred_task_is_map"] == "true":
                # map phase: stride by the number of map tasks
                self.__tasks = int(os.environ["mapred_map_tasks"])
            else:
                # reduce phase: stride by the number of reduce tasks
                self.__tasks = int(os.environ["mapred_reduce_tasks"])
        else:
            # Running outside Hadoop (e.g. local testing): one task, stride 1.
            self.__tasks = 1
            self.__task = start
        # first id to hand out
        self.__start = self.__task

    def __next__(self):
        """Return the next id for this task."""
        current = self.__start
        self.__start += self.__tasks  # advance by the per-phase stride
        return current

    # Backward-compatible alias so existing callers (and the Python 2
    # iterator protocol) can still invoke obj.next().
    next = __next__

    def __iter__(self):
        return self
if __name__ == "__main__":
counter = AutoIncr(start=1)
for line in sys.stdin:
sys.stdout.write("%s\t%s" % (counter.next(), line))