Hi.

Recently I needed some code to be able to listen on the public IP
address outside my modem router. Eventually I came up with a
minimal UPnP implementation, and because it seems to work and I'm
happy with it, I've decided to post it here at clpy in case
anybody else may have a use for it. You never know...

------------
# NAT Traversal via UPnP Port Mapping
# Written by Nikos Fotoulis <niko...@gmx.com>
# This code is public domain.
#
# Tested on Thomson TG585v7 modem router.
# UPnP is hairy. May not work with other routers.
# Feedback is welcome.

import re, thread, socket, traceback as tb, random
from time import sleep
from urlparse import urlparse
from urllib import urlopen
import urllib2

# Diagnostic output switches: VERBOSE prints discovery results,
# VVERBOSE additionally prints a "+" per SSDP response received.
VERBOSE = VVERBOSE = False

# Discovery state, filled in by DiscoverUPnP ().  UPNPS stays None until a
# discovery has been attempted.  The DEFAULT* names describe the single
# gateway found; they are initialised here so that reading them before (or
# after an ambiguous) discovery does not raise NameError.
DEFAULT_ADDR = UPNPS = None
DEFAULTGW = DEFAULTIFACE = None

# regexes
def rTAG (t):
        """Return a compiled pattern capturing the text between <t> and </t>.

        Matching is case-insensitive and the captured text may span lines.
        """
        pattern = "<%s>(.+?)</%s>"%(t, t)
        return re.compile (pattern, re.DOTALL|re.I)

# SSDP response headers: the WAN connection service type and the
# description URL.
rLOCATION = re.compile (r"LoCaTiON:([^\n]+)", re.I).search
rWANIP = re.compile (r"ST:[^\n]*(WAN(IP|PPP)Connection:\d+)", re.I).search

# All <service>...</service> bodies of a device description.
rSERVICE = rTAG ("service").findall

# Generate one module-level extractor per tag.  Each takes a text and returns
# the stripped content of the first matching element, or None.  The function
# is published as "r" + the upper-cased tag name with any leading "New"
# dropped (rCONTROLURL, rURLBASE, rEXTERNALIPADDRESS, rLEASEDURATION, ...).
for tag in ("controlURL", "URLBase", "NewExternalIPAddress",
                "NewLeaseDuration", "NewProtocol", "NewInternalClient",
                "NewExternalPort", "NewInternalPort"):
        # the search method is bound as a default so every f keeps its own tag
        def f (txt, r=rTAG (tag).search):
                found = r (txt)
                if found is None:
                        return None
                return found.group (1).strip ()
        if tag [:3] == "New":
                tag = tag [3:]
        globals () ["r" + tag.upper ()] = f

# Multicast an SSDP search and discover UPnP internet gateways.
# Returns the UPNPS dictionary: gateway host -> (external IP address,
# service type, control URL, our own address as seen towards the gateway).
def DiscoverUPnP ():
        """Find UPnP gateways on the local network and record them in UPNPS.

        Sends an SSDP M-SEARCH, keeps the responders that advertise a
        WANIPConnection/WANPPPConnection service, downloads each device
        description to locate the service's control URL and asks the gateway
        for its external IP address.

        Side effects: rebinds the globals UPNPS, DEFAULT_ADDR, DEFAULTGW and
        DEFAULTIFACE.  The three defaults are set only when exactly one
        gateway was found and are None otherwise.

        Returns UPNPS, or None when the search could not even be sent
        (UPNPS is then left as an empty dict).
        """
        global UPNPS, DEFAULTGW, DEFAULTIFACE, DEFAULT_ADDR
        from urlparse import urljoin

        found = {}
        UPNPS = {}
        # drop defaults from any earlier discovery; they may no longer be valid
        DEFAULT_ADDR = DEFAULTGW = DEFAULTIFACE = None

        # MAN must be the quoted string "ssdp:discover".  MX is the longest
        # delay a device may wait before answering, so it has to stay below
        # the 5 seconds we are prepared to listen for the first response.
        R = ("M-SEARCH * HTTP/1.1\r\n"
                "HOST: 239.255.255.250:1900\r\n"
                "MAN: \"ssdp:discover\"\r\n"
                "MX: 3\r\n"
                "ST: ssdp:all\r\n\r\n")

        udp = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
        try:
                udp.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
                try:
                        udp.sendto (R, ("239.255.255.250", 1900))
                except socket.error:
                        print ("UPnP gateways unreachable")
                        return

                # Wait up to 5s for the first answer; every answer halves the
                # wait, so the burst of responses is drained quickly.
                timeout = 5
                while 1:
                        udp.settimeout (timeout)
                        try:
                                data, addr = udp.recvfrom (4096)
                        except socket.error:
                                # includes socket.timeout: nobody else answered
                                break
                        timeout = max (timeout * 0.5, 0.01)
                        r = rWANIP (data)
                        if r:
                                service = r.groups ()[0]
                                r = rLOCATION (data)
                                if r:
                                        location = r.groups () [0].strip ()
                                        if VERBOSE:
                                                print ("server: %s supports %s at %s"
                                                        % (addr, service, location))
                                        # one entry per responder; a later
                                        # response replaces an earlier one
                                        found [addr] = service, location
                        if VVERBOSE: print ("+")
        finally:
                udp.close ()

        for service, location in found.values ():
                netloc = urlparse (location).netloc
                server, _, port = netloc.partition (":")
                port = port or "80"
                try:
                        data = urlopen (location).read ()
                except IOError:
                        # one unreachable description must not abort the rest
                        print ("Can't read device description: %s" % location)
                        continue

                URLBase = rURLBASE (data) or "http://%s:%s"%(server, port)
                controlURL = None
                for x in rSERVICE (data):
                        if service in x:
                                controlURL = rCONTROLURL (x)
                                break
                if not controlURL:
                        print ("No controlURL found for server: %s" % server)
                        continue

                # controlURL may be relative or absolute; resolve it against
                # the base rather than gluing the two strings together
                URL = urljoin (URLBase, controlURL)
                extaddr = GetExternalIP (service, URL)
                if not extaddr:
                        continue

                # Connect to the gateway just to learn which of our local
                # addresses is used to reach it.
                tcp = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
                try:
                        try:
                                tcp.connect ((server, int (port)))
                                thishost = tcp.getsockname () [0]
                        except (socket.error, ValueError):
                                print ("Can't connect to gateway: %s" % server)
                                continue
                finally:
                        tcp.close ()

                UPNPS [server] = extaddr, service, URL, thishost
                if VERBOSE:
                        print ("for server: %s controlURL: %s" % (server, controlURL))

        # set defaults
        if len (UPNPS) == 1:
                gw, entry = list (UPNPS.items ()) [0]
                DEFAULT_ADDR, DEFAULTGW, DEFAULTIFACE = entry [0], gw, entry [3]
        elif UPNPS:
                print ("Multiple UPnP gateways!")

        return UPNPS


# generic request POST data
def envelope (request, service, **kw):
        """Build the SOAP envelope for one UPnP action.

        request -- action name, e.g. "AddPortMapping"
        service -- service type and version, e.g. "WANIPConnection:1"
        kw      -- the action's arguments (name=value); values are formatted
                   with %s and XML-escaped

        Returns the envelope as a string.
        """
        from xml.sax.saxutils import escape

        # UPnP expects arguments in the order the service description declares
        # them, but **kw has no dependable order.  Emit the port-mapping
        # arguments in their declared order; any other name follows, sorted.
        order = ("NewRemoteHost", "NewExternalPort", "NewProtocol",
                "NewInternalPort", "NewInternalClient", "NewEnabled",
                "NewPortMappingDescription", "NewLeaseDuration")
        def rank (name):
                if name in order:
                        return (order.index (name), name)
                return (len (order), name)

        args = "\n".join (["<%s>%s</%s>"%(k, escape ("%s" % (kw [k],)), k)
                        for k in sorted (kw, key=rank)])

        return """<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:%s xmlns:u="urn:schemas-upnp-org:service:%s">
%s </u:%s>
</s:Body>
</s:Envelope>"""%(request, service, args, request)

def Request (service, URL, request, **kw):
        """POST one SOAP action to a gateway's control URL.

        service -- service type and version, e.g. "WANIPConnection:1"
        URL     -- the service's control URL
        request -- action name; also used in the SOAPACTION header
        kw      -- action arguments, passed on to envelope ()

        Returns the response body, or None when the request fails for any
        reason.  urllib2 raises for HTTP error statuses, so an error reply
        from the gateway also yields None.
        """
        req = urllib2.Request (URL)
        req.add_header ("content-type",'text/xml; charset="utf-8"')
        req.add_header ("SOAPACTION",
                '"urn:schemas-upnp-org:service:%s#%s"'%(service, request))
        req.add_data (envelope (request, service, **kw))
        # Best effort on purpose: callers treat None as "failed".  Catching
        # Exception rather than everything keeps Ctrl-C and sys.exit working.
        try:
                response = urllib2.build_opener ().open (req)
        except Exception:
                return None
        try:
                try:
                        return response.read ()
                except Exception:
                        return None
        finally:
                response.close ()

def GetExternalIP (service, URL):
        """Ask the gateway at URL for its external IP address.

        Returns the content of the NewExternalIPAddress element of the reply.
        When the request fails or the element is missing, a message is
        printed and the falsy result is returned as is.
        """
        reply = Request (service, URL, "GetExternalIPAddress")

        if reply:
                external = rEXTERNALIPADDRESS (reply)
        else:
                external = reply

        if not external:
                print ("Couldn't get external IP address!")
        return external

## The 3 basic actions of UPnP: list entries, add a mapping, delete a mapping
## Notes (tested on Thomson TG585v7):
## - Sometimes AddMapping returns a failure code (500) but the
##   mapping *is* made, as can be seen by listing the entries (?!)
##   So the only way to be sure is to list entries, add the mapping, list
##   entries again and see the difference.
## - The returned LeaseDuration seems to be in deci-seconds

def getEntries (service, URL):
        """Yield the raw reply for each port mapping entry of the gateway.

        Entries are requested by index, starting at 0, with
        GetGenericPortMappingEntry; iteration ends at the first request that
        yields no answer.
        """
        index = 0
        entry = Request (service, URL, "GetGenericPortMappingEntry",
                NewPortMappingIndex=index)
        while entry:
                yield entry
                index += 1
                entry = Request (service, URL, "GetGenericPortMappingEntry",
                        NewPortMappingIndex=index)

def listMappings (gw=None):
        """List the TCP mappings that forward to this host.

        gw -- key of the gateway in UPNPS; the default gateway when omitted

        Returns a list of (external_port, internal_port, lease) tuples, where
        lease is the reported NewLeaseDuration divided by 10 (the value looked
        like deci-seconds on the router this was tested with).

        Mappings for another protocol or another internal client are skipped
        silently.  A reply that cannot be parsed is reported and skipped.
        """
        _, service, URL, iface = UPNPS [gw or DEFAULTGW]
        L = []
        for a in getEntries (service, URL):
                proto, client = rPROTOCOL (a), rINTERNALCLIENT (a)
                if proto is None or client is None:
                        print ("strange entry response! %s" % (a,))
                        continue
                if proto != "TCP" or client != iface:
                        # a valid entry, just not one of ours
                        continue
                try:
                        L.append ((int (rEXTERNALPORT (a)), int (rINTERNALPORT (a)),
                                 int (rLEASEDURATION (a)) / 10.0))
                except (TypeError, ValueError):
                        # a port or lease tag is missing or not a number
                        print ("strange entry response! %s" % (a,))
        return L

def addMapping (local_port, public_port, ttl, gw=None):
        """Map public_port on the gateway to local_port on this host (TCP).

        ttl is sent as the requested lease duration; gw selects the gateway
        in UPNPS (the default gateway when omitted).

        Returns True when the mapping was accepted or is found in the
        gateway's list afterwards, None otherwise -- including when
        public_port already forwards to a different local port of this host.
        """
        _, service, URL, iface = UPNPS [gw or DEFAULTGW]

        # test if port already mapped. Result of AddMapping is unreliable
        clashes = [1 for ext, internal, _lease in listMappings (gw)
                        if ext == public_port and internal != local_port]
        if clashes:
                return None

        params = {
                "NewEnabled": "1",
                "NewRemoteHost": "",
                "NewLeaseDuration": ttl,
                "NewInternalPort": local_port,
                "NewExternalPort": public_port,
                "NewProtocol": "TCP",
                "NewInternalClient": iface,
                "NewPortMappingDescription": "IndependNet",
        }
        if Request (service, URL, "AddPortMapping", **params):
                return True

        # test if mapped. Result of AddMapping is unreliable
        current = [(ext, internal) for ext, internal, _lease in listMappings (gw)]
        if (public_port, local_port) in current:
                return True
        return None

def delMapping (public_port, gw=None):
        """Remove a TCP port mapping from the gateway.

        public_port -- the external port to unmap, or the string "all" to
                       unmap every port returned by listMappings ()
        gw          -- key of the gateway in UPNPS; default gateway if omitted

        The gateway's answers are ignored.
        """
        _, service, URL, _ = UPNPS [gw or DEFAULTGW]

        if public_port == "all":
                ports = [eport for eport, _iport, _lease in listMappings (gw)]
        else:
                ports = [public_port]

        for eport in ports:
                Request (service, URL, "DeletePortMapping",
                        NewRemoteHost="", NewExternalPort=eport,
                        NewProtocol="TCP")

##
## Socket compatible interface for accepting connections on an external port.
## Does a mapping keepalive every 60 sec, so that the mapping is not kept
## indefinitely if our application crashes without managing to remove it.
##

# Lease requested for every mapping made by Accept () and renewed by
# Acceptor.keepalive (); sent to the gateway as NewLeaseDuration.
LEASE_DURATION = 60

class UPnPError (Exception):
        """Raised by Accept () when no gateway is usable or mapping fails."""
        pass

def Accept (port):
        """Listen for TCP connections on an external port of the gateway.

        port -- external port to map; a false value picks a random port
                between 2000 and 60000

        Runs DiscoverUPnP () first if no discovery has been attempted yet,
        binds a socket to an ephemeral port on the interface facing the
        default gateway, and maps the external port to it with a lease of
        LEASE_DURATION.

        Returns an Acceptor wrapping the listening socket.
        Raises UPnPError when there is no gateway, no single default gateway,
        or the port mapping could not be made.
        """
        if not port:
                port = random.randint (2000, 60000)

        if UPNPS is None:
                DiscoverUPnP ()

        if not UPNPS:
                raise UPnPError ("No UPnP gateway found. Can't listen ouside the modem")

        # DEFAULTIFACE is only set when discovery found exactly one gateway
        iface = globals ().get ("DEFAULTIFACE")
        if iface is None:
                raise UPnPError ("No default UPnP gateway to listen on")

        s = socket.socket ()
        ready = False
        try:
                s.bind ((iface, 0))
                inport = s.getsockname ()[1]
                if not addMapping (inport, port, LEASE_DURATION):
                        raise UPnPError ("Port Mapping to external port %i Failed"%port)
                s.listen (2)
                ready = True
        finally:
                # don't leak the socket when binding or mapping failed
                if not ready:
                        s.close ()
        return Acceptor (s, port, inport)

class Acceptor:
        """A listening socket whose external port mapping is kept alive.

        Iterating over the object yields the results of sock.accept () for as
        long as it is active.  A background thread renews the mapping's
        lease.  Call close () when done.

        Attributes: sock (the listening socket), eport / port (the external
        port), iport (the local port), active (False once closed or once the
        lease could not be renewed).
        """
        def __init__ (self, sock, eport, iport):
                self.sock, self.eport, self.iport = sock, eport, iport
                self.port = eport
                self.active = True
                thread.start_new_thread (self.keepalive, ())

        def __iter__ (self):
                while self.active:
                        yield self.sock.accept ()

        def keepalive (self):
                """Renew the mapping shortly before its lease runs out.

                Runs in its own thread until the object is no longer active.
                Gives up (and deactivates the object) when the mapping is
                gone and cannot be re-created.
                """
                while self.active:
                        # ttl stays None unless *our* mapping is listed; the
                        # lease of some other mapping must not be used
                        ttl = None
                        for eport, iport, lease in listMappings ():
                                if eport == self.eport and iport == self.iport:
                                        ttl = lease
                                        break
                        ##print "Lease up for:", ttl
                        st = 0.1
                        if ttl is not None:
                                st = max (ttl - 0.1, 0.1)
                        sleep (st)
                        if not self.active: break
                        if not addMapping (self.iport, self.eport,
                                        LEASE_DURATION):
                                if ttl is None:
                                        self.active = False
                                        print ("Failed to Keepalive the lease")

        def close (self):
                """Stop the keepalive, close the socket and remove the mapping.

                The keepalive thread holds a reference to this object, so
                __del__ cannot be relied on while it runs; call this
                explicitly.  Calling it more than once is harmless.
                """
                if getattr (self, "_closed", False):
                        return
                self._closed = True
                self.active = False
                self.sock.close ()
                delMapping (self.eport)

        def __del__ (self):
                self.close ()

## main. UPnP manager & testing

# Command-line help text: printed for --help and for unrecognised commands.
USAGE = """UPnP NAT Traversal (port mapping) test
Usage: python upnp.py [-gw gw] {list|bind|del} <arguments>

 upnp list
        list mappings
 upnp bind internal-port external-port time-to-live
        map public port to local port for some time
 upnp del external-port|"all"
        remove a port mapping
 upnp
        discover gateways and external IP addresses

Common options:
 -gw            : select UPnP gateway (if more than one -- NOT IMPLEMENTED)
"""

# Command-line entry point: discover the gateways, print what was found and
# then run the optional list / bind / del command.
if __name__ == "__main__":
        import sys
        args = sys.argv [1:]
        if "--help"  in args:
                print (USAGE)
                sys.exit ()

        VERBOSE = True
        print ("Discovering UPnP gateways...")
        DiscoverUPnP ()
        for gw, v in UPNPS.items ():
                ip, service, URL, iface = v
                print ("External IP: %s" % ip)
                print ("\tgateway: %s" % gw)
                print ("\tservice: %s" % service)
                print ("\tcontrol URL: %s" % URL)
                print ("\tinterface: %s" % iface)


        if not UPNPS:
                sys.exit ("No UPnP gateway found")
        if not args:
                sys.exit ()
        if len (UPNPS) != 1:
                # the commands below need the default gateway, which exists
                # only when exactly one was found (-gw is not implemented)
                sys.exit ("Multiple UPnP gateways found; can't choose one")

        cmd = args.pop (0)
        gw = None

        if cmd == "list":
                print ("Port Mappings:")
                for ep, ip, ttl in listMappings (gw):
                        print ("\t%i <- %i (ttl=%i)"%(ip, ep, ttl))
        elif cmd == "bind":
                # wrong argument count and non-numeric values both end up as
                # ValueError: show the usage instead of a traceback
                try:
                        iport, eport, ttl = [int (a) for a in args]
                except ValueError:
                        sys.exit (USAGE)
                if addMapping (iport, eport, ttl, gw):
                        print ("OK")
                else: print ("Failed. Port already used, or implementation error")
        elif cmd == "del":
                if len (args) != 1:
                        sys.exit (USAGE)
                delMapping (args [0], gw)
        else:
                print (USAGE)

-- 
http://mail.python.org/mailman/listinfo/python-list

Reply via email to