Interesting way to combine taskflow + celery.

I didn't expect it to be used like this, but more power to you!

Taskflow itself has some similar capabilities via http://docs.openstack.org/developer/taskflow/workers.html#design but anyway, what you've done is pretty neat as well.

I am assuming this isn't an openstack project (due to the usage of celery); any details on what's being worked on (am curious here)?

pnkk wrote:
Thanks for the nice documentation.

To my knowledge celery is widely used for distributed task processing.
This fits our requirement perfectly: we want to return an immediate
response to the user from our API server and run the long-running task in
the background. Celery also gives flexibility in the worker pool type
(process-based, which also avoids GIL problems, eventlet, ...) and it
supports several message brokers (rabbitmq, redis, ...).

We used both celery and taskflow for our core processing to leverage the
benefits of both. Taskflow provides nice primitives (execute, revert,
pre/post hooks) which take that load off the application.
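
To make the execute/revert idea concrete, here is a minimal plain-Python
sketch. It is not taskflow's API or its exact semantics: the Step class and
run_linear function are hypothetical names used only for illustration. It
runs steps in order and, if one fails, reverts the steps that already
completed, in reverse order.

```python
class Step:
    """Hypothetical stand-in for a unit of work with execute/revert hooks."""

    def __init__(self, name, fail=False):
        self.name = name
        self.fail = fail

    def execute(self, log):
        # Simulate a failing step when asked to, otherwise record the work.
        if self.fail:
            raise RuntimeError("%s failed" % self.name)
        log.append("execute:" + self.name)

    def revert(self, log):
        # Undo whatever execute() did; here we only record the call.
        log.append("revert:" + self.name)


def run_linear(steps):
    """Run steps in order; on failure, revert completed steps in reverse."""
    log = []
    done = []
    try:
        for step in steps:
            step.execute(log)
            done.append(step)
    except Exception as exc:
        log.append("failed:" + str(exc))
        for step in reversed(done):
            step.revert(log)
        return False, log
    return True, log


ok, log = run_linear([Step("a"), Step("b"), Step("c", fail=True)])
print(ok, log)
```

The application code only describes what each step does and how to undo it;
the ordering and the rollback live in one place.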

As far as the actual issue is concerned, I found one way to solve it: the
celery "retry" option. Used together with acks_late, it makes the
application highly fault tolerant.

http://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry
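
Roughly, the idea looks like the sketch below. It is a plain-Python,
in-memory model rather than real celery code: consume, TransientError and
flaky are hypothetical names. In celery itself the corresponding pieces
would be the task's acks_late option and a call to retry() from inside the
task. The model acknowledges a message only after the handler returns, and
re-queues it on a transient error instead of marking it failed.

```python
import collections


class TransientError(Exception):
    """Hypothetical marker for errors worth retrying (e.g. DB unreachable)."""


def consume(queue, handler, max_retries=3):
    """Process (body, retries) messages from an in-memory deque.

    A message is recorded as acked only after the handler returns, which
    models a late acknowledgement. A TransientError puts the message back
    on the queue with an incremented retry count; once max_retries is
    exhausted the message is recorded as dead instead.
    """
    acked, dead = [], []
    while queue:
        body, retries = queue.popleft()
        try:
            handler(body)
        except TransientError:
            if retries < max_retries:
                queue.append((body, retries + 1))
            else:
                dead.append(body)
            continue
        acked.append(body)
    return acked, dead


attempts = collections.Counter()


def flaky(body):
    # "job-1" fails on its first two attempts, then succeeds.
    attempts[body] += 1
    if body == "job-1" and attempts[body] < 3:
        raise TransientError("database unreachable")


queue = collections.deque([("job-1", 0), ("job-2", 0)])
acked, dead = consume(queue, flaky)
print(acked, dead, attempts["job-1"])
```

The point is that a transient database error no longer counts as
"complete (with failure)": the message goes back for another attempt and is
only given up on after the retry budget is spent.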

Regards,
Kanthi


On Sat, May 28, 2016 at 1:51 AM, Joshua Harlow <[email protected]> wrote:

    Seems like you could just use
    http://docs.openstack.org/developer/taskflow/jobs.html (it appears
    that you may not be?); when a job fails, it would then be worked on
    by a different job consumer.

    Have you looked at those? It almost appears that you are using celery
    as a job distribution system (similar to the jobs.html link mentioned
    above). Is that somewhat correct? I haven't seen anyone try this, so
    I'm curious how you are using it and what choices led you to it.

    -Josh

    pnkk wrote:

        To be specific, we hit this issue when the node running our
        service is rebooted.
        Our solution is designed so that each and every job is a celery
        task, and inside the celery task we create a taskflow flow.

        We enabled acks_late in celery (which uses rabbitmq as the message
        broker), so if our service/node goes down, another healthy service
        can pick up the job and complete it.
        This works fine, but we just hit a rare case where the node was
        rebooted just as taskflow was updating something in the database.

        In this case, it raises an exception and the job is marked failed.
        Since it is complete (with failure), the message is removed from
        rabbitmq and no other worker is able to process it.
        Can taskflow handle such I/O errors gracefully, or should the
        application try to catch this exception? If the application has to
        handle it, what would happen to the particular database transaction
        that failed just when the node was rebooted? Who will retry this
        transaction?

        Thanks,
        Kanthi

        On Fri, May 27, 2016 at 5:39 PM, pnkk <[email protected]> wrote:

             Hi,

             When the taskflow engine was executing a job, the execution
             failed due to an IO error (traceback pasted below).

             2016-05-25 19:45:21.717 7119 ERROR
             taskflow.engines.action_engine.engine 127.0.1.1 [-]  Engine
             execution has failed, something bad must of happened (last 10
             machine transitions were [('SCHEDULING', 'WAITING'),
        ('WAITING',
        'ANALYZING'), ('ANALYZING', 'SCHEDULING'), ('SCHEDULING',
        'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING', 'SCHEDULING'),
             ('SCHEDULING', 'WAITING'), ('WAITING', 'ANALYZING'),
        ('ANALYZING',
        'GAME_OVER'), ('GAME_OVER', 'FAILURE')])
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine Traceback (most
        recent call last):
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/engine.py",
             line 269, in run_iter
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine
             failure.Failure.reraise_if_any(memory.failures)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
             line 336, in reraise_if_any
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     failures[0].reraise()
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
             line 343, in reraise
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine
          six.reraise(*self._exc_info)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",
             line 94, in schedule
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine
             futures.add(scheduler.schedule(atom))
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",
             line 67, in schedule
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     return
             self._task_action.schedule_execution(task)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/actions/task.py",
             line 99, in schedule_execution
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine
          self.change_state(task,
             states.RUNNING, progress=0.0)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/actions/task.py",
             line 67, in change_state
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine
             self._storage.set_atom_state(task.name, state)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/fasteners/lock.py",
             line 85, in wrapper
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     return f(self, *args,
             **kwargs)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/storage.py",
             line 486, in set_atom_state
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine
             self._with_connection(self._save_atom_detail, source, clone)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/storage.py",
             line 341, in _with_connection
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     return functor(conn,
             *args, **kwargs)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/storage.py",
             line 471, in _save_atom_detail
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine

        original_atom_detail.update(conn.update_atom_details(atom_detail))
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/persistence/backends/impl_sqlalchemy.py",
             line 427, in update_atom_details
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     row =
        conn.execute(q).first()
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
             line 914, in execute
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     return meth(self,
             multiparams, params)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py",
             line 323, in _execute_on_connection
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     return
             connection._execute_clauseelement(self, multiparams, params)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
             line 1003, in _execute_clauseelement
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine
             inline=len(distilled_params) > 1)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File "<string>",
        line 1, in
        <lambda>
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py",
             line 494, in compile
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     return
             self._compiler(dialect, bind=bind, **kw)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py",
             line 500, in _compiler
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     return
             dialect.statement_compiler(dialect, self, **kw)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/compiler.py",
             line 392, in __init__
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine
          Compiled.__init__(self,
             dialect, statement, **kwargs)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/compiler.py",
             line 190, in __init__
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     self.string =
             self.process(self.statement, **compile_kwargs)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/compiler.py",
             line 213, in process
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     return
             obj._compiler_dispatch(self, **kwargs)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/visitors.py",
             line 81, in _compiler_dispatch
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     return meth(self,
        **kw)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/compiler.py",
             line 1579, in visit_select
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     for name, column in
             select._columns_plus_names
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/compiler.py",
             line 1347, in _label_select_column
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine
             add_to_result_map=add_to_result_map
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/celery/apps/worker.py",
             line 288, in _handle_request
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine     safe_say('worker: {0}
             shutdown (MainProcess)'.format(how))
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine   File
        
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/celery/apps/worker.py",
             line 73, in safe_say
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine
          print('\n{0}'.format(msg),
             file=sys.__stderr__)
             2016-05-25 19:45:21.717 7119 TRACE
             taskflow.engines.action_engine.engine IOError: [Errno 5]
             Input/output error
             2016-05-25 19:45:21.717 7119 TRACE
        taskflow.engines.action_engine.engine

             There could be a transient network issue which prevents
             taskflow from reaching the mysql node.
             Can you please suggest a graceful way of handling it so that
             the execution can continue processing?

             Thanks,
             Kanthi


        
__________________________________________________________________________
        OpenStack Development Mailing List (not for usage questions)
        Unsubscribe: [email protected]?subject:unsubscribe
        http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev


