Il 18/10/2010 17:21, Stefano Debenedetti ha scritto:
> Anyway, I ran Twisted tests on my installation after the patch I
> mentioned in my previous mail and I got the same results as before
> applying it so at least it seems it doesn't break any obvious stuff.


Sorry for replying to myself but for the record: the patch I sent
does break stuff (connections are sometimes closed before all data
has been sent) so don't use it.

The partially good news is that I managed to write a self-contained
and quite short example that can reproduce the exact same problem
I'm witnessing in my app. The bad news is that it does so only about
50% of the time, but I thought I would share it while I keep
trying to make it more reliable.

Please find attached one .sh file and one .py file, save somewhere
and make them executable. You will also need netcat (nc).

If you run the .sh file and after three seconds type in a short line
of text followed by the enter key, you should see the same line you
typed printed back many times on your terminal and quite a lot of
network activity going through the localhost interface for about a
minute. Don't redirect the .sh output to /dev/null, the problem
seems to occur only when the terminal application you run it in gets
to 100% CPU while it's printing data received by netcat. Hopefully
you have a multicore machine and this won't disrupt your desktop.

If you're lucky and nothing bad happens, after a while the .sh
script will terminate and all connections opened by it and the .py
file will be closed. Please remember to kill the three python
processes launched by the script before trying again.

If you're unlucky like I am, after a while all connections will be
closed except the one between netcat and one of the three servers
powered by the .py file.

That connection will be in this state according to netstat:

# netstat -np --inet 2> /dev/null | grep 127.0.0.1
tcp        0      0 127.0.0.1:8080          127.0.0.1:36815
ESTABLISHED 10042/python2.6
tcp        0      0 127.0.0.1:36815         127.0.0.1:8080
ESTABLISHED 10051/nc

If you then CTRL-C the .sh script so that netcat gets terminated,
you will get to the dreaded CLOSE_WAIT forever state:

# netstat -np --inet 2> /dev/null | grep 127.0.0.1
tcp        1      0 127.0.0.1:8080          127.0.0.1:36815
CLOSE_WAIT  10042/python2.6


Please note that even though the .py file is called three times and
launches a different server application each time, the only one I'm
interested in is the first one ("one"), the other two are just there
to simulate the third-party apps that my server is dealing with.
This is why servers "two" and "three" do seemingly silly stuff
including closing some of their connections at some point.

My goal is that no matter how and when the client and the "two" and
"three" servers close their connections to "one", the client
connection to "one" is always properly terminated and never gets
stuck in the CLOSE_WAIT state.

Thanks for any feedback you might have,

ciao
ste



Attachment: test_producer.sh
Description: Bourne shell script

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import os, sys
from twisted.internet import reactor
from twisted.internet.protocol import ServerFactory, ClientFactory, Protocol

# Registry of live protocol instances, keyed by protocol class name
# (e.g. "OneA"); the protocols' connectionMade handlers fill it in.
state = {}

def forwardWithProducer(_from, to, streaming=True):
	"""Pipe everything `_from` receives straight into `to`'s transport.

	`_from.dataReceived` is replaced by the `write` method of `to.transport`,
	and `_from.transport` is then registered as the producer of
	`to.transport` (Twisted's flow-control hook).

	`streaming` is handed unchanged to `registerProducer`.
	"""
	sink = to.transport
	_from.dataReceived = sink.write
	sink.registerProducer(_from.transport, streaming)

def forward(_from, to):
	"""Pipe everything `_from` receives into `to`'s transport.

	Only `_from.dataReceived` is rebound (to `to.transport.write`); unlike
	forwardWithProducer, no producer is registered on the destination.
	"""
	write = to.transport.write
	_from.dataReceived = write

def multiforward(_from, to, streaming=True):
	"""Forward data from `_from` to `to`, amplified and partly delayed.

	Each chunk received by `_from` is repeated 100 times and written to
	`to.transport` immediately, and the same amplified payload is written
	again one second later via `reactor.callLater`. `_from.transport` is
	registered as the producer of `to.transport`, with `streaming` passed
	through to `registerProducer`.
	"""
	def amplify(chunk):
		burst = chunk * 100
		to.transport.write(burst)
		# Second copy goes out after a one-second delay.
		reactor.callLater(1, to.transport.write, burst)
	_from.dataReceived = amplify
	to.transport.registerProducer(_from.transport, streaming)

def loseConnectionWithProducer(proto, onlost=lambda *args: None):
	"""Close `proto`'s connection after detaching its transport's producer.

	`proto.connectionLost` is first replaced by `onlost` (a no-op by
	default), then the transport's producer is unregistered and the
	connection is closed.
	"""
	proto.connectionLost = onlost
	link = proto.transport
	link.unregisterProducer()
	link.loseConnection()

def loseConnection(proto, onlost=lambda *args: None):
	"""Close `proto`'s connection, leaving any registered producer alone.

	`proto.connectionLost` is replaced by `onlost` (a no-op by default)
	before the transport is asked to close.
	"""
	proto.connectionLost = onlost
	link = proto.transport
	link.loseConnection()

def OneStart():
	"""Wire up the forwarding routes between server "one"'s connections.

	Routes, in the order they are installed: OneA -> OneB, OneD -> OneC,
	OneC -> OneD and OneE -> OneA. Every route also registers the source
	transport as the producer of the destination transport.
	"""
	routes = (
		("OneA", "OneB"),
		("OneD", "OneC"),
		("OneC", "OneD"),
		("OneE", "OneA"),
	)
	for source, sink in routes:
		forwardWithProducer(state[source], state[sink])

def TwoStart():
	"""Wire up server "two"'s amplifying routes: TwoB -> TwoD, TwoD -> TwoE."""
	routes = (
		("TwoB", "TwoD"),
		("TwoD", "TwoE"),
	)
	for source, sink in routes:
		multiforward(state[source], state[sink])

# one
class OneA(Protocol):
	"""Server "one": protocol for connections accepted on port 8080."""
	def connectionMade(self):
		# Register under the class name, then open the two outgoing
		# connections: OneB to port 8081 and OneC to port 8084.
		name = self.__class__.__name__
		state[name] = self
		for port, factory in ((8081, OneBFactory), (8084, OneCFactory)):
			reactor.connectTCP("127.0.0.1", port, factory())
class OneB(Protocol):
	"""Server "one": outgoing client connection to port 8081."""
	def connectionMade(self):
		name = self.__class__.__name__
		state[name] = self
		# Forwarding is wired only once five protocols are registered.
		if len(state) != 5:
			return
		OneStart()
class OneC(Protocol):
	"""Server "one": outgoing client connection to port 8084."""
	def connectionMade(self):
		name = self.__class__.__name__
		state[name] = self
		# Forwarding is wired only once five protocols are registered.
		if len(state) != 5:
			return
		OneStart()
	def connectionLost(self, reason):
		# When this connection goes away: arrange for OneA to be closed as
		# soon as OneE is lost, then close OneB and OneD right away.
		def closeOneA(*args):
			# state["OneA"] is looked up when OneE is lost, not now.
			return loseConnectionWithProducer(state["OneA"])
		state["OneE"].connectionLost = closeOneA
		for name in ("OneB", "OneD"):
			loseConnectionWithProducer(state[name])

class OneD(Protocol):
	"""Server "one": protocol for connections accepted on port 8082."""
	def connectionMade(self):
		name = self.__class__.__name__
		state[name] = self
		# Forwarding is wired only once five protocols are registered.
		if len(state) != 5:
			return
		OneStart()
class OneE(Protocol):
	"""Server "one": protocol for connections accepted on port 8083."""
	def connectionMade(self):
		name = self.__class__.__name__
		state[name] = self
		# Forwarding is wired only once five protocols are registered.
		if len(state) != 5:
			return
		OneStart()

# two
class TwoB(Protocol):
	"""Server "two": protocol for connections accepted on port 8081."""
	def connectionMade(self):
		name = self.__class__.__name__
		state[name] = self
		# Connect back to ports 8082 (TwoD) and 8083 (TwoE).
		for port, factory in ((8082, TwoDFactory), (8083, TwoEFactory)):
			reactor.connectTCP("127.0.0.1", port, factory())
		# Drop this connection after 8 seconds. Alternative failure modes
		# tried by the author: os.kill(os.getpid(), 9) or reactor.crash
		# after 5 seconds.
		reactor.callLater(8, loseConnectionWithProducer, self)
	def connectionLost(self, reason):
		"""Ignore the loss of this connection; nothing is cleaned up here."""
		return None
class TwoD(Protocol):
	"""Server "two": outgoing client connection to port 8082."""
	def connectionMade(self):
		name = self.__class__.__name__
		state[name] = self
		# Start forwarding only if TwoE is already registered.
		if "TwoE" not in state:
			return
		TwoStart()
	def connectionLost(self, reason):
		# Losing this connection also closes the TwoE connection.
		peer = state["TwoE"]
		loseConnectionWithProducer(peer)
class TwoE(Protocol):
	"""Server "two": outgoing client connection to port 8083."""
	def connectionMade(self):
		name = self.__class__.__name__
		state[name] = self
		# Start forwarding only if TwoD is already registered.
		if "TwoD" not in state:
			return
		TwoStart()
	def connectionLost(self, reason):
		"""Ignore the loss of this connection; nothing is cleaned up here."""
		return None

# three
class ThreeC(Protocol):
	"""Server "three": protocol for connections accepted on port 8084."""
	def connectionMade(self):
		# Send received data back on the same connection through
		# multiforward (a plain forwardWithProducer(self, self) was the
		# author's alternative).
		multiforward(_from=self, to=self)
		# Drop the connection after 10 seconds (reactor.crash was the
		# author's alternative).
		reactor.callLater(10, loseConnectionWithProducer, self)

# server factories
# Listening side of server "one" on port 8080 (see main).
class OneAFactory(ServerFactory):
	protocol = OneA
# Listening side of server "one" on port 8082 (see main).
class OneDFactory(ServerFactory):
	protocol = OneD
# Listening side of server "one" on port 8083 (see main).
class OneEFactory(ServerFactory):
	protocol = OneE
# Listening side of server "two" on port 8081 (see main).
class TwoBFactory(ServerFactory):
	protocol = TwoB
# Listening side of server "three" on port 8084 (see main).
class ThreeCFactory(ServerFactory):
	protocol = ThreeC

# client factories
# Used by OneA.connectionMade for the outgoing connection to port 8081.
class OneBFactory(ClientFactory):
	protocol = OneB
# Used by OneA.connectionMade for the outgoing connection to port 8084.
class OneCFactory(ClientFactory):
	protocol = OneC
# Used by TwoB.connectionMade for the outgoing connection to port 8082.
class TwoDFactory(ClientFactory):
	protocol = TwoD
# Used by TwoB.connectionMade for the outgoing connection to port 8083.
class TwoEFactory(ClientFactory):
	protocol = TwoE

def main(what):
	"""Start the listening ports of the server selected by `what`.

	"one" listens on 8080, 8082 and 8083; "two" on 8081; "three" on 8084.
	Any other value starts no listener.
	"""
	listeners = {
		"one": ((8080, OneAFactory), (8082, OneDFactory), (8083, OneEFactory)),
		"two": ((8081, TwoBFactory),),
		"three": ((8084, ThreeCFactory),),
	}
	for port, factory in listeners.get(what, ()):
		reactor.listenTCP(port, factory())

if __name__ == "__main__":
	# The first command-line argument selects which server to run; main()
	# recognises "one", "two" and "three".
	if len(sys.argv) < 2:
		# Report a usage line instead of an IndexError traceback when the
		# role argument is missing.
		sys.stderr.write("usage: %s one|two|three\n" % sys.argv[0])
		sys.exit(2)
	# main() is scheduled to run once the reactor has started.
	reactor.callWhenRunning(main, sys.argv[1])
	reactor.run()

_______________________________________________
Twisted-Python mailing list
[email protected]
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python

Reply via email to