On Thu, Sep 21, 2023 at 4:08 AM Tobias Hagelborn <tobias.hagelb...@axis.com> wrote: > > From: Tobias Hagelborn <tobia...@axis.com> > > Retry insert operations in case the database is locked by > an external process. For instance an external cleanup or data > retention transaction. Use async sleep to not block the event loop.
sqlite already handles this internally with a timeout specified in sqlite3.connect(). The default is 5 seconds; I think it would be better to add a command line option to the server that allows a longer timeout to be specified instead of manually retrying. Allowing multiple queries to run in parallel (a side effect of async) might mess up the cursor.lastrowid tracking, so I'm a little leary of doing that. The long blocks should only actually happen when you are doing long maintenance operations, so an option for a longer timeout on the server is probably better (and, maybe rework your cleanup to not lock the database for so long) > > Signed-off-by: Tobias Hagelborn <tobias.hagelb...@axis.com> > --- > lib/hashserv/server.py | 41 +++++++++++++++++++++++++++-------------- > 1 file changed, 27 insertions(+), 14 deletions(-) > > diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py > index d40a2ab8..c898be3f 100644 > --- a/lib/hashserv/server.py > +++ b/lib/hashserv/server.py > @@ -9,6 +9,7 @@ import enum > import asyncio > import logging > import math > +import sqlite3 > import time > from . import create_async_client, UNIHASH_TABLE_COLUMNS, > OUTHASH_TABLE_COLUMNS > import bb.asyncrpc > @@ -114,7 +115,7 @@ class Resolve(enum.Enum): > REPLACE = enum.auto() > > > -def insert_table(cursor, table, data, on_conflict): > +async def insert_table(cursor, table, data, on_conflict): > resolve = { > Resolve.FAIL: "", > Resolve.IGNORE: " OR IGNORE", > @@ -129,7 +130,19 @@ def insert_table(cursor, table, data, on_conflict): > values=", ".join(":" + k for k in keys), > ) > prevrowid = cursor.lastrowid > - cursor.execute(query, data) > + > + RETRIES = 5 > + for x in range(RETRIES): > + try: > + cursor.execute(query, data) > + except sqlite3.OperationalError as e: > + if "database is locked" in str(e): > + await asyncio.sleep(1) > + finally: > + break > + else: > + cursor.execute(query, data) > + > logging.debug( > "Inserting %r into %s, %s", > data, > @@ -138,17 +151,17 @@ def insert_table(cursor, table, data, on_conflict): > ) > return (cursor.lastrowid, cursor.lastrowid != prevrowid) > > -def insert_unihash(cursor, data, on_conflict): > - return insert_table(cursor, "unihashes_v2", data, on_conflict) > +async def insert_unihash(cursor, data, on_conflict): > + return await insert_table(cursor, "unihashes_v2", data, on_conflict) > > -def insert_outhash(cursor, data, on_conflict): > - return insert_table(cursor, "outhashes_v2", data, on_conflict) > +async def insert_outhash(cursor, data, on_conflict): > + return await insert_table(cursor, "outhashes_v2", data, on_conflict) > > async def copy_unihash_from_upstream(client, db, method, taskhash): > d = await client.get_taskhash(method, taskhash) > if d is not None: > with closing(db.cursor()) as cursor: > - insert_unihash( > + await insert_unihash( > cursor, > {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}, > Resolve.IGNORE, > @@ -260,7 +273,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): > elif self.upstream_client is not None: > d = await self.upstream_client.get_taskhash(method, taskhash) > d = {k: v for k, v in d.items() if k in > UNIHASH_TABLE_COLUMNS} > - insert_unihash(cursor, d, Resolve.IGNORE) > + await insert_unihash(cursor, d, Resolve.IGNORE) > self.db.commit() > > return d > @@ -301,16 +314,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): > > return d > > - def update_unified(self, cursor, data): > + async def update_unified(self, cursor, data): > if data is None: > return > > - insert_unihash( > + await insert_unihash( > cursor, > {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS}, > Resolve.IGNORE > ) > - insert_outhash( > + await insert_outhash( > cursor, > {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}, > Resolve.IGNORE > @@ -386,7 +399,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): > outhash_data[k] = data[k] > > # Insert the new entry, unless it already exists > - (rowid, inserted) = insert_outhash(cursor, outhash_data, > Resolve.IGNORE) > + (rowid, inserted) = await insert_outhash(cursor, outhash_data, > Resolve.IGNORE) > > if inserted: > # If this row is new, check if it is equivalent to another > @@ -427,7 +440,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): > unihash = upstream_data['unihash'] > > > - insert_unihash( > + await insert_unihash( > cursor, > { > 'method': data['method'], > @@ -460,7 +473,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): > 'taskhash': data['taskhash'], > 'unihash': data['unihash'], > } > - insert_unihash(cursor, insert_data, Resolve.IGNORE) > + await insert_unihash(cursor, insert_data, Resolve.IGNORE) > self.db.commit() > > # Fetch the unihash that will be reported for the taskhash. If > the > -- > 2.30.2 > > > >
-=-=-=-=-=-=-=-=-=-=-=- Links: You receive all messages sent to this group. View/Reply Online (#188019): https://lists.openembedded.org/g/openembedded-core/message/188019 Mute This Topic: https://lists.openembedded.org/mt/101496954/21656 Group Owner: openembedded-core+ow...@lists.openembedded.org Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub [arch...@mail-archive.com] -=-=-=-=-=-=-=-=-=-=-=-