Hi !

Sorry, but I want to share my experiences. I hope this helps you.

I think that the specialized MS Windows-based services are too complicated. They have too many possibilities for bugs.
So I am trying normal pipes, which are accessible from Python. I see that with flush() and some binary-to-text tricks I can use subprocess/master-process communication.
OK, this is not asynchronous: I can send a job to the subprocess(es), and I can receive the report from the subprocess(es).
But with threading I can create non-blocking communication.

You can see this in the example: PipeBPPThr defines a pipe-based process-pool thread.
It can communicate with a subprocess, send/receive jobs, etc.

If you collect these threads and write a process-pool object, you can handle all of the communication with one object.

I hope these examples can help you.

If not, you can try WM_COPYDATA messages in Windows.
http://msdn.microsoft.com/library/default.asp?url=""
This way of exchanging data is based on message handling/sending.

dd








import os, sys, threading, Queue, time

# Values taken by CustomProcessThread.State.
pp_SPTHR_READY      = 1   # not currently exchanging a job with the subprocess
pp_SPTHR_SENDING    = 2   # run() is inside _SendToSP()
pp_SPTHR_RECEIVING  = 3   # run() is inside _ReceiveFromSP()

class CustomProcessThread(threading.Thread):
    """Worker thread that relays jobs to a subprocess, one at a time.

    Jobs queued with SendJob() are taken in order by run(), handed to
    _SendToSP(), and whatever _ReceiveFromSP() returns is queued for
    ReceiveJob().  The four hooks _CreateIPCObj, _CreateProcess, _SendToSP
    and _ReceiveFromSP do nothing in this class; subclasses override them to
    supply the actual transport.
    """

    def __init__(self,UniqID,ThreadID,Params):
        """Store the arguments, then build the IPC object and the process.

        UniqID, ThreadID -- stored as given and passed on to the hooks.
        Params           -- stored as self.Params before the hooks are
                            called, so the hooks may read it.
        """
        threading.Thread.__init__(self)
        self.UniqID=UniqID
        self.ThreadID=ThreadID
        self.Params=Params
        # NOTE: the two hooks below run before State and the queues exist,
        # so overrides must not rely on those attributes.
        self.IPCObj=self._CreateIPCObj(UniqID,ThreadID)
        self.ProcessObj=self._CreateProcess(UniqID,ThreadID,self.IPCObj)
        self.State=pp_SPTHR_READY
        self._Input=Queue.Queue()
        self._Output=Queue.Queue()

    def _CreateIPCObj(self,UniqID,ThreadID):
        """Hook: return the IPC object stored in self.IPCObj (None here)."""
        pass

    def _CreateProcess(self,UniqID,ThreadID,IPCObj):
        """Hook: return the process object stored in self.ProcessObj (None here)."""
        pass

    def SendJob(self,JobID,Data):
        """Queue the job (JobID, Data) for run(); returns immediately."""
        self._Input.put([JobID,Data])

    def HaveFinishedJob(self):
        """Return True if at least one result is waiting for ReceiveJob()."""
        return not self._Output.empty()

    def ReceiveJob(self):
        """Return the oldest [JobID, result] pair, blocking until one exists."""
        return self._Output.get()

    def Abort(self):
        """Ask run() to stop by queueing None.

        Jobs queued before this call are still processed first.
        """
        self._Input.put(None)

    def _SendToSP(self,JobID,Data):
        """Hook: deliver one job to the subprocess (no-op here)."""
        pass

    def _ReceiveFromSP(self):
        """Hook: return the subprocess's answer to the last job (None here)."""
        pass

    def run(self):
        """Process queued jobs until the None queued by Abort() is reached.

        self.State is pp_SPTHR_SENDING while _SendToSP() runs,
        pp_SPTHR_RECEIVING while _ReceiveFromSP() runs, and
        pp_SPTHR_READY otherwise.

        NOTE(review): an exception raised by a hook ends the thread without
        queueing a result, so a caller blocked in ReceiveJob() is not woken.
        """
        while True:
            inp=self._Input.get()
            if inp is None:
                break
            jobid,data=inp
            try:
                self.State=pp_SPTHR_SENDING
                self._SendToSP(jobid,data)
                self.State=pp_SPTHR_RECEIVING
                rdata=self._ReceiveFromSP()
            finally:
                # Back to READY before the result becomes visible, and also
                # when a hook raises.
                self.State=pp_SPTHR_READY
            self._Output.put([jobid,rdata])

    def Wait(self):
        """Return once the thread is no longer alive, polling every millisecond.

        NOTE(review): presumably polled instead of join() so the wait stays
        interruptible -- confirm before replacing.
        """
        while self.isAlive():
            time.sleep(0.001)
            


import PipeBPPThr, sys

# Echo worker: answer every job packet read from stdin with
# [jobid,[1,data]] on stdout.  A job whose id is -1 is acknowledged with
# None and ends the loop.
while True:
    job_id,payload=PipeBPPThr.ReadBinPacket(sys.stdin)
    if job_id!=-1:
        PipeBPPThr.WriteBinPacket(sys.stdout,[job_id,[1,payload]])
        continue
    PipeBPPThr.WriteBinPacket(sys.stdout,None)
    break
    
import os, sys, threading, Queue, subprocess, CustPPThread
from cPickle import dumps, loads
from binascii import hexlify, unhexlify


def ReadTextPacket(SourceStream):
    """Read one length-prefixed packet from SourceStream; return its payload.

    A packet is a 6-character decimal length followed by that many
    characters of payload.

    Raises ValueError if the header is not a non-negative number or if the
    stream ends before the whole packet has been read.
    """
    header=SourceStream.read(6)
    if len(header)!=6:
        # A short header means the stream ended; report that instead of
        # letting int() fail on '' or mis-reading a partial length.
        raise ValueError('incomplete packet header: %r'%(header,))
    psize=int(header)
    if psize<0:
        # read() with a negative size would consume the rest of the stream.
        raise ValueError('negative packet length: %r'%(header,))
    payload=SourceStream.read(psize)
    if len(payload)!=psize:
        raise ValueError('incomplete packet: expected %d characters, got %d'
                         %(psize,len(payload)))
    return payload

def WriteTextPacket(DestStream,Packet):
    """Write str(Packet) to DestStream as one length-prefixed packet, then flush.

    The packet is a 6-digit zero-padded decimal length followed by the text.

    Raises ValueError, before anything is written, if the text is longer
    than 999999 characters: its length would not fit the 6-digit header and
    a reader that consumes exactly 6 header characters would lose sync.
    """
    Packet=str(Packet)
    psize=len(Packet)
    if psize>999999:
        raise ValueError('packet too large for 6-digit length header: '
                         '%d characters'%psize)
    DestStream.write('%06d'%psize)
    DestStream.write(Packet)
    DestStream.flush()

def ReadBinPacket(SourceStream):
    """Read one text packet from SourceStream and return the object it encodes.

    The payload is hex-decoded and then unpickled.

    NOTE(review): unpickling can execute code chosen by whoever produced the
    data; only use this on streams whose other end is trusted.
    """
    return loads(unhexlify(ReadTextPacket(SourceStream)))

def WriteBinPacket(DestStream,Obj):
    """Pickle Obj with protocol 1, hex-encode it and send it as one text packet."""
    pickled=dumps(Obj,1)
    WriteTextPacket(DestStream,hexlify(pickled))


class PIPEBasedProcessThread(CustPPThread.CustomProcessThread):
    """Process thread that talks to its subprocess over stdin/stdout pipes.

    Keys read from Params:
      'scriptfilename' -- script the subprocess runs (required).
      'pythonexe'      -- interpreter used to run it (optional; falls back
                          to DefaultInterpreter).
    """

    # Interpreter used when Params has no 'pythonexe' entry.  This is the
    # path that used to be hard-coded in _CreateProcess, kept as the default
    # so existing callers start the same executable as before.
    DefaultInterpreter = r'c:\python24\python.exe'

    def _CreateIPCObj(self,UniqID,ThreadID):
        """No separate IPC object is used; the pipes belong to the process."""
        return None

    def _CreateProcess(self,UniqID,ThreadID,IPCObj):
        """Start the worker script with piped stdin/stdout; return the Popen."""
        spfn = self.Params['scriptfilename']
        try:
            interpreter = self.Params['pythonexe']
        except KeyError:
            interpreter = self.DefaultInterpreter
        cmd = [interpreter, spfn]
        return subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def _SendToSP(self,JobID,Data):
        """Send [JobID, Data] to the subprocess's stdin as one binary packet."""
        WriteBinPacket(self.ProcessObj.stdin,[JobID,Data])

    def _ReceiveFromSP(self):
        """Read one binary packet from the subprocess's stdout and return it."""
        return ReadBinPacket(self.ProcessObj.stdout)
    

if __name__=='__main__':
    # Demo: push 100 jobs through one pipe-based worker thread, printing each
    # reply, then shut the subprocess and the thread down and print the
    # elapsed wall-clock time.
    print("Start")
    # Disabled alternative that drives the subprocess directly, without a
    # thread:
    #   spfn='ppbpt_sub.py'
    #   cmd = [r'c:\python24\python.exe', spfn]
    #   p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    #   for i in range(10):
    #       WriteBinPacket(p.stdin,[1,'AAA'])
    #       print ReadBinPacket(p.stdout)
    #   WriteBinPacket(p.stdin,[-1,None])
    #   print ReadBinPacket(p.stdout)
    #   sys.exit()
    import time
    started=time.time()
    worker=PIPEBasedProcessThread(1,1,{'scriptfilename':'ppbpt_sub.py'})
    worker.start()
    for job_no in range(100):
        worker.SendJob(job_no,'AAABBB')
        print(worker.ReceiveJob())
    # Job id -1 is sent as the last job; its reply is printed like the others.
    worker.SendJob(-1,None)
    print(worker.ReceiveJob())
    worker.Abort()
    worker.Wait()
    print("End")
    elapsed=time.time()-started
    print(elapsed)


-- 
http://mail.python.org/mailman/listinfo/python-list

Reply via email to