inline comments all addressed Diff comments:
> diff --git > a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer > b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer > new file mode 100755 > index 0000000..b1bc37e > --- /dev/null > +++ > b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer > @@ -0,0 +1,211 @@ > +#!/usr/bin/python3 > +"""Kills running tests.""" > + > +import configparser > +import json > +import logging > +import socket > +import subprocess > +import time > + > +import amqplib.client_0_8 as amqp > +import requests > + > +WRITER_EXCHANGE_NAME = "stop-running.fanout" > +RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred" > +MSG_ONLY_KEYS = [ > + "uuid", > + "not-running-on", > +] > +NUM_WORKERS = 2 done > + > +RABBIT_CFG = configparser.ConfigParser() > +with open(RABBIT_CREDS, "r") as f: > + RABBIT_CFG.read_string("[rabbit]\n" + f.read().replace('"', "")) > + > + > +def amqp_connect(): > + amqp_con = amqp.Connection( > + RABBIT_CFG["rabbit"]["RABBIT_HOST"], > + userid=RABBIT_CFG["rabbit"]["RABBIT_USER"], > + password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"], > + confirm_publish=True, > + ) > + return amqp_con > + > + > +def check_message(msg): > + return list(msg.keys()) == MSG_ONLY_KEYS > + > + > +def get_test_pid(uuid): > + try: > + # get list of running processes > + ps_aux_run = subprocess.run( > + ["ps", "aux"], > + stdout=subprocess.PIPE, > + check=True, > + ) > + # Filter the list for only 'runner' processes > + runner_run = subprocess.run( > + ["grep", "runner"], > + input=ps_aux_run.stdout, > + stdout=subprocess.PIPE, > + check=True, > + ) > + # Check all runner processes for the given uuid > + # If this one fails, the test isn't running on this worker > + uuid_run = subprocess.run( > + ["grep", uuid], > + input=runner_run.stdout, > + capture_output=True, > + check=True, > + ) > + except subprocess.CalledProcessError as _: > + # We hit this exception if the test with the given uuid > + # isn't running on this cloud worker > + return None > + search_for_test_output = uuid_run.stdout > + search_me = search_for_test_output.splitlines() > + # We have to assert the length is 1 otherwise we'll only kill > + # the first one in the list - which may be the incorrect one > + # if there's two processes with same uuid - something is wrong! > + assert len(search_me) == 1 > + line = search_me[0].decode("utf-8") > + if uuid in line: i think is fine > + line = line.split(" ") > + line = [x for x in line if x] > + pid = line[1] > + return int(pid) > + > + > +def place_message_in_queue(info: dict, amqp_con: amqp.Connection): > + complete_amqp = amqp_con.channel() > + complete_amqp.access_request( > + "/complete", active=True, read=False, write=True > + ) > + complete_amqp.exchange_declare( > + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False > + ) > + complete_amqp.basic_publish( > + amqp.Message(json.dumps(info), delivery_mode=2), > + WRITER_EXCHANGE_NAME, > + "", > + ) > + > + > +def kill_process(pid: int, uuid: str): > + # sends SIGUSR1 to worker > + # This causes autopkgtest to exit with code -10 > + # which the worker then detects, exits the test and kills > + # the openstack server, then the worker goes on to the next > + # test in the queue > + kill_cmd = "kill -USR1 %i" % pid > + try: > + _ = subprocess.run( > + kill_cmd.split(" "), > + check=True, > + ) > + while get_test_pid(uuid) is not None: > + time.sleep(3) done > + return True > + except subprocess.CalledProcessError as _: > + return False > + > + > +def test_is_queued(uuid: str): done > + influx_cfg = configparser.ConfigParser() > + with open("/home/ubuntu/influx.cred", "r") as f: > + influx_cfg.read_string("[influx]\n" + f.read()) > + if influx_cfg["influx"]["INFLUXDB_CONTEXT"] == "staging": > + autopkgtest_url = "https://autopkgtest.staging.ubuntu.com" > + else: > + autopkgtest_url = "https://autopkgtest.ubuntu.com" > + queue_req = requests.get(autopkgtest_url) done > + if uuid in queue_req.content.decode("utf-8"): > + return True > + return False > + > + > +def already_checked_this_host(hostnames): done > + if socket.getfqdn() in hostnames: > + return True > + return False > + > + > +def process_message(msg, amqp_con): > + body = msg.body > + if isinstance(body, bytes): > + body = body.decode("UTF-8", errors="replace") > + info = json.loads(body) > + logging.info("Received request to kill test: %s" % json.dumps(info)) > + if not check_message(info): > + logging.error( > + "Message %s is invalid. Ignoring.", json.dumps(info, indent=2) > + ) > + # Remove the message from the queue > + msg.channel.basic_ack(msg.delivery_tag) > + return > + if len(info["not-running-on"]) == NUM_WORKERS: > + # If the test hasn't been found on any of the workers, we reach this > + # Check if the test is currently queued - this could happen in the > case need to talk to team about this change methinks > + # of infinite looping. > + if test_is_queued(info["uuid"]): > + msg.channel.basic_ack(msg.delivery_tag) > + info["not-running-on"] = [] > + place_message_in_queue(info, amqp_con) > + else: > + msg.channel.basic_ack(msg.delivery_tag) > + return > + > + if already_checked_this_host(info["not-running-on"]): > + # We check to see if we've already checked for the job on this cloud > worker unit. > + msg.channel.basic_ack(msg.delivery_tag) > + logging.info( > + "Test already found to not be running on this host, placing back > into queue." > + ) > + place_message_in_queue(info, amqp_con) > + return > + # get the test pid > + pid = get_test_pid(info["uuid"]) > + if pid is None: > + # The test isn't running on this unit > + # append this hostname to not-running-on > + msg.channel.basic_ack(msg.delivery_tag) > + info["not-running-on"].append(socket.getfqdn()) done > + if len(info["not-running-on"]) == NUM_WORKERS: > + logging.info( > + "Job %s not found on any workers, not re-queueing." > + % json.dumps(info) > + ) > + return > + place_message_in_queue(info, amqp_con) > + return > + # Kill the process > + if kill_process(pid, info["uuid"]): done > + msg.channel.basic_ack(msg.delivery_tag) > + logging.info("Job %s has been killed." % json.dumps(info)) > + else: > + logging.error( > + "Job %s couldn't be killed! Ignoring." % json.dumps(info) > + ) > + msg.channel.basic_ack(msg.delivery_tag) > + > + > +if __name__ == "__main__": > + logging.basicConfig(level=logging.INFO) > + amqp_con = amqp_connect() > + status_ch = amqp_con.channel() > + status_ch.access_request("/complete", active=True, read=True, write=True) > + status_ch.exchange_declare( > + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False > + ) > + queue_name = "tests-to-kill" > + status_ch.queue_declare(queue_name, durable=True, auto_delete=False) > + status_ch.queue_bind(queue_name, WRITER_EXCHANGE_NAME, queue_name) > + logging.info("Listening to requests on %s", queue_name) > + status_ch.basic_consume( > + "", callback=lambda msg: process_message(msg, amqp_con) > + ) > + while status_ch.callbacks: > + status_ch.wait() > diff --git a/charms/focal/autopkgtest-web/webcontrol/browse.cgi > b/charms/focal/autopkgtest-web/webcontrol/browse.cgi > index 309fb82..959fdcd 100755 > --- a/charms/focal/autopkgtest-web/webcontrol/browse.cgi > +++ b/charms/focal/autopkgtest-web/webcontrol/browse.cgi > @@ -42,6 +32,14 @@ SUPPORTED_UBUNTU_RELEASES = get_supported_releases() > INDEXED_PACKAGES_FP = "" > AMQP_QUEUE_CACHE = "/var/lib/cache-amqp/queued.json" > RUNNING_CACHE = "/run/amqp-status-collector/running.json" > +ADMIN_NICKS = [ make shared function actually EDIT: done > + "brian-murray", > + "andersson123", > + "paride", > + "hyask", > + "vorlon", > + "sil2000", > +] > > > def init_config(): > diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py > b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py > new file mode 100644 > index 0000000..7848e2e > --- /dev/null > +++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py > @@ -0,0 +1,131 @@ > +""" > +# NEW DOCS done > +test-manager is an app for autopkgtest-web which sends kill requests to > +the worker units, detailing the test uuid. > + > +The worker units then kill the test with the matching uuid. > + > +# NEED TO ADD THE LINK TO RESULTS PAGES TOO? done > +On the running page, admins will have a hyperlink under running jobs which, > when clicked, > +will send the kill request. > + > +Before sending the kill request, test_manager checks that the uuid is indeed > in running.json > + > +After sending the kill request, the request is picked up by a systemd unit > named "test-killer" > +on the cloud worker units. > + > +This unit, through all the cloud worker units, will find which unit the test > is running on, > +and kill the test, removing the test request from the queue and making the > worker unit move > +on to the next test in the queue. > +""" > + > + > +import configparser > +import json > +import logging > +import os > +import pathlib > +import urllib > + > +import amqplib.client_0_8 as amqp > +from flask import request, session > +from helpers.exceptions import RunningJSONNotFound > +from helpers.utils import HTML, get_all_releases, initialise_app, > maybe_escape > + > +ALL_UBUNTU_RELEASES = get_all_releases() > +# This should actually be loaded dynamically. Not in global namespace done > +ADMIN_NICKS_PATH = "/home/ubuntu/.config/autopkgtest-web/admin-nicks" > +try: > + ADMIN_NICKS = pathlib.Path(ADMIN_NICKS_PATH).read_text().split(",") > +except FileNotFoundError as _: > + ADMIN_NICKS = [] > + > +RUNNING_FP = "/run/amqp-status-collector/running.json" > +RUNNING_FILE = pathlib.Path("/run/amqp-status-collector/running.json") > +WRITER_EXCHANGE_NAME = "stop-running.fanout" > + > + > +def submit_to_queue(message): > + amqp_con = amqp_connect() > + complete_amqp = amqp_con.channel() > + complete_amqp.access_request( > + "/complete", active=True, read=False, write=True > + ) > + complete_amqp.exchange_declare( > + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False > + ) > + complete_amqp.basic_publish( > + amqp.Message(json.dumps(message), delivery_mode=2), > + WRITER_EXCHANGE_NAME, > + "", > + ) > + > + > +def amqp_connect(): > + """Connect to AMQP server""" > + cp = configparser.ConfigParser() > + cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf")) > + amqp_uri = cp["amqp"]["uri"] > + parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False) > + amqp_con = amqp.Connection( > + parts.hostname, userid=parts.username, password=parts.password > + ) > + logging.info( > + "Connected to AMQP server at %s@%s" % (parts.username, > parts.hostname) > + ) > + return amqp_con > + > + > +PATH, app, secret_path, oid = initialise_app("test_manager") > + > + > +@app.route("/", methods=["GET", "POST"]) > +def index_root(): > + """Handle stop test requests""" > + session.permanent = True > + nick = maybe_escape(session.get("nickname")) > + if nick not in ADMIN_NICKS: > + return ( > + HTML.format( > + ( > + "<p>You are not an admin. You are not " > + "allowed to use this endpoint.</p>" > + ) > + ), > + 200, > + ) > + params = { > + maybe_escape(k): maybe_escape(v) for k, v in request.args.items() > + } > + base = ["uuid"] done > + if list(params.keys()) != base: > + return ( > + HTML.format( > + "<p>You have passed %s, please only pass the uuid</p>" > + % ",".join(params.keys()) > + ), > + 200, > + ) > + if not RUNNING_FILE.is_file(): > + raise RunningJSONNotFound > + running_data = json.loads(RUNNING_FILE.read_text()) > + if params["uuid"] not in json.dumps(running_data): > + return ( > + HTML.format( > + "<p>uuid %s not found in running jobs</p>" % params["uuid"] > + ), > + 200, > + ) > + queue_message = { > + "uuid": params["uuid"], > + "not-running-on": [], > + } > + submit_to_queue(queue_message) > + while params["uuid"] in RUNNING_FILE.read_text(): > + pass > + return ( > + HTML.format( > + "<p>Test with uuid %s has been killed.</p>" % params["uuid"] > + ), > + 200, > + ) -- https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654 Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master. -- Mailing list: https://launchpad.net/~canonical-ubuntu-qa Post to : canonical-ubuntu-qa@lists.launchpad.net Unsubscribe : https://launchpad.net/~canonical-ubuntu-qa More help : https://help.launchpad.net/ListHelp