Sorry, but I want to share my experiences. I hope this helps you.
I think that the specialized MS Windows-based services are too complicated. They have too many possibilities for bugs.
So I am trying normal pipes that are accessible from Python. I see that with flush() and some binary-to-text tricks I can use subprocess/master-process communication.
OK, this is not asynchronous. I can send a job to the subprocess(es), and I can receive the report from the subprocess(es).
But with threading I can create non-blocking communication.
You can see in the example: PipeBPPThr defines a pipe-based process-pool thread.
It can communicate with a subprocess, send/receive jobs, etc.
If you collect these threads and write a process-pool object, you can handle all of the communication with one object.
I hope these examples can help you.
If not, you can try WM_COPYDATA messages in Windows.
http://msdn.microsoft.com/library/default.asp?url=""
This way of exchanging data is based on message handling/sending.
dd
import os, sys, threading, time

# The standard queue module is called "Queue" on Python 2 and "queue" on
# Python 3; bind whichever exists to the name used throughout this module.
try:
    import Queue
except ImportError:
    import queue as Queue

# Values for CustomProcessThread.State.
pp_SPTHR_READY = 1
pp_SPTHR_SENDING = 2
pp_SPTHR_RECEIVING = 3
class CustomProcessThread(threading.Thread):
    """Thread that relays jobs to a subprocess, one job at a time.

    Jobs are queued with SendJob() and processed in order by run(): each
    job is handed to _SendToSP(), the reply is fetched with
    _ReceiveFromSP(), and ``[JobID, reply]`` is queued for ReceiveJob().

    The four hooks _CreateIPCObj, _CreateProcess, _SendToSP and
    _ReceiveFromSP do nothing in this base class (they return None);
    subclasses override them to provide the actual transport.

    Attributes:
        UniqID, ThreadID, Params: stored unchanged from the constructor.
        IPCObj: result of _CreateIPCObj(UniqID, ThreadID).
        ProcessObj: result of _CreateProcess(UniqID, ThreadID, IPCObj).
        State: initialised to pp_SPTHR_READY; this class never changes it.
    """

    def __init__(self, UniqID, ThreadID, Params):
        threading.Thread.__init__(self)
        self.UniqID = UniqID
        self.ThreadID = ThreadID
        # Params must be stored before the hooks run so that overriding
        # hooks can read it.
        self.Params = Params
        self.IPCObj = self._CreateIPCObj(UniqID, ThreadID)
        self.ProcessObj = self._CreateProcess(UniqID, ThreadID, self.IPCObj)
        self.State = pp_SPTHR_READY
        self._Input = Queue.Queue()
        self._Output = Queue.Queue()

    def _CreateIPCObj(self, UniqID, ThreadID):
        """Hook: build the IPC object stored in self.IPCObj."""
        pass

    def _CreateProcess(self, UniqID, ThreadID, IPCObj):
        """Hook: build the process object stored in self.ProcessObj."""
        pass

    def SendJob(self, JobID, Data):
        """Queue a job for the worker loop; returns immediately."""
        self._Input.put([JobID, Data])

    def HaveFinishedJob(self):
        """Return True when at least one result is waiting in the output queue."""
        return not self._Output.empty()

    def ReceiveJob(self):
        """Return the next ``[JobID, reply]`` pair, blocking until one exists."""
        return self._Output.get()

    def Abort(self):
        """Ask run() to stop once it reaches this point in the input queue."""
        # None is the stop sentinel; jobs queued before it are still processed.
        self._Input.put(None)

    def _SendToSP(self, JobID, Data):
        """Hook: deliver one job to the subprocess."""
        pass

    def _ReceiveFromSP(self):
        """Hook: return the subprocess's reply to the job just sent."""
        pass

    def run(self):
        """Process queued jobs in order until the stop sentinel is read."""
        while True:
            inp = self._Input.get()
            # Identity test: only the sentinel queued by Abort() ends the loop.
            if inp is None:
                break
            jobid, data = inp
            self._SendToSP(jobid, data)
            rdata = self._ReceiveFromSP()
            self._Output.put([jobid, rdata])

    def Wait(self):
        """Poll in 1 ms steps until this thread is no longer alive.

        Returns immediately if the thread was never started.
        """
        # Thread.isAlive() was removed in Python 3.9 and is_alive() does not
        # exist on very old interpreters, so use whichever is available.
        alive = getattr(self, 'is_alive', None)
        if alive is None:
            alive = self.isAlive
        while alive():
            time.sleep(0.001)
import PipeBPPThr, sys
# Worker loop: read [jobid, data] packets from stdin and answer each one on
# stdout until a packet with jobid -1 arrives.
while True:
    request = PipeBPPThr.ReadBinPacket(sys.stdin)
    jobid, data = request
    if jobid != -1:
        # Ordinary job: reply with the job id and [1, data].
        # NOTE(review): the leading 1 is presumably a status flag - confirm.
        PipeBPPThr.WriteBinPacket(sys.stdout, [jobid, [1, data]])
        continue
    # Shutdown request: acknowledge with None, then leave the loop.
    PipeBPPThr.WriteBinPacket(sys.stdout, None)
    break
import os, sys, threading, Queue, subprocess, CustPPThread
from cPickle import dumps, loads
from binascii import hexlify, unhexlify
def _ReadExactly(SourceStream, Count):
    """Read until Count characters have arrived or the stream reports EOF."""
    data = SourceStream.read(Count)
    while len(data) < Count:
        chunk = SourceStream.read(Count - len(data))
        if not chunk:
            break
        data += chunk
    return data


def ReadTextPacket(SourceStream):
    """Read one length-prefixed packet from SourceStream.

    A packet is a 6-digit, zero-padded decimal length followed by exactly
    that many characters of payload (the format WriteTextPacket emits).

    Args:
        SourceStream: object with a ``read(n)`` method.

    Returns:
        The payload, without the length header.

    Raises:
        ValueError: the stream ended before a full header or payload was
            read, or the header is not six decimal digits.
    """
    header = _ReadExactly(SourceStream, 6)
    # A short header must not be parsed: '0000' would silently mean "empty
    # packet", and a negative number would make read() consume everything.
    if len(header) != 6 or not header.isdigit():
        raise ValueError('invalid or truncated packet header: %r' % (header,))
    psize = int(header)
    packet = _ReadExactly(SourceStream, psize)
    if len(packet) != psize:
        raise ValueError('truncated packet: expected %d characters, got %d'
                         % (psize, len(packet)))
    return packet
def WriteTextPacket(DestStream, Packet):
    """Write Packet to DestStream as one length-prefixed packet and flush.

    The packet is ``str(Packet)`` preceded by its length as a 6-digit,
    zero-padded decimal number.

    Args:
        DestStream: object with ``write()`` and ``flush()`` methods.
        Packet: payload; converted with ``str()`` before sending.

    Raises:
        ValueError: the payload is longer than 999999 characters and so
            cannot be described by the 6-digit header. Nothing is written
            in that case.
    """
    Packet = str(Packet)
    size = len(Packet)
    # '%06d' pads but never truncates: a longer payload would produce a 7+
    # digit header and the reader, which consumes exactly 6, would lose sync.
    if size > 999999:
        raise ValueError('packet too large for 6-digit length header: '
                         '%d characters' % size)
    DestStream.write('%06d' % size)
    DestStream.write(Packet)
    # Flush so a peer blocked on the other end of a pipe sees the packet now.
    DestStream.flush()
def ReadBinPacket(SourceStream):
    """Read one text packet and return the Python object it encodes.

    The packet payload is hex text; it is converted back to bytes and
    unpickled.
    """
    # NOTE: unpickling can execute arbitrary code, so SourceStream must only
    # ever be connected to a trusted peer.
    return loads(unhexlify(ReadTextPacket(SourceStream)))
def WriteBinPacket(DestStream, Obj):
    """Pickle Obj, hex-encode it and send it as one text packet."""
    # Pickle protocol 1 is binary; hex-encoding turns it into plain
    # hexadecimal text before it is written.
    pickled = dumps(Obj, 1)
    WriteTextPacket(DestStream, hexlify(pickled))
class PIPEBasedProcessThread(CustPPThread.CustomProcessThread):
    """Process thread that talks to its subprocess over stdin/stdout pipes.

    Recognised keys in Params:
        'scriptfilename': path of the Python script to run (required).
        'interpreter': Python executable used to run it (optional;
            defaults to the interpreter running this process).
    """

    def _CreateIPCObj(self, UniqID, ThreadID):
        """No separate IPC object is used; the pipes belong to the process."""
        return None

    def _CreateProcess(self, UniqID, ThreadID, IPCObj):
        """Start the worker script with piped stdin/stdout and return the Popen.

        Raises:
            KeyError: Params has no 'scriptfilename' entry.
        """
        spfn = self.Params['scriptfilename']
        # A fixed c:\python24\python.exe only exists on one particular
        # installation; default to the current interpreter instead and let
        # callers override it through Params.
        try:
            interpreter = self.Params['interpreter']
        except KeyError:
            interpreter = sys.executable
        cmd = [interpreter, spfn]
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        return p

    def _SendToSP(self, JobID, Data):
        """Send ``[JobID, Data]`` to the subprocess's stdin as one packet."""
        WriteBinPacket(self.ProcessObj.stdin, [JobID, Data])

    def _ReceiveFromSP(self):
        """Read and return the next packet from the subprocess's stdout."""
        return ReadBinPacket(self.ProcessObj.stdout)
if __name__ == '__main__':
    # Demo and timing run: push 100 jobs through one pipe-based worker
    # thread, print every reply, then shut the worker down and print the
    # elapsed time in seconds.
    print("Start")

    # Disabled alternative, kept for reference: talk to the worker script
    # over raw pipes without the thread wrapper. (Indentation of the loop
    # body is reconstructed.)
    #
    #   spfn='ppbpt_sub.py'
    #   cmd = [r'c:\python24\python.exe', spfn]
    #   p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    #   for i in range(10):
    #       WriteBinPacket(p.stdin,[1,'AAA'])
    #       print ReadBinPacket(p.stdout)
    #   WriteBinPacket(p.stdin,[-1,None])
    #   print ReadBinPacket(p.stdout)
    #   sys.exit()

    import time
    st = time.time()
    thr = PIPEBasedProcessThread(1, 1, {'scriptfilename': 'ppbpt_sub.py'})
    thr.start()
    for i in range(100):
        # Send one job and block for its reply before sending the next.
        thr.SendJob(i, 'AAABBB')
        print(thr.ReceiveJob())
    # Job id -1 is sent last as the shutdown request for the worker script.
    thr.SendJob(-1, None)
    print(thr.ReceiveJob())
    # Stop the relay thread itself and wait until it has finished.
    thr.Abort()
    thr.Wait()
    print("End")
    st = time.time() - st
    print(st)
-- http://mail.python.org/mailman/listinfo/python-list
