Tim Andersson has proposed merging 
~andersson123/autopkgtest-cloud:stop-tests-from-webpage into 
autopkgtest-cloud:master.

Requested reviews:
  Canonical's Ubuntu QA (canonical-ubuntu-qa)

For more details, see:
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654
-- 
Your team Canonical's Ubuntu QA is requested to review the proposed merge of 
~andersson123/autopkgtest-cloud:stop-tests-from-webpage into 
autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
new file mode 100644
index 0000000..a0dab6a
--- /dev/null
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
@@ -0,0 +1,154 @@
+#!/usr/bin/python3
+"""Kills running tests."""
+
+import configparser
+import json
+import logging
+import subprocess
+import time
+import os
+import socket
+import sqlite3
+import urllib.parse
+
+import amqplib.client_0_8 as amqp
+from helpers.utils import init_db
+
+
+WRITER_EXCHANGE_NAME = "stop-running.fanout"
+# configparser-style key/value file; RABBIT_HOST, RABBIT_USER and RABBIT_PASSWORD are read from it
+RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred"
+MSG_ONLY_KEYS = [  # the keys check_message() requires of a stop request
+    "uuid",
+    "not-running-on",
+]
+
+RABBIT_CFG = configparser.ConfigParser()
+with open(RABBIT_CREDS, "r") as f:
+    RABBIT_CFG.read_string("[rabbit]\n" + f.read())  # prepend the section header configparser needs
+
+
+
+def amqp_connect():
+    """Return a new AMQP connection built from the [rabbit] credentials."""
+    return amqp.Connection(
+        RABBIT_CFG["rabbit"]["RABBIT_HOST"],
+        userid=RABBIT_CFG["rabbit"]["RABBIT_USER"],
+        password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"],
+        confirm_publish=True,
+    )
+
+
+def check_message(msg):  # True iff msg is a dict with exactly the MSG_ONLY_KEYS keys, in any order
+    return isinstance(msg, dict) and set(msg) == set(MSG_ONLY_KEYS)
+
+
+def get_test_pid(uuid):
+    """Find the pid of the runner process whose command line mentions uuid.
+
+    Equivalent to ``ps aux | grep runner | grep <uuid>``, with the filtering
+    done in Python so that no shell is needed and the uuid is never
+    interpreted as a grep pattern or option.
+
+    Args:
+        uuid: identifier of the test, as a str.
+
+    Returns:
+        The pid (int) of the first matching process, or None when ps fails
+        or no matching process exists on this machine.
+    """
+    if not uuid:
+        # An empty uuid is a substring of every line; never match on it.
+        return None
+    try:
+        ps_run = subprocess.run(
+            ["ps", "aux"],
+            stdout=subprocess.PIPE,
+            text=True,
+            check=True,
+        )
+    except subprocess.CalledProcessError:
+        return None
+    for line in ps_run.stdout.splitlines():
+        fields = line.split()
+        # ps aux columns start with USER and PID.
+        if "runner" in line and uuid in line and len(fields) > 1:
+            if fields[1].isdigit():
+                return int(fields[1])
+    # If nothing matched, the test isn't running on this worker.
+    return None
+
+
+def place_new_message_in_queue(info: dict, amqp_con: amqp.Connection):
+    """Publish info as a persistent JSON message on WRITER_EXCHANGE_NAME."""
+    chan = amqp_con.channel()
+    try:
+        chan.access_request("/complete", active=True, read=False, write=True)
+        chan.exchange_declare(
+            WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+        )
+        payload = amqp.Message(json.dumps(info), delivery_mode=2)
+        chan.basic_publish(payload, WRITER_EXCHANGE_NAME, "")
+    finally:
+        # One channel per call: close it, or each requeue leaks a channel.
+        chan.close()
+
+
+def kill_process(pid: int):
+    """Send SIGTERM (signal 15) to pid; raises CalledProcessError on failure."""
+    # POSIX kill(1) takes the signal option before the pids.  Placed after
+    # the pid, "-15" is handled in an implementation-dependent way and may
+    # be taken for a process group id instead of the signal.
+    subprocess.run(["kill", "-15", str(pid)], check=True)
+
+
+def process_message(msg, amqp_con):
+    """Handle one stop request from the queue, settling msg exactly once."""
+    body = msg.body
+    if isinstance(body, bytes):
+        body = body.decode("UTF-8", errors="replace")
+    info = json.loads(body)
+    logging.info("Message is: \n%s", json.dumps(info, indent=2))
+    if not check_message(info):
+        logging.error("Message %s is invalid. Ignoring.", json.dumps(info, indent=2))
+        # removes from queue! (basic_reject requires the requeue argument)
+        msg.channel.basic_reject(msg.delivery_tag, requeue=False)
+        return
+    if socket.getfqdn() in info["not-running-on"]:
+        # places back into queue as is (AMQP 0-8 has no basic.nack)
+        msg.channel.basic_reject(msg.delivery_tag, requeue=True)
+        time.sleep(30)  # <- needs some thought
+        return
+    pid = get_test_pid(info["uuid"])
+    if pid is None:
+        info["not-running-on"].append(socket.getfqdn())
+        place_new_message_in_queue(info, amqp_con)
+        msg.channel.basic_ack(msg.delivery_tag)
+        return
+    kill_process(pid)
+    while get_test_pid(info["uuid"]) is not None:
+        time.sleep(3)
+    msg.channel.basic_ack(msg.delivery_tag)  # TODO: worker must handle the kill properly
+
+
+if __name__ == "__main__":
+    # Consume stop requests from the fanout exchange until no consumer is left.
+    logging.basicConfig(level=logging.INFO)
+    amqp_con = amqp_connect()
+    status_ch = amqp_con.channel()
+    status_ch.access_request("/complete", active=True, read=True, write=True)
+    # need to go back and check the functionality of durable=True and auto_delete=False
+    status_ch.exchange_declare(
+        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+    )
+    queue_name = "tests-to-kill"
+    status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
+    status_ch.queue_bind(queue_name, WRITER_EXCHANGE_NAME, queue_name)
+    logging.info("Listening to requests on %s", queue_name)
+    # Name the queue explicitly instead of relying on "" meaning the queue
+    # most recently declared on this channel.
+    status_ch.basic_consume(
+        queue_name, callback=lambda msg: process_message(msg, amqp_con)
+    )
+    while status_ch.callbacks:
+        status_ch.wait()
\ No newline at end of file
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
index 9ff2ff1..147a331 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
@@ -219,8 +219,16 @@ def getglob(d, glob, default=None):
 
     return default
 
+# signal.
+# https://www.ibm.com/docs/en/aix/7.2?topic=management-process-termination
 
 def term_handler(signum, frame):
+    # kill -15 sends TERM
+    # so we gotta modify this to kill test immediately
+    # well, it just has to basic_ack the message
+    # need to modify this to terminate the test immediately?
+    # and make sure the amqp message is ack'd so it doesn't go back into queue
+    # maybe it needs to be hup_handler instead
     """SIGTERM handler, for clean exit after current test"""
 
     logging.info("Caught SIGTERM, requesting exit")
diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/__init__.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/__init__.py
diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
new file mode 100644
index 0000000..45a9d8d
--- /dev/null
+++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
@@ -0,0 +1,137 @@
+"""
+This'll be for stopping already running tests
+workflow is
+running page will show a stop test option to privileged users OR the requester of that specific test
+clicking that button will redirect to
+a.u.c/test-manager.cgi?uuid=$uuid
+test_manager module submits message with just uuid to rabbitmq queue - perhaps called kill-these-tests
+systemd service on worker - test-killer or something
+absorbs rabbitmq messages
+kills the test with uuid specified
+removes uuid item from queue when done
+need to make sure when worker exits, it ack's the message so the test doesn't go back into the queue.
+ezpz
+"""
+
+
+from flask import Flask, redirect, request, session
+from flask_openid import OpenID
+from html import escape as _escape
+import os
+import json
+import configparser
+import logging
+import urllib
+from werkzeug.middleware.proxy_fix import ProxyFix
+import amqplib.client_0_8 as amqp
+
+
+from helpers.utils import setup_key
+
+RUNNING_FP = "/run/amqp-status-collector/running.json"
+WRITER_EXCHANGE_NAME = "stop-running.fanout"
+
+
+def maybe_escape(value):
+    """Return value HTML-escaped, or unchanged when it is falsy."""
+    return value if not value else _escape(value)
+
+
+def submit_to_queue(message):
+    """Publish message as a persistent JSON message on WRITER_EXCHANGE_NAME."""
+    amqp_con = amqp_connect()
+    try:
+        chan = amqp_con.channel()
+        chan.access_request("/complete", active=True, read=False, write=True)
+        chan.exchange_declare(
+            WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+        )
+        payload = amqp.Message(json.dumps(message), delivery_mode=2)
+        chan.basic_publish(payload, WRITER_EXCHANGE_NAME, "")
+    finally:
+        # A connection is opened per request; close it so none are leaked.
+        amqp_con.close()
+
+
+def amqp_connect():
+    """Connect to the AMQP server named by [amqp] uri in autopkgtest-cloud.conf."""
+    import urllib.parse  # plain "import urllib" need not load the submodule
+    cp = configparser.ConfigParser()
+    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
+    parts = urllib.parse.urlsplit(cp["amqp"]["uri"], allow_fragments=False)
+    amqp_con = amqp.Connection(
+        parts.hostname, userid=parts.username, password=parts.password
+    )
+    logging.info(
+        "Connected to AMQP server at %s@%s", parts.username, parts.hostname
+    )
+    return amqp_con
+
+
+# Initialize app; runtime state lives under $XDG_RUNTIME_DIR (default /run)
+PATH = os.path.join(
+    os.path.sep, os.getenv("XDG_RUNTIME_DIR", "/run"), "autopkgtest_webcontrol"
+)
+os.makedirs(PATH, exist_ok=True)
+app = Flask("request")
+app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
+# keep secret persistent between CGI invocations
+secret_path = os.path.join(PATH, "secret_key")
+setup_key(app, secret_path)
+oid = OpenID(app, os.path.join(PATH, "openid"), safe_roots=[])
+
+
+@app.route("/", methods=["POST"])
+def index_root():
+    """Handle stop test requests.
+
+    Expects exactly one query parameter, uuid, naming a test listed in
+    running.json.  On success this message is published for the workers:
+
+        {"uuid": str, "not-running-on": [str, ...]}
+
+    Returns a (body, status) pair: 200 when the request was queued, 400 for
+    bad parameters, 403 when not logged in, 404 when the test is not running.
+    """
+    session.permanent = True
+    nick = maybe_escape(session.get("nickname"))
+    if not nick:
+        # TODO: also require nick to be privileged or the test's requester.
+        return "You need to be logged in to stop a test.\n", 403
+
+    params = {
+        maybe_escape(k): maybe_escape(v) for k, v in request.args.items()
+    }
+    if set(params) != {"uuid"} or not params["uuid"]:
+        # all we need is the uuid tyvm
+        return "Expected exactly one parameter: uuid\n", 400
+    if not os.path.isfile(RUNNING_FP):
+        return "No tests are listed as running.\n", 404
+    with open(RUNNING_FP, "r") as f:
+        running_data = json.load(f)
+    if params["uuid"] not in str(running_data):
+        # test uuid doesn't correspond with any test listed as running in running.json
+        return "Test %s is not running.\n" % params["uuid"], 404
+
+    # Open design questions (notes kept from the draft):
+    # - check that the uuid is valid, beyond appearing in running.json?
+    # - what should the queue name be?  With 2 cloud workers and 2 web
+    #   workers it has to be one queue, as the test could be on either
+    #   cloud worker - maybe with some tags?
+    # - separate queues for the cloud and lxd workers?  Check arch: if
+    #   armhf, separate queue for the lxd worker.  Maybe there is no need
+    #   to check and one queue is enough.
+    # - a cloud worker that gets the message without running the test
+    #   appends its hostname to "not-running-on" and places the message
+    #   back in the queue; sleep a bit so that the other cloud worker can
+    #   get the message?
+    # - if the hostname is already in "not-running-on", just place the
+    #   message back in the queue and sleep.
+
+    # now we submit to queue...
+    queue_message = {
+        "uuid": params["uuid"],
+        "not-running-on": [],
+    }
+    submit_to_queue(queue_message)
+    return "Requested that test %s be stopped.\n" % params["uuid"], 200
-- 
Mailing list: https://launchpad.net/~canonical-ubuntu-qa
Post to     : canonical-ubuntu-qa@lists.launchpad.net
Unsubscribe : https://launchpad.net/~canonical-ubuntu-qa
More help   : https://help.launchpad.net/ListHelp

Reply via email to