Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:auto-queue-cleanup into autopkgtest-cloud:master.
Requested reviews: Canonical's Ubuntu QA (canonical-ubuntu-qa) For more details, see: https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/463146 -- Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:auto-queue-cleanup into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-web/units/queue-merger.service b/charms/focal/autopkgtest-web/units/queue-merger.service new file mode 100644 index 0000000..5a89a65 --- /dev/null +++ b/charms/focal/autopkgtest-web/units/queue-merger.service @@ -0,0 +1,7 @@ +[Unit] +Description=Remove obsolete queue entries. + +[Service] +Type=oneshot +User=ubuntu +ExecStart=/home/ubuntu/webcontrol/queue-merger diff --git a/charms/focal/autopkgtest-web/units/queue-merger.timer b/charms/focal/autopkgtest-web/units/queue-merger.timer new file mode 100644 index 0000000..3f405cf --- /dev/null +++ b/charms/focal/autopkgtest-web/units/queue-merger.timer @@ -0,0 +1,8 @@ +[Unit] +Description=Remove obsolete queue entries. + +[Timer] +OnCalendar=*:0/5 + +[Install] +WantedBy=autopkgtest-web.target diff --git a/charms/focal/autopkgtest-web/webcontrol/queue-merger b/charms/focal/autopkgtest-web/webcontrol/queue-merger new file mode 100755 index 0000000..96395d7 --- /dev/null +++ b/charms/focal/autopkgtest-web/webcontrol/queue-merger @@ -0,0 +1,184 @@ +#!/usr/bin/python3 +# pylint: disable=c-extension-no-member + +import argparse +import configparser +import itertools +import json +import logging +import os +import urllib.parse +import uuid +from datetime import datetime + +import amqplib.client_0_8 as amqp +import apt.progress.base +import apt_pkg + + +def get_queue(release, arch, context, all_queues): + """Build a single queue of package, args tuples.""" + try: + queue = [ + (i.split()[0], json.loads(i.split(None, 1)[1])) + for i in all_queues["queues"][context][release][arch]["requests"] + ] + except KeyError as _: + return [] + return queue + + +def get_proposed_packages(): + """Get all valid triggers in proposed, returns set of package/version strings.""" + tmpdir = "merge-requests.tmp" + os.makedirs(os.path.join(tmpdir, "etc/apt"), exist_ok=True) + os.makedirs(os.path.join(tmpdir, "var/lib/apt"), exist_ok=True) + with open(os.path.join(tmpdir, "etc/apt/sources.list"), "w") as sources: + # pylint: disable=line-too-long + print( + "deb-src [signed-by=/usr/share/keyrings/ubuntu-archive-keyring.gpg] https://snapshot.ubuntu.com/ubuntu noble-proposed main universe restricted multiverse", + file=sources, + ) + + apt_pkg.config["Dir"] = tmpdir + apt_pkg.init() + sl = apt_pkg.SourceList() + sl.read_main_list() + cache = apt_pkg.Cache(None) + cache.update(apt.progress.base.AcquireProgress(), sl) + + srcrecords = apt_pkg.SourceRecords() + + proposed = set() + while srcrecords.step(): + proposed.add(srcrecords.package + "/" + srcrecords.version) + return proposed + + +def merge_queue(queue, proposed): + """Calculate the merged queue""" + out = [] + for package, runs in itertools.groupby( + sorted(queue, key=lambda i: i[0]), lambda i: i[0] + ): + triggers = set() + for package, args in runs: + for trigger in args["triggers"]: + if trigger in proposed or trigger == "migration-reference/0": + triggers.add(trigger) + # uuid in here I believe. + test_uuid = args.get("uuid", str(uuid.uuid4())) + + if "migration-reference/0" in triggers: + out.append( + ( + package, + {"triggers": ["migration-reference/0"], "uuid": test_uuid}, + ) + ) + triggers.remove("migration-reference/0") + if triggers: + out.append( + ( + package, + { + "all-proposed": "1", + "triggers": sorted(triggers), + "uuid": test_uuid, + }, + ) + ) + return out + + +def amqp_setup(): + cfg = configparser.ConfigParser() + with open("/home/ubuntu/autopkgtest-cloud.conf", "r") as f: + cfg.read_file(f) + amqp_url = cfg["amqp"]["uri"] + url_parts = urllib.parse.urlsplit(amqp_url, allow_fragments=False) + amqp_con = amqp.Connection( + url_parts.hostname, + userid=url_parts.username, + password=url_parts.password, + ) + return amqp_con + + +def clear_queue(amqp_con: amqp.Connection, queue_name: str): + with amqp_con.channel() as ch: + while True: + r = ch.basic_get(queue_name) + if r is None: + return + ch.basic_ack(r.delivery_tag) + + +def queue_a_test( + amqp_message: str, queue_name: str, amqp_con: amqp.Connection +): + with amqp_con.channel() as ch: + ch.basic_publish( + amqp.Message(amqp_message, delivery_mode=2), + routing_key=queue_name, + ) + + +def main(): + logging.basicConfig(level=logging.INFO) + logging.info("Starting queue merging...") + # just needs logging. + parser = argparse.ArgumentParser() + parser.add_argument("--dry-run", action="store_true", help="Dry run") + args = parser.parse_args() + with open("/var/lib/cache-amqp/queued.json", "r") as f: + all_queue_info = json.load(f) + # we're ignoring upstream requests for this. + contexts = ["huge", "ppa", "ubuntu"] + arches = all_queue_info.get("arches") + releases = all_queue_info.get("releases") + proposed_packages = get_proposed_packages() + amqp_con = amqp_setup() + for context in contexts: + for release in releases: + for arch in arches: + queue_name = ( + f"debci-{context}-{release}-{arch}" + if context != "ubuntu" + else f"debci-{release}-{arch}" + ) + individual_queue = get_queue( + release, arch, context, all_queue_info + ) + merged_queue = merge_queue(individual_queue, proposed_packages) + # We need to empty the specified queue here. + if args.dry_run: + logging.info( + f"Would empty {len(individual_queue)} items from {queue_name}" + ) + logging.info( + f"Would submit {len(merged_queue)} items to {queue_name}" + ) + else: + logging.info( + f"Emptying {len(individual_queue)} items from {queue_name}" + ) + clear_queue(amqp_con, queue_name) + # Add the pruned queue + logging.info( + f"Adding {len(merged_queue)} items to {queue_name}" + ) + for pkg, run_args in merged_queue: + message_params = run_args + message_params["submit-time"] = datetime.strftime( + datetime.utcnow(), "%Y-%m-%d %H:%M:%S%z" + ) + amqp_message = f"{pkg}\n{json.dumps(message_params)}" + if args.dry_run: + print(f"Would submit {amqp_message} to {queue_name}") + else: + queue_a_test(amqp_message, queue_name, amqp_con) + + +if __name__ == "__main__": + main()
-- Mailing list: https://launchpad.net/~canonical-ubuntu-qa Post to : canonical-ubuntu-qa@lists.launchpad.net Unsubscribe : https://launchpad.net/~canonical-ubuntu-qa More help : https://help.launchpad.net/ListHelp