We are evaluating Aurora as a workflow management tool for batch
processing pipelines. We basically need a tool that regularly runs
batch processes that are connected as producers/consumers of data,
typically stored in HDFS or S3.

The alternative tools would be Azkaban, Luigi, and Oozie, but I am
hoping that something built on Aurora would result in a better
solution.

Does anyone have experience with building workflows with Aurora? How
is Twitter handling batch pipelines? Would the approach below make
sense, or are there better suggestions? Is there anything related to
this in the roadmap or available inside Twitter only?


In our case, the batch processes will be a mix of cluster
computations with Spark and single-node computations. We want the
latter to also be scheduled on a farm, and this is why we are
attracted to Mesos. In the text below, I'll call each part of a
pipeline a 'step', in order to avoid confusion with Aurora jobs and
tasks.

My unordered wishlist is:
* Data pipelines consist of DAGs, where steps take one or more inputs,
and generate one or more outputs.

* Independent steps in the DAG execute in parallel, constrained by resources.

* Steps can be written in different languages and frameworks, some clustered.

* The developer code/test/debug cycle is quick, and all functional
tests can execute on a laptop.

* Developers can test integrated data pipelines, consisting of
multiple steps, on laptops.

* Steps and their inputs and outputs are parameterised, e.g. by date.
A parameterised step is typically independent from other instances of
the same step, e.g. join one day's impressions log with user
demographics. In some cases, steps depend on yesterday's results, e.g.
apply one day's user management operation log to the user dataset from
the day before.

* Data pipelines are specified in embedded DSL files (e.g. aurora
files), kept close to the business logic code.

* Batch steps should be started soon after the input files become available.

* Steps should gracefully avoid recomputation when output files exist.

* Backfilling a window back in time, e.g. 30 days, should happen
automatically if some earlier steps have failed, or if output files
have been deleted manually (see the sketch after this list).

* Continuous deployment in the sense that steps are automatically
deployed and scheduled after 'git push'.

* Step owners can get an overview of step status and history, and
debug step execution, e.g. by accessing log files.
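
To make the parameterisation, recomputation, and backfill items a bit
more concrete, this is roughly the per-step driver behaviour I have in
mind. It is only a sketch; the names (backfill, run_step, _demo_step)
and the output pattern are made up for illustration and are not part
of the spike further down:

import datetime
import os


def backfill(run_step, output_pattern, window_hours=30 * 24):
    """Walk a time window backwards, running steps whose output is missing.

    run_step(target_time, out_file) is a callable that produces out_file
    for the given (here hourly) partition; output_pattern is a strftime
    pattern, as in the aurora file below.
    """
    now = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
    for offset in range(window_hours):
        target_time = now - datetime.timedelta(hours=offset)
        out_file = target_time.strftime(output_pattern)
        if os.path.exists(out_file):
            # Gracefully avoid recomputation: the output is already there.
            continue
        run_step(target_time, out_file)


def _demo_step(target_time, out_file):
    # Stand-in for a real step: just report what would be computed.
    print('would compute {0} for {1}'.format(out_file, target_time))


if __name__ == '__main__':
    # Toy usage: check the last five hours against the test output path.
    backfill(_demo_step,
             '/var/tmp/data_pipe_test/count_%Y%m%d-%H.txt',
             window_hours=5)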


I am aware that no framework will give us everything. It is a matter
of how much we need to live without or build ourselves.

I made a spike to map our desires to Aurora primitives for a toy
example, and the aurora file pasted below is what I came up with. The
most reasonable mapping seemed to be a single job for a pipeline, with
a concatenated sequence of multiple tasks, which gets pretty long due
to backfilling. I tried building a DAG, but was bitten by Aurora
adding up the resource requirements for parallel tasks. I had to push
the conditional execution and data parameter mapping to the step
program itself. I have attached the step program, but it is not
interesting in itself.
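
For reference, the DAG attempt looked roughly like the fragment below.
It uses the producer/counter tasks defined in the pasted file, and
assumes that Tasks.combine is the parallel counterpart of Tasks.concat;
since Aurora adds up the resource requirements of the combined tasks,
the task footprint grows with the fan-out, which is what bit me:

# Hypothetical fragment, not part of the spike below: run the
# backfilled hours in parallel instead of concatenating them serially.
counter_pipe_parallel = Tasks.combine(*[
    Tasks.concat(producer, counter).bind(hour_offset=Integer(offset))
    for offset in range(5)])

The full aurora file from the spike follows: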


import os

pkg_path = '/vagrant/test_srcs/pipe_test/data_process.py'

# Can we pass parameters to an aurora script? It seems not, use an
# environment variable.
test_mode = os.environ.get('DATA_PIPE_TEST_MODE', '') != ''
if test_mode:
    # For integration testing, use cluster in local vagrant box.
    cluster = 'devcluster'
    # Use local disk, with synthetic source files created by test harness.
    hdfs_root = '/var/tmp/data_pipe_test'
    # When testing, run once only
    cron_params = {}
else:
    # In a production setting, this would be a production cluster name.
    # While playing around, use a developer machine cluster.
    cluster = 'devcluster'
    # In reality, this would point to a shared file system, e.g. real HDFS.
    hdfs_root = '/var/tmp/hdfs'
    # Run often when playing around, in reality, we might run every
    # 5-60 minutes.
    cron_params = {'cron_schedule': '* * * * *',
                   'cron_collision_policy': 'CANCEL_NEW'}

impressions_file = '{0}/impressions_%Y%m%d-%H.txt'.format(hdfs_root)
geo_ip_file = '{0}/geo_ip_%Y%m%d-%H.txt'.format(hdfs_root)
joined_file = '{0}/joined_%Y%m%d-%H.txt'.format(hdfs_root)
count_file = '{0}/count_%Y%m%d-%H.txt'.format(hdfs_root)

# Use minutes scale for playing around with the setup. In production,
# we would work with daily runs.
join_process = Process(
    name='join_backfilled_{{hour_offset}}',
    cmdline='%s generate {{hour_offset}} %s %s %s' % (
        pkg_path, impressions_file, geo_ip_file, joined_file))

producer = Task(
    name='join_backfilled_{{hour_offset}}',
    processes=[join_process],
    resources=Resources(cpu=1.0, ram=10 * MB, disk=100 * MB))

count_process = Process(
    name='count_backfilled_{{hour_offset}}',
    cmdline='%s count {{hour_offset}} %s %s' % (
        pkg_path, joined_file, count_file))
counter = Task(
    name='count_backfilled_{{hour_offset}}',
    processes=[count_process],
    resources=Resources(cpu=1.0, ram=40 * MB, disk=40 * MB))

counter_pipe_minutely = Tasks.concat(producer, counter)

counter_pipe_backfill = Tasks.concat(*[counter_pipe_minutely.bind(
    hour_offset=Integer(offset)) for offset in range(5)])

job_template = Job(
    role='www-data',
    contact='lars.alberts...@gmail.com',
    instances=1,
    service=False,
    task=counter_pipe_backfill,
    **cron_params)

jobs = [
    job_template(name='serial_job', cluster=cluster, environment='prod')
]
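
For completeness: as far as I can tell, the .aurora file is evaluated
on the machine running the aurora client, so the DATA_PIPE_TEST_MODE
environment variable is picked up there. In test mode I create the job
once with something like

  DATA_PIPE_TEST_MODE=1 aurora job create devcluster/www-data/prod/serial_job pipeline.aurora

(the config file name is arbitrary); without the variable set, the job
gets the cron parameters instead.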


Regards,

Lars Albertsson
#! /usr/bin/env python
import os

import click
import random
import time
import datetime
import sys


@click.group()
def cli():
    pass


time_format = '%Y-%m-%dT%H:%M:%S'


def target_minute(hours_offset, now):
    # Truncate 'now' to the hour and step back hours_offset whole hours.
    now_time = datetime.datetime.strptime(now, time_format)
    target_time = now_time.replace(minute=0, second=0, microsecond=0) - datetime.timedelta(hours=int(hours_offset))
    return target_time


@cli.command()
@click.option('--now', default=datetime.datetime.now().strftime(time_format))
@click.argument('hour_offset')
@click.argument('impressions')
@click.argument('geo_ip')
@click.argument('output')
def generate(now, hour_offset, impressions, geo_ip, output):
    _ = impressions, geo_ip  # Pretend that we use the input files
    target_time = target_minute(hour_offset, now)
    out_file = target_time.strftime(output)
    if os.path.exists(out_file):
        click.echo('Output already exists: {}'.format(out_file))
        return
    click.echo('Generating: {}'.format(out_file))
    time.sleep(random.randint(0, 10))
    with open(out_file, 'w') as out:
        # Generate random data
        out.write('\n'.join(
            [str(random.randint(0, 1000))
             for _ in range(target_time.hour + target_time.minute + 1)] + ['']))
    click.echo('Generated: {}'.format(out_file))


@cli.command()
@click.option('--now', default=datetime.datetime.now().strftime(time_format))
@click.argument('hour_offset')
@click.argument('input')
@click.argument('output')
def count(now, hour_offset, input, output):
    target_time = target_minute(hour_offset, now)
    in_file = target_time.strftime(input)
    out_file = target_time.strftime(output)
    if os.path.exists(out_file):
        click.echo('Output already exists: {}'.format(out_file))
        return
    click.echo('Running count on: {}'.format(in_file))
    time.sleep(random.randint(0, 10))
    with open(in_file) as src:
        lines = src.readlines()
    result = sum(map(int, lines))
    with open(out_file, 'w') as out:
        out.write('{}\n'.format(result))
    click.echo('Created: {}'.format(out_file))


if __name__ == '__main__':
    click.echo(' '.join(sys.argv))
    cli()
