Hello community,
First of all - thanks for an awesome platform! I'm brand new to this community, but have been using Twisted for a couple of years.

Reason for posting: I've hit a condition with ReconnectingClientFactory that I'm not sure is by design. I have a workaround right now, but need your perspective. It seems like there should be a better/right way to do this.

Attempted design: I'd like to have long-running TCP clients (forever until stopped), with a long-running TCP server. When a long-running client hits a problem with a dependency (database is down, Kafka bus unavailable, external API not responding, etc.), I want the client to go offline for a while and then come back online: an automated, self-recovery type of action. Since it's not OK to start/stop/restart the Twisted reactor, I am letting the client finish whatever it can do, disconnect from the service, destruct the dependencies, wait for a period of time, and then attempt a clean re-initialization of those dependencies along with reconnecting to the Twisted server.

Problem case: I'm using ReconnectingClientFactory in my client. When the client hits a problem, it calls transport.loseConnection(). But whenever the client calls this, it does not reconnect after the disconnect; stopFactory is called and everything exits.

Workaround: I noticed some Twisted source code that works off factory.numPorts. If numPorts is 1 and the client loses the connection, it goes to 0 and the cleanup is called. So I conditionally increase this number right before intentionally disconnecting, and then reset it after reconnecting. This solves the problem, but it's a hack.
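To illustrate the counter behaviour I'm describing, here is a simplified stand-alone model. It is only a sketch of what I observed, not Twisted's actual code; the class name CountingFactory and its events list are made up for illustration.

```python
class CountingFactory:
    """Hypothetical stand-in that mimics the numPorts bookkeeping described above."""

    def __init__(self):
        self.numPorts = 0
        self.events = []

    def doStart(self):
        # First user of the factory triggers startup.
        if self.numPorts == 0:
            self.events.append('startFactory')
        self.numPorts += 1

    def doStop(self):
        # Last user going away triggers cleanup.
        if self.numPorts == 0:
            return
        self.numPorts -= 1
        if self.numPorts == 0:
            self.events.append('stopFactory')


# Without the workaround: the single connection is lost and cleanup runs.
plain = CountingFactory()
plain.doStart()
plain.doStop()

# With the workaround: bump the counter before the intentional disconnect.
hacked = CountingFactory()
hacked.doStart()
hacked.numPorts += 1
hacked.doStop()

print(plain.events)
print(hacked.events)
```

In this model the extra increment keeps the counter above zero, so the cleanup step is skipped, which matches what I see with the hack enabled.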
I'll attach the test scripts to this post (if attachments are allowed), but the main code is in these functions in the factory:

    def clientConnectionLost(self, connector, reason):
        print(' factory clientConnectionLost: reason: {}'.format(reason))
        # if self.disconnectedOnPurpose:
        #     ## Hack to keep reactor alive
        #     print(' factory clientConnectionLost: increasing numPorts')
        #     self.numPorts += 1
        #     self.numPortsChanged = True
        #     self.disconnectedOnPurpose = False
        print(' ... simulate client going idle before attempting restart...')
        time.sleep(5)
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
        print(' factory clientConnectionLost: end.\n')

    def clientConnectionMade(self):
        print(' factory clientConnectionMade: starting numPorts: {}'.format(self.numPorts))
        # if self.numPortsChanged :
        #     ## Resetting from hacked value
        #     print(' factory clientConnectionMade: decreasing numPorts')
        #     self.numPorts -= 1
        #     self.numPortsChanged = False
        print(' factory clientConnectionMade: finished numPorts: {}'.format(self.numPorts))

    def cleanup(self):
        print('factory cleanup: calling loseConnection')
        if self.connectedClient is not None:
            self.connectedClient.transport.loseConnection()
            self.disconnectedOnPurpose = True

With the above lines commented out, once the cleanup call does transport.loseConnection(), the factory stops at the end of clientConnectionLost.

Sample scripts/logs: I've tried to create short test scripts and corresponding logs (with the client failing, and then with it restarting when I use the workaround). I've cut out several thousand lines to get down to something simple for the example test scripts, but I know the client is still a little long. Again, I'm not sure if attachments work on the mailing list, but I'll attempt to attach the client/server scripts with the corresponding pass/fail logs.

Thanks!
-Chris

import os, sys, traceback
import json, time, datetime, psutil
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor, task, defer, threads
from contextlib import suppress


class CustomLineReceiverProtocol(LineReceiver):
    delimiter = b':==:'


class ServiceClientProtocol(CustomLineReceiverProtocol):

    def connectionMade(self):
        print(' protocol connectionMade')
        self.factory.connectedClient = self
        self.factory.clientConnectionMade()

    def lineReceived(self, line):
        dataDict = json.loads(line)
        if dataDict.get('action') == 'healthRequest':
            self.factory.enterSimulateJob()

    def connectionLost(self, reason):
        print(' protocol connectionLost')
        self.factory.connectedClient = None

    def constructAndSendData(self, action, content):
        message = {}
        message['action'] = action
        message['content'] = content
        jsonMessage = json.dumps(message)
        msg = jsonMessage.encode('utf-8')
        print(' protocol constructAndSendData: {}'.format(msg))
        self.sendLine(msg)


class ServiceClientFactory(ReconnectingClientFactory):
    continueTrying = True

    def __init__(self):
        print('factory constructor')
        self.connectedClient = None
        self.health = {}
        self.loopingSystemHealth = task.LoopingCall(self.enterSystemHealthCheck)
        self.loopingSystemHealth.start(10)
        self.numPortsChanged = False
        self.disconnectedOnPurpose = False
        super().__init__()

    def buildProtocol(self, addr):
        print(' factory buildProtocol')
        self.resetDelay()
        protocol = ServiceClientProtocol()
        protocol.factory = self
        return protocol

    def clientConnectionLost(self, connector, reason):
        print(' factory clientConnectionLost: reason: {}'.format(reason))
        # if self.disconnectedOnPurpose:
        #     ## Hack to keep reactor alive
        #     print(' factory clientConnectionLost: increasing numPorts')
        #     self.numPorts += 1
        #     self.numPortsChanged = True
        #     self.disconnectedOnPurpose = False
        print(' ... simulate client going idle before attempting restart...')
        time.sleep(5)
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
        print(' factory clientConnectionLost: end.\n')

    def clientConnectionMade(self):
        print(' factory clientConnectionMade: starting numPorts: {}'.format(self.numPorts))
        # if self.numPortsChanged :
        #     ## Resetting from hacked value
        #     print(' factory clientConnectionMade: decreasing numPorts')
        #     self.numPorts -= 1
        #     self.numPortsChanged = False
        print(' factory clientConnectionMade: finished numPorts: {}'.format(self.numPorts))
        print(' ..... pausing for <CTRL><C> test')
        time.sleep(3)

    def cleanup(self):
        print('factory cleanup: calling loseConnection')
        if self.connectedClient is not None:
            self.connectedClient.transport.loseConnection()
            self.disconnectedOnPurpose = True

    def stopFactory(self):
        print('stopFactory')
        self.stopTrying()
        with suppress(Exception):
            self.loopingSystemHealth.stop()
        print('stopFactory end.')

    def enterSimulateJob(self):
        print(' factory enterSimulateJob')
        threadHandle = threads.deferToThread(self.simulateJob)
        return threadHandle

    def simulateJob(self):
        print(' factory simulateJob: starting job')
        time.sleep(2)
        self.connectedClient.constructAndSendData('jobResponse', self.health)
        print(' factory simulateJob: finished job... time to reset the client (disconnect/re-initialize)...')
        self.cleanup()

    def enterSystemHealthCheck(self):
        print(' factory enterSystemHealthCheck')
        threadHandle = threads.deferToThread(self.getSystemHealth)
        return threadHandle

    def getSystemHealth(self):
        print(' factory getSystemHealth')
        try:
            currentTime = time.time()
            process = psutil.Process(os.getpid())
            startTime = process.create_time()
            self.health = {
                'processCpuPercent': process.cpu_percent(),
                'processMemory': process.memory_full_info().uss,
                'processRunTime': int(currentTime-startTime)
            }
            print(' factory getSystemHealth: system health: {}'.format(self.health))
        except:
            exception = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2])
            print(' factory getSystemHealth: exception: {}'.format(exception))


if __name__ == '__main__':
    try:
        connector = reactor.connectTCP('127.0.0.1', 51841, ServiceClientFactory(), timeout=300)
        reactor.run()
    except:
        stacktrace = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2])
        print('clientWrapper exception: {}'.format(stacktrace))
    print('exiting')
    sys.exit(0)


import sys, traceback
import json
from twisted.internet import reactor, task, defer, threads
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver


class CustomLineReceiverProtocol(LineReceiver):
    delimiter = b':==:'


class ServiceListener(CustomLineReceiverProtocol):

    def connectionMade(self):
        print(' protocol connectionMade')
        self.factory.activeClients.append(self)

    def connectionLost(self, reason):
        print(' protocol connectionLost')
        self.factory.removeClient(self)

    def lineReceived(self, line):
        print(' protocol lineReceived: {}'.format(line))

    def constructAndSendData(self, action):
        message = {'action': action}
        jsonMessage = json.dumps(message)
        msg = jsonMessage.encode('utf-8')
        print(' protocol constructAndSendData: {}'.format(msg))
        self.sendLine(msg)


class ServiceFactory(ServerFactory):
    protocol = ServiceListener

    def __init__(self):
        print('factory constructor')
        super().__init__()
        self.activeClients = []
        self.loopingHealthUpdates = task.LoopingCall(self.enterSystemHealthCheck)
        self.loopingHealthUpdates.start(15)

    def removeClient(self, client):
        print(' factory removeClient')
        self.activeClients.remove(client)

    def enterSystemHealthCheck(self):
        print(' factory enterSystemHealthCheck')
        threadHandle = threads.deferToThread(self.sendHealthRequest)
        return threadHandle

    def sendHealthRequest(self):
        if len(self.activeClients) <= 0:
            print(' factory sendHealthRequest: no active clients to talk to')
        else:
            for client in self.activeClients:
                print(' factory sendHealthRequest: requesting from client...')
                client.constructAndSendData('healthRequest')


if __name__ == '__main__':
    try:
        reactor.listenTCP(51841, ServiceFactory(), interface='127.0.0.1')
        reactor.run()
    except:
        stacktrace = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2])
        print('clientWrapper exception: {}'.format(stacktrace))
    print('exiting')
    sys.exit(0)

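
For anyone reading the scripts without running them: both sides exchange JSON messages terminated by the b':==:' delimiter. A stand-alone sketch of that framing, using only the standard library, might look like the following. The helper names frame_message and split_frames are made up for illustration and are not part of Twisted or of my scripts.

```python
import json

DELIMITER = b':==:'  # same delimiter the scripts set on their LineReceiver subclass


def frame_message(action, content=None):
    """Encode one message as JSON bytes followed by the delimiter."""
    message = {'action': action}
    if content is not None:
        message['content'] = content
    return json.dumps(message).encode('utf-8') + DELIMITER


def split_frames(buffer):
    """Return (decoded complete messages, leftover bytes still awaiting data)."""
    *complete, leftover = buffer.split(DELIMITER)
    return [json.loads(chunk) for chunk in complete], leftover


# Two complete messages followed by the start of a third, incomplete one.
stream = frame_message('healthRequest') + frame_message('jobResponse', {'processRunTime': 12})
messages, leftover = split_frames(stream + b'{"action": "par')
print(messages)
print(leftover)
```

The leftover bytes would be kept and prepended to the next chunk of received data, which is roughly the buffering that a delimiter-based receiver has to do.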
Attachments:
client_not_working.log
client_working_with_hack.log
server.log
_______________________________________________
Twisted-Python mailing list
Twisted-Python@twistedmatrix.com
https://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python