Here's a common scenario. I'm looking for the best implementation using the
scheduler.
I want to support a set of background tasks (task1, task2...), where each
task:
• processes a queue of items
• waits a few seconds
It's safe to have task1 and task2 running in parallel, but I cannot have
two instances of task1 running in parallel: they would both process the
same queue of items, duplicating work.
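For concreteness, a task body here is just a function the scheduler calls once
per period; the waiting between runs comes from the scheduler, not the task. A
minimal stand-in sketch (the in-memory work_queue is hypothetical; in reality
the items would live in the database):

```python
import queue

# Hypothetical stand-in for the shared queue of items (really database rows).
work_queue = queue.Queue()

def task1():
    """Process everything currently queued, then return.
    The scheduler re-runs this every `period` seconds, so the task
    itself never sleeps or loops forever."""
    processed = []
    while not work_queue.empty():
        processed.append(work_queue.get())
    return processed
```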
I found the scheduler supports this nicely with parameters like:

    db.scheduler_task.insert(function_name='task1',
                             task_name='task1',
                             stop_time=now + timedelta(days=90000),
                             repeats=0,
                             period=10)
I can launch 3 workers, and they coordinate amongst themselves to make sure
that only one will run the task at a time. Great! This task will last
forever...
...but now we encounter my problem...
What happens if the task crashes, or passes stop_time? Then it turns off,
and the queue is no longer processed. Or what happens if I reset the
database, or install this code on a new server? I'd rather not have to
re-run the insert by hand.
So how can I ensure there is always EXACTLY ONE of each task in the
database?
I tried putting this code into models (note: `period` is now a parameter,
since it was otherwise undefined, and `now` is taken from web2py's
request.now):

    from datetime import timedelta

    def initialize_task_queue(task_name, period=10):
        now = request.now  # web2py's timestamp for the current request
        num_tasks = db((db.scheduler_task.function_name == task_name)
                       & ((db.scheduler_task.status == 'QUEUED')
                          | (db.scheduler_task.status == 'ASSIGNED')
                          | (db.scheduler_task.status == 'RUNNING')
                          | (db.scheduler_task.status == 'ACTIVE'))).count()
        # Add a task if there isn't one already
        if num_tasks < 1:
            db.scheduler_task.insert(function_name=task_name,
                                     task_name=task_name,
                                     stop_time=now + timedelta(days=90000),
                                     repeats=0,
                                     period=period)
            db.commit()

    initialize_task_queue('task1')
    initialize_task_queue('task2')
    initialize_task_queue('task3')
This worked, except it introduces a race condition! If three web2py
processes start simultaneously (e.g., three scheduler workers), they will
insert duplicate tasks:

    process 1: count the 'task1' tasks
    process 2: count the 'task1' tasks
    process 1: fewer than 1, so insert a 'task1' task
    process 2: fewer than 1, so insert a 'task1' task
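The interleaving above can be reproduced deterministically in plain Python.
In this sketch, `tasks` stands in for the scheduler_task table, and the
barrier forces both threads to finish their check before either inserts:

```python
import threading

tasks = []                      # stand-in for rows in scheduler_task
barrier = threading.Barrier(2)  # both threads must check before either inserts

def initialize(task_name):
    exists = tasks.count(task_name) >= 1   # step 1: count
    barrier.wait()                         # interleave: both checks done first
    if not exists:                         # step 2: insert if "missing"
        tasks.append(task_name)

threads = [threading.Thread(target=initialize, args=('task1',))
           for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(tasks)  # ['task1', 'task1'] -- the duplicate the race produces
```

The check and the insert are two separate steps, so nothing stops both
threads from passing the check before either insert becomes visible.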
I was counting on PostgreSQL's MVCC transaction support to make each
check-and-insert atomic. Unfortunately, that's not how it works, and I do
not understand why. As a workaround, I'm currently wrapping the code inside
initialize_task_queue with a PostgreSQL advisory lock:

    if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
        return
    # ... count tasks, add one if needed ...
    db.executesql('select pg_advisory_unlock(1);')
But this sucks.
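While I keep the workaround, one hardening is to release the advisory lock
in a finally block, so an exception while counting or inserting doesn't
leave the lock held forever. A sketch: the executesql calls mirror the
snippet above, but the context-manager wrapper and its name are my own:

```python
from contextlib import contextmanager

@contextmanager
def pg_advisory_lock(db, key):
    """Try to take a PostgreSQL advisory lock; yield whether we got it,
    and guarantee the unlock runs even if the body raises."""
    locked = db.executesql('select pg_try_advisory_lock(%d);' % key)[0][0]
    try:
        yield locked
    finally:
        if locked:
            db.executesql('select pg_advisory_unlock(%d);' % key)

# Usage inside initialize_task_queue:
#     with pg_advisory_lock(db, 1) as locked:
#         if locked:
#             ... count tasks, add one if needed ...
```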
What's a better way to ensure there is always exactly one infinite-repeat
task of each kind in the scheduler? Or... am I using the wrong design
entirely?