We are evaluating Aurora as a workflow management tool for batch processing pipelines. We basically need a tool that regularly runs batch processes that are connected as producers/consumers of data, typically stored in HDFS or S3.
The alternative tools would be Azkaban, Luigi, and Oozie, but I am hoping that building something on top of Aurora would result in a better solution. Does anyone have experience with building workflows with Aurora? How is Twitter handling batch pipelines? Would the approach below make sense, or are there better suggestions? Is there anything related to this on the roadmap, or available inside Twitter only?

In our case, the batch processes will be a mix of cluster computations with Spark and single-node computations. We want the latter to also be scheduled on a farm, which is why we are attracted to Mesos. In the text below, I'll call each part of a pipeline a 'step', in order to avoid confusion with Aurora jobs and tasks.

My unordered wishlist is:

* Data pipelines consist of DAGs, where steps take one or more inputs and generate one or more outputs.
* Independent steps in the DAG execute in parallel, constrained by resources.
* Steps can be written in different languages and frameworks, some clustered.
* The developer code/test/debug cycle is quick, and all functional tests can execute on a laptop.
* Developers can test integrated data pipelines, consisting of multiple steps, on laptops.
* Steps and their inputs and outputs are parameterised, e.g. by date. A parameterised step is typically independent from other instances of the same step, e.g. joining one day's impressions log with user demographics. In some cases, steps depend on yesterday's results, e.g. applying one day's user management operation log to the user dataset from the day before.
* Data pipelines are specified in embedded DSL files (e.g. aurora files), kept close to the business logic code.
* Batch steps should be started soon after their input files become available.
* Steps should gracefully avoid recomputation when their output files already exist.
* Backfilling a window back in time, e.g. 30 days, should happen automatically if some earlier steps have failed, or if output files have been deleted manually.
* Continuous deployment, in the sense that steps are automatically deployed and scheduled after 'git push'.
* Step owners can get an overview of step status and history, and can debug step execution, e.g. by accessing log files.

I am aware that no framework will give us everything; it is a matter of how much we need to live without or build ourselves.

I made a spike to map our desires to Aurora primitives for a toy example, and the aurora file pasted below is what I came up with. The most reasonable mapping seemed to be a single job per pipeline, containing a concatenated sequence of tasks, which becomes pretty long due to backfilling. I tried building a DAG, but was bitten by Aurora adding up the resource requirements for parallel tasks (see the single-task sketch in the P.S. below). I had to push the conditional execution and the data parameter mapping into the step program itself. I have attached it below, but it is not interesting in itself.

import os

pkg_path = '/vagrant/test_srcs/pipe_test/data_process.py'

# Can we pass parameters to an aurora script? It seems not, use an environment variable.
test_mode = os.environ.get('DATA_PIPE_TEST_MODE', '') != ''

if test_mode:
    # For integration testing, use cluster in local vagrant box.
    cluster = 'devcluster'
    # Use local disk, with synthetic source files created by test harness.
    hdfs_root = '/var/tmp/data_pipe_test'
    # When testing, run once only
    cron_params = {}
else:
    # In a production setting, this would be a production cluster name. While playing around, use a developer machine
    # cluster.
    cluster = 'devcluster'
    # In reality, this would point to a shared file system, e.g. real HDFS.
    hdfs_root = '/var/tmp/hdfs'
    # Run often when playing around, in reality, we might run every 5-60 minutes.
    cron_params = {'cron_schedule': '* * * * *', 'cron_collision_policy': 'CANCEL_NEW'}

impressions_file = '{0}/impressions_%Y%m%d-%H.txt'.format(hdfs_root)
geo_ip_file = '{0}/geo_ip_%Y%m%d-%H.txt'.format(hdfs_root)
joined_file = '{0}/joined_%Y%m%d-%H.txt'.format(hdfs_root)
count_file = '{0}/count_%Y%m%d-%H.txt'.format(hdfs_root)

# Use minutes scale for playing around with the setup. In production, we would work with daily runs.
join_process = Process(
    name='join_backfilled_{{hour_offset}}',
    cmdline='%s generate {{hour_offset}} %s %s %s' % (pkg_path, impressions_file, geo_ip_file, joined_file))

producer = Task(
    name='join_backfilled_{{hour_offset}}',
    processes=[join_process],
    resources=Resources(cpu=1.0, ram=10 * MB, disk=100 * MB))

count_process = Process(
    name='count_backfilled_{{hour_offset}}',
    cmdline='%s count {{hour_offset}} %s %s' % (pkg_path, joined_file, count_file))

counter = Task(
    name='count_backfilled_{{hour_offset}}',
    processes=[count_process],
    resources=Resources(cpu=1.0, ram=40 * MB, disk=40 * MB))

counter_pipe_minutely = Tasks.concat(producer, counter)

counter_pipe_backfill = Tasks.concat(
    *[counter_pipe_minutely.bind(hour_offset=Integer(offset)) for offset in range(5)])

job_template = Job(
    role='www-data',
    contact='lars.alberts...@gmail.com',
    instances=1,
    service=False,
    task=counter_pipe_backfill,
    **cron_params)

jobs = [
    job_template(name='serial_job', cluster=cluster, environment='prod')
]

Regards,
Lars Albertsson
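P.S. For concreteness, here is a rough, untested sketch (not part of the spike; the process names and resource numbers are made up) of how a small DAG might instead be expressed inside a single Task, using the order() helper for the ordering constraints. As far as I understand, this avoids combining separate tasks, where the resource requirements get added up, at the cost of reserving a single resource envelope for the whole DAG:

# Sketch only: two independent joins followed by a count, inside one Task.
join_impressions = Process(
    name='join_impressions',
    cmdline='echo "pretend to join impressions with geo data"')
join_demographics = Process(
    name='join_demographics',
    cmdline='echo "pretend to join impressions with user demographics"')
count_joined = Process(
    name='count_joined',
    cmdline='echo "pretend to count the joined records"')

dag_task = Task(
    name='dag_sketch',
    processes=[join_impressions, join_demographics, count_joined],
    # Only count_joined is ordered after the two joins; the joins themselves
    # have no constraint between them, so they can run in parallel.
    constraints=order(join_impressions, count_joined) +
                order(join_demographics, count_joined),
    # One resource envelope is declared for the whole DAG (numbers are arbitrary).
    resources=Resources(cpu=2.0, ram=64 * MB, disk=128 * MB))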
#! /usr/bin/env python
import os
import click
import random
import time
import datetime
import sys


@click.group()
def cli():
    pass


time_format = '%Y-%m-%dT%H:%M:%S'


def target_minute(hours_offset, now):
    now_time = datetime.datetime.strptime(now, time_format)
    target_time = now_time.replace(minute=0, second=0, microsecond=0) - \
        datetime.timedelta(hours=int(hours_offset))
    return target_time


@cli.command()
@click.option('--now', default=datetime.datetime.now().strftime(time_format))
@click.argument('hour_offset')
@click.argument('impressions')
@click.argument('geo_ip')
@click.argument('output')
def generate(now, hour_offset, impressions, geo_ip, output):
    _ = impressions, geo_ip  # Pretend that we use the input files
    target_time = target_minute(hour_offset, now)
    out_file = target_time.strftime(output)
    if os.path.exists(out_file):
        click.echo('Output already exists: {}'.format(out_file))
        return
    click.echo('Generating: {}'.format(out_file))
    time.sleep(random.randint(0, 10))
    with open(out_file, 'w') as out:
        # Generate random data
        out.write('\n'.join(
            map(str, [random.randint(0, 1000)
                      for _ in range(target_time.hour + target_time.minute + 1)]) + ['']))
    click.echo('Generated: {}'.format(out_file))


@cli.command()
@click.option('--now', default=datetime.datetime.now().strftime(time_format))
@click.argument('hour_offset')
@click.argument('input')
@click.argument('output')
def count(now, hour_offset, input, output):
    target_time = target_minute(hour_offset, now)
    in_file = target_time.strftime(input)
    out_file = target_time.strftime(output)
    if os.path.exists(out_file):
        click.echo('Output already exists: {}'.format(out_file))
        return
    click.echo('Running count on: {}'.format(in_file))
    time.sleep(random.randint(0, 10))
    with open(in_file) as src:
        lines = src.readlines()
    result = sum(map(int, lines))
    with open(out_file, 'w') as out:
        out.write('{}\n'.format(result))
    click.echo('Created: {}'.format(out_file))


if __name__ == '__main__':
    click.echo(' '.join(sys.argv))
    cli()
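P.P.S. The test harness mentioned in the aurora file is not included above. A minimal sketch of what I have in mind, reusing the test-mode paths from the aurora file and driving the two steps directly without Aurora, would be roughly:

#!/usr/bin/env python
# Hypothetical laptop test harness sketch: creates synthetic input files and
# runs the pipeline steps directly, mirroring the test-mode settings above.
import datetime
import os
import subprocess

HDFS_ROOT = '/var/tmp/data_pipe_test'
SCRIPT = '/vagrant/test_srcs/pipe_test/data_process.py'
BACKFILL_HOURS = 5


def create_synthetic_inputs():
    # Create fake impressions and geo_ip files for the backfill window.
    if not os.path.isdir(HDFS_ROOT):
        os.makedirs(HDFS_ROOT)
    now = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
    for offset in range(BACKFILL_HOURS):
        stamp = (now - datetime.timedelta(hours=offset)).strftime('%Y%m%d-%H')
        for prefix in ('impressions', 'geo_ip'):
            with open(os.path.join(HDFS_ROOT, '%s_%s.txt' % (prefix, stamp)), 'w') as f:
                f.write('synthetic input\n')


def run_pipeline():
    # Run the same command lines as the aurora file, one backfill offset at a time.
    impressions = os.path.join(HDFS_ROOT, 'impressions_%Y%m%d-%H.txt')
    geo_ip = os.path.join(HDFS_ROOT, 'geo_ip_%Y%m%d-%H.txt')
    joined = os.path.join(HDFS_ROOT, 'joined_%Y%m%d-%H.txt')
    count = os.path.join(HDFS_ROOT, 'count_%Y%m%d-%H.txt')
    for offset in range(BACKFILL_HOURS):
        subprocess.check_call(['python', SCRIPT, 'generate', str(offset), impressions, geo_ip, joined])
        subprocess.check_call(['python', SCRIPT, 'count', str(offset), joined, count])


if __name__ == '__main__':
    create_synthetic_inputs()
    run_pipeline()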