Changeset: 116a59fdb4e6 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/116a59fdb4e6
Modified Files:
        testing/Mtest.py.in
        testing/malmapi.py
        testing/process.py
        testing/sqllogictest.py
        testing/sqltest.py
Branch: Aug2024
Log Message:

Reworked timeout handling in Mtest.


diffs (truncated from 825 to 300 lines):

diff --git a/testing/Mtest.py.in b/testing/Mtest.py.in
--- a/testing/Mtest.py.in
+++ b/testing/Mtest.py.in
@@ -1272,7 +1272,12 @@ def PerformDir(env, testdir, testlist, t
                 else:
                     vaultopt = []
                 if not oneserver:
-                    pSrvr = ServerClass(env['exe']['mserver5'] + [f'--dbpath={LogDBdir}'] + vaultopt + mserver5_opts, open(os.devnull, 'w'), open(os.devnull, 'w'), par['TIMEOUT'], os.path.join(LogDBdir, '.started'), TSTDB, dbg=env.get('DBG'))
+                    pSrvr = ServerClass(env['exe']['mserver5'] + [f'--dbpath={LogDBdir}'] + vaultopt + mserver5_opts,
+                                        open(os.devnull, 'w'),
+                                        open(os.devnull, 'w'),
+                                        30 if par['TIMEOUT'] else 0,
+                                        os.path.join(LogDBdir, '.started'),
+                                        TSTDB, dbg=env.get('DBG'))
                     pSrvr.LaunchIt()
                     pSrvr.terminate()
         if not os.path.exists(TSTTRGDIR):
@@ -1345,8 +1350,12 @@ def PerformDir(env, testdir, testlist, t
                     except:
                         pass
                     else:
+                        dbh.settimeout(10)
                         crs = dbh.cursor()
-                        crs.execute("call logging.setcomplevel('SQL_EXECUTION', 'INFO')")
+                        try:
+                            crs.execute("call logging.setcomplevel('SQL_EXECUTION', 'INFO')")
+                        except socket.timeout:
+                            print('\nTimeout setting log level.\n')
                         crs.close()
                         dbh.close()
                     os.environ['MAPIPORT'] = env['MAPIPORT'] = pSrvr.port
@@ -1656,12 +1665,12 @@ def GetBitsAndModsAndThreads(env) :
         proc.stderr = process._BufferedPipe(proc.stderr)
         proc.killed = False
         proc.onechild = True
-        t = Timer(float(par['TIMEOUT']), killProc, args = [proc, stderr, cmd])
         qOut = qErr = ''
         mods = []
+        timeout = par['TIMEOUT'] != 0
         try:
-            t.start()
-            while True:
+            starttime = time.time()
+            while not timeout or time.time() < starttime + 30:
                 proc.poll()
                 if proc.returncode is not None:
                     qOut = proc.stdout.read()
@@ -1669,7 +1678,7 @@ def GetBitsAndModsAndThreads(env) :
                     break
                 if os.path.exists(os.path.join(dbpath, '.started')):
                     break
-                time.sleep(0.001)
+                time.sleep(0.1)
             if proc.returncode is None:
                 connurl = open(os.path.join(dbpath, '.conn')).read()
                 res = mapiportre.search(connurl)
@@ -1679,28 +1688,38 @@ def GetBitsAndModsAndThreads(env) :
                                             hostname=HOST,
                                             port=int(res.group('port')),
                                             database=TSTPREF,
-                                            autocommit=True)
+                                            autocommit=True,
+                                            connect_timeout=1.0)
                 except KeyboardInterrupt:
                     raise
                 except:
                     pass
                 else:
+                    if timeout:
+                        dbh.settimeout(30)
                     crs = dbh.cursor()
-                    crs.execute('select distinct module from sys.malfunctions() order by module')
-                    mods = crs.fetchall()
-                    mods = [x[0] for x in mods]
                     try:
-                        mods.remove('user')
-                    except ValueError:
+                        crs.execute('select distinct module from sys.malfunctions() order by module')
+                    except socket.timeout:
                         pass
+                    else:
+                        mods = crs.fetchall()
+                        mods = [x[0] for x in mods]
+                        try:
+                            mods.remove('user')
+                        except ValueError:
+                            pass
                     crs.close()
                     dbh.close()
                 proc.terminate()
-                qOut = proc.stdout.read()
-                qErr = proc.stderr.read()
-                proc.wait()
+                qOut = proc.stdout.read(timeout=5)
+                qErr = proc.stderr.read(timeout=5)
+                try:
+                    proc.wait(timeout=5)
+                except TimeoutExpired:
+                    proc.kill()
+                    proc.wait()
         finally:
-            t.cancel()
             if proc.returncode is None:
                 killProc(proc, stderr, cmd)
                 proc.wait()
@@ -2572,15 +2591,15 @@ class ServerClass:
         return self.proc.poll()
 
     def terminate(self):
-        self.timer.cancel()
-        t = Timer(60, killProc, args=[self.proc, self.errfile, self.cmd, self])
-        t.start()
         if os.name == 'nt':
             self.proc.send_signal(signal.CTRL_BREAK_EVENT)
         else:
             self.proc.terminate()
-        self.proc.wait()
-        t.cancel()
+        try:
+            self.proc.wait(timeout=60)
+        except TimeoutExpired:
+            self.proc.kill()
+            self.wait()
         self.code = returnCode(self.proc, self.errfile)
         if self.pollfile is None:
             self.outfile.write(self.proc.stdout.read())
@@ -2611,28 +2630,34 @@ class ServerClass:
 
     def stopsessions(self):
         if self.lock.acquire(blocking=False):
-            if self.running is not None:
-                self.stacktrace()
-                try:
-                    dbh = pymonetdb.connect(username='monetdb',
-                                            password='monetdb',
-                                            hostname=HOST,
-                                            port=int(self.port),
-                                            database=self.dbname,
-                                            connect_timeout=1.0)
-                    crs = dbh.cursor()
-                    crs.execute('select sessionid from sys.sessions() where sessionid <> sys.current_sessionid()')
-                    ids = crs.fetchall()
-                    for x in ids:
-                        if procdebug:
-                            print(f'stopping session {x[0]}')
-                            crs.execute(f'call sys.stopsession({x[0]})')
-                    if procdebug and not ids:
-                        print('no sessions to stop')
-                except:
-                    pass
-                self.running = None
-            self.lock.release()
+            try:
+                if self.running is not None:
+                    self.stacktrace()
+                    try:
+                        dbh = pymonetdb.connect(username='monetdb',
+                                                password='monetdb',
+                                                hostname=HOST,
+                                                port=int(self.port),
+                                                database=self.dbname,
+                                                connect_timeout=1.0)
+                        dbh.settimeout(20)
+                        crs = dbh.cursor()
+                        crs.execute('select sessionid from sys.sessions() where sessionid <> sys.current_sessionid()')
+                        ids = crs.fetchall()
+                        dbh.settimeout(10)
+                        for x in ids:
+                            if procdebug:
+                                print(f'stopping session {x[0]}')
+                                crs.execute(f'call sys.stopsession({x[0]})')
+                        if procdebug and not ids:
+                            print('no sessions to stop')
+                    except socket.timeout:
+                        self.proc.kill()
+                    crs.close()
+                    dbh.close()
+                    self.running = None
+            finally:
+                self.lock.release()
 
     def LaunchIt(self):
         global setpgrp
@@ -2649,6 +2674,7 @@ class ServerClass:
         stderr = self.errfile
         if self.inmem:
             stdout = process.PIPE
+            self.pollfile = None
         elif self.pollfile:
             try:
                 os.unlink(self.pollfile)
@@ -2673,9 +2699,7 @@ class ServerClass:
             proc.stderr = process._BufferedPipe(proc.stderr)
         proc.killed = False
         proc.onechild = True
-        self.timer = Timer(self.timeout, killProc, args=[proc, self.errfile, self.cmd, self])
-        if self.timeout > 0:
-            self.timer.start()
+        starttime = time.time()
         self.proc = proc
 
         port = None
@@ -2685,19 +2709,39 @@ class ServerClass:
                 proc.poll()
                 if proc.returncode is not None:
                     # exited
-                    proc.wait()
-                    self.timer.cancel()
+                    if procdebug:
+                        print(f'server exited during startup with code {proc.returncode}')
                     return
                 if os.path.exists(self.pollfile):
                     break
-                time.sleep(0.001)
+                # wait at most 30 seconds for the server to start
+                if time.time() > starttime + 30:
+                    if procdebug:
+                        print('timeout starting server')
+                    if os.name == 'nt':
+                        proc.send_signal(signal.CTRL_BREAK_EVENT)
+                    else:
+                        proc.terminate()
+                    try:
+                        # wait 5 seconds for termination
+                        proc.communicate(timeout=5)
+                    except process.TimeoutExpired:
+                        # really kill
+                        proc.kill()
+                        proc.wait()
+                    return
+                time.sleep(0.1)
             connurl = open(os.path.join(os.path.split(self.pollfile)[0], '.conn')).read()
             res = mapiportre.search(connurl)
             port = res.group('port')
         else:
             loadedseen = False
+            endtime = time.time() + 30
             while port is None or not loadedseen:
-                line = proc.stdout.readline()
+                curtime = time.time()
+                if curtime >= endtime:
+                    break
+                line = proc.stdout.readline(timeout=endtime - curtime)
                 if not line:
                     break
                 self.outfile.write(line)
@@ -2713,7 +2757,6 @@ class ServerClass:
             else:
                 proc.terminate()
             proc.wait()
-            self.timer.cancel()
             return
         self.started = True
         self.port = port
@@ -2738,18 +2781,19 @@ def RunIt(cmd, onechild, TestIn, TestOut
                        stderr=TestErr, text=True) as proc:
         proc.killed = False
         proc.onechild = onechild
-        t = Timer(TimeOut, killProc, args = [proc, TestErr, cmd, pSrvr])
         try:
-            t.start()
             # since both stdout and stderr are redirected to files,
             # communicate will not return any useful data
-            proc.communicate(input = TestInput)
-            t.cancel()
+            try:
+                proc.communicate(input=TestInput, timeout=TimeOut if TimeOut else None)
+            except process.TimeoutExpired:
+                killProc(proc, TestErr, cmd)
+                proc.wait()
             if procdebug:
                 print('RunIt: process exited "%s" (%s)\n' % ('" "'.join(cmd), proc.returncode))
         except KeyboardInterrupt:
-            t.cancel()
             killProc(proc, TestErr, cmd)
+            proc.wait()
             if procdebug:
                 print('RunIt: process killed "%s"\n' % '" "'.join(cmd))
             raise
@@ -2880,8 +2924,6 @@ def DoIt(env, SERVER, CALL, TST, EXT, Te
                 SetExecEnv(exe,pSrvr.port,verbosity > 1)
             else:
                 PSRVR.start(TIMEOUT)
-                # PSRVR.timer.settimeout(TIMEOUT)
-                # PSRVR.timer.start()
         else:
             ClntOut = openutf8(TestOutFile, 'a')
             ClntErr = openutf8(TestErrFile, 'a')
@@ -3010,7 +3052,6 @@ def DoIt(env, SERVER, CALL, TST, EXT, Te
     finally:
         if PSRVR is not None:
             PSRVR.stop()
-            # PSRVR.timer.cancel()
         if SERVER == 'SQL' and pSrvr is not None:
             if PSRVR is None and pSrvr.started:
                 pSrvr.terminate()
diff --git a/testing/malmapi.py b/testing/malmapi.py
--- a/testing/malmapi.py
+++ b/testing/malmapi.py
@@ -164,6 +164,10 @@ class Connection(object):
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to