This tutorial shows how to run a large language model (LLM) in a streaming Dataflow pipeline by using the Apache Beam RunInference API.
For more information about the RunInference API, see About Beam ML in the Apache Beam documentation.
The example code is available on GitHub.
Objectives
- Create Pub/Sub topics and subscriptions for the model's input and responses.
- Load the model into Cloud Storage by using a Vertex AI custom job.
- Run the pipeline.
- Ask the model a question and get a response.
Costs
In this document, you use the following billable components of Google Cloud: Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI.
To generate a cost estimate based on your projected usage, use the pricing calculator.
When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.
Before you begin
Run this tutorial on a machine that has at least 5 GB of free disk space to install the dependencies.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project that you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:
  gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
- If you're using a local shell, then create local authentication credentials for your user account:
  gcloud auth application-default login
  You don't need to do this if you're using Cloud Shell.
- Grant roles to your user account. Run the following command once for each of the following IAM roles:
  - roles/iam.serviceAccountUser
  gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
  Replace the following:
  - PROJECT_ID: your project ID.
  - USER_IDENTIFIER: the identifier for your user account. For example, user:myemail@example.com.
  - ROLE: each individual role.
- Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles:
  - roles/dataflow.admin
  - roles/dataflow.worker
  - roles/storage.admin
  - roles/pubsub.editor
  - roles/aiplatform.user
  gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
  Replace the following:
  - PROJECT_ID: your project ID.
  - PROJECT_NUMBER: your project number. To find your project number, use the gcloud projects describe command.
  - SERVICE_ACCOUNT_ROLE: each individual role.
- Copy the Google Cloud project ID. You need this value later in this tutorial.
Create the Google Cloud resources
This section explains how to create the following resources:
- A Cloud Storage bucket to use as a temporary storage location
- A Pub/Sub topic for the model's prompts
- A Pub/Sub topic and subscription for the model's responses
Create a Cloud Storage bucket
Create a Cloud Storage bucket by using the gcloud CLI. This bucket is used as a temporary storage location by the Dataflow pipeline.
To create the bucket, use the gcloud storage buckets create command:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Replace the following:
- BUCKET_NAME: a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
- LOCATION: the location for the bucket.
Copy the bucket name. You need this value later in this tutorial.
Create Pub/Sub topics and subscriptions
Create two Pub/Sub topics and one subscription. One topic is for the input prompts that you send to the model. The other topic and its attached subscription are for the model's responses.
To create the topics, run the gcloud pubsub topics create command twice, once for each topic:
gcloud pubsub topics create PROMPTS_TOPIC_ID
gcloud pubsub topics create RESPONSES_TOPIC_ID
Replace the following:
- PROMPTS_TOPIC_ID: the topic ID for the input prompts to send to the model, such as prompts
- RESPONSES_TOPIC_ID: the topic ID for the model's responses, such as responses
To create the subscription and attach it to your responses topic, use the gcloud pubsub subscriptions create command:
gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
Replace RESPONSES_SUBSCRIPTION_ID with the subscription ID for the model's responses, such as responses-subscription.
Copy the topic IDs and the subscription ID. You need these values later in this tutorial.
Prepare your environment
Download the code samples and then set up your environment to run the tutorial.
The code samples in the python-docs-samples GitHub repository provide the code that you need to run this pipeline. When you are ready to build your own pipeline, you can use this sample code as a template.
You create an isolated Python virtual environment to run your pipeline project by using venv. A virtual environment lets you isolate the dependencies of one project from the dependencies of other projects. For more information about how to install Python and create a virtual environment, see Setting up a Python development environment.
Use the git clone command to clone the GitHub repository:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
Navigate to the run-inference directory:
cd python-docs-samples/dataflow/run-inference
If you're using a command prompt, check that you have Python 3 and pip running in your system:
python --version
python -m pip --version
If required, install Python 3.
If you're using Cloud Shell, you can skip this step because Cloud Shell already has Python installed.
Create a Python virtual environment:
python -m venv /tmp/env
source /tmp/env/bin/activate
Install the dependencies:
pip install -r requirements.txt --no-cache-dir
Model loading code sample
The model loading code in this tutorial launches a Vertex AI custom job that loads the model's state_dict object into Cloud Storage.
The starter file for this step is download_model.py in the sample repository.
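As an illustration only, not the actual sample code, the core of that step, downloading a Hugging Face model's weights in half precision and writing the state_dict directly to Cloud Storage, might look like the following sketch. It assumes the transformers, torch, and apache-beam[gcp] packages are installed; in the real sample, this logic runs inside a Vertex AI custom job so that it executes on an appropriately sized VM.

```python
"""Hedged sketch (not the actual sample file): save a model's weights to Cloud Storage."""
import torch
from apache_beam.io.filesystems import FileSystems
from transformers import AutoModelForSeq2SeqLM


def save_state_dict(model_name: str, state_dict_path: str) -> None:
    """Downloads the model weights in float16 and writes the state_dict to a gs:// path."""
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16,  # halves the size of the saved weights
    )
    # Beam's FileSystems API can write directly to Cloud Storage paths.
    with FileSystems.create(state_dict_path) as f:
        torch.save(model.state_dict(), f)


if __name__ == "__main__":
    # Placeholder values for illustration only.
    save_state_dict(
        "google/flan-t5-small",
        "gs://BUCKET_NAME/run-inference/google/flan-t5-small.pt",
    )
```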
Pipeline code sample
The pipeline code in this tutorial deploys a Dataflow pipeline that does the following things:
- Reads a prompt from Pub/Sub and encodes the text into token tensors.
- Runs the RunInference transform.
- Decodes the output token tensors into text and writes the response to Pub/Sub.
The starter file for the pipeline is main.py in the sample repository.
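As a rough, illustrative sketch only (assuming the apache-beam[gcp], torch, and transformers packages, and using placeholder topic, bucket, and model names), the core of such a pipeline might look like the following:

```python
"""Hedged sketch of a streaming RunInference pipeline; the real file is main.py."""
import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult, RunInference
from apache_beam.ml.inference.pytorch_inference import (
    PytorchModelHandlerTensor,
    make_tensor_model_fn,
)
from apache_beam.options.pipeline_options import PipelineOptions
from transformers import AutoConfig, AutoModelForSeq2SeqLM, AutoTokenizer

# Placeholder values for illustration only.
MODEL_NAME = "google/flan-t5-small"
STATE_DICT_PATH = "gs://BUCKET_NAME/run-inference/google/flan-t5-small.pt"
PROMPTS_TOPIC = "projects/PROJECT_ID/topics/prompts"
RESPONSES_TOPIC = "projects/PROJECT_ID/topics/responses"
MAX_RESPONSE_TOKENS = 256

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)


def to_tensors(message: bytes):
    """Encodes a Pub/Sub message into a padded token tensor."""
    return tokenizer(
        message.decode("utf-8"),
        return_tensors="pt",
        padding="max_length",
        max_length=128,
    ).input_ids[0]


def to_text(result: PredictionResult) -> bytes:
    """Decodes the generated token tensor back into text for Pub/Sub."""
    return tokenizer.decode(result.inference, skip_special_tokens=True).encode("utf-8")


# The model handler loads the state_dict from Cloud Storage and calls generate().
model_handler = PytorchModelHandlerTensor(
    state_dict_path=STATE_DICT_PATH,
    model_class=AutoModelForSeq2SeqLM.from_config,
    model_params={"config": AutoConfig.from_pretrained(MODEL_NAME)},
    inference_fn=make_tensor_model_fn("generate"),
)

options = PipelineOptions(streaming=True, save_main_session=True)
with beam.Pipeline(options=options) as pipeline:
    _ = (
        pipeline
        | "Read prompts" >> beam.io.ReadFromPubSub(topic=PROMPTS_TOPIC)
        | "Encode" >> beam.Map(to_tensors)
        | "RunInference" >> RunInference(
            model_handler,
            inference_args={"max_new_tokens": MAX_RESPONSE_TOKENS},
        )
        | "Decode" >> beam.Map(to_text)
        | "Write responses" >> beam.io.WriteToPubSub(topic=RESPONSES_TOPIC)
    )
```

In this shape, the RunInference transform handles model loading and batching; the surrounding Map steps only convert between Pub/Sub message bytes and token tensors.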
Load the model
LLMs can be very large models. Larger models that are trained with more parameters generally give better results. However, larger models require a bigger machine and more memory to run. Larger models can also be slower to run on CPUs.
Before you run a PyTorch model on Dataflow, you need to load the model's state_dict object. A model's state_dict object stores the weights for the model.
In a Dataflow pipeline that uses the Apache Beam RunInference transform, the model's state_dict object must be loaded to Cloud Storage. The machine that you use to load the state_dict object to Cloud Storage needs to have enough memory to load the model. The machine also needs a fast internet connection to download the weights and to upload them to Cloud Storage.
The following table shows the number of parameters for each model and the minimum memory that's needed to load each model.
| Model | Parameters | Memory needed |
|---|---|---|
| google/flan-t5-small | 80 million | > 320 MB |
| google/flan-t5-base | 250 million | > 1 GB |
| google/flan-t5-large | 780 million | > 3.2 GB |
| google/flan-t5-xl | 3 billion | > 12 GB |
| google/flan-t5-xxl | 11 billion | > 44 GB |
| google/flan-ul2 | 20 billion | > 80 GB |
Although you can load a smaller model locally, this tutorial shows how to launch a Vertex AI custom job that loads the model with an appropriately sized VM.
Because LLMs can be so large, the example in this tutorial saves the state_dict object in float16 format instead of the default float32 format. With this configuration, each parameter uses 16 bits instead of 32 bits, making the state_dict object half the size. A smaller size minimizes the time that's needed to load the model. However, converting the format means that the VM has to fit both the model and the state_dict object into memory.
The following table shows the minimum requirements to load a model after the state_dict object is saved in float16 format. The table also shows the suggested machine types to load a model by using Vertex AI. The minimum (and default) disk size for Vertex AI is 100 GB, but some models might require a larger disk.
| Model name | Memory needed | Machine type | VM memory | VM disk |
|---|---|---|---|---|
| google/flan-t5-small | > 480 MB | e2-standard-4 | 16 GB | 100 GB |
| google/flan-t5-base | > 1.5 GB | e2-standard-4 | 16 GB | 100 GB |
| google/flan-t5-large | > 4.8 GB | e2-standard-4 | 16 GB | 100 GB |
| google/flan-t5-xl | > 18 GB | e2-highmem-4 | 32 GB | 100 GB |
| google/flan-t5-xxl | > 66 GB | e2-highmem-16 | 128 GB | 100 GB |
| google/flan-ul2 | > 120 GB | e2-highmem-16 | 128 GB | 150 GB |
Load the model's state_dict object into Cloud Storage by using a Vertex AI custom job:
python download_model.py vertex \
--model-name="MODEL_NAME" \
--state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
--job-name="Load MODEL_NAME" \
--project="PROJECT_ID" \
--bucket="BUCKET_NAME" \
--location="LOCATION" \
--machine-type="VERTEX_AI_MACHINE_TYPE" \
--disk-size-gb="DISK_SIZE_GB"
Replace the following:
- MODEL_NAME: the name of the model, such as google/flan-t5-xl.
- VERTEX_AI_MACHINE_TYPE: the type of machine to run the Vertex AI custom job on, such as e2-highmem-4.
- DISK_SIZE_GB: the disk size for the VM, in GB. The minimum size is 100 GB.
Depending on the size of the model, it might take a few minutes to load the model. To view the status, go to the Vertex AI Custom jobs page.
Run the pipeline
After you load the model, you run the Dataflow pipeline. To run the pipeline, each worker VM must have enough memory to hold the model in addition to the memory that the worker itself uses.
The following table shows the recommended machine types to run an inference pipeline.
| Model name | Machine type | VM memory |
|---|---|---|
| google/flan-t5-small | n2-highmem-2 | 16 GB |
| google/flan-t5-base | n2-highmem-2 | 16 GB |
| google/flan-t5-large | n2-highmem-4 | 32 GB |
| google/flan-t5-xl | n2-highmem-4 | 32 GB |
| google/flan-t5-xxl | n2-highmem-8 | 64 GB |
| google/flan-ul2 | n2-highmem-16 | 128 GB |
Run the pipeline:
python main.py \
--messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
--responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
--model-name="MODEL_NAME" \
--state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
--runner="DataflowRunner" \
--project="PROJECT_ID" \
--temp_location="gs://BUCKET_NAME/temp" \
--region="REGION" \
--machine_type="DATAFLOW_MACHINE_TYPE" \
--requirements_file="requirements.txt" \
--requirements_cache="skip" \
--experiments="use_sibling_sdk_workers" \
--experiments="no_use_multiple_sdk_containers"
Replace the following:
- PROJECT_ID: the project ID
- PROMPTS_TOPIC_ID: the topic ID for the input prompts to send to the model
- RESPONSES_TOPIC_ID: the topic ID for the model's responses
- MODEL_NAME: the name of the model, such as google/flan-t5-xl
- BUCKET_NAME: the name of the bucket
- REGION: the region to deploy the job in, such as us-central1
- DATAFLOW_MACHINE_TYPE: the VM to run the pipeline on, such as n2-highmem-4
To ensure that the model is loaded only once per worker and that the worker doesn't run out of memory, you configure workers to use a single process by setting the pipeline option --experiments=no_use_multiple_sdk_containers. You don't have to limit the number of threads because the RunInference transform shares the same model with multiple threads.
The pipeline in this example runs with CPUs. For a larger model, more time is required to process each request. You can enable GPUs if you need faster responses.
To view the status of the pipeline, go to the Dataflow Jobs page.
Ask the model a question
After the pipeline starts running, you provide a prompt to the model and receive a response.
Send your prompt by publishing a message to Pub/Sub. Use the gcloud pubsub topics publish command:
gcloud pubsub topics publish PROMPTS_TOPIC_ID \
  --message="PROMPT_TEXT"
Replace PROMPT_TEXT with a string that contains the prompt that you want to provide. Surround the prompt with quotation marks.
Use your own prompt, or try one of the following examples:
Translate to Spanish: My name is Luka
Complete this sentence: Once upon a time, there was a
Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
To get the response, use the gcloud pubsub subscriptions pull command. Depending on the size of the model, it might take a few minutes for the model to generate a response. Larger models take longer to deploy and to generate a response.
gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
Replace RESPONSES_SUBSCRIPTION_ID with the subscription ID for the model's responses.
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Delete the project
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Delete individual resources
- Exit the Python virtual environment:
  deactivate
- Stop the pipeline:
  - List the job IDs for the Dataflow jobs that are running, and then note the job ID for the tutorial's job:
    gcloud dataflow jobs list --region=REGION --status=active
  - Cancel the job:
    gcloud dataflow jobs cancel JOB_ID --region=REGION
- Delete the bucket and anything inside of it:
  gcloud storage rm gs://BUCKET_NAME --recursive
- Delete the topics and the subscription:
  gcloud pubsub topics delete PROMPTS_TOPIC_ID
  gcloud pubsub topics delete RESPONSES_TOPIC_ID
  gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
- Revoke the roles that you granted to the Compute Engine default service account. Run the following command once for each of the following IAM roles:
  - roles/dataflow.admin
  - roles/dataflow.worker
  - roles/storage.admin
  - roles/pubsub.editor
  - roles/aiplatform.user
  gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
- Optional: Revoke roles from your Google Account:
  gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
- Optional: Revoke the authentication credentials that you created, and delete the local credential file:
  gcloud auth application-default revoke
- Optional: Revoke credentials from the gcloud CLI:
  gcloud auth revoke
What's next
- Explore Dataflow ML.
- Learn more about the RunInference API.
- Get in-depth information about using ML with Apache Beam in the Apache Beam AI/ML pipelines documentation.
- Work through the notebook Use RunInference for Generative AI.
- Explore reference architectures, diagrams, and best practices about Google Cloud. Take a look at our Cloud Architecture Center.