Skia has proposed merging ~hyask/autopkgtest-cloud:skia/switch_worker_to_amqp into autopkgtest-cloud:master.
Requested reviews: Canonical's Ubuntu QA (canonical-ubuntu-qa) For more details, see: https://code.launchpad.net/~hyask/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/473908 Switch the worker part to use `python3-amqp` instead of `python3-amqplib`. Take the occasion to modernize some tools. -- Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~hyask/autopkgtest-cloud:skia/switch_worker_to_amqp into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp index a6f7f09..d70dd69 100755 --- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp @@ -1,35 +1,54 @@ #!/usr/bin/python3 # Filter out AMQP requests that match a given regex +import argparse import configparser import logging -import optparse # pylint: disable=deprecated-module +import os import re import sys import time -import urllib.parse -import amqplib.client_0_8 as amqp +import amqp import distro_info -def filter_amqp(options, host, queue_name, regex): - url_parts = urllib.parse.urlsplit(host, allow_fragments=False) - filter_re = re.compile(regex.encode("UTF-8"), re.DOTALL) - amqp_con = amqp.Connection( - url_parts.hostname, - userid=url_parts.username, - password=url_parts.password, - ) - ch = amqp_con.channel() +def get_amqp_channel(): + try: + cp = configparser.ConfigParser() + with open("/home/ubuntu/rabbitmq.cred", "r") as f: + cp.read_string("[rabbit]\n" + f.read().replace('"', "")) + amqp_con = amqp.Connection( + cp["rabbit"]["RABBIT_HOST"], + cp["rabbit"]["RABBIT_USER"], + cp["rabbit"]["RABBIT_PASSWORD"], + ) + except FileNotFoundError: + amqp_con = amqp.Connection( + os.environ["RABBIT_HOST"], + userid=os.environ["RABBIT_USER"], + password=os.environ["RABBIT_PASSWORD"], + ) + amqp_con.connect() + return amqp_con.channel() + + +def filter_amqp(options, queue_name, regex): num_items_deleted = 0 + filter_re = re.compile(regex.encode("UTF-8"), re.DOTALL) + + channel = get_amqp_channel() while True: - r = ch.basic_get(queue_name) + try: + r = channel.basic_get(queue_name) + except amqp.NotFound: + logging.warning(f"Queue {queue_name} not found") + return None if r is None: - logging.debug("r is none, exiting") - ch.close() - amqp_con.close() + logging.info( + "Message empty, we probably reached the end of the queue" + ) break if isinstance(r.body, str): body = r.body.encode("UTF-8") @@ -45,7 +64,7 @@ def filter_amqp(options, host, queue_name, regex): logging.info("queue item: %s (would delete)", body) else: logging.info("queue item: %s (deleting)", body) - ch.basic_ack(r.delivery_tag) + channel.basic_ack(r.delivery_tag) num_items_deleted += 1 return num_items_deleted @@ -69,25 +88,31 @@ def generate_queue_names(): def main(): - parser = optparse.OptionParser( - usage="usage: %prog [options] queue_name regex\n" - "Pass `all` for queue_name to filter all queues" + parser = argparse.ArgumentParser( + description="""Filter queue based on a regex + +This script can be used whenever a new upload happens, obsoleting a previous +one, and that previous upload still had a lot of tests scheduled. To avoid +processing useless jobs, the queue can be filtered on the trigger of the +obsolete upload. +""", + formatter_class=argparse.RawTextHelpFormatter, ) - parser.add_option( + parser.add_argument( "-n", "--dry-run", default=False, action="store_true", help="only show the operations that would be performed", ) - parser.add_option( + parser.add_argument( "-v", "--verbose", default=False, action="store_true", help="additionally show queue items that are not removed", ) - parser.add_option( + parser.add_argument( "-a", "--all-items-in-queue", default=False, @@ -97,25 +122,21 @@ def main(): "When using this option, the provided regex will be ignored." ), ) - cp = configparser.ConfigParser() - with open("/home/ubuntu/rabbitmq.cred", "r") as f: - cp.read_string("[rabbit]\n" + f.read().replace('"', "")) - creds = "amqp://%s:%s@%s" % ( - cp["rabbit"]["RABBIT_USER"], - cp["rabbit"]["RABBIT_PASSWORD"], - cp["rabbit"]["RABBIT_HOST"], + parser.add_argument( + "queue_name", + help="The name of the queue to filter. `all` is a valid value.", + ) + parser.add_argument( + "regex", help="The regex with which to filter the queue" ) - opts, args = parser.parse_args() + opts = parser.parse_args() logging.basicConfig( level=logging.DEBUG if opts.verbose else logging.INFO, format="%(asctime)s - %(message)s", ) - if len(args) != 2: - parser.error("Need to specify queue name and regex") - if opts.all_items_in_queue: print("""Do you really want to flush this queue? [yN]""", end="") sys.stdout.flush() @@ -123,16 +144,25 @@ def main(): if not response.strip().lower().startswith("y"): print("""Exiting""") sys.exit(1) - queues = [args[0]] if args[0] != "all" else generate_queue_names() + queues = ( + [opts.queue_name] + if opts.queue_name != "all" + else generate_queue_names() + ) - deletion_count_history = [] for this_queue in queues: + deletion_count_history = [] while True: - num_deleted = filter_amqp(opts, creds, this_queue, args[1]) + num_deleted = filter_amqp(opts, this_queue, opts.regex) + if num_deleted is None: + logging.info("Skipping") + break deletion_count_history.append(num_deleted) if opts.dry_run: break - if all([x == 0 for x in deletion_count_history[-5:]]): + if len(deletion_count_history) >= 5 and all( + [x == 0 for x in deletion_count_history[-5:]] + ): logging.info( "Finished filtering queue objects, run history:\n%s" % "\n".join(str(x) for x in deletion_count_history) diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream index c255239..e7441cc 100755 --- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream @@ -1,14 +1,14 @@ #!/usr/bin/python3 # Filter out all but the latest request for a given upstream PR +import argparse +import configparser import json import logging -import optparse # pylint: disable=deprecated-module import os -import urllib.parse from collections import defaultdict -import amqplib.client_0_8 as amqp +import amqp import dateutil.parser import distro_info @@ -19,13 +19,23 @@ SUPPORTED_UBUNTU_RELEASES = sorted( ) -def filter_amqp(options, host): - url_parts = urllib.parse.urlsplit(host, allow_fragments=False) - amqp_con = amqp.Connection( - url_parts.hostname, - userid=url_parts.username, - password=url_parts.password, - ) +def filter_amqp(options): + try: + cp = configparser.ConfigParser() + with open("/home/ubuntu/rabbitmq.cred", "r") as f: + cp.read_string("[rabbit]\n" + f.read().replace('"', "")) + amqp_con = amqp.Connection( + cp["rabbit"]["RABBIT_HOST"], + cp["rabbit"]["RABBIT_USER"], + cp["rabbit"]["RABBIT_PASSWORD"], + ) + except FileNotFoundError: + amqp_con = amqp.Connection( + os.environ["RABBIT_HOST"], + userid=os.environ["RABBIT_USER"], + password=os.environ["RABBIT_PASSWORD"], + ) + amqp_con.connect() dry_run = "[dry-run] " if options.dry_run else "" queues = ( @@ -40,10 +50,7 @@ def filter_amqp(options, host): while True: try: r = ch.basic_get(queue_name) - except amqp.AMQPChannelException as e: - (code, _, _, _) = e.args - if code != 404: - raise + except amqp.NotFound: logging.debug(f"No such queue {queue_name}") break if r is None: @@ -52,7 +59,7 @@ def filter_amqp(options, host): body = r.body.decode("UTF-8") else: body = r.body - (pkg, params) = body.split(" ", 1) + (pkg, params) = body.split("\n", 1) params_j = json.loads(params) submit_time = dateutil.parser.parse(params_j["submit-time"]) pr = [ @@ -80,37 +87,35 @@ def filter_amqp(options, host): def main(): - parser = optparse.OptionParser( - usage="usage: %prog [options] amqp://user:pass@host queue_name regex" + parser = argparse.ArgumentParser( + description="""Deduplicates jobs in the upstream queue. + +The upstream integration is different than regular jobs pushed by Britney. +If a developer pushes two times in a row on a pull request, then two test +requests get queued. This script is here to deduplicate those requests. +""", + formatter_class=argparse.RawTextHelpFormatter, ) - parser.add_option( - "-n", + parser.add_argument( "--dry-run", - default=False, action="store_true", help="only show the operations that would be performed", ) - parser.add_option( + parser.add_argument( "-v", "--verbose", - default=False, action="store_true", help="additionally show queue items that are not removed", ) - # pylint: disable=unused-variable - opts, args = parser.parse_args() + opts = parser.parse_args() logging.basicConfig( level=logging.DEBUG if opts.verbose else logging.INFO, format="%(asctime)s - %(message)s", ) - user = os.environ["RABBIT_USER"] - password = os.environ["RABBIT_PASSWORD"] - host = os.environ["RABBIT_HOST"] - uri = f"amqp://{user}:{password}@{host}" - filter_amqp(opts, uri) + filter_amqp(opts) if __name__ == "__main__": diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp index cdda67a..fbd2092 100755 --- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp @@ -2,10 +2,11 @@ import argparse import configparser +import os import re import sys -import amqplib.client_0_8 as amqp +import amqp def parse_args(): @@ -45,14 +46,22 @@ You can alter the queue messages however you please, but be careful :) def main(): args = parse_args() - cp = configparser.ConfigParser() - with open("/home/ubuntu/rabbitmq.cred", "r") as f: - cp.read_string("[rabbit]\n" + f.read().replace('"', "")) - amqp_con = amqp.Connection( - cp["rabbit"]["RABBIT_HOST"], - cp["rabbit"]["RABBIT_USER"], - cp["rabbit"]["RABBIT_PASSWORD"], - ) + try: + cp = configparser.ConfigParser() + with open("/home/ubuntu/rabbitmq.cred", "r") as f: + cp.read_string("[rabbit]\n" + f.read().replace('"', "")) + amqp_con = amqp.Connection( + cp["rabbit"]["RABBIT_HOST"], + cp["rabbit"]["RABBIT_USER"], + cp["rabbit"]["RABBIT_PASSWORD"], + ) + except FileNotFoundError: + amqp_con = amqp.Connection( + os.environ["RABBIT_HOST"], + userid=os.environ["RABBIT_USER"], + password=os.environ["RABBIT_PASSWORD"], + ) + amqp_con.connect() if args.regex is not None: filter_re = re.compile(args.regex.encode("UTF-8"), re.DOTALL) with amqp_con.channel() as ch: diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp index 80f70e2..10d94a1 100755 --- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp @@ -3,9 +3,10 @@ import argparse import ast import configparser +import os import sys -import amqplib.client_0_8 as amqp +import amqp def parse_args(): @@ -68,14 +69,22 @@ def main(): file=sys.stderr, ) - cp = configparser.ConfigParser() - with open("/home/ubuntu/rabbitmq.cred", "r") as f: - cp.read_string("[rabbit]\n" + f.read().replace('"', "")) - amqp_con = amqp.Connection( - cp["rabbit"]["RABBIT_HOST"], - cp["rabbit"]["RABBIT_USER"], - cp["rabbit"]["RABBIT_PASSWORD"], - ) + try: + cp = configparser.ConfigParser() + with open("/home/ubuntu/rabbitmq.cred", "r") as f: + cp.read_string("[rabbit]\n" + f.read().replace('"', "")) + amqp_con = amqp.Connection( + cp["rabbit"]["RABBIT_HOST"], + cp["rabbit"]["RABBIT_USER"], + cp["rabbit"]["RABBIT_PASSWORD"], + ) + except FileNotFoundError: + amqp_con = amqp.Connection( + os.environ["RABBIT_HOST"], + userid=os.environ["RABBIT_USER"], + password=os.environ["RABBIT_PASSWORD"], + ) + amqp_con.connect() ch = amqp_con.channel() queue_name = args.queue_name if args.message: @@ -103,10 +112,7 @@ def main(): continue try: push(message, queue_name, ch) - except ( - amqp.AMQPChannelException, - amqp.AMQPConnectionException, - ) as _: + except amqp.AMQPError: print( f"Pushing message `{message}` to queue {queue_name} failed.", file=sys.stderr, diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest index 9383aa2..06ab51b 100755 --- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest @@ -9,9 +9,9 @@ import os import sys import urllib.parse import uuid -from datetime import datetime +from datetime import datetime, timezone -import amqplib.client_0_8 as amqp +import amqp my_dir = os.path.dirname(os.path.realpath(sys.argv[0])) @@ -175,7 +175,7 @@ if __name__ == "__main__": except KeyError: pass params["submit-time"] = datetime.strftime( - datetime.utcnow(), "%Y-%m-%d %H:%M:%S%z" + datetime.now().astimezone(timezone.utc), "%Y-%m-%d %H:%M:%S%z" ) params["uuid"] = str(uuid.uuid4()) params = "\n" + json.dumps(params) diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock index 442e914..aaca3d2 100755 --- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock @@ -15,7 +15,7 @@ import os import subprocess import sys -import amqplib.client_0_8 as amqp +import amqp @contextlib.contextmanager @@ -33,15 +33,14 @@ def amqp_lock(name): userid=os.environ["RABBIT_USER"], password=os.environ["RABBIT_PASSWORD"], ) + amqp_con.connect() channel = amqp_con.channel() - channel.queue_declare( - name, arguments={"args.queue.x-single-active-consumer": True} - ) + channel.queue_declare(name, arguments={"x-single-active-consumer": True}) channel.basic_publish(amqp.Message(""), routing_key=name) consumer_tag = channel.basic_consume(queue=name, callback=callback) while channel.callbacks and not callback.called: - channel.wait() + amqp_con.drain_events() try: yield diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker index 09ade16..893f860 100755 --- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker @@ -2,7 +2,7 @@ # autopkgtest cloud worker # Author: Martin Pitt <martin.p...@ubuntu.com> # -# Requirements: python3-amqplib python3-swiftclient python3-influxdb +# Requirements: python3-amqp python3-swiftclient python3-influxdb # Requirements for running autopkgtest from git: python3-debian libdpkg-perl # # pylint: disable=too-many-lines,line-too-long @@ -27,7 +27,7 @@ import urllib.request import uuid from urllib.error import HTTPError -import amqplib.client_0_8 as amqp +import amqp import distro_info import novaclient.client import novaclient.exceptions @@ -562,7 +562,6 @@ def call_autopkgtest( # set up status AMQP exchange global amqp_con status_amqp = amqp_con.channel() - status_amqp.access_request("/data", active=True, read=False, write=True) status_amqp.exchange_declare( status_exchange_name, "fanout", durable=False, auto_delete=True ) @@ -1576,9 +1575,6 @@ def request(msg): global amqp_con complete_amqp = amqp_con.channel() - complete_amqp.access_request( - "/complete", active=True, read=False, write=True - ) complete_amqp.exchange_declare( complete_exchange_name, "fanout", durable=True, auto_delete=False ) @@ -1629,6 +1625,7 @@ def amqp_connect(cfg, callback): password=os.environ["RABBIT_PASSWORD"], confirm_publish=True, ) + amqp_con.connect() queue = amqp_con.channel() # avoids greedy grabbing of the entire queue while being too busy queue.basic_qos(0, 1, True) @@ -1665,7 +1662,7 @@ def amqp_connect(cfg, callback): queue.queue_declare(queue_name, durable=True, auto_delete=False) queue.basic_consume(queue=queue_name, callback=request) - return queue + return amqp_con def main(): @@ -1740,13 +1737,13 @@ def main(): swiftclient.Connection(**swift_creds).close() # connect to AMQP queues - queue = amqp_connect(cfg, request) + amqp_con = amqp_connect(cfg, request) # process queues forever try: while exit_requested is None: logging.info("Waiting for and processing AMQP requests") - queue.wait() + amqp_con.drain_events() except IOError: if exit_requested is None: raise diff --git a/charms/focal/autopkgtest-cloud-worker/layer.yaml b/charms/focal/autopkgtest-cloud-worker/layer.yaml index d8f8c0a..fd41b99 100644 --- a/charms/focal/autopkgtest-cloud-worker/layer.yaml +++ b/charms/focal/autopkgtest-cloud-worker/layer.yaml @@ -18,7 +18,7 @@ options: - libdpkg-perl - lxd-client - make - - python3-amqplib + - python3-amqp - python3-debian - python3-distro-info - python3-glanceclient
-- Mailing list: https://launchpad.net/~canonical-ubuntu-qa Post to : canonical-ubuntu-qa@lists.launchpad.net Unsubscribe : https://launchpad.net/~canonical-ubuntu-qa More help : https://help.launchpad.net/ListHelp