Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master.
Requested reviews:
  Canonical's Ubuntu QA (canonical-ubuntu-qa)

For more details, see:
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654
--
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
new file mode 100644
index 0000000..a0dab6a
--- /dev/null
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
@@ -0,0 +1,199 @@
+#!/usr/bin/python3
+"""Kill running autopkgtest tests on request.
+
+Consumes stop requests of the form {"uuid": ..., "not-running-on": [...]}
+from the "tests-to-kill" queue. If a runner process for that uuid is
+running on this host it is sent SIGTERM, and the request is acked once no
+such process remains; otherwise this host's FQDN is appended to
+"not-running-on" and the request is republished for the other workers.
+"""
+
+import configparser
+import json
+import logging
+import os
+import signal
+import socket
+import sqlite3
+import subprocess
+import time
+import urllib.parse
+
+import amqplib.client_0_8 as amqp
+from helpers.utils import init_db
+
+
+WRITER_EXCHANGE_NAME = "stop-running.fanout"
+QUEUE_NAME = "tests-to-kill"
+# needs rabbitmq.cred
+RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred"
+MSG_ONLY_KEYS = [
+    "uuid",
+    "not-running-on",
+]
+# Seconds to wait after requeueing a request this host has already checked,
+# so that the other workers get a chance to pick it up.
+REQUEUE_BACKOFF = 30
+# Seconds between checks whether a killed test's runner has gone away.
+KILL_POLL_INTERVAL = 3
+
+RABBIT_CFG = configparser.ConfigParser()
+with open(RABBIT_CREDS, "r") as f:
+    RABBIT_CFG.read_string("[rabbit]\n" + f.read())
+
+
+def amqp_connect():
+    """Connect to RabbitMQ with the credentials read from RABBIT_CREDS."""
+    amqp_con = amqp.Connection(
+        RABBIT_CFG["rabbit"]["RABBIT_HOST"],
+        userid=RABBIT_CFG["rabbit"]["RABBIT_USER"],
+        password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"],
+        confirm_publish=True,
+    )
+    return amqp_con
+
+
+def check_message(msg):
+    """Return True if *msg* is a well-formed stop request.
+
+    It must be a dict with exactly the keys in MSG_ONLY_KEYS (in any
+    order), a non-empty string "uuid" and a list "not-running-on".
+    """
+    return (
+        isinstance(msg, dict)
+        and set(msg) == set(MSG_ONLY_KEYS)
+        and isinstance(msg["uuid"], str)
+        and bool(msg["uuid"])
+        and isinstance(msg["not-running-on"], list)
+    )
+
+
+def get_test_pid(uuid):
+    """Return the PID of a runner process for test *uuid*, or None.
+
+    Lists all processes once with ps and matches in Python rather than
+    piping through grep, so the search never matches itself and *uuid* is
+    never passed to another command.
+    """
+    if not uuid:
+        # "" is a substring of every command line
+        return None
+    try:
+        ps_run = subprocess.run(
+            ["ps", "-eo", "pid=,args="],
+            capture_output=True,
+            check=True,
+            universal_newlines=True,
+        )
+    except (OSError, subprocess.CalledProcessError):
+        logging.exception("Could not list processes")
+        return None
+    for line in ps_run.stdout.splitlines():
+        pid, _, args = line.strip().partition(" ")
+        if "runner" in args and uuid in args:
+            try:
+                return int(pid)
+            except ValueError:
+                continue
+    return None
+
+
+def place_new_message_in_queue(info: dict, amqp_con: amqp.Connection):
+    """Publish the stop request *info* to WRITER_EXCHANGE_NAME."""
+    complete_amqp = amqp_con.channel()
+    try:
+        complete_amqp.access_request(
+            "/complete", active=True, read=False, write=True
+        )
+        complete_amqp.exchange_declare(
+            WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+        )
+        complete_amqp.basic_publish(
+            amqp.Message(json.dumps(info), delivery_mode=2),
+            WRITER_EXCHANGE_NAME,
+            "",
+        )
+    finally:
+        # a new channel is opened per call; don't leak them
+        complete_amqp.close()
+
+
+def kill_process(pid: int):
+    """Send SIGTERM to process *pid*.
+
+    Raises ProcessLookupError if the process no longer exists.
+    """
+    # The signal must precede the pid for kill(1) ("kill <pid> -15" treats
+    # -15 as another target); os.kill avoids the ambiguity altogether.
+    os.kill(pid, signal.SIGTERM)
+
+
+def process_message(msg, amqp_con):
+    """Handle one stop request; every delivery is settled exactly once.
+
+    - Malformed requests are rejected without requeueing.
+    - Requests this host has already checked are requeued unchanged.
+    - If the test is not running here, this host is appended to
+      "not-running-on", the request is republished and this copy acked.
+    - Otherwise the runner is sent SIGTERM and the request is acked once
+      no matching process remains.
+    """
+    body = msg.body
+    if isinstance(body, bytes):
+        body = body.decode("UTF-8", errors="replace")
+    try:
+        info = json.loads(body)
+    except ValueError:
+        info = None
+    if not check_message(info):
+        logging.error("Message %s is invalid. Ignoring.", body)
+        # removes from queue for good
+        msg.channel.basic_reject(msg.delivery_tag, requeue=False)
+        return
+    logging.info("Message is: \n%s", json.dumps(info, indent=2))
+    hostname = socket.getfqdn()
+    if hostname in info["not-running-on"]:
+        # Already checked here: put it back as-is for the other workers.
+        # NOTE(review): a request for a test running on no worker (e.g. it
+        # already finished) is requeued forever; it needs a cutoff.
+        msg.channel.basic_reject(msg.delivery_tag, requeue=True)
+        time.sleep(REQUEUE_BACKOFF)
+        return
+    pid = get_test_pid(info["uuid"])
+    if pid is None:
+        info["not-running-on"].append(hostname)
+        place_new_message_in_queue(info, amqp_con)
+        # this copy is replaced by the republished one
+        msg.channel.basic_ack(msg.delivery_tag)
+        return
+    logging.info("Killing test %s (pid %i)", info["uuid"], pid)
+    try:
+        kill_process(pid)
+    except ProcessLookupError:
+        logging.info("pid %i has already exited", pid)
+    # NOTE(review): unbounded; if the runner ignores SIGTERM this consumer
+    # blocks here forever.
+    while get_test_pid(info["uuid"]) is not None:
+        time.sleep(KILL_POLL_INTERVAL)
+    msg.channel.basic_ack(msg.delivery_tag)
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    amqp_con = amqp_connect()
+    status_ch = amqp_con.channel()
+    status_ch.access_request("/complete", active=True, read=True, write=True)
+    status_ch.exchange_declare(
+        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+    )
+    status_ch.queue_declare(QUEUE_NAME, durable=True, auto_delete=False)
+    status_ch.queue_bind(QUEUE_NAME, WRITER_EXCHANGE_NAME, QUEUE_NAME)
+    # Take one request at a time so that requeued requests can be
+    # delivered to the other workers instead of piling up here.
+    status_ch.basic_qos(0, 1, False)
+    logging.info("Listening to requests on %s", QUEUE_NAME)
+    status_ch.basic_consume(
+        QUEUE_NAME, callback=lambda msg: process_message(msg, amqp_con)
+    )
+    while status_ch.callbacks:
+        status_ch.wait()
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
index 9ff2ff1..147a331 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
@@ -219,8 +219,16 @@ def getglob(d, glob, default=None):
     return default
 
 
+# signal.
+# https://www.ibm.com/docs/en/aix/7.2?topic=management-process-termination
 def term_handler(signum, frame):
+    # kill -15 sends TERM
+    # so we gotta modify this to kill test immediately
+    # well, it just has to basic_ack the message
+    # need to modify this to terminate the test immediately?
+    # and make sure the amqp message is ack'd so it doesn't go back into queue
+    # maybe it needs to be hup_handler instead
     """SIGTERM handler, for clean exit after current test"""
     logging.info("Caught SIGTERM, requesting exit")
diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/__init__.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/__init__.py
diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
new file mode 100644
index 0000000..45a9d8d
--- /dev/null
+++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
@@ -0,0 +1,139 @@
+"""
+Stop tests that are already running.
+
+Workflow:
+- The running page shows a "stop test" option to privileged users or to
+  the requester of that specific test.
+- Clicking it POSTs to a.u.c/test-manager.cgi?uuid=$uuid.
+- This module checks that the uuid is listed in running.json and publishes
+  {"uuid": $uuid, "not-running-on": []} to the stop-running.fanout
+  exchange.
+- The test-killer systemd service on each cloud worker consumes these
+  messages. A worker that is not running the test appends its hostname to
+  "not-running-on" and republishes the message; requests a worker has
+  already checked are put back in the queue for the others. The worker
+  that runs the test kills it and acks the message when done.
+- TODO: when the test is killed, the worker must still ack the test's own
+  request so that the test does not go back into the queue.
+"""
+
+
+from flask import Flask, redirect, request, session
+from flask_openid import OpenID
+from html import escape as _escape
+import os
+import json
+import configparser
+import logging
+import urllib
+from werkzeug.middleware.proxy_fix import ProxyFix
+import amqplib.client_0_8 as amqp
+
+
+from helpers.utils import setup_key
+
+RUNNING_FP = "/run/amqp-status-collector/running.json"
+WRITER_EXCHANGE_NAME = "stop-running.fanout"
+
+
+def maybe_escape(value):
+    """Escape the value if it is True-ish"""
+    return _escape(value) if value else value
+
+
+def submit_to_queue(message):
+    """Publish *message* (a stop request dict) to WRITER_EXCHANGE_NAME."""
+    amqp_con = amqp_connect()
+    try:
+        complete_amqp = amqp_con.channel()
+        complete_amqp.access_request(
+            "/complete", active=True, read=False, write=True
+        )
+        complete_amqp.exchange_declare(
+            WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+        )
+        complete_amqp.basic_publish(
+            amqp.Message(json.dumps(message), delivery_mode=2),
+            WRITER_EXCHANGE_NAME,
+            "",
+        )
+    finally:
+        # a new connection is opened per request; don't leak it
+        amqp_con.close()
+
+
+def amqp_connect():
+    """Connect to the AMQP server configured in autopkgtest-cloud.conf."""
+    # "import urllib" alone does not guarantee that urllib.parse is loaded
+    import urllib.parse
+
+    cp = configparser.ConfigParser()
+    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
+    amqp_uri = cp["amqp"]["uri"]
+    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
+    amqp_con = amqp.Connection(
+        parts.hostname, userid=parts.username, password=parts.password
+    )
+    logging.info(
+        "Connected to AMQP server at %s@%s", parts.username, parts.hostname
+    )
+    return amqp_con
+
+
+def _contains_value(data, value):
+    """Return True if *value* equals any dict key or leaf value in *data*."""
+    if isinstance(data, dict):
+        return any(
+            key == value or _contains_value(item, value)
+            for key, item in data.items()
+        )
+    if isinstance(data, list):
+        return any(_contains_value(item, value) for item in data)
+    return data == value
+
+
+# Initialize app
+PATH = os.path.join(
+    os.path.sep, os.getenv("XDG_RUNTIME_DIR", "/run"), "autopkgtest_webcontrol"
+)
+os.makedirs(PATH, exist_ok=True)
+app = Flask("request")
+app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
+# keep secret persistent between CGI invocations
+secret_path = os.path.join(PATH, "secret_key")
+setup_key(app, secret_path)
+oid = OpenID(app, os.path.join(PATH, "openid"), safe_roots=[])
+
+
+@app.route("/", methods=["POST"])
+def index_root():
+    """Queue a request to stop the running test given by ?uuid=<uuid>.
+
+    Responds 403 if the user is not logged in, 400 unless "uuid" is the
+    only (non-empty) parameter, 503 if running.json cannot be read, 404 if
+    the uuid is not listed in it, and 202 once the request is queued.
+    """
+    session.permanent = True
+    nick = maybe_escape(session.get("nickname"))
+    if not nick:
+        # TODO: also restrict to privileged users or the test's requester
+        return "You must be logged in to stop tests\n", 403
+    params = {
+        maybe_escape(k): maybe_escape(v) for k, v in request.args.items()
+    }
+    if list(params.keys()) != ["uuid"] or not params["uuid"]:
+        # all we need is the uuid
+        return "Expected exactly one non-empty parameter: uuid\n", 400
+    try:
+        with open(RUNNING_FP, "r") as f:
+            running_data = json.load(f)
+    except (OSError, ValueError):
+        logging.exception("Cannot read %s", RUNNING_FP)
+        return "Information about running tests is unavailable\n", 503
+    # Exact match, so that a partial or empty uuid cannot match by accident.
+    # NOTE(review): assumes the uuid is a whole key or value in
+    # running.json rather than embedded in a longer string - confirm.
+    if not _contains_value(running_data, params["uuid"]):
+        return "Test %s is not running\n" % params["uuid"], 404
+    submit_to_queue({"uuid": params["uuid"], "not-running-on": []})
+    return "Request to stop test %s submitted\n" % params["uuid"], 202
--
Mailing list: https://launchpad.net/~canonical-ubuntu-qa
Post to     : canonical-ubuntu-qa@lists.launchpad.net
Unsubscribe : https://launchpad.net/~canonical-ubuntu-qa
More help   : https://help.launchpad.net/ListHelp