Here is my code for Plug 'n Play detection of serial devices. It's not
yet ready for showtime, but it works with my serial-attached Wacom.

Best regards,
eulores


# -*- coding: utf-8 -*-

import sys
if sys.platform == 'win32':
    from twisted.internet import win32eventreactor
    win32eventreactor.install()
from twisted.internet import reactor, protocol
from twisted.internet.task import deferLater, LoopingCall
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.internet.serialport import *
import re

def wait(seconds):
    """Return a Deferred that fires with None once `seconds` have elapsed."""
    def _nothing():
        return None
    return deferLater(reactor, seconds, _nothing)

class StructuredPnP(object):
    '''Parsed view of a serial Plug and Play identification string.

    Two framings are recognised: 7-bit characters, delimited by "(" and ")"
    with backslash as the field separator, and 6-bit characters, delimited by
    0x08 and 0x09 with "<" as the field separator.  In the 6-bit framing the
    text fields are shifted up by 0x20 so both framings yield the same values.

    If `data` matches neither framing, every text field stays '' and `rev`
    is 0.  Optional fields missing from the string are '' as well.

    The structured PnP field is available with these fields:
        data:    string, required. Original unformatted string.
        other:   string, optional. Up to 16 characters preceding the
                 begin marker.
        rev:     integer, required. Revision of the PnP standard.
                 100 is standard version 1.00
        eisa:    string, required
        product: string, required
        serial:  string, optional
        klass:   string, optional
        compat:  string, optional. Other older products with similar
                 functionality
        user:    string, optional. Free flowing field useful for the
                 end-user
    '''

    # Sample identification strings, kept for reference and testing.
    #   7-bit character string:
    #     '\\96,N,8,1(\x01$WAC0608\\\\PEN\\WAC0000\\WACOM UD\r\n'
    #     'UD-0608-R,V1.4-4\r\nF4)'
    #   synthetic 7-bit character string:
    #     'aaa(bbcccdddd\\eeeeeeee\\fff\\gggg\\hhhhii)'
    #   synthetic 6-bit character string:
    #     'AAA\x08BBCCCDDDD<EEEEEEEE<FFF<GGGG<HHHHII\x09'

    def __init__(self, data):
        self.data = data
        for key in "other eisa product serial klass compat user".split():
            setattr(self, key, '')
        self.rev = '\0\0'

        prologue = r'(?P<other>[^(]{,16}?)'
        # '@' is a placeholder for the field separator of each framing.
        # NOTE(review): the negated classes '[^@]' are a reconstruction; the
        # archived copy of this pattern was mangled by e-mail address
        # obfuscation.  Confirm against the author's original.
        matrix = (r'(?P<rev>..)(?P<eisa>...)(?P<product>....)'
                  r'(?:@(?P<serial>[^@]{,8}))?'
                  r'(?:@(?P<klass>[^@]{,32}))?'
                  r'(?:@(?P<compat>[^@]{,40}))?'
                  r'(?:@(?P<user>.{,40}?))?'
                  r'(?:..)?')  # trailing two characters are matched but not kept

        def build(begin, separator, end):
            # Assemble the full pattern for one framing.
            return (prologue + re.escape(begin)
                    + matrix.replace('@', re.escape(separator))
                    + re.escape(end))

        dct = {}
        mo = re.match(build('(', '\\', ')'), data, re.S)
        if mo:
            dct = mo.groupdict()
        else:
            mo = re.match(build('\x08', '\x3C', '\x09'), data, re.S)
            if mo:
                dct = mo.groupdict()
                for k in "eisa product serial klass compat user".split():
                    v = dct[k]
                    # Optional groups that did not take part in the match
                    # are None and must not be iterated.
                    if v is not None:
                        dct[k] = ''.join([chr(ord(ch) + 0x20) for ch in v])
        for k, v in dct.items():
            # Keep the '' default for optional fields absent from the string.
            if v is not None:
                setattr(self, k, v)
        # Two characters carrying six significant bits each, high part first.
        self.rev = ((ord(self.rev[0]) & 0x3f) << 6) + (ord(self.rev[1]) & 0x3f)

    def __str__(self):
        return self.data

    def __repr__(self):
        l = ['<StructuredPnP object> %r' % self.data]
        for k in "other rev eisa product serial klass compat user".split():
            l.append('  %-8s %r' % (k, getattr(self, k, False)))
        return '\n'.join(l)

class PnPProtocol(protocol.Protocol):
    """Drive the DTR/RTS handshake that asks a serial device to identify itself.

    See Plug and Play External COM device Specification, rev 1.00
    from Microsoft & Hayes, 1994-1995.  The "2.1.x" labels in connectionMade
    are the step names used by the original author, presumably section
    numbers of that specification.

    Everything received on the port is accumulated in `data`.  When timer T5
    expires the connection is closed and `deferred` fires with a
    StructuredPnP built from the accumulated text.
    """

    def __init__(self, deferred):
        self.deferred = deferred
        self.data = ''
        # Initial deadline for the whole exchange; connectionMade lowers it
        # to a per-character timeout once the device is expected to answer.
        self.timeout = 1.4
        self.T5 = reactor.callLater(self.timeout, self.deliverPnP)

    def _resetT5(self, seconds):
        """Re-arm timer T5, unless it has already fired or been cancelled."""
        if self.T5.active():
            self.T5.reset(seconds)

    def deliverPnP(self):
        """Close the port and hand the collected text to the Deferred, once."""
        # transport is still None if the serial port was never opened.
        if self.transport is not None:
            self.transport.loseConnection()
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            # Somebody else may already have fired the Deferred.
            if not d.called:
                d.callback(StructuredPnP(self.data))

    def dataReceived(self, data):
        """Collect incoming text; deliver at once when 256 characters are in."""
        self.data += data
        if len(self.data) >= 256:
            self._resetT5(0)
        else:
            self._resetT5(self.timeout)

    @inlineCallbacks
    def connectionMade(self):
        """Toggle DTR and RTS step by step, pausing 0.2 s between steps."""
        while 1:
            # step 2.1.1
            self.transport.setDTR(1)
            self.transport.setRTS(0)
            yield wait(0.2)
            if not self.transport.getDSR():
                break
            # step 2.1.3 part A
            self.transport.setDTR(0)
            yield wait(0.2)
            # step 2.1.3 part B
            self.transport.setDTR(1)
            yield wait(0.2)
            # step 2.1.4
            self.transport.setRTS(1)
            # timer T5 is now used for per-character timeout
            self.timeout = 0.2
            yield wait(0.2)
            if self.data:
                break
            # step 2.1.5
            self.transport.setDTR(0)
            self.transport.setRTS(0)
            yield wait(0.2)
            # step 2.1.6
            self.transport.setDTR(1)
            self.transport.setRTS(1)
            yield wait(0.2)
            break
        if not self.data:
            # Nothing was received: deliver the (empty) result right away.
            self._resetT5(0)

    def connectionLost(self, reason='connectionDone'):
        """Report on standard output why the connection ended."""
        print("Connection lost: %s" % (reason,))

def pnpString(port=0):
    """Query the device on serial port `port` for its Plug and Play string.

    The port is opened at 1200 baud, 7 data bits, no parity, 1 stop bit,
    without software or hardware flow control.

    Returns a Deferred that fires with a StructuredPnP.  If the port cannot
    be opened, the Deferred fires immediately with StructuredPnP('').
    """
    d = Deferred()
    proto = PnPProtocol(d)
    try:
        # An earlier variant of this call additionally passed timeout=0.
        SerialPort(proto, port, reactor, baudrate=1200, bytesize=SEVENBITS,
                   parity=PARITY_NONE, stopbits=STOPBITS_ONE,
                   xonxoff=0, rtscts=0)
    except serial.SerialException:
        # The protocol was never connected.  Disarm its timer so it does not
        # fire later on a protocol that has no transport, and make sure the
        # Deferred can only be fired from here.
        if proto.T5.active():
            proto.T5.cancel()
        proto.deferred = None
        d.callback(StructuredPnP(''))
    return d

@inlineCallbacks
def imRunning(port=3):
    """Print the Plug and Play string of serial port `port`, then stop the reactor.

    The reactor is stopped even if the query or the printing fails, so the
    program cannot hang on an error.
    """
    try:
        pnp = yield pnpString(port)
        print("PnP string: %r" % (pnp,))
    finally:
        # Skip the stop when the reactor is no longer running.
        if reactor.running:
            reactor.stop()

# Script entry point: run one Plug and Play query and exit.
if __name__ == "__main__":
    # Schedule the query to start as soon as the reactor is running.
    reactor.callWhenRunning(imRunning)
    # Blocks until imRunning stops the reactor.
    reactor.run()



On Tue, Sep 14, 2010 at 11:24 AM, Markus Hubig <mhu...@imko.de> wrote:
> Hi @all!
> I'm trying to write a python library module for a special
> serial communication protocol called IMPBUS. To use the serial
> interface for sending and receiving packets as for now I'm
> sub-classing pyserial. My code looks like this:
>
> But the problem is that I can't use select with pyserial on Windows,
> because it doesn't provide the fileno() method. So after some googling
> I found twisted.internet.serialport "A select()able serial device, acting
> as a transport."
> I never used twisted before so I'm a little overwhelmed by how I can
> replace pyserial with twisted in the code above ... maybe someone can
> point me to the right direction. It seems I need a "Protocol" and a
> "receiver" ...
> - Markus
> --
_______________________________________________
Twisted-Python mailing list
Twisted-Python@twistedmatrix.com
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python

Reply via email to