This guide shows you how to write an Apache Airflow directed acyclic graph (DAG) that runs in a Cloud Composer environment.
Because Apache Airflow does not provide strong DAG and task isolation, we recommend that you use separate production and test environments to prevent DAG interference. For more information, see Testing DAGs.
Structuring an Airflow DAG
An Airflow DAG is defined in a Python file and is composed of the following components:
- DAG definition
- Airflow operators
- Operator relationships
The following sections describe each component in isolation.
A DAG definition
A DAG definition gives the DAG a unique ID and sets DAG-level properties, such as the schedule and the default arguments that apply to every task.
Operators and tasks
Airflow operators describe the work to be done. A task is a specific instance of an operator.
Task relationships
Task relationships describe the order in which the work must be completed.
Full DAG workflow example in Python
A complete working DAG template can be as small as two tasks, for example a hello_python task and a goodbye_bash task.
For more information about defining Airflow DAGs, see the Airflow tutorial and Airflow concepts.
Airflow operators
The following examples show a few popular Airflow operators. For an authoritative reference of Airflow operators, see the Operators and Hooks Reference and Providers index.
BashOperator
Use the BashOperator to run command-line programs.
Cloud Composer runs the provided commands in a Bash script on an Airflow worker. The worker is a Debian-based Docker container and includes several packages:
- gcloud command, including the gcloud storage sub-command for working with Cloud Storage buckets
- bq command
- kubectl command
PythonOperator
Use the PythonOperator to run arbitrary Python code.
Cloud Composer runs the Python code in a container that includes packages for the Cloud Composer image version used in your environment.
To install additional Python packages, see Installing Python Dependencies.
Google Cloud Operators
To run tasks that use Google Cloud products, use the Google Cloud Airflow operators. For example, BigQuery operators query and process data in BigQuery.
There are many more Airflow operators for Google Cloud and individual services provided by Google Cloud. See Google Cloud Operators for the full list.
EmailOperator
Use the EmailOperator to send email from a DAG. To send email from a Cloud Composer environment, configure your environment to use SendGrid.
Notifications on operator failure
Set email_on_failure to True to send an email notification when an operator in the DAG fails. To send email notifications from a Cloud Composer environment, you must configure your environment to use SendGrid.
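One common place to set this flag is the default arguments dictionary that a DAG passes to all of its tasks. The following sketch shows only that dictionary; the recipient address is hypothetical, and silencing retry emails is an assumption about the desired behavior.

```python
# Default arguments applied to every task in a DAG. Pass this dictionary
# as the default_args argument when you create the DAG object.
default_args = {
    # Hypothetical recipient address.
    "email": ["data-alerts@example.com"],
    # Send an email when a task fails.
    "email_on_failure": True,
    # Assumption: do not send an email each time a task is retried.
    "email_on_retry": False,
}
```

Individual tasks can still override these values by passing the same argument names directly to the operator.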
DAG workflow guidelines
Place any custom Python libraries in a DAG's ZIP archive in a nested directory. Do not place libraries at the top level of the DAGs directory.
When Airflow scans the dags/ folder, Airflow only checks for DAGs in Python modules that are in the top level of the DAGs folder and in the top level of a ZIP archive also located in the top-level dags/ folder. If Airflow encounters a Python module in a ZIP archive that does not contain both airflow and DAG substrings, Airflow stops processing the ZIP archive. Airflow returns only the DAGs found up to that point.
For fault tolerance, do not define multiple DAG objects in the same Python module.
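The substring check that Airflow applies to modules in a ZIP archive can be approximated in plain Python. The sketch below is not Airflow's implementation: the function names are hypothetical, and case-insensitive matching is an assumption. It shows why a helper library stored at the top level of an archive is a problem.

```python
import io
import zipfile


def looks_like_dag_module(source: bytes) -> bool:
    """Return True if the content mentions both 'airflow' and 'dag'.

    Assumption: the comparison ignores case.
    """
    lowered = source.lower()
    return b"airflow" in lowered and b"dag" in lowered


def first_module_without_markers(archive: zipfile.ZipFile):
    """Return the first top-level .py file that lacks the markers, or None."""
    for name in archive.namelist():
        if "/" in name or not name.endswith(".py"):
            continue  # only top-level Python modules are checked
        if not looks_like_dag_module(archive.read(name)):
            return name
    return None


# Build a small archive in memory: one DAG file, one top-level helper,
# and one helper in a nested directory.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("my_dag.py", "from airflow import DAG\n")
    archive.writestr("helpers.py", "def double(x):\n    return 2 * x\n")
    archive.writestr("libs/util.py", "VALUE = 1\n")

with zipfile.ZipFile(buffer) as archive:
    offender = first_module_without_markers(archive)

print(offender)  # helpers.py
```

The nested `libs/util.py` is never inspected, which is why libraries belong in a nested directory of the archive.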
Do not use SubDAGs. Instead, group tasks inside DAGs.
Place files that are required at DAG parse time into the dags/ folder, not in the data/ folder.
Test developed or modified DAGs as recommended in instructions for testing DAGs.
The Composer Local Development CLI tool streamlines Apache Airflow DAG development for Cloud Composer 2 by running an Airflow environment locally. This local Airflow environment uses an image of a specific Cloud Composer 2 version.
Verify that developed DAGs do not increase DAG parse times too much.
Airflow tasks can fail for multiple reasons. To avoid failures of whole DAG runs, we recommend enabling task retries. Setting maximum retries to 0 means that no retries are performed.
We recommend overriding the default_task_retries option with a value for the task retries other than 0. In addition, you can set the retries parameter at the task level.
If you want to use GPUs in your Airflow tasks, create a separate GKE cluster based on nodes using machines with GPUs. Use GKEStartPodOperator to run your tasks.
Avoid running CPU- and memory-heavy tasks in the cluster's node pool where other Airflow components (schedulers, workers, web servers) are running. Instead, use KubernetesPodOperator or GKEStartPodOperator.
When deploying DAGs into an environment, upload only the files that are absolutely necessary for interpreting and executing DAGs into the /dags folder.
Limit the number of DAG files in the /dags folder.
Airflow continuously parses DAGs in the /dags folder. Parsing is a process that loops through the DAGs folder, and the number of files that need to be loaded (with their dependencies) affects the performance of DAG parsing and task scheduling. It is much more efficient to use 100 files with 100 DAGs each than 10000 files with 1 DAG each, so we recommend this optimization. The optimization is a balance between parsing time and the efficiency of DAG authoring and management.
As an alternative, to deploy 10000 DAG files, you could create 100 ZIP files that each contain 100 DAG files.
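The ZIP-based packaging can be scripted with the Python standard library. The following sketch is one possible approach, not a Cloud Composer tool: the function name, archive naming scheme, and directory layout are hypothetical. The demo uses 250 placeholder files to keep the run short.

```python
import tempfile
import zipfile
from pathlib import Path


def bundle_dag_files(source_dir: Path, output_dir: Path, files_per_archive: int = 100):
    """Pack top-level .py files from source_dir into fixed-size ZIP archives."""
    dag_files = sorted(source_dir.glob("*.py"))
    output_dir.mkdir(parents=True, exist_ok=True)
    archives = []
    for start in range(0, len(dag_files), files_per_archive):
        archive_path = output_dir / f"dags_{start // files_per_archive:04d}.zip"
        with zipfile.ZipFile(archive_path, "w") as archive:
            for dag_file in dag_files[start:start + files_per_archive]:
                # Store each file at the top level of the archive,
                # which is where Airflow looks for DAG modules.
                archive.write(dag_file, arcname=dag_file.name)
        archives.append(archive_path)
    return archives


# Demo: 250 placeholder files, 100 files per archive.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "src"
    src.mkdir()
    for i in range(250):
        (src / f"dag_{i:03d}.py").write_text("# airflow DAG placeholder\n")
    created = bundle_dag_files(src, Path(tmp) / "out")
    names = [path.name for path in created]
    sizes = []
    for path in created:
        with zipfile.ZipFile(path) as archive:
            sizes.append(len(archive.namelist()))

print(sizes)  # [100, 100, 50]
```

The resulting archives would then be uploaded to the top level of the environment's DAGs folder in place of the individual files.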
In addition to the hints above, if you have more than 10000 DAG files, then generating DAGs programmatically might be a good option. For example, you can implement a single Python DAG file that generates some number of DAG objects (for example, 20 or 100 DAG objects).
FAQs for writing DAGs
How do I minimize code repetition if I want to run the same or similar tasks in multiple DAGs?
We suggest defining libraries and wrappers to minimize the code repetition.
How do I reuse code between DAG files?
Put your utility functions in a local Python library and import the functions. You can reference the functions in any DAG located in the dags/ folder in your environment's bucket.
How do I minimize the risk of different definitions arising?
For example, you have two teams that want to aggregate raw data into revenue metrics. The teams write two slightly different tasks that accomplish the same thing. Define libraries to work with the revenue data so that the DAG implementers must clarify the definition of revenue that's being aggregated.
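As an illustration, a shared helper module could hold the one agreed definition, so that both teams call the same function. Everything in this sketch is hypothetical, including the function names and the formula itself (gross sales minus refunds and taxes); the point is only that the definition lives in one place.

```python
from decimal import Decimal


def net_revenue(gross: Decimal, refunds: Decimal, taxes: Decimal) -> Decimal:
    """Single shared definition: gross sales minus refunds and taxes."""
    return gross - refunds - taxes


def aggregate_net_revenue(rows) -> Decimal:
    """Sum net revenue over rows that have 'gross', 'refunds' and 'taxes' keys."""
    return sum(
        (net_revenue(row["gross"], row["refunds"], row["taxes"]) for row in rows),
        Decimal("0"),
    )


# Illustrative input rows.
rows = [
    {"gross": Decimal("100.00"), "refunds": Decimal("5.00"), "taxes": Decimal("8.00")},
    {"gross": Decimal("40.00"), "refunds": Decimal("0.00"), "taxes": Decimal("3.20")},
]
total = aggregate_net_revenue(rows)
print(total)  # 123.80
```

A task in either team's DAG would import `aggregate_net_revenue` instead of re-implementing the calculation.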
How do I set dependencies between DAGs?
This depends on how you want to define the dependency.
If you have two DAGs (DAG A and DAG B) and you want DAG B to trigger after DAG A, you can put a TriggerDagRunOperator at the end of DAG A.
If DAG B depends only on an artifact that DAG A generates, such as a Pub/Sub message, then a sensor might work better.
If DAG B is integrated closely with DAG A, you might be able to merge the two DAGs into one DAG.
How do I pass unique run IDs to a DAG and its tasks?
For example, you want to pass Dataproc cluster names and file paths.
You can generate a random unique ID by returning str(uuid.uuid4()) in a PythonOperator. This places the ID into XComs so that you can refer to the ID in other operators via templated fields.
Before generating a uuid, consider whether a DagRun-specific ID would be more valuable. You can also reference these IDs in Jinja substitutions by using macros.
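The two approaches can be sketched as a callable plus template strings. The task ID `generate_run_id`, the `cluster-` prefix, and the output path are hypothetical. In a DAG, the function would be passed as `python_callable` to a PythonOperator, and the strings would be assigned to templated fields of downstream operators.

```python
import uuid


def generate_run_id() -> str:
    """Return a random unique ID.

    When used as a PythonOperator callable, the return value is pushed to XCom.
    """
    return str(uuid.uuid4())


# Template for a templated field of a downstream operator.
# Assumption: the upstream task's task_id is "generate_run_id".
CLUSTER_NAME_TEMPLATE = "cluster-{{ ti.xcom_pull(task_ids='generate_run_id') }}"

# Alternative: use Airflow's DagRun-specific ID instead of a random one.
RUN_SCOPED_PATH_TEMPLATE = "output/{{ run_id }}/result.csv"
```

The DagRun-specific variant needs no extra task, but its value is only as unique as the run itself, so retried tasks in the same run see the same ID.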
How do I separate tasks in a DAG?
Each task should be an idempotent unit of work. Consequently, you should avoid encapsulating a multi-step workflow within a single task, such as a complex program running in a PythonOperator.
Should I define multiple tasks in a single DAG to aggregate data from multiple sources?
For example, you have multiple tables with raw data and want to create daily aggregates for each table. The tasks are not dependent on each other. Should you create one task and DAG for each table or create one general DAG?
If you are okay with each task sharing the same DAG-level properties, such as schedule_interval, then it makes sense to define multiple tasks in a single DAG. Otherwise, to minimize code repetition, multiple DAGs can be generated from a single Python module by placing them into the module's globals().
How do I limit the number of concurrent tasks running in a DAG?
For example, you want to avoid exceeding API usage limits/quotas or avoid running too many simultaneous processes.
You can define Airflow pools in the Airflow web UI and associate tasks with existing pools in your DAGs.
FAQs for using operators
Should I use the DockerOperator?
We do not recommend using the DockerOperator, unless it's used to launch containers on a remote Docker installation (not within an environment's cluster). In a Cloud Composer environment the operator does not have access to Docker daemons.
Instead, use KubernetesPodOperator or GKEStartPodOperator. These operators launch Kubernetes pods into Kubernetes or GKE clusters respectively. Note that we don't recommend launching pods into an environment's cluster, because this can lead to resource competition.
Should I use the SubDagOperator?
We do not recommend using the SubDagOperator. Use alternatives as suggested in Grouping tasks.
Should I run Python code only in PythonOperators to fully separate Python operators?
Depending on your goal, you have a few options.
If your only concern is maintaining separate Python dependencies, you can use the PythonVirtualenvOperator.
Consider using the KubernetesPodOperator. This operator allows you to define Kubernetes pods and run the pods in other clusters.
How do I add custom binary or non-PyPI packages?
You can install packages hosted in private package repositories.
How do I uniformly pass arguments to a DAG and its tasks?
You can use Airflow's built-in support for Jinja templating to pass arguments that can be used in templated fields.
When does template substitution happen?
Template substitution occurs on Airflow workers just before the pre_execute function of an operator is called. In practice, this means that templates are not substituted until just before a task runs.
How do I know which operator arguments support template substitution?
Operator arguments that support Jinja2 template substitution are explicitly marked as such.
Look for the template_fields field in the Operator definition, which contains a list of argument names that undergo template substitution. For example, see the BashOperator, which supports templating for the bash_command and env arguments.
What's next
- Troubleshooting DAGs
- Troubleshooting Scheduler
- Google Operators
- Google Cloud Operators
- Apache Airflow Tutorial