This page describes how to publish Cloud Data Fusion pipeline events, such as pipeline status, to Pub/Sub topics. It also describes how to create Cloud Run functions that process the Pub/Sub messages and take actions, such as identifying and retrying failed pipelines.
Before you begin
- Create a topic where Pub/Sub can publish Cloud Data Fusion pipeline events.
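For example, you can create the topic with the Pub/Sub client library. The following is a minimal sketch, assuming the google-cloud-pubsub package is installed and that PROJECT_ID and TOPIC_ID are placeholders you replace; you can also create the topic in the Google Cloud console or with the gcloud CLI.
from google.cloud import pubsub_v1

# Create the topic that receives Cloud Data Fusion pipeline events.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("PROJECT_ID", "TOPIC_ID")
topic = publisher.create_topic(request={"name": topic_path})
print(f"Created topic: {topic.name}")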
Required roles
To ensure that the Cloud Data Fusion service account has the necessary permissions to publish pipeline events to a Pub/Sub topic, ask your administrator to grant the Cloud Data Fusion service account the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on the project where you create the Pub/Sub topic.
For more information about granting roles, see Manage access to projects, folders, and organizations.
Your administrator might also be able to give the Cloud Data Fusion service account the required permissions through custom roles or other predefined roles.
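If you prefer to script this grant, the following sketch attaches the role at the topic level, which also allows publishing to that topic. It assumes the google-cloud-pubsub package and that the Cloud Data Fusion service account has the form service-PROJECT_NUMBER@gcp-sa-datafusion.iam.gserviceaccount.com; confirm the exact email on your project's IAM page.
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("PROJECT_ID", "TOPIC_ID")

# Grant the Pub/Sub Publisher role on the topic to the Cloud Data Fusion service account.
policy = publisher.get_iam_policy(request={"resource": topic_path})
policy.bindings.add(
    role="roles/pubsub.publisher",
    members=["serviceAccount:service-PROJECT_NUMBER@gcp-sa-datafusion.iam.gserviceaccount.com"],
)
publisher.set_iam_policy(request={"resource": topic_path, "policy": policy})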
Manage event publishing in a Cloud Data Fusion instance
In Cloud Data Fusion versions 6.7.0 and later, you can manage event publishing in new and existing instances by using the REST API.
Publish events in a new instance
Create a new instance and include the EventPublishConfig field. For more information about the required fields for new instances, see the Instances resource reference.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Replace the following:
- PROJECT_ID: the Google Cloud project ID
- LOCATION: the location of your project
- INSTANCE_ID: the ID of your Cloud Data Fusion instance
- VERSION_NUMBER: the version of Cloud Data Fusion in which you create the instance, for example, 6.10.1
- TOPIC_ID: the ID of the Pub/Sub topic
Enable event publishing in an existing Cloud Data Fusion instance
Update the EventPublishConfig field in an existing Cloud Data Fusion instance:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Replace the following:
- PROJECT_ID: the Google Cloud project ID
- LOCATION: the location of your project
- INSTANCE_ID: the ID of your Cloud Data Fusion instance
- TOPIC_ID: the ID of the Pub/Sub topic
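To confirm that event publishing is enabled, you can read the instance back and inspect its event publishing configuration. The following is a minimal sketch, assuming the google-auth and requests packages and the same placeholder values as above.
import google.auth
from google.auth.transport.requests import AuthorizedSession

# Authenticate with Application Default Credentials and fetch the instance.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)
instance = session.get(
    "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID"
).json()

# The REST API returns the field in camelCase.
print(instance.get("eventPublishConfig"))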
Remove event publishing from an instance
To remove event publishing from an instance, set the event publishing enabled value to false:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Create functions to read Pub/Sub messages
Cloud Run functions can read Pub/Sub messages and act on them, for example by retrying failed pipelines. To create a Cloud Run function, do the following:
In the Google Cloud console, go to the Cloud Run functions page.
Click Create function.
Enter a function name and region.
In the Trigger type field, select Cloud Pub/Sub.
Enter the Pub/Sub topic ID.
Click Next.
Add functions to read the Pub/Sub messages and take other actions. For example, you can add functions for the following use cases:
- Send alerts for pipeline failures.
- Send alerts for KPIs, such as record count or run information.
- Restart a failed pipeline that hasn't been rerun.
For an alerting sketch, see the example after these steps. For complete Cloud Run function examples, see the use case section.
Click Deploy. For more information, see Deploy a Cloud Run function.
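As an example of the alerting use cases listed in the previous steps, the following sketch posts a short message to a webhook when a pipeline run fails. The WEBHOOK_URL environment variable and the send_failure_alert helper are illustrative assumptions; the status, applicationName, and error values come from the parsed Pub/Sub message shown in the use case that follows.
import os

import requests

def send_failure_alert(application_name, status, error=None):
    # Post a short alert to a chat or incident webhook when a pipeline run fails.
    if status != "FAILED":
        return
    message = f"Pipeline {application_name} failed."
    if error:
        message += f" Error: {error}"
    requests.post(os.environ["WEBHOOK_URL"], json={"text": message})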
Use case: Document pipeline status and retry failed pipelines
The following example Cloud Run functions read Pub/Sub messages about the pipeline run status, and then retry the failed pipelines in Cloud Data Fusion.
These example functions refer to the following Google Cloud components:
- Google Cloud project: the project where Cloud Run functions and Pub/Sub topics are created
- Pub/Sub topic: the Pub/Sub topic linked to your Cloud Data Fusion instance
- Cloud Data Fusion instance: the Cloud Data Fusion instance where you design and execute pipelines
- BigQuery table: the BigQuery table that captures the pipeline status and the run and rerun details
- Cloud Run function: the Cloud Run function where you deploy the code that retries failed pipelines
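These example functions depend on a few Python packages. A possible requirements.txt for the Cloud Run function might look like the following; the package names are assumptions based on the imports used below (pyarrow is required by load_table_from_dataframe), and you should pin versions as appropriate for your environment.
functions-framework
google-auth
google-cloud-bigquery
pandas
pyarrow
requests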
The following Cloud Run function example reads the Pub/Sub messages about Cloud Data Fusion status events.
# Imports used by the snippets in this section.
import base64
import json

import functions_framework
import pandas as pd
import requests
from google.cloud import bigquery

# Triggered from a message on a Pub/Sub topic.
@functions_framework.cloud_event
def cdf_event_trigger(cloud_event):
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)

    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit='ms')

    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")

    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except KeyError:
        print(f"Pipeline: {applicationName}'s current status: {status}")
The following example function loads the pipeline status and run details into a BigQuery table, creating the table if it doesn't already exist.
# Global variables.
pipeline_rerun_count = 0
has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.

# Update BigQuery table with the pipeline status and rerun details.
schema = [
    bigquery.SchemaField("Project_Name", "STRING"),
    bigquery.SchemaField("Instance_Name", "STRING"),
    bigquery.SchemaField("Namespace", "STRING"),
    bigquery.SchemaField("Pipeline_Name", "STRING"),
    bigquery.SchemaField("Pipeline_Status", "STRING"),
    bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
    bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
]

# Prepare DataFrame to load the data in BigQuery.
data = {'Project_Name': [projectName],
        'Instance_Name': [instanceName],
        'Namespace': [namespace],
        'Pipeline_Name': [applicationName],
        'Pipeline_Status': [status],
        'Event_Timestamp': [event_timestamp],
        'Pipeline_Rerun_Count': [pipeline_rerun_count]}
dataframe = pd.DataFrame(data)

# Prepare BigQuery data load job configuration.
job_config = bigquery.LoadJobConfig(schema=schema)
job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
job.result()  # Wait for the job to complete.

table = bq_client.get_table(table_id)  # Make an API request.
print("BigQuery table: {} updated.".format(table_id))
The following example function checks for pipelines that have failed and whether they were rerun in the last hour.
bq_client = bigquery.Client()

if status == "FAILED":
    print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")

    QUERY = f"""
        SELECT * FROM `{table_id}`
        WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
        AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
        AND Pipeline_Rerun_Count > 0
        """
    query_job = bq_client.query_and_wait(QUERY)  # API request.
    row_count = query_job.total_rows  # Waits for query to finish.
    print(f"Query job result row count: {row_count}")

    if (row_count > 0):
        print("Pipeline has FAILED and rerun recently...")
        global has_pipeline_failed_and_rerun_recently
        has_pipeline_failed_and_rerun_recently = True
If the failed pipeline hasn't been rerun recently, the following example function reruns it.
if not has_pipeline_failed_and_rerun_recently:
    auth_token = get_access_token()
    post_headers = {"Authorization": "Bearer " + auth_token, "Accept": "application/json"}
    cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
    run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)

    # Start the job.
    response = requests.post(run_pipeline_endpoint, headers=post_headers)
    print(f"Response for restarting the failed pipeline: {response}")

    global pipeline_rerun_count
    pipeline_rerun_count = 1
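The rerun snippet calls get_access_token(), which isn't defined in the examples above. The following is a minimal sketch, assuming the google-auth library and that the function's runtime service account is allowed to call the Cloud Data Fusion API.
import google.auth
from google.auth.transport.requests import Request

def get_access_token():
    # Obtain an OAuth 2.0 access token for the function's runtime service account.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    credentials.refresh(Request())
    return credentials.token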
What's next
- Learn how to write Cloud Run functions.