Diff comments:
> diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> new file mode 100755
> index 0000000..b1bc37e
> --- /dev/null
> +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> @@ -0,0 +1,211 @@
> +#!/usr/bin/python3
> +"""Kills running tests."""
> +
> +import configparser
> +import json
> +import logging
> +import socket
> +import subprocess
> +import time
> +
> +import amqplib.client_0_8 as amqp
> +import requests
> +
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred"
> +MSG_ONLY_KEYS = [
> + "uuid",
> + "not-running-on",
> +]
> +NUM_WORKERS = 2
This simply won't do - the number of worker units shouldn't be hardcoded; it needs to come from configuration or be discovered at runtime.
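Something like the sketch below could work instead - note the file path and its format are hypothetical here, whatever the charm actually records about its worker units should be used:

    # Hypothetical: a file the charm writes with one worker unit hostname per line.
    WORKER_UNITS_FILE = "/home/ubuntu/worker-units"
    with open(WORKER_UNITS_FILE, "r") as f:
        NUM_WORKERS = len([line for line in f.read().splitlines() if line.strip()])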
> +
> +RABBIT_CFG = configparser.ConfigParser()
> +with open(RABBIT_CREDS, "r") as f:
> +    RABBIT_CFG.read_string("[rabbit]\n" + f.read().replace('"', ""))
> +
> +
> +def amqp_connect():
> +    amqp_con = amqp.Connection(
> +        RABBIT_CFG["rabbit"]["RABBIT_HOST"],
> +        userid=RABBIT_CFG["rabbit"]["RABBIT_USER"],
> +        password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"],
> +        confirm_publish=True,
> +    )
> +    return amqp_con
> +
> +
> +def check_message(msg):
> +    return list(msg.keys()) == MSG_ONLY_KEYS
> +
> +
> +def get_test_pid(uuid):
> +    try:
> +        # get list of running processes
> +        ps_aux_run = subprocess.run(
> +            ["ps", "aux"],
> +            stdout=subprocess.PIPE,
> +            check=True,
> +        )
> +        # Filter the list for only 'runner' processes
> +        runner_run = subprocess.run(
> +            ["grep", "runner"],
> +            input=ps_aux_run.stdout,
> +            stdout=subprocess.PIPE,
> +            check=True,
> +        )
> +        # Check all runner processes for the given uuid
> +        # If this one fails, the test isn't running on this worker
> +        uuid_run = subprocess.run(
> +            ["grep", uuid],
> +            input=runner_run.stdout,
> +            capture_output=True,
> +            check=True,
> +        )
> +    except subprocess.CalledProcessError as _:
> +        # We hit this exception if the test with the given uuid
> +        # isn't running on this cloud worker
> +        return None
> +    search_for_test_output = uuid_run.stdout
> +    search_me = search_for_test_output.splitlines()
> +    # We have to assert the length is 1 otherwise we'll only kill
> +    # the first one in the list - which may be the incorrect one
> +    # if there's two processes with same uuid - something is wrong!
> +    assert len(search_me) == 1
> +    line = search_me[0].decode("utf-8")
> +    if uuid in line:
> +        line = line.split(" ")
> +        line = [x for x in line if x]
> +        pid = line[1]
> +        return int(pid)
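Not a blocker, but the ps | grep | grep chain could probably be a single pgrep call. Untested sketch, not part of the proposed change:

    # pgrep -af matches the full command line and prints "PID cmdline" per match
    proc = subprocess.run(["pgrep", "-af", uuid], capture_output=True, text=True)
    matches = [line for line in proc.stdout.splitlines() if "runner" in line]
    pid = int(matches[0].split()[0]) if len(matches) == 1 else None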
> +
> +
> +def place_message_in_queue(info: dict, amqp_con: amqp.Connection):
> +    complete_amqp = amqp_con.channel()
> +    complete_amqp.access_request(
> +        "/complete", active=True, read=False, write=True
> +    )
> +    complete_amqp.exchange_declare(
> +        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    complete_amqp.basic_publish(
> +        amqp.Message(json.dumps(info), delivery_mode=2),
> +        WRITER_EXCHANGE_NAME,
> +        "",
> +    )
> +
> +
> +def kill_process(pid: int, uuid: str):
> +    # sends SIGUSR1 to worker
> +    # This causes autopkgtest to exit with code -10
> +    # which the worker then detects, exits the test and kills
> +    # the openstack server, then the worker goes on to the next
> +    # test in the queue
> +    kill_cmd = "kill -USR1 %i" % pid
> +    try:
> +        _ = subprocess.run(
> +            kill_cmd.split(" "),
> +            check=True,
> +        )
> +        while get_test_pid(uuid) is not None:
> +            time.sleep(3)
> +        return True
> +    except subprocess.CalledProcessError as _:
> +        return False
> +
> +
> +def test_is_queued(uuid: str):
> +    influx_cfg = configparser.ConfigParser()
> +    with open("/home/ubuntu/influx.cred", "r") as f:
> +        influx_cfg.read_string("[influx]\n" + f.read())
> +    if influx_cfg["influx"]["INFLUXDB_CONTEXT"] == "staging":
> +        autopkgtest_url = "https://autopkgtest.staging.ubuntu.com"
> +    else:
> +        autopkgtest_url = "https://autopkgtest.ubuntu.com"
> +    queue_req = requests.get(autopkgtest_url)
> +    if uuid in queue_req.content.decode("utf-8"):
> +        return True
> +    return False
> +
> +
> +def already_checked_this_host(hostnames):
> +    if socket.getfqdn() in hostnames:
> +        return True
> +    return False
> +
> +
> +def process_message(msg, amqp_con):
> +    body = msg.body
> +    if isinstance(body, bytes):
> +        body = body.decode("UTF-8", errors="replace")
> +    info = json.loads(body)
> +    logging.info("Received request to kill test: %s" % json.dumps(info))
> +    if not check_message(info):
> +        logging.error(
> +            "Message %s is invalid. Ignoring.", json.dumps(info, indent=2)
> +        )
> +        # Remove the message from the queue
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        return
> +    if len(info["not-running-on"]) == NUM_WORKERS:
> +        # If the test hasn't been found on any of the workers, we reach this
> +        # Check if the test is currently queued - this could happen in the case
> +        # of infinite looping.
> +        if test_is_queued(info["uuid"]):
> +            msg.channel.basic_ack(msg.delivery_tag)
> +            info["not-running-on"] = []
> +            place_message_in_queue(info, amqp_con)
> +        else:
> +            msg.channel.basic_ack(msg.delivery_tag)
> +        return
> +
> +    if already_checked_this_host(info["not-running-on"]):
> +        # We check to see if we've already checked for the job on this cloud worker unit.
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        logging.info(
> +            "Test already found to not be running on this host, placing back into queue."
> +        )
> +        place_message_in_queue(info, amqp_con)
> +        return
> +    # get the test pid
> +    pid = get_test_pid(info["uuid"])
> +    if pid is None:
> +        # The test isn't running on this unit
> +        # append this hostname to not-running-on
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        info["not-running-on"].append(socket.getfqdn())
> +        if len(info["not-running-on"]) == NUM_WORKERS:
> +            logging.info(
> +                "Job %s not found on any workers, not re-queueing."
> +                % json.dumps(info)
> +            )
> +            return
> +        place_message_in_queue(info, amqp_con)
> +        return
> +    # Kill the process
> +    if kill_process(pid, info["uuid"]):
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        logging.info("Job %s has been killed." % json.dumps(info))
> +    else:
> +        logging.error(
> +            "Job %s couldn't be killed! Ignoring." % json.dumps(info)
> +        )
> +        msg.channel.basic_ack(msg.delivery_tag)
> +
> +
> +if __name__ == "__main__":
> +    logging.basicConfig(level=logging.INFO)
> +    amqp_con = amqp_connect()
> +    status_ch = amqp_con.channel()
> +    status_ch.access_request("/complete", active=True, read=True, write=True)
> +    status_ch.exchange_declare(
> +        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    queue_name = "tests-to-kill"
> +    status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
> +    status_ch.queue_bind(queue_name, WRITER_EXCHANGE_NAME, queue_name)
> +    logging.info("Listening to requests on %s", queue_name)
> +    status_ch.basic_consume(
> +        "", callback=lambda msg: process_message(msg, amqp_con)
> +    )
> +    while status_ch.callbacks:
> +        status_ch.wait()
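It might also be worth noting somewhere (commit message or docs) how to exercise this by hand. A rough sketch, reusing the same AMQP calls and message shape as above - host, credentials and uuid are placeholders:

    import json

    import amqplib.client_0_8 as amqp

    con = amqp.Connection("<rabbit-host>", userid="<user>", password="<password>")
    ch = con.channel()
    ch.access_request("/complete", active=True, read=False, write=True)
    ch.exchange_declare(
        "stop-running.fanout", "fanout", durable=True, auto_delete=False
    )
    ch.basic_publish(
        amqp.Message(
            json.dumps({"uuid": "<test-uuid>", "not-running-on": []}),
            delivery_mode=2,
        ),
        "stop-running.fanout",
        "",
    )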
--
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654
Your team Canonical's Ubuntu QA is requested to review the proposed merge of
~andersson123/autopkgtest-cloud:stop-tests-from-webpage into
autopkgtest-cloud:master.