Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:d-a-r-make-me-faster into autopkgtest-cloud:master.
Requested reviews:
  Canonical's Ubuntu QA (canonical-ubuntu-qa)

For more details, see:
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461146

--
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:d-a-r-make-me-faster into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-web/units/sqlite-writer.service b/charms/focal/autopkgtest-web/units/sqlite-writer.service new file mode 100644 index 0000000..3a47c08 --- /dev/null +++ b/charms/focal/autopkgtest-web/units/sqlite-writer.service @@ -0,0 +1,13 @@ +[Unit] +Description=Write test results to db +StartLimitIntervalSec=60s +StartLimitBurst=60 + +[Service] +User=ubuntu +ExecStart=/home/ubuntu/webcontrol/sqlite-writer +Restart=on-failure +RestartSec=1s + +[Install] +WantedBy=autopkgtest-web.target diff --git a/charms/focal/autopkgtest-web/webcontrol/download-all-results b/charms/focal/autopkgtest-web/webcontrol/download-all-results index 1af7918..485f6bd 100755 --- a/charms/focal/autopkgtest-web/webcontrol/download-all-results +++ b/charms/focal/autopkgtest-web/webcontrol/download-all-results @@ -11,73 +11,58 @@ # script can be used to find any results which were missed and insert them. import configparser -import http import io +import itertools import json import logging import os -import random import sqlite3 -import sys import tarfile -import time import urllib.parse -from urllib.request import urlopen +import amqplib.client_0_8 as amqp +import swiftclient from distro_info import UbuntuDistroInfo from helpers.utils import get_test_id, init_db LOGGER = logging.getLogger(__name__) +WRITER_EXCHANGE_NAME = "sqlite-write-me.fanout" +SWIFT_CREDS_FILE = "/home/ubuntu/public-swift-creds" config = None db_con = None +amqp_con = None -def list_remote_container(container_url): - LOGGER.debug("Listing container %s", container_url) - out = [] +def amqp_connect(): + """Connect to AMQP server""" - def get_batch(start=None): - url = f"{container_url}/?format=json" - if start is not None: - url += f"&marker={urllib.parse.quote(start)}" - - LOGGER.debug('Retrieving "%s"', url) - for _ in range(5): - try: - resp = urlopen(url) - except http.client.RemoteDisconnected: - LOGGER.debug("Got disconnected, sleeping") - time.sleep(5) - continue - else: - break - json_string 
= resp.read() - json_data = json.loads(json_string) - - if not json_data: - return None - - out.extend([e["name"] for e in json_data]) - name = out[-1] - - return name + cp = configparser.ConfigParser() + cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf")) + amqp_uri = cp["amqp"]["uri"] + parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False) + amqp_con = amqp.Connection( + parts.hostname, userid=parts.username, password=parts.password + ) + logging.info( + "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname) + ) - marker = get_batch() + return amqp_con - while True: - new_marker = get_batch(marker) - if not new_marker or new_marker == marker: - break - marker = new_marker - out = [name for name in out if name.endswith("result.tar")] - LOGGER.debug("Found %d items in %s", len(out), container_url) - ret = {} - for r in out: - (_, _, _, _, run_id, _) = r.split("/") - ret[run_id] = r - return ret +# def list_remote_container(container_url): +def list_remote_container(container_name, swift_conn): + LOGGER.debug("Listing container %s", container_name) + _, object_list = swift_conn.get_container( + container_name, full_listing=True + ) + ret_me = {} + for obj in object_list: + if "result.tar" in obj["name"]: + obj_splitname = obj["name"].split("/") + ret_me[obj_splitname[4]] = obj["name"] + return ret_me def list_our_results(release): @@ -91,29 +76,16 @@ def list_our_results(release): return {run_id for (run_id,) in c.fetchall()} -def fetch_one_result(url): +def fetch_one_result(container_name, object_name, swift_conn): """Download one result URL from swift and add it to the DB""" - (release, arch, _, src, run_id, _) = url.split("/")[-6:] + # modify this to use swiftclient too. 
+ # use public-swift-creds + (release, arch, _, src, run_id, _) = object_name.split("/") test_id = get_test_id(db_con, release, arch, src) - - try: - f = urlopen(url, timeout=30) - if f.getcode() == 200: - tar_bytes = io.BytesIO(f.read()) - f.close() - else: - raise NotImplementedError( - "fetch_one_result(%s): cannot handle HTTP code %i" - % (url, f.getcode()) - ) - except IOError as e: - LOGGER.error("Failure to fetch %s: %s", url, str(e)) - # we tolerate "not found" (something went wrong on uploading the - # result), but other things indicate infrastructure problems - if hasattr(e, "code") and e.code == 404: # pylint: disable=no-member - return - sys.exit(1) - + # modify this to use swiftclient instead of urllib + # look at update-github-jobs for help + _, contents = swift_conn.get_object(container_name, object_name) + tar_bytes = io.BytesIO(contents) try: with tarfile.open(None, "r", tar_bytes) as tar: exitcode = int(tar.extractfile("exitcode").read().strip()) @@ -128,14 +100,12 @@ def fetch_one_result(url): srcver = "%s unknown" % (src) else: raise - (ressrc, ver) = srcver.split() + (_, ver) = srcver.split() testinfo = json.loads( tar.extractfile("testinfo.json").read().decode() ) test_uuid = testinfo.get("uuid", "") duration = int(tar.extractfile("duration").read().strip()) - # KeyError means the file is not there, i.e. 
there isn't a human - # requester try: requester = ( tar.extractfile("requester").read().decode().strip() @@ -143,16 +113,7 @@ def fetch_one_result(url): except KeyError: requester = "" except (KeyError, ValueError, tarfile.TarError) as e: - LOGGER.debug("%s is damaged, ignoring: %s", url, str(e)) - return - - if src != ressrc: - LOGGER.error( - "%s is a result for package %s, but expected package %s", - url, - ressrc, - src, - ) + LOGGER.debug("%s is damaged, ignoring: %s" % (object_name, str(e))) return # parse recorded triggers in test result @@ -161,7 +122,9 @@ def fetch_one_result(url): test_triggers = e.split("=", 1)[1] break else: - LOGGER.error("%s result has no ADT_TEST_TRIGGERS, ignoring", url) + LOGGER.error( + "%s result has no ADT_TEST_TRIGGERS, ignoring", object_name + ) return LOGGER.debug( @@ -182,63 +145,56 @@ def fetch_one_result(url): if env in testinfo.keys(): env_vars.append(spec) - while True: - try: - with ( - db_con - ): # this starts a transaction, making sure we release the lock at the end - c = db_con.cursor() - c.execute( - "INSERT INTO result VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", - ( - test_id, - run_id, - ver, - test_triggers, - duration, - exitcode, - requester, - ",".join(env_vars), - test_uuid, - ), - ) - db_con.commit() - break - except sqlite3.OperationalError as e: - if "database is locked" in str(e): - sleep_time = random.uniform(0.1, 2) - LOGGER.info( - "database is currently locked, waiting %f seconds and trying again..." 
- % sleep_time - ) - time.sleep(sleep_time) - else: - logging.info("insert operation failed with: %s" % str(e)) - break - except sqlite3.IntegrityError: - LOGGER.info("%s was already recorded - skipping", run_id) - break + # Insert the write request into the queue + complete_amqp = amqp_con.channel() + complete_amqp.access_request( + "/complete", active=True, read=False, write=True + ) + complete_amqp.exchange_declare( + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False + ) + write_me_msg = { + "test_id": test_id, + "run_id": run_id, + "version": ver, + "triggers": test_triggers, + "duration": duration, + "exitcode": exitcode, + "requester": requester, + "env": ",".join(env_vars), + "uuid": test_uuid, + } + complete_amqp.basic_publish( + amqp.Message(json.dumps(write_me_msg), delivery_mode=2), + WRITER_EXCHANGE_NAME, + "", + ) -def fetch_container(release, container_url): +def fetch_container(release, swift_conn): """Download new results from a swift container""" + container_name = "autopkgtest-" + release try: our_results = list_our_results(release) - known_results = list_remote_container(container_url) + known_results = list_remote_container(container_name, swift_conn) + # the keys WERE the run_id, so need to fix dis need_to_fetch = set(known_results.keys()) - our_results LOGGER.debug("Need to download %d items", len(need_to_fetch)) for run_id in need_to_fetch: fetch_one_result( - os.path.join(container_url, known_results[run_id]) + container_name=container_name, + object_name=known_results[run_id], + swift_conn=swift_conn, ) - except urllib.error.HTTPError as e: - if e.code == 401 or e.code == 404: - LOGGER.warning(f"Couldn't access {container_url} - doesn't exist?") - return + except swiftclient.ClientException as e: + LOGGER.warning( + "Something went wrong accessing container %s\nTraceback: %s" + % (container_name, str(e)) + ) raise @@ -260,6 +216,31 @@ if __name__ == "__main__": config = configparser.ConfigParser() 
config.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf")) + amqp_con = amqp_connect() + + swift_cfg = configparser.ConfigParser() + + with open(SWIFT_CREDS_FILE) as fp: + swift_cfg.read_file( + itertools.chain(["[swift]"], fp), source=SWIFT_CREDS_FILE + ) + + # change this to use configparser + swift_creds = { + "authurl": swift_cfg["swift"]["OS_AUTH_URL"], + "user": swift_cfg["swift"]["OS_USERNAME"], + "key": swift_cfg["swift"]["OS_PASSWORD"], + "os_options": { + "region_name": swift_cfg["swift"]["OS_REGION_NAME"], + "project_domain_name": swift_cfg["swift"][ + "OS_PROJECT_DOMAIN_NAME" + ], + "project_name": swift_cfg["swift"]["OS_PROJECT_NAME"], + "user_domain_name": swift_cfg["swift"]["OS_USER_DOMAIN_NAME"], + }, + "auth_version": 3, + } + swift_conn = swiftclient.Connection(**swift_creds) try: for release in releases: @@ -275,9 +256,7 @@ if __name__ == "__main__": c.execute("ALTER TABLE result ADD COLUMN env TEXT") fetch_container( release, - os.path.join( - config["web"]["SwiftURL"], f"autopkgtest-{release}" - ), + swift_conn=swift_conn, ) finally: if db_con: diff --git a/charms/focal/autopkgtest-web/webcontrol/download-results b/charms/focal/autopkgtest-web/webcontrol/download-results index e71d4a0..4b9b11e 100755 --- a/charms/focal/autopkgtest-web/webcontrol/download-results +++ b/charms/focal/autopkgtest-web/webcontrol/download-results @@ -4,16 +4,15 @@ import configparser import json import logging import os -import random import socket import sqlite3 -import time import urllib.parse import amqplib.client_0_8 as amqp from helpers.utils import get_test_id, init_db EXCHANGE_NAME = "testcomplete.fanout" +WRITER_EXCHANGE_NAME = "sqlite-write-me.fanout" def amqp_connect(): @@ -83,43 +82,30 @@ def process_message(msg, db_con): return test_id = get_test_id(db_con, release, arch, package) - - while True: - try: - with ( - db_con - ): # this starts a transaction, making sure we release the lock at the end - c = db_con.cursor() - c.execute( - "INSERT INTO 
result VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", - ( - test_id, - run_id, - version, - triggers, - duration, - exitcode, - requester, - info.get("env", ""), - test_uuid, - ), - ) - db_con.commit() - break - except sqlite3.OperationalError as e: - if "database is locked" in str(e): - sleep_time = random.uniform(0.1, 2) - logging.info( - "database is currently locked, waiting %f seconds and trying again..." - % sleep_time - ) - time.sleep(sleep_time) - else: - logging.info("insert operation failed with: %s" % str(e)) - break - except sqlite3.IntegrityError: - logging.info("...which was already recorded - skipping") - break + # add to queue instead of writing to db + complete_amqp = amqp_con.channel() + complete_amqp.access_request( + "/complete", active=True, read=False, write=True + ) + complete_amqp.exchange_declare( + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False + ) + write_me_msg = { + "test_id": test_id, + "run_id": run_id, + "version": version, + "triggers": triggers, + "duration": duration, + "exitcode": exitcode, + "requester": requester, + "env": info.get("env", ""), + "uuid": test_uuid, + } + complete_amqp.basic_publish( + amqp.Message(json.dumps(write_me_msg), delivery_mode=2), + WRITER_EXCHANGE_NAME, + "", + ) msg.channel.basic_ack(msg.delivery_tag) diff --git a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer new file mode 100755 index 0000000..4ba8bdf --- /dev/null +++ b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer @@ -0,0 +1,138 @@ +#!/usr/bin/python3 + +import configparser +import json +import logging +import os +import socket +import sqlite3 +import urllib.parse + +import amqplib.client_0_8 as amqp +from helpers.utils import init_db + +EXCHANGE_NAME = "sqlite-write-me.fanout" + +config = None +db_con = None + +INSERT_INTO_KEYS = [ + "test_id", + "run_id", + "version", + "triggers", + "duration", + "exitcode", + "requester", + "env", + "uuid", +] + + +def 
amqp_connect(): + """Connect to AMQP server""" + + cp = configparser.ConfigParser() + cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf")) + amqp_uri = cp["amqp"]["uri"] + parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False) + amqp_con = amqp.Connection( + parts.hostname, userid=parts.username, password=parts.password + ) + logging.info( + "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname) + ) + + return amqp_con + + +def db_connect(): + """Connect to SQLite DB""" + cp = configparser.ConfigParser() + cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf")) + + db_con = init_db(cp["web"]["database"]) + + return db_con + + +def check_msg(queue_msg): + required_keys = set( + [ + "test_id", + "run_id", + "version", + "triggers", + "duration", + "exitcode", + "requester", + "env", + "uuid", + ] + ) + queue_keys = set(queue_msg.keys()) + if required_keys == queue_keys: + return True + return False + + +def process_message(msg, db_con): + # We want to ack and re-send messages if insert fails? + body = msg.body + if isinstance(body, bytes): + body = body.decode("UTF-8", errors="replace") + info = json.loads(body) + logging.info("Message is: \n%s" % json.dumps(info, indent=2)) + if not check_msg(info): + logging.error( + "Message has incorrect keys! 
Ignoring\n%s" + % json.dumps(info, indent=2) + ) + msg.channel.basic_ack(msg.delivery_tag) + return + # insert into db + sqlite3.paramstyle = "named" + with db_con: + c = db_con.cursor() + # change this to column names + c.execute( + ( + "INSERT INTO result(test_id, run_id, version, triggers, duration, " + "exitcode, requester, env, uuid) VALUES (:test_id, :run_id, " + ":version, :triggers, :duration, :exitcode, :requester, :env, :uuid)" + ), + { + "test_id": info["test_id"], + "run_id": info["run_id"], + "version": info["version"], + "triggers": info["triggers"], + "duration": info["duration"], + "exitcode": info["exitcode"], + "requester": info["requester"], + "env": info["env"], + "uuid": info["uuid"], + }, + ) + logging.info("Inserted the following entry into the db:\n%s" % body) + + msg.channel.basic_ack(msg.delivery_tag) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + db_con = db_connect() + amqp_con = amqp_connect() + status_ch = amqp_con.channel() + status_ch.access_request("/complete", active=True, read=True, write=False) + status_ch.exchange_declare( + EXCHANGE_NAME, "fanout", durable=True, auto_delete=False + ) + queue_name = "sqlite-writer-listener-%s" % socket.getfqdn() + status_ch.queue_declare(queue_name, durable=True, auto_delete=False) + status_ch.queue_bind(queue_name, EXCHANGE_NAME, queue_name) + logging.info("Listening to requests on %s" % queue_name) + status_ch.basic_consume( + "", callback=lambda msg: process_message(msg, db_con) + ) + while status_ch.callbacks: + status_ch.wait()
--
Mailing list: https://launchpad.net/~canonical-ubuntu-qa
Post to     : canonical-ubuntu-qa@lists.launchpad.net
Unsubscribe : https://launchpad.net/~canonical-ubuntu-qa
More help   : https://help.launchpad.net/ListHelp