Left to do: - test again - finish some minor refactoring, making sure it is split into sensible commits - use pathlib instead of os.path/os.makedirs in test_manager/app.py - use a knife emoji for the "kill test" hyperlink?
Diff comments: > diff --git > a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer > b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer > new file mode 100755 > index 0000000..7eefac5 > --- /dev/null > +++ > b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer > @@ -0,0 +1,181 @@ > +#!/usr/bin/python3 > +"""Kills running tests.""" > + > +import configparser > +import json > +import logging > +import os > +import socket > +import subprocess > +import time > + > +import amqplib.client_0_8 as amqp > + > +# from helpers.utils import init_db > + > + > +WRITER_EXCHANGE_NAME = "stop-running.fanout" > +# needs rabbitmq.cred done > +RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred" > +MSG_ONLY_KEYS = [ > + "uuid", > + "not-running-on", > +] > +NUM_WORKERS = 2 > +QUEUE_PATH = "/run/amqp-status-collector/running.json" > + > +RABBIT_CFG = configparser.ConfigParser() > +with open(RABBIT_CREDS, "r") as f: > + RABBIT_CFG.read_string("[rabbit]\n" + f.read().replace('"', "")) > + > + > +def amqp_connect(): > + amqp_con = amqp.Connection( > + RABBIT_CFG["rabbit"]["RABBIT_HOST"], > + userid=RABBIT_CFG["rabbit"]["RABBIT_USER"], > + password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"], > + confirm_publish=True, > + ) > + return amqp_con > + > + > +def check_message(msg): > + return list(msg.keys()) == MSG_ONLY_KEYS > + > + > +def get_test_pid_and_server_name(uuid): > + # ps aux | grep runner | grep uuid done > + ps_aux = "ps aux" done > + grep_runner = "grep runner" > + grep_for_uuid = "grep " + uuid > + try: done > + ps_aux_run = subprocess.run( > + ps_aux.split(" "), stdout=subprocess.PIPE, check=True > + ) > + runner_run = subprocess.run( > + grep_runner.split(" "), > + input=ps_aux_run.stdout, > + stdout=subprocess.PIPE, > + check=True, > + ) > + # If this one fails, the test isn't running on this worker > + uuid_run = subprocess.run( > + grep_for_uuid.split(" "), > + input=runner_run.stdout, > + capture_output=True, > + 
check=True, > + ) > + except subprocess.CalledProcessError as _: > + return None, None > + search_for_test_output = uuid_run.stdout > + search_me = search_for_test_output.splitlines() > + assert len(search_me) == 1 done > + line = search_me[0].decode("utf-8") > + if uuid in line: > + line = line.split(" ") > + line = [x for x in line if x] > + pid = line[1] > + return int(pid) > + > + > +def place_new_message_in_queue(info: dict, amqp_con: amqp.Connection): done > + complete_amqp = amqp_con.channel() > + complete_amqp.access_request( > + "/complete", active=True, read=False, write=True > + ) > + complete_amqp.exchange_declare( > + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False > + ) > + complete_amqp.basic_publish( > + amqp.Message(json.dumps(info), delivery_mode=2), > + WRITER_EXCHANGE_NAME, > + "", > + ) > + > + > +def kill_process(pid: int): > + # kill_cmd = "kill %i -15" % pid done > + # sends SIGUSR1 to worker done > + kill_cmd = "kill -USR1 %i" % pid > + _ = subprocess.run( done > + kill_cmd.split(" "), > + check=True, > + ) > + > + > +def kill_server(server_name: str): done > + pass > + > + > +def process_message(msg, amqp_con): > + body = msg.body > + if isinstance(body, bytes): > + body = body.decode("UTF-8", errors="replace") > + info = json.loads(body) > + logging.info("Message is: \n%s", json.dumps(info, indent=2)) done > + if not check_message(info): > + logging.error( > + "Message %s is invalid. Ignoring.", json.dumps(info, indent=2) > + ) > + # removes from queue! 
> + msg.channel.basic_reject(msg.delivery_tag, requeue=False) done > + return > + if len(info["not-running-on"]) == NUM_WORKERS: done > + queue_data = {} > + if os.path.isfile(QUEUE_PATH): > + with open(QUEUE_PATH, "r") as f: > + queue_data = json.load(f) > + if info["uuid"] in str(queue_data): > + msg.channel.basic_ack(msg.delivery_tag) > + # we need to reconstruct message with not-running-on = [] > + info["not-running-on"] = [] > + place_new_message_in_queue(info, amqp_con) > + else: > + # looks like the test has finished before we've had a chance > to kill it > + msg.channel.basic_ack(msg.delivery_tag) > + return > + else: done > + msg.channel.basic_ack(msg.delivery_tag) > + > + if socket.getfqdn() in info["not-running-on"]: I don't think functions necessary, done though > + # places back into queue as is > + msg.channel.basic_ack(msg.delivery_tag) > + logging.info( > + "Test already found to not be running on this host, placing back > into queue." > + ) > + place_new_message_in_queue(info, amqp_con) done > + return > + pid = get_test_pid_and_server_name(info["uuid"]) > + if pid is None: > + info["not-running-on"].append(socket.getfqdn()) > + msg.channel.basic_ack(msg.delivery_tag) > + place_new_message_in_queue(info, amqp_con) > + # removes from queue without saying it failed or anything done > + return > + kill_process(pid) > + ########################################## done > + # after killing the test, we need to remove the openstack server done > + # need to save the output of ps_aux > + while get_test_pid_and_server_name(info["uuid"]) is not None: done > + time.sleep(3) > + msg.channel.basic_ack(msg.delivery_tag) > + > + > +if __name__ == "__main__": > + logging.basicConfig(level=logging.INFO) > + amqp_con = amqp_connect() > + status_ch = amqp_con.channel() > + status_ch.access_request("/complete", active=True, read=True, write=True) > + # need to go back and check the functionality of durable=True and > auto_delete=False done > + 
status_ch.exchange_declare( > + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False > + ) > + queue_name = "tests-to-kill" > + status_ch.queue_declare(queue_name, durable=True, auto_delete=False) > + status_ch.queue_bind(queue_name, WRITER_EXCHANGE_NAME, queue_name) > + logging.info("Listening to requests on %s", queue_name) > + status_ch.basic_consume( > + "", no_ack=False, callback=lambda msg: process_message(msg, amqp_con) > + ) > + while status_ch.callbacks: > + status_ch.wait() > diff --git > a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker > b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker > index bfa35e7..9f60bce 100755 > --- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker > +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker > @@ -222,6 +229,30 @@ def getglob(d, glob, default=None): > return default > > > +# signal. done > +# https://www.ibm.com/docs/en/aix/7.2?topic=management-process-termination > + > + > +# looks like we don't even need this lol... not that it even works. done > +def usr1_handler(signum, frame): > + """SIGUSR1 handler, for killing a test""" > + # okay so this doesn't seem to work at all > + # no log messaged, exit_requested doesn't get modified > + # oh wait nvm it actually does work? 
> + # sending this RN, it gets interpreted as a testbed failure > + # OH SHIT THAT'S ACTUALLY REALLY GOOD HAHAHAHAHAH > + # YAAAAAAAAAAAAAAAAAAAAAAAAAAY > + logging.error("Caught SIGUSR1, requesting exit") > + global autopkgtest > + global exit_requested > + autopkgtest.kill() > + _, _ = autopkgtest.communicate() > + shutil.rmtree(out_dir) > + os.mkdir(out_dir) > + logging.info("Exiting...") > + sys.exit(0) > + > + > def term_handler(signum, frame): > """SIGTERM handler, for clean exit after current test""" > > @@ -554,6 +585,8 @@ def call_autopkgtest( > """ > # set up status AMQP exchange > global amqp_con > + ########################################### done > + global autopkgtest > status_amqp = amqp_con.channel() > status_amqp.access_request("/data", active=True, read=False, write=True) > status_amqp.exchange_declare( > @@ -585,6 +618,7 @@ def call_autopkgtest( > ) > > ret = autopkgtest.wait() > + # looks like when we sigterm ret is -10, we can use this. done > send_status_info( > status_amqp, > release, > @@ -628,6 +662,7 @@ def request(msg): > for extra in list(systemd_logging_handler._extra.keys()): > if extra.startswith("ADT_"): > del systemd_logging_handler._extra[extra] > + global out_dir done > > # Re-read in case the big/long/no run lists changed, would be better to > # this only when needed via inotify. > @@ -1174,12 +1211,66 @@ def request(msg): > elif code == 16 or code < 0: > contents = log_contents(out_dir) > if exit_requested is not None: > - logging.warning( > - "Testbed failure and exit %i requested. Log > follows:", > - exit_requested, > - ) > - logging.error(contents) > - sys.exit(exit_requested) > + if exit_requested != 99: > + logging.warning( > + "Testbed failure and exit %i requested. Log > follows:", > + exit_requested, > + ) > + logging.error(contents) > + sys.exit(exit_requested) > + else: done > + # okay, this works, but the server is still > leftover :/ done > + # we can use python-novaclient here to kill the > server. 
> + logging.info( > + "Test has been killed by test-killer, > exiting." > + ) > + # for some reason, the novaclient doesn't work > here. Need to figure out why. done > + running_test = False > + msg.channel.basic_ack(msg.delivery_tag) > + shutil.rmtree(out_dir) > + # get server name > + argv_iter = iter(argv) > + while next(argv_iter) != "--name": > + pass > + openstack_server_name = next(argv_iter) > + logging.info( > + "Killing openstack server %s", > + openstack_server_name, > + ) > + if ( done > + > int(os.environ.get("OS_IDENTITY_API_VERSION")) > + == 3 > + ): > + auth = v3.Password( > + auth_url=os.environ["OS_AUTH_URL"], > + username=os.environ["OS_USERNAME"], > + password=os.environ["OS_PASSWORD"], > + > project_name=os.environ["OS_PROJECT_NAME"], > + user_domain_name=os.environ[ > + "OS_USER_DOMAIN_NAME" > + ], > + project_domain_name=os.environ[ > + "OS_PROJECT_DOMAIN_NAME" > + ], > + ) > + else: > + auth = v2.Password( > + auth_url=os.environ["OS_AUTH_URL"], > + username=os.environ["OS_USERNAME"], > + password=os.environ["OS_PASSWORD"], > + tenant_name=os.environ["OS_TENANT_NAME"], > + ) > + sess = session.Session(auth=auth) > + nova = novaclient.client.Client( > + "2", > + session=sess, > + region_name=os.environ["OS_REGION_NAME"], > + ) > + for instance in nova.servers.list(): > + if instance.name == openstack_server_name: > + instance.delete() > + logging.info("Deleted %s", openstack_server_name) > + return > # Get the package-specific string for triggers too, > since they might have broken the run > trigs = [ > t.split("/", 1)[0] for t in params.get("triggers", > []) > @@ -1507,6 +1598,7 @@ def main(): > > signal.signal(signal.SIGTERM, term_handler) > signal.signal(signal.SIGHUP, hup_handler) > + signal.signal(signal.SIGUSR1, usr1_handler) done > > # load configuration > cfg = configparser.ConfigParser( > diff --git a/charms/focal/autopkgtest-cloud-worker/units/test-killer.service > b/charms/focal/autopkgtest-cloud-worker/units/test-killer.service > 
new file mode 100644 > index 0000000..3c5ed87 > --- /dev/null > +++ b/charms/focal/autopkgtest-cloud-worker/units/test-killer.service > @@ -0,0 +1,13 @@ > +[Unit] > +Description=Test killer > +StartLimitIntervalSec=60s > +StartLimitBurst=60 done > + > +[Service] > +User=ubuntu > +ExecStart=/home/ubuntu/autopkgtest-cloud/tools/test-killer > +Restart=on-failure > +RestartSec=1s > + > +[Install] > +WantedBy=autopkgtest.target > diff --git a/charms/focal/autopkgtest-web/webcontrol/browse.cgi > b/charms/focal/autopkgtest-web/webcontrol/browse.cgi > index 309fb82..55b0a8d 100755 > --- a/charms/focal/autopkgtest-web/webcontrol/browse.cgi > +++ b/charms/focal/autopkgtest-web/webcontrol/browse.cgi > @@ -573,7 +584,17 @@ def running(): > running_count = 0 > for pkg in packages: > running_count += len(running_info[pkg].keys()) > - > + ######################################### > + # NEED TO GO BACK N MAKE SURE THAT I AMEND ALL USES OF THIS > + # modify the above. > + show_stop = False > + if session.get("nickname"): > + # modify running_info done > + nick = session.get("nickname") > + if nick in ADMIN_NICKS: done > + show_stop = True > + > + # maybe need to pass site url too? 
done > return render( > "browse-running.html", > releases=releases, > diff --git a/charms/focal/autopkgtest-web/webcontrol/templates/macros.html > b/charms/focal/autopkgtest-web/webcontrol/templates/macros.html > index 941dc77..3556acb 100644 > --- a/charms/focal/autopkgtest-web/webcontrol/templates/macros.html > +++ b/charms/focal/autopkgtest-web/webcontrol/templates/macros.html > @@ -7,14 +7,19 @@ > <tr><th>Architecture:</th><td>{{ arch }}</td></tr> > {% for param, v in params.items() %} > {% if param == "requester" %} > - <tr><th>{{ param|capitalize }}:</th><td><a > href="https://launchpad.net/~{{ v }}">{{ v }}</a></td></tr> > - {% elif param == "uuid" %} > - <tr><th>{{ param|upper }}:</th><td>{{ v }}</td></tr> > - {% else %} > - <tr><th>{{ param|capitalize }}:</th><td>{{ v }}</td></tr> > - {% endif %} > + <tr><th>{{ param|capitalize }}:</th><td><a > href="https://launchpad.net/~{{ v }}">{{ v }}</a></td></tr> > + {% elif param == "uuid" %} > + <tr><th>{{ param|upper }}:</th><td>{{ v }}</td></tr> > + {% else %} > + <tr><th>{{ param|capitalize }}:</th><td>{{ v }}</td></tr> > + {% endif %} > {% endfor %} > <tr><th>Running for:</th><td>{{ duration//3600 }}h {{ duration % > 3600//60 }}m {{ duration % 60 }}s ({{ duration }}s)</td></tr> > + {% if "uuid" in params.keys() %} done > + {% if show_stop %} > + <tr><td><a href="{{ base_url }}test-manager.cgi?uuid={{ > params.get("uuid") }}">Stop this test</a></td></tr> > + {% endif %} > + {% endif %} > </table> > <pre> > {{ logtail }} > diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py > b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py > new file mode 100644 > index 0000000..9bda8c6 > --- /dev/null > +++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py > @@ -0,0 +1,205 @@ > +""" done > + Method doc embedded here for now because why not > + Beware ye who read this code, this could all go so horribly wrong <3 > + test-manager is a new app in autopkgtest-web > + It takes a request 
like dis: > + { > + "uuid": uuid > + } > + the requests will be generated on the running page, with a hyperlink / > button much like the > + retries fing. > + peeps can also just manually run the post request if they want > + The request first checks that the test of the specified uuid is actually > running > + If it is currently running, test-manager then places a request like this in > the > + stop-running.fanout exchange > + { > + "uuid": uuid, > + "not-running-on": [] > + } > + The queue message will then get picked up by a worker unit, running the > + test-killer script as a systemd service > + test-killer checks if the test with that uuid is running on that specific > cloud worker > + if it is, test-killer kills the test * > + if it is not running on that specific cloud worker, test killer modifies > the message like dis: > + { > + "uuid": uuid, > + "not-running-on": [cloud-worker-hostname] > + } > + (and then any cloud worker that doesn't find it also adds their hostname to > the list) > + (test killer first checks this list and then sleeps so we don't check the > same cloud worker twice) > + test-killer acks the original message, and then places the new message, at > the back of the queue > + I know what you're thinking. Back of the queue? But there could be issues > with that, what if the > + test finishes before a test-killer systemd unit kills it? And to that I say: > + queue iz smol , dusnt matta > + test-killer should also ack the message if no cloud worker units find it > (len(not-running-on) == 3) > + Using one queue means we don't need to differentiate between cloud worker > and lxd worker or > + whatever > + what if found on no cloud workers ? > + - check queued.json. See if uuid in there > + - if is in there, test likely looping. Sleep for some time and place > message back into queue > + - if not, the test finished already. ack the message and place back into > queue. 
> + * NEED TO ENSURE WORKER EXITS PROPERLY AND ACKS TEST REQUEST MESSAGE FROM > QUEUE, instead of > + placing it back > + > + NEEDS TESTING STILL OVVI! > +""" > + > + > +import configparser > +import json > +import logging > +import os > +import urllib > +from collections import OrderedDict > +from html import escape as _escape > + > +import amqplib.client_0_8 as amqp > +import flask > +from flask import Flask, request, session # , redirect > +from flask_openid import OpenID > +from helpers.utils import get_all_releases, setup_key > +from werkzeug.middleware.proxy_fix import ProxyFix > + > +ALL_UBUNTU_RELEASES = get_all_releases() > +ADMIN_NICKS = [ > + "brian-murray", > + "andersson123", > + "paride", > + "hyask", > + "vorlon", > + "sil2000", > +] > +RUNNING_FP = "/run/amqp-status-collector/running.json" > +WRITER_EXCHANGE_NAME = "stop-running.fanout" > + > + > +# can move to utils in the future or something. When we refactor. > +# COPIED FUNCTIONS AND VARIABLES! > +def maybe_escape(value): > + """Escape the value if it is True-ish""" > + return _escape(value) if value else value > + > + > +def render(template, code=200, **kwargs): > + # sort the values passed in, so that releases are in the right order > + try: > + release_arches = OrderedDict() > + for k in sorted( > + kwargs["release_arches"], key=ALL_UBUNTU_RELEASES.index > + ): > + release_arches[k] = kwargs["release_arches"][k] > + kwargs["release_arches"] = release_arches > + except KeyError: > + pass > + try: > + kwargs["releases"] = sorted( > + kwargs["releases"], key=ALL_UBUNTU_RELEASES.index > + ) > + except KeyError: > + pass > + return ( > + flask.render_template( > + template, > + base_url=flask.url_for("index_root"), > + static_url=flask.url_for("static", filename=""), > + **kwargs, > + ), > + code, > + ) > + > + > +HTML = """ done > +<!doctype html> > +<html> > +<head> > +<meta charset="utf-8"> > +<title>Autopkgtest Test Request</title> > +</head> > +<body> > +{} > +</body> > +</html> > +""" > + > 
+ > +def submit_to_queue(message): > + amqp_con = amqp_connect() > + complete_amqp = amqp_con.channel() > + complete_amqp.access_request( > + "/complete", active=True, read=False, write=True > + ) > + complete_amqp.exchange_declare( > + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False > + ) > + complete_amqp.basic_publish( > + amqp.Message(json.dumps(message), delivery_mode=2), > + WRITER_EXCHANGE_NAME, > + "", > + ) > + > + > +def amqp_connect(): > + """Connect to AMQP server""" > + cp = configparser.ConfigParser() > + cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf")) > + amqp_uri = cp["amqp"]["uri"] > + parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False) > + amqp_con = amqp.Connection( > + parts.hostname, userid=parts.username, password=parts.password > + ) > + logging.info( > + "Connected to AMQP server at %s@%s" % (parts.username, > parts.hostname) > + ) > + return amqp_con > + > + > +# Initialize app done > +PATH = os.path.join( > + os.path.sep, os.getenv("XDG_RUNTIME_DIR", "/run"), > "autopkgtest_webcontrol" > +) > +os.makedirs(PATH, exist_ok=True) > +app = Flask("test_manager") > +app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1) > +# keep secret persistent between CGI invocations > +secret_path = os.path.join(PATH, "secret_key") > +setup_key(app, secret_path) > +oid = OpenID(app, os.path.join(PATH, "openid"), safe_roots=[]) > + > + > +@app.route("/", methods=["GET", "POST"]) > +def index_root(): > + """Handle stop test requests""" > + session.permanent = True doesn't need refactor I think > + nick = maybe_escape(session.get("nickname")) > + # amend me > + if nick not in ADMIN_NICKS: done > + return > + # check that nick is acceptable > + params = { I don't think refactor necessary > + maybe_escape(k): maybe_escape(v) for k, v in request.args.items() > + } > + base = ["uuid"] > + if list(params.keys()) != base: > + # all we need is the uuid tyvm > + return > + if not os.path.isfile(RUNNING_FP): > + # return properly > + 
return > + running_data = {} > + with open(RUNNING_FP, "r") as f: > + running_data = json.load(f) > + str_running_data = str(running_data) done > + if params["uuid"] not in str_running_data: > + # test uuid doesn't correspond with any test listed as running in > running.json > + return > + # now we submit to queue... > + queue_message = { > + "uuid": params["uuid"], > + "not-running-on": [], > + } > + submit_to_queue(queue_message) > + myhtml = "<p>Submitted %s to exchange %s</p>" % ( > + json.dumps(queue_message), > + WRITER_EXCHANGE_NAME, > + ) > + return HTML.format(myhtml), 200 -- https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654 Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master. -- Mailing list: https://launchpad.net/~canonical-ubuntu-qa Post to : canonical-ubuntu-qa@lists.launchpad.net Unsubscribe : https://launchpad.net/~canonical-ubuntu-qa More help : https://help.launchpad.net/ListHelp