I'm trying some variations, but it seems that the culprit is assigning
and retrieving task_scheduled rows in the same process.

I don't know the DAL internals regarding transactions, locking and
commits... a hint though (my 2 cents): just to check, I added a test
after line 245

...
if task:
    if task.assigned_worker_name != self.worker_name:
        logging.info('Someone stole my task!')
        return False
    logging.info('running task %s' % task.name)
...

and it never actually gets printed.
So the problem is not "I assigned a task to myself, and before it got
executed another worker picked that task", at least I think.

Right now it seems to work OK only if

def assign_next_task_new(self, group_names=['main']):
        """
        find next task that needs to be executed
        """
        db = self.db
        row = db(db.task_scheduled.assigned_worker_name==self.worker_name).select(limitby=(0,1)).first()
        return row

is used as a replacement for assign_next_task.

I don't know if it's viable to run a single "assigner" and several
workers. In Python, maybe the "assigner" could fetch the list of
task_scheduled rows in waiting state (with a sane limitby clause) and
split it evenly, assigning the pieces to the alive workers...

http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
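A minimal sketch of that splitting step, in plain Python and without
touching the database. The function name split_evenly and the sample
ids and worker names are made up for illustration; the real assigner
would still have to write the result back to task_scheduled.

```python
def split_evenly(task_ids, worker_names):
    """Split task_ids into contiguous chunks, one per worker.

    Chunk sizes differ by at most one; the first workers get the
    extra tasks when the list does not divide evenly.
    """
    n = len(worker_names)
    if n == 0:
        return {}
    base, extra = divmod(len(task_ids), n)
    assignment = {}
    start = 0
    for i, name in enumerate(worker_names):
        size = base + (1 if i < extra else 0)
        assignment[name] = task_ids[start:start + size]
        start += size
    return assignment


# hypothetical ids of queued tasks and names of alive workers
plan = split_evenly([1, 2, 3, 4, 5, 6, 7], ['w1', 'w2', 'w3'])
```

With more workers than tasks, the trailing workers simply get an
empty list.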

For SQL maniacs, a simple script can be run as the assigner (it works
only with window functions, so check if your database supports them).

For Postgres this works like a charm:

update task_scheduled
set assigned_worker_name = worker.name,
    status = 'running',
    last_run_time = now()
from
        (
        select ntile((select count(*)::int from worker_heartbeat))
                   OVER (order by id) as t_id, *
        from task_scheduled
        WHERE 1 = 1
        AND status = 'queued'
        AND ((assigned_worker_name IS NULL) OR (assigned_worker_name = ''))
        ) sched,
        (
        select ntile((select count(*)::int from worker_heartbeat))
                   OVER (order by id) as w_id, name
        from worker_heartbeat
        ) worker
WHERE worker.w_id = sched.t_id
and sched.id = task_scheduled.id

PS: I noticed another "fixable" aspect... worker_heartbeat gets
polluted; it would be good to delete the worker's record when ctrl+c
is pressed (or the process is killed gracefully).
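A rough sketch of that cleanup idea, using the standard sqlite3 module
instead of the DAL so it can run on its own. The run_worker function,
the loop callback and the single-column worker_heartbeat table are
assumptions for illustration, not the scheduler's actual code.

```python
import sqlite3


def run_worker(conn, worker_name, loop):
    """Register a heartbeat row, run loop(), always delete the row on exit."""
    conn.execute("INSERT INTO worker_heartbeat (name) VALUES (?)",
                 (worker_name,))
    conn.commit()
    try:
        loop()
    except KeyboardInterrupt:
        pass  # ctrl+c: fall through to the cleanup below
    finally:
        # runs on normal exit, ctrl+c and SystemExit alike
        conn.execute("DELETE FROM worker_heartbeat WHERE name = ?",
                     (worker_name,))
        conn.commit()
```

A plain kill (SIGTERM) does not raise an exception by default, so to
cover that case the worker would also need a signal handler that
raises SystemExit.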
