I'm afraid in general the use cases you describe are not things that Aurora
currently intends to fulfill.  Though, that's not to say that you could not
do this on top of Aurora if you wanted to.

> Does anyone have experience with building workflows with Aurora?


I do not.  I could opine about how one might build them on top of Aurora,
by treating each step in a workflow as a job and using Aurora as a work
queue.  This may not be an ideal solution.
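
To make the "each step is a job" idea concrete, here is a minimal sketch of
a driver that walks a step DAG and hands each step to a work queue once its
upstream steps have finished. Everything in it is an assumption for
illustration: the step names, the `run_workflow` helper, and the
`submit_and_wait` callback, which stands in for whatever actually creates an
Aurora job and waits for it to finish. It is not an Aurora API.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical pipeline: each key is a step, each value is the set of
# steps it depends on. The names are illustrative only.
EXAMPLE_DEPENDENCIES = {
    'join': {'ingest_impressions', 'ingest_geo_ip'},
    'count': {'join'},
}


def run_workflow(dependencies, submit_and_wait):
    """Submit every step after all of its upstream steps have completed.

    `submit_and_wait(step)` is a caller-supplied callback (an assumption of
    this sketch) that submits one step as a job and blocks until it is done.
    Returns the batches of steps that were runnable at the same time.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    batches = []
    while sorter.is_active():
        # Steps in one batch do not depend on each other, so a real driver
        # could submit them in parallel; this sketch runs them one by one.
        ready = sorted(sorter.get_ready())
        for step in ready:
            submit_and_wait(step)
            sorter.done(step)
        batches.append(ready)
    return batches
```

Calling `run_workflow(EXAMPLE_DEPENDENCIES, print)` prints the steps in a
valid execution order. Failure handling, retries, and persistence of
workflow state are left out, and they are a large part of why this may not
be an ideal solution.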


> How is Twitter handling batch pipelines?
>

I have limited awareness of this, but another Aurora developer whom I've
prodded to chime in may be able to offer some insight.

> Would the approach below make sense, or are there better suggestions?
>

The approach below could only make sense insomuch as machine-local DAGs
make sense for you.  The resource calculation hurdle sounds like something
we could address, but if you want to make these grow horizontally, we do
not offer anything to assist.


> Is there anything related to this in the roadmap or available inside
> Twitter only?


To my knowledge, there is not.


-=Bill

On Tue, Mar 10, 2015 at 8:28 AM, Lars Albertsson <lars.alberts...@gmail.com>
wrote:

> We are evaluating Aurora as a workflow management tool for batch
> processing pipelines. We basically need a tool that regularly runs
> batch processes that are connected as producers/consumers of data,
> typically stored in HDFS or S3.
>
> The alternative tools would be Azkaban, Luigi, and Oozie, but I am
> hoping that building something built on Aurora would result in a
> better solution.
>
> Does anyone have experience with building workflows with Aurora? How
> is Twitter handling batch pipelines? Would the approach below make
> sense, or are there better suggestions? Is there anything related to
> this in the roadmap or available inside Twitter only?
>
>
> In our case, the batch processes will be a mix of cluster
> computations with Spark, and single-node computations. We want the
> latter to also be scheduled on a farm, and this is why we are
> attracted to Mesos. In the text below, I'll call each part of a
> pipeline a 'step', in order to avoid confusion with Aurora jobs and
> tasks.
>
> My unordered wishlist is:
> * Data pipelines consist of DAGs, where steps take one or more inputs,
> and generate one or more outputs.
>
> * Independent steps in the DAG execute in parallel, constrained by
> resources.
>
> * Steps can be written in different languages and frameworks, some
> clustered.
>
> * The developer code/test/debug cycle is quick, and all functional
> tests can execute on a laptop.
>
> * Developers can test integrated data pipelines, consisting of
> multiple steps, on laptops.
>
> * Steps and their inputs and outputs are parameterised, e.g. by date.
> A parameterised step is typically independent from other instances of
> the same step, e.g. join one day's impressions log with user
> demographics. In some cases, steps depend on yesterday's results, e.g.
> apply one day's user management operation log to the user dataset from
> the day before.
>
> * Data pipelines are specified in embedded DSL files (e.g. aurora
> files), kept close to the business logic code.
>
> * Batch steps should be started soon after the input files become
> available.
>
> * Steps should gracefully avoid recomputation when output files exist.
>
> * Backfilling a window back in time, e.g. 30 days, should happen
> automatically if some earlier steps have failed, or if output files
> have been deleted manually.
>
> * Continuous deployment in the sense that steps are automatically
> deployed and scheduled after 'git push'.
>
> * Step owners can get an overview of step status and history, and
> debug step execution, e.g. by accessing log files.
>
>
> I am aware that no framework will give us everything. It is a matter
> of how much we need to live without or build ourselves.
>
> I made a spike to map our desires to Aurora primitives for a toy
> example, and the aurora file pasted below is what I came up with. The
> most reasonable mapping seemed to be a single job for a pipeline, with
> a concatenated sequence of multiple tasks, pretty long due to
> backfilling. I tried building a DAG, but was bitten by Aurora adding
> up the resource requirements for parallel tasks.
> I had to push the conditional execution and data parameter mapping to
> the step program itself. I have attached it, but it is not interesting
> in itself.
>
>
> import os
>
> pkg_path = '/vagrant/test_srcs/pipe_test/data_process.py'
>
> # Can we pass parameters to an aurora script? It seems not, use an
> # environment variable.
> test_mode = os.environ.get('DATA_PIPE_TEST_MODE', '') != ''
> if test_mode:
>     # For integration testing, use cluster in local vagrant box.
>     cluster = 'devcluster'
>     # Use local disk, with synthetic source files created by test harness.
>     hdfs_root = '/var/tmp/data_pipe_test'
>     # When testing, run once only
>     cron_params = {}
> else:
>     # In a production setting, this would be a production cluster name.
>     # While playing around, use a developer machine cluster.
>     cluster = 'devcluster'
>     # In reality, this would point to a shared file system, e.g. real HDFS.
>     hdfs_root = '/var/tmp/hdfs'
>     # Run often when playing around, in reality, we might run every
>     # 5-60 minutes.
>     cron_params = {'cron_schedule': '* * * * *',
>                    'cron_collision_policy': 'CANCEL_NEW'}
>
> impressions_file = '{0}/impressions_%Y%m%d-%H.txt'.format(hdfs_root)
> geo_ip_file = '{0}/geo_ip_%Y%m%d-%H.txt'.format(hdfs_root)
> joined_file = '{0}/joined_%Y%m%d-%H.txt'.format(hdfs_root)
> count_file = '{0}/count_%Y%m%d-%H.txt'.format(hdfs_root)
>
> # Use minutes scale for playing around with the setup. In production,
> # we would work with daily runs.
> join_process = Process(
>     name='join_backfilled_{{hour_offset}}',
>     cmdline='%s generate {{hour_offset}} %s %s %s' % (
>         pkg_path, impressions_file, geo_ip_file, joined_file))
>
> producer = Task(
>     name='join_backfilled_{{hour_offset}}',
>     processes=[join_process],
>     resources=Resources(cpu=1.0, ram=10 * MB, disk=100 * MB))
>
> count_process = Process(
>     name='count_backfilled_{{hour_offset}}',
>     cmdline='%s count {{hour_offset}} %s %s' % (
>         pkg_path, joined_file, count_file))
> counter = Task(
>     name='count_backfilled_{{hour_offset}}',
>     processes=[count_process],
>     resources=Resources(cpu=1.0, ram=40 * MB, disk=40 * MB))
>
> counter_pipe_minutely = Tasks.concat(producer, counter)
>
> counter_pipe_backfill = Tasks.concat(*[counter_pipe_minutely.bind(
>     hour_offset=Integer(offset)) for offset in range(5)])
>
> job_template = Job(
>     role='www-data',
>     contact='lars.alberts...@gmail.com',
>     instances=1,
>     service=False,
>     task=counter_pipe_backfill,
>     **cron_params)
>
> jobs = [
>     job_template(name='serial_job', cluster=cluster, environment='prod')
> ]
>
>
> Regards,
>
> Lars Albertsson
>
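
The quoted message mentions pushing conditional execution and the date
parameter mapping into the step program itself; that program was attached to
the original mail and is not reproduced here. As a rough idea of what such a
step wrapper might look like, here is a minimal sketch. It is not the
attached `data_process.py`. The function names, the return values, and the
`compute` callback are all assumptions made for illustration.

```python
import os
from datetime import datetime, timedelta


def resolve_path(template, now, hour_offset):
    """Expand a strftime-style path template for `hour_offset` hours before `now`."""
    return (now - timedelta(hours=hour_offset)).strftime(template)


def run_step(now, hour_offset, input_templates, output_template, compute):
    """Run one parameterised step unless its output already exists.

    `compute(input_paths, output_path)` is a hypothetical callback holding
    the actual business logic. Returns 'skipped', 'waiting', or 'computed'.
    """
    inputs = [resolve_path(t, now, hour_offset) for t in input_templates]
    output = resolve_path(output_template, now, hour_offset)

    # Avoid recomputation: an existing output means this slot is done.
    if os.path.exists(output):
        return 'skipped'
    # Inputs not there yet: do nothing now and let a later run pick it up.
    if not all(os.path.exists(path) for path in inputs):
        return 'waiting'

    # Write to a temporary name first so that a crashed run never leaves a
    # partial file that a later run would mistake for finished output.
    tmp_output = output + '.tmp'
    compute(inputs, tmp_output)
    os.replace(tmp_output, output)
    return 'computed'
```

Calling `run_step` once per offset in `range(5)`, as the aurora file does
through `hour_offset`, gives a simple form of backfilling: slots whose output
exists are skipped, and slots whose output was deleted or never produced are
recomputed.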
