We use Hadoop Streaming and needed to assign unique ids to rows/records.
I wrote some Python code to do this and thought I'd share it. It works in
both the map and reduce phases. It was inspired by how MySQL handles
auto-increment in a master-master setup (via auto_increment_increment and
auto_increment_offset), and it could easily be adapted to any other
language. I've only tested it with Hadoop 0.19.0.

Michael

#!/usr/local/bin/python
''' example demonstrating partitioned auto-increment for hadoop streaming'''
import os, sys

class AutoIncr:
  '''Iterator that generates auto-increment ids that are unique across
  parallel Hadoop Streaming tasks (via partitioning).

  Each task emits the arithmetic sequence task_index + start,
  task_index + start + N, ... where N is the total number of tasks in the
  current phase, so the id sequences of different tasks never collide --
  the same trick MySQL uses with auto_increment_increment /
  auto_increment_offset.
  '''

  def __init__(self, start=0):
    '''Initialize the id generator.

    start -- offset added to the first id (default 0).
    '''
    # "mapred_task_is_map" is only present in the environment when we are
    # running inside a Hadoop Streaming task.
    if "mapred_task_is_map" in os.environ:

      # The task index is the second-to-last "_"-separated field of the
      # task attempt id, e.g. "attempt_200901010000_0001_m_000003_0" -> 3.
      self.__task = int(os.environ['mapred_task_id'].split("_")[-2]) + start

      if os.environ["mapred_task_is_map"] == "true":

        # map phase: stride by the number of map tasks
        self.__tasks = int(os.environ['mapred_map_tasks'])

      else:

        # reduce phase: stride by the number of reduce tasks
        self.__tasks = int(os.environ['mapred_reduce_tasks'])

    else:

      # running outside of Hadoop (for testing?): behaves as a plain
      # counter starting at `start` with stride 1
      self.__tasks = 1
      self.__task = start

    # first id to hand out
    self.__start = self.__task

  def next(self):
    '''Return the next id in this task's partition.'''
    c = self.__start
    self.__start += self.__tasks  # skip over the other tasks' ids
    return c

  # Python 3 iterator protocol; kept `next()` above for Python 2 callers.
  __next__ = next

  def __iter__(self):
    return self


if __name__ == "__main__":

  # Prepend a unique, partition-safe id (starting at 1) to every stdin line.
  id_source = AutoIncr(start=1)
  for row in sys.stdin:
    sys.stdout.write("%s\t%s" % (next(id_source), row))




Reply via email to