Sorry, but I want to share my experiences. I hope this help to you.
I think that specialized MSWindows based services too complicated. They have to many bug possibilites.
So I trying with normal, "in python accessable" pipes. I see that with flush(), and some of the bintotext tricks I can use the subprocess/masterprocess communication.
Ok, this is not asynchronous. I can send some job to sp(s), and I can receive the report from sp(s).
But with threading I can create non-blocking communication.
You can see in the example: the PipeBPPThr define a pipe based process-pool thread.
This can communicate with a subprocess, can send/receive jobs, etc.
If you collect these threads, and write a process pool object, you can handle all of the communications with one object.
I hope to these examples can help to you.
If not, you can try with wm_copydata messages in Windows.""
This way of data exchanging is based on the message handling/sending.
import os, sys, threading, Queue, time pp_SPTHR_READY = 1 pp_SPTHR_SENDING = 2 pp_SPTHR_RECEIVING = 3
class CustomProcessThread(threading.Thread): def __init__(self,UniqID,ThreadID,Params): threading.Thread.__init__(self) self.UniqID=UniqID self.ThreadID=ThreadID self.Params=Params self.IPCObj=self._CreateIPCObj(UniqID,ThreadID) self.ProcessObj=self._CreateProcess(UniqID,ThreadID,self.IPCObj) self.State=pp_SPTHR_READY self._Input=Queue.Queue() self._Output=Queue.Queue() def _CreateIPCObj(self,UniqID,ThreadID): pass def _CreateProcess(self,UniqID,ThreadID,IPCObj): pass def SendJob(self,JobID,Data): self._Input.put([JobID,Data]) def HaveFinishedJob(self): return not self._Output.empty() def ReceiveJob(self): return self._Output.get() def Abort(self): self._Input.put(None) def _SendToSP(self,JobID,Data): pass def _ReceiveFromSP(self): pass def run(self): while 1: inp=self._Input.get() if inp==None: break jobid,data=inp self._SendToSP(jobid,data) rdata=self._ReceiveFromSP() self._Output.put([jobid,rdata]) def Wait(self): while self.isAlive(): time.sleep(0.001)
import PipeBPPThr, sys while 1: jobid,data=PipeBPPThr.ReadBinPacket(sys.stdin) if jobid==-1: PipeBPPThr.WriteBinPacket(sys.stdout,None) break PipeBPPThr.WriteBinPacket(sys.stdout,[jobid,[1,data]])
import os, sys, threading, Queue, subprocess, CustPPThread from cPickle import dumps, loads from binascii import hexlify, unhexlify def ReadTextPacket(SourceStream): psize=int(packet) return packet def WriteTextPacket(DestStream,Packet): Packet=str(Packet) DestStream.write('%06d'%len(Packet)) DestStream.write(Packet) DestStream.flush() def ReadBinPacket(SourceStream): txtpacket=ReadTextPacket(SourceStream) obj=loads(unhexlify(txtpacket)) return obj def WriteBinPacket(DestStream,Obj): pckpacket=hexlify(dumps(Obj,1)) WriteTextPacket(DestStream,pckpacket) class PIPEBasedProcessThread(CustPPThread.CustomProcessThread): def _CreateIPCObj(self,UniqID,ThreadID): return None def _CreateProcess(self,UniqID,ThreadID,IPCObj): spfn = self.Params['scriptfilename'] cmd = [r'c:\python24\python.exe', spfn] p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) return p def _SendToSP(self,JobID,Data): WriteBinPacket(self.ProcessObj.stdin,[JobID,Data]) def _ReceiveFromSP(self): return ReadBinPacket(self.ProcessObj.stdout) if __name__=='__main__': print "Start" ''' spfn='' cmd = [r'c:\python24\python.exe', spfn] p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) for i in range(10): WriteBinPacket(p.stdin,[1,'AAA']) print ReadBinPacket(p.stdout) WriteBinPacket(p.stdin,[-1,None]) print ReadBinPacket(p.stdout) sys.exit() ''' import time st=time.time() thr=PIPEBasedProcessThread(1,1,{'scriptfilename':''}) thr.start() for i in range(100): thr.SendJob(i,'AAABBB') print thr.ReceiveJob() thr.SendJob(-1,None) print thr.ReceiveJob() thr.Abort() thr.Wait() print "End" st=time.time()-st print st