Hello community,

First of all, thanks for an awesome platform! I'm brand new to this
community, but have been using Twisted for a couple of years.

Reason for posting:

I've hit a behavior in ReconnectingClientFactory that I'm not sure is by
design. I have a workaround right now, but would like your perspective; it
seems like there should be a better (or the right) way to do this.

Attempted design:

I'd like to have long-running TCP clients (running forever until stopped)
talking to a long-running TCP server. When a client hits a problem with a
dependency (database is down, Kafka bus unavailable, external API not
responding, etc.), I want the client to go offline for a while and then
come back online: an automated, self-recovery type of action. Since the
Twisted reactor can't be stopped and restarted, I let the client finish
whatever it can do, disconnect from the service, tear down its
dependencies, wait for a period of time, and then attempt a clean
re-initialization of those dependencies along with a reconnect to the
Twisted server.
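The teardown/wait/re-initialize cycle described above can be sketched in plain Python, independent of Twisted. This is a minimal sketch under stated assumptions: `Dependency`, `open`, `close` and `recover` are hypothetical names standing in for whatever database, Kafka or API clients the real service holds, and the `wait` callable stands in for the idle period.

```python
from typing import Callable, List


class Dependency:
    """Hypothetical stand-in for a database pool, Kafka producer, or API client."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log
        self.ready = False

    def open(self) -> None:
        self.ready = True
        self.log.append("open " + self.name)

    def close(self) -> None:
        self.ready = False
        self.log.append("close " + self.name)


def recover(deps: List[Dependency], wait: Callable[[float], None], delay: float) -> None:
    """Tear everything down, go idle for `delay` seconds, then rebuild."""
    # Close in reverse construction order, so nothing is closed while
    # something built on top of it is still open.
    for dep in reversed(deps):
        dep.close()
    # Idle period. Inside a Twisted client this wait has to be non-blocking
    # (scheduled on the reactor), not time.sleep() on the reactor thread.
    wait(delay)
    # Re-initialize in the original construction order.
    for dep in deps:
        dep.open()


if __name__ == "__main__":
    log: List[str] = []
    deps = [Dependency("database", log), Dependency("kafka", log)]
    for dep in deps:
        dep.open()
    recover(deps, wait=lambda seconds: log.append("idle %g" % seconds), delay=30)
    print(log)
```

Closing in reverse order and reopening in forward order is one common choice; reconnecting to the server itself is left to the Twisted client factory.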

Problem case:

I'm using ReconnectingClientFactory in my client. When the client hits a
problem, it calls transport.loseConnection(). But whenever the client does
this, it does not reconnect after the disconnect; instead, stopFactory is
called and everything exits.
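For reference, this is roughly what ReconnectingClientFactory is expected to do after a lost connection while continueTrying is still set: clientConnectionLost() calls retry(), which grows the delay and schedules connector.connect() with callLater. The sketch below is a simplified model of that delay arithmetic only (jitter left out), not Twisted's code. `backoff_delays` is a hypothetical helper; its parameters mirror the factory attributes initialDelay, factor, maxDelay and maxRetries, and the defaults shown are assumed to match recent Twisted releases (worth checking against the installed version). Because the client's buildProtocol() calls resetDelay(), the first retry after a disconnect would come roughly initialDelay * factor seconds later.

```python
import math
from typing import Iterator, Optional


def backoff_delays(initial_delay: float = 1.0,
                   factor: float = math.e,
                   max_delay: float = 3600.0,
                   max_retries: Optional[int] = None) -> Iterator[float]:
    """Yield successive reconnect delays in seconds, with jitter omitted.

    Simplified model of the arithmetic in ReconnectingClientFactory.retry():
    each retry multiplies the current delay by `factor` and caps it at
    `max_delay`. The real factory also randomizes each delay (its `jitter`
    attribute) and schedules connector.connect() with callLater.
    """
    delay = initial_delay
    retries = 0
    while max_retries is None or retries < max_retries:
        retries += 1
        delay = min(delay * factor, max_delay)
        yield delay


if __name__ == "__main__":
    # First three delays after resetDelay(): about 2.7 s, 7.4 s and 20.1 s.
    print([round(d, 1) for d in backoff_delays(max_retries=3)])
```

Setting initialDelay/maxDelay on the factory is one possible way to get a "go idle for a while" pause without blocking the reactor with time.sleep().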

Workaround:

I noticed that some of Twisted's source code keys off factory.numPorts. If
numPorts is 1 and the client loses the connection, it drops to 0 and the
factory's cleanup (stopFactory) is called. So I conditionally increase this
number right before intentionally disconnecting, and then reset it after
reconnecting. This solves the problem, but it's a hack.
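The numPorts behavior described above works like a reference count. Below is a simplified model of that bookkeeping in twisted.internet.protocol.Factory (the real doStart/doStop also log, and this is not Twisted's code), showing why bumping numPorts before an intentional disconnect keeps stopFactory() from running. `CountingFactory` and `events` are illustrative names, not Twisted API.

```python
from typing import List


class CountingFactory:
    """Simplified model of the numPorts bookkeeping in Factory.

    doStart()/doStop() are called by whatever uses the factory (a listening
    port or a client connector). startFactory() runs on the 0 -> 1 transition
    and stopFactory() on the 1 -> 0 transition.
    """

    def __init__(self) -> None:
        self.numPorts = 0
        self.events: List[str] = []

    def doStart(self) -> None:
        if not self.numPorts:
            self.startFactory()
        self.numPorts += 1

    def doStop(self) -> None:
        if self.numPorts == 0:
            return  # unbalanced stop; ignored
        self.numPorts -= 1
        if not self.numPorts:
            self.stopFactory()

    def startFactory(self) -> None:
        self.events.append("startFactory")

    def stopFactory(self) -> None:
        self.events.append("stopFactory")


if __name__ == "__main__":
    plain = CountingFactory()
    plain.doStart()        # connector starts connecting: 0 -> 1
    plain.doStop()         # connection lost, no new attempt yet: 1 -> 0
    print(plain.events)    # ['startFactory', 'stopFactory']

    hacked = CountingFactory()
    hacked.doStart()
    hacked.numPorts += 1   # the workaround: bump the count before disconnecting
    hacked.doStop()        # 2 -> 1, so stopFactory() is skipped
    print(hacked.events)   # ['startFactory']
```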

I'll attach the test scripts to this post (if attachments are allowed), but
the main code is in these factory methods:

        def clientConnectionLost(self, connector, reason):
                print('  factory clientConnectionLost: reason: {}'.format(reason))
                # if self.disconnectedOnPurpose:
                #       ## Hack to keep reactor alive
                #       print('  factory clientConnectionLost: increasing numPorts')
                #       self.numPorts += 1
                #       self.numPortsChanged = True
                #       self.disconnectedOnPurpose = False
                print('  ... simulate client going idle before attempting restart...')
                time.sleep(5)
                ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
                print('  factory clientConnectionLost: end.\n')

        def clientConnectionMade(self):
                print('  factory clientConnectionMade: starting numPorts: {}'.format(self.numPorts))
                # if self.numPortsChanged:
                #       ## Resetting from hacked value
                #       print('  factory clientConnectionMade: decreasing numPorts')
                #       self.numPorts -= 1
                #       self.numPortsChanged = False
                print('  factory clientConnectionMade: finished numPorts: {}'.format(self.numPorts))

        def cleanup(self):
                print('factory cleanup: calling loseConnection')
                if self.connectedClient is not None:
                        self.connectedClient.transport.loseConnection()
                        self.disconnectedOnPurpose = True

With the lines above commented out, once cleanup() calls
transport.loseConnection(), the factory stops at the end of
clientConnectionLost.
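A simplified model (not Twisted's code) of a sequence that may explain this, assuming the connector calls factory.doStop() right after clientConnectionLost() returns whenever no new connection attempt has started yet, which is how twisted.internet.base.BaseConnector is understood to behave. Under that assumption stopFactory() runs on every disconnect, and the client script's stopFactory() override calls stopTrying(), which cancels the reconnect that clientConnectionLost() has just scheduled. `Call`, `FakeClock`, `ModelFactory`, `ModelConnector` and `run` are hypothetical stand-ins.

```python
from typing import Callable, List, Optional


class Call:
    """Cancellable delayed call, like the object reactor.callLater() returns."""
    def __init__(self, fn: Callable[[], None]) -> None:
        self.fn, self.cancelled = fn, False

    def cancel(self) -> None:
        self.cancelled = True


class FakeClock:
    """Collects delayed calls and runs them on demand instead of waiting."""
    def __init__(self) -> None:
        self.pending: List[Call] = []

    def call_later(self, delay: float, fn: Callable[[], None]) -> Call:
        self.pending.append(Call(fn))
        return self.pending[-1]

    def advance(self) -> None:
        calls, self.pending = self.pending, []
        for call in calls:
            if not call.cancelled:
                call.fn()


class ModelFactory:
    """numPorts counting plus ReconnectingClientFactory-style retry/stopTrying."""
    def __init__(self, clock: FakeClock, stop_trying_in_stop_factory: bool) -> None:
        self.clock = clock
        self.stop_trying_in_stop_factory = stop_trying_in_stop_factory
        self.numPorts = 0
        self.continueTrying = True
        self._callID: Optional[Call] = None
        self.connects = 0

    def doStart(self) -> None:
        self.numPorts += 1

    def doStop(self) -> None:
        self.numPorts -= 1
        if self.numPorts == 0:
            self.stopFactory()

    def stopFactory(self) -> None:
        if self.stop_trying_in_stop_factory:
            self.stopTrying()  # what the client script's override does

    def stopTrying(self) -> None:
        if self._callID is not None:
            self._callID.cancel()  # drops the pending reconnect
            self._callID = None
        self.continueTrying = False

    def clientConnectionLost(self, connector: "ModelConnector") -> None:
        if self.continueTrying:
            def reconnector() -> None:
                self._callID = None
                connector.connect()
            # retry() only schedules the reconnect; it does not run yet.
            self._callID = self.clock.call_later(1.0, reconnector)


class ModelConnector:
    def __init__(self, factory: ModelFactory) -> None:
        self.factory, self.state, self.factoryStarted = factory, "disconnected", False

    def connect(self) -> None:
        self.state = "connected"  # pretend the TCP connect succeeds at once
        if not self.factoryStarted:
            self.factory.doStart()
            self.factoryStarted = True
        self.factory.connects += 1

    def connectionLost(self) -> None:
        self.state = "disconnected"
        self.factory.clientConnectionLost(self)
        if self.state == "disconnected":  # no new attempt has started yet
            self.factory.doStop()
            self.factoryStarted = False


def run(stop_trying_in_stop_factory: bool) -> int:
    clock = FakeClock()
    factory = ModelFactory(clock, stop_trying_in_stop_factory)
    connector = ModelConnector(factory)
    connector.connect()         # first connection
    connector.connectionLost()  # cleanup() -> transport.loseConnection()
    clock.advance()             # the back-off delay elapses
    return factory.connects


if __name__ == "__main__":
    print(run(stop_trying_in_stop_factory=True))   # 1: the retry was cancelled
    print(run(stop_trying_in_stop_factory=False))  # 2: the retry reconnected
```

If this model matches the real behavior, the reconnect survives when stopFactory() does not call stopTrying(); since a client factory is stopped and started again around each connection, one-time shutdown logic may be better placed outside stopFactory().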

Sample scripts/logs:

I've tried to create short test scripts and corresponding logs (with the
client failing, and then with it restarting when I use the workaround).
I've cut out several thousand lines to get down to something simple for the
example test scripts, but I know the client is still a little long.  Again,
I'm not sure if attachments work on the mailing list, but I'll attempt to
attach the client/server scripts with the corresponding pass/fail logs.

Thanks!

-Chris

import os, sys, traceback
import json, time, datetime, psutil
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor, task, defer, threads
from contextlib import suppress

class CustomLineReceiverProtocol(LineReceiver):
        delimiter = b':==:'

class ServiceClientProtocol(CustomLineReceiverProtocol):
        def connectionMade(self):
                print('    protocol connectionMade')
                self.factory.connectedClient = self
                self.factory.clientConnectionMade()

        def lineReceived(self, line):
                dataDict = json.loads(line)
                if dataDict.get('action') == 'healthRequest':
                        self.factory.enterSimulateJob()

        def connectionLost(self, reason):
                print('    protocol connectionLost')
                self.factory.connectedClient = None
        
        def constructAndSendData(self, action, content):
                message = {}
                message['action'] = action
                message['content'] = content
                jsonMessage = json.dumps(message)
                msg = jsonMessage.encode('utf-8')
                print('    protocol constructAndSendData: {}'.format(msg))
                self.sendLine(msg)

class ServiceClientFactory(ReconnectingClientFactory):
        continueTrying = True

        def __init__(self):
                print('factory constructor')
                self.connectedClient = None
                self.health = {}
                self.loopingSystemHealth = task.LoopingCall(self.enterSystemHealthCheck)
                self.loopingSystemHealth.start(10)
                self.numPortsChanged = False
                self.disconnectedOnPurpose = False
                super().__init__()

        def buildProtocol(self, addr):
                print('  factory buildProtocol')
                self.resetDelay()
                protocol = ServiceClientProtocol()
                protocol.factory = self
                return protocol

        def clientConnectionLost(self, connector, reason):
                print('  factory clientConnectionLost: reason: {}'.format(reason))
                # if self.disconnectedOnPurpose:
                #       ## Hack to keep reactor alive
                #       print('  factory clientConnectionLost: increasing numPorts')
                #       self.numPorts += 1
                #       self.numPortsChanged = True
                #       self.disconnectedOnPurpose = False
                print('  ... simulate client going idle before attempting restart...')
                # Note: time.sleep() blocks the reactor thread for the whole pause.
                time.sleep(5)
                ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
                print('  factory clientConnectionLost: end.\n')

        def clientConnectionMade(self):
                print('  factory clientConnectionMade: starting numPorts: {}'.format(self.numPorts))
                # if self.numPortsChanged:
                #       ## Resetting from hacked value
                #       print('  factory clientConnectionMade: decreasing numPorts')
                #       self.numPorts -= 1
                #       self.numPortsChanged = False
                print('  factory clientConnectionMade: finished numPorts: {}'.format(self.numPorts))
                print('  ..... pausing for <CTRL><C> test')
                time.sleep(3)

        def cleanup(self):
                print('factory cleanup: calling loseConnection')
                if self.connectedClient is not None:
                        self.connectedClient.transport.loseConnection()
                        self.disconnectedOnPurpose = True

        def stopFactory(self):
                print('stopFactory')
                self.stopTrying()
                with suppress(Exception):
                        self.loopingSystemHealth.stop()
                print('stopFactory end.')

        def enterSimulateJob(self):
                print('  factory enterSimulateJob')
                threadHandle = threads.deferToThread(self.simulateJob)
                return threadHandle

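        def simulateJob(self):
                # Runs in a worker thread (via deferToThread); calls that touch the
                # transport are handed back to the reactor thread with callFromThread.
                print('  factory simulateJob: starting job')
                time.sleep(2)
                reactor.callFromThread(self.connectedClient.constructAndSendData, 'jobResponse', self.health)

                print('  factory simulateJob: finished job... time to reset the client (disconnect/re-initialize)...')
                reactor.callFromThread(self.cleanup)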

        def enterSystemHealthCheck(self):
                print('  factory enterSystemHealthCheck')
                threadHandle = threads.deferToThread(self.getSystemHealth)
                return threadHandle

        def getSystemHealth(self):
                print('  factory getSystemHealth')
                try:
                        currentTime = time.time()
                        process = psutil.Process(os.getpid())
                        startTime = process.create_time()
                        self.health = {
                                'processCpuPercent': process.cpu_percent(),
                                'processMemory': process.memory_full_info().uss,
                                'processRunTime': int(currentTime-startTime)
                        }
                        print('  factory getSystemHealth: system health: {}'.format(self.health))
                except:
                        exception = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2])
                        print('  factory getSystemHealth: exception: {}'.format(exception))


if __name__ == '__main__':
        try:
                connector = reactor.connectTCP('127.0.0.1', 51841, ServiceClientFactory(), timeout=300)
                reactor.run()
        except:
                stacktrace = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2])
                print('clientWrapper exception: {}'.format(stacktrace))
        print('exiting')
        sys.exit(0)

import sys, traceback
import json
from twisted.internet import reactor, task, defer, threads
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver


class CustomLineReceiverProtocol(LineReceiver):
        delimiter = b':==:'

class ServiceListener(CustomLineReceiverProtocol):
        def connectionMade(self):
                print('    protocol connectionMade')
                self.factory.activeClients.append(self)

        def connectionLost(self, reason):
                print('    protocol connectionLost')
                self.factory.removeClient(self)

        def lineReceived(self, line):
                print('    protocol lineReceived: {}'.format(line))

        def constructAndSendData(self, action):
                message = {'action': action}
                jsonMessage = json.dumps(message)
                msg = jsonMessage.encode('utf-8')
                print('    protocol constructAndSendData: {}'.format(msg))
                self.sendLine(msg)

class ServiceFactory(ServerFactory):
        protocol = ServiceListener

        def __init__(self):
                print('factory constructor')
                super().__init__()
                self.activeClients = []
                self.loopingHealthUpdates = task.LoopingCall(self.enterSystemHealthCheck)
                self.loopingHealthUpdates.start(15)

        def removeClient(self, client):
                print('  factory removeClient')
                self.activeClients.remove(client)

        def enterSystemHealthCheck(self):
                print('  factory enterSystemHealthCheck')
                threadHandle = threads.deferToThread(self.sendHealthRequest)
                return threadHandle

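        def sendHealthRequest(self):
                # Runs in a worker thread (via deferToThread); transport writes are
                # handed back to the reactor thread with callFromThread.
                if len(self.activeClients) <= 0:
                        print('  factory sendHealthRequest: no active clients to talk to')
                else:
                        for client in list(self.activeClients):
                                print('  factory sendHealthRequest: requesting from client...')
                                reactor.callFromThread(client.constructAndSendData, 'healthRequest')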

if __name__ == '__main__':
        try:
                reactor.listenTCP(51841, ServiceFactory(), interface='127.0.0.1')
                reactor.run()
        except:
                stacktrace = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2])
                print('clientWrapper exception: {}'.format(stacktrace))
        print('exiting')
        sys.exit(0)
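Both scripts run work in worker threads with threads.deferToThread (simulateJob on the client, sendHealthRequest on the server), and that work ends in sendLine() or loseConnection() calls. Twisted's threading guide says most of its APIs are not thread-safe and that such calls should be handed to the reactor thread with reactor.callFromThread(). The stdlib-only toy below illustrates that hand-off; `MiniLoop`, `RecordingTransport` and `simulate_job` are hypothetical names, not Twisted API. The worker only enqueues the call, and the loop thread performs the write.

```python
import queue
import threading
from typing import Callable, List


class MiniLoop:
    """Toy single-threaded event loop showing the callFromThread pattern."""

    def __init__(self) -> None:
        self._calls: "queue.Queue[Callable[[], None]]" = queue.Queue()
        self.owner = threading.get_ident()  # the thread that runs the loop

    def call_from_thread(self, fn: Callable[..., None], *args: object) -> None:
        # Safe from any thread: only enqueues; the loop thread runs it later.
        self._calls.put(lambda: fn(*args))

    def run_until_idle(self) -> None:
        while True:
            try:
                call = self._calls.get_nowait()
            except queue.Empty:
                return
            call()


class RecordingTransport:
    """Hypothetical transport that must only be used from the loop thread."""

    def __init__(self, loop: MiniLoop) -> None:
        self.loop = loop
        self.writes: List[bytes] = []

    def write(self, data: bytes) -> None:
        assert threading.get_ident() == self.loop.owner, "write() off the loop thread"
        self.writes.append(data)


def simulate_job(loop: MiniLoop, transport: RecordingTransport) -> None:
    # Runs in a worker thread, like a function passed to deferToThread.
    result = b'{"action": "jobResponse"}'
    loop.call_from_thread(transport.write, result)


if __name__ == "__main__":
    loop = MiniLoop()
    transport = RecordingTransport(loop)
    worker = threading.Thread(target=simulate_job, args=(loop, transport))
    worker.start()
    worker.join()
    loop.run_until_idle()  # the write happens here, on the main (loop) thread
    print(transport.writes)
```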

Attachment: client_not_working.log
Attachment: client_working_with_hack.log
Attachment: server.log

_______________________________________________
Twisted-Python mailing list
Twisted-Python@twistedmatrix.com
https://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
