I should have a read up on AMQP so I can review the queue stuff; in the 
meantime, I've left one comment on the handling of failed INSERTs

Diff comments:

> diff --git a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer 
> b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
> new file mode 100755
> index 0000000..3939bde
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
> @@ -0,0 +1,136 @@
> +#!/usr/bin/python3
> +
> +import configparser
> +import json
> +import logging
> +import os
> +import socket
> +import sqlite3
> +import urllib.parse
> +
> +import amqplib.client_0_8 as amqp
> +from helpers.utils import init_db
> +
> +EXCHANGE_NAME = "sqlite-write-me.fanout"
> +
> +config = None
> +db_con = None
> +
> +INSERT_INTO_KEYS = [
> +    "test_id",
> +    "run_id",
> +    "version",
> +    "triggers",
> +    "duration",
> +    "exitcode",
> +    "requester",
> +    "env",
> +    "uuid",
> +]
> +
> +
> +def amqp_connect():
> +    """Connect to AMQP server"""
> +
> +    cp = configparser.ConfigParser()
> +    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +    amqp_uri = cp["amqp"]["uri"]
> +    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
> +    amqp_con = amqp.Connection(
> +        parts.hostname, userid=parts.username, password=parts.password
> +    )
> +    logging.info(
> +        "Connected to AMQP server at %s@%s" % (parts.username, 
> parts.hostname)
> +    )
> +
> +    return amqp_con
> +
> +
> +def db_connect():
> +    """Connect to SQLite DB"""
> +    cp = configparser.ConfigParser()
> +    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +
> +    db_con = init_db(cp["web"]["database"])
> +
> +    return db_con
> +
> +
> +def check_msg(queue_msg):
> +    required_keys = [
> +        "test_id",
> +        "run_id",
> +        "version",
> +        "triggers",
> +        "duration",
> +        "exitcode",
> +        "requester",
> +        "env",
> +        "uuid",
> +    ]
> +    queue_msg_keys = list(queue_msg.keys())
> +    required_keys.sort()
> +    queue_msg_keys.sort()
> +    if queue_msg_keys == required_keys:
> +        return True
> +    return False
> +
> +
> +def process_message(msg, db_con):
> +    # aight, time to test this with download-results and 
> download-all-results now.
> +    body = msg.body
> +    if isinstance(body, bytes):
> +        body = body.decide("UTF-8", errors="replace")
> +    info = json.loads(body)
> +    logging.info("Message is: \n%s" % json.dumps(info, indent=2))
> +    print(check_msg(info))
> +    if not check_msg(info):
> +        logging.info(
> +            "Message has incorrect keys!\n%s" % json.dumps(info, indent=2)
> +        )
> +    # insert into db
> +    try:
> +        with db_con:
> +            c = db_con.cursor()
> +            c.execute(
> +                "INSERT INTO result VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
> +                (
> +                    info["test_id"],
> +                    info["run_id"],
> +                    info["version"],
> +                    info["triggers"],
> +                    info["duration"],
> +                    info["exitcode"],
> +                    info["requester"],
> +                    info["env"],
> +                    info["uuid"],
> +                ),
> +            )
> +            db_con.commit()
> +        logging.info("Inserted the following entry into the db:\n%s" % body)
> +    except sqlite3.OperationalError as e:
> +        logging.info("Insert operation failed with: %s" % str(e))
> +    except sqlite3.IntegrityError as e:
> +        logging.info("Insert operation failed with: %s" % str(e))

I'm not convinced the problematic ones will stay in the queue. Having failed 
the INSERT, and logged the error, we just continue onto basic_ack() the 
message, which should remove the message from queue storage and reply to the 
client that we've successfully dealt with their message. In fact, crashing out 
here would preserve the message because we *wouldn't* ack it.

You could choose to only ack those messages that we successfully INSERT, which 
should preserve the ones that fail (assuming that's not a violation of the AMQP 
protocol -- it's not one I'm overly familiar with). But consider the likely 
INSERT failure scenarios. If it starts failing, it likely won't be a case of 
"some succeed and some fail"; it'll be something like the database has become 
inaccessible (due to storage failure) or the database has changed to an 
incompatible structure.

In either case we can pretty much expect all future INSERTs to fail, so whether 
we crash out with the relevant error, or just start log spamming tons of 
failures to the log, the effect on the incoming queue would be the same: it 
builds and builds until either someone fixes stuff or it runs out of storage.

> +
> +    msg.channel.basic_ack(msg.delivery_tag)
> +
> +
> +if __name__ == "__main__":
> +    logging.basicConfig(level=logging.INFO)
> +    db_con = db_connect()
> +    amqp_con = amqp_connect()
> +    status_ch = amqp_con.channel()
> +    status_ch.access_request("/complete", active=True, read=True, 
> write=False)
> +    status_ch.exchange_declare(
> +        EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    queue_name = "sqlite-writer-listener-%s" % socket.getfqdn()
> +    status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
> +    status_ch.queue_bind(queue_name, EXCHANGE_NAME, queue_name)
> +    logging.info("Listening to requests on %s" % queue_name)
> +    status_ch.basic_consume(
> +        "", callback=lambda msg: process_message(msg, db_con)
> +    )
> +    while status_ch.callbacks:
> +        status_ch.wait()


-- 
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/460847
Your team Canonical's Ubuntu QA is requested to review the proposed merge of 
~andersson123/autopkgtest-cloud:d-r-d-a-r-merging into autopkgtest-cloud:master.


-- 
Mailing list: https://launchpad.net/~canonical-ubuntu-qa
Post to     : canonical-ubuntu-qa@lists.launchpad.net
Unsubscribe : https://launchpad.net/~canonical-ubuntu-qa
More help   : https://help.launchpad.net/ListHelp

Reply via email to