On 06:14 pm, dvkee...@gmail.com wrote:
>We are streaming a large amount of program output to the browser via a
>twisted app, and we are seeing huge memory consumption.
>
>We have a database process that generates large amounts of data to
>stdout, and we are streaming that to the browser through a twisted
>web2 app. We are using web2 because it supports upload streaming as
>well.
>
>Our code looks like:
>
>    env = {}
>    input = stream.MemoryStream('')
>    SQLDUMP = '/usr/bin/dump'
>
>    pstream = stream.ProcessStreamer(input, SQLDUMP,
>                                     [SQLDUMP,'--format=p','--clean',
>                                      '--host='+cfg.SOCKET,
>                                      '--username='+self.role,dbname],
>                                     env)
>    pstream.run()
>
>    outstream = WatchedStream(pstream.outStream)
>
>    response = http.Response( headers=headers, stream=outstream)
>
>
>class WatchedStream(object):
>
>    def __init__(self,stream):
>        self.stream = stream
>    def split(self, point):
>        ... some implementation
>    def close(self):
>        ... some implementation
>    def read(self):
>        d = self.stream.read()
>        bufSize = sum( [len(b) for b in self.stream.buffer if b])
>        log.msg('buffer size: %s'%bufSize)
>        return d
>
>Watching the log shows us that the stream (a web2.ProducerStream)
>buffer is growing continuously to hundreds of MB. Doesn't a stream
>object have a bufferSize attribute and the ability to throttle the
>flow of data based on buffer fullness? Does that throttling behavior
>have to be triggered explicitly?
>
>Yes, I know that web2 is deprecated, but I don't know that the problem
>is in the web2 components. The reactor.spawnProcess documentation
>does not seem to address the matter of modulating the read speed. Any
>assistance will be appreciated.

You probably want to pause Twisted's child process reader when you notice your buffer is getting too large. Off hand, I can't say how you might translate this advice into specific stream API calls, but ultimately you want to call `pauseProducing` on something.

It's also possible you'll have to go through some not-quite-documented interfaces(/implementations) to get there. IProcessTransport has no pauseProducing method, but the POSIX implementation of that interface has a `pipes` dictionary where the values are `IProducer` providers, so you can call `pauseProducing` on them (or just the one for, say, stdout, if that's where you're getting bytes from).

Later, of course, you'll want to undo the pause with a call to `resumeProducing`.

Jean-Paul
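
As a rough illustration of the pause/resume idea described above, here is a minimal sketch of the bookkeeping only. It is not web2 or Twisted API: `BufferThrottle`, `FakeProducer`, the method names `data_buffered`/`data_consumed`, and the high/low water marks are all hypothetical names chosen for this example. The only assumption about the real object is that it provides `pauseProducing` and `resumeProducing`, as an `IProducer` provider would.

```python
class BufferThrottle:
    """Track buffered bytes and pause/resume a producer around two thresholds.

    Hypothetical helper: `producer` is any object with pauseProducing() and
    resumeProducing() methods (for example, the stdout pipe reader).
    """

    def __init__(self, producer, high_water=1024 * 1024, low_water=256 * 1024):
        self.producer = producer
        self.high_water = high_water  # pause once this many bytes are buffered
        self.low_water = low_water    # resume once the buffer drains to this
        self.buffered = 0
        self.paused = False

    def data_buffered(self, nbytes):
        """Call when nbytes from the child process were added to the buffer."""
        self.buffered += nbytes
        if not self.paused and self.buffered >= self.high_water:
            self.paused = True
            self.producer.pauseProducing()

    def data_consumed(self, nbytes):
        """Call when nbytes were read out of the buffer toward the browser."""
        self.buffered = max(0, self.buffered - nbytes)
        if self.paused and self.buffered <= self.low_water:
            self.paused = False
            self.producer.resumeProducing()


class FakeProducer:
    """Stand-in producer that records calls; used only to exercise the sketch."""

    def __init__(self):
        self.calls = []

    def pauseProducing(self):
        self.calls.append("pause")

    def resumeProducing(self):
        self.calls.append("resume")
```

In the real application the producer would be the reader for the child's stdout, something like `processTransport.pipes[1]`, assuming the `pipes` dictionary on the POSIX process transport is keyed by child file descriptor. How to reach that transport from `stream.ProcessStreamer` is not covered here, and `data_consumed` would presumably be called from a wrapper like `WatchedStream.read`. Using two thresholds rather than one avoids toggling the pipe on every read.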