Create an ecommerce streaming pipeline


In this tutorial, you create a Dataflow streaming pipeline that transforms ecommerce data from Pub/Sub topics and subscriptions and outputs the data to BigQuery and Bigtable. This tutorial requires Gradle.

The tutorial provides an end-to-end ecommerce sample application that streams data from a webstore to BigQuery and Bigtable. The sample application illustrates common use cases and best practices for implementing streaming data analytics and real-time artificial intelligence (AI). Use this tutorial to learn how to analyze customer actions as they happen and react to events in real time. The tutorial also describes how to store, analyze, and visualize event data to get more insight into customer behavior.

The sample application is available on GitHub. To run this tutorial using Terraform, follow the steps provided with the sample application on GitHub.

Objectives

  • Validate incoming data and apply corrections to it where possible.
  • Analyze clickstream data to keep a count of the number of views per product in a given time period. Store this information in a low-latency store. The application can then use the data to show customers on the website messages such as "number of people who viewed this product".
  • Use transaction data to inform inventory ordering:

    • Analyze transaction data to calculate the total number of sales for each item, both by store and globally, for a given period.
    • Analyze inventory data to calculate the incoming inventory for each item.
    • Pass this data to inventory systems on a continuous basis so it can be used for inventory purchasing decisions.
  • Write any incoming data that can't be corrected to a dead-letter queue for additional analysis and processing. For monitoring and alerting, expose a metric that represents the percentage of incoming data that is sent to the dead-letter queue.

  • Process all incoming data into a standard format and store it in a data warehouse to use for future analysis and visualization.

  • Denormalize transaction data for in-store sales so that it can include information like the latitude and longitude of the store location. Provide the store information through a slowly changing table in BigQuery, using the store ID as a key.

Data

The application processes the following types of data:

  • Clickstream data being sent by online systems to Pub/Sub.
  • Transaction data being sent by on-premises or software as a service (SaaS) systems to Pub/Sub.
  • Stock data being sent by on-premises or SaaS systems to Pub/Sub.

Task patterns

The application contains the following task patterns common to pipelines built with the Apache Beam SDK for Java:

Costs

In this document, you use the following billable components of Google Cloud:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, myemail@example.com.
    • Replace ROLE with each individual role.
  9. Create a user-managed worker service account for your new pipeline and grant the necessary roles to the service account.

    1. To create the service account, run the gcloud iam service-accounts create command:

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. Grant roles to the service account. Run the following command once for each of the following IAM roles:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Replace SERVICE_ACCOUNT_ROLE with each individual role.

    3. Grant your Google Account a role that lets you create access tokens for the service account:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

      Replace EMAIL_ADDRESS with the email address of your Google Account.
  10. If needed, download and install Gradle.
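
The step that grants roles to the worker service account repeats one command for each role. The following is a minimal sketch of that step as a loop. The function name worker_role_commands is illustrative and not part of the sample application. The function only prints the commands so that you can review them before running anything.

```shell
#!/usr/bin/env bash
# Usage: worker_role_commands PROJECT_ID [add|remove]
# Prints one gcloud command for every role that this tutorial lists for the
# retailpipeline worker service account. Nothing is executed here.
worker_role_commands() {
  local project_id="$1"
  local action="${2:-add}"
  local member="serviceAccount:retailpipeline@${project_id}.iam.gserviceaccount.com"
  local role
  for role in \
    roles/dataflow.admin \
    roles/dataflow.worker \
    roles/pubsub.editor \
    roles/bigquery.dataEditor \
    roles/bigtable.admin
  do
    printf 'gcloud projects %s-iam-policy-binding %s --member="%s" --role=%s\n' \
      "$action" "$project_id" "$member" "$role"
  done
}

# Review the printed commands first, then pipe them to a shell to run them:
#   worker_role_commands my-project-id | sh
```

Passing remove as the second argument prints the matching remove-iam-policy-binding commands, which correspond to the first step of the Revoke credentials section.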

Create the example sources and sinks

This section explains how to create the following:

  • A Cloud Storage bucket to use as a temporary storage location
  • Streaming data sources using Pub/Sub
  • Datasets to load the data into BigQuery
  • A Bigtable instance

Create a Cloud Storage bucket

Begin by creating a Cloud Storage bucket. This bucket is used as a temporary storage location by the Dataflow pipeline.

Use the gcloud storage buckets create command:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Replace the following:

  • BUCKET_NAME: a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
  • LOCATION: the location for the bucket.
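
Before you run the command, a quick local check can catch an invalid bucket name. The sketch below covers only a subset of the bucket naming requirements: length, allowed characters, first and last character, and the reserved goog prefix and google substring. It does not cover the rules for dotted names or names that look like IP addresses, and it can't tell whether the name is already taken. The function name is_plausible_bucket_name is illustrative.

```shell
#!/usr/bin/env bash
# Usage: is_plausible_bucket_name NAME
# Returns 0 when NAME passes a subset of the Cloud Storage naming rules.
is_plausible_bucket_name() {
  local LC_ALL=C          # make a-z match ASCII lowercase only
  local name="$1"
  # Names without dots must contain 3 to 63 characters.
  [ "${#name}" -ge 3 ] && [ "${#name}" -le 63 ] || return 1
  case "$name" in
    *[!a-z0-9_.-]*) return 1 ;;          # lowercase letters, digits, _ . - only
    [!a-z0-9]*|*[!a-z0-9]) return 1 ;;   # must start and end with letter or digit
    goog*|*google*) return 1 ;;          # reserved prefix and substring
  esac
  return 0
}

# Example:
#   is_plausible_bucket_name my-retail-temp-123 && echo "looks valid"
```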

Create Pub/Sub topics and subscriptions

Create four Pub/Sub topics and then create three subscriptions.

To create your topics, run the gcloud pubsub topics create command once for each topic. For information about how to name a topic, see Guidelines to name a topic or a subscription.

gcloud pubsub topics create TOPIC_NAME

Replace TOPIC_NAME with the following values, running the command four times, once for each topic:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

To create a subscription to your topic, run the gcloud pubsub subscriptions create command once for each subscription:

  1. Create a Clickstream-inbound-sub subscription:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Create a Transactions-inbound-sub subscription:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Create an Inventory-inbound-sub subscription:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
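
The topic and subscription commands in this section follow one naming pattern: each inbound topic gets a subscription named after the topic with a -sub suffix. The sketch below generates the full set of commands from that pattern. The function name pubsub_commands is illustrative, and the function only prints commands rather than running them.

```shell
#!/usr/bin/env bash
# Usage: pubsub_commands create|delete
# Prints one gcloud command per topic and subscription named in this tutorial.
pubsub_commands() {
  local action="${1:-create}"
  local topics="Clickstream-inbound Transactions-inbound Inventory-inbound Inventory-outbound"
  local subscribed="Clickstream-inbound Transactions-inbound Inventory-inbound"
  local name
  case "$action" in
    create)
      # Topics first: a subscription needs an existing topic to attach to.
      for name in $topics; do
        printf 'gcloud pubsub topics create %s\n' "$name"
      done
      for name in $subscribed; do
        printf 'gcloud pubsub subscriptions create --topic %s %s-sub\n' "$name" "$name"
      done
      ;;
    delete)
      # Reverse order for cleanup: subscriptions, then topics.
      for name in $subscribed; do
        printf 'gcloud pubsub subscriptions delete %s-sub\n' "$name"
      done
      for name in $topics; do
        printf 'gcloud pubsub topics delete %s\n' "$name"
      done
      ;;
    *)
      echo "usage: pubsub_commands create|delete" >&2
      return 2
      ;;
  esac
}

# Review the printed commands first, then pipe them to a shell to run them:
#   pubsub_commands create | sh
```

The delete action prints the commands for the Pub/Sub step of the Clean up section.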
    

Create BigQuery datasets and table

Create two BigQuery datasets and a table of store locations.

  1. Use the bq mk command to create the first dataset.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. Create the second dataset.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. Use the CREATE TABLE SQL statement to create a table with a schema and test data. The test data has one store with an ID value of 1. The slow update side input pattern uses this table.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Create a Bigtable instance and table

Create a Bigtable instance and table. For more information about creating Bigtable instances, see Create an instance.

  1. If needed, run the following command to install the cbt CLI:

    gcloud components install cbt
    
  2. Use the gcloud bigtable instances create command to create an instance:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    Replace CLUSTER_ZONE with the zone where the cluster runs.

  3. Use the cbt createtable command to create a table:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. Use the following command to add a column family to the table:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

Run the pipeline

Use Gradle to run a streaming pipeline. To view the Java code that the pipeline is using, see RetailDataProcessingPipeline.java.

  1. Use the git clone command to clone the GitHub repository:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Switch to the application directory:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. To test the pipeline, in your shell or terminal, run the following command using Gradle:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. To run the pipeline, run the following command using Gradle:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

See the pipeline source code on GitHub.
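
The run command repeats PROJECT_ID in six places, which makes it easy to miss one when you substitute values by hand. The sketch below builds the same option list from three inputs. The function name pipeline_args is illustrative, and the region in the example is only a placeholder value.

```shell
#!/usr/bin/env bash
# Usage: pipeline_args PROJECT_ID BUCKET_NAME REGION
# Prints the options for -Dexec.args, one option per line.
pipeline_args() {
  local project_id="$1" bucket_name="$2" region="$3"
  local subs="projects/${project_id}/subscriptions"
  printf '%s\n' \
    "--project=${project_id}" \
    "--tempLocation=gs://${bucket_name}/temp/" \
    "--runner=DataflowRunner" \
    "--region=${region}" \
    "--clickStreamPubSubSubscription=${subs}/Clickstream-inbound-sub" \
    "--transactionsPubSubSubscription=${subs}/Transactions-inbound-sub" \
    "--inventoryPubSubSubscriptions=${subs}/Inventory-inbound-sub" \
    "--aggregateStockPubSubOutputTopic=projects/${project_id}/topics/Inventory-outbound" \
    "--dataWarehouseOutputProject=${project_id}"
}

# Example, run from the retail-java-applications directory:
#   ./gradlew tasks executeOnDataflow \
#     -Dexec.args="$(pipeline_args my-project-id my-bucket us-central1 | tr '\n' ' ')"
```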

Create and run Cloud Scheduler jobs

Create and run three Cloud Scheduler jobs: one that publishes clickstream data, one that publishes inventory data, and one that publishes transaction data. These jobs generate sample data for the pipeline.

  1. To create a Cloud Scheduler job for this tutorial, use the gcloud scheduler jobs create command. This step creates a publisher for clickstream data that publishes one message per minute.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. To start the Cloud Scheduler job, use the gcloud scheduler jobs run command.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. Create a similar publisher for inventory data that publishes one message every two minutes.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. Start the second Cloud Scheduler job.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. Create a third publisher for transaction data that publishes one message every two minutes.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. Start the third Cloud Scheduler job.

    gcloud scheduler jobs run --location=LOCATION transactions
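
The sample message bodies above carry fixed timestamp values. If you want to send a one-off test message whose timestamp field reflects the current time, something like the following sketch can generate it. The function name inventory_message is illustrative. The field layout is copied from the inventory sample above, and whether the pipeline treats the timestamp field differently for current values is not covered here.

```shell
#!/usr/bin/env bash
# Usage: inventory_message STORE_ID PRODUCT_ID
# Prints an inventory message shaped like the tutorial's sample, with the
# timestamp field set to the current time in epoch milliseconds.
inventory_message() {
  local store_id="$1" product_id="$2"
  local now_ms
  now_ms="$(date +%s)000"   # whole seconds, padded to millisecond precision
  printf '{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":%s,"store_id":%s,"product_id":%s}\n' \
    "$now_ms" "$store_id" "$product_id"
}

# Example: publish one message directly, without a Cloud Scheduler job.
#   gcloud pubsub topics publish Inventory-inbound \
#     --message="$(inventory_message 1 10050)"
```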
    

View your results

To view the data written to your BigQuery tables, run the following queries. While the pipeline is running, new rows are appended to the BigQuery tables every minute.

You might need to wait for the tables to populate with data.

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
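
Because the tables can take a while to populate, a small polling loop can save you from re-running the queries by hand. The following is a sketch under a few assumptions: the names wait_for_rows and count_rows are illustrative, PROJECT_ID is set as a shell variable, and the last line of the bq CSV output is the row count.

```shell
#!/usr/bin/env bash
# Usage: wait_for_rows MAX_TRIES SLEEP_SECONDS COMMAND [ARGS...]
# Re-runs COMMAND until it prints an integer greater than zero.
wait_for_rows() {
  local max_tries="$1" sleep_seconds="$2"
  shift 2
  local try=1 count
  while [ "$try" -le "$max_tries" ]; do
    # A failing command or non-numeric output counts as "no rows yet".
    count="$("$@" 2>/dev/null)" || count=0
    case "$count" in
      ''|*[!0-9]*) count=0 ;;
    esac
    if [ "$count" -gt 0 ]; then
      echo "found ${count} rows after ${try} attempt(s)"
      return 0
    fi
    try=$((try + 1))
    sleep "$sleep_seconds"
  done
  echo "no rows after ${max_tries} attempt(s)" >&2
  return 1
}

# Hypothetical helper: row count for one table in the Retail_Store dataset.
# Assumes that the last line of the CSV output is the count.
count_rows() {
  bq --format=csv query --use_legacy_sql=false \
    "SELECT COUNT(*) FROM \`${PROJECT_ID}.Retail_Store.$1\`" | tail -n 1
}

# Example: check every 30 seconds, up to 10 times.
#   PROJECT_ID=my-project-id
#   wait_for_rows 10 30 count_rows clean_inventory_data
```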

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

The easiest way to eliminate billing is to delete the Google Cloud project that you created for the tutorial.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the individual resources

If you want to reuse the project, then delete the resources that you created for the tutorial.

Clean up Google Cloud project resources

  1. To delete the Cloud Scheduler jobs, use the gcloud scheduler jobs delete command.

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
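  2. To delete the Pub/Sub subscriptions and topics, use the gcloud pubsub subscriptions delete and gcloud pubsub topics delete commands. Run the commands once for each of the three subscriptions and four topics that you created, replacing SUBSCRIPTION_NAME and TOPIC_NAME with the respective name.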

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. To delete the BigQuery table, use the bq rm command.

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. Delete the BigQuery datasets. The dataset alone does not incur any charges.

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. To delete the Bigtable instance, use the cbt deleteinstance command.

    cbt deleteinstance aggregate-tables
    
  6. To delete the Cloud Storage bucket, use the gcloud storage rm command. The bucket alone does not incur any charges.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revoke credentials

  1. Revoke the roles that you granted to the user-managed worker service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

What's next