Here is my code for Plug 'n Play detection of serial devices. It's not yet ready for showtime, but it works with my serial-attached Wacom.
# Best regards, eulores

# -*- coding: utf-8 -*-
import sys

if sys.platform == 'win32':
    # The win32 event reactor must be installed before `reactor` is imported.
    from twisted.internet import win32eventreactor
    win32eventreactor.install()

from twisted.internet import reactor, protocol
from twisted.internet.task import deferLater, LoopingCall
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.internet.serialport import *
import re


def wait(seconds):
    """Return a Deferred that fires with None after `seconds` seconds."""
    return deferLater(reactor, seconds, lambda: None)


class StructuredPnP(object):
    '''
    Parsed form of a serial Plug 'n Play identification string.

    The structured PnP field is available with these fields:
        data:    string, required. Original unformatted string.
        other:   string, optional
        rev:     integer, required. Revision of the PnP standard.
                 100 is standard version 1.00
        eisa:    string, required
        product: string, required
        serial:  string, optional
        klass:   string, optional
        compat:  string, optional. Other older products with similar
                 functionality
        user:    string, optional. Free flowing field useful for the end-user

    Optional fields that are absent from `data` are left as the empty
    string; if `data` cannot be parsed at all, every string field is ''
    and `rev` is 0.
    '''

    def __init__(self, data):
        """Parse `data`, trying the 7-bit framing first, then the 6-bit one.

        Example inputs:
            '\\96,N,8,1(\x01$WAC0608\\\\PEN\\WAC0000\\WACOM UD\r\nUD-0608-R,V1.4-4\r\nF4)'
                test string for a 7-bit character string
            'aaa(bbcccdddd\\eeeeeeee\\fff\\gggg\\hhhhii)'
                field layout, 7-bit framing
            'AAA\x08BBCCCDDDD<EEEEEEEE<FFF<GGGG<HHHHII\x09'
                field layout, 6-bit framing
        """
        self.data = data
        for key in "other eisa product serial klass compat user".split():
            setattr(self, key, '')
        self.rev = '\0\0'

        # '@' is a placeholder for the field separator; it is substituted
        # below with the separator of each framing.  Every optional field
        # therefore runs up to (but not including) the next separator.
        prologue = r'(?P<other>[^(]{,16}?)'
        matrix = (r'(?P<rev>..)(?P<eisa>...)(?P<product>....)'
                  r'(?:@(?P<serial>[^@]{,8}))?'
                  r'(?:@(?P<klass>[^@]{,32}))?'
                  r'(?:@(?P<compat>[^@]{,40}))?'
                  r'(?:@(?P<user>.{,40}?))?'
                  r'(?:..)?')  # trailing two characters (checksum) are dropped
        # 7-bit framing: '(' ... ')' with '\' between fields.
        needle1 = prologue + '\\(' + matrix.replace('@', '\\\\') + '\\)'
        # 6-bit framing: the same characters offset by -0x20.
        needle2 = prologue + '\\\x08' + matrix.replace('@', '\\\x3C') + '\\\x09'

        six_bit = False
        mo = re.match(needle1, data, re.S)
        if not mo:
            mo = re.match(needle2, data, re.S)
            six_bit = mo is not None

        fields = {}
        if mo:
            # Optional groups that did not take part in the match are
            # reported as None; keep the documented '' default instead.
            fields = dict((k, v or '') for k, v in mo.groupdict().items())
            if six_bit:
                # Shift 6-bit characters back into the printable range.
                for k in "eisa product serial klass compat user".split():
                    fields[k] = ''.join(chr(ord(ch) + 0x20) for ch in fields[k])
        for k, v in fields.items():
            setattr(self, k, v)

        # The revision is carried as two characters of 6 significant bits each.
        self.rev = ((ord(self.rev[0]) & 0x3f) << 6) + (ord(self.rev[1]) & 0x3f)

    def __str__(self):
        return self.data

    def __repr__(self):
        l = ['<StructuredPnP object> %r' % self.data]
        for k in "other rev eisa product serial klass compat user".split():
            l.append('  %-8s %r' % (k, getattr(self, k, False)))
        return '\n'.join(l)


class PnPProtocol(protocol.Protocol):
    """Run the serial PnP enumeration handshake and collect the reply.

    See Plug and Play External COM device Specification, rev 1.00
    from Microsoft & Hayes, 1994-1995.

    `deferred` is fired exactly once with a StructuredPnP built from
    whatever was received (possibly nothing).
    """

    def __init__(self, deferred):
        self.deferred = deferred
        self.data = ''
        self.timeout = 1.4
        # T5 starts as an overall timeout; once the handshake has run it is
        # re-used as a per-character timeout (see connectionMade).
        self.T5 = reactor.callLater(self.timeout, self.deliverPnP)

    def _restartT5(self, delay):
        """Re-arm T5, unless it has already fired or been cancelled."""
        if self.T5.active():
            self.T5.reset(delay)

    def deliverPnP(self):
        """Close the port and hand the collected data to the Deferred."""
        # transport is still None if the port was never opened.
        if self.transport is not None:
            self.transport.loseConnection()
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.callback(StructuredPnP(self.data))

    def dataReceived(self, data):
        self.data += data
        if len(self.data) >= 256:
            # Enough data collected: deliver on the next reactor iteration.
            self._restartT5(0)
        else:
            self._restartT5(self.timeout)

    @inlineCallbacks
    def connectionMade(self):
        # `while 1` is only used so that `break` can leave the sequence early.
        while 1:
            # print "2.1.1"
            self.transport.setDTR(1)
            self.transport.setRTS(0)
            yield wait(0.2)
            if not self.transport.getDSR():
                break
            # print "2.1.3 part A"
            self.transport.setDTR(0)
            yield wait(0.2)
            # print "2.1.3 part B"
            self.transport.setDTR(1)
            yield wait(0.2)
            # print "2.1.4"
            self.transport.setRTS(1)
            # timer T5 is now used for per-character timeout
            self.timeout = 0.2
            yield wait(0.2)
            if self.data:
                break
            # print "2.1.5"
            self.transport.setDTR(0)
            self.transport.setRTS(0)
            yield wait(0.2)
            # print "2.1.6"
            self.transport.setDTR(1)
            self.transport.setRTS(1)
            yield wait(0.2)
            break
        if not self.data:
            # Nothing answered: deliver the (empty) result right away.
            self._restartT5(0)
        returnValue(None)

    def connectionLost(self, reason='connectionDone'):
        print("Connection lost: %s" % (reason,))


def pnpString(port=0):
    """Probe serial `port` for a PnP device.

    Returns a Deferred firing with a StructuredPnP; if the port cannot be
    opened it fires with an empty StructuredPnP.
    """
    d = Deferred()
    proto = PnPProtocol(d)
    try:
        # NOTE: an earlier variant of this call also passed timeout=0.
        SerialPort(proto, port, reactor, baudrate=1200, bytesize=SEVENBITS,
                   parity=PARITY_NONE, stopbits=STOPBITS_ONE,
                   xonxoff=0, rtscts=0)
    except serial.SerialException:
        # The protocol armed T5 in its constructor; stop it so it cannot
        # fire later and try to deliver a second result.
        if proto.T5.active():
            proto.T5.cancel()
        proto.deferred = None
        d.callback(StructuredPnP(''))
    return d


@inlineCallbacks
def imRunning():
    """Demo entry point: probe port 3, print the result, stop the reactor."""
    pnp = yield pnpString(3)
    print("PnP string: %r" % (pnp,))
    reactor.stop()


if __name__ == "__main__":
    reactor.callWhenRunning(imRunning)
    reactor.run()

# On Tue, Sep 14, 2010 at 11:24 AM, Markus Hubig <mhu...@imko.de> wrote:
# > Hi @all!
# > I'm trying to write a python library module for a special
# > serial communication protocol called IMPBUS. To use the serial
# > interface for sending and receiving packets as for now I'm
# > sub-classing pyserial. My code looks like this:
# >
# > But the problem is that I can't use select with pyserial on Windows,
# > because it don't provide the fileno() methode. So after some googling
# > I found twisted.internet.serialport "A select()able serial device, acting
# > as a transport."
# > I never used twisted before so I'm a little overwhelmed by how I can
# > replace pyserial with twisted in the code above ... maybe someone can
# > point me to the right direction. It seems I need a "Protocol" and a
# > "receiver" ...
# > - Markus
# > --
_______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python