This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 73a87f8885 Add concept doc for Dynamic Task Mapping (#22867)
73a87f8885 is described below
commit 73a87f8885d1a72723e403f4b3cd868199e308fd
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Mon Apr 11 15:28:40 2022 +0100
Add concept doc for Dynamic Task Mapping (#22867)
Co-authored-by: Daniel Standish <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: eladkal <[email protected]>
---
.../concepts/dynamic-task-mapping.rst | 261 +++++++++++++++++++++
docs/apache-airflow/concepts/index.rst | 1 +
.../howto/dynamic-dag-generation.rst | 2 +
docs/apache-airflow/img/mapping-simple-graph.png | Bin 0 -> 7676 bytes
docs/apache-airflow/img/mapping-simple-grid.png | Bin 0 -> 179670 bytes
5 files changed, 264 insertions(+)
diff --git a/docs/apache-airflow/concepts/dynamic-task-mapping.rst b/docs/apache-airflow/concepts/dynamic-task-mapping.rst
new file mode 100644
index 0000000000..b326a31672
--- /dev/null
+++ b/docs/apache-airflow/concepts/dynamic-task-mapping.rst
@@ -0,0 +1,261 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+====================
+Dynamic Task Mapping
+====================
+
+Dynamic Task Mapping allows a workflow to create a number of tasks at runtime based upon current data, rather than the DAG author having to know in advance how many tasks would be needed.
+
+This is similar to defining your tasks in a for loop, but instead of having the DAG file fetch the data and do that itself, the scheduler can do this based on the output of a previous task. Right before a mapped task is executed, the scheduler will create *n* copies of the task, one for each input.
+
+It is also possible to have a task operate on the collected output of a mapped task, commonly known as map and reduce.
+
+Simple mapping
+==============
+
+In its simplest form, you can map over a list defined directly in your DAG file using the ``expand()`` function instead of calling your task directly.
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from airflow import DAG
+    from airflow.decorators import task
+
+
+    with DAG(dag_id="simple_mapping", start_date=datetime(2022, 3, 4)) as dag:
+
+        @task
+        def add_one(x: int):
+            return x + 1
+
+        @task
+        def sum_it(values):
+            total = sum(values)
+            print(f"Total was {total}")
+
+        added_values = add_one.expand(x=[1, 2, 3])
+        sum_it(added_values)
+
+This will show ``Total was 9`` in the task logs when executed.
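To see where the 9 comes from, the sketch below replays the same map-and-reduce in plain Python with no Airflow involved: ``add_one`` is applied once per input (the "map"), then ``sum_it`` adds up the collected results (the "reduce"). It models only the data flow, not how the scheduler actually creates and runs task instances.

```python
def add_one(x: int) -> int:
    return x + 1


def sum_it(values: list) -> int:
    total = sum(values)
    print(f"Total was {total}")
    return total


# "Map": one call per input, like the three mapped task instances.
added_values = [add_one(x) for x in [1, 2, 3]]  # [2, 3, 4]

# "Reduce": a single call that receives every mapped result at once.
result = sum_it(added_values)  # prints "Total was 9"
```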
+
+This is the resulting DAG structure:
+
+.. image:: /img/mapping-simple-graph.png
+
+The grid view also provides visibility into your mapped tasks in the details panel:
+
+.. image:: /img/mapping-simple-grid.png
+
+.. note:: A reduce task is not required.
+
+    Although we show a "reduce" task here (``sum_it``), you don't have to have one; the mapped tasks will still be executed even if they have no downstream tasks.
+
+Repeated Mapping
+================
+
+The result of one mapped task can also be used as input to the next mapped task.
+
+.. code-block:: python
+
+    with DAG(dag_id="repeated_mapping", start_date=datetime(2022, 3, 4)) as dag:
+
+        @task
+        def add_one(x: int):
+            return x + 1
+
+        first = add_one.expand(x=[1, 2, 3])
+        second = add_one.expand(x=first)
+
+This would have a result of ``[3, 4, 5]``.
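The arithmetic behind ``[3, 4, 5]`` can be checked with a plain-Python model of the chain (not Airflow code): the first mapped step turns ``[1, 2, 3]`` into ``[2, 3, 4]``, and the second step maps over that list in turn.

```python
def add_one(x: int) -> int:
    return x + 1


# First mapped task: one instance per literal input.
first = [add_one(x) for x in [1, 2, 3]]

# Second mapped task: one instance per result of the first.
second = [add_one(x) for x in first]

print(first, second)  # [2, 3, 4] [3, 4, 5]
```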
+
+Constant parameters
+===================
+
+In addition to passing arguments that get expanded at run-time, it is possible to pass arguments that don't change. To clearly differentiate between the two kinds, we use different functions: ``expand()`` for mapped arguments, and ``partial()`` for unmapped ones.
+
+.. code-block:: python
+
+    @task
+    def add(x: int, y: int):
+        return x + y
+
+
+    added_values = add.partial(y=10).expand(x=[1, 2, 3])
+    # This results in the add function being expanded to
+    # add(x=1, y=10)
+    # add(x=2, y=10)
+    # add(x=3, y=10)
+
+This would result in values of 11, 12, and 13.
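The split between fixed and mapped arguments behaves much like Python's own ``functools.partial``. The sketch below is only an analogy in plain Python, not how Airflow implements ``partial()``/``expand()``: the constant ``y`` is bound once, then the function is called once per value of ``x``.

```python
from functools import partial


def add(x: int, y: int) -> int:
    return x + y


# Fix the constant argument once, as partial(y=10) does...
add_ten = partial(add, y=10)

# ...then call once per value of the mapped argument, as expand(x=[...]) does.
added_values = [add_ten(x=x) for x in [1, 2, 3]]
print(added_values)  # [11, 12, 13]
```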
+
+This is also useful for passing things such as connection IDs, database table names, or bucket names to tasks.
+
+Mapping over multiple parameters
+================================
+
+In addition to a single parameter, it is possible to pass multiple parameters to ``expand()``. This has the effect of creating a "cross product", calling the mapped task with each combination of parameters.
+
+.. code-block:: python
+
+    @task
+    def add(x: int, y: int):
+        return x + y
+
+
+    added_values = add.expand(x=[2, 4, 8], y=[5, 10])
+    # This results in the add function being called with
+    # add(x=2, y=5)
+    # add(x=2, y=10)
+    # add(x=4, y=5)
+    # add(x=4, y=10)
+    # add(x=8, y=5)
+    # add(x=8, y=10)
+
+This would result in the add task being called 6 times. Please note, however, that the order of expansion is not guaranteed.
+
+It is not possible to achieve an effect similar to Python's ``zip`` function with mapped arguments.
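The difference between the cross product and a ``zip``-style pairing can be illustrated in plain Python with ``itertools.product`` and ``zip``. This is a model of the combinations only; unlike ``itertools.product``, Airflow does not guarantee the order in which the six instances appear.

```python
from itertools import product


def add(x: int, y: int) -> int:
    return x + y


xs = [2, 4, 8]
ys = [5, 10]

# expand(x=..., y=...) behaves like a cross product: every x is paired with every y.
crossed = [add(x=x, y=y) for x, y in product(xs, ys)]
print(len(crossed), crossed)  # 6 [7, 12, 9, 14, 13, 18]

# A zip-style pairing (not available with mapped arguments) would instead
# stop at the shorter list and produce only two calls.
zipped = [add(x=x, y=y) for x, y in zip(xs, ys)]
print(len(zipped), zipped)  # 2 [7, 14]
```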
+
+Task-generated Mapping
+======================
+
+Up until now the examples we've shown could all be achieved with a ``for`` loop in the DAG file, but the real power of dynamic task mapping comes from being able to have a task generate the list to iterate over.
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from airflow import DAG
+    from airflow.decorators import task
+
+
+    @task
+    def make_list():
+        # This could also come from an API call or a database query (almost anything you like), as long as the
+        # resulting list/dictionary can be stored in the current XCom backend.
+        return [1, 2, {"a": "b"}, "str"]
+
+
+    @task
+    def consumer(arg):
+        print(repr(arg))
+
+
+    with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
+        consumer.expand(arg=make_list())
+
+The ``make_list`` task runs as a normal task and must return a list or dict (see `What data types can be expanded?`_), and then the ``consumer`` task will be called four times, once with each value in the return of ``make_list``.
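A plain-Python model of this flow (not Airflow code) makes the "four times" concrete: the length of whatever the upstream step returns decides how many consumer calls happen. Here ``consumer`` returns its ``repr`` instead of only printing it, so the results can be inspected.

```python
def make_list():
    # Stand-in for an API call or database query whose size is only known at runtime.
    return [1, 2, {"a": "b"}, "str"]


def consumer(arg):
    return repr(arg)


# The upstream result decides how many "instances" run: one per element.
upstream = make_list()
outputs = [consumer(arg) for arg in upstream]

for line in outputs:
    print(line)
```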
+
+Mapping with non-TaskFlow operators
+===================================
+
+It is possible to use ``partial`` and ``expand`` with classic style operators as well. Some arguments are not mappable and must be passed to ``partial()``, such as ``task_id``, ``queue``, ``pool``, and most other arguments to ``BaseOperator``.
+
+
+.. code-block:: python
+
+    from airflow.operators.bash import BashOperator
+
+    BashOperator.partial(task_id="bash", do_xcom_push=False).expand(
+        bash_command=["echo 1", "echo 2"]
+    )
+
+Mapping over result of classic operators
+----------------------------------------
+
+If you want to map over the result of a classic operator, you will need to create an ``XComArg`` object manually.
+
+.. code-block:: python
+
+    from airflow import XComArg
+
+    source = MyOperator(task_id="source")
+
+    downstream = MyOperator2.partial(task_id="consumer").expand(input=XComArg(source))
+
+Putting it all together
+=======================
+
+In this example you have a regular data delivery to an S3 bucket and want to apply the same processing to every file that arrives, no matter how many arrive each time.
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from airflow import DAG, XComArg
+    from airflow.decorators import task
+    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+    from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
+
+
+    with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
+        files = S3ListOperator(
+            task_id="get_input",
+            bucket="example-bucket",
+            prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
+        )
+
+        @task
+        def count_lines(aws_conn_id, bucket, file):
+            hook = S3Hook(aws_conn_id=aws_conn_id)
+
+            return len(hook.read_key(file, bucket).splitlines())
+
+        @task
+        def total(lines):
+            return sum(lines)
+
+        counts = count_lines.partial(aws_conn_id="aws_default", bucket=files.bucket).expand(
+            file=XComArg(files)
+        )
+        total(lines=counts)
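The data flow of this DAG can be dry-run without AWS by swapping the bucket for an in-memory dict. In the sketch below, ``FAKE_BUCKET``, its keys, and the helper ``list_keys`` are hypothetical stand-ins for what ``S3ListOperator`` would list and what ``S3Hook.read_key`` would return; nothing here calls Airflow or S3.

```python
# Hypothetical bucket contents: key -> file body (stand-in for S3 objects).
FAKE_BUCKET = {
    "incoming/provider_a/2022-04-07/a.csv": "id,value\n1,10\n2,20\n",
    "incoming/provider_a/2022-04-07/b.csv": "id,value\n3,30\n",
}


def list_keys(prefix):
    # Plays the role of S3ListOperator: the upstream step whose output drives the mapping.
    return sorted(key for key in FAKE_BUCKET if key.startswith(prefix))


def count_lines(file):
    # Plays the role of the mapped count_lines task; the S3 read becomes a dict lookup.
    return len(FAKE_BUCKET[file].splitlines())


def total(lines):
    # The reduce step: receives the collected counts from every mapped instance.
    return sum(lines)


files = list_keys("incoming/provider_a/2022-04-07")
counts = [count_lines(file) for file in files]  # one "instance" per file found
print(files, counts, total(counts))
```

However many files the listing returns on a given day, the mapped step runs once per file and the reduce step sees all of the counts.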
+
+What data types can be expanded?
+================================
+
+Currently it is only possible to map against a dict, a list, or one of those types stored in XCom as the result of a task.
+
+If an upstream task returns an unmappable type, the mapped task will fail at run-time with an ``UnmappableXComTypePushed`` exception. For instance, you can't have the upstream task return a plain string; it must be a list or a dict.
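The rule can be sketched as a simple type check in plain Python. This is a model only: ``check_mappable`` is a hypothetical helper, and where Airflow reports ``UnmappableXComTypePushed``, this sketch raises a built-in ``TypeError`` instead.

```python
def check_mappable(value):
    # Model of the rule above: only lists and dicts can be expanded.
    # Airflow itself reports UnmappableXComTypePushed; this sketch raises TypeError.
    if not isinstance(value, (list, dict)):
        raise TypeError(f"cannot map over {type(value).__name__!r}; return a list or dict")
    return len(value)  # number of instances the expansion would create


print(check_mappable([1, 2, 3]))  # 3
print(check_mappable({"a": 1, "b": 2}))  # 2

try:
    check_mappable("a plain string")
except TypeError as err:
    print(err)
```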
+
+Placing limits on mapped tasks
+==============================
+
+There are two limits that you can place on a task:
+
+  #. The number of mapped task instances that can be created as the result of expansion.
+  #. The number of copies of the mapped task that can run at once.
+
+- **Limiting the number of mapped tasks**
+
+  The [core] ``max_map_length`` config option is the maximum number of tasks that ``expand`` can create. The default value is 1024.
+
+  If a source task (``make_list`` in our earlier example) returns a list longer than this, it will result in *that* task failing.
+
+- **Limiting parallel copies of a mapped task**
+
+  If you don't want a large mapped task to consume all available runner slots, you can use the ``max_active_tis_per_dag`` setting on the task to restrict how many can be running at the same time.
+
+  Note, however, that this applies to all copies of that task against all active DagRuns, not just to this one specific DagRun.
+
+  .. code-block:: python
+
+      from airflow.decorators import task
+      from airflow.operators.bash import BashOperator
+
+
+      @task(max_active_tis_per_dag=16)
+      def add_one(x: int):
+          return x + 1
+
+
+      # ``commands`` stands for a list of shell commands defined elsewhere in the DAG file
+      BashOperator.partial(task_id="my_task", max_active_tis_per_dag=16).expand(
+          bash_command=commands
+      )
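Both limits can be modelled in plain Python, under loudly stated assumptions: this is not how the scheduler enforces them. ``MAX_MAP_LENGTH`` mirrors the documented default of ``max_map_length``, ``expand`` and ``run_mapped`` are hypothetical helpers, and a ``threading.Semaphore`` stands in for ``max_active_tis_per_dag``. In Airflow the cap is shared by all active DagRuns, which here would mean sharing one semaphore across every call to ``run_mapped``.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_MAP_LENGTH = 1024  # the documented default for [core] max_map_length


def expand(values):
    # An over-long source list fails the *source* step, before anything is mapped.
    if len(values) > MAX_MAP_LENGTH:
        raise ValueError(f"{len(values)} items exceeds max_map_length={MAX_MAP_LENGTH}")
    return list(values)


def run_mapped(func, inputs, max_active):
    # A semaphore caps how many copies run at once, like max_active_tis_per_dag.
    slots = threading.Semaphore(max_active)
    lock = threading.Lock()
    running = 0
    peak = 0

    def one_instance(x):
        nonlocal running, peak
        with slots:
            with lock:
                running += 1
                peak = max(peak, running)
            try:
                time.sleep(0.01)  # simulate some work
                return func(x)
            finally:
                with lock:
                    running -= 1

    # Plenty of worker threads, so the semaphore (not the pool size) is the limit.
    with ThreadPoolExecutor(max_workers=max(len(inputs), 1)) as pool:
        results = list(pool.map(one_instance, inputs))
    return results, peak


results, peak = run_mapped(lambda x: x + 1, expand(range(64)), max_active=16)
print(results[:3], peak)  # peak never exceeds 16
```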
+
+Automatically skipping zero-length maps
+=======================================
+
+If the input is empty (zero length), no new tasks will be created and the mapped task will be marked as ``SKIPPED``.
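A plain-Python model of this rule: ``expand_or_skip`` is a hypothetical helper, and the ``"RAN"`` label is made up for this sketch; only ``SKIPPED`` corresponds to the state named above.

```python
def expand_or_skip(func, inputs):
    # Model of the rule above: no inputs means no instances, and the task is marked SKIPPED.
    if len(inputs) == 0:
        return "SKIPPED", []
    # Otherwise one call per input, as in the earlier examples.
    return "RAN", [func(x) for x in inputs]


print(expand_or_skip(lambda x: x * 2, []))  # ('SKIPPED', [])
print(expand_or_skip(lambda x: x * 2, [1, 2]))  # ('RAN', [2, 4])
```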
diff --git a/docs/apache-airflow/concepts/index.rst b/docs/apache-airflow/concepts/index.rst
index 4d99555be1..f4f0cb3b65 100644
--- a/docs/apache-airflow/concepts/index.rst
+++ b/docs/apache-airflow/concepts/index.rst
@@ -36,6 +36,7 @@ Here you can find detailed documentation about each one of Airflow's core concep
dags
tasks
operators
+ dynamic-task-mapping
sensors
deferring
smart-sensors
diff --git a/docs/apache-airflow/howto/dynamic-dag-generation.rst b/docs/apache-airflow/howto/dynamic-dag-generation.rst
index 185c577d18..fb86abbe54 100644
--- a/docs/apache-airflow/howto/dynamic-dag-generation.rst
+++ b/docs/apache-airflow/howto/dynamic-dag-generation.rst
@@ -20,6 +20,8 @@
Dynamic DAG Generation
======================
+To have a task repeated based on the output/result of a previous task, see :doc:`/concepts/dynamic-task-mapping`.
+
Dynamic DAGs with environment variables
.......................................
diff --git a/docs/apache-airflow/img/mapping-simple-graph.png b/docs/apache-airflow/img/mapping-simple-graph.png
new file mode 100644
index 0000000000..9e01d027ec
Binary files /dev/null and b/docs/apache-airflow/img/mapping-simple-graph.png differ
diff --git a/docs/apache-airflow/img/mapping-simple-grid.png b/docs/apache-airflow/img/mapping-simple-grid.png
new file mode 100644
index 0000000000..59f5e6f71c
Binary files /dev/null and b/docs/apache-airflow/img/mapping-simple-grid.png differ