Orchestrate pipelines

This page explains pipeline orchestration with Cloud Composer and triggers. Cloud Data Fusion recommends using Cloud Composer to orchestrate pipelines. If you require a simpler way to manage orchestration, use triggers.

Composer

Orchestrate pipelines with Cloud Composer

Orchestrating pipeline execution in Cloud Data Fusion with Cloud Composer provides following benefits:

  • Centralized workflow management: uniformly manage the execution of multiple Cloud Data Fusion pipelines.
  • Dependency management: to ensure proper execution order, define dependencies between pipelines.
  • Monitoring and alerting: Cloud Composer provides monitoring capabilities and alerts for failures.
  • Integration with other services: Cloud Composer lets you orchestrate workflows that span across Cloud Data Fusion and other Google Cloud services.

To orchestrate Cloud Data Fusion pipelines using Cloud Composer, follow this process:

  1. Set up the Cloud Composer environment.

    • Create a Cloud Composer environment. If you don't have one, provision the environment in your Google Cloud project. This environment is your orchestration workspace.
    • Give permissions. Ensure the Cloud Composer service account has the necessary permissions to access Cloud Data Fusion (such as permission to start, stop, and list pipelines).
  2. Define Directed Acyclic Graphs (DAG) for orchestration.

    • Create a DAG: In Cloud Composer, create a DAG that defines the orchestration workflow for your Cloud Data Fusion pipelines.
    • Cloud Data Fusion Operators: Use Cloud Composer's Cloud Data Fusion Operators within your DAG. These operators let you interact programmatically with Cloud Data Fusion.

Cloud Data Fusion operators

Cloud Data Fusion pipeline orchestration has the following operators:

CloudDataFusionStartPipelineOperator

Triggers the execution of a Cloud Data Fusion pipeline by its ID. It has the following parameters:

  • Pipeline ID
  • Location (Google Cloud region)
  • Pipeline namespace
  • Runtime arguments (optional)
  • Wait for completion (optional)
  • Timeout (optional)
CloudDataFusionStopPipelineOperator

Lets you stop a running Cloud Data Fusion pipeline.

CloudDataFusionDeletePipelineOperator

Deletes a Cloud Data Fusion pipeline.

Build the DAG workflow

When you build the DAG workflow, consider the following:

  • Defining dependencies: Use the DAG structure to define dependencies between tasks. For example, you might have a task that waits for a pipeline in one namespace to complete successfully before triggering another pipeline in a different namespace.
  • Scheduling: Schedule the DAG to run at specific intervals, such as daily or hourly, or set it to be triggered manually.

For more information, see the Cloud Composer overview.

Triggers

Orchestrate pipelines with triggers

Cloud Data Fusion triggers let you automatically execute a downstream pipeline upon the completion (success, failure, or any specified condition) of one or more upstream pipelines.

Triggers are useful for the following tasks:

  • Cleaning your data once, and then making it available to multiple downstream pipelines for consumption.
  • Sharing information, such as runtime arguments and plugin configurations, between pipelines. This task is called payload configuration.
  • Having a set of dynamic pipelines that run using the data from the hour, day, week, or month, instead of a static pipeline that must be updated for every run.

For example, you have a dataset that contains all information about your company's shipments. Based on this data, you want to answer several business questions. To do this, you create one pipeline that cleanses the raw data about shipments, called Shipments Data Cleaning. Then you create a second pipeline, Delayed Shipments USA, which reads the cleansed data and finds the shipments within the USA that were delayed by more than a specified threshold. The Delayed Shipments USA pipeline can be triggered as soon as the upstream Shipments Data Cleaning pipeline successfully completes.

Additionally, since the downstream pipeline consumes the output of the upstream pipeline, you must specify that when the downstream pipeline runs using this trigger, it also receives the input directory to read from (which is the directory where the upstream pipeline generated its output). This process is called passing payload configuration, which you define with runtime arguments. It lets you have a set of dynamic pipelines that run using the data of the hour, day, week, or month (not a static pipeline, which must be updated for every run).

To orchestrate pipelines with triggers, follow this process:

  1. Create upstream and downstream pipelines.

    • In the Cloud Data Fusion Studio, design and deploy the pipelines that form your orchestration chain.
    • Consider which pipeline's completion will activate the next pipeline (downstream) in your workflow.
  2. Optional: pass runtime arguments for upstream pipelines.

  3. Create an inbound trigger on the downstream pipeline.

    • In the Cloud Data Fusion Studio, go to the List page. In the Deployed tab, click the name of the downstream pipeline. The Deploy view for that pipeline appears.
    • On the middle left side of the page, click Inbound triggers. A list of available pipelines appears.
    • Click the upstream pipeline. Select one or more of the upstream pipeline completion states (Succeeds, Fails, or Stops) as the condition for when the downstream pipeline should run.
    • If you want the upstream pipeline to share information (called payload configuration) with the downstream pipeline, click Trigger config and then follow the steps to pass payload configuration as runtime arguments. Otherwise, click Enable trigger.
  4. Test the trigger.

    • Initiate a run of the upstream pipeline.
    • If the trigger is configured correctly, the downstream pipeline automatically executes upon completion of the upstream pipelines, based on your configured condition.

Pass payload configuration as runtime arguments

Payload configuration allows sharing of information from the upstream pipeline to the downstream pipeline. This information can be, for example, the output directory, the data format, or the day the pipeline was run. This information is then used by the downstream pipeline for decisions such as determining the right dataset to read from.

To pass information from the upstream pipeline to the downstream pipeline, you set the runtime arguments of the downstream pipeline with the values of either the runtime arguments or the configuration of any plugin in the upstream pipeline.

Whenever the downstream pipeline triggers and runs, its payload configuration is set using the runtime arguments of the particular run of the upstream pipeline that triggered the downstream pipeline.

To pass payload configuration as runtime arguments, follow these steps:

  1. Picking up where you left off in the Creating an inbound trigger, after clicking Trigger config, any runtime arguments you previously set for your upstream pipeline will appear. Choose the runtime arguments to pass from the upstream pipeline to the downstream pipeline when this trigger executes.
  2. Click the Plugin config tab to see a list of what will be passed from your upstream pipeline to your downstream pipeline when it is triggered.
  3. Click Configure and Enable Trigger.