Hi. Recently i needed some code to be able to listen on the public IP address outside my modem router. Eventually, i came up with a minimal UPnP implementation and because it seems to work and i'm happy about it, i've decided to post it here at clpy in case anybody else may have a use for it. You never know....
------------ # NAT Traversal via UPnP Port Mapping # Written by Nikos Fotoulis <niko...@gmx.com> # This code is public domain. # # Tested on Thomsom TG858v7 modem router. # UPnP is hairy. May not work with other routers # Feedback is welcome. import re, thread, socket, traceback as tb, random from time import sleep from urlparse import urlparse from urllib import urlopen import urllib2 VERBOSE = VVERBOSE = False DEFAULT_ADDR = UPNPS = None # regexes rWANIP = re.compile (r"ST:[^\n]*(WAN(IP|PPP)Connection:\d+)", re.I).search rLOCATION = re.compile (r"LoCaTiON:([^\n]+)", re.I).search def rTAG (t): return re.compile ("<%s>(.+?)</%s>"%(t, t), re.I|re.DOTALL) rSERVICE = rTAG ("service").findall for tag in ["controlURL", "URLBase", "NewExternalIPAddress", "NewLeaseDuration", "NewProtocol", "NewInternalClient", "NewExternalPort", "NewInternalPort"]: def f (txt, r=rTAG (tag).search): x = r (txt) if x: return x. groups ()[0].strip () if tag.startswith ("New"): tag = tag [3:] globals () ["r" + tag.upper ()] = f # multicast and discover UPnP gateways # Returns a dictionary where the keys are our "external IP" addresses def DiscoverUPnP (): global UPNPS, DEFAULTGW, DEFAULTIFACE, DEFAULT_ADDR S = {} UPNPS = {} s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) #s.setsockopt (socket.SOL_SOCKET, socket.SO_BROADCAST, 1) R = "M-SEARCH * HTTP/1.1\r\nHOST: 239.255.255.250:1900\r\nMAN: ssdp:discover\r\nMX: 10\r\nST: ssdp:all\r\n\r\n" try: s.sendto (R, ("239.255.255.250", 1900)) except: print "UPnP gateways unreachable" return timeout = 5 while 1: s.settimeout (timeout) try: data, addr = s.recvfrom (4096) except: break timeout = max (timeout * 0.5, 0.01) r = rWANIP (data) if r: service = r.groups ()[0] r = rLOCATION (data) if r: location = r.groups () [0].strip () if VERBOSE: print "server:", addr, "supports", service, "at", location S [addr] = service, location if VVERBOSE: print "+" for userver, (service, location) in S.items (): up = urlparse (location) netloc = up.netloc if ":" in netloc: server, _, port = netloc.partition (":") else: server, port = netloc, "80" data = urlopen (location).read () URLBase = rURLBASE (data) or "http://%s:%s"%(server, port) controlURL = None for x in rSERVICE (data): if service in x: controlURL = rCONTROLURL (x) break if controlURL: addr = GetExternalIP (service, URLBase + controlURL) if addr: s = socket.socket (socket.AF_INET, socket.SOCK_STREAM) s.connect ((server, int (port))) thishost = s.getsockname () [0] s.close () UPNPS [server] = addr, service, URLBase + controlURL, thishost if VERBOSE: print "for server:", server, "controlURL:", controlURL else: print "No controlURL found for server:", server # set defaults if len (UPNPS) == 1: k = UPNPS.items ()[0] DEFAULT_ADDR, DEFAULTGW, DEFAULTIFACE = k [1][0], k [0], k [1][3] else: print "Multiple UPnP gateways!" return UPNPS # generic request POST data def envelope (request, service, **kw): return """<?xml version="1.0"?> <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"> <s:Body> <u:%s xmlns:u="urn:schemas-upnp-org:service:%s"> """%(request, service) + "\n".join (["<%s>%s</%s>"%(k,v,k) for k, v in kw.items ()]) + """ </u:%s> </s:Body> </s:Envelope>"""%request def Request (service, URL, request, **kw): req = urllib2.Request (URL) req.add_header ("content-type",'text/xml; charset="utf-8"') req.add_header ("SOAPACTION", '"urn:schemas-upnp-org:service:%s#%s"'%(service, request)) req.add_data (envelope (request, service, **kw)) try: return urllib2.build_opener ().open (req).read () except: return def GetExternalIP (service, URL): answer = Request (service, URL, "GetExternalIPAddress") addr = answer and rEXTERNALIPADDRESS (answer) if not addr: print "Couldn't get external IP address!" return addr ## The 3 basic actions of UPnP : list entries, add a mapping, delete a mapping ## Notes (tested on Thomson TG585v7): ## - Some times AddMapping returns a fail code (500) but the ## mapping *is* done and that can be seen by listing the entries (?!) ## So, the only way to be sure is to: list entries, add mapping, list entries ## and see the difference. ## - Returned LeaseDuration seems to be in deci-seconds def getEntries (service, URL): pmi = 0 while 1: answer = Request (service, URL, "GetGenericPortMappingEntry", NewPortMappingIndex=pmi) if not answer: break yield answer pmi += 1 def listMappings (gw=None): _, service, URL, iface = UPNPS [gw or DEFAULTGW] L = [] for a in getEntries (service, URL): if rPROTOCOL (a) == "TCP" and rINTERNALCLIENT (a) == iface: L.append ((int (rEXTERNALPORT (a)), int (rINTERNALPORT (a)), int (rLEASEDURATION (a)) / 10.0)) else: print "strange entry response!", a return L def addMapping (local_port, public_port, ttl, gw=None): _, service, URL, iface = UPNPS [gw or DEFAULTGW] # test if port already mapped. Result of AddMapping is unreliable for eport, iport, _ in listMappings (gw): if eport == public_port and iport != local_port: return answer = Request (service, URL, "AddPortMapping", NewEnabled="1", NewRemoteHost="", NewLeaseDuration=ttl, NewInternalPort=local_port, NewExternalPort=public_port, NewProtocol="TCP", NewInternalClient=iface, NewPortMappingDescription="IndependNet") if answer: return True # test if mapped. Result of AddMapping is unreliable for eport, iport, _ in listMappings (gw): if eport == public_port and iport == local_port: return True def delMapping (public_port, gw=None): _, service, URL, _ = UPNPS [gw or DEFAULTGW] if public_port != "all": Request (service, URL, "DeletePortMapping", NewRemoteHost="", NewExternalPort=public_port, NewProtocol="TCP") else: for public_port, _, _ in listMappings (gw): Request (service, URL, "DeletePortMapping", NewRemoteHost="", NewExternalPort=public_port, NewProtocol="TCP") ## ## Socket compatible interface for accepting connections on an external port. ## Does mapping keepalive every 60sec to make sure the mapping is not kept ## indefinately if our application crashes and didn't manage to remove it. ## LEASE_DURATION = 60 def Accept (port): if not port: port = random.randint (2000, 60000) if UPNPS is None: DiscoverUPnP () if not UPNPS: raise Error ("No UPnP gateway found. Can't listen ouside the modem") s = socket.socket () s.bind ((DEFAULTIFACE, 0)) inport = s.getsockname ()[1] if not addMapping (inport, port, LEASE_DURATION): raise Error ("Port Mapping to external port %i Failed"%port) s.listen (2) return Acceptor (s, port, inport) class UPnPError: pass class Acceptor: def __init__ (self, sock, eport, iport): self.sock, self.eport, self.iport = sock, eport, iport self.port = eport self.active = True thread.start_new_thread (self.keepalive, ()) def __iter__ (self): while self.active: yield self.sock.accept () def keepalive (self): while 1: ttl = None for eport, iport, ttl in listMappings (): if eport == self.eport and iport == self.iport: break ##print "Lease up for:", ttl st = 0.1 if ttl is not None: st = max (ttl - 0.1, 0.1) sleep (st) if not self.active: break if not addMapping (self.iport, self.eport, LEASE_DURATION): if ttl is None: self.active = False print "Failed to Keepalive the lease" def __del__ (self): self.active = False self.sock.close () delMapping (self.eport) ## main. UPnP manager & testing USAGE = """UPnP NAT Traversal (port mapping) test Usage: python upnp.py [-gw gw] {list|bind|del} <arguments> upnp list list mappings upnp bind internal-port external-port time-to-live map public port to local port for some time upnp del external-port|"all" remove a port mapping upnp discover gateways and external IP addresses Common options: -gw : select UPnP gateway (if more than one -- NOT IMPLEMENTED) """ if __name__ == "__main__": import sys args = sys.argv [1:] if "--help" in args: print USAGE exit () VERBOSE = True print "Discovering UPnP gateways..." DiscoverUPnP () for gw, v in UPNPS.items (): ip, service, URL, iface = v print "External IP:", ip print "\tgateway:", gw print "\tservice:", service print "\tcontrol URL:", URL print "\tinterface:", iface if not UPNPS: exit ("No UPnP gateway found") if not args: exit () cmd = args.pop (0) gw = None if cmd == "list": print "Port Mappings:" for ep, ip, ttl in listMappings (gw): print "\t%i <- %i (ttl=%i)"%(ip, ep, ttl) elif cmd == "bind": iport, eport, ttl = args iport, eport, ttl = int (iport), int (eport), int (ttl) if addMapping (iport, eport, ttl, gw): print "OK" else: print "Failed. Port already used, or implementation error" elif cmd == "del": eport, = args delMapping (eport, gw) else: print USAGE -- http://mail.python.org/mailman/listinfo/python-list