Thank you. I was thinking along those lines, but wasn't sure how the producer/consumer methods related to the streams model. I guess I still don't know, but I now have more confidence that a solution is there to find.
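For what it's worth, here is a minimal sketch of the pause/resume idea from the reply quoted below. It is written without any Twisted imports so it runs standalone. `ThrottledBuffer`, `RecordingProducer` and the watermark sizes are hypothetical names and values. The only real API assumed is the `IProducer` pair `pauseProducing()`/`resumeProducing()`. In a real app the producer would be the child's stdout reader. On POSIX, the reply points at the process transport's `pipes` dictionary. Assuming its keys are child fd numbers, that reader would be `transport.pipes[1]`, but this is an undocumented implementation detail. `write()` would be fed from a `ProcessProtocol.outReceived`, and `read()` would be called by the web stream:

```python
from collections import deque


class ThrottledBuffer:
    """Queue child-process output and apply backpressure to its producer.

    Hypothetical helper. `producer` is any object with pauseProducing()
    and resumeProducing(), e.g. (assumption, POSIX only) the stdout
    reader in the process transport's `pipes` dict.
    """

    def __init__(self, producer, high_water=1 << 20, low_water=1 << 18):
        self.producer = producer
        self.high_water = high_water  # pause once this many bytes are queued
        self.low_water = low_water    # resume once the queue drops below this
        self.chunks = deque()
        self.size = 0
        self.paused = False

    def write(self, data):
        # Called with each chunk the child emits (e.g. from outReceived).
        self.chunks.append(data)
        self.size += len(data)
        if not self.paused and self.size >= self.high_water:
            self.paused = True
            self.producer.pauseProducing()

    def read(self):
        # Called by the consuming side (e.g. the response stream's read()).
        if not self.chunks:
            return None
        data = self.chunks.popleft()
        self.size -= len(data)
        if self.paused and self.size < self.low_water:
            self.paused = False
            self.producer.resumeProducing()
        return data


class RecordingProducer:
    """Stand-in producer that only records pause/resume calls."""

    def __init__(self):
        self.calls = []

    def pauseProducing(self):
        self.calls.append("pause")

    def resumeProducing(self):
        self.calls.append("resume")


producer = RecordingProducer()
buf = ThrottledBuffer(producer, high_water=10, low_water=4)
buf.write(b"abcdef")   # 6 bytes queued, below the high watermark
buf.write(b"ghijkl")   # 12 bytes queued, so pauseProducing() is called
buf.read()             # 6 bytes left, still at or above the low watermark
buf.read()             # 0 bytes left, so resumeProducing() is called
print(producer.calls)  # ['pause', 'resume']
```

The two watermarks keep the reader from toggling pause/resume on every chunk. The paused flag guarantees each call is made only once per cycle.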
I was considering a clunky workaround, where the db output is piped to a throttling program which pipes to the web app. I still have the problem of telling that filter how fast to go, but that is more of a unix domain problem and less of a twisted domain problem. Doable, but yuck. I guess it's time to dig into the reactor.spawnProcess code and find where the producer-related method calls are.

David

On Tue, Dec 21, 2010 at 11:51 AM, <exar...@twistedmatrix.com> wrote:
> On 06:14 pm, dvkee...@gmail.com wrote:
>>We are streaming a large amount of program output to the browser via a
>>twisted app, and we are seeing huge memory consumption.
>>
>>We have a database process that generates large amounts of data to
>>stdout, and we are streaming that to the browser through a twisted
>>web2 app. We are using web2 because it supports upload streaming as
>>well.
>>
>>Our code looks like:
>>
>>    env = {}
>>    input = stream.MemoryStream('')
>>    SQLDUMP = '/usr/bin/dump'
>>
>>    pstream = stream.ProcessStreamer(input, SQLDUMP,
>>                                     [SQLDUMP,'--format=p','--clean',
>>                                      '--host='+cfg.SOCKET,
>>                                      '--username='+self.role,dbname],
>>                                     env)
>>    pstream.run()
>>
>>    outstream = WatchedStream(pstream.outStream)
>>
>>    response = http.Response(headers=headers, stream=outstream)
>>
>>
>>    class WatchedStream(object):
>>
>>        def __init__(self,stream):
>>            self.stream = stream
>>        def split(self, point):
>>            ... some implementation
>>        def close(self):
>>            ... some implementation
>>        def read(self):
>>            d = self.stream.read()
>>            bufSize = sum( [len(b) for b in self.stream.buffer if b])
>>            log.msg('buffer size: %s'%bufSize)
>>            return d
>>
>>Watching the log shows us that the stream (a web2.ProducerStream)
>>buffer is growing continuously to hundreds of MB. Doesn't a stream
>>object have a bufferSize attribute and the ability to throttle the
>>flow of data based on buffer fullness? Does that throttling behavior
>>have to be triggered explicitly?
>>
>>Yes, I know that web2 is deprecated, but I don't know that the problem
>>is in the web2 components. The reactor.spawnProcess documentation
>>does not seem to address the matter of modulating the read speed. Any
>>assistance will be appreciated.
>
> You probably want to pause Twisted's child process reader when you
> notice your buffer is getting too large. Off hand, I can't say how you
> might translate this advice into specific stream API calls, but
> ultimately you want to call `pauseProducing` on something.
>
> It's also possible you'll have to go through some not-quite-documented
> interfaces(/implementations) to get there. IProcessTransport has no
> pauseProducing method, but the POSIX implementation of that interface
> has a `pipes` dictionary where the values are `IProducer` providers, so
> you can call `pauseProducing` on them (or just the one for, say, stdout,
> if that's where you're getting bytes from).
>
> Later, of course, you'll want to undo the pause with a call to
> `resumeProducing`.
>
> Jean-Paul
>
> _______________________________________________
> Twisted-Python mailing list
> Twisted-Python@twistedmatrix.com
> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>

-- 
dkee...@travelbyroad.net
Rdbhost -> SQL databases as a webservice [www.rdbhost.com]